1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#![cfg(r2r__rosgraph_msgs__msg__Clock)]

use crate::{
    builtin_interfaces::msg::Time,
    error::*,
    msg_types::{VoidPtr, WrappedNativeMsg},
    rosgraph_msgs,
    subscribers::{create_subscription_helper, Subscriber_},
    Clock, ClockType, Node, QosProfile, WrappedTypesupport,
};
use r2r_rcl::{
    rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
    rmw_message_info_t, RCL_RET_OK,
};
use std::sync::{Arc, Mutex, Weak};

/// Provides time from `/clock` topic to attached ROS clocks
///
/// By default only clock used by ROS timers is attached and time from `/clock` topic is disabled.
///
/// The time from `/clock` topic can be activated by either of these:
/// - calling [`TimeSource::enable_sim_time`]
/// - having registered parameter handler and launching the node with parameter `use_sim_time:=true`
///
/// Similar to `rclcpp/time_source.hpp`
#[derive(Clone)]
pub struct TimeSource {
    inner: Arc<Mutex<TimeSource_>>,
}

pub(crate) struct TimeSource_ {
    managed_clocks: Vec<Weak<Mutex<Clock>>>,
    subscriber_state: TimeSourceSubscriberState,
    simulated_time_enabled: bool,
    last_time_msg: rcl_time_point_value_t,
}

#[derive(Copy, Clone)]
enum TimeSourceSubscriberState {
    None, // subscriber does not exist
    Active,
    ToBeDestroyed,
}

struct TimeSourceSubscriber {
    subscriber_handle: rcl_subscription_t,
    time_source: TimeSource,
}

impl TimeSource {
    pub(crate) fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(TimeSource_::new())),
        }
    }

    /// Attach clock of type [`RosTime`](ClockType::RosTime) to the [`TimeSource`]
    ///
    /// If the simulated time is enabled the [`TimeSource`] will distribute simulated time
    /// to all attached clocks.
    pub fn attach_ros_clock(&self, clock: Weak<Mutex<Clock>>) -> Result<()> {
        let mut time_source = self.inner.lock().unwrap();
        let clock_valid = clock
            .upgrade()
            .map(|clock_arc| {
                let mut clock = clock_arc.lock().unwrap();

                if !matches!(clock.get_clock_type(), ClockType::RosTime) {
                    return Err(Error::ClockTypeNotRosTime);
                }

                if time_source.simulated_time_enabled {
                    clock.enable_ros_time_override(time_source.last_time_msg)?;
                }

                Ok(())
            })
            .transpose()?
            .is_some();
        if clock_valid {
            time_source.managed_clocks.push(clock);
        }
        // if upgrade is none no need to attach the clock since it is already dropped

        Ok(())
    }

    /// Enables usage of simulated time
    ///
    /// Simulated time is provided on topic `"/clock"` in the message [rosgraph_msgs::msg::Clock].
    ///
    /// See example: sim_time_publisher.rs
    pub fn enable_sim_time(&self, node: &mut Node) -> Result<()> {
        let mut inner = self.inner.lock().unwrap();
        if inner.simulated_time_enabled {
            // already enabled nothing to do
            return Ok(());
        }

        inner.simulated_time_enabled = true;

        match inner.subscriber_state {
            TimeSourceSubscriberState::None => {
                let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
                node.subscribers.push(Box::new(subscriber));
                inner.subscriber_state = TimeSourceSubscriberState::Active;
            }
            TimeSourceSubscriberState::ToBeDestroyed => {
                inner.subscriber_state = TimeSourceSubscriberState::Active;
            }
            TimeSourceSubscriberState::Active => {
                // nothing to do
            }
        }

        let initial_time = inner.last_time_msg;
        // enable ros time override on all attached clocks
        inner.for_each_managed_clock(|clock| {
            // This should never panic:
            // This could only fail if the clock is invalid or not RosTime, but the clock is
            // attached only if it is valid clock with type RosTime.
            clock.enable_ros_time_override(initial_time).unwrap();
        });

        Ok(())
    }

    /// Disables usage of simulated time
    ///
    /// This will schedule removal of internal subscriber to the `"/clock"` topic on the next
    /// receipt of [`rosgraph_msgs::msg::Clock`] message.
    pub fn disable_sim_time(&self) {
        let mut inner = self.inner.lock().unwrap();
        if inner.simulated_time_enabled {
            inner.simulated_time_enabled = false;

            // disable ros time override on all attached clocks
            inner.for_each_managed_clock(|clock| {
                // This should never panic:
                // This could only fail if the clock is invalid or not RosTime, but the clock is
                // attached only if it is valid clock with type RosTime.
                clock.disable_ros_time_override().unwrap();
            });
        }

        if matches!(inner.subscriber_state, TimeSourceSubscriberState::Active) {
            inner.subscriber_state = TimeSourceSubscriberState::ToBeDestroyed;
        }
    }
}

