arci_ros2/
utils.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc, RwLock,
5    },
6    time::{Duration, Instant, SystemTime},
7};
8
9use futures::{
10    future::FutureExt,
11    stream::{Stream, StreamExt},
12};
13use r2r::builtin_interfaces::msg::Time;
14
15const BILLION: u128 = 1_000_000_000;
16
17// TODO: timeout
18pub(crate) async fn wait(is_done: Arc<AtomicBool>) {
19    loop {
20        if is_done.load(Ordering::Relaxed) {
21            break;
22        }
23        tokio::time::sleep(Duration::from_millis(10)).await;
24    }
25}
26
27pub(crate) fn subscribe_thread<T: Send + 'static, U: Send + Sync + 'static>(
28    mut subscriber: impl Stream<Item = T> + Send + Unpin + 'static,
29    buf: Arc<RwLock<U>>,
30    mut f: impl FnMut(T) -> U + Send + 'static,
31) {
32    tokio::spawn(async move {
33        while Arc::strong_count(&buf) > 1 {
34            if let Some(val) = subscriber.next().await {
35                let res = f(val);
36                *buf.write().unwrap() = res;
37            }
38            tokio::time::sleep(Duration::from_millis(100)).await;
39        }
40    });
41}
42
43pub(crate) fn subscribe_one<T: Send>(
44    mut subscriber: impl Stream<Item = T> + Send + Unpin,
45    timeout: Duration,
46) -> Option<T> {
47    // https://github.com/openrr/openrr/pull/501#discussion_r746183161
48    std::thread::scope(|s| {
49        s.spawn(|| {
50            tokio::runtime::Builder::new_current_thread()
51                .enable_all()
52                .build()
53                .unwrap()
54                .block_on(async move {
55                    let start = Instant::now();
56                    loop {
57                        if let Some(v) = subscriber.next().now_or_never() {
58                            return v;
59                        }
60                        if start.elapsed() > timeout {
61                            return None; // timeout
62                        }
63                        tokio::time::sleep(Duration::from_millis(100)).await;
64                    }
65                })
66        })
67        .join()
68        .unwrap()
69    })
70}
71
72pub fn convert_system_time_to_ros2_time(time: &SystemTime) -> Time {
73    let mut ros_clock = r2r::Clock::create(r2r::ClockType::RosTime).unwrap();
74    let ros_now = ros_clock.get_now().unwrap();
75    let system_now = SystemTime::now();
76
77    let nano = if system_now < *time {
78        time.duration_since(system_now).unwrap().as_nanos() + ros_now.as_nanos()
79    } else {
80        ros_now.as_nanos() - system_now.duration_since(*time).unwrap().as_nanos()
81    };
82
83    Time {
84        sec: (nano / BILLION) as i32,
85        nanosec: (nano % BILLION) as u32,
86    }
87}
88
89pub fn convert_ros2_time_to_system_time(time: &Time) -> SystemTime {
90    let mut ros_clock = r2r::Clock::create(r2r::ClockType::RosTime).unwrap();
91    let ros_now = ros_clock.get_now().unwrap();
92    let system_now = SystemTime::now();
93    let ros_time_nanos = time.sec as u128 * BILLION + time.nanosec as u128;
94    let ros_now_nanos = ros_now.as_nanos();
95
96    if ros_now_nanos < ros_time_nanos {
97        system_now
98            .checked_add(Duration::from_nanos(
99                (ros_time_nanos - ros_now_nanos) as u64,
100            ))
101            .unwrap()
102    } else {
103        system_now
104            .checked_sub(Duration::from_nanos(
105                (ros_now_nanos - ros_time_nanos) as u64,
106            ))
107            .unwrap()
108    }
109}