From 4b27e7395888e450ce52e74bf259715105f3071c Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Sun, 19 Sep 2021 11:48:51 +0200 Subject: [PATCH] simple-http: Implement ThreadPool cleanup Introduce Terminate message. Test this by only accepting two requests and then letting `pool` go out of scope. --- simple-http/src/bin/main.rs | 2 +- simple-http/src/lib.rs | 31 +++++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/simple-http/src/bin/main.rs b/simple-http/src/bin/main.rs index ff3af27..4d642aa 100644 --- a/simple-http/src/bin/main.rs +++ b/simple-http/src/bin/main.rs @@ -65,7 +65,7 @@ fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); - for stream in listener.incoming() { + for stream in listener.incoming() /* for testing cleanup: .take(2) */ { let stream = stream.unwrap(); pool.execute(|| handle_connection(stream)); } diff --git a/simple-http/src/lib.rs b/simple-http/src/lib.rs index 4b7d736..3c5caa6 100644 --- a/simple-http/src/lib.rs +++ b/simple-http/src/lib.rs @@ -3,17 +3,32 @@ use std::sync::{Arc, mpsc, Mutex}; type Job = Box; +enum Message { + NewJob(Job), + Terminate, +} + struct Worker { id: usize, thread: Option>, } impl Worker { - fn new(id: usize, receiver: Arc>>) -> Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { let thread = Some(thread::spawn(move || loop { - let job = receiver.lock().unwrap().recv().unwrap(); - println!("Worker {} got a job, executing", id); - job(); + let message = receiver.lock().unwrap().recv().unwrap(); + + match message { + Message::NewJob(job) => { + println!("Worker {} got a job, executing", id); + job(); + }, + + Message::Terminate => { + println!("Worker {} got terminated", id); + break; + } + } })); Worker { id, thread } } @@ -21,7 +36,7 @@ impl Worker { pub struct ThreadPool { workers: Vec, - sender: mpsc::Sender, + sender: mpsc::Sender, } impl ThreadPool { @@ -47,12 +62,16 @@ impl ThreadPool { pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static { - self.sender.send(Box::new(f)).unwrap(); + self.sender.send(Message::NewJob(Box::new(f))).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { + for _ in &self.workers { + self.sender.send(Message::Terminate).unwrap(); + } + for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); -- 2.39.5