r2r/time_source.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
#![cfg(r2r__rosgraph_msgs__msg__Clock)]
use crate::{
builtin_interfaces::msg::Time,
error::*,
msg_types::{VoidPtr, WrappedNativeMsg},
rosgraph_msgs,
subscribers::{create_subscription_helper, Subscriber_},
Clock, ClockType, Node, QosProfile, WrappedTypesupport,
};
use r2r_rcl::{
rcl_node_t, rcl_subscription_fini, rcl_subscription_t, rcl_take, rcl_time_point_value_t,
rmw_message_info_t, RCL_RET_OK,
};
use std::sync::{Arc, Mutex, Weak};
/// Provides time from the `/clock` topic to attached ROS clocks.
///
/// By default, only the clock used by ROS timers is attached, and time from the `/clock` topic
/// is disabled.
///
/// The time from the `/clock` topic can be activated in either of these ways:
/// - calling [`TimeSource::enable_sim_time`]
/// - registering a parameter handler and launching the node with the parameter
///   `use_sim_time:=true`
///
/// The handle is cloneable; every clone shares the same internal state.
///
/// Similar to `rclcpp/time_source.hpp`.
#[derive(Clone)]
pub struct TimeSource {
// Shared, mutex-protected state; all clones of this handle point at the same `TimeSource_`.
inner: Arc<Mutex<TimeSource_>>,
}
/// Internal state of a [`TimeSource`], shared between all of its clones and the `/clock`
/// subscriber.
pub(crate) struct TimeSource_ {
/// Weak handles to the attached clocks; entries whose clock has been dropped are pruned
/// whenever the clocks are visited.
managed_clocks: Vec<Weak<Mutex<Clock>>>,
/// Lifecycle state of the internal `/clock` subscriber.
subscriber_state: TimeSourceSubscriberState,
/// `true` while simulated time is enabled, i.e. attached clocks have their ROS time
/// override turned on.
simulated_time_enabled: bool,
/// Most recent time value taken from a `/clock` message (starts at `0`); used as the
/// initial override value when a clock is attached or simulated time is enabled.
last_time_msg: rcl_time_point_value_t,
}
/// Lifecycle of the internal `/clock` subscriber.
#[derive(Copy, Clone)]
enum TimeSourceSubscriberState {
/// The subscriber does not exist.
None,
/// The subscriber exists and is kept alive.
Active,
/// The subscriber exists but asks to be destroyed the next time it handles an incoming
/// message.
ToBeDestroyed,
}
/// Subscriber to the `/clock` topic that forwards every received time value to the clocks
/// managed by a [`TimeSource`].
struct TimeSourceSubscriber {
/// Raw rcl subscription handle for the `/clock` topic.
subscriber_handle: rcl_subscription_t,
/// Handle to the time source whose clocks are updated and whose state decides when this
/// subscriber is destroyed.
time_source: TimeSource,
}
impl TimeSource {
    /// Creates a time source with no attached clocks, no `/clock` subscriber and simulated
    /// time disabled.
    pub(crate) fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(TimeSource_::new())),
        }
    }

    /// Attach clock of type [`RosTime`](ClockType::RosTime) to the [`TimeSource`]
    ///
    /// If the simulated time is enabled the [`TimeSource`] will distribute simulated time
    /// to all attached clocks.
    ///
    /// If the clock behind `clock` has already been dropped, nothing is attached and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// - [`Error::ClockTypeNotRosTime`] if the clock is not of type
    ///   [`RosTime`](ClockType::RosTime).
    /// - Any error reported while enabling the ROS time override on the clock (only attempted
    ///   when simulated time is currently enabled).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex or the clock mutex is poisoned.
    pub fn attach_ros_clock(&self, clock: Weak<Mutex<Clock>>) -> Result<()> {
        let mut time_source = self.inner.lock().unwrap();

        let Some(clock_arc) = clock.upgrade() else {
            // The clock is already dropped, so there is nothing to attach.
            return Ok(());
        };

        {
            let mut clock_guard = clock_arc.lock().unwrap();
            if !matches!(clock_guard.get_clock_type(), ClockType::RosTime) {
                return Err(Error::ClockTypeNotRosTime);
            }
            if time_source.simulated_time_enabled {
                // Bring the new clock in line with the clocks that are already attached.
                clock_guard.enable_ros_time_override(time_source.last_time_msg)?;
            }
        }

        // Only a validated clock reaches this point; the other methods rely on that invariant.
        time_source.managed_clocks.push(clock);
        Ok(())
    }

    /// Enables usage of simulated time
    ///
    /// Simulated time is provided on topic `"/clock"` in the message [rosgraph_msgs::msg::Clock].
    ///
    /// Calling this while simulated time is already enabled is a no-op.
    ///
    /// See example: sim_time_publisher.rs
    ///
    /// # Errors
    ///
    /// Returns an error if the `"/clock"` subscriber cannot be created. In that case the
    /// time source is left unchanged (simulated time stays disabled), so the call can be
    /// retried.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex or a clock mutex is poisoned.
    pub fn enable_sim_time(&self, node: &mut Node) -> Result<()> {
        let mut inner = self.inner.lock().unwrap();
        if inner.simulated_time_enabled {
            // already enabled nothing to do
            return Ok(());
        }

        match inner.subscriber_state {
            TimeSourceSubscriberState::None => {
                // The only fallible step. It runs before any state is modified so that a
                // failure does not leave the time source marked as enabled without a
                // subscriber.
                let subscriber = TimeSourceSubscriber::new(&mut node.node_handle, self.clone())?;
                node.subscribers.push(Box::new(subscriber));
                inner.subscriber_state = TimeSourceSubscriberState::Active;
            }
            TimeSourceSubscriberState::ToBeDestroyed => {
                // The subscriber has not been removed yet: cancel the pending removal.
                inner.subscriber_state = TimeSourceSubscriberState::Active;
            }
            TimeSourceSubscriberState::Active => {
                // nothing to do
            }
        }

        inner.simulated_time_enabled = true;

        let initial_time = inner.last_time_msg;
        // enable ros time override on all attached clocks
        inner.for_each_managed_clock(|clock| {
            // This could only fail if the clock is invalid or not RosTime, but the clock is
            // attached only if it is valid clock with type RosTime.
            clock
                .enable_ros_time_override(initial_time)
                .expect("attached clocks are valid RosTime clocks");
        });

        Ok(())
    }

    /// Disables usage of simulated time
    ///
    /// This will schedule removal of internal subscriber to the `"/clock"` topic on the next
    /// receipt of [`rosgraph_msgs::msg::Clock`] message.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex or a clock mutex is poisoned.
    pub fn disable_sim_time(&self) {
        let mut inner = self.inner.lock().unwrap();

        if inner.simulated_time_enabled {
            inner.simulated_time_enabled = false;

            // disable ros time override on all attached clocks
            inner.for_each_managed_clock(|clock| {
                // This could only fail if the clock is invalid or not RosTime, but the clock is
                // attached only if it is valid clock with type RosTime.
                clock
                    .disable_ros_time_override()
                    .expect("attached clocks are valid RosTime clocks");
            });
        }

        // The subscriber itself is removed lazily, when it handles its next message.
        if matches!(inner.subscriber_state, TimeSourceSubscriberState::Active) {
            inner.subscriber_state = TimeSourceSubscriberState::ToBeDestroyed;
        }
    }
}
impl TimeSource_ {
    /// Builds the initial state: no managed clocks, no subscriber, simulated time off and a
    /// last received time of `0`.
    fn new() -> Self {
        Self {
            managed_clocks: Vec::new(),
            subscriber_state: TimeSourceSubscriberState::None,
            simulated_time_enabled: false,
            last_time_msg: 0,
        }
    }

