arci_ros2/
utils.rs
1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc, RwLock,
5 },
6 time::{Duration, Instant, SystemTime},
7};
8
9use futures::{
10 future::FutureExt,
11 stream::{Stream, StreamExt},
12};
13use r2r::builtin_interfaces::msg::Time;
14
15const BILLION: u128 = 1_000_000_000;
16
17pub(crate) async fn wait(is_done: Arc<AtomicBool>) {
19 loop {
20 if is_done.load(Ordering::Relaxed) {
21 break;
22 }
23 tokio::time::sleep(Duration::from_millis(10)).await;
24 }
25}
26
27pub(crate) fn subscribe_thread<T: Send + 'static, U: Send + Sync + 'static>(
28 mut subscriber: impl Stream<Item = T> + Send + Unpin + 'static,
29 buf: Arc<RwLock<U>>,
30 mut f: impl FnMut(T) -> U + Send + 'static,
31) {
32 tokio::spawn(async move {
33 while Arc::strong_count(&buf) > 1 {
34 if let Some(val) = subscriber.next().await {
35 let res = f(val);
36 *buf.write().unwrap() = res;
37 }
38 tokio::time::sleep(Duration::from_millis(100)).await;
39 }
40 });
41}
42
43pub(crate) fn subscribe_one<T: Send>(
44 mut subscriber: impl Stream<Item = T> + Send + Unpin,
45 timeout: Duration,
46) -> Option<T> {
47 std::thread::scope(|s| {
49 s.spawn(|| {
50 tokio::runtime::Builder::new_current_thread()
51 .enable_all()
52 .build()
53 .unwrap()
54 .block_on(async move {
55 let start = Instant::now();
56 loop {
57 if let Some(v) = subscriber.next().now_or_never() {
58 return v;
59 }
60 if start.elapsed() > timeout {
61 return None; }
63 tokio::time::sleep(Duration::from_millis(100)).await;
64 }
65 })
66 })
67 .join()
68 .unwrap()
69 })
70}
71
72pub fn convert_system_time_to_ros2_time(time: &SystemTime) -> Time {
73 let mut ros_clock = r2r::Clock::create(r2r::ClockType::RosTime).unwrap();
74 let ros_now = ros_clock.get_now().unwrap();
75 let system_now = SystemTime::now();
76
77 let nano = if system_now < *time {
78 time.duration_since(system_now).unwrap().as_nanos() + ros_now.as_nanos()
79 } else {
80 ros_now.as_nanos() - system_now.duration_since(*time).unwrap().as_nanos()
81 };
82
83 Time {
84 sec: (nano / BILLION) as i32,
85 nanosec: (nano % BILLION) as u32,
86 }
87}
88
89pub fn convert_ros2_time_to_system_time(time: &Time) -> SystemTime {
90 let mut ros_clock = r2r::Clock::create(r2r::ClockType::RosTime).unwrap();
91 let ros_now = ros_clock.get_now().unwrap();
92 let system_now = SystemTime::now();
93 let ros_time_nanos = time.sec as u128 * BILLION + time.nanosec as u128;
94 let ros_now_nanos = ros_now.as_nanos();
95
96 if ros_now_nanos < ros_time_nanos {
97 system_now
98 .checked_add(Duration::from_nanos(
99 (ros_time_nanos - ros_now_nanos) as u64,
100 ))
101 .unwrap()
102 } else {
103 system_now
104 .checked_sub(Duration::from_nanos(
105 (ros_now_nanos - ros_time_nanos) as u64,
106 ))
107 .unwrap()
108 }
109}