rosrust/util/
lossy_channel.rs
1use crate::util::killable_channel::{channel, KillMode, Killer, Receiver, SendMode, Sender};
2use crate::util::FAILED_TO_LOCK;
3use crossbeam::channel;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6
7#[allow(clippy::mutex_atomic)]
8pub fn lossy_channel<T>(queue_size: usize) -> (LossySender<T>, LossyReceiver<T>) {
9 let (killer, data_tx, receiver) = channel(SendMode::Unbounded, KillMode::Async);
10 let is_open = Arc::new(AtomicBool::new(true));
11 let queue_size = Arc::new(Mutex::new(queue_size));
12 let sender = LossySender {
13 data_tx,
14 data_rx: receiver.data_rx.clone(),
15 killer,
16 is_open,
17 queue_size,
18 };
19 (sender, receiver)
20}
21
22#[derive(Clone)]
23pub struct LossySender<T> {
24 data_tx: Sender<T>,
25 data_rx: channel::Receiver<T>,
26 killer: Killer,
27 is_open: Arc<AtomicBool>,
28 pub queue_size: Arc<Mutex<usize>>,
29}
30
31impl<T> LossySender<T> {
32 pub fn try_send(&self, msg: T) -> Result<(), channel::TrySendError<T>> {
33 if !self.is_open.load(Ordering::SeqCst) {
34 return Err(channel::TrySendError::Disconnected(msg));
35 }
36 self.data_tx.try_send(msg)?;
37 self.remove_extra_data();
38 Ok(())
39 }
40
41 pub fn close(&mut self) -> Result<(), channel::SendError<()>> {
42 self.is_open.store(false, Ordering::SeqCst);
43 self.killer.send()
44 }
45
46 fn remove_extra_data(&self) {
47 let queue_size: usize = *self.queue_size.lock().expect(FAILED_TO_LOCK);
48 while self.data_rx.len() > queue_size {
49 if self.data_rx.try_recv().is_err() {
50 log::error!("Failed to remove excess data from message queue");
51 break;
52 }
53 }
54 }
55
56 pub fn set_queue_size(&self, queue_size: usize) {
57 *self.queue_size.lock().expect(FAILED_TO_LOCK) = queue_size;
58 }
59
60 pub fn set_queue_size_max(&self, queue_size: usize) {
61 let mut current_size = self.queue_size.lock().expect(FAILED_TO_LOCK);
62 *current_size = current_size.max(queue_size);
63 }
64}
65
66pub type LossyReceiver<T> = Receiver<T>;