rosrust/api/
clock.rs

1use crate::util::FAILED_TO_LOCK;
2use crossbeam::sync::{Parker, Unparker};
3use ros_message::{Duration, Time};
4use std::cell::Cell;
5use std::cmp;
6use std::collections::BinaryHeap;
7use std::sync::{Arc, Mutex};
8use std::thread::sleep;
9use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH};
10
11static BEFORE_EPOCH: &str = "Requested time is before UNIX epoch.";
12
13pub struct Delay {
14    clock: Arc<dyn Clock>,
15    delay: Duration,
16}
17
18impl Delay {
19    pub fn new(clock: Arc<dyn Clock>, delay: Duration) -> Self {
20        Self { clock, delay }
21    }
22
23    pub fn sleep(self) {
24        self.clock.sleep(self.delay);
25    }
26}
27
28pub struct Rate {
29    clock: Arc<dyn Clock>,
30    next: Cell<Time>,
31    delay: Duration,
32}
33
34impl Rate {
35    pub fn new(clock: Arc<dyn Clock>, delay: Duration) -> Rate {
36        let start = clock.now();
37        Rate {
38            clock,
39            next: Cell::new(start),
40            delay,
41        }
42    }
43
44    pub fn sleep(&self) {
45        let new_time = self.next.get() + self.delay;
46        self.next.set(new_time);
47        self.clock.wait_until(new_time);
48    }
49}
50
51pub trait Clock: Send + Sync {
52    fn now(&self) -> Time;
53    fn sleep(&self, d: Duration);
54    fn wait_until(&self, t: Time);
55    fn await_init(&self) {}
56}
57
58#[derive(Clone, Default)]
59pub struct RealClock {}
60
61impl Clock for RealClock {
62    #[inline]
63    fn now(&self) -> Time {
64        let time = SystemTime::now()
65            .duration_since(UNIX_EPOCH)
66            .expect(BEFORE_EPOCH);
67        Time {
68            sec: time.as_secs() as u32,
69            nsec: time.subsec_nanos(),
70        }
71    }
72
73    #[inline]
74    fn sleep(&self, d: Duration) {
75        if d < Duration::default() {
76            return;
77        }
78        sleep(StdDuration::new(d.sec as u64, d.nsec as u32));
79    }
80
81    #[inline]
82    fn wait_until(&self, t: Time) {
83        self.sleep(t - self.now());
84    }
85}
86
87struct Timeout {
88    timestamp: Time,
89    unparker: Unparker,
90}
91
92impl Drop for Timeout {
93    fn drop(&mut self) {
94        self.unparker.unpark();
95    }
96}
97
98impl cmp::PartialEq for Timeout {
99    fn eq(&self, other: &Self) -> bool {
100        self.timestamp == other.timestamp
101    }
102}
103
104impl cmp::PartialOrd for Timeout {
105    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
106        self.timestamp
107            .partial_cmp(&other.timestamp)
108            .map(cmp::Ordering::reverse)
109    }
110}
111
112impl cmp::Eq for Timeout {}
113
114impl cmp::Ord for Timeout {
115    fn cmp(&self, other: &Self) -> cmp::Ordering {
116        self.timestamp.cmp(&other.timestamp).reverse()
117    }
118}
119
120#[derive(Default)]
121pub struct SimData {
122    current: Time,
123    timeouts: BinaryHeap<Timeout>,
124}
125
126#[derive(Default)]
127pub struct SimulatedClock {
128    pub data: Mutex<SimData>,
129}
130
131impl SimulatedClock {
132    pub fn trigger(&self, time: Time) {
133        let mut data = self.data.lock().expect(FAILED_TO_LOCK);
134        data.current = time;
135        loop {
136            match data.timeouts.peek() {
137                None => break,
138                Some(next) if next.timestamp > data.current => break,
139                _ => {}
140            }
141            data.timeouts.pop();
142        }
143    }
144}
145
146impl Clock for SimulatedClock {
147    #[inline]
148    fn now(&self) -> Time {
149        self.data.lock().expect(FAILED_TO_LOCK).current
150    }
151
152    #[inline]
153    fn sleep(&self, d: Duration) {
154        if d.sec < 0 || d.nsec < 0 {
155            return;
156        }
157        let current = { self.data.lock().expect(FAILED_TO_LOCK).current };
158        self.wait_until(current + d);
159    }
160
161    #[inline]
162    fn wait_until(&self, timestamp: Time) {
163        let parker = Parker::new();
164        let unparker = parker.unparker().clone();
165        {
166            self.data
167                .lock()
168                .expect(FAILED_TO_LOCK)
169                .timeouts
170                .push(Timeout {
171                    timestamp,
172                    unparker,
173                });
174        }
175        parker.park()
176    }
177
178    fn await_init(&self) {
179        if self.data.lock().expect(FAILED_TO_LOCK).current == Time::default() {
180            self.wait_until(Time::from_nanos(1));
181        }
182    }
183}