From: Martin Pitt Date: Fri, 9 Dec 2022 07:47:09 +0000 (+0100) Subject: warp-server: Move handlers and filters into submodules X-Git-Url: https://piware.de/gitweb/?p=learn-rust.git;a=commitdiff_plain;h=6d7b20b8f00be8b9cc86ad835d64bceca8a8b628 warp-server: Move handlers and filters into submodules Exposing the filters as functions makes them accessible to unit testing. --- diff --git a/warp-server/src/main.rs b/warp-server/src/main.rs index 0664684..b23f2af 100644 --- a/warp-server/src/main.rs +++ b/warp-server/src/main.rs @@ -1,85 +1,98 @@ -use futures_util::{FutureExt, StreamExt, SinkExt}; -use warp::Filter; +mod handlers { + use futures_util::{FutureExt, StreamExt, SinkExt}; -// GET /hello/warp => 200 OK with body "Hello, warp!" -async fn hello(name: String, agent: String) -> Result { - Ok(format!("Hello, {} from {}!", name, agent)) -} + // GET /hello/warp => 200 OK with body "Hello, warp!" + pub async fn hello(name: String, agent: String) -> Result { + Ok(format!("Hello, {} from {}!", name, agent)) + } -// websocat ws://127.0.0.1:3030/ws-echo -async fn ws_echo_connected(websocket: warp::ws::WebSocket) { - // echo all messages back - let (tx, rx) = websocket.split(); - rx.forward(tx).map(|result| { - if let Err(e) = result { - log::warn!("websocket error: {:?}", e); - } - }).await; -} + // websocat ws://127.0.0.1:3030/ws-echo + pub async fn ws_echo_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + log::warn!("websocket error: {:?}", e); + } + }).await; + } -// websocat ws://127.0.0.1:3030/ws-rev -async fn ws_rev_connected(websocket: warp::ws::WebSocket) { - // echo all messages back - tokio::task::spawn(async { - let (mut tx, mut rx) = websocket.split(); - while let Some(message) = rx.next().await { - let msg = match message { - Ok(msg) => msg, - Err(e) => { - log::error!("websocket error: {}", e); - break; - } - }; - log::info!("ws_rev_connected: got message: {:?}", msg); + // websocat ws://127.0.0.1:3030/ws-rev + pub async fn ws_rev_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + tokio::task::spawn(async { + let (mut tx, mut rx) = websocket.split(); + while let Some(message) = rx.next().await { + let msg = match message { + Ok(msg) => msg, + Err(e) => { + log::error!("websocket error: {}", e); + break; + } + }; + log::info!("ws_rev_connected: got message: {:?}", msg); - if msg.is_close() { - break; - } - if msg.is_text() { - let text = msg.to_str().unwrap(); - let rev = text.chars().rev().collect::(); - if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { - // disconnected - log::info!("peer disconnected: {}", e); + if msg.is_close() { break; } - } - if msg.is_binary() { - let mut rev = msg.into_bytes(); - rev.reverse(); - if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { - // disconnected - log::info!("peer disconnected: {}", e); - break; + if msg.is_text() { + let text = msg.to_str().unwrap(); + let rev = text.chars().rev().collect::(); + if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + if msg.is_binary() { + let mut rev = msg.into_bytes(); + rev.reverse(); + if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } } } - } - log::info!("ws_rev_connected ended"); - }); + log::info!("ws_rev_connected ended"); + }); + } } -#[tokio::main] -async fn main() { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); +mod filters { + use warp::Filter; + use super::handlers; - let hello = warp::path!("hello" / String) - .and(warp::header::("user-agent")) - .and_then(hello); + pub fn hello() -> impl Filter + Clone { + warp::path!("hello" / String) + .and(warp::header::("user-agent")) + .and_then(handlers::hello) + } - let ws_echo = warp::path("ws-echo") - .and(warp::ws()) - .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) }); + pub fn ws_echo() -> impl Filter + Clone { + warp::path("ws-echo") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_echo_connected) }) + } - let ws_rev = warp::path("ws-rev") - .and(warp::ws()) - .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) }); + pub fn ws_rev() -> impl Filter + Clone { + warp::path("ws-rev") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_rev_connected) }) + } - let api = hello - .or(ws_echo) - .or(ws_rev) - .with(warp::log("warp-server")); + pub fn api() -> impl Filter + Clone { + hello() + .or(ws_echo()) + .or(ws_rev()) + .with(warp::log("warp-server")) + } +} - warp::serve(api) +#[tokio::main] +async fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + warp::serve(filters::api()) .run(([127, 0, 0, 1], 3030)) .await; }