simple-http: Implement ThreadPool cleanup
authorMartin Pitt <martin@piware.de>
Sun, 19 Sep 2021 09:48:51 +0000 (11:48 +0200)
committerMartin Pitt <martin@piware.de>
Sun, 19 Sep 2021 09:48:51 +0000 (11:48 +0200)
Introduce Terminate message. Test this by only accepting two requests
and then letting `pool` go out of scope.

simple-http/src/bin/main.rs
simple-http/src/lib.rs

index ff3af27cc51c09985a0d6b999373a4de717d4d97..4d642aa2848b7a075e8561ca8b3a659d5a5111c3 100644 (file)
@@ -65,7 +65,7 @@ fn main() {
     let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
     let pool = ThreadPool::new(4);
 
-    for stream in listener.incoming() {
+    for stream in listener.incoming() /* for testing cleanup: .take(2) */ {
         let stream = stream.unwrap();
         pool.execute(|| handle_connection(stream));
     }
index 4b7d7362afa241b94f34a2e3166f6c44288eb90a..3c5caa679a3e4525d3147585dde045a859b92e7d 100644 (file)
@@ -3,17 +3,32 @@ use std::sync::{Arc, mpsc, Mutex};
 
 type Job = Box<dyn FnOnce() + Send + 'static>;
 
+enum Message {
+    NewJob(Job),
+    Terminate,
+}
+
 struct Worker {
     id: usize,
     thread: Option<thread::JoinHandle<()>>,
 }
 
 impl Worker {
-    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
+    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
         let thread = Some(thread::spawn(move || loop {
-            let job = receiver.lock().unwrap().recv().unwrap();
-            println!("Worker {} got a job, executing", id);
-            job();
+            let message = receiver.lock().unwrap().recv().unwrap();
+
+            match message {
+                Message::NewJob(job) => {
+                    println!("Worker {} got a job, executing", id);
+                    job();
+                },
+
+                Message::Terminate => {
+                    println!("Worker {} got terminated", id);
+                    break;
+                }
+            }
         }));
         Worker { id, thread }
     }
@@ -21,7 +36,7 @@ impl Worker {
 
 pub struct ThreadPool {
     workers: Vec<Worker>,
-    sender: mpsc::Sender<Job>,
+    sender: mpsc::Sender<Message>,
 }
 
 impl ThreadPool {
@@ -47,12 +62,16 @@ impl ThreadPool {
     pub fn execute<F>(&self, f: F)
     where F: FnOnce() + Send + 'static
     {
-        self.sender.send(Box::new(f)).unwrap();
+        self.sender.send(Message::NewJob(Box::new(f))).unwrap();
     }
 }
 
 impl Drop for ThreadPool {
     fn drop(&mut self) {
+        for _ in &self.workers {
+            self.sender.send(Message::Terminate).unwrap();
+        }
+
         for worker in &mut self.workers {
             println!("Shutting down worker {}", worker.id);