use std::fs;
-use std::io::prelude::*;
-use std::net::TcpListener;
-use std::net::TcpStream;
+use std::time::Duration;
-fn main() {
+use async_std::prelude::*;
+use async_std::io::{ Read, Write };
+use async_std::net::{ TcpListener };
+use async_std::task;
+use futures::stream::StreamExt;
+
+#[async_std::main]
+async fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// Listen for incoming TCP connections on localhost port 7878
- let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
+ let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
- // Block forever, handling each request that arrives at this IP address
- for stream in listener.incoming() {
- let stream = stream.unwrap();
- handle_connection(stream);
- }
+ listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move {
+ let tcpstream = tcpstream.unwrap();
+ task::spawn(handle_connection(tcpstream));
+ }).await;
}
-fn handle_connection(mut stream: TcpStream) {
+async fn handle_connection(mut stream: impl Read + Write + Unpin) {
// Read the first 1024 bytes of data from the stream
let mut buffer = [0; 1024];
- assert!(stream.read(&mut buffer).unwrap() > 0);
-
- let get = b"GET / HTTP/1.1\r\n";
+ assert!(stream.read(&mut buffer).await.unwrap() > 0);
// Respond with greetings or a 404,
// depending on the data in the request
- let (status_line, filename) = if buffer.starts_with(get) {
+ let (status_line, filename) = if buffer.starts_with(b"GET / HTTP/1.1\r\n") {
("HTTP/1.1 200 OK", "index.html")
+ } else if buffer.starts_with(b"GET /sleep HTTP/1.1\r\n") {
+ task::sleep(Duration::from_secs(5)).await;
+ // sync version, to demonstrate concurrent async vs. parallel threads
+ // std::thread::sleep(Duration::from_secs(5));
+ ("HTTP/1.1 201 Sleep", "index.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
// Write response back to the stream,
// and flush the stream to ensure the response is sent back to the client
let response = format!("{status_line}\r\n\r\n{contents}");
- stream.write_all(response.as_bytes()).unwrap();
- stream.flush().unwrap();
+ stream.write_all(response.as_bytes()).await.unwrap();
+ stream.flush().await.unwrap();
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::cmp;
+ use std::pin::Pin;
+
+ use futures::io::Error;
+ use futures::task::{Context, Poll};
+
+ struct MockTcpStream {
+ read_data: Vec<u8>,
+ write_data: Vec<u8>,
+ }
+
+ impl Read for MockTcpStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, Error>> {
+ let size: usize = cmp::min(self.read_data.len(), buf.len());
+ buf[..size].copy_from_slice(&self.read_data[..size]);
+ Poll::Ready(Ok(size))
+ }
+ }
+
+ impl Write for MockTcpStream {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, Error>> {
+ self.write_data = Vec::from(buf);
+
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ impl Unpin for MockTcpStream {}
+
+ #[async_std::test]
+ async fn test_handle_connection() {
+ let input_bytes = b"GET / HTTP/1.1\r\n";
+ let mut contents = vec![0u8; 1024];
+ contents[..input_bytes.len()].clone_from_slice(input_bytes);
+ let mut stream = MockTcpStream {
+ read_data: contents,
+ write_data: Vec::new(),
+ };
+
+ handle_connection(&mut stream).await;
+
+ let expected_contents = fs::read_to_string("index.html").unwrap();
+ let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
+ assert!(stream.write_data.starts_with(expected_response.as_bytes()));
+ }
}