rosrust/util/
lossy_channel.rs

1use crate::util::killable_channel::{channel, KillMode, Killer, Receiver, SendMode, Sender};
2use crate::util::FAILED_TO_LOCK;
3use crossbeam::channel;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6
7#[allow(clippy::mutex_atomic)]
8pub fn lossy_channel<T>(queue_size: usize) -> (LossySender<T>, LossyReceiver<T>) {
9    let (killer, data_tx, receiver) = channel(SendMode::Unbounded, KillMode::Async);
10    let is_open = Arc::new(AtomicBool::new(true));
11    let queue_size = Arc::new(Mutex::new(queue_size));
12    let sender = LossySender {
13        data_tx,
14        data_rx: receiver.data_rx.clone(),
15        killer,
16        is_open,
17        queue_size,
18    };
19    (sender, receiver)
20}
21
22#[derive(Clone)]
23pub struct LossySender<T> {
24    data_tx: Sender<T>,
25    data_rx: channel::Receiver<T>,
26    killer: Killer,
27    is_open: Arc<AtomicBool>,
28    pub queue_size: Arc<Mutex<usize>>,
29}
30
31impl<T> LossySender<T> {
32    pub fn try_send(&self, msg: T) -> Result<(), channel::TrySendError<T>> {
33        if !self.is_open.load(Ordering::SeqCst) {
34            return Err(channel::TrySendError::Disconnected(msg));
35        }
36        self.data_tx.try_send(msg)?;
37        self.remove_extra_data();
38        Ok(())
39    }
40
41    pub fn close(&mut self) -> Result<(), channel::SendError<()>> {
42        self.is_open.store(false, Ordering::SeqCst);
43        self.killer.send()
44    }
45
46    fn remove_extra_data(&self) {
47        let queue_size: usize = *self.queue_size.lock().expect(FAILED_TO_LOCK);
48        while self.data_rx.len() > queue_size {
49            if self.data_rx.try_recv().is_err() {
50                log::error!("Failed to remove excess data from message queue");
51                break;
52            }
53        }
54    }
55
56    pub fn set_queue_size(&self, queue_size: usize) {
57        *self.queue_size.lock().expect(FAILED_TO_LOCK) = queue_size;
58    }
59
60    pub fn set_queue_size_max(&self, queue_size: usize) {
61        let mut current_size = self.queue_size.lock().expect(FAILED_TO_LOCK);
62        *current_size = current_size.max(queue_size);
63    }
64}
65
66pub type LossyReceiver<T> = Receiver<T>;