1 use futures_util::{FutureExt, StreamExt, SinkExt};
4 // GET /hello/warp => 200 OK with body "Hello, warp from <user-agent>!"
// Handler for the greeting route. `name` and `agent` are interpolated into
// the reply text; the handler always returns `Ok`, so it never produces a
// rejection itself.
5 async fn hello(name: String, agent: String) -> Result<impl warp::Reply, warp::Rejection> {
6 Ok(format!("Hello, {} from {}!", name, agent))
9 // websocat ws://127.0.0.1:3030/ws-echo
// Handles an upgraded WebSocket connection for the echo endpoint: whatever
// arrives on the socket is written back to the same socket unchanged.
10 async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
11 // echo all messages back
// Split the socket into its outgoing sink (`tx`) and incoming stream (`rx`).
12 let (tx, rx) = websocket.split();
// Pipe the incoming stream straight into the outgoing sink. `forward`
// finishes when the stream ends or on the first error; `map` then inspects
// that final result.
13 rx.forward(tx).map(|result| {
// A forwarding failure is only logged as a warning; it is not propagated.
14 if let Err(e) = result {
15 log::warn!("websocket error: {:?}", e);
20 // websocat ws://127.0.0.1:3030/ws-rev
// Handles an upgraded WebSocket connection for the "reverse" endpoint: text
// messages are sent back with their characters in reverse order, and binary
// messages are sent back as binary (the test in this file expects the bytes
// reversed; the statement doing that is not visible in this excerpt).
21 async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
22 // send every message back reversed (this endpoint is not a plain echo)
// The receive/reply loop runs inside a spawned tokio task.
// NOTE(review): if the returned JoinHandle is not awaited, this function
// returns immediately and the task runs detached - confirm that is intended.
23 tokio::task::spawn(async {
// Split the socket into its outgoing sink (`tx`) and incoming stream (`rx`).
24 let (mut tx, mut rx) = websocket.split();
// Keep reading until the stream yields `None`.
25 while let Some(message) = rx.next().await {
// Separate successfully received messages from transport errors.
26 let msg = match message {
29 log::error!("websocket error: {}", e);
33 log::info!("ws_rev_connected: got message: {:?}", msg);
// NOTE(review): `to_str()` returns `Err` for non-text messages, so this
// `unwrap` panics unless an `is_text()`-style guard precedes it; the guard is
// not visible in this excerpt - confirm.
39 let text = msg.to_str().unwrap();
// Reverses by `char` (Unicode scalar value), so sequences made of several
// scalars (e.g. a base letter plus a combining mark) get reordered.
40 let rev = text.chars().rev().collect::<String>();
// A failed send is treated as the peer having gone away; it is logged at
// info level.
41 if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
43 log::info!("peer disconnected: {}", e);
// Binary path: take ownership of the payload bytes so they can be modified
// and sent back.
48 let mut rev = msg.into_bytes();
50 if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
52 log::info!("peer disconnected: {}", e);
// Marks the end of this connection's handling in the log.
57 log::info!("ws_rev_connected ended");
// Builds the warp filter for this server. Returning it from a function lets
// the tests in this file drive the same routes through `warp::test`.
// How the individual routes below are combined is not visible in this excerpt.
61 pub fn api() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
// /hello/<name>: captures one path segment as a `String` and additionally
// requires a `user-agent` header, also extracted as a `String`.
62 let hello = warp::path!("hello" / String)
63 .and(warp::header::<String>("user-agent"))
// /ws-echo: after the WebSocket upgrade the connection is handed to
// `ws_echo_connected`.
66 let ws_echo = warp::path("ws-echo")
68 .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) });
// /ws-rev: after the WebSocket upgrade the connection is handed to
// `ws_rev_connected`.
70 let ws_rev = warp::path("ws-rev")
72 .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) });
// /dir/...: serves files from the relative directory `../static`.
// NOTE(review): a relative path is presumably resolved against the process's
// working directory, so this depends on where the server/tests are started.
74 let static_dir = warp::path("dir")
75 .and(warp::fs::dir("../static"));
// Wrap the filter with warp's request logging under the name "warp-server".
81 .with(warp::log("warp-server"))
// Initialise logging from the environment, using the "info" filter when the
// environment does not configure one.
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the server's entry point.
86 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// Listen on 127.0.0.1:3030, i.e. the loopback interface only. The receiver
// of `.run` is not visible in this excerpt.
89 .run(([127, 0, 0, 1], 3030))
// Exercises the greeting route through warp's in-process test harness,
// without opening a real socket.
96 async fn test_hello() {
97 let res = warp::test::request()
// The handler requires a user-agent header; its value must show up in the body.
99 .header("user-agent", "TestBrowser 0.1")
100 .reply(&super::api())
102 assert_eq!(res.status(), 200);
// "rust" presumably comes from the request path, which is not visible in
// this excerpt; "TestBrowser 0.1" is the header value set above.
103 assert_eq!(res.body(), "Hello, rust from TestBrowser 0.1!");
// Checks the echo WebSocket endpoint: text and binary messages must come back
// unchanged, and a close message must end the connection.
107 async fn test_ws_echo() {
// Perform the WebSocket handshake against the in-process filter.
108 let mut client = warp::test::ws()
110 .handshake(super::api())
112 .expect("handshake failed");
// Text round trip: the reply must equal what was sent.
115 client.send_text("Hello").await;
116 let reply = client.recv().await.unwrap();
117 assert_eq!(reply.to_str().unwrap(), "Hello");
// Binary round trip: `msg` is cloned for sending so the original stays
// available for the comparison.
120 let msg: Vec<u8> = vec![42, 99];
121 client.send(warp::ws::Message::binary(msg.clone())).await;
122 let reply = client.recv().await.unwrap();
123 assert_eq!(reply.as_bytes(), &msg);
// After sending a close message, the connection is expected to be closed.
126 client.send(warp::ws::Message::close()).await;
127 client.recv_closed().await.unwrap();
// Checks the reversing WebSocket endpoint: text comes back with its
// characters reversed, binary comes back with its bytes reversed, and a close
// message must end the connection.
131 async fn test_ws_rev() {
// Perform the WebSocket handshake against the in-process filter.
132 let mut client = warp::test::ws()
134 .handshake(super::api())
136 .expect("handshake failed");
// Text: the trailing newline is reversed along with the other characters,
// so it ends up first in the reply.
139 client.send_text("Hello\n").await;
140 let reply = client.recv().await.unwrap();
141 assert_eq!(reply.to_str().unwrap(), "\nolleH");
// Binary: [42, 99] is expected back as [99, 42].
144 let msg: Vec<u8> = vec![42, 99];
145 client.send(warp::ws::Message::binary(msg)).await;
146 let reply = client.recv().await.unwrap();
147 assert_eq!(reply.as_bytes(), vec![99, 42]);
// After sending a close message, the connection is expected to be closed.
150 client.send(warp::ws::Message::close()).await;
151 client.recv_closed().await.unwrap();
// Checks static file serving under /dir: two files must be returned with
// their exact contents, and one path must yield 404.
// NOTE(review): the expected contents depend on fixture files in the static
// directory, which are not part of this excerpt.
155 async fn test_static_dir() {
// A plain text file directly under the served directory.
156 let res = warp::test::request()
157 .path("/dir/plain.txt")
158 .reply(&super::api())
160 assert_eq!(res.status(), 200);
161 assert_eq!(res.body(), "Hello world! This is uncompressed text.\n");
// A file inside a subdirectory; the body is compared against the
// uncompressed text.
164 let res = warp::test::request()
165 .path("/dir/dir1/optzip.txt")
166 .reply(&super::api())
168 assert_eq!(res.status(), 200);
169 assert_eq!(res.body(), "This file is available uncompressed or compressed\n\
170 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n");
172 // fs::dir does not support transparent decompression
// Presumably only a compressed variant of this file exists on disk, so the
// request for the uncompressed name is expected to be answered with 404.
173 let res = warp::test::request()
174 .path("/dir/onlycompressed.txt")
175 .reply(&super::api())
177 assert_eq!(res.status(), 404);