tiny_http/util/
task_pool.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::Duration;
6
7/// Manages a collection of threads.
8///
9/// A new thread is created every time all the existing threads are full.
10/// Any idle thread will automatically die after a few seconds.
11pub struct TaskPool {
12    sharing: Arc<Sharing>,
13}
14
/// State shared by the pool handle and all of its worker threads.
struct Sharing {
    /// Queue of tasks waiting to be picked up by a worker thread.
    todo: Mutex<VecDeque<Box<dyn FnMut() + Send>>>,

    /// Notified each time a task is pushed onto `todo`.
    condvar: Condvar,

    /// Total number of worker threads currently running.
    active_tasks: AtomicUsize,

    /// Number of worker threads that are currently idle.
    waiting_tasks: AtomicUsize,
}
28
/// Minimum number of active threads.
///
/// This many workers are started together with the pool, and an idle worker
/// only uses the self-terminating timed wait while the number of running
/// threads exceeds this value.
const MIN_THREADS: usize = 4;
31
/// Guard that keeps itself counted in an atomic counter while it is alive.
///
/// Creating the guard adds one to the counter; dropping it subtracts one
/// again, including when the owning thread unwinds.
struct Registration<'a> {
    counter: &'a AtomicUsize,
}

impl<'a> Registration<'a> {
    /// Increments `nb` and returns a guard that decrements it when dropped.
    fn new(nb: &'a AtomicUsize) -> Self {
        nb.fetch_add(1, Ordering::Release);
        Self { counter: nb }
    }
}

impl Drop for Registration<'_> {
    /// Undoes the increment performed by [`Registration::new`].
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::Release);
    }
}
48
49impl TaskPool {
50    pub fn new() -> TaskPool {
51        let pool = TaskPool {
52            sharing: Arc::new(Sharing {
53                todo: Mutex::new(VecDeque::new()),
54                condvar: Condvar::new(),
55                active_tasks: AtomicUsize::new(0),
56                waiting_tasks: AtomicUsize::new(0),
57            }),
58        };
59
60        for _ in 0..MIN_THREADS {
61            pool.add_thread(None)
62        }
63
64        pool
65    }
66
67    /// Executes a function in a thread.
68    /// If no thread is available, spawns a new one.
69    pub fn spawn(&self, code: Box<dyn FnMut() + Send>) {
70        let mut queue = self.sharing.todo.lock().unwrap();
71
72        if self.sharing.waiting_tasks.load(Ordering::Acquire) == 0 {
73            self.add_thread(Some(code));
74        } else {
75            queue.push_back(code);
76            self.sharing.condvar.notify_one();
77        }
78    }
79
80    fn add_thread(&self, initial_fn: Option<Box<dyn FnMut() + Send>>) {
81        let sharing = self.sharing.clone();
82
83        thread::spawn(move || {
84            let sharing = sharing;
85            let _active_guard = Registration::new(&sharing.active_tasks);
86
87            if let Some(mut f) = initial_fn {
88                f();
89            }
90
91            loop {
92                let mut task: Box<dyn FnMut() + Send> = {
93                    let mut todo = sharing.todo.lock().unwrap();
94
95                    let task;
96                    loop {
97                        if let Some(poped_task) = todo.pop_front() {
98                            task = poped_task;
99                            break;
100                        }
101                        let _waiting_guard = Registration::new(&sharing.waiting_tasks);
102
103                        let received =
104                            if sharing.active_tasks.load(Ordering::Acquire) <= MIN_THREADS {
105                                todo = sharing.condvar.wait(todo).unwrap();
106                                true
107                            } else {
108                                let (new_lock, waitres) = sharing
109                                    .condvar
110                                    .wait_timeout(todo, Duration::from_millis(5000))
111                                    .unwrap();
112                                todo = new_lock;
113                                !waitres.timed_out()
114                            };
115
116                        if !received && todo.is_empty() {
117                            return;
118                        }
119                    }
120
121                    task
122                };
123
124                task();
125            }
126        });
127    }
128}
129
130impl Drop for TaskPool {
131    fn drop(&mut self) {
132        self.sharing
133            .active_tasks
134            .store(999_999_999, Ordering::Release);
135        self.sharing.condvar.notify_all();
136    }
137}