r2r/
time_source.rs

1#![cfg(r2r__rosgraph_msgs__msg__Clock)]
2
3use crate::{
4    builtin_interfaces::msg::Time,
5    error::*,
6    msg_types::{VoidPtr, WrappedNativeMsg},
7    rosgraph_msgs,
8    subscribers::{create_subscription_helper, Subscriber_},
9    Clock, ClockType, Node, QosProfile, WrappedTypesupport,
10};
11use r2r_rcl::{
12    rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
13    rmw_message_info_t, RCL_RET_OK,
14};
15use std::sync::{Arc, Mutex, Weak};
16
17/// Provides time from `/clock` topic to attached ROS clocks
18///
19/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled.
20///
21/// The time from `/clock` topic can be activated by either of these:
22/// - calling [`TimeSource::enable_sim_time`]
23/// - having registered parameter handler and launching the node with parameter `use_sim_time:=true`
24///
25/// Similar to `rclcpp/time_source.hpp`
26#[derive(Clone)]
27pub struct TimeSource {
28    inner: Arc<Mutex<TimeSource_>>,
29}
30
31pub(crate) struct TimeSource_ {
32    managed_clocks: Vec<Weak<Mutex<Clock>>>,
33    subscriber_state: TimeSourceSubscriberState,
34    simulated_time_enabled: bool,
35    last_time_msg: rcl_time_point_value_t,
36}
37
38#[derive(Copy, Clone)]
39enum TimeSourceSubscriberState {
40    None, // subscriber does not exist
41    Active,
42    ToBeDestroyed,
43}
44
45struct TimeSourceSubscriber {
46    subscriber_handle: rcl_subscription_t,
47    time_source: TimeSource,
48}
49
50impl TimeSource {
51    pub(crate) fn new() -> Self {
52        Self {
53            inner: Arc::new(Mutex::new(TimeSource_::new())),
54        }
55    }
56
57    /// Attach clock of type [`RosTime`](ClockType::RosTime) to the [`TimeSource`]
58    ///
59    /// If the simulated time is enabled the [`TimeSource`] will distribute simulated time
60    /// to all attached clocks.
61    pub fn attach_ros_clock(&self, clock: Weak<Mutex<Clock>>) -> Result<()> {
62        let mut time_source = self.inner.lock().unwrap();
63        let clock_valid = clock
64            .upgrade()
65            .map(|clock_arc| {
66                let mut clock = clock_arc.lock().unwrap();
67
68                if !matches!(clock.get_clock_type(), ClockType::RosTime) {
69                    return Err(Error::ClockTypeNotRosTime);
70                }
71
72                if time_source.simulated_time_enabled {
73                    clock.enable_ros_time_override(time_source.last_time_msg)?;
74                }
75
76                Ok(())
77            })
78            .transpose()?
79            .is_some();
80        if clock_valid {
81            time_source.managed_clocks.push(clock);
82        }
83        // if upgrade is none no need to attach the clock since it is already dropped
84
85        Ok(())
86    }
87
88    /// Enables usage of simulated time
89    ///
90    /// Simulated time is provided on topic `"/clock"` in the message [rosgraph_msgs::msg::Clock].
91    ///
92    /// See example: sim_time_publisher.rs
93    pub fn enable_sim_time(&self, node: &mut Node) -> Result<()> {
94        let mut inner = self.inner.lock().unwrap();
95        if inner.simulated_time_enabled {
96            // already enabled nothing to do
97            return Ok(());
98        }
99
100        inner.simulated_time_enabled = true;
101
102        match inner.subscriber_state {
103            TimeSourceSubscriberState::None => {
104                let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
105                node.subscribers.push(Box::new(subscriber));
106                inner.subscriber_state = TimeSourceSubscriberState::Active;
107            }
108            TimeSourceSubscriberState::ToBeDestroyed => {
109                inner.subscriber_state = TimeSourceSubscriberState::Active;
110            }
111            TimeSourceSubscriberState::Active => {
112                // nothing to do
113            }
114        }
115
116        let initial_time = inner.last_time_msg;
117        // enable ros time override on all attached clocks
118        inner.for_each_managed_clock(|clock| {
119            // This should never panic:
120            // This could only fail if the clock is invalid or not RosTime, but the clock is
121            // attached only if it is valid clock with type RosTime.
122            clock.enable_ros_time_override(initial_time).unwrap();
123        });
124
125        Ok(())
126    }
127
128    /// Disables usage of simulated time
129    ///
130    /// This will schedule removal of internal subscriber to the `"/clock"` topic on the next
131    /// receipt of [`rosgraph_msgs::msg::Clock`] message.
132    pub fn disable_sim_time(&self) {
133        let mut inner = self.inner.lock().unwrap();
134        if inner.simulated_time_enabled {
135            inner.simulated_time_enabled = false;
136
137            // disable ros time override on all attached clocks
138            inner.for_each_managed_clock(|clock| {
139                // This should never panic:
140                // This could only fail if the clock is invalid or not RosTime, but the clock is
141                // attached only if it is valid clock with type RosTime.
142                clock.disable_ros_time_override().unwrap();
143            });
144        }
145
146        if matches!(inner.subscriber_state, TimeSourceSubscriberState::Active) {
147            inner.subscriber_state = TimeSourceSubscriberState::ToBeDestroyed;
148        }
149    }
150}
151
152impl TimeSource_ {
153    fn new() -> Self {
154        Self {
155            managed_clocks: vec![],
156            subscriber_state: TimeSourceSubscriberState::None,
157            simulated_time_enabled: false,
158            last_time_msg: 0,
159        }
160    }
161
162    fn for_each_managed_clock<F>(&mut self, mut f: F)
163    where
164        F: FnMut(&mut Clock),
165    {
166        self.managed_clocks.retain(|weak_clock| {
167            let Some(clock_arc) = weak_clock.upgrade() else {
168                // clock can be deleted
169                return false;
170            };
171
172            let mut clock = clock_arc.lock().unwrap();
173            f(&mut clock);
174
175            // retain clock
176            true
177        });
178    }
179
180    // this is similar to internal rclcpp function `set_all_clocks` in `time_source.cpp`
181    fn set_clock_time(&mut self, time_msg: Time) {
182        let time = time_msg.into();
183        self.last_time_msg = time;
184        self.for_each_managed_clock(|clock| {
185            // This should never panic:
186            // This could only fail if the clock is invalid or not RosTime, but the clock is
187            // attached only if it is valid RosTime clock.
188            clock.set_ros_time_override(time).unwrap()
189        });
190    }
191}
192
193impl TimeSourceSubscriber {
194    fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
195        // The values are set based on default values in rclcpp
196        let qos = QosProfile::default().keep_last(1).best_effort();
197
198        let subscriber = create_subscription_helper(
199            node_handle,
200            "/clock",
201            crate::rosgraph_msgs::msg::Clock::get_ts(),
202            qos,
203        )?;
204        Ok(Self {
205            subscriber_handle: subscriber,
206            time_source,
207        })
208    }
209}
210
211impl Subscriber_ for TimeSourceSubscriber {
212    fn handle(&self) -> &rcl_subscription_t {
213        &self.subscriber_handle
214    }
215
216    fn handle_incoming(&mut self) -> bool {
217        // update clock
218        let mut msg_info = rmw_message_info_t::default(); // we dont care for now
219        let mut clock_msg = WrappedNativeMsg::<rosgraph_msgs::msg::Clock>::new();
220        let ret = unsafe {
221            rcl_take(
222                &self.subscriber_handle,
223                clock_msg.void_ptr_mut(),
224                &mut msg_info,
225                std::ptr::null_mut(),
226            )
227        };
228
229        let mut inner_time_source = self.time_source.inner.lock().unwrap();
230        if ret == RCL_RET_OK as i32 {
231            let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg);
232
233            inner_time_source.set_clock_time(msg.clock);
234        }
235
236        match inner_time_source.subscriber_state {
237            TimeSourceSubscriberState::Active => {
238                // keep the subscriber
239                false
240            }
241            TimeSourceSubscriberState::ToBeDestroyed => {
242                inner_time_source.subscriber_state = TimeSourceSubscriberState::None;
243                // destroy the subscriber
244                true
245            }
246            TimeSourceSubscriberState::None => unreachable!(),
247        }
248    }
249
250    fn destroy(&mut self, node: &mut rcl_node_t) {
251        unsafe {
252            rcl_subscription_fini(&mut self.subscriber_handle, node);
253        }
254    }
255}