From: Martin Pitt Date: Fri, 6 Jan 2023 14:47:19 +0000 (+0100) Subject: concepts: rustfmt X-Git-Url: https://piware.de/gitweb/?p=learn-rust.git;a=commitdiff_plain;h=HEAD;hp=8a185a1213d5ad2d53df30bac8cfb1987a5cff9f concepts: rustfmt --- diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml new file mode 100644 index 0000000..4208b99 --- /dev/null +++ b/actix-server/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "actix-server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix = "0.13" +actix-web = "4" +actix-web-actors = "4.1" +actix-files = "0.6" +env_logger = "0.9" +log = "0.4" + +[dev-dependencies] +actix-test = "0.1" +futures-util = "0.3" diff --git a/actix-server/src/main.rs b/actix-server/src/main.rs new file mode 100644 index 0000000..44ed45c --- /dev/null +++ b/actix-server/src/main.rs @@ -0,0 +1,241 @@ +use std::path::Path; + +use actix::{Actor, ActorContext, StreamHandler}; +use actix_web::{get, route, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, Result}; +use actix_web::http::header; +use actix_web::middleware::Logger; +use actix_files::{Files, NamedFile}; +use actix_web_actors::ws; + +#[route("/hello/{name}", method="GET", method="HEAD")] +async fn hello(params: web::Path, req: HttpRequest) -> Result { + let name = params.into_inner(); + + match req.headers().get(header::USER_AGENT) { + Some(agent) => Ok(format!("Hello {} from {}!", name, agent.to_str().unwrap())), + None => Ok(format!("Hello {}!", name)) + } +} + +#[get("/file/{path:.*}")] +async fn static_file(params: web::Path, req: HttpRequest) -> Result { + let request_path = params.into_inner(); + let disk_path = "../static/".to_string() + &request_path; + + // if the client accepts gzip encoding, try that first + if let Some(accept_encoding) = req.headers().get(header::ACCEPT_ENCODING) { + if accept_encoding.to_str().unwrap().contains("gzip") { + let path_gz = disk_path.clone() + ".gz"; + if Path::new(&path_gz).is_file() { + log::debug!("client accepts gzip encoding, sending pre-compressed file {}", &path_gz); + return Ok(NamedFile::open_async(path_gz).await? + .customize() + .insert_header(header::ContentEncoding::Gzip)); + } + } + } + + // uncompressed file + Ok(NamedFile::open_async(disk_path).await?.customize()) +} + +struct WsEcho; + +impl Actor for WsEcho { + type Context = ws::WebsocketContext; +} + +impl StreamHandler> for WsEcho { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + log::info!("WsEcho got message {:?}", msg); + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => ctx.text(text), + Ok(ws::Message::Binary(bin)) => ctx.binary(bin), + Ok(ws::Message::Close(reason)) => { + ctx.close(reason); + ctx.stop(); + }, + _ => ctx.stop(), + } + } +} + +#[get("/ws-echo")] +async fn ws_echo(req: HttpRequest, stream: web::Payload) -> Result { + ws::start(WsEcho {}, &req, stream) +} + +struct WsRev; + +impl Actor for WsRev { + type Context = ws::WebsocketContext; +} + +impl StreamHandler> for WsRev { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + log::info!("WsRev got message {:?}", msg); + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => { + let rev = text.chars().rev().collect::(); + ctx.text(rev); + }, + Ok(ws::Message::Binary(bin)) => { + let mut rev = bin.to_vec(); + rev.reverse(); + ctx.binary(rev); + } + Ok(ws::Message::Close(reason)) => { + ctx.close(reason); + ctx.stop(); + }, + _ => ctx.stop(), + } + } +} + +#[get("/ws-rev")] +async fn ws_rev(req: HttpRequest, stream: web::Payload) -> Result { + ws::start(WsRev {}, &req, stream) +} + +// App is a template soup, too hard as a proper function +macro_rules! get_app { + () => { + App::new() + .service(hello) + .service(static_file) + .service(Files::new("/dir", "../static")) + .service(ws_echo) + .service(ws_rev) + } +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); + + HttpServer::new(|| { + get_app!() + .wrap(Logger::default()) + }) + .bind(("127.0.0.1", 3030))? + .run() + .await +} + +#[cfg(test)] +mod tests { + use actix_web::{App, body, test, web}; + use actix_web::http::{header, StatusCode}; + use actix_web_actors::ws; + use actix_files::Files; + + use futures_util::sink::SinkExt; + use futures_util::StreamExt; + + use super::{hello, static_file, ws_echo, ws_rev}; + + #[actix_web::test] + async fn test_hello() { + let app = test::init_service(get_app!()).await; + + // no user-agent + let req = test::TestRequest::get().uri("/hello/rust").to_request(); + let res = test::call_service(&app, req).await; + assert!(res.status().is_success()); + assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), + web::Bytes::from_static(b"Hello rust!")); + + // with user-agent + let req = test::TestRequest::get() + .uri("/hello/rust") + .insert_header((header::USER_AGENT, "TestBrowser 0.1")) + .to_request(); + let res = test::call_service(&app, req).await; + assert!(res.status().is_success()); + assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), + web::Bytes::from_static(b"Hello rust from TestBrowser 0.1!")); + } + + #[actix_web::test] + async fn test_static_dir() { + let app = test::init_service(get_app!()).await; + + let req = test::TestRequest::get().uri("/dir/plain.txt").to_request(); + let res = test::call_service(&app, req).await; + assert!(res.status().is_success()); + assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), + web::Bytes::from_static(b"Hello world! This is uncompressed text.\n")); + + // subdir + let req = test::TestRequest::get().uri("/dir/dir1/optzip.txt").to_request(); + let res = test::call_service(&app, req).await; + assert!(res.status().is_success()); + assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), + web::Bytes::from_static(b"This file is available uncompressed or compressed\n\ + AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n")); + + // does not support transparent decompression + let req = test::TestRequest::get().uri("/dir/onlycompressed.txt").to_request(); + let res = test::call_service(&app, req).await; + assert_eq!(res.status(), StatusCode::NOT_FOUND); + } + + #[actix_web::test] + async fn test_static_file() { + let app = test::init_service(get_app!()).await; + + // uncompressed + let req = test::TestRequest::get().uri("/file/dir1/optzip.txt").to_request(); + let res = test::call_service(&app, req).await; + assert!(res.status().is_success()); + assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), + web::Bytes::from_static(b"This file is available uncompressed or compressed\n\ + AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n")); + + // gzipped + let req = test::TestRequest::get() + .uri("/file/dir1/optzip.txt") + .insert_header((header::ACCEPT_ENCODING, "deflate, gzip")) + .to_request(); + let res = test::call_service(&app, req).await; + assert!(res.status().is_success()); + let res_bytes = body::to_bytes(res.into_body()).await.unwrap(); + assert_eq!(res_bytes.len(), 63); // file size of ../static/dir1/optzip.txt.gz + assert_eq!(res_bytes[0], 31); + } + + #[actix_web::test] + async fn test_ws_echo() { + let mut srv = actix_test::start(|| get_app!()); + let mut client = srv.ws_at("/ws-echo").await.unwrap(); + + // text echo + client.send(ws::Message::Text("hello".into())).await.unwrap(); + let received = client.next().await.unwrap().unwrap(); + assert_eq!(received, ws::Frame::Text("hello".into())); + + // binary echo + client.send(ws::Message::Binary(web::Bytes::from_static(&[42, 99]))).await.unwrap(); + let received = client.next().await.unwrap().unwrap(); + assert_eq!(received, ws::Frame::Binary(web::Bytes::from_static(&[42, 99]))); + } + + #[actix_web::test] + async fn test_ws_rev() { + let mut srv = actix_test::start(|| get_app!()); + let mut client = srv.ws_at("/ws-rev").await.unwrap(); + + // text reversed + client.send(ws::Message::Text("hello".into())).await.unwrap(); + let received = client.next().await.unwrap().unwrap(); + assert_eq!(received, ws::Frame::Text("olleh".into())); + + // binary reversed + client.send(ws::Message::Binary(web::Bytes::from_static(&[42, 99]))).await.unwrap(); + let received = client.next().await.unwrap().unwrap(); + assert_eq!(received, ws::Frame::Binary(web::Bytes::from_static(&[99, 42]))); + } +} diff --git a/axum-server/Cargo.toml b/axum-server/Cargo.toml new file mode 100644 index 0000000..9033d18 --- /dev/null +++ b/axum-server/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "axum-server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +axum = { version = "0.5", features = ["ws", "headers"] } +hyper = { version = "0.14" } +tokio = { version = "1", features = ["full"] } +tower = "0.4" +tower-http = { version = "0.3", features = ["trace", "fs"] } +tracing = "0.1" +tracing-subscriber = "0.3" diff --git a/axum-server/src/main.rs b/axum-server/src/main.rs new file mode 100644 index 0000000..fa44ac1 --- /dev/null +++ b/axum-server/src/main.rs @@ -0,0 +1,145 @@ +use std::io; + +use axum::{ + routing::{get, get_service}, + extract::{Path, TypedHeader, ws}, + http::{StatusCode}, + response, + Router}; + +async fn hello(Path(name): Path, user_agent: Option>) -> impl response::IntoResponse { + if let Some(TypedHeader(user_agent)) = user_agent { + (StatusCode::OK, format!("Hello {} from {}", name, user_agent)) + } else { + (StatusCode::OK, format!("Hello {}", name)) + } +} + +async fn ws_echo(mut socket: ws::WebSocket) { + while let Some(msg) = socket.recv().await { + if let Ok(msg) = msg { + tracing::debug!("websocket got message: {:?}", msg); + + let reply = match msg { + ws::Message::Text(t) => ws::Message::Text(t), + ws::Message::Binary(b) => ws::Message::Binary(b), + // axum handles Ping/Pong by itself + ws::Message::Ping(_) => { continue }, + ws::Message::Pong(_) => { continue }, + ws::Message::Close(_) => { break } + }; + + if socket.send(reply).await + .is_err() { + tracing::info!("websocket client disconnected"); + break; + } + } + else { + tracing::info!("websocket client disconnected"); + break; + } + } +} + +fn app() -> Router { + Router::new() + .route("/hello/:name", get(hello)) + .nest("/dir", + get_service(tower_http::services::ServeDir::new("../static").precompressed_gzip()) + .handle_error(|e: io::Error| async move { + (StatusCode::INTERNAL_SERVER_ERROR, format!("Unhandled internal error: {}", e)) + }) + ) + .route("/ws-echo", get(|ws: ws::WebSocketUpgrade| async {ws.on_upgrade(ws_echo)})) + .layer( + tower::ServiceBuilder::new() + .layer( + tower_http::trace::TraceLayer::new_for_http() + .make_span_with(tower_http::trace::DefaultMakeSpan::default().include_headers(true)), + ) + ) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3030)); + tracing::info!("listening on {}", addr); + axum::Server::bind(&addr) + .serve(app().into_make_service()) + .await + .unwrap(); +} + +#[cfg(test)] +mod tests { + use axum::{ + http::{Request, StatusCode}, + response::Response, + body::Body + }; + use tower::ServiceExt; // for `oneshot` + + async fn assert_res_ok_body(res: Response, expected_body: &[u8]) { + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(hyper::body::to_bytes(res.into_body()).await.unwrap(), expected_body); + } + + #[tokio::test] + async fn test_hello() { + // no user-agent + let res = super::app() + .oneshot(Request::builder().uri("/hello/rust").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_res_ok_body(res, b"Hello rust").await; + + // with user-agent + let res = super::app() + .oneshot(Request::builder() + .uri("/hello/rust") + .header("user-agent", "TestBrowser 0.1") + .body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_res_ok_body(res, b"Hello rust from TestBrowser 0.1").await; + } + + #[tokio::test] + async fn test_static_dir() { + let res = super::app() + .oneshot(Request::builder().uri("/dir/plain.txt").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_res_ok_body(res, b"Hello world! This is uncompressed text.\n").await; + + // transparent .gz lookup, without gzip transfer encoding + let res = super::app() + .oneshot(Request::builder() + .uri("/dir/dir1/optzip.txt") + .header("accept-encoding", "deflate") + .body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + // that returns the uncompressed file + assert_res_ok_body(res, b"This file is available uncompressed or compressed\n\ + AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n").await; + + // transparent .gz lookup, with gzip transfer encoding + let res = super::app() + .oneshot(Request::builder() + .uri("/dir/dir1/optzip.txt") + .header("accept-encoding", "deflate, gzip") + .body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let res_bytes: &[u8] = &hyper::body::to_bytes(res.into_body()).await.unwrap(); + // that returns the compressed file + assert_eq!(res_bytes.len(), 63); // file size of ../static/dir1/optzip.txt.gz + assert_eq!(res_bytes[0], 31); + } +} diff --git a/concepts/src/lib.rs b/concepts/src/lib.rs index 5f1feea..f2b1c10 100644 --- a/concepts/src/lib.rs +++ b/concepts/src/lib.rs @@ -1,11 +1,10 @@ +use std::collections::HashMap; use std::fs::File; use std::io::prelude::*; -use std::collections::HashMap; pub fn read_file(path: &str) -> Result { let mut s = String::new(); - File::open(path)? - .read_to_string(&mut s)?; + File::open(path)?.read_to_string(&mut s)?; Ok(s) } @@ -71,7 +70,10 @@ where V: Copy, { pub fn new(calc: T) -> Cacher { - Cacher { calc, values: HashMap::new() } + Cacher { + calc, + values: HashMap::new(), + } } pub fn value(&mut self, arg: A) -> V { @@ -87,7 +89,7 @@ where } pub struct Counter5 { - count: u32 + count: u32, } impl Counter5 { @@ -157,7 +159,7 @@ trait State { struct Draft {} impl State for Draft { fn request_review(&self) -> Box { - Box::new(PendingReview {acks: 0}) + Box::new(PendingReview { acks: 0 }) } fn approve(&mut self) -> Box { @@ -176,14 +178,16 @@ struct PendingReview { impl State for PendingReview { fn request_review(&self) -> Box { - Box::new(Self {acks: self.acks}) + Box::new(Self { acks: self.acks }) } fn approve(&mut self) -> Box { if self.acks >= 1 { Box::new(Published {}) } else { - Box::new(Self {acks: self.acks + 1}) + Box::new(Self { + acks: self.acks + 1, + }) } } @@ -218,7 +222,9 @@ pub struct TPost { impl TPost { pub fn new() -> TPostDraft { - TPostDraft {content: String::new()} + TPostDraft { + content: String::new(), + } } pub fn content(&self) -> &str { @@ -236,7 +242,9 @@ impl TPostDraft { } pub fn request_review(self) -> TPostReview { - TPostReview {content: self.content} + TPostReview { + content: self.content, + } } } @@ -246,10 +254,14 @@ pub struct TPostReview { impl TPostReview { pub fn approve(self) -> TPost { - TPost {content: self.content} + TPost { + content: self.content, + } } pub fn reject(self) -> TPostDraft { - TPostDraft {content: self.content} + TPostDraft { + content: self.content, + } } } diff --git a/concepts/src/main.rs b/concepts/src/main.rs index 4b6df64..be26bdb 100644 --- a/concepts/src/main.rs +++ b/concepts/src/main.rs @@ -1,10 +1,10 @@ -mod word_utils; mod lib; +mod word_utils; use std::collections::HashMap; -use std::io::{prelude::*, ErrorKind}; use std::fs::{self, File}; -use std::{thread, time, sync}; +use std::io::{prelude::*, ErrorKind}; +use std::{sync, thread, time}; use lib::*; use word_utils::{first_word, second_word}; @@ -15,7 +15,10 @@ fn test_strings() { println!("second word: '{}'", second_word(&s).unwrap()); let s2 = "hello dude blah"; - println!("second word of single: '{}'", second_word(s2).unwrap_or("(none)")); + println!( + "second word of single: '{}'", + second_word(s2).unwrap_or("(none)") + ); match second_word(s2) { Some(w) => println!("match: second word of '{}' exists: {}", s2, w), @@ -66,18 +69,25 @@ fn test_hashmaps() { println!("scores after doubling: {:?}", scores); // double scores of immutable hashmap (rebuild it) - let collect_scores: HashMap<_, _> = collect_scores.into_iter() + let collect_scores: HashMap<_, _> = collect_scores + .into_iter() .map(|(k, v)| (k, 2 * v)) .collect(); - println!("collect_scores after rebuilding with doubling: {:?}", collect_scores); + println!( + "collect_scores after rebuilding with doubling: {:?}", + collect_scores + ); } fn test_files() { if let Ok(mut f) = File::open("Cargo.toml") { let mut contents = String::new(); match f.read_to_string(&mut contents) { - Ok(len) => println!("successfully opened Cargo.toml: {:?}, contents {} bytes:\n{}\n----------", f, len, contents), - Err(e) => panic!("could not read file: {:?}", e) + Ok(len) => println!( + "successfully opened Cargo.toml: {:?}, contents {} bytes:\n{}\n----------", + f, len, contents + ), + Err(e) => panic!("could not read file: {:?}", e), } } else { println!("could not open Cargo.toml"); @@ -102,13 +112,13 @@ fn test_files() { // using the '?' operator match read_file("Cargo.toml") { Ok(s) => println!("Cargo.toml contents:\n{}\n-------------", s), - Err(e) => println!("Could not open Cargo.toml: {:?}", e) + Err(e) => println!("Could not open Cargo.toml: {:?}", e), } // using std API match fs::read_to_string("Cargo.toml") { Ok(s) => println!("Cargo.toml contents:\n{}\n-------------", s), - Err(e) => println!("Could not open Cargo.toml: {:?}", e) + Err(e) => println!("Could not open Cargo.toml: {:?}", e), } } @@ -125,7 +135,10 @@ fn test_generics() { println!("str_list: {:?}", str_list); let string_list = vec!["aaaa".to_string(), "xxxxx".to_string(), "ffff".to_string()]; - println!("largest string (with cloning): {}", largest_clone(&string_list)); + println!( + "largest string (with cloning): {}", + largest_clone(&string_list) + ); println!("largest string (with ref): {}", largest_ref(&string_list)); println!("string_list: {:?}", string_list); @@ -144,18 +157,36 @@ fn test_closures() { 2 * x }); - println!("1st int call for value 1: {}", expensive_int_result.value(1)); - println!("2nd int call for value 1: {}", expensive_int_result.value(1)); - println!("1st int call for value 2: {}", expensive_int_result.value(2)); + println!( + "1st int call for value 1: {}", + expensive_int_result.value(1) + ); + println!( + "2nd int call for value 1: {}", + expensive_int_result.value(1) + ); + println!( + "1st int call for value 2: {}", + expensive_int_result.value(2) + ); let mut expensive_str_result = Cacher::new(|x: &str| { println!("calculating expensive str result for {}", x); x.len() }); - println!("1st int call for value abc: {}", expensive_str_result.value("abc")); - println!("2nd int call for value abc: {}", expensive_str_result.value("abc")); - println!("1st int call for value defg: {}", expensive_str_result.value("defg")); + println!( + "1st int call for value abc: {}", + expensive_str_result.value("abc") + ); + println!( + "2nd int call for value abc: {}", + expensive_str_result.value("abc") + ); + println!( + "1st int call for value defg: {}", + expensive_str_result.value("defg") + ); } fn test_iterators() { @@ -262,13 +293,13 @@ fn test_dyn_traits() { post.request_review(); assert_eq!("", post.content()); - post.approve(); // first + post.approve(); // first assert_eq!("", post.content()); - post.approve(); // second + post.approve(); // second assert_eq!(text, post.content()); - post.reject(); // no-op + post.reject(); // no-op assert_eq!(text, post.content()); } diff --git a/gtk4-hello-world/src/main.rs b/gtk4-hello-world/src/main.rs index 4ebedc0..06d096c 100644 --- a/gtk4-hello-world/src/main.rs +++ b/gtk4-hello-world/src/main.rs @@ -13,7 +13,7 @@ fn build_ui(app: &Application) { .margin_end(12) .build(); - button.connect_clicked(move |button| { + button.connect_clicked(|button| { button.set_label("Hello world!"); }); diff --git a/static/README.md b/static/README.md new file mode 100644 index 0000000..41f2f35 --- /dev/null +++ b/static/README.md @@ -0,0 +1 @@ +This directory contains static files for serving through HTTP frameworks. diff --git a/static/dir1/optzip.txt b/static/dir1/optzip.txt new file mode 100644 index 0000000..cc31dba --- /dev/null +++ b/static/dir1/optzip.txt @@ -0,0 +1,2 @@ +This file is available uncompressed or compressed +AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA diff --git a/static/dir1/optzip.txt.gz b/static/dir1/optzip.txt.gz new file mode 100644 index 0000000..5742cd6 Binary files /dev/null and b/static/dir1/optzip.txt.gz differ diff --git a/static/onlycompressed.txt.gz b/static/onlycompressed.txt.gz new file mode 100644 index 0000000..8542d28 Binary files /dev/null and b/static/onlycompressed.txt.gz differ diff --git a/static/plain.txt b/static/plain.txt new file mode 100644 index 0000000..9300012 --- /dev/null +++ b/static/plain.txt @@ -0,0 +1 @@ +Hello world! This is uncompressed text. diff --git a/warp-server/Cargo.toml b/warp-server/Cargo.toml new file mode 100644 index 0000000..3f804c5 --- /dev/null +++ b/warp-server/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "warp-server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures-util = { version = "0.3", default-features = false, features = ["sink"] } +tokio = { version = "1", features = ["full"] } +env_logger = "0.9" +log = "0.4" +warp = "0.3" diff --git a/warp-server/src/main.rs b/warp-server/src/main.rs new file mode 100644 index 0000000..06b289a --- /dev/null +++ b/warp-server/src/main.rs @@ -0,0 +1,179 @@ +use futures_util::{FutureExt, StreamExt, SinkExt}; +use warp::Filter; + +// GET /hello/warp => 200 OK with body "Hello, warp!" +async fn hello(name: String, agent: String) -> Result { + Ok(format!("Hello, {} from {}!", name, agent)) +} + +// websocat ws://127.0.0.1:3030/ws-echo +async fn ws_echo_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + let (tx, rx) = websocket.split(); + rx.forward(tx).map(|result| { + if let Err(e) = result { + log::warn!("websocket error: {:?}", e); + } + }).await; +} + +// websocat ws://127.0.0.1:3030/ws-rev +async fn ws_rev_connected(websocket: warp::ws::WebSocket) { + // echo all messages back + tokio::task::spawn(async { + let (mut tx, mut rx) = websocket.split(); + while let Some(message) = rx.next().await { + let msg = match message { + Ok(msg) => msg, + Err(e) => { + log::error!("websocket error: {}", e); + break; + } + }; + log::info!("ws_rev_connected: got message: {:?}", msg); + + if msg.is_close() { + break; + } + if msg.is_text() { + let text = msg.to_str().unwrap(); + let rev = text.chars().rev().collect::(); + if let Err(e) = tx.send(warp::ws::Message::text(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + if msg.is_binary() { + let mut rev = msg.into_bytes(); + rev.reverse(); + if let Err(e) = tx.send(warp::ws::Message::binary(rev)).await { + // disconnected + log::info!("peer disconnected: {}", e); + break; + } + } + } + log::info!("ws_rev_connected ended"); + }); +} + +pub fn api() -> impl Filter + Clone { + let hello = warp::path!("hello" / String) + .and(warp::header::("user-agent")) + .and_then(hello); + + let ws_echo = warp::path("ws-echo") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_echo_connected) }); + + let ws_rev = warp::path("ws-rev") + .and(warp::ws()) + .map(|ws: warp::ws::Ws| { ws.on_upgrade(ws_rev_connected) }); + + let static_dir = warp::path("dir") + .and(warp::fs::dir("../static")); + + hello + .or(ws_echo) + .or(ws_rev) + .or(static_dir) + .with(warp::log("warp-server")) +} + +#[tokio::main] +async fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + warp::serve(api()) + .run(([127, 0, 0, 1], 3030)) + .await; +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn test_hello() { + let res = warp::test::request() + .path("/hello/rust") + .header("user-agent", "TestBrowser 0.1") + .reply(&super::api()) + .await; + assert_eq!(res.status(), 200); + assert_eq!(res.body(), "Hello, rust from TestBrowser 0.1!"); + } + + #[tokio::test] + async fn test_ws_echo() { + let mut client = warp::test::ws() + .path("/ws-echo") + .handshake(super::api()) + .await + .expect("handshake failed"); + + // text + client.send_text("Hello").await; + let reply = client.recv().await.unwrap(); + assert_eq!(reply.to_str().unwrap(), "Hello"); + + // binary + let msg: Vec = vec![42, 99]; + client.send(warp::ws::Message::binary(msg.clone())).await; + let reply = client.recv().await.unwrap(); + assert_eq!(reply.as_bytes(), &msg); + + //close + client.send(warp::ws::Message::close()).await; + client.recv_closed().await.unwrap(); + } + + #[tokio::test] + async fn test_ws_rev() { + let mut client = warp::test::ws() + .path("/ws-rev") + .handshake(super::api()) + .await + .expect("handshake failed"); + + // text + client.send_text("Hello\n").await; + let reply = client.recv().await.unwrap(); + assert_eq!(reply.to_str().unwrap(), "\nolleH"); + + // binary + let msg: Vec = vec![42, 99]; + client.send(warp::ws::Message::binary(msg)).await; + let reply = client.recv().await.unwrap(); + assert_eq!(reply.as_bytes(), vec![99, 42]); + + //close + client.send(warp::ws::Message::close()).await; + client.recv_closed().await.unwrap(); + } + + #[tokio::test] + async fn test_static_dir() { + let res = warp::test::request() + .path("/dir/plain.txt") + .reply(&super::api()) + .await; + assert_eq!(res.status(), 200); + assert_eq!(res.body(), "Hello world! This is uncompressed text.\n"); + + // subdir + let res = warp::test::request() + .path("/dir/dir1/optzip.txt") + .reply(&super::api()) + .await; + assert_eq!(res.status(), 200); + assert_eq!(res.body(), "This file is available uncompressed or compressed\n\ + AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n"); + + // fs::dir does not support transparent decompression + let res = warp::test::request() + .path("/dir/onlycompressed.txt") + .reply(&super::api()) + .await; + assert_eq!(res.status(), 404); + } +}