    /// Runs `f` on every managed clock that is still alive.
    ///
    /// Entries whose clock has been dropped are removed from the list as a side effect.
    ///
    /// # Panics
    ///
    /// Panics if a clock mutex is poisoned.
    fn for_each_managed_clock<F>(&mut self, mut f: F)
    where
        F: FnMut(&mut Clock),
    {
        self.managed_clocks.retain(|weak_clock| match weak_clock.upgrade() {
            Some(strong) => {
                let mut guard = strong.lock().unwrap();
                f(&mut guard);
                // the clock is still alive, keep tracking it
                true
            }
            // the clock is gone, forget about it
            None => false,
        });
    }

    /// Records the time carried by a `/clock` message and pushes it to every managed clock.
    ///
    /// This is similar to the internal rclcpp function `set_all_clocks` in `time_source.cpp`.
    fn set_clock_time(&mut self, time_msg: Time) {
        self.last_time_msg = time_msg.into();
        let stamp = self.last_time_msg;

        self.for_each_managed_clock(|clock| {
            // This should never panic:
            // setting the override could only fail for an invalid or non-RosTime clock, and
            // only valid RosTime clocks are ever attached.
            clock.set_ros_time_override(stamp).unwrap();
        });
    }
}
impl TimeSourceSubscriber {
fn new(node_handle: &mut rcl_node_t, time_source: TimeSource) -> Result<TimeSourceSubscriber> {
// The values are set based on default values in rclcpp
let qos = QosProfile::default().keep_last(1).best_effort();
let subscriber = create_subscription_helper(
node_handle,
"/clock",
crate::rosgraph_msgs::msg::Clock::get_ts(),
qos,
)?;
Ok(Self {
subscriber_handle: subscriber,
time_source,
})
}
}
impl Subscriber_ for TimeSourceSubscriber {
    /// Gives access to the underlying rcl subscription handle.
    fn handle(&self) -> &rcl_subscription_t {
        &self.subscriber_handle
    }

