use bytes::Bytes;
use mini_redis::client;
-use tokio::sync::mpsc;
+use tokio::sync::{ mpsc, oneshot };
+
+/// Provided by the requester and used by the manager task to send
+/// the command response back to the requester.
+type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
#[derive(Debug)]
enum Command {
Get {
key: String,
+ resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
+ resp: Responder<()>,
}
}
while let Some(cmd) = manager_rx.recv().await {
match cmd {
- Command::Get { key } => { client.get(&key).await.unwrap(); }
- Command::Set { key, val } => { client.set(&key, val).await.unwrap(); }
+ Command::Get { key, resp } => { resp.send(client.get(&key).await).unwrap(); }
+ Command::Set { key, val, resp } => { resp.send(client.set(&key, val).await).unwrap(); }
}
}
});
let manager_tx2 = manager_tx.clone();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
- manager_tx.send(Command::Get { key: "hello".to_string() }).await.unwrap();
+ let (resp_tx, resp_rx) = oneshot::channel();
+ manager_tx.send(Command::Get { key: "hello".to_string(), resp: resp_tx }).await.unwrap();
+ let res = resp_rx.await;
+ println!("t1: got {:?}", res);
});
let t2 = tokio::spawn(async move {
- manager_tx2.send(Command::Set { key: "hello".to_string(), val: "bar".into() }).await.unwrap();
+ let (resp_tx, resp_rx) = oneshot::channel();
+ manager_tx2.send(Command::Set { key: "hello".to_string(), val: "bar".into(), resp: resp_tx }).await.unwrap();
+ let res = resp_rx.await.unwrap();
+ println!("t2: got {:?}", res);
});
t1.await.unwrap();