X-Git-Url: https://piware.de/gitweb/?p=learn-rust.git;a=blobdiff_plain;f=warp-server%2Fsrc%2Fmain.rs;fp=warp-server%2Fsrc%2Fmain.rs;h=0664684196c3173d1b1a2361c0039cbf9b230001;hp=b23f2af1d39e2af95dc304b96264ed33dc9e457c;hb=354db3bfde77c75c9d53a76b7f3bc09de3243e00;hpb=6d7b20b8f00be8b9cc86ad835d64bceca8a8b628 diff --git a/warp-server/src/main.rs b/warp-server/src/main.rs index b23f2af..0664684 100644 --- a/warp-server/src/main.rs +++ b/warp-server/src/main.rs @@ -1,98 +1,85 @@ -mod handlers { - use futures_util::{FutureExt, StreamExt, SinkExt}; +use futures_util::{FutureExt, StreamExt, SinkExt}; +use warp::Filter; - // GET /hello/warp => 200 OK with body "Hello, warp!" - pub async fn hello(name: String, agent: String) -> Result { - Ok(format!("Hello, {} from {}!", name, agent)) - } - - // websocat ws://127.0.0.1:3030/ws-echo - pub async fn ws_echo_connected(websocket: warp::ws::WebSocket) { - // echo all messages back - let (tx, rx) = websocket.split(); - rx.forward(tx).map(|result| { - if let Err(e) = result { - log::warn!("websocket error: {:?}", e); - } - }).await; - } +// GET /hello/warp => 200 OK with body "Hello, warp!" +async fn hello(name: String, agent: String) -> Result { + Ok(format!("Hello, {} from {}!", name, agent)) +} - // websocat ws://127.0.0.1:3030/ws-rev - pub async fn ws_rev_connected(websocket: warp::ws::WebSocket) { - // echo all messages back - tokio::task::spawn(async { - let (mut tx, mut rx) = websocket.split(); - while let Some(message) = rx.next().await { - let msg = match message { - Ok(msg) => msg, - Err(e) => { - log::error!("websocket error: {}", e); - break; - } - }; - log::info!("ws_rev_connected: got message: {:?}", msg); +// websocat ws://127.0.0.1:3030/ws-echo +async fn ws_echo_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + log::warn!("websocket error: {:?}", e); + } + }).await; +} - if msg.is_close() { +// websocat ws://127.0.0.1:3030/ws-rev +async fn ws_rev_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + tokio::task::spawn(async { + let (mut tx, mut rx) = websocket.split(); + while let Some(message) = rx.next().await { + let msg = match message { + Ok(msg) => msg, + Err(e) => { + log::error!("websocket error: {}", e); break; } - if msg.is_text() { - let text = msg.to_str().unwrap(); - let rev = text.chars().rev().collect::(); - if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { - // disconnected - log::info!("peer disconnected: {}", e); - break; - } + }; + log::info!("ws_rev_connected: got message: {:?}", msg); + + if msg.is_close() { + break; + } + if msg.is_text() { + let text = msg.to_str().unwrap(); + let rev = text.chars().rev().collect::(); + if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; } - if msg.is_binary() { - let mut rev = msg.into_bytes(); - rev.reverse(); - if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { - // disconnected - log::info!("peer disconnected: {}", e); - break; - } + } + if msg.is_binary() { + let mut rev = msg.into_bytes(); + rev.reverse(); + if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; } } - log::info!("ws_rev_connected ended"); - }); - } + } + log::info!("ws_rev_connected ended"); + }); } -mod filters { - use warp::Filter; - use super::handlers; +#[tokio::main] +async fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - pub fn hello() -> impl Filter + Clone { - warp::path!("hello" / String) - .and(warp::header::("user-agent")) - .and_then(handlers::hello) - } + let hello = warp::path!("hello" / String) + .and(warp::header::("user-agent")) + .and_then(hello); - pub fn ws_echo() -> impl Filter + Clone { - warp::path("ws-echo") - .and(warp::ws()) - .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_echo_connected) }) - } + let ws_echo = warp::path("ws-echo") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) }); - pub fn ws_rev() -> impl Filter + Clone { - warp::path("ws-rev") - .and(warp::ws()) - .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_rev_connected) }) - } + let ws_rev = warp::path("ws-rev") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) }); - pub fn api() -> impl Filter + Clone { - hello() - .or(ws_echo()) - .or(ws_rev()) - .with(warp::log("warp-server")) - } -} + let api = hello + .or(ws_echo) + .or(ws_rev) + .with(warp::log("warp-server")); -#[tokio::main] -async fn main() { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - warp::serve(filters::api()) + warp::serve(api) .run(([127, 0, 0, 1], 3030)) .await; }