]> piware.de Git - learn-rust.git/blob - warp-server/src/main.rs
b23f2af1d39e2af95dc304b96264ed33dc9e457c
[learn-rust.git] / warp-server / src / main.rs
1 mod handlers {
2     use futures_util::{FutureExt, StreamExt, SinkExt};
3
4     // GET /hello/warp => 200 OK with body "Hello, warp!"
5     pub async fn hello(name: String, agent: String) -> Result<impl warp::Reply, warp::Rejection> {
6         Ok(format!("Hello, {} from {}!", name, agent))
7     }
8
9     // websocat ws://127.0.0.1:3030/ws-echo
10     pub async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
11         // echo all messages back
12         let (tx, rx) = websocket.split();
13         rx.forward(tx).map(|result| {
14             if let Err(e) = result {
15                 log::warn!("websocket error: {:?}", e);
16             }
17         }).await;
18     }
19
20     // websocat ws://127.0.0.1:3030/ws-rev
21     pub async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
22         // echo all messages back
23         tokio::task::spawn(async {
24             let (mut tx, mut rx) = websocket.split();
25             while let Some(message) = rx.next().await {
26                 let msg = match message {
27                     Ok(msg) =>  msg,
28                     Err(e) => {
29                         log::error!("websocket error: {}", e);
30                         break;
31                     }
32                 };
33                 log::info!("ws_rev_connected: got message: {:?}", msg);
34
35                 if msg.is_close() {
36                     break;
37                 }
38                 if msg.is_text() {
39                     let text = msg.to_str().unwrap();
40                     let rev = text.chars().rev().collect::<String>();
41                     if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
42                         // disconnected
43                         log::info!("peer disconnected: {}", e);
44                         break;
45                     }
46                 }
47                 if msg.is_binary() {
48                     let mut rev = msg.into_bytes();
49                     rev.reverse();
50                     if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
51                         // disconnected
52                         log::info!("peer disconnected: {}", e);
53                         break;
54                     }
55                 }
56             }
57             log::info!("ws_rev_connected ended");
58         });
59     }
60 }
61
62 mod filters {
63     use warp::Filter;
64     use super::handlers;
65
66     pub fn hello() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
67         warp::path!("hello" / String)
68             .and(warp::header::<String>("user-agent"))
69             .and_then(handlers::hello)
70     }
71
72     pub fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
73         warp::path("ws-echo")
74             .and(warp::ws())
75             .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_echo_connected) })
76     }
77
78     pub fn ws_rev() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
79         warp::path("ws-rev")
80             .and(warp::ws())
81             .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_rev_connected) })
82     }
83
84     pub fn api() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
85         hello()
86             .or(ws_echo())
87             .or(ws_rev())
88             .with(warp::log("warp-server"))
89     }
90 }
91
92 #[tokio::main]
93 async fn main() {
94     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
95     warp::serve(filters::api())
96         .run(([127, 0, 0, 1], 3030))
97         .await;
98 }