//! Small and simple ThreadPool implementation from the rust book.
//!
//! Provides an abstraction to manage multiple threads.
//! Using this abstraction gives you the following advantages:
//! - Customizable pool size
//! - Graceful shutdown of the threads

use std::error::Error;
use std::fmt;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

/// Errors that can occur while working with a [`ThreadPool`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadPoolError {
    /// The pool could not be created (a size of zero was requested).
    PoolCreationError,
}

impl fmt::Display for ThreadPoolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            ThreadPoolError::PoolCreationError => f.write_str("ThreadPoolCreationError"),
        }
    }
}

impl Error for ThreadPoolError {
    // Kept so existing callers of `description()` still receive the same text.
    fn description(&self) -> &str {
        match *self {
            ThreadPoolError::PoolCreationError => "Could not create a thread pool",
        }
    }
}

/// A unit of work: a boxed closure that is run exactly once on a worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Messages sent from the pool to its workers over the shared channel.
enum Message {
    /// Run the contained job.
    NewJob(Job),
    /// Leave the receive loop so the thread can be joined.
    Terminate,
}

/// The thread pool containing all workers.
///
/// # Examples
/// ```
/// use poolth::ThreadPool;
///
/// // creates a thread pool holding 4 threads.
/// let pool = ThreadPool::new(4).unwrap();
/// pool.execute(|| println!("Hello World"));
/// ```
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

impl ThreadPool {
    /// Construct a new ThreadPool using a custom size.
    ///
    /// # Examples
    /// ```
    /// use poolth::ThreadPool;
    ///
    /// // creates a thread pool holding 4 threads.
    /// let pool = ThreadPool::new(4).unwrap();
    /// ```
    ///
    /// # Errors
    /// Returns [`ThreadPoolError::PoolCreationError`] if `size` is zero.
    /// ```should_panic
    /// use poolth::ThreadPool;
    ///
    /// // panics due to the `unwrap()` call
    /// let pool = ThreadPool::new(0).unwrap();
    /// // => thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: PoolCreationError'
    /// ```
    pub fn new(size: usize) -> Result<ThreadPool, ThreadPoolError> {
        // `size` is unsigned, so zero is the only invalid value.
        if size == 0 {
            return Err(ThreadPoolError::PoolCreationError);
        }

        let (sender, receiver) = mpsc::channel();
        // Every worker pulls from the same receiver, so it is shared behind a mutex.
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        Ok(ThreadPool { workers, sender })
    }

    /// Run a closure in a worker of the pool.
    ///
    /// # Examples
    /// ```
    /// use poolth::ThreadPool;
    ///
    /// let pool = ThreadPool::new(1).unwrap();
    /// pool.execute(|| println!("Hello World"))
    /// ```
    /// Output: `"Hello World"`
    ///
    /// # Panics
    /// Panics if the job cannot be queued because every worker thread has
    /// already exited (for example after each of them ran a panicking job).
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender
            .send(Message::NewJob(job))
            .expect("thread pool has no running workers left to receive the job");
    }
}

/// For a graceful shutdown of the thread pool.
impl Drop for ThreadPool {
    /// Sends one terminate message per worker and waits until every worker is done.
    ///
    /// Never panics: a panic inside `drop` while the thread is already
    /// unwinding would abort the whole process.
    fn drop(&mut self) {
        for _ in &self.workers {
            // Sending only fails when every worker is already gone, in which
            // case there is nobody left to notify.
            let _ = self.sender.send(Message::Terminate);
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                // `join` returns `Err` when the worker thread panicked (e.g. in a job).
                if thread.join().is_err() {
                    eprintln!("Worker {} panicked before shutdown.", worker.id);
                }
            }
        }
    }
}

/// A single pool thread together with its numeric id.
struct Worker {
    id: usize,
    /// `Some` while the thread is running; taken in `ThreadPool::drop` to join it.
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    /// Spawn a thread that keeps receiving messages from `receiver` until it is
    /// told to terminate or the channel is closed.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The mutex guard is a temporary of this `let` statement, so the
            // lock is released before the job runs and other workers can
            // receive in the meantime.
            let message = receiver
                .lock()
                .expect("worker receiver mutex poisoned")
                .recv();

            match message {
                Ok(Message::NewJob(job)) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Ok(Message::Terminate) => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
                // The sending side is gone; no further messages can arrive.
                Err(_) => break,
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}