r2r/
nodes.rs

1use futures::{
2    channel::{mpsc, oneshot},
3    future::{self, join_all, FutureExt, TryFutureExt},
4    stream::{Stream, StreamExt},
5};
6use indexmap::IndexMap;
7use std::{
8    collections::HashMap,
9    ffi::{CStr, CString},
10    future::Future,
11    marker::PhantomPinned,
12    mem::MaybeUninit,
13    pin::Pin,
14    sync::{Arc, Mutex},
15    time::Duration,
16};
17
18use r2r_actions::*;
19use r2r_rcl::*;
20
21#[cfg(r2r__rosgraph_msgs__msg__Clock)]
22use crate::time_source::TimeSource;
23use crate::{
24    action_clients::*,
25    action_clients_untyped::*,
26    action_servers::*,
27    clients::*,
28    clocks::*,
29    context::*,
30    error::*,
31    msg_types::{generated_msgs::rcl_interfaces, *},
32    parameters::*,
33    publishers::*,
34    qos::QosProfile,
35    services::*,
36    subscribers::*,
37};
38
39/// A ROS Node.
40///
41/// This struct owns all subscribes, publishers, etc.  To get events
42/// from the ROS network into your ros application, `spin_once` should
43/// be called continously.
44pub struct Node {
45    context: Context,
46    /// ROS parameters.
47    pub params: Arc<Mutex<IndexMap<String, Parameter>>>,
48    pub(crate) node_handle: Box<rcl_node_t>,
49    // the node owns the subscribers
50    pub(crate) subscribers: Vec<Box<dyn Subscriber_>>,
51    // services,
52    services: Vec<Arc<Mutex<dyn Service_>>>,
53    // service clients
54    clients: Vec<Arc<Mutex<dyn Client_>>>,
55    // action clients
56    action_clients: Vec<Arc<Mutex<dyn ActionClient_>>>,
57    // action servers
58    action_servers: Vec<Arc<Mutex<dyn ActionServer_>>>,
59    // timers,
60    timers: Vec<Timer_>,
61    // and the publishers, whom we allow to be shared.. hmm.
62    pubs: Vec<Arc<Publisher_>>,
63    // RosTime clock used by all timers created by create_timer()
64    ros_clock: Arc<Mutex<Clock>>,
65    // time source that provides simulated time
66    #[cfg(r2r__rosgraph_msgs__msg__Clock)]
67    time_source: TimeSource,
68}
69
70unsafe impl Send for Node {}
71
72impl Node {
73    /// Returns the name of the node.
74    pub fn name(&self) -> Result<String> {
75        let cstr = unsafe { rcl_node_get_name(self.node_handle.as_ref()) };
76        if cstr.is_null() {
77            return Err(Error::RCL_RET_NODE_INVALID);
78        }
79        let s = unsafe { CStr::from_ptr(cstr) };
80        Ok(s.to_str().unwrap_or("").to_owned())
81    }
82
83    /// Returns the fully qualified name of the node.
84    pub fn fully_qualified_name(&self) -> Result<String> {
85        let cstr = unsafe { rcl_node_get_fully_qualified_name(self.node_handle.as_ref()) };
86        if cstr.is_null() {
87            return Err(Error::RCL_RET_NODE_INVALID);
88        }
89        let s = unsafe { CStr::from_ptr(cstr) };
90        Ok(s.to_str().unwrap_or("").to_owned())
91    }
92
93    /// Returns the namespace of the node.
94    pub fn namespace(&self) -> Result<String> {
95        let cstr = unsafe { rcl_node_get_namespace(self.node_handle.as_ref()) };
96        if cstr.is_null() {
97            return Err(Error::RCL_RET_NODE_INVALID);
98        }
99        let s = unsafe { CStr::from_ptr(cstr) };
100        Ok(s.to_str().unwrap_or("").to_owned())
101    }
102
103    fn load_params(&mut self) -> Result<()> {
104        let ctx = self.context.context_handle.lock().unwrap();
105        let mut params: Box<*mut rcl_params_t> = Box::new(std::ptr::null_mut());
106
107        let ret =
108            unsafe { rcl_arguments_get_param_overrides(&ctx.global_arguments, params.as_mut()) };
109        if ret != RCL_RET_OK as i32 {
110            log::error!("could not read parameters: {}", ret);
111            return Err(Error::from_rcl_error(ret));
112        }
113
114        if params.is_null() {
115            return Ok(());
116        }
117
118        unsafe {
119            if (*(*params.as_ref())).node_names == std::ptr::null_mut()
120                || (*(*params.as_ref())).params == std::ptr::null_mut()
121            {
122                rcl_yaml_node_struct_fini(*params);
123                return Ok(());
124            }
125        }
126
127        let node_names = unsafe {
128            std::slice::from_raw_parts(
129                (*(*params.as_ref())).node_names,
130                (*(*params.as_ref())).num_nodes,
131            )
132        };
133
134        let node_params = unsafe {
135            std::slice::from_raw_parts(
136                (*(*params.as_ref())).params,
137                (*(*params.as_ref())).num_nodes,
138            )
139        };
140
141        let qualified_name = self.fully_qualified_name()?;
142        let name = self.name()?;
143
144        for (nn, np) in node_names.iter().zip(node_params) {
145            let node_name_cstr = unsafe { CStr::from_ptr(*nn) };
146            let node_name = node_name_cstr.to_str().unwrap_or("");
147
148            // This is copied from rclcpp, but there is a comment there suggesting
149            // that more wildcards will be added in the future. Take note and mimic
150            // their behavior.
151            if !(node_name == "/**"
152                || node_name == "**"
153                || qualified_name == node_name
154                || name == node_name)
155            {
156                continue;
157            }
158
159            // make key value pairs.
160            let mut params = self.params.lock().unwrap();
161            if np.parameter_names != std::ptr::null_mut()
162                && np.parameter_values != std::ptr::null_mut()
163            {
164                let param_names =
165                    unsafe { std::slice::from_raw_parts(np.parameter_names, np.num_params) };
166
167                let param_values =
168                    unsafe { std::slice::from_raw_parts(np.parameter_values, np.num_params) };
169
170                for (s, v) in param_names.iter().zip(param_values) {
171                    let s = unsafe { CStr::from_ptr(*s) };
172                    let key = s.to_str().unwrap_or("");
173                    let val = ParameterValue::from_rcl(v);
174                    params.insert(key.to_owned(), Parameter::new(val));
175                }
176            }
177        }
178
179        unsafe { rcl_yaml_node_struct_fini(*params) };
180        Ok(())
181    }
182
183    /// Creates a ROS node.
184    pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node> {
185        let (res, node_handle) = {
186            let mut ctx_handle = ctx.context_handle.lock().unwrap();
187
188            let c_node_name = CString::new(name).unwrap();
189            let c_node_ns = CString::new(namespace).unwrap();
190            let mut node_handle: Box<rcl_node_t> =
191                unsafe { Box::new(rcl_get_zero_initialized_node()) };
192            let res = unsafe {
193                let node_options = rcl_node_get_default_options();
194                rcl_node_init(
195                    node_handle.as_mut(),
196                    c_node_name.as_ptr(),
197                    c_node_ns.as_ptr(),
198                    ctx_handle.as_mut(),
199                    &node_options as *const _,
200                )
201            };
202            (res, node_handle)
203        };
204
205        if res == RCL_RET_OK as i32 {
206            let ros_clock = Arc::new(Mutex::new(Clock::create(ClockType::RosTime)?));
207            #[cfg(r2r__rosgraph_msgs__msg__Clock)]
208            let time_source = {
209                let time_source = TimeSource::new();
210                time_source.attach_ros_clock(Arc::downgrade(&ros_clock))?;
211                time_source
212            };
213
214            let mut node = Node {
215                params: Arc::new(Mutex::new(IndexMap::new())),
216                context: ctx,
217                node_handle,
218                subscribers: Vec::new(),
219                services: Vec::new(),
220                clients: Vec::new(),
221                action_clients: Vec::new(),
222                action_servers: Vec::new(),
223                timers: Vec::new(),
224                pubs: Vec::new(),
225                ros_clock,
226                #[cfg(r2r__rosgraph_msgs__msg__Clock)]
227                time_source,
228            };
229            node.load_params()?;
230            Ok(node)
231        } else {
232            log::error!("could not create node{}", res);
233            Err(Error::from_rcl_error(res))
234        }
235    }
236
237    /// Creates parameter service handlers for the Node.
238    ///
239    /// This function returns a tuple (`Future`, `Stream`), where the
240    /// future should be spawned on onto the executor of choice. The
241    /// `Stream` produces events whenever parameters change from
242    /// external sources. The event elements of the event stream
243    /// include the name of the parameter which was updated as well as
244    /// its new value.
245    pub fn make_parameter_handler(
246        &mut self,
247    ) -> Result<(impl Future<Output = ()> + Send, impl Stream<Item = (String, ParameterValue)>)>
248    {
249        self.make_parameter_handler_internal(None)
250    }
251
252    /// Creates parameter service handlers for the Node based on the
253    /// [`RosParams`] trait.
254    ///
255    /// Supported parameter names and types are given by the
256    /// `params_struct` parameter (usually referring to a structure).
257    /// Fields of the structure will be updated based on the command
258    /// line parameters (if any) and later whenever a parameter gets
259    /// changed from external sources. Updated fields will be visible
260    /// outside of the node via the GetParameters service.
261    ///
262    /// This function returns a tuple (`Future`, `Stream`), where the
263    /// future should be spawned on onto the executor of choice. The
264    /// `Stream` produces events whenever parameters change from
265    /// external sources. The event elements of the event stream
266    /// include the name of the parameter which was updated as well as
267    /// its new value.
268    pub fn make_derived_parameter_handler(
269        &mut self, params_struct: Arc<Mutex<dyn RosParams + Send>>,
270    ) -> Result<(impl Future<Output = ()> + Send, impl Stream<Item = (String, ParameterValue)>)>
271    {
272        self.make_parameter_handler_internal(Some(params_struct))
273    }
274
275    fn make_parameter_handler_internal(
276        &mut self, params_struct: Option<Arc<Mutex<dyn RosParams + Send>>>,
277    ) -> Result<(impl Future<Output = ()> + Send, impl Stream<Item = (String, ParameterValue)>)>
278    {
279        if let Some(ps) = &params_struct {
280            // register all parameters
281            ps.lock()
282                .unwrap()
283                .register_parameters("", None, &mut self.params.lock().unwrap())?;
284        }
285        let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();
286
287        let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
288        let mut set_atomically_event_tx = set_event_tx.clone();
289
290        let node_name = self.name()?;
291
292        // rcl_interfaces/srv/SetParameters
293        let set_params_request_stream = self
294            .create_service::<rcl_interfaces::srv::SetParameters::Service>(
295                &format!("{}/set_parameters", node_name),
296                QosProfile::default(),
297            )?;
298
299        let params = self.params.clone();
300        let params_struct_clone = params_struct.clone();
301        let set_params_future = set_params_request_stream.for_each(
302            move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
303                let mut result = rcl_interfaces::srv::SetParameters::Response::default();
304                for p in &req.message.parameters {
305                    let val = ParameterValue::from_parameter_value_msg(p.value.clone());
306                    let changed = params
307                        .lock()
308                        .unwrap()
309                        .get(&p.name)
310                        .map(|v| v.value != val)
311                        .unwrap_or(true); // changed=true if new
312                    let r = if let Some(ps) = &params_struct_clone {
313                        // Update parameter structure
314                        let result = ps.lock().unwrap().set_parameter(&p.name, &val);
315                        if result.is_ok() {
316                            // Also update Node::params
317                            params
318                                .lock()
319                                .unwrap()
320                                .entry(p.name.clone())
321                                .and_modify(|p| p.value = val.clone());
322                        }
323                        rcl_interfaces::msg::SetParametersResult {
324                            successful: result.is_ok(),
325                            reason: result.err().map_or("".into(), |e| e.to_string()),
326                        }
327                    } else {
328                        // No parameter structure - update only Node::params
329                        params
330                            .lock()
331                            .unwrap()
332                            .entry(p.name.clone())
333                            .and_modify(|p| p.value = val.clone())
334                            .or_insert(Parameter::new(val.clone()));
335                        rcl_interfaces::msg::SetParametersResult {
336                            successful: true,
337                            reason: "".into(),
338                        }
339                    };
340                    // if the value changed, send out new value on parameter event stream
341                    if changed && r.successful {
342                        if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
343                            log::debug!("Warning: could not send parameter event ({}).", e);
344                        }
345                    }
346                    result.results.push(r);
347                }
348                req.respond(result)
349                    .expect("could not send reply to set parameter request");
350                future::ready(())
351            },
352        );
353        handlers.push(Box::pin(set_params_future));
354
355        // rcl_interfaces/srv/GetParameters
356        let get_params_request_stream = self
357            .create_service::<rcl_interfaces::srv::GetParameters::Service>(
358                &format!("{}/get_parameters", node_name),
359                QosProfile::default(),
360            )?;
361
362        let params = self.params.clone();
363        let params_struct_clone = params_struct.clone();
364        let get_params_future = get_params_request_stream.for_each(
365            move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
366                let params = params.lock().unwrap();
367                let values = req
368                    .message
369                    .names
370                    .iter()
371                    .map(|n| {
372                        // First try to get the parameter from the param structure
373                        if let Some(ps) = &params_struct_clone {
374                            if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
375                                return value;
376                            }
377                        }
378                        // Otherwise get it from node HashMap
379                        match params.get(n) {
380                            Some(v) => v.value.clone(),
381                            None => ParameterValue::NotSet,
382                        }
383                    })
384                    .map(|v| v.into_parameter_value_msg())
385                    .collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
386
387                let result = rcl_interfaces::srv::GetParameters::Response { values };
388                req.respond(result)
389                    .expect("could not send reply to set parameter request");
390                future::ready(())
391            },
392        );
393
394        handlers.push(Box::pin(get_params_future));
395
396        // rcl_interfaces/srv/ListParameters
397        use rcl_interfaces::srv::ListParameters;
398        let list_params_request_stream = self.create_service::<ListParameters::Service>(
399            &format!("{}/list_parameters", node_name),
400            QosProfile::default(),
401        )?;
402
403        let params = self.params.clone();
404        let list_params_future = list_params_request_stream.for_each(
405            move |req: ServiceRequest<ListParameters::Service>| {
406                Self::handle_list_parameters(req, &params)
407            },
408        );
409
410        handlers.push(Box::pin(list_params_future));
411
412        // rcl_interfaces/srv/DescribeParameters
413        use rcl_interfaces::srv::DescribeParameters;
414        let desc_params_request_stream = self.create_service::<DescribeParameters::Service>(
415            &format!("{node_name}/describe_parameters"),
416            QosProfile::default(),
417        )?;
418
419        let params = self.params.clone();
420        let desc_params_future = desc_params_request_stream.for_each(
421            move |req: ServiceRequest<DescribeParameters::Service>| {
422                Self::handle_desc_parameters(req, &params)
423            },
424        );
425
426        handlers.push(Box::pin(desc_params_future));
427
428        // rcl_interfaces/srv/GetParameterTypes
429        use rcl_interfaces::srv::GetParameterTypes;
430        let get_param_types_request_stream = self.create_service::<GetParameterTypes::Service>(
431            &format!("{node_name}/get_parameter_types"),
432            QosProfile::default(),
433        )?;
434
435        let params = self.params.clone();
436        let get_param_types_future = get_param_types_request_stream.for_each(
437            move |req: ServiceRequest<GetParameterTypes::Service>| {
438                let params = params.lock().unwrap();
439                let types = req
440                    .message
441                    .names
442                    .iter()
443                    .map(|name| match params.get(name) {
444                        Some(param) => param.value.into_parameter_type(),
445                        None => rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET as u8,
446                    })
447                    .collect();
448                req.respond(GetParameterTypes::Response { types })
449                    .expect("could not send reply to get parameter types request");
450                future::ready(())
451            },
452        );
453
454        handlers.push(Box::pin(get_param_types_future));
455
456        // rcl_interfaces/srv/SetParametersAtomically
457        let set_params_atomically_request_stream =
458            self.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
459                &format!("{}/set_parameters_atomically", node_name),
460                QosProfile::default(),
461            )?;
462
463        let params = self.params.clone();
464        let params_struct_clone = params_struct.clone();
465        let set_params_atomically_future = set_params_atomically_request_stream.for_each(
466            move |req: ServiceRequest<rcl_interfaces::srv::SetParametersAtomically::Service>| {
467                let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default();
468                result.result.successful = true;
469                if let Some(ps) = &params_struct_clone {
470                    for p in &req.message.parameters {
471                        let val = ParameterValue::from_parameter_value_msg(p.value.clone());
472                        if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) {
473                            result.result.successful = false;
474                            result.result.reason = format!("Can't set parameter {}: {}", p.name, e);
475                            break;
476                        }
477                    }
478                }
479                if result.result.successful {
480                    // Since we checked them above now we assume these will be set ok...
481                    for p in &req.message.parameters {
482                        let val = ParameterValue::from_parameter_value_msg(p.value.clone());
483                        let changed = params
484                            .lock()
485                            .unwrap()
486                            .get(&p.name)
487                            .map(|v| v.value != val)
488                            .unwrap_or(true); // changed=true if new
489                        let r = if let Some(ps) = &params_struct_clone {
490                            // Update parameter structure
491                            let result = ps.lock().unwrap().set_parameter(&p.name, &val);
492                            if result.is_ok() {
493                                // Also update Node::params
494                                params
495                                    .lock()
496                                    .unwrap()
497                                    .entry(p.name.clone())
498                                    .and_modify(|p| p.value = val.clone());
499                            }
500                            rcl_interfaces::msg::SetParametersResult {
501                                successful: result.is_ok(),
502                                reason: result.err().map_or("".into(), |e| e.to_string()),
503                            }
504                        } else {
505                            // No parameter structure - update only Node::params
506                            params
507                                .lock()
508                                .unwrap()
509                                .entry(p.name.clone())
510                                .and_modify(|p| p.value = val.clone())
511                                .or_insert(Parameter::new(val.clone()));
512                            rcl_interfaces::msg::SetParametersResult {
513                                successful: true,
514                                reason: "".into(),
515                            }
516                        };
517                        // if the value changed, send out new value on parameter event stream
518                        if changed && r.successful {
519                            if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) {
520                                log::debug!("Warning: could not send parameter event ({}).", e);
521                            }
522                        }
523                    }
524                }
525                req.respond(result)
526                    .expect("could not send reply to set parameter request");
527                future::ready(())
528            },
529        );
530        handlers.push(Box::pin(set_params_atomically_future));
531
532        #[cfg(r2r__rosgraph_msgs__msg__Clock)]
533        {
534            // create TimeSource based on value of use_sim_time parameter
535            let use_sim_time = {
536                let params = self.params.lock().unwrap();
537                params
538                    .get("use_sim_time")
539                    .and_then(|param| {
540                        if let ParameterValue::Bool(val) = param.value {
541                            Some(val)
542                        } else {
543                            log::error!("Parameter use_sim_time is not bool. Assuming false");
544                            None
545                        }
546                    })
547                    .unwrap_or(false)
548            };
549            if use_sim_time {
550                let ts = self.time_source.clone();
551                ts.enable_sim_time(self)?;
552            }
553        }
554
555        // we don't care about the result, the futures will not complete anyway.
556        Ok((join_all(handlers).map(|_| ()), event_rx))
557    }
558
559    fn handle_list_parameters(
560        req: ServiceRequest<rcl_interfaces::srv::ListParameters::Service>,
561        params: &Arc<Mutex<IndexMap<String, Parameter>>>,
562    ) -> future::Ready<()> {
563        use rcl_interfaces::srv::ListParameters;
564
565        let depth = req.message.depth;
566        let prefixes = &req.message.prefixes;
567        let separator = '.';
568        let params = params.lock().unwrap();
569        let mut result = rcl_interfaces::msg::ListParametersResult {
570            names: vec![],
571            prefixes: vec![],
572        };
573        for (name, _) in params.iter().filter(|(name, _)| {
574            let get_all = prefixes.is_empty()
575                && ((depth == ListParameters::Request::DEPTH_RECURSIVE as u64)
576                    || name.matches(separator).count() < depth as usize);
577            let prefix_matches = prefixes.iter().any(|prefix| {
578                if *name == prefix {
579                    return true;
580                } else if name.starts_with(&format!("{prefix}{separator}")) {
581                    let substr = &name[prefix.len()..];
582                    return (depth == ListParameters::Request::DEPTH_RECURSIVE as u64)
583                        || substr.matches(separator).count() < depth as usize;
584                }
585                false
586            });
587            get_all || prefix_matches
588        }) {
589            result.names.push(name.clone());
590            if let Some(last_separator) = name.rfind(separator) {
591                let prefix = &name[0..last_separator];
592                if result.prefixes.iter().all(|p| p != prefix) {
593                    result.prefixes.push(prefix.to_string());
594                }
595            }
596        }
597        req.respond(ListParameters::Response { result })
598            .expect("could not send reply to list parameter request");
599        future::ready(())
600    }
601
602    fn handle_desc_parameters(
603        req: ServiceRequest<rcl_interfaces::srv::DescribeParameters::Service>,
604        params: &Arc<Mutex<IndexMap<String, Parameter>>>,
605    ) -> future::Ready<()> {
606        use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters};
607        let mut descriptors = Vec::<ParameterDescriptor>::new();
608        let params = params.lock().unwrap();
609        for name in &req.message.names {
610            let default = Parameter::empty();
611            let param = params.get(name).unwrap_or(&default);
612            descriptors.push(ParameterDescriptor {
613                name: name.clone(),
614                type_: param.value.into_parameter_type(),
615                description: param.description.to_string(),
616                ..Default::default()
617            });
618        }
619        req.respond(DescribeParameters::Response { descriptors })
620            .expect("could not send reply to describe parameters request");
621        future::ready(())
622    }
623
624    /// Fetch a single ROS parameter.
625    pub fn get_parameter<T>(&self, name: &str) -> Result<T>
626    where
627        ParameterValue: TryInto<T, Error = WrongParameterType>,
628    {
629        let params = self.params.lock().unwrap();
630        let value = params
631            .get(name)
632            .map(|parameter| parameter.value.clone())
633            .unwrap_or(ParameterValue::NotSet);
634
635        let value: T =
636            value
637                .try_into()
638                .map_err(|error: WrongParameterType| Error::ParameterWrongType {
639                    name: name.to_string(),
640                    expected_type: error.expected_type_name,
641                    actual_type: error.actual_type_name,
642                })?;
643
644        Ok(value)
645    }
646
647    /// Subscribe to a ROS topic.
648    ///
649    /// This function returns a `Stream` of ros messages.
650    pub fn subscribe<T: 'static>(
651        &mut self, topic: &str, qos_profile: QosProfile,
652    ) -> Result<impl Stream<Item = T> + Unpin>
653    where
654        T: WrappedTypesupport,
655    {
656        let subscription_handle =
657            create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
658        let (sender, receiver) = mpsc::channel::<T>(10);
659
660        let ws = TypedSubscriber {
661            rcl_handle: subscription_handle,
662            sender,
663        };
664        self.subscribers.push(Box::new(ws));
665        Ok(receiver)
666    }
667
668    /// Subscribe to a ROS topic.
669    ///
670    /// This function returns a `Stream` of ros messages without the rust convenience types.
671    pub fn subscribe_native<T: 'static>(
672        &mut self, topic: &str, qos_profile: QosProfile,
673    ) -> Result<impl Stream<Item = WrappedNativeMsg<T>> + Unpin>
674    where
675        T: WrappedTypesupport,
676    {
677        let subscription_handle =
678            create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
679        let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
680
681        let ws = NativeSubscriber {
682            rcl_handle: subscription_handle,
683            sender,
684        };
685        self.subscribers.push(Box::new(ws));
686        Ok(receiver)
687    }
688
689    /// Subscribe to a ROS topic.
690    ///
691    /// This function returns a `Stream` of ros messages as `serde_json::Value`:s.
692    /// Useful when you cannot know the type of the message at compile time.
693    pub fn subscribe_untyped(
694        &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
695    ) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
696        let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
697        let subscription_handle =
698            create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
699        let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
700
701        let ws = UntypedSubscriber {
702            rcl_handle: subscription_handle,
703            topic_type: topic_type.to_string(),
704            sender,
705        };
706        self.subscribers.push(Box::new(ws));
707        Ok(receiver)
708    }
709
710    /// Subscribe to a ROS topic.
711    ///
712    /// This function returns a `Stream` of ros messages as non-deserialized `Vec<u8>`:s.
713    /// Useful if you just want to pass the data along to another part of the system.
714    pub fn subscribe_raw(
715        &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
716    ) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
717        // TODO is it possible to handle the raw message without type support?
718        //
719        // Passing null ts to rcl_subscription_init throws an error ..
720        //
721        // It does not seem possible to not have a type support, which is a shame
722        // because it means we always have to build the message types even if we
723        // are just after the raw bytes.
724        let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
725
726        // Keep a buffer to reduce number of allocations. The rmw will
727        // resize it if the message size exceeds the buffer size.
728        let mut msg_buf: rcl_serialized_message_t =
729            unsafe { rcutils_get_zero_initialized_uint8_array() };
730        let ret = unsafe {
731            rcutils_uint8_array_init(
732                &mut msg_buf as *mut rcl_serialized_message_t,
733                0,
734                &rcutils_get_default_allocator(),
735            )
736        };
737
738        if ret != RCL_RET_OK as i32 {
739            return Err(Error::from_rcl_error(ret));
740        }
741
742        let subscription_handle =
743            create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
744        let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
745
746        let ws = RawSubscriber {
747            rcl_handle: subscription_handle,
748            msg_buf,
749            sender,
750        };
751        self.subscribers.push(Box::new(ws));
752        Ok(receiver)
753    }
754
755    /// Create a ROS service.
756    ///
757    /// This function returns a `Stream` of `ServiceRequest`:s. Call
758    /// `respond` on the Service Request to send the reply.
759    pub fn create_service<T: 'static>(
760        &mut self, service_name: &str, qos_profile: QosProfile,
761    ) -> Result<impl Stream<Item = ServiceRequest<T>> + Unpin>
762    where
763        T: WrappedServiceTypeSupport,
764    {
765        let service_handle = create_service_helper(
766            self.node_handle.as_mut(),
767            service_name,
768            T::get_ts(),
769            qos_profile,
770        )?;
771        let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
772
773        let ws = TypedService::<T> {
774            rcl_handle: service_handle,
775            sender,
776        };
777
778        self.services.push(Arc::new(Mutex::new(ws)));
779        Ok(receiver)
780    }
781
782    /// Create a ROS service client.
783    ///
784    /// A service client is used to make requests to a ROS service server.
785    pub fn create_client<T: 'static>(
786        &mut self, service_name: &str, qos_profile: QosProfile,
787    ) -> Result<Client<T>>
788    where
789        T: WrappedServiceTypeSupport,
790    {
791        let client_handle = create_client_helper(
792            self.node_handle.as_mut(),
793            service_name,
794            T::get_ts(),
795            qos_profile,
796        )?;
797        let ws = TypedClient::<T> {
798            rcl_handle: client_handle,
799            response_channels: Vec::new(),
800            poll_available_channels: Vec::new(),
801        };
802
803        let client_arc = Arc::new(Mutex::new(ws));
804        let c = make_client(Arc::downgrade(&client_arc));
805        self.clients.push(client_arc);
806        Ok(c)
807    }
808
809    /// Create a ROS service client.
810    ///
811    /// A service client is used to make requests to a ROS service
812    /// server. This function returns an `UntypedClient`, which deals
813    /// with `serde_json::Value`s instead of concrete types.  Useful
814    /// when you cannot know the type of the message at compile time.
815    pub fn create_client_untyped(
816        &mut self, service_name: &str, service_type: &str, qos_profile: QosProfile,
817    ) -> Result<ClientUntyped> {
818        let service_type = UntypedServiceSupport::new_from(service_type)?;
819        let client_handle = create_client_helper(
820            self.node_handle.as_mut(),
821            service_name,
822            service_type.ts,
823            qos_profile,
824        )?;
825        let client = UntypedClient_ {
826            service_type,
827            rcl_handle: client_handle,
828            response_channels: Vec::new(),
829            poll_available_channels: Vec::new(),
830        };
831
832        let client_arc = Arc::new(Mutex::new(client));
833        let c = make_untyped_client(Arc::downgrade(&client_arc));
834        self.clients.push(client_arc);
835        Ok(c)
836    }
837
838    /// Register a client for wakeup when the service or action server is available to the node.
839    ///
840    /// Returns a `Future` that completes when the service/action server is available.
841    ///
842    /// This function will register the client to be polled in
843    /// `spin_once` until available, so spin_once must be called
844    /// repeatedly in order to get the wakeup.
845    pub fn is_available(
846        client: &dyn IsAvailablePollable,
847    ) -> Result<impl Future<Output = Result<()>>> {
848        let (sender, receiver) = oneshot::channel();
849        client.register_poll_available(sender)?;
850        Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
851    }
852
853    /// Create a ROS action client.
854    ///
855    /// An action client is used to make requests to a ROS action server.
856    pub fn create_action_client<T: 'static>(&mut self, action_name: &str) -> Result<ActionClient<T>>
857    where
858        T: WrappedActionTypeSupport,
859    {
860        let client_handle =
861            create_action_client_helper(self.node_handle.as_mut(), action_name, T::get_ts())?;
862        let client = WrappedActionClient::<T> {
863            rcl_handle: client_handle,
864            goal_response_channels: Vec::new(),
865            cancel_response_channels: Vec::new(),
866            feedback_senders: Vec::new(),
867            result_senders: Vec::new(),
868            result_requests: Vec::new(),
869            goal_status: HashMap::new(),
870            poll_available_channels: Vec::new(),
871        };
872
873        let client_arc = Arc::new(Mutex::new(client));
874        self.action_clients.push(client_arc.clone());
875        let c = make_action_client(Arc::downgrade(&client_arc));
876        Ok(c)
877    }
878
879    /// Create a ROS action client.
880    ///
881    /// A action client is used to make requests to a ROS service
882    /// server. This function returns a `ActionClientUntyped`, which deals
883    /// with `serde_json::Value`s instead of concrete types.  Useful
884    /// when you cannot know the type of the message at compile time.
885    pub fn create_action_client_untyped(
886        &mut self, action_name: &str, action_type: &str,
887    ) -> Result<ActionClientUntyped> {
888        let action_type_support = UntypedActionSupport::new_from(action_type)?;
889        let client_handle = create_action_client_helper(
890            self.node_handle.as_mut(),
891            action_name,
892            action_type_support.ts,
893        )?;
894        let client = WrappedActionClientUntyped {
895            action_type_support,
896            rcl_handle: client_handle,
897            goal_response_channels: Vec::new(),
898            cancel_response_channels: Vec::new(),
899            feedback_senders: Vec::new(),
900            result_senders: Vec::new(),
901            result_requests: Vec::new(),
902            goal_status: HashMap::new(),
903            poll_available_channels: Vec::new(),
904        };
905
906        let client_arc = Arc::new(Mutex::new(client));
907        self.action_clients.push(client_arc.clone());
908        let c = make_action_client_untyped(Arc::downgrade(&client_arc));
909        Ok(c)
910    }
911
912    /// Create a ROS action server.
913    ///
914    /// This function returns a stream of `GoalRequest`s, which needs
915    /// to be either accepted or rejected.
916    pub fn create_action_server<T: 'static>(
917        &mut self, action_name: &str,
918    ) -> Result<impl Stream<Item = ActionServerGoalRequest<T>> + Unpin>
919    where
920        T: WrappedActionTypeSupport,
921    {
922        // for now automatically create a ros clock...
923        let mut clock_handle = MaybeUninit::<rcl_clock_t>::uninit();
924        let ret = unsafe {
925            rcl_ros_clock_init(clock_handle.as_mut_ptr(), &mut rcutils_get_default_allocator())
926        };
927        if ret != RCL_RET_OK as i32 {
928            log::error!("could not create steady clock: {}", ret);
929            return Err(Error::from_rcl_error(ret));
930        }
931        let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() });
932
933        let (goal_request_sender, goal_request_receiver) =
934            mpsc::channel::<ActionServerGoalRequest<T>>(10);
935
936        let server_handle = create_action_server_helper(
937            self.node_handle.as_mut(),
938            action_name,
939            clock_handle.as_mut(),
940            T::get_ts(),
941        )?;
942        let server = WrappedActionServer::<T> {
943            rcl_handle: server_handle,
944            clock_handle,
945            goal_request_sender,
946            active_cancel_requests: Vec::new(),
947            cancel_senders: HashMap::new(),
948            goals: HashMap::new(),
949            result_msgs: HashMap::new(),
950            result_requests: HashMap::new(),
951        };
952
953        let server_arc = Arc::new(Mutex::new(server));
954        self.action_servers.push(server_arc);
955        Ok(goal_request_receiver)
956    }
957
958    /// Create a ROS publisher.
959    pub fn create_publisher<T>(
960        &mut self, topic: &str, qos_profile: QosProfile,
961    ) -> Result<Publisher<T>>
962    where
963        T: WrappedTypesupport,
964    {
965        let publisher_handle =
966            create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
967        let arc = Arc::new(publisher_handle);
968        let p = make_publisher(Arc::downgrade(&arc));
969        self.pubs.push(arc);
970        Ok(p)
971    }
972
973    /// Create a ROS publisher with a type given at runtime, where the data may either be
974    /// supplied as JSON (using the `publish` method) or a pre-serialized ROS message
975    /// (i.e. &[u8], using the `publish_raw` method).
976    pub fn create_publisher_untyped(
977        &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
978    ) -> Result<PublisherUntyped> {
979        let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?;
980        let publisher_handle =
981            create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?;
982        let arc = Arc::new(publisher_handle);
983        let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned());
984        self.pubs.push(arc);
985        Ok(p)
986    }
987
988    /// Destroy a ROS publisher.
989    pub fn destroy_publisher<T: WrappedTypesupport>(&mut self, p: Publisher<T>) {
990        if let Some(handle) = p.handle.upgrade() {
991            // Remove handle from list of publishers.
992            self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
993                .map(|i| self.pubs.swap_remove(i));
994
995            let handle = wait_until_unwrapped(handle);
996            handle.destroy(self.node_handle.as_mut());
997        }
998    }
999
1000    /// Destroy a ROS publisher.
1001    pub fn destroy_publisher_untyped(&mut self, p: PublisherUntyped) {
1002        if let Some(handle) = p.handle.upgrade() {
1003            // Remove handle from list of publishers.
1004            self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
1005                .map(|i| self.pubs.swap_remove(i));
1006
1007            let handle = wait_until_unwrapped(handle);
1008            handle.destroy(self.node_handle.as_mut());
1009        }
1010    }
1011
1012    /// Spin the ROS node.
1013    ///
1014    /// This handles wakeups of all subscribes, services, etc on the
1015    /// ros side. In turn, this will complete future and wake up
1016    /// streams on the rust side. This needs to be called repeatedly
1017    /// (see the examples).
1018    ///
1019    /// `timeout` is a duration specifying how long the spin should
1020    /// block for if there are no pending events.
1021    pub fn spin_once(&mut self, timeout: Duration) {
1022        // first handle any completed action cancellation responses
1023        for a in &mut self.action_servers {
1024            a.lock().unwrap().send_completed_cancel_requests();
1025        }
1026
1027        // as well as polling any services/action servers for availability
1028        for c in &mut self.clients {
1029            c.lock().unwrap().poll_available(self.node_handle.as_mut());
1030        }
1031
1032        for c in &mut self.action_clients {
1033            c.lock().unwrap().poll_available(self.node_handle.as_mut());
1034        }
1035
1036        for p in &self.pubs {
1037            p.poll_has_inter_process_subscribers();
1038        }
1039
1040        let timeout = timeout.as_nanos() as i64;
1041        let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
1042
1043        // #[doc = "* This function is thread-safe for unique wait sets with unique contents."]
1044        // #[doc = "* This function cannot operate on the same wait set in multiple threads, and"]
1045        // #[doc = "* the wait sets may not share content."]
1046        // #[doc = "* For example, calling rcl_wait() in two threads on two different wait sets"]
1047        // #[doc = "* that both contain a single, shared guard condition is undefined behavior."]
1048
1049        // count action client wait set needs
1050        let mut total_action_subs = 0;
1051        let mut total_action_clients = 0;
1052        for c in &self.action_clients {
1053            let mut num_subs = 0;
1054            let mut num_gc = 0;
1055            let mut num_timers = 0;
1056            let mut num_clients = 0;
1057            let mut num_services = 0;
1058
1059            action_client_get_num_waits(
1060                c.lock().unwrap().handle(),
1061                &mut num_subs,
1062                &mut num_gc,
1063                &mut num_timers,
1064                &mut num_clients,
1065                &mut num_services,
1066            )
1067            .expect("could not get action client wait sets");
1068            // sanity check
1069            assert_eq!(num_subs, 2);
1070            assert_eq!(num_clients, 3);
1071            assert_eq!(num_gc, 0);
1072            assert_eq!(num_timers, 0);
1073            assert_eq!(num_services, 0);
1074
1075            total_action_subs += num_subs;
1076            total_action_clients += num_clients;
1077        }
1078
1079        // count action server wait set needs
1080        let mut total_action_timers = 0;
1081        let mut total_action_services = 0;
1082        for s in &self.action_servers {
1083            let mut num_subs = 0;
1084            let mut num_gc = 0;
1085            let mut num_timers = 0;
1086            let mut num_clients = 0;
1087            let mut num_services = 0;
1088
1089            action_server_get_num_waits(
1090                s.lock().unwrap().handle(),
1091                &mut num_subs,
1092                &mut num_gc,
1093                &mut num_timers,
1094                &mut num_clients,
1095                &mut num_services,
1096            )
1097            .expect("could not get action client wait sets");
1098            // sanity check
1099            assert_eq!(num_subs, 0);
1100            assert_eq!(num_clients, 0);
1101            assert_eq!(num_gc, 0);
1102            assert_eq!(num_timers, 1);
1103            assert_eq!(num_services, 3);
1104
1105            total_action_timers += num_timers;
1106            total_action_services += num_services;
1107        }
1108
1109        {
1110            let mut ctx = self.context.context_handle.lock().unwrap();
1111
1112            unsafe {
1113                rcl_wait_set_init(
1114                    &mut ws,
1115                    self.subscribers.len() + total_action_subs,
1116                    0,
1117                    self.timers.len() + total_action_timers,
1118                    self.clients.len() + total_action_clients,
1119                    self.services.len() + total_action_services,
1120                    0,
1121                    ctx.as_mut(),
1122                    rcutils_get_default_allocator(),
1123                );
1124            }
1125        }
1126        unsafe {
1127            rcl_wait_set_clear(&mut ws);
1128        }
1129
1130        for s in &self.subscribers {
1131            unsafe {
1132                rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
1133            }
1134        }
1135
1136        for s in &self.timers {
1137            unsafe {
1138                rcl_wait_set_add_timer(&mut ws, s.get_handle(), std::ptr::null_mut());
1139            }
1140        }
1141
1142        for s in &self.clients {
1143            unsafe {
1144                rcl_wait_set_add_client(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut());
1145            }
1146        }
1147
1148        for s in &self.services {
1149            unsafe {
1150                rcl_wait_set_add_service(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut());
1151            }
1152        }
1153
1154        // code (further) below assumes that actions are added last... perhaps a
1155        // bad assumption.  e.g. we add subscriptions and timers of
1156        // the node before ones created automatically by actions. we
1157        // then assume that we can count on the waitables created by
1158        // the actions are added at the end of the wait set arrays
1159        for ac in &self.action_clients {
1160            unsafe {
1161                rcl_action_wait_set_add_action_client(
1162                    &mut ws,
1163                    ac.lock().unwrap().handle(),
1164                    std::ptr::null_mut(),
1165                    std::ptr::null_mut(),
1166                );
1167            }
1168        }
1169        for acs in &self.action_servers {
1170            unsafe {
1171                rcl_action_wait_set_add_action_server(
1172                    &mut ws,
1173                    acs.lock().unwrap().handle(),
1174                    std::ptr::null_mut(),
1175                );
1176            }
1177        }
1178
1179        let ret = unsafe { rcl_wait(&mut ws, timeout) };
1180
1181        if ret == RCL_RET_TIMEOUT as i32 {
1182            unsafe {
1183                rcl_wait_set_fini(&mut ws);
1184            }
1185            return;
1186        }
1187
1188        let mut subs_to_remove = vec![];
1189        if ws.subscriptions != std::ptr::null_mut() {
1190            let ws_subs =
1191                unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subscribers.len()) };
1192            for (s, ws_s) in self.subscribers.iter_mut().zip(ws_subs) {
1193                if ws_s != &std::ptr::null() {
1194                    let dropped = s.handle_incoming();
1195                    if dropped {
1196                        s.destroy(&mut self.node_handle);
1197                        subs_to_remove.push(*s.handle());
1198                    }
1199                }
1200            }
1201        }
1202        self.subscribers
1203            .retain(|s| !subs_to_remove.contains(s.handle()));
1204
1205        let mut timers_to_remove = vec![];
1206        if ws.timers != std::ptr::null_mut() {
1207            let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) };
1208            for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) {
1209                if ws_s != &std::ptr::null() {
1210                    // TODO: move this to impl Timer
1211                    let dropped = s.handle_incoming();
1212                    if dropped {
1213                        timers_to_remove.push((*s.timer_handle).to_owned());
1214                    }
1215                }
1216            }
1217        }
1218        // drop timers scheduled for deletion
1219        self.timers
1220            .retain(|t| !timers_to_remove.contains(&*t.timer_handle));
1221
1222        if ws.clients != std::ptr::null_mut() {
1223            let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) };
1224            for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) {
1225                if ws_s != &std::ptr::null() {
1226                    let mut s = s.lock().unwrap();
1227                    s.handle_response();
1228                }
1229            }
1230        }
1231
1232        let mut services_to_remove = vec![];
1233        if ws.services != std::ptr::null_mut() {
1234            let ws_services =
1235                unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) };
1236            for (s, ws_s) in self.services.iter_mut().zip(ws_services) {
1237                if ws_s != &std::ptr::null() {
1238                    let mut service = s.lock().unwrap();
1239                    let dropped = service.handle_request(s.clone());
1240                    if dropped {
1241                        service.destroy(&mut self.node_handle);
1242                        services_to_remove.push(*service.handle());
1243                    }
1244                }
1245            }
1246        }
1247        self.services
1248            .retain(|s| !services_to_remove.contains(s.lock().unwrap().handle()));
1249
1250        for ac in &self.action_clients {
1251            let mut is_feedback_ready = false;
1252            let mut is_status_ready = false;
1253            let mut is_goal_response_ready = false;
1254            let mut is_cancel_response_ready = false;
1255            let mut is_result_response_ready = false;
1256
1257            let ret = unsafe {
1258                rcl_action_client_wait_set_get_entities_ready(
1259                    &ws,
1260                    ac.lock().unwrap().handle(),
1261                    &mut is_feedback_ready,
1262                    &mut is_status_ready,
1263                    &mut is_goal_response_ready,
1264                    &mut is_cancel_response_ready,
1265                    &mut is_result_response_ready,
1266                )
1267            };
1268
1269            if ret != RCL_RET_OK as i32 {
1270                continue;
1271            }
1272
1273            if is_feedback_ready {
1274                let mut acs = ac.lock().unwrap();
1275                acs.handle_feedback_msg();
1276            }
1277
1278            if is_status_ready {
1279                let mut acs = ac.lock().unwrap();
1280                acs.handle_status_msg();
1281            }
1282
1283            if is_goal_response_ready {
1284                let mut acs = ac.lock().unwrap();
1285                acs.handle_goal_response();
1286            }
1287
1288            if is_cancel_response_ready {
1289                let mut acs = ac.lock().unwrap();
1290                acs.handle_cancel_response();
1291            }
1292
1293            if is_result_response_ready {
1294                let mut acs = ac.lock().unwrap();
1295                acs.handle_result_response();
1296            }
1297        }
1298
1299        for s in &self.action_servers {
1300            let mut is_goal_request_ready = false;
1301            let mut is_cancel_request_ready = false;
1302            let mut is_result_request_ready = false;
1303            let mut is_goal_expired = false;
1304
1305            let ret = unsafe {
1306                rcl_action_server_wait_set_get_entities_ready(
1307                    &ws,
1308                    s.lock().unwrap().handle(),
1309                    &mut is_goal_request_ready,
1310                    &mut is_cancel_request_ready,
1311                    &mut is_result_request_ready,
1312                    &mut is_goal_expired,
1313                )
1314            };
1315
1316            if ret != RCL_RET_OK as i32 {
1317                continue;
1318            }
1319
1320            if is_goal_request_ready {
1321                let mut acs = s.lock().unwrap();
1322                acs.handle_goal_request(s.clone());
1323            }
1324
1325            if is_cancel_request_ready {
1326                let mut acs = s.lock().unwrap();
1327                acs.handle_cancel_request();
1328            }
1329
1330            if is_result_request_ready {
1331                let mut acs = s.lock().unwrap();
1332                acs.handle_result_request();
1333            }
1334
1335            if is_goal_expired {
1336                let mut acs = s.lock().unwrap();
1337                acs.handle_goal_expired();
1338            }
1339        }
1340
1341        unsafe {
1342            rcl_wait_set_fini(&mut ws);
1343        }
1344    }
1345
1346    /// Returns a map of topic names and type names of the publishers
1347    /// visible to this node.
1348    pub fn get_topic_names_and_types(&self) -> Result<HashMap<String, Vec<String>>> {
1349        let mut tnat = unsafe { rmw_get_zero_initialized_names_and_types() };
1350        let ret = unsafe {
1351            rcl_get_topic_names_and_types(
1352                self.node_handle.as_ref(),
1353                &mut rcutils_get_default_allocator(),
1354                false,
1355                &mut tnat,
1356            )
1357        };
1358        if ret != RCL_RET_OK as i32 {
1359            log::error!("could not get topic names and types {}", ret);
1360            return Err(Error::from_rcl_error(ret));
1361        }
1362
1363        let mut res = HashMap::new();
1364        if tnat.names.data != std::ptr::null_mut() && tnat.types != std::ptr::null_mut() {
1365            let names = unsafe { std::slice::from_raw_parts(tnat.names.data, tnat.names.size) };
1366            let types = unsafe { std::slice::from_raw_parts(tnat.types, tnat.names.size) };
1367
1368            for (n, t) in names.iter().zip(types) {
1369                assert!(!(*n).is_null());
1370                let topic_name = unsafe { CStr::from_ptr(*n).to_str().unwrap().to_owned() };
1371                assert!(!t.data.is_null());
1372                let topic_types = unsafe { std::slice::from_raw_parts(t, t.size) };
1373                let topic_types: Vec<String> = unsafe {
1374                    topic_types
1375                        .iter()
1376                        .map(|t| {
1377                            assert!(*(t.data) != std::ptr::null_mut());
1378                            CStr::from_ptr(*(t.data)).to_str().unwrap().to_owned()
1379                        })
1380                        .collect()
1381                };
1382                res.insert(topic_name, topic_types);
1383            }
1384        }
1385        unsafe {
1386            rmw_names_and_types_fini(&mut tnat);
1387        } // TODO: check return value
1388        Ok(res)
1389    }
1390
1391    pub fn get_publishers_info_by_topic(
1392        &self, topic_name: &str, no_mangle: bool,
1393    ) -> Result<Vec<TopicEndpointInfo>> {
1394        let node = self.node_handle.as_ref();
1395
1396        let topic_c_string =
1397            CString::new(topic_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
1398
1399        let mut allocator = unsafe { rcutils_get_default_allocator() };
1400
1401        let mut info_array: rcl_topic_endpoint_info_array_t =
1402            unsafe { rmw_get_zero_initialized_topic_endpoint_info_array() };
1403
1404        let result = unsafe {
1405            rcl_get_publishers_info_by_topic(
1406                node,
1407                &mut allocator,
1408                topic_c_string.as_ptr(),
1409                no_mangle,
1410                &mut info_array,
1411            )
1412        };
1413
1414        if result != RCL_RET_OK as i32 {
1415            unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
1416            return Err(Error::from_rcl_error(result));
1417        }
1418
1419        // Convert info_array to Vec<TopicEndpointInfo>
1420        let topic_info_list = convert_info_array_to_vec(&info_array);
1421
1422        let result = unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
1423
1424        if result != RCL_RET_OK as i32 {
1425            return Err(Error::from_rcl_error(result));
1426        }
1427
1428        Ok(topic_info_list)
1429    }
1430
1431    /// Create a ROS wall timer.
1432    ///
1433    /// Create a ROS timer that is woken up by spin every `period`.
1434    ///
1435    /// This timer uses [`ClockType::SteadyTime`] clock.
1436    pub fn create_wall_timer(&mut self, period: Duration) -> Result<Timer> {
1437        let mut clock = Clock::create(ClockType::SteadyTime)?;
1438        let timer_handle = self.create_timer_helper(&mut clock, period)?;
1439
1440        let (tx, rx) = mpsc::channel::<Duration>(1);
1441
1442        let timer = Timer_ {
1443            timer_handle,
1444            _clock: Some(clock), // The timer owns the clock.
1445            sender: tx,
1446        };
1447        self.timers.push(timer);
1448
1449        let out_timer = Timer { receiver: rx };
1450
1451        Ok(out_timer)
1452    }
1453
1454    /// Create a ROS timer
1455    ///
1456    /// Create a ROS timer that is woken up by spin every `period`.
1457    ///
1458    /// This timer uses node's [`ClockType::RosTime`] clock.
1459    pub fn create_timer(&mut self, period: Duration) -> Result<Timer> {
1460        let mut clock = self.ros_clock.lock().unwrap();
1461        let timer_handle = self.create_timer_helper(&mut clock, period)?;
1462
1463        let (tx, rx) = mpsc::channel::<Duration>(1);
1464
1465        let timer = Timer_ {
1466            timer_handle,
1467            _clock: None, // The timer does not own the clock (the node owns it).
1468            sender: tx,
1469        };
1470        self.timers.push(timer);
1471
1472        let out_timer = Timer { receiver: rx };
1473
1474        Ok(out_timer)
1475    }
1476
1477    fn create_timer_helper(
1478        &self, clock: &mut Clock, period: Duration,
1479    ) -> Result<Pin<Box<RclTimer>>> {
1480        // rcl expects that the address of the rcl_timer_t does not change.
1481        let mut timer_handle = unsafe { Box::pin(RclTimer::new()) };
1482
1483        let mut ctx = self.context.context_handle.lock().unwrap();
1484        let ret = unsafe {
1485            rcl_timer_init(
1486                &mut timer_handle.as_mut().get_unchecked_mut().handle,
1487                clock.clock_handle.as_mut(),
1488                ctx.as_mut(),
1489                period.as_nanos() as i64,
1490                None,
1491                rcutils_get_default_allocator(),
1492            )
1493        };
1494
1495        if ret != RCL_RET_OK as i32 {
1496            log::error!("could not create timer: {}", ret);
1497            return Err(Error::from_rcl_error(ret));
1498        }
1499
1500        Ok(timer_handle)
1501    }
1502
1503    /// Get the ros logger name for this node.
1504    pub fn logger(&self) -> &str {
1505        let ptr = unsafe { rcl_node_get_logger_name(self.node_handle.as_ref()) };
1506        if ptr.is_null() {
1507            return "";
1508        }
1509        let s = unsafe { CStr::from_ptr(ptr) };
1510        s.to_str().unwrap_or("")
1511    }
1512
1513    /// Get TimeSource of the node
1514    ///
1515    /// See: [`TimeSource`]
1516    #[cfg(r2r__rosgraph_msgs__msg__Clock)]
1517    pub fn get_time_source(&self) -> TimeSource {
1518        self.time_source.clone()
1519    }
1520
1521    /// Get ROS clock of the node
1522    ///
1523    /// This is the same clock that is used by ROS timers created in [`Node::create_timer`].
1524    pub fn get_ros_clock(&self) -> Arc<Mutex<Clock>> {
1525        self.ros_clock.clone()
1526    }
1527}
1528
1529#[derive(Debug, Clone, PartialEq)]
1530struct RclTimer {
1531    handle: rcl_timer_t,
1532    _pin: PhantomPinned, // To prevent Unpin implementation
1533}
1534
1535impl RclTimer {
1536    unsafe fn new() -> Self {
1537        Self {
1538            handle: rcl_get_zero_initialized_timer(),
1539            _pin: PhantomPinned,
1540        }
1541    }
1542}
1543
1544struct Timer_ {
1545    timer_handle: Pin<Box<RclTimer>>,
1546    _clock: Option<Clock>, // Some(clock) if the timer owns the clock, just here to be dropped properly later.
1547    sender: mpsc::Sender<Duration>,
1548}
1549
1550impl Timer_ {
1551    fn get_handle(&self) -> *const rcl_timer_t {
1552        &self.timer_handle.handle
1553    }
1554
1555    /// Get mutable pointer to handle
1556    ///
1557    /// SAFETY:
1558    ///     Pointer valid only for modification.
1559    ///     Must not invalidate or replace the timer unless in Drop.
1560    unsafe fn get_handle_mut(&mut self) -> *mut rcl_timer_t {
1561        self.timer_handle
1562            .as_mut()
1563            .map_unchecked_mut(|s| &mut s.handle)
1564            .get_unchecked_mut()
1565    }
1566
1567    fn handle_incoming(&mut self) -> bool {
1568        let mut is_ready = false;
1569        let ret = unsafe { rcl_timer_is_ready(self.get_handle(), &mut is_ready) };
1570        if ret == RCL_RET_OK as i32 && is_ready {
1571            let mut nanos = 0i64;
1572            // todo: error handling
1573            let ret = unsafe { rcl_timer_get_time_since_last_call(self.get_handle(), &mut nanos) };
1574            if ret == RCL_RET_OK as i32 {
1575                let ret = unsafe { rcl_timer_call(self.get_handle_mut()) };
1576                if ret == RCL_RET_OK as i32 {
1577                    if let Err(e) = self.sender.try_send(Duration::from_nanos(nanos as u64)) {
1578                        if e.is_disconnected() {
1579                            // client dropped the timer handle, let's drop our timer as well.
1580                            return true;
1581                        }
1582                        if e.is_full() {
1583                            log::debug!(
1584                                "Warning: timer tick not handled in time - no wakeup will occur"
1585                            );
1586                        }
1587                    }
1588                }
1589            }
1590        }
1591        false
1592    }
1593}
1594
1595impl Drop for Timer_ {
1596    fn drop(&mut self) {
1597        let _ret = unsafe { rcl_timer_fini(self.get_handle_mut()) };
1598    }
1599}
1600
1601/// A ROS timer.
1602pub struct Timer {
1603    receiver: mpsc::Receiver<Duration>,
1604}
1605
1606impl Timer {
1607    /// Completes when the next instant in the interval has been reached.
1608    ///
1609    /// Returns the time passed since the timer was last woken up.
1610    pub async fn tick(&mut self) -> Result<Duration> {
1611        let next = self.receiver.next().await;
1612        if let Some(elapsed) = next {
1613            Ok(elapsed)
1614        } else {
1615            Err(Error::RCL_RET_TIMER_INVALID)
1616        }
1617    }
1618}
1619
1620// Since publishers are temporarily upgraded to owners during the
1621// actual publish but are not the ones that handle cleanup, we simply
1622// wait until there are no other owners in the cleanup procedure. The
1623// next time a publisher wants to publish they will fail because the
1624// value in the Arc has been dropped. Hacky but works.
1625fn wait_until_unwrapped<T>(mut a: Arc<T>) -> T {
1626    loop {
1627        match Arc::try_unwrap(a) {
1628            Ok(b) => return b,
1629            Err(t) => a = t,
1630        }
1631    }
1632}
1633
1634impl Drop for Node {
1635    fn drop(&mut self) {
1636        // fini functions are not thread safe so lock the context.
1637        let _ctx_handle = self.context.context_handle.lock().unwrap();
1638
1639        for s in &mut self.subscribers {
1640            s.destroy(&mut self.node_handle);
1641        }
1642        for c in &mut self.clients {
1643            c.lock().unwrap().destroy(&mut self.node_handle);
1644        }
1645        for s in &mut self.services {
1646            s.lock().unwrap().destroy(&mut self.node_handle);
1647        }
1648        for c in &mut self.action_clients {
1649            c.lock().unwrap().destroy(&mut self.node_handle);
1650        }
1651        for s in &mut self.action_servers {
1652            s.lock().unwrap().destroy(&mut self.node_handle);
1653        }
1654        while let Some(p) = self.pubs.pop() {
1655            let p = wait_until_unwrapped(p);
1656
1657            p.destroy(self.node_handle.as_mut());
1658        }
1659        unsafe {
1660            rcl_node_fini(self.node_handle.as_mut());
1661        }
1662    }
1663}
1664
1665pub trait IsAvailablePollable {
1666    fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>;
1667}
1668
1669pub struct TopicEndpointInfo {
1670    pub node_name: String,
1671    pub node_namespace: String,
1672    pub topic_type: String,
1673    pub endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize],
1674    pub qos_profile: QosProfile,
1675}
1676
1677impl From<rmw_topic_endpoint_info_t> for TopicEndpointInfo {
1678    fn from(info: rmw_topic_endpoint_info_t) -> Self {
1679        // Convert C strings to Rust String
1680        let node_name = unsafe { CStr::from_ptr(info.node_name) }
1681            .to_string_lossy()
1682            .into_owned();
1683        let node_namespace = unsafe { CStr::from_ptr(info.node_namespace) }
1684            .to_string_lossy()
1685            .into_owned();
1686        let topic_type = unsafe { CStr::from_ptr(info.topic_type) }
1687            .to_string_lossy()
1688            .into_owned();
1689
1690        // Copy the endpoint_gid array
1691        let endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize] = info.endpoint_gid;
1692
1693        // Convert qos_profile
1694        let qos_profile = QosProfile::from(info.qos_profile); // Adjust this line based on how QosProfile is defined
1695
1696        TopicEndpointInfo {
1697            node_name,
1698            node_namespace,
1699            topic_type,
1700            endpoint_gid,
1701            qos_profile,
1702        }
1703    }
1704}
1705
1706fn convert_info_array_to_vec(
1707    info_array: &rcl_topic_endpoint_info_array_t,
1708) -> Vec<TopicEndpointInfo> {
1709    let mut topic_info_list = Vec::with_capacity(info_array.size);
1710
1711    if info_array.info_array != std::ptr::null_mut() {
1712        unsafe {
1713            let infos = std::slice::from_raw_parts(info_array.info_array, info_array.size);
1714            for &info in infos {
1715                let endpoint_info = TopicEndpointInfo::from(info);
1716                topic_info_list.push(endpoint_info);
1717            }
1718        }
1719    }
1720
1721    topic_info_list
1722}