]> piware.de Git - learn-rust.git/commitdiff
warp-server: Move handlers and filters into submodules
authorMartin Pitt <martin@piware.de>
Fri, 9 Dec 2022 07:47:09 +0000 (08:47 +0100)
committerMartin Pitt <martin@piware.de>
Fri, 9 Dec 2022 07:47:09 +0000 (08:47 +0100)
Exposing the filters as functions makes them accessible to unit testing.

warp-server/src/main.rs

index 0664684196c3173d1b1a2361c0039cbf9b230001..b23f2af1d39e2af95dc304b96264ed33dc9e457c 100644 (file)
@@ -1,85 +1,98 @@
-use futures_util::{FutureExt, StreamExt, SinkExt};
-use warp::Filter;
+mod handlers {
+    use futures_util::{FutureExt, StreamExt, SinkExt};
 
-// GET /hello/warp => 200 OK with body "Hello, warp!"
-async fn hello(name: String, agent: String) -> Result<impl warp::Reply, warp::Rejection> {
-    Ok(format!("Hello, {} from {}!", name, agent))
-}
+    // GET /hello/warp => 200 OK with body "Hello, warp!"
+    pub async fn hello(name: String, agent: String) -> Result<impl warp::Reply, warp::Rejection> {
+        Ok(format!("Hello, {} from {}!", name, agent))
+    }
 
-// websocat ws://127.0.0.1:3030/ws-echo
-async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
-    // echo all messages back
-    let (tx, rx) = websocket.split();
-    rx.forward(tx).map(|result| {
-        if let Err(e) = result {
-            log::warn!("websocket error: {:?}", e);
-        }
-    }).await;
-}
+    // websocat ws://127.0.0.1:3030/ws-echo
+    pub async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
+        // echo all messages back
+        let (tx, rx) = websocket.split();
+        rx.forward(tx).map(|result| {
+            if let Err(e) = result {
+                log::warn!("websocket error: {:?}", e);
+            }
+        }).await;
+    }
 
-// websocat ws://127.0.0.1:3030/ws-rev
-async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
-    // echo all messages back
-    tokio::task::spawn(async {
-        let (mut tx, mut rx) = websocket.split();
-        while let Some(message) = rx.next().await {
-            let msg = match message {
-                Ok(msg) =>  msg,
-                Err(e) => {
-                    log::error!("websocket error: {}", e);
-                    break;
-                }
-            };
-            log::info!("ws_rev_connected: got message: {:?}", msg);
+    // websocat ws://127.0.0.1:3030/ws-rev
+    pub async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
+        // echo all messages back
+        tokio::task::spawn(async {
+            let (mut tx, mut rx) = websocket.split();
+            while let Some(message) = rx.next().await {
+                let msg = match message {
+                    Ok(msg) =>  msg,
+                    Err(e) => {
+                        log::error!("websocket error: {}", e);
+                        break;
+                    }
+                };
+                log::info!("ws_rev_connected: got message: {:?}", msg);
 
-            if msg.is_close() {
-                break;
-            }
-            if msg.is_text() {
-                let text = msg.to_str().unwrap();
-                let rev = text.chars().rev().collect::<String>();
-                if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
-                    // disconnected
-                    log::info!("peer disconnected: {}", e);
+                if msg.is_close() {
                     break;
                 }
-            }
-            if msg.is_binary() {
-                let mut rev = msg.into_bytes();
-                rev.reverse();
-                if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
-                    // disconnected
-                    log::info!("peer disconnected: {}", e);
-                    break;
+                if msg.is_text() {
+                    let text = msg.to_str().unwrap();
+                    let rev = text.chars().rev().collect::<String>();
+                    if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
+                        // disconnected
+                        log::info!("peer disconnected: {}", e);
+                        break;
+                    }
+                }
+                if msg.is_binary() {
+                    let mut rev = msg.into_bytes();
+                    rev.reverse();
+                    if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
+                        // disconnected
+                        log::info!("peer disconnected: {}", e);
+                        break;
+                    }
                 }
             }
-        }
-        log::info!("ws_rev_connected ended");
-    });
+            log::info!("ws_rev_connected ended");
+        });
+    }
 }
 
-#[tokio::main]
-async fn main() {
-    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
+mod filters {
+    use warp::Filter;
+    use super::handlers;
 
-    let hello = warp::path!("hello" / String)
-        .and(warp::header::<String>("user-agent"))
-        .and_then(hello);
+    pub fn hello() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
+        warp::path!("hello" / String)
+            .and(warp::header::<String>("user-agent"))
+            .and_then(handlers::hello)
+    }
 
-    let ws_echo = warp::path("ws-echo")
-        .and(warp::ws())
-        .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) });
+    pub fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
+        warp::path("ws-echo")
+            .and(warp::ws())
+            .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_echo_connected) })
+    }
 
-    let ws_rev = warp::path("ws-rev")
-        .and(warp::ws())
-        .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) });
+    pub fn ws_rev() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
+        warp::path("ws-rev")
+            .and(warp::ws())
+            .map(|ws: warp::ws::Ws| { ws.on_upgrade(handlers::ws_rev_connected) })
+    }
 
-    let api = hello
-        .or(ws_echo)
-        .or(ws_rev)
-        .with(warp::log("warp-server"));
+    pub fn api() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
+        hello()
+            .or(ws_echo())
+            .or(ws_rev())
+            .with(warp::log("warp-server"))
+    }
+}
 
-    warp::serve(api)
+#[tokio::main]
+async fn main() {
+    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
+    warp::serve(filters::api())
         .run(([127, 0, 0, 1], 3030))
         .await;
 }