]> piware.de Git - learn-rust.git/blob - warp-server/src/main.rs
warp-server: Add unit tests for websocket routes
[learn-rust.git] / warp-server / src / main.rs
1 use futures_util::{FutureExt, StreamExt, SinkExt};
2 use warp::Filter;
3
4 // GET /hello/warp => 200 OK with body "Hello, warp!"
5 async fn hello(name: String, agent: String) -> Result<impl warp::Reply, warp::Rejection> {
6     Ok(format!("Hello, {} from {}!", name, agent))
7 }
8
9 // websocat ws://127.0.0.1:3030/ws-echo
10 async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
11     // echo all messages back
12     let (tx, rx) = websocket.split();
13     rx.forward(tx).map(|result| {
14         if let Err(e) = result {
15             log::warn!("websocket error: {:?}", e);
16         }
17     }).await;
18 }
19
20 // websocat ws://127.0.0.1:3030/ws-rev
21 async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
22     // echo all messages back
23     tokio::task::spawn(async {
24         let (mut tx, mut rx) = websocket.split();
25         while let Some(message) = rx.next().await {
26             let msg = match message {
27                 Ok(msg) =>  msg,
28                 Err(e) => {
29                     log::error!("websocket error: {}", e);
30                     break;
31                 }
32             };
33             log::info!("ws_rev_connected: got message: {:?}", msg);
34
35             if msg.is_close() {
36                 break;
37             }
38             if msg.is_text() {
39                 let text = msg.to_str().unwrap();
40                 let rev = text.chars().rev().collect::<String>();
41                 if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
42                     // disconnected
43                     log::info!("peer disconnected: {}", e);
44                     break;
45                 }
46             }
47             if msg.is_binary() {
48                 let mut rev = msg.into_bytes();
49                 rev.reverse();
50                 if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
51                     // disconnected
52                     log::info!("peer disconnected: {}", e);
53                     break;
54                 }
55             }
56         }
57         log::info!("ws_rev_connected ended");
58     });
59 }
60
61 pub fn api() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
62     let hello = warp::path!("hello" / String)
63         .and(warp::header::<String>("user-agent"))
64         .and_then(hello);
65
66     let ws_echo = warp::path("ws-echo")
67         .and(warp::ws())
68         .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) });
69
70     let ws_rev = warp::path("ws-rev")
71         .and(warp::ws())
72         .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) });
73
74     hello
75         .or(ws_echo)
76         .or(ws_rev)
77         .with(warp::log("warp-server"))
78 }
79
80 #[tokio::main]
81 async fn main() {
82     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
83
84     warp::serve(api())
85         .run(([127, 0, 0, 1], 3030))
86         .await;
87 }
88
89 #[cfg(test)]
90 mod tests {
91     #[tokio::test]
92     async fn test_hello() {
93         let res = warp::test::request()
94             .path("/hello/rust")
95             .header("user-agent", "TestBrowser 0.1")
96             .reply(&super::api())
97             .await;
98         assert_eq!(res.status(), 200);
99         assert_eq!(res.body(), "Hello, rust from TestBrowser 0.1!");
100     }
101
102     #[tokio::test]
103     async fn test_ws_echo() {
104         let mut client = warp::test::ws()
105             .path("/ws-echo")
106             .handshake(super::api())
107             .await
108             .expect("handshake failed");
109
110         // text
111         client.send_text("Hello").await;
112         let reply = client.recv().await.unwrap();
113         assert_eq!(reply.to_str().unwrap(), "Hello");
114
115         // binary
116         let msg: Vec<u8> = vec![42, 99];
117         client.send(warp::ws::Message::binary(msg.clone())).await;
118         let reply = client.recv().await.unwrap();
119         assert_eq!(reply.as_bytes(), &msg);
120
121         //close
122         client.send(warp::ws::Message::close()).await;
123         client.recv_closed().await.unwrap();
124     }
125
126     #[tokio::test]
127     async fn test_ws_rev() {
128         let mut client = warp::test::ws()
129             .path("/ws-rev")
130             .handshake(super::api())
131             .await
132             .expect("handshake failed");
133
134         // text
135         client.send_text("Hello\n").await;
136         let reply = client.recv().await.unwrap();
137         assert_eq!(reply.to_str().unwrap(), "\nolleH");
138
139         // binary
140         let msg: Vec<u8> = vec![42, 99];
141         client.send(warp::ws::Message::binary(msg)).await;
142         let reply = client.recv().await.unwrap();
143         assert_eq!(reply.as_bytes(), vec![99, 42]);
144
145         //close
146         client.send(warp::ws::Message::close()).await;
147         client.recv_closed().await.unwrap();
148     }
149 }