arci_ros2/
node.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc, Mutex, MutexGuard,
5    },
6    time::Duration,
7};
8
9/// ROS2 node. This is a wrapper around `Arc<Mutex<r2r::Node>>`.
10#[derive(Clone)]
11pub struct Node {
12    inner: Arc<NodeInner>,
13}
14
15struct NodeInner {
16    node: Mutex<r2r::Node>,
17    has_spin_thread: AtomicBool,
18}
19
20impl Node {
21    /// Creates a new ROS2 node.
22    pub fn new(name: &str, namespace: &str) -> Result<Self, arci::Error> {
23        let ctx = r2r::Context::create().map_err(anyhow::Error::from)?;
24        Self::with_context(ctx, name, namespace)
25    }
26
27    /// Creates a new ROS2 node with `r2r::Context`.
28    pub fn with_context(
29        ctx: r2r::Context,
30        name: &str,
31        namespace: &str,
32    ) -> Result<Self, arci::Error> {
33        let node = r2r::Node::create(ctx, name, namespace).map_err(anyhow::Error::from)?;
34        Ok(Self {
35            inner: Arc::new(NodeInner {
36                node: Mutex::new(node),
37                has_spin_thread: AtomicBool::new(false),
38            }),
39        })
40    }
41
42    /// Gets underlying `r2r::Node`.
43    pub fn r2r(&self) -> MutexGuard<'_, r2r::Node> {
44        self.inner.node.lock().unwrap()
45    }
46
47    /// Creates a thread to spin the ROS2 node.
48    pub fn run_spin_thread(&self, interval: Duration) {
49        if self.inner.has_spin_thread.swap(true, Ordering::Relaxed) {
50            return;
51        }
52        let node = self.clone();
53        tokio::spawn(async move {
54            while Arc::strong_count(&node.inner) > 1 {
55                node.spin_once(interval).await;
56            }
57        });
58    }
59
60    /// Spins the ROS2 node.
61    pub async fn spin_once(&self, duration: Duration) {
62        let now = std::time::Instant::now();
63        // Sleep with tokio::time::sleep instead of spin_once, since spin_once
64        // blocks the current thread while still holding the lock.
65        self.r2r().spin_once(Duration::ZERO);
66        tokio::time::sleep(duration.saturating_sub(now.elapsed())).await;
67    }
68}