tiny_http/util/
messages_queue.rs

1use std::collections::VecDeque;
2use std::sync::{Arc, Condvar, Mutex};
3use std::time::{Duration, Instant};
4
/// A single entry stored in the queue: either a real payload or a wake-up marker.
enum Control<T> {
    /// A value that was pushed and will be handed to whoever pops it.
    Elem(T),
    /// A marker carrying no payload; the pop operations report it as `None`.
    Unblock,
}
9
10pub struct MessagesQueue<T>
11where
12    T: Send,
13{
14    queue: Mutex<VecDeque<Control<T>>>,
15    condvar: Condvar,
16}
17
18impl<T> MessagesQueue<T>
19where
20    T: Send,
21{
22    pub fn with_capacity(capacity: usize) -> Arc<MessagesQueue<T>> {
23        Arc::new(MessagesQueue {
24            queue: Mutex::new(VecDeque::with_capacity(capacity)),
25            condvar: Condvar::new(),
26        })
27    }
28
29    /// Pushes an element to the queue.
30    pub fn push(&self, value: T) {
31        let mut queue = self.queue.lock().unwrap();
32        queue.push_back(Control::Elem(value));
33        self.condvar.notify_one();
34    }
35
36    /// Unblock one thread stuck in pop loop.
37    pub fn unblock(&self) {
38        let mut queue = self.queue.lock().unwrap();
39        queue.push_back(Control::Unblock);
40        self.condvar.notify_one();
41    }
42
43    /// Pops an element. Blocks until one is available.
44    /// Returns None in case unblock() was issued.
45    pub fn pop(&self) -> Option<T> {
46        let mut queue = self.queue.lock().unwrap();
47
48        loop {
49            match queue.pop_front() {
50                Some(Control::Elem(value)) => return Some(value),
51                Some(Control::Unblock) => return None,
52                None => (),
53            }
54
55            queue = self.condvar.wait(queue).unwrap();
56        }
57    }
58
59    /// Tries to pop an element without blocking.
60    pub fn try_pop(&self) -> Option<T> {
61        let mut queue = self.queue.lock().unwrap();
62        match queue.pop_front() {
63            Some(Control::Elem(value)) => Some(value),
64            Some(Control::Unblock) | None => None,
65        }
66    }
67
68    /// Tries to pop an element without blocking
69    /// more than the specified timeout duration
70    /// or unblock() was issued
71    pub fn pop_timeout(&self, timeout: Duration) -> Option<T> {
72        let mut queue = self.queue.lock().unwrap();
73        let mut duration = timeout;
74        loop {
75            match queue.pop_front() {
76                Some(Control::Elem(value)) => return Some(value),
77                Some(Control::Unblock) => return None,
78                None => (),
79            }
80            let now = Instant::now();
81            let (_queue, result) = self.condvar.wait_timeout(queue, timeout).unwrap();
82            queue = _queue;
83            let sleep_time = now.elapsed();
84            duration = if duration > sleep_time {
85                duration - sleep_time
86            } else {
87                Duration::from_millis(0)
88            };
89            if result.timed_out()
90                || (duration.as_secs() == 0 && duration.subsec_nanos() < 1_000_000)
91            {
92                return None;
93            }
94        }
95    }
96}