impl TimeSource_ {
    fn new() -> Self {
        Self {
            managed_clocks: vec![],
            subscriber_state: TimeSourceSubscriberState::None,
            simulated_time_enabled: false,
            last_time_msg: 0,
        }
    }

    fn for_each_managed_clock<F>(&mut self, mut f: F)
    where
        F: FnMut(&mut Clock),
    {
        self.managed_clocks.retain(|weak_clock| {
            let Some(clock_arc) = weak_clock.upgrade() else {
                // clock can be deleted
                return false;
            };

            let mut clock = clock_arc.lock().unwrap();
            f(&mut clock);

            // retain clock
            true
        });
    }

    // this is similar to internal rclcpp function `set_all_clocks` in `time_source.cpp`
    fn set_clock_time(&mut self, time_msg: Time) {
        let time = time_msg.into();
        self.last_time_msg = time;
        self.for_each_managed_clock(|clock| {
            // This should never panic:
            // This could only fail if the clock is invalid or not RosTime, but the clock is
            // attached only if it is valid RosTime clock.
            clock.set_ros_time_override(time).unwrap()
        });
    }
}

impl TimeSourceSubscriber {
    fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
        // The values are set based on default values in rclcpp
        let qos = QosProfile::default().keep_last(1).best_effort();

        let subscriber = create_subscription_helper(
            node_handle,
            "/clock",
            crate::rosgraph_msgs::msg::Clock::get_ts(),
            qos,
        )?;
        Ok(Self {
            subscriber_handle: subscriber,
            time_source,
        })
    }
}

impl Subscriber_ for TimeSourceSubscriber {
    fn handle(&self) -> &rcl_subscription_t {
        &self.subscriber_handle
    }

    fn handle_incoming(&mut self) -> bool {
        // update clock
        let mut msg_info = rmw_message_info_t::default(); // we dont care for now
        let mut clock_msg = WrappedNativeMsg::<rosgraph_msgs::msg::Clock>::new();
        let ret = unsafe {
            rcl_take(
                &self.subscriber_handle,
                clock_msg.void_ptr_mut(),
                &mut msg_info,
                std::ptr::null_mut(),
            )
        };

        let mut inner_time_source = self.time_source.inner.lock().unwrap();
        if ret == RCL_RET_OK as i32 {
            let msg = rosgraph_msgs::msg::Clock::from_native(&clock_msg);

            inner_time_source.set_clock_time(msg.clock);
        }

        match inner_time_source.subscriber_state {
            TimeSourceSubscriberState::Active => {
                // keep the subscriber
                false
            }
            TimeSourceSubscriberState::ToBeDestroyed => {
                inner_time_source.subscriber_state = TimeSourceSubscriberState::None;
                // destroy the subscriber
                true
            }
            TimeSourceSubscriberState::None => unreachable!(),
        }
    }

    fn destroy(&mut self, node: &mut rcl_node_t) {
        unsafe {
            rcl_subscription_fini(&mut self.subscriber_handle, node);
        }
    }
}