tiny_http/util/
messages_queue.rs
1use std::collections::VecDeque;
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::{Duration, Instant};
4
5enum Control<T> {
6 Elem(T),
7 Unblock,
8}
9
10pub struct MessagesQueue<T>
11where
12 T: Send,
13{
14 queue: Mutex<VecDeque<Control<T>>>,
15 condvar: Condvar,
16}
17
18impl<T> MessagesQueue<T>
19where
20 T: Send,
21{
22 pub fn with_capacity(capacity: usize) -> Arc<MessagesQueue<T>> {
23 Arc::new(MessagesQueue {
24 queue: Mutex::new(VecDeque::with_capacity(capacity)),
25 condvar: Condvar::new(),
26 })
27 }
28
29 pub fn push(&self, value: T) {
31 let mut queue = self.queue.lock().unwrap();
32 queue.push_back(Control::Elem(value));
33 self.condvar.notify_one();
34 }
35
36 pub fn unblock(&self) {
38 let mut queue = self.queue.lock().unwrap();
39 queue.push_back(Control::Unblock);
40 self.condvar.notify_one();
41 }
42
43 pub fn pop(&self) -> Option<T> {
46 let mut queue = self.queue.lock().unwrap();
47
48 loop {
49 match queue.pop_front() {
50 Some(Control::Elem(value)) => return Some(value),
51 Some(Control::Unblock) => return None,
52 None => (),
53 }
54
55 queue = self.condvar.wait(queue).unwrap();
56 }
57 }
58
59 pub fn try_pop(&self) -> Option<T> {
61 let mut queue = self.queue.lock().unwrap();
62 match queue.pop_front() {
63 Some(Control::Elem(value)) => Some(value),
64 Some(Control::Unblock) | None => None,
65 }
66 }
67
68 pub fn pop_timeout(&self, timeout: Duration) -> Option<T> {
72 let mut queue = self.queue.lock().unwrap();
73 let mut duration = timeout;
74 loop {
75 match queue.pop_front() {
76 Some(Control::Elem(value)) => return Some(value),
77 Some(Control::Unblock) => return None,
78 None => (),
79 }
80 let now = Instant::now();
81 let (_queue, result) = self.condvar.wait_timeout(queue, timeout).unwrap();
82 queue = _queue;
83 let sleep_time = now.elapsed();
84 duration = if duration > sleep_time {
85 duration - sleep_time
86 } else {
87 Duration::from_millis(0)
88 };
89 if result.timed_out()
90 || (duration.as_secs() == 0 && duration.subsec_nanos() < 1_000_000)
91 {
92 return None;
93 }
94 }
95 }
96}