From acfe8f5f123d63623559446d08a01bf29987e5b1 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Wed, 9 Nov 2022 23:23:58 +0100 Subject: [PATCH] warp-server: Add route for reversing websocket messages --- warp-server/src/main.rs | 52 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/warp-server/src/main.rs b/warp-server/src/main.rs index 06977e6..0664684 100644 --- a/warp-server/src/main.rs +++ b/warp-server/src/main.rs @@ -1,4 +1,4 @@ -use futures_util::{FutureExt, StreamExt}; +use futures_util::{FutureExt, StreamExt, SinkExt}; use warp::Filter; // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -17,6 +17,47 @@ async fn ws_echo_connected(websocket: warp::ws::WebSocket) { }).await; } +// websocat ws://127.0.0.1:3030/ws-rev +async fn ws_rev_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + tokio::task::spawn(async { + let (mut tx, mut rx) = websocket.split(); + while let Some(message) = rx.next().await { + let msg = match message { + Ok(msg) => msg, + Err(e) => { + log::error!("websocket error: {}", e); + break; + } + }; + log::info!("ws_rev_connected: got message: {:?}", msg); + + if msg.is_close() { + break; + } + if msg.is_text() { + let text = msg.to_str().unwrap(); + let rev = text.chars().rev().collect::(); + if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + if msg.is_binary() { + let mut rev = msg.into_bytes(); + rev.reverse(); + if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + } + log::info!("ws_rev_connected ended"); + }); +} + #[tokio::main] async fn main() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -27,12 +68,15 @@ async fn main() { let ws_echo = warp::path("ws-echo") .and(warp::ws()) - .map(|ws: warp::ws::Ws| { - ws.on_upgrade(|websocket| { ws_echo_connected(websocket) }) - }); + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) }); + + let ws_rev = warp::path("ws-rev") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) }); let api = hello .or(ws_echo) + .or(ws_rev) .with(warp::log("warp-server")); warp::serve(api) -- 2.39.5