06b289af1405e0ed5be440a54a126134aea6bd0d
[learn-rust.git] / warp-server / src / main.rs
1 use futures_util::{FutureExt, StreamExt, SinkExt};
2 use warp::Filter;
3
4 // GET /hello/warp => 200 OK with body "Hello, warp!"
5 async fn hello(name: String, agent: String) -> Result<impl warp::Reply, warp::Rejection> {
6     Ok(format!("Hello, {} from {}!", name, agent))
7 }
8
9 // websocat ws://127.0.0.1:3030/ws-echo
10 async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
11     // echo all messages back
12     let (tx, rx) = websocket.split();
13     rx.forward(tx).map(|result| {
14         if let Err(e) = result {
15             log::warn!("websocket error: {:?}", e);
16         }
17     }).await;
18 }
19
20 // websocat ws://127.0.0.1:3030/ws-rev
21 async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
22     // echo all messages back
23     tokio::task::spawn(async {
24         let (mut tx, mut rx) = websocket.split();
25         while let Some(message) = rx.next().await {
26             let msg = match message {
27                 Ok(msg) =>  msg,
28                 Err(e) => {
29                     log::error!("websocket error: {}", e);
30                     break;
31                 }
32             };
33             log::info!("ws_rev_connected: got message: {:?}", msg);
34
35             if msg.is_close() {
36                 break;
37             }
38             if msg.is_text() {
39                 let text = msg.to_str().unwrap();
40                 let rev = text.chars().rev().collect::<String>();
41                 if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
42                     // disconnected
43                     log::info!("peer disconnected: {}", e);
44                     break;
45                 }
46             }
47             if msg.is_binary() {
48                 let mut rev = msg.into_bytes();
49                 rev.reverse();
50                 if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
51                     // disconnected
52                     log::info!("peer disconnected: {}", e);
53                     break;
54                 }
55             }
56         }
57         log::info!("ws_rev_connected ended");
58     });
59 }
60
61 pub fn api() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
62     let hello = warp::path!("hello" / String)
63         .and(warp::header::<String>("user-agent"))
64         .and_then(hello);
65
66     let ws_echo = warp::path("ws-echo")
67         .and(warp::ws())
68         .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) });
69
70     let ws_rev = warp::path("ws-rev")
71         .and(warp::ws())
72         .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) });
73
74     let static_dir = warp::path("dir")
75         .and(warp::fs::dir("../static"));
76
77     hello
78         .or(ws_echo)
79         .or(ws_rev)
80         .or(static_dir)
81         .with(warp::log("warp-server"))
82 }
83
84 #[tokio::main]
85 async fn main() {
86     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
87
88     warp::serve(api())
89         .run(([127, 0, 0, 1], 3030))
90         .await;
91 }
92
93 #[cfg(test)]
94 mod tests {
95     #[tokio::test]
96     async fn test_hello() {
97         let res = warp::test::request()
98             .path("/hello/rust")
99             .header("user-agent", "TestBrowser 0.1")
100             .reply(&super::api())
101             .await;
102         assert_eq!(res.status(), 200);
103         assert_eq!(res.body(), "Hello, rust from TestBrowser 0.1!");
104     }
105
106     #[tokio::test]
107     async fn test_ws_echo() {
108         let mut client = warp::test::ws()
109             .path("/ws-echo")
110             .handshake(super::api())
111             .await
112             .expect("handshake failed");
113
114         // text
115         client.send_text("Hello").await;
116         let reply = client.recv().await.unwrap();
117         assert_eq!(reply.to_str().unwrap(), "Hello");
118
119         // binary
120         let msg: Vec<u8> = vec![42, 99];
121         client.send(warp::ws::Message::binary(msg.clone())).await;
122         let reply = client.recv().await.unwrap();
123         assert_eq!(reply.as_bytes(), &msg);
124
125         //close
126         client.send(warp::ws::Message::close()).await;
127         client.recv_closed().await.unwrap();
128     }
129
130     #[tokio::test]
131     async fn test_ws_rev() {
132         let mut client = warp::test::ws()
133             .path("/ws-rev")
134             .handshake(super::api())
135             .await
136             .expect("handshake failed");
137
138         // text
139         client.send_text("Hello\n").await;
140         let reply = client.recv().await.unwrap();
141         assert_eq!(reply.to_str().unwrap(), "\nolleH");
142
143         // binary
144         let msg: Vec<u8> = vec![42, 99];
145         client.send(warp::ws::Message::binary(msg)).await;
146         let reply = client.recv().await.unwrap();
147         assert_eq!(reply.as_bytes(), vec![99, 42]);
148
149         //close
150         client.send(warp::ws::Message::close()).await;
151         client.recv_closed().await.unwrap();
152     }
153
154     #[tokio::test]
155     async fn test_static_dir() {
156         let res = warp::test::request()
157             .path("/dir/plain.txt")
158             .reply(&super::api())
159             .await;
160         assert_eq!(res.status(), 200);
161         assert_eq!(res.body(), "Hello world! This is uncompressed text.\n");
162
163         // subdir
164         let res = warp::test::request()
165             .path("/dir/dir1/optzip.txt")
166             .reply(&super::api())
167             .await;
168         assert_eq!(res.status(), 200);
169         assert_eq!(res.body(), "This file is available uncompressed or compressed\n\
170                                 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n");
171
172         // fs::dir does not support transparent decompression
173         let res = warp::test::request()
174             .path("/dir/onlycompressed.txt")
175             .reply(&super::api())
176             .await;
177         assert_eq!(res.status(), 404);
178     }
179 }