-use futures_util::{FutureExt, StreamExt};
+use futures_util::{FutureExt, StreamExt, SinkExt};
use warp::Filter;
// GET /hello/warp => 200 OK with body "Hello, warp!"
}).await;
}
+// websocat ws://127.0.0.1:3030/ws-rev
+async fn ws_rev_connected(websocket: warp::ws::WebSocket) {
+ // echo all messages back
+ tokio::task::spawn(async {
+ let (mut tx, mut rx) = websocket.split();
+ while let Some(message) = rx.next().await {
+ let msg = match message {
+ Ok(msg) => msg,
+ Err(e) => {
+ log::error!("websocket error: {}", e);
+ break;
+ }
+ };
+ log::info!("ws_rev_connected: got message: {:?}", msg);
+
+ if msg.is_close() {
+ break;
+ }
+ if msg.is_text() {
+ let text = msg.to_str().unwrap();
+ let rev = text.chars().rev().collect::<String>();
+ if let Err(e) = tx.send(warp::ws::Message::text(rev)).await {
+ // disconnected
+ log::info!("peer disconnected: {}", e);
+ break;
+ }
+ }
+ if msg.is_binary() {
+ let mut rev = msg.into_bytes();
+ rev.reverse();
+ if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await {
+ // disconnected
+ log::info!("peer disconnected: {}", e);
+ break;
+ }
+ }
+ }
+ log::info!("ws_rev_connected ended");
+ });
+}
+
#[tokio::main]
async fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let ws_echo = warp::path("ws-echo")
.and(warp::ws())
- .map(|ws: warp::ws::Ws| {
- ws.on_upgrade(|websocket| { ws_echo_connected(websocket) })
- });
+ .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) });
+
+ let ws_rev = warp::path("ws-rev")
+ .and(warp::ws())
+ .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) });
let api = hello
.or(ws_echo)
+ .or(ws_rev)
.with(warp::log("warp-server"));
warp::serve(api)