tiny_http/util/
task_pool.rs
1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::Duration;
6
7pub struct TaskPool {
12 sharing: Arc<Sharing>,
13}
14
/// State shared between the pool handle and all of its worker threads.
struct Sharing {
    /// Tasks waiting for a worker, consumed front-to-back (FIFO).
    todo: Mutex<VecDeque<Box<dyn FnMut() + Send>>>,

    /// Signalled when a task is pushed onto `todo` and when the pool is
    /// dropped; idle workers block on it.
    condvar: Condvar,

    /// Number of worker threads currently alive. The pool's `Drop`
    /// overwrites it with a large sentinel value.
    active_tasks: AtomicUsize,

    /// Number of worker threads currently registered as idle, i.e. waiting
    /// on `condvar` for a task.
    waiting_tasks: AtomicUsize,
}
28
/// Number of worker threads started together with the pool.
///
/// While no more than this many workers are alive, an idle worker waits for
/// work indefinitely instead of exiting after the idle timeout.
const MIN_THREADS: usize = 4;
31
/// Guard that keeps a shared counter incremented by one while it is alive.
struct Registration<'a> {
    /// Counter bumped in `new` and decremented again on drop.
    nb: &'a AtomicUsize,
}

impl<'a> Registration<'a> {
    /// Increments `nb` and returns the guard that will undo the increment.
    fn new(nb: &'a AtomicUsize) -> Self {
        nb.fetch_add(1, Ordering::Release);
        Self { nb }
    }
}

impl Drop for Registration<'_> {
    /// Reverts the increment performed by `Registration::new`.
    fn drop(&mut self) {
        self.nb.fetch_sub(1, Ordering::Release);
    }
}
48
49impl TaskPool {
50 pub fn new() -> TaskPool {
51 let pool = TaskPool {
52 sharing: Arc::new(Sharing {
53 todo: Mutex::new(VecDeque::new()),
54 condvar: Condvar::new(),
55 active_tasks: AtomicUsize::new(0),
56 waiting_tasks: AtomicUsize::new(0),
57 }),
58 };
59
60 for _ in 0..MIN_THREADS {
61 pool.add_thread(None)
62 }
63
64 pool
65 }
66
67 pub fn spawn(&self, code: Box<dyn FnMut() + Send>) {
70 let mut queue = self.sharing.todo.lock().unwrap();
71
72 if self.sharing.waiting_tasks.load(Ordering::Acquire) == 0 {
73 self.add_thread(Some(code));
74 } else {
75 queue.push_back(code);
76 self.sharing.condvar.notify_one();
77 }
78 }
79
80 fn add_thread(&self, initial_fn: Option<Box<dyn FnMut() + Send>>) {
81 let sharing = self.sharing.clone();
82
83 thread::spawn(move || {
84 let sharing = sharing;
85 let _active_guard = Registration::new(&sharing.active_tasks);
86
87 if let Some(mut f) = initial_fn {
88 f();
89 }
90
91 loop {
92 let mut task: Box<dyn FnMut() + Send> = {
93 let mut todo = sharing.todo.lock().unwrap();
94
95 let task;
96 loop {
97 if let Some(poped_task) = todo.pop_front() {
98 task = poped_task;
99 break;
100 }
101 let _waiting_guard = Registration::new(&sharing.waiting_tasks);
102
103 let received =
104 if sharing.active_tasks.load(Ordering::Acquire) <= MIN_THREADS {
105 todo = sharing.condvar.wait(todo).unwrap();
106 true
107 } else {
108 let (new_lock, waitres) = sharing
109 .condvar
110 .wait_timeout(todo, Duration::from_millis(5000))
111 .unwrap();
112 todo = new_lock;
113 !waitres.timed_out()
114 };
115
116 if !received && todo.is_empty() {
117 return;
118 }
119 }
120
121 task
122 };
123
124 task();
125 }
126 });
127 }
128}
129
130impl Drop for TaskPool {
131 fn drop(&mut self) {
132 self.sharing
133 .active_tasks
134 .store(999_999_999, Ordering::Release);
135 self.sharing.condvar.notify_all();
136 }
137}