X-Git-Url: https://piware.de/gitweb/?p=learn-rust.git;a=blobdiff_plain;f=warp-server%2Fsrc%2Fmain.rs;h=0664684196c3173d1b1a2361c0039cbf9b230001;hp=f4983e73ed45895573a7ca16dbf11459a4ac0348;hb=acfe8f5f123d63623559446d08a01bf29987e5b1;hpb=05975f1d5fcc331c82f3b9786a5f4afbc027d4cd diff --git a/warp-server/src/main.rs b/warp-server/src/main.rs index f4983e7..0664684 100644 --- a/warp-server/src/main.rs +++ b/warp-server/src/main.rs @@ -1,29 +1,85 @@ -use futures_util::{FutureExt, StreamExt}; +use futures_util::{FutureExt, StreamExt, SinkExt}; use warp::Filter; +// GET /hello/warp => 200 OK with body "Hello, warp!" +async fn hello(name: String, agent: String) -> Result { + Ok(format!("Hello, {} from {}!", name, agent)) +} + +// websocat ws://127.0.0.1:3030/ws-echo +async fn ws_echo_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + log::warn!("websocket error: {:?}", e); + } + }).await; +} + +// websocat ws://127.0.0.1:3030/ws-rev +async fn ws_rev_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + tokio::task::spawn(async { + let (mut tx, mut rx) = websocket.split(); + while let Some(message) = rx.next().await { + let msg = match message { + Ok(msg) => msg, + Err(e) => { + log::error!("websocket error: {}", e); + break; + } + }; + log::info!("ws_rev_connected: got message: {:?}", msg); + + if msg.is_close() { + break; + } + if msg.is_text() { + let text = msg.to_str().unwrap(); + let rev = text.chars().rev().collect::(); + if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + if msg.is_binary() { + let mut rev = msg.into_bytes(); + rev.reverse(); + if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + } + log::info!("ws_rev_connected ended"); + }); +} + #[tokio::main] async fn main() { - // GET /hello/warp => 200 OK with body "Hello, warp!" + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + let hello = warp::path!("hello" / String) .and(warp::header::("user-agent")) - .map(|name, agent| format!("Hello, {} from {}!", name, agent)); + .and_then(hello); + + let ws_echo = warp::path("ws-echo") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) }); - // websocat ws://127.0.0.1:3030/ws-echo - let echo = warp::path("ws-echo") + let ws_rev = warp::path("ws-rev") .and(warp::ws()) - .map(|ws: warp::ws::Ws| { - ws.on_upgrade(|websocket| { - // echo all messages back - let (tx, rx) = websocket.split(); - rx.forward(tx).map(|result| { - if let Err(e) = result { - eprintln!("websocket error: {:?}", e); - } - }) - }) - }); - - warp::serve(hello.or(echo)) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) }); + + let api = hello + .or(ws_echo) + .or(ws_rev) + .with(warp::log("warp-server")); + + warp::serve(api) .run(([127, 0, 0, 1], 3030)) .await; }