X-Git-Url: https://piware.de/gitweb/?a=blobdiff_plain;f=warp-server%2Fsrc%2Fmain.rs;h=7aa6f977d831c977800cb248818231a3b85d21fe;hb=be3212fb7c5a4b7a9f595f51f821c79cbda9df3a;hp=f4983e73ed45895573a7ca16dbf11459a4ac0348;hpb=05975f1d5fcc331c82f3b9786a5f4afbc027d4cd;p=learn-rust.git diff --git a/warp-server/src/main.rs b/warp-server/src/main.rs index f4983e7..7aa6f97 100644 --- a/warp-server/src/main.rs +++ b/warp-server/src/main.rs @@ -1,29 +1,101 @@ -use futures_util::{FutureExt, StreamExt}; +use futures_util::{FutureExt, StreamExt, SinkExt}; use warp::Filter; -#[tokio::main] -async fn main() { - // GET /hello/warp => 200 OK with body "Hello, warp!" +// GET /hello/warp => 200 OK with body "Hello, warp!" +async fn hello(name: String, agent: String) -> Result { + Ok(format!("Hello, {} from {}!", name, agent)) +} + +// websocat ws://127.0.0.1:3030/ws-echo +async fn ws_echo_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + log::warn!("websocket error: {:?}", e); + } + }).await; +} + +// websocat ws://127.0.0.1:3030/ws-rev +async fn ws_rev_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + tokio::task::spawn(async { + let (mut tx, mut rx) = websocket.split(); + while let Some(message) = rx.next().await { + let msg = match message { + Ok(msg) => msg, + Err(e) => { + log::error!("websocket error: {}", e); + break; + } + }; + log::info!("ws_rev_connected: got message: {:?}", msg); + + if msg.is_close() { + break; + } + if msg.is_text() { + let text = msg.to_str().unwrap(); + let rev = text.chars().rev().collect::(); + if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + if msg.is_binary() { + let mut rev = msg.into_bytes(); + rev.reverse(); + if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + } + log::info!("ws_rev_connected ended"); + }); +} + +pub fn api() -> impl Filter + Clone { let hello = warp::path!("hello" / String) .and(warp::header::("user-agent")) - .map(|name, agent| format!("Hello, {} from {}!", name, agent)); + .and_then(hello); - // websocat ws://127.0.0.1:3030/ws-echo - let echo = warp::path("ws-echo") + let ws_echo = warp::path("ws-echo") .and(warp::ws()) - .map(|ws: warp::ws::Ws| { - ws.on_upgrade(|websocket| { - // echo all messages back - let (tx, rx) = websocket.split(); - rx.forward(tx).map(|result| { - if let Err(e) = result { - eprintln!("websocket error: {:?}", e); - } - }) - }) - }); - - warp::serve(hello.or(echo)) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) }); + + let ws_rev = warp::path("ws-rev") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) }); + + hello + .or(ws_echo) + .or(ws_rev) + .with(warp::log("warp-server")) +} + +#[tokio::main] +async fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + warp::serve(api()) .run(([127, 0, 0, 1], 3030)) .await; } + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn test_hello() { + let res = warp::test::request() + .path("/hello/rust") + .header("user-agent", "TestBrowser 0.1") + .reply(&super::api()) + .await; + assert_eq!(res.status(), 200); + assert_eq!(res.body(), "Hello, rust from TestBrowser 0.1!"); + } +}