]> piware.de Git - learn-rust.git/commitdiff
warp-server: Add route for reversing websocket messages
authorMartin Pitt <martin@piware.de>
Wed, 9 Nov 2022 22:23:58 +0000 (23:23 +0100)
committerMartin Pitt <martin@piware.de>
Wed, 9 Nov 2022 22:23:58 +0000 (23:23 +0100)
warp-server/src/main.rs

index 06977e62cf4e50911b5e00ba6f866c2daa49dc52..0664684196c3173d1b1a2361c0039cbf9b230001 100644 (file)
@@ -1,4 +1,4 @@
-use futures_util::{FutureExt, StreamExt};
+use futures_util::{FutureExt, StreamExt, SinkExt};
 use warp::Filter;
 
 // GET /hello/warp => 200 OK with body "Hello, warp!"
@@ -17,6 +17,47 @@ async fn ws_echo_connected(websocket: warp::ws::WebSocket) {
     }).await;
 }
 
+// websocat ws://127.0.0.1:3030/ws-rev
+async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
+    // echo all messages back
+    tokio::task::spawn(async {
+        let (mut tx, mut rx) = websocket.split();
+        while let Some(message) = rx.next().await {
+            let msg = match message {
+                Ok(msg) =>  msg,
+                Err(e) => {
+                    log::error!("websocket error: {}", e);
+                    break;
+                }
+            };
+            log::info!("ws_rev_connected: got message: {:?}", msg);
+
+            if msg.is_close() {
+                break;
+            }
+            if msg.is_text() {
+                let text = msg.to_str().unwrap();
+                let rev = text.chars().rev().collect::<String>();
+                if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
+                    // disconnected
+                    log::info!("peer disconnected: {}", e);
+                    break;
+                }
+            }
+            if msg.is_binary() {
+                let mut rev = msg.into_bytes();
+                rev.reverse();
+                if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
+                    // disconnected
+                    log::info!("peer disconnected: {}", e);
+                    break;
+                }
+            }
+        }
+        log::info!("ws_rev_connected ended");
+    });
+}
+
 #[tokio::main]
 async fn main() {
     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
@@ -27,12 +68,15 @@ async fn main() {
 
     let ws_echo = warp::path("ws-echo")
         .and(warp::ws())
-        .map(|ws: warp::ws::Ws| {
-            ws.on_upgrade(|websocket| { ws_echo_connected(websocket) })
-        });
+        .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) });
+
+    let ws_rev = warp::path("ws-rev")
+        .and(warp::ws())
+        .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) });
 
     let api = hello
         .or(ws_echo)
+        .or(ws_rev)
         .with(warp::log("warp-server"));
 
     warp::serve(api)