X-Git-Url: https://piware.de/gitweb/?p=learn-rust.git;a=blobdiff_plain;f=async-http%2Fsrc%2Fmain.rs;h=1a8dddb2ed8f7627dece48d51e14d282218ac845;hp=d9e3a26bfaf8c22af808f51a61e1f50b09534a17;hb=b68e0d3408450f97832e8a1fcbc673cf3ecbd15f;hpb=700074ce7c06235b6ae17e2a165301f2fe6374ed diff --git a/async-http/src/main.rs b/async-http/src/main.rs index d9e3a26..1a8dddb 100644 --- a/async-http/src/main.rs +++ b/async-http/src/main.rs @@ -1,33 +1,39 @@ use std::fs; -use std::io::prelude::*; -use std::net::TcpListener; -use std::net::TcpStream; +use std::time::Duration; -fn main() { +use async_std::prelude::*; +use async_std::io::{ Read, Write }; +use async_std::net::{ TcpListener }; +use async_std::task; +use futures::stream::StreamExt; + +#[async_std::main] +async fn main() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); // Listen for incoming TCP connections on localhost port 7878 - let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); - - // Block forever, handling each request that arrives at this IP address - for stream in listener.incoming() { - let stream = stream.unwrap(); + let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); - handle_connection(stream); - } + listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move { + let tcpstream = tcpstream.unwrap(); + task::spawn(handle_connection(tcpstream)); + }).await; } -fn handle_connection(mut stream: TcpStream) { +async fn handle_connection(mut stream: impl Read + Write + Unpin) { // Read the first 1024 bytes of data from the stream let mut buffer = [0; 1024]; - stream.read(&mut buffer).unwrap(); - - let get = b"GET / HTTP/1.1\r\n"; + assert!(stream.read(&mut buffer).await.unwrap() > 0); // Respond with greetings or a 404, // depending on the data in the request - let (status_line, filename) = if buffer.starts_with(get) { + let (status_line, filename) = if buffer.starts_with(b"GET / HTTP/1.1\r\n") { ("HTTP/1.1 200 OK", "index.html") + } else if buffer.starts_with(b"GET /sleep HTTP/1.1\r\n") { + task::sleep(Duration::from_secs(5)).await; + // sync version, to demonstrate concurrent async vs. parallel threads + // std::thread::sleep(Duration::from_secs(5)); + ("HTTP/1.1 201 Sleep", "index.html") } else { ("HTTP/1.1 404 NOT FOUND", "404.html") }; @@ -37,6 +43,73 @@ fn handle_connection(mut stream: TcpStream) { // Write response back to the stream, // and flush the stream to ensure the response is sent back to the client let response = format!("{status_line}\r\n\r\n{contents}"); - stream.write_all(response.as_bytes()).unwrap(); - stream.flush().unwrap(); + stream.write_all(response.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::cmp; + use std::pin::Pin; + + use futures::io::Error; + use futures::task::{Context, Poll}; + + struct MockTcpStream { + read_data: Vec, + write_data: Vec, + } + + impl Read for MockTcpStream { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let size: usize = cmp::min(self.read_data.len(), buf.len()); + buf[..size].copy_from_slice(&self.read_data[..size]); + Poll::Ready(Ok(size)) + } + } + + impl Write for MockTcpStream { + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context, + buf: &[u8], + ) -> Poll> { + self.write_data = Vec::from(buf); + + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + } + + impl Unpin for MockTcpStream {} + + #[async_std::test] + async fn test_handle_connection() { + let input_bytes = b"GET / HTTP/1.1\r\n"; + let mut contents = vec![0u8; 1024]; + contents[..input_bytes.len()].clone_from_slice(input_bytes); + let mut stream = MockTcpStream { + read_data: contents, + write_data: Vec::new(), + }; + + handle_connection(&mut stream).await; + + let expected_contents = fs::read_to_string("index.html").unwrap(); + let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents); + assert!(stream.write_data.starts_with(expected_response.as_bytes())); + } }