From 2a3abc5cfab55a80f35438d195a2151478aa4f93 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Fri, 16 Sep 2022 08:47:48 +0200 Subject: [PATCH] async-http: Serve requests concurrently Move to asyncstd TCPListener and futures Stream, so that the incoming loop does not serialize/block requests any more. This is still single-threaded. That can be demonstrated with replacing the async sleep with a sync one (commented out). Then /sleep will block other requests again. --- async-http/Cargo.toml | 1 + async-http/src/main.rs | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/async-http/Cargo.toml b/async-http/Cargo.toml index 92c94f5..30b4930 100644 --- a/async-http/Cargo.toml +++ b/async-http/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +futures = "0.3" log = "0.4" env_logger = "0.9" diff --git a/async-http/src/main.rs b/async-http/src/main.rs index c36be0a..cf0b46d 100644 --- a/async-http/src/main.rs +++ b/async-http/src/main.rs @@ -1,29 +1,28 @@ use std::fs; -use std::io::prelude::*; -use std::net::{ TcpListener, TcpStream }; use std::time::Duration; +use async_std::prelude::*; +use async_std::net::{ TcpListener, TcpStream }; use async_std::task; +use futures::stream::StreamExt; #[async_std::main] async fn main() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); // Listen for incoming TCP connections on localhost port 7878 - let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); - // Block forever, handling each request that arrives at this IP address - for stream in listener.incoming() { - let stream = stream.unwrap(); - // not concurrent - handle_connection(stream).await; - } + listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move { + let tcpstream = tcpstream.unwrap(); + handle_connection(tcpstream).await; + }).await; } async fn handle_connection(mut stream: TcpStream) { // Read the first 1024 bytes of data from the stream let mut buffer = [0; 1024]; - assert!(stream.read(&mut buffer).unwrap() > 0); + assert!(stream.read(&mut buffer).await.unwrap() > 0); // Respond with greetings or a 404, // depending on the data in the request @@ -31,6 +30,8 @@ async fn handle_connection(mut stream: TcpStream) { ("HTTP/1.1 200 OK", "index.html") } else if buffer.starts_with(b"GET /sleep HTTP/1.1\r\n") { task::sleep(Duration::from_secs(5)).await; + // sync version, to demonstrate concurrent async vs. parallel threads + // std::thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 201 Sleep", "index.html") } else { ("HTTP/1.1 404 NOT FOUND", "404.html") @@ -41,6 +42,6 @@ async fn handle_connection(mut stream: TcpStream) { // Write response back to the stream, // and flush the stream to ensure the response is sent back to the client let response = format!("{status_line}\r\n\r\n{contents}"); - stream.write_all(response.as_bytes()).unwrap(); - stream.flush().unwrap(); + stream.write_all(response.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); } -- 2.39.2