r2r/
time_source.rs
1#![cfg(r2r__rosgraph_msgs__msg__Clock)]
2
3use crate::{
4 builtin_interfaces::msg::Time,
5 error::*,
6 msg_types::{VoidPtr, WrappedNativeMsg},
7 rosgraph_msgs,
8 subscribers::{create_subscription_helper, Subscriber_},
9 Clock, ClockType, Node, QosProfile, WrappedTypesupport,
10};
11use r2r_rcl::{
12 rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
13 rmw_message_info_t, RCL_RET_OK,
14};
15use std::sync::{Arc, Mutex, Weak};
16
17#[derive(Clone)]
27pub struct TimeSource {
28 inner: Arc<Mutex<TimeSource_>>,
29}
30
31pub(crate) struct TimeSource_ {
32 managed_clocks: Vec<Weak<Mutex<Clock>>>,
33 subscriber_state: TimeSourceSubscriberState,
34 simulated_time_enabled: bool,
35 last_time_msg: rcl_time_point_value_t,
36}
37
38#[derive(Copy, Clone)]
39enum TimeSourceSubscriberState {
40 None, Active,
42 ToBeDestroyed,
43}
44
45struct TimeSourceSubscriber {
46 subscriber_handle: rcl_subscription_t,
47 time_source: TimeSource,
48}
49
50impl TimeSource {
51 pub(crate) fn new() -> Self {
52 Self {
53 inner: Arc::new(Mutex::new(TimeSource_::new())),
54 }
55 }
56
57 pub fn attach_ros_clock(&self, clock: Weak<Mutex<Clock>>) -> Result<()> {
62 let mut time_source = self.inner.lock().unwrap();
63 let clock_valid = clock
64 .upgrade()
65 .map(|clock_arc| {
66 let mut clock = clock_arc.lock().unwrap();
67
68 if !matches!(clock.get_clock_type(), ClockType::RosTime) {
69 return Err(Error::ClockTypeNotRosTime);
70 }
71
72 if time_source.simulated_time_enabled {
73 clock.enable_ros_time_override(time_source.last_time_msg)?;
74 }
75
76 Ok(())
77 })
78 .transpose()?
79 .is_some();
80 if clock_valid {
81 time_source.managed_clocks.push(clock);
82 }
83 Ok(())
86 }
87
88 pub fn enable_sim_time(&self, node: &mut Node) -> Result<()> {
94 let mut inner = self.inner.lock().unwrap();
95 if inner.simulated_time_enabled {
96 return Ok(());
98 }
99
100 inner.simulated_time_enabled = true;
101
102 match inner.subscriber_state {
103 TimeSourceSubscriberState::None => {
104 let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
105 node.subscribers.push(Box::new(subscriber));
106 inner.subscriber_state = TimeSourceSubscriberState::Active;
107 }
108 TimeSourceSubscriberState::ToBeDestroyed => {
109 inner.subscriber_state = TimeSourceSubscriberState::Active;
110 }
111 TimeSourceSubscriberState::Active => {
112 }
114 }
115
116 let initial_time = inner.last_time_msg;
117 inner.for_each_managed_clock(|clock| {
119 clock.enable_ros_time_override(initial_time).unwrap();
123 });
124
125 Ok(())
126 }
127
128 pub fn disable_sim_time(&self) {
133 let mut inner = self.inner.lock().unwrap();
134 if inner.simulated_time_enabled {
135 inner.simulated_time_enabled = false;
136
137 inner.for_each_managed_clock(|clock| {
139 clock.disable_ros_time_override().unwrap();
143 });
144 }
145
146 if matches!(inner.subscriber_state, TimeSourceSubscriberState::Active) {
147 inner.subscriber_state = TimeSourceSubscriberState::ToBeDestroyed;
148 }
149 }
150}
151
152impl TimeSource_ {
153 fn new() -> Self {
154 Self {
155 managed_clocks: vec![],
156 subscriber_state: TimeSourceSubscriberState::None,
157 simulated_time_enabled: false,
158 last_time_msg: 0,
159 }
160 }
161
162 fn for_each_managed_clock<F>(&mut self, mut f: F)
163 where
164 F: FnMut(&mut Clock),
165 {
166 self.managed_clocks.retain(|weak_clock| {
167 let Some(clock_arc) = weak_clock.upgrade() else {
168 return false;
170 };
171
172 let mut clock = clock_arc.lock().unwrap();
173 f(&mut clock);
174
175 true
177 });
178 }
179
180 fn set_clock_time(&mut self, time_msg: Time) {
182 let time = time_msg.into();
183 self.last_time_msg = time;
184 self.for_each_managed_clock(|clock| {
185 clock.set_ros_time_override(time).unwrap()
189 });
190 }
191}
192
193impl TimeSourceSubscriber {
194 fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
195 let qos = QosProfile::default().keep_last(1).best_effort();
197
198 let subscriber = create_subscription_helper(
199 node_handle,
200 "/clock",
201 crate::rosgraph_msgs::msg::Clock::get_ts(),
202 qos,
203 )?;
204 Ok(Self {
205 subscriber_handle: subscriber,
206 time_source,
207 })
208 }
209}
210
211impl Subscriber_ for TimeSourceSubscriber {
212 fn handle(&self) -> &rcl_subscription_t {
213 &self.subscriber_handle
214 }
215
216 fn handle_incoming(&mut self) -> bool {
217 let mut msg_info = rmw_message_info_t::default(); let mut clock_msg = WrappedNativeMsg::<rosgraph_msgs::msg::Clock>::new();
220 let ret = unsafe {
221 rcl_take(
222 &self.subscriber_handle,
223 clock_msg.void_ptr_mut(),
224 &mut msg_info,
225 std::ptr::null_mut(),
226 )
227 };
228
229 let mut inner_time_source = self.time_source.inner.lock().unwrap();
230 if ret == RCL_RET_OK as i32 {
231 let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg);
232
233 inner_time_source.set_clock_time(msg.clock);
234 }
235
236 match inner_time_source.subscriber_state {
237 TimeSourceSubscriberState::Active => {
238 false
240 }
241 TimeSourceSubscriberState::ToBeDestroyed => {
242 inner_time_source.subscriber_state = TimeSourceSubscriberState::None;
243 true
245 }
246 TimeSourceSubscriberState::None => unreachable!(),
247 }
248 }
249
250 fn destroy(&mut self, node: &mut rcl_node_t) {
251 unsafe {
252 rcl_subscription_fini(&mut self.subscriber_handle, node);
253 }
254 }
255}