Receive responses
The final step is to receive the response back from the manager task. The GET command needs to get the value and the SET command needs to know if the operation completed successfully.
To pass the response, a oneshot channel is used. The oneshot channel is a single-producer, single-consumer channel optimized for sending a single value. In our case, the single value is the response.
Similar to mpsc, oneshot::channel() returns a sender and receiver handle.
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
Unlike mpsc, no capacity is specified as the capacity is always one. Additionally, neither handle can be cloned.
To receive responses from the manager task, a oneshot channel is created before sending a command. The Sender half of the channel is included in the command to the manager task. The Receiver half is used to receive the response.
First, update Command to include the Sender. For convenience, a type alias is used to reference the Sender.
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Vec<u8>,
        resp: Responder<()>,
    },
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
Now, update the tasks issuing the commands to include the oneshot::Sender.
let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };
    // Send the GET request
    tx.send(cmd).await.unwrap();
    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: b"bar".to_vec(),
        resp: resp_tx,
    };
    // Send the SET request
    tx2.send(cmd).await.unwrap();
    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});
Finally, update the manager task to send the response over the oneshot channel.
while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // Ignore errors
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val.into()).await;
            // Ignore errors
            let _ = resp.send(res);
        }
    }
}
Calling send on oneshot::Sender completes immediately and does not require an .await. This is because send on a oneshot channel will always fail or succeed immediately without any form of waiting.
Sending a value on a oneshot channel returns Err when the receiver half has been dropped. This indicates the receiver is no longer interested in the response. In our scenario, the receiver cancelling interest is an acceptable event. The Err returned by resp.send(...) does not need to be handled.
You can find the entire code here.