arci_ros2/
node.rs
1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc, Mutex, MutexGuard,
5 },
6 time::Duration,
7};
8
/// Cheaply cloneable handle to an `r2r::Node`.
///
/// The handle only stores an `Arc<NodeInner>`, so every clone refers to the
/// same underlying node and the same spin-task flag.
9#[derive(Clone)]
11pub struct Node {
    // Shared state; cloning the handle clones this `Arc`, not the node itself.
12 inner: Arc<NodeInner>,
13}
14
/// State shared by all clones of a `Node` handle.
15struct NodeInner {
    // The wrapped r2r node; all access goes through this mutex.
16 node: Mutex<r2r::Node>,
    // Set to `true` by `Node::run_spin_thread` the first time it is called, so
    // that later calls do not spawn a second spin task.
17 has_spin_thread: AtomicBool,
18}
19
20impl Node {
21 pub fn new(name: &str, namespace: &str) -> Result<Self, arci::Error> {
23 let ctx = r2r::Context::create().map_err(anyhow::Error::from)?;
24 Self::with_context(ctx, name, namespace)
25 }
26
27 pub fn with_context(
29 ctx: r2r::Context,
30 name: &str,
31 namespace: &str,
32 ) -> Result<Self, arci::Error> {
33 let node = r2r::Node::create(ctx, name, namespace).map_err(anyhow::Error::from)?;
34 Ok(Self {
35 inner: Arc::new(NodeInner {
36 node: Mutex::new(node),
37 has_spin_thread: AtomicBool::new(false),
38 }),
39 })
40 }
41
42 pub fn r2r(&self) -> MutexGuard<'_, r2r::Node> {
44 self.inner.node.lock().unwrap()
45 }
46
47 pub fn run_spin_thread(&self, interval: Duration) {
49 if self.inner.has_spin_thread.swap(true, Ordering::Relaxed) {
50 return;
51 }
52 let node = self.clone();
53 tokio::spawn(async move {
54 while Arc::strong_count(&node.inner) > 1 {
55 node.spin_once(interval).await;
56 }
57 });
58 }
59
60 pub async fn spin_once(&self, duration: Duration) {
62 let now = std::time::Instant::now();
63 self.r2r().spin_once(Duration::ZERO);
66 tokio::time::sleep(duration.saturating_sub(now.elapsed())).await;
67 }
68}