1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
    time::{Duration, Instant, SystemTime},
};

use futures::{
    future::FutureExt,
    stream::{Stream, StreamExt},
};
use r2r::builtin_interfaces::msg::Time;

const BILLION: u128 = 1_000_000_000;

// TODO: timeout
pub(crate) async fn wait(is_done: Arc<AtomicBool>) {
    loop {
        if is_done.load(Ordering::Relaxed) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

pub(crate) fn subscribe_thread<T: Send + 'static, U: Send + Sync + 'static>(
    mut subscriber: impl Stream<Item = T> + Send + Unpin + 'static,
    buf: Arc<RwLock<U>>,
    mut f: impl FnMut(T) -> U + Send + 'static,
) {
    tokio::spawn(async move {
        while Arc::strong_count(&buf) > 1 {
            if let Some(val) = subscriber.next().await {
                let res = f(val);
                *buf.write().unwrap() = res;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
}

pub(crate) fn subscribe_one<T: Send>(
    mut subscriber: impl Stream<Item = T> + Send + Unpin,
    timeout: Duration,
) -> Option<T> {
    // https://github.com/openrr/openrr/pull/501#discussion_r746183161
    std::thread::scope(|s| {
        s.spawn(|| {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async move {
                    let start = Instant::now();
                    loop {
                        if let Some(v) = subscriber.next().now_or_never() {
                            return v;
                        }
                        if start.elapsed() > timeout {
                            return None; // timeout
                        }
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                })
        })
        .join()
        .unwrap()
    })
}

pub fn convert_system_time_to_ros2_time(time: &SystemTime) -> Time {
    let mut ros_clock = r2r::Clock::create(r2r::ClockType::RosTime).unwrap();
    let ros_now = ros_clock.get_now().unwrap();
    let system_now = SystemTime::now();

    let nano = if system_now < *time {
        time.duration_since(system_now).unwrap().as_nanos() + ros_now.as_nanos()
    } else {
        ros_now.as_nanos() - system_now.duration_since(*time).unwrap().as_nanos()
    };

    Time {
        sec: (nano / BILLION) as i32,
        nanosec: (nano % BILLION) as u32,
    }
}

pub fn convert_ros2_time_to_system_time(time: &Time) -> SystemTime {
    let mut ros_clock = r2r::Clock::create(r2r::ClockType::RosTime).unwrap();
    let ros_now = ros_clock.get_now().unwrap();
    let system_now = SystemTime::now();
    let ros_time_nanos = time.sec as u128 * BILLION + time.nanosec as u128;
    let ros_now_nanos = ros_now.as_nanos();

    if ros_now_nanos < ros_time_nanos {
        system_now
            .checked_add(Duration::from_nanos(
                (ros_time_nanos - ros_now_nanos) as u64,
            ))
            .unwrap()
    } else {
        system_now
            .checked_sub(Duration::from_nanos(
                (ros_now_nanos - ros_time_nanos) as u64,
            ))
            .unwrap()
    }
}