    /// Takes one pending `/clock` message (if any) and forwards its time to the managed clocks.
    ///
    /// Returns `true` when the subscriber should be destroyed (simulated time was disabled in
    /// the meantime) and `false` when it should be kept.
    fn handle_incoming(&mut self) -> bool {
        // The message info is required by `rcl_take` but its content is not used.
        let mut info = rmw_message_info_t::default();
        let mut native_msg = WrappedNativeMsg::<rosgraph_msgs::msg::Clock>::new();

        // SAFETY: the handle was produced by `create_subscription_helper`, and both out
        // pointers refer to locals that live for the whole call.
        // NOTE(review): a null allocation argument is assumed to be accepted by `rcl_take`.
        let take_result = unsafe {
            rcl_take(
                &self.subscriber_handle,
                native_msg.void_ptr_mut(),
                &mut info,
                std::ptr::null_mut(),
            )
        };

        let mut shared = self.time_source.inner.lock().unwrap();

        // Only a successfully taken message updates the clocks.
        if take_result == RCL_RET_OK as i32 {
            let clock_update = rosgraph_msgs::msg::Clock::from_native(&native_msg);
            shared.set_clock_time(clock_update.clock);
        }

        match shared.subscriber_state {
            // A live subscriber implies the state was set to `Active` on creation.
            TimeSourceSubscriberState::None => unreachable!(),
            // keep the subscriber
            TimeSourceSubscriberState::Active => false,
            // destroy the subscriber and record that it no longer exists
            TimeSourceSubscriberState::ToBeDestroyed => {
                shared.subscriber_state = TimeSourceSubscriberState::None;
                true
            }
        }
    }

    /// Finalizes the rcl subscription on `node`; the return code of the call is ignored.
    fn destroy(&mut self, node: &mut rcl_node_t) {
        let subscription = &mut self.subscriber_handle;
        // SAFETY: NOTE(review): assumes `node` is the node this subscription was created on.
        unsafe {
            rcl_subscription_fini(subscription, node);
        }
    }
}