1use futures::{
2 channel::{mpsc, oneshot},
3 future::{self, join_all, FutureExt, TryFutureExt},
4 stream::{Stream, StreamExt},
5};
6use indexmap::IndexMap;
7use std::{
8 collections::HashMap,
9 ffi::{CStr, CString},
10 future::Future,
11 marker::PhantomPinned,
12 mem::MaybeUninit,
13 pin::Pin,
14 sync::{Arc, Mutex},
15 time::Duration,
16};
17
18use r2r_actions::*;
19use r2r_rcl::*;
20
21#[cfg(r2r__rosgraph_msgs__msg__Clock)]
22use crate::time_source::TimeSource;
23use crate::{
24 action_clients::*,
25 action_clients_untyped::*,
26 action_servers::*,
27 clients::*,
28 clocks::*,
29 context::*,
30 error::*,
31 msg_types::{generated_msgs::rcl_interfaces, *},
32 parameters::*,
33 publishers::*,
34 qos::QosProfile,
35 services::*,
36 subscribers::*,
37};
38
39pub struct Node {
45 context: Context,
46 pub params: Arc<Mutex<IndexMap<String, Parameter>>>,
48 pub(crate) node_handle: Box<rcl_node_t>,
49 pub(crate) subscribers: Vec<Box<dyn Subscriber_>>,
51 services: Vec<Arc<Mutex<dyn Service_>>>,
53 clients: Vec<Arc<Mutex<dyn Client_>>>,
55 action_clients: Vec<Arc<Mutex<dyn ActionClient_>>>,
57 action_servers: Vec<Arc<Mutex<dyn ActionServer_>>>,
59 timers: Vec<Timer_>,
61 pubs: Vec<Arc<Publisher_>>,
63 ros_clock: Arc<Mutex<Clock>>,
65 #[cfg(r2r__rosgraph_msgs__msg__Clock)]
67 time_source: TimeSource,
68}
69
70unsafe impl Send for Node {}
71
72impl Node {
73 pub fn name(&self) -> Result<String> {
75 let cstr = unsafe { rcl_node_get_name(self.node_handle.as_ref()) };
76 if cstr.is_null() {
77 return Err(Error::RCL_RET_NODE_INVALID);
78 }
79 let s = unsafe { CStr::from_ptr(cstr) };
80 Ok(s.to_str().unwrap_or("").to_owned())
81 }
82
83 pub fn fully_qualified_name(&self) -> Result<String> {
85 let cstr = unsafe { rcl_node_get_fully_qualified_name(self.node_handle.as_ref()) };
86 if cstr.is_null() {
87 return Err(Error::RCL_RET_NODE_INVALID);
88 }
89 let s = unsafe { CStr::from_ptr(cstr) };
90 Ok(s.to_str().unwrap_or("").to_owned())
91 }
92
93 pub fn namespace(&self) -> Result<String> {
95 let cstr = unsafe { rcl_node_get_namespace(self.node_handle.as_ref()) };
96 if cstr.is_null() {
97 return Err(Error::RCL_RET_NODE_INVALID);
98 }
99 let s = unsafe { CStr::from_ptr(cstr) };
100 Ok(s.to_str().unwrap_or("").to_owned())
101 }
102
103 fn load_params(&mut self) -> Result<()> {
104 let ctx = self.context.context_handle.lock().unwrap();
105 let mut params: Box<*mut rcl_params_t> = Box::new(std::ptr::null_mut());
106
107 let ret =
108 unsafe { rcl_arguments_get_param_overrides(&ctx.global_arguments, params.as_mut()) };
109 if ret != RCL_RET_OK as i32 {
110 log::error!("could not read parameters: {}", ret);
111 return Err(Error::from_rcl_error(ret));
112 }
113
114 if params.is_null() {
115 return Ok(());
116 }
117
118 unsafe {
119 if (*(*params.as_ref())).node_names == std::ptr::null_mut()
120 || (*(*params.as_ref())).params == std::ptr::null_mut()
121 {
122 rcl_yaml_node_struct_fini(*params);
123 return Ok(());
124 }
125 }
126
127 let node_names = unsafe {
128 std::slice::from_raw_parts(
129 (*(*params.as_ref())).node_names,
130 (*(*params.as_ref())).num_nodes,
131 )
132 };
133
134 let node_params = unsafe {
135 std::slice::from_raw_parts(
136 (*(*params.as_ref())).params,
137 (*(*params.as_ref())).num_nodes,
138 )
139 };
140
141 let qualified_name = self.fully_qualified_name()?;
142 let name = self.name()?;
143
144 for (nn, np) in node_names.iter().zip(node_params) {
145 let node_name_cstr = unsafe { CStr::from_ptr(*nn) };
146 let node_name = node_name_cstr.to_str().unwrap_or("");
147
148 if !(node_name == "/**"
152 || node_name == "**"
153 || qualified_name == node_name
154 || name == node_name)
155 {
156 continue;
157 }
158
159 let mut params = self.params.lock().unwrap();
161 if np.parameter_names != std::ptr::null_mut()
162 && np.parameter_values != std::ptr::null_mut()
163 {
164 let param_names =
165 unsafe { std::slice::from_raw_parts(np.parameter_names, np.num_params) };
166
167 let param_values =
168 unsafe { std::slice::from_raw_parts(np.parameter_values, np.num_params) };
169
170 for (s, v) in param_names.iter().zip(param_values) {
171 let s = unsafe { CStr::from_ptr(*s) };
172 let key = s.to_str().unwrap_or("");
173 let val = ParameterValue::from_rcl(v);
174 params.insert(key.to_owned(), Parameter::new(val));
175 }
176 }
177 }
178
179 unsafe { rcl_yaml_node_struct_fini(*params) };
180 Ok(())
181 }
182
183 pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node> {
185 let (res, node_handle) = {
186 let mut ctx_handle = ctx.context_handle.lock().unwrap();
187
188 let c_node_name = CString::new(name).unwrap();
189 let c_node_ns = CString::new(namespace).unwrap();
190 let mut node_handle: Box<rcl_node_t> =
191 unsafe { Box::new(rcl_get_zero_initialized_node()) };
192 let res = unsafe {
193 let node_options = rcl_node_get_default_options();
194 rcl_node_init(
195 node_handle.as_mut(),
196 c_node_name.as_ptr(),
197 c_node_ns.as_ptr(),
198 ctx_handle.as_mut(),
199 &node_options as *const _,
200 )
201 };
202 (res, node_handle)
203 };
204
205 if res == RCL_RET_OK as i32 {
206 let ros_clock = Arc::new(Mutex::new(Clock::create(ClockType::RosTime)?));
207 #[cfg(r2r__rosgraph_msgs__msg__Clock)]
208 let time_source = {
209 let time_source = TimeSource::new();
210 time_source.attach_ros_clock(Arc::downgrade(&ros_clock))?;
211 time_source
212 };
213
214 let mut node = Node {
215 params: Arc::new(Mutex::new(IndexMap::new())),
216 context: ctx,
217 node_handle,
218 subscribers: Vec::new(),
219 services: Vec::new(),
220 clients: Vec::new(),
221 action_clients: Vec::new(),
222 action_servers: Vec::new(),
223 timers: Vec::new(),
224 pubs: Vec::new(),
225 ros_clock,
226 #[cfg(r2r__rosgraph_msgs__msg__Clock)]
227 time_source,
228 };
229 node.load_params()?;
230 Ok(node)
231 } else {
232 log::error!("could not create node{}", res);
233 Err(Error::from_rcl_error(res))
234 }
235 }
236
237 pub fn make_parameter_handler(
246 &mut self,
247 ) -> Result<(impl Future<Output = ()> + Send, impl Stream<Item = (String, ParameterValue)>)>
248 {
249 self.make_parameter_handler_internal(None)
250 }
251
252 pub fn make_derived_parameter_handler(
269 &mut self, params_struct: Arc<Mutex<dyn RosParams + Send>>,
270 ) -> Result<(impl Future<Output = ()> + Send, impl Stream<Item = (String, ParameterValue)>)>
271 {
272 self.make_parameter_handler_internal(Some(params_struct))
273 }
274
275 fn make_parameter_handler_internal(
276 &mut self, params_struct: Option<Arc<Mutex<dyn RosParams + Send>>>,
277 ) -> Result<(impl Future<Output = ()> + Send, impl Stream<Item = (String, ParameterValue)>)>
278 {
279 if let Some(ps) = ¶ms_struct {
280 ps.lock()
282 .unwrap()
283 .register_parameters("", None, &mut self.params.lock().unwrap())?;
284 }
285 let mut handlers: Vec<std::pin::Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();
286
287 let (mut set_event_tx, event_rx) = mpsc::channel::<(String, ParameterValue)>(10);
288 let mut set_atomically_event_tx = set_event_tx.clone();
289
290 let node_name = self.name()?;
291
292 let set_params_request_stream = self
294 .create_service::<rcl_interfaces::srv::SetParameters::Service>(
295 &format!("{}/set_parameters", node_name),
296 QosProfile::default(),
297 )?;
298
299 let params = self.params.clone();
300 let params_struct_clone = params_struct.clone();
301 let set_params_future = set_params_request_stream.for_each(
302 move |req: ServiceRequest<rcl_interfaces::srv::SetParameters::Service>| {
303 let mut result = rcl_interfaces::srv::SetParameters::Response::default();
304 for p in &req.message.parameters {
305 let val = ParameterValue::from_parameter_value_msg(p.value.clone());
306 let changed = params
307 .lock()
308 .unwrap()
309 .get(&p.name)
310 .map(|v| v.value != val)
311 .unwrap_or(true); let r = if let Some(ps) = ¶ms_struct_clone {
313 let result = ps.lock().unwrap().set_parameter(&p.name, &val);
315 if result.is_ok() {
316 params
318 .lock()
319 .unwrap()
320 .entry(p.name.clone())
321 .and_modify(|p| p.value = val.clone());
322 }
323 rcl_interfaces::msg::SetParametersResult {
324 successful: result.is_ok(),
325 reason: result.err().map_or("".into(), |e| e.to_string()),
326 }
327 } else {
328 params
330 .lock()
331 .unwrap()
332 .entry(p.name.clone())
333 .and_modify(|p| p.value = val.clone())
334 .or_insert(Parameter::new(val.clone()));
335 rcl_interfaces::msg::SetParametersResult {
336 successful: true,
337 reason: "".into(),
338 }
339 };
340 if changed && r.successful {
342 if let Err(e) = set_event_tx.try_send((p.name.clone(), val)) {
343 log::debug!("Warning: could not send parameter event ({}).", e);
344 }
345 }
346 result.results.push(r);
347 }
348 req.respond(result)
349 .expect("could not send reply to set parameter request");
350 future::ready(())
351 },
352 );
353 handlers.push(Box::pin(set_params_future));
354
355 let get_params_request_stream = self
357 .create_service::<rcl_interfaces::srv::GetParameters::Service>(
358 &format!("{}/get_parameters", node_name),
359 QosProfile::default(),
360 )?;
361
362 let params = self.params.clone();
363 let params_struct_clone = params_struct.clone();
364 let get_params_future = get_params_request_stream.for_each(
365 move |req: ServiceRequest<rcl_interfaces::srv::GetParameters::Service>| {
366 let params = params.lock().unwrap();
367 let values = req
368 .message
369 .names
370 .iter()
371 .map(|n| {
372 if let Some(ps) = ¶ms_struct_clone {
374 if let Ok(value) = ps.lock().unwrap().get_parameter(n) {
375 return value;
376 }
377 }
378 match params.get(n) {
380 Some(v) => v.value.clone(),
381 None => ParameterValue::NotSet,
382 }
383 })
384 .map(|v| v.into_parameter_value_msg())
385 .collect::<Vec<rcl_interfaces::msg::ParameterValue>>();
386
387 let result = rcl_interfaces::srv::GetParameters::Response { values };
388 req.respond(result)
389 .expect("could not send reply to set parameter request");
390 future::ready(())
391 },
392 );
393
394 handlers.push(Box::pin(get_params_future));
395
396 use rcl_interfaces::srv::ListParameters;
398 let list_params_request_stream = self.create_service::<ListParameters::Service>(
399 &format!("{}/list_parameters", node_name),
400 QosProfile::default(),
401 )?;
402
403 let params = self.params.clone();
404 let list_params_future = list_params_request_stream.for_each(
405 move |req: ServiceRequest<ListParameters::Service>| {
406 Self::handle_list_parameters(req, ¶ms)
407 },
408 );
409
410 handlers.push(Box::pin(list_params_future));
411
412 use rcl_interfaces::srv::DescribeParameters;
414 let desc_params_request_stream = self.create_service::<DescribeParameters::Service>(
415 &format!("{node_name}/describe_parameters"),
416 QosProfile::default(),
417 )?;
418
419 let params = self.params.clone();
420 let desc_params_future = desc_params_request_stream.for_each(
421 move |req: ServiceRequest<DescribeParameters::Service>| {
422 Self::handle_desc_parameters(req, ¶ms)
423 },
424 );
425
426 handlers.push(Box::pin(desc_params_future));
427
428 use rcl_interfaces::srv::GetParameterTypes;
430 let get_param_types_request_stream = self.create_service::<GetParameterTypes::Service>(
431 &format!("{node_name}/get_parameter_types"),
432 QosProfile::default(),
433 )?;
434
435 let params = self.params.clone();
436 let get_param_types_future = get_param_types_request_stream.for_each(
437 move |req: ServiceRequest<GetParameterTypes::Service>| {
438 let params = params.lock().unwrap();
439 let types = req
440 .message
441 .names
442 .iter()
443 .map(|name| match params.get(name) {
444 Some(param) => param.value.into_parameter_type(),
445 None => rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET as u8,
446 })
447 .collect();
448 req.respond(GetParameterTypes::Response { types })
449 .expect("could not send reply to get parameter types request");
450 future::ready(())
451 },
452 );
453
454 handlers.push(Box::pin(get_param_types_future));
455
456 let set_params_atomically_request_stream =
458 self.create_service::<rcl_interfaces::srv::SetParametersAtomically::Service>(
459 &format!("{}/set_parameters_atomically", node_name),
460 QosProfile::default(),
461 )?;
462
463 let params = self.params.clone();
464 let params_struct_clone = params_struct.clone();
465 let set_params_atomically_future = set_params_atomically_request_stream.for_each(
466 move |req: ServiceRequest<rcl_interfaces::srv::SetParametersAtomically::Service>| {
467 let mut result = rcl_interfaces::srv::SetParametersAtomically::Response::default();
468 result.result.successful = true;
469 if let Some(ps) = ¶ms_struct_clone {
470 for p in &req.message.parameters {
471 let val = ParameterValue::from_parameter_value_msg(p.value.clone());
472 if let Err(e) = ps.lock().unwrap().check_parameter(&p.name, &val) {
473 result.result.successful = false;
474 result.result.reason = format!("Can't set parameter {}: {}", p.name, e);
475 break;
476 }
477 }
478 }
479 if result.result.successful {
480 for p in &req.message.parameters {
482 let val = ParameterValue::from_parameter_value_msg(p.value.clone());
483 let changed = params
484 .lock()
485 .unwrap()
486 .get(&p.name)
487 .map(|v| v.value != val)
488 .unwrap_or(true); let r = if let Some(ps) = ¶ms_struct_clone {
490 let result = ps.lock().unwrap().set_parameter(&p.name, &val);
492 if result.is_ok() {
493 params
495 .lock()
496 .unwrap()
497 .entry(p.name.clone())
498 .and_modify(|p| p.value = val.clone());
499 }
500 rcl_interfaces::msg::SetParametersResult {
501 successful: result.is_ok(),
502 reason: result.err().map_or("".into(), |e| e.to_string()),
503 }
504 } else {
505 params
507 .lock()
508 .unwrap()
509 .entry(p.name.clone())
510 .and_modify(|p| p.value = val.clone())
511 .or_insert(Parameter::new(val.clone()));
512 rcl_interfaces::msg::SetParametersResult {
513 successful: true,
514 reason: "".into(),
515 }
516 };
517 if changed && r.successful {
519 if let Err(e) = set_atomically_event_tx.try_send((p.name.clone(), val)) {
520 log::debug!("Warning: could not send parameter event ({}).", e);
521 }
522 }
523 }
524 }
525 req.respond(result)
526 .expect("could not send reply to set parameter request");
527 future::ready(())
528 },
529 );
530 handlers.push(Box::pin(set_params_atomically_future));
531
532 #[cfg(r2r__rosgraph_msgs__msg__Clock)]
533 {
534 let use_sim_time = {
536 let params = self.params.lock().unwrap();
537 params
538 .get("use_sim_time")
539 .and_then(|param| {
540 if let ParameterValue::Bool(val) = param.value {
541 Some(val)
542 } else {
543 log::error!("Parameter use_sim_time is not bool. Assuming false");
544 None
545 }
546 })
547 .unwrap_or(false)
548 };
549 if use_sim_time {
550 let ts = self.time_source.clone();
551 ts.enable_sim_time(self)?;
552 }
553 }
554
555 Ok((join_all(handlers).map(|_| ()), event_rx))
557 }
558
559 fn handle_list_parameters(
560 req: ServiceRequest<rcl_interfaces::srv::ListParameters::Service>,
561 params: &Arc<Mutex<IndexMap<String, Parameter>>>,
562 ) -> future::Ready<()> {
563 use rcl_interfaces::srv::ListParameters;
564
565 let depth = req.message.depth;
566 let prefixes = &req.message.prefixes;
567 let separator = '.';
568 let params = params.lock().unwrap();
569 let mut result = rcl_interfaces::msg::ListParametersResult {
570 names: vec![],
571 prefixes: vec![],
572 };
573 for (name, _) in params.iter().filter(|(name, _)| {
574 let get_all = prefixes.is_empty()
575 && ((depth == ListParameters::Request::DEPTH_RECURSIVE as u64)
576 || name.matches(separator).count() < depth as usize);
577 let prefix_matches = prefixes.iter().any(|prefix| {
578 if *name == prefix {
579 return true;
580 } else if name.starts_with(&format!("{prefix}{separator}")) {
581 let substr = &name[prefix.len()..];
582 return (depth == ListParameters::Request::DEPTH_RECURSIVE as u64)
583 || substr.matches(separator).count() < depth as usize;
584 }
585 false
586 });
587 get_all || prefix_matches
588 }) {
589 result.names.push(name.clone());
590 if let Some(last_separator) = name.rfind(separator) {
591 let prefix = &name[0..last_separator];
592 if result.prefixes.iter().all(|p| p != prefix) {
593 result.prefixes.push(prefix.to_string());
594 }
595 }
596 }
597 req.respond(ListParameters::Response { result })
598 .expect("could not send reply to list parameter request");
599 future::ready(())
600 }
601
602 fn handle_desc_parameters(
603 req: ServiceRequest<rcl_interfaces::srv::DescribeParameters::Service>,
604 params: &Arc<Mutex<IndexMap<String, Parameter>>>,
605 ) -> future::Ready<()> {
606 use rcl_interfaces::{msg::ParameterDescriptor, srv::DescribeParameters};
607 let mut descriptors = Vec::<ParameterDescriptor>::new();
608 let params = params.lock().unwrap();
609 for name in &req.message.names {
610 let default = Parameter::empty();
611 let param = params.get(name).unwrap_or(&default);
612 descriptors.push(ParameterDescriptor {
613 name: name.clone(),
614 type_: param.value.into_parameter_type(),
615 description: param.description.to_string(),
616 ..Default::default()
617 });
618 }
619 req.respond(DescribeParameters::Response { descriptors })
620 .expect("could not send reply to describe parameters request");
621 future::ready(())
622 }
623
624 pub fn get_parameter<T>(&self, name: &str) -> Result<T>
626 where
627 ParameterValue: TryInto<T, Error = WrongParameterType>,
628 {
629 let params = self.params.lock().unwrap();
630 let value = params
631 .get(name)
632 .map(|parameter| parameter.value.clone())
633 .unwrap_or(ParameterValue::NotSet);
634
635 let value: T =
636 value
637 .try_into()
638 .map_err(|error: WrongParameterType| Error::ParameterWrongType {
639 name: name.to_string(),
640 expected_type: error.expected_type_name,
641 actual_type: error.actual_type_name,
642 })?;
643
644 Ok(value)
645 }
646
647 pub fn subscribe<T: 'static>(
651 &mut self, topic: &str, qos_profile: QosProfile,
652 ) -> Result<impl Stream<Item = T> + Unpin>
653 where
654 T: WrappedTypesupport,
655 {
656 let subscription_handle =
657 create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
658 let (sender, receiver) = mpsc::channel::<T>(10);
659
660 let ws = TypedSubscriber {
661 rcl_handle: subscription_handle,
662 sender,
663 };
664 self.subscribers.push(Box::new(ws));
665 Ok(receiver)
666 }
667
668 pub fn subscribe_native<T: 'static>(
672 &mut self, topic: &str, qos_profile: QosProfile,
673 ) -> Result<impl Stream<Item = WrappedNativeMsg<T>> + Unpin>
674 where
675 T: WrappedTypesupport,
676 {
677 let subscription_handle =
678 create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
679 let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
680
681 let ws = NativeSubscriber {
682 rcl_handle: subscription_handle,
683 sender,
684 };
685 self.subscribers.push(Box::new(ws));
686 Ok(receiver)
687 }
688
689 pub fn subscribe_untyped(
694 &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
695 ) -> Result<impl Stream<Item = Result<serde_json::Value>> + Unpin> {
696 let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
697 let subscription_handle =
698 create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
699 let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
700
701 let ws = UntypedSubscriber {
702 rcl_handle: subscription_handle,
703 topic_type: topic_type.to_string(),
704 sender,
705 };
706 self.subscribers.push(Box::new(ws));
707 Ok(receiver)
708 }
709
710 pub fn subscribe_raw(
715 &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
716 ) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
717 let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
725
726 let mut msg_buf: rcl_serialized_message_t =
729 unsafe { rcutils_get_zero_initialized_uint8_array() };
730 let ret = unsafe {
731 rcutils_uint8_array_init(
732 &mut msg_buf as *mut rcl_serialized_message_t,
733 0,
734 &rcutils_get_default_allocator(),
735 )
736 };
737
738 if ret != RCL_RET_OK as i32 {
739 return Err(Error::from_rcl_error(ret));
740 }
741
742 let subscription_handle =
743 create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
744 let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
745
746 let ws = RawSubscriber {
747 rcl_handle: subscription_handle,
748 msg_buf,
749 sender,
750 };
751 self.subscribers.push(Box::new(ws));
752 Ok(receiver)
753 }
754
755 pub fn create_service<T: 'static>(
760 &mut self, service_name: &str, qos_profile: QosProfile,
761 ) -> Result<impl Stream<Item = ServiceRequest<T>> + Unpin>
762 where
763 T: WrappedServiceTypeSupport,
764 {
765 let service_handle = create_service_helper(
766 self.node_handle.as_mut(),
767 service_name,
768 T::get_ts(),
769 qos_profile,
770 )?;
771 let (sender, receiver) = mpsc::channel::<ServiceRequest<T>>(10);
772
773 let ws = TypedService::<T> {
774 rcl_handle: service_handle,
775 sender,
776 };
777
778 self.services.push(Arc::new(Mutex::new(ws)));
779 Ok(receiver)
780 }
781
782 pub fn create_client<T: 'static>(
786 &mut self, service_name: &str, qos_profile: QosProfile,
787 ) -> Result<Client<T>>
788 where
789 T: WrappedServiceTypeSupport,
790 {
791 let client_handle = create_client_helper(
792 self.node_handle.as_mut(),
793 service_name,
794 T::get_ts(),
795 qos_profile,
796 )?;
797 let ws = TypedClient::<T> {
798 rcl_handle: client_handle,
799 response_channels: Vec::new(),
800 poll_available_channels: Vec::new(),
801 };
802
803 let client_arc = Arc::new(Mutex::new(ws));
804 let c = make_client(Arc::downgrade(&client_arc));
805 self.clients.push(client_arc);
806 Ok(c)
807 }
808
809 pub fn create_client_untyped(
816 &mut self, service_name: &str, service_type: &str, qos_profile: QosProfile,
817 ) -> Result<ClientUntyped> {
818 let service_type = UntypedServiceSupport::new_from(service_type)?;
819 let client_handle = create_client_helper(
820 self.node_handle.as_mut(),
821 service_name,
822 service_type.ts,
823 qos_profile,
824 )?;
825 let client = UntypedClient_ {
826 service_type,
827 rcl_handle: client_handle,
828 response_channels: Vec::new(),
829 poll_available_channels: Vec::new(),
830 };
831
832 let client_arc = Arc::new(Mutex::new(client));
833 let c = make_untyped_client(Arc::downgrade(&client_arc));
834 self.clients.push(client_arc);
835 Ok(c)
836 }
837
838 pub fn is_available(
846 client: &dyn IsAvailablePollable,
847 ) -> Result<impl Future<Output = Result<()>>> {
848 let (sender, receiver) = oneshot::channel();
849 client.register_poll_available(sender)?;
850 Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
851 }
852
853 pub fn create_action_client<T: 'static>(&mut self, action_name: &str) -> Result<ActionClient<T>>
857 where
858 T: WrappedActionTypeSupport,
859 {
860 let client_handle =
861 create_action_client_helper(self.node_handle.as_mut(), action_name, T::get_ts())?;
862 let client = WrappedActionClient::<T> {
863 rcl_handle: client_handle,
864 goal_response_channels: Vec::new(),
865 cancel_response_channels: Vec::new(),
866 feedback_senders: Vec::new(),
867 result_senders: Vec::new(),
868 result_requests: Vec::new(),
869 goal_status: HashMap::new(),
870 poll_available_channels: Vec::new(),
871 };
872
873 let client_arc = Arc::new(Mutex::new(client));
874 self.action_clients.push(client_arc.clone());
875 let c = make_action_client(Arc::downgrade(&client_arc));
876 Ok(c)
877 }
878
879 pub fn create_action_client_untyped(
886 &mut self, action_name: &str, action_type: &str,
887 ) -> Result<ActionClientUntyped> {
888 let action_type_support = UntypedActionSupport::new_from(action_type)?;
889 let client_handle = create_action_client_helper(
890 self.node_handle.as_mut(),
891 action_name,
892 action_type_support.ts,
893 )?;
894 let client = WrappedActionClientUntyped {
895 action_type_support,
896 rcl_handle: client_handle,
897 goal_response_channels: Vec::new(),
898 cancel_response_channels: Vec::new(),
899 feedback_senders: Vec::new(),
900 result_senders: Vec::new(),
901 result_requests: Vec::new(),
902 goal_status: HashMap::new(),
903 poll_available_channels: Vec::new(),
904 };
905
906 let client_arc = Arc::new(Mutex::new(client));
907 self.action_clients.push(client_arc.clone());
908 let c = make_action_client_untyped(Arc::downgrade(&client_arc));
909 Ok(c)
910 }
911
912 pub fn create_action_server<T: 'static>(
917 &mut self, action_name: &str,
918 ) -> Result<impl Stream<Item = ActionServerGoalRequest<T>> + Unpin>
919 where
920 T: WrappedActionTypeSupport,
921 {
922 let mut clock_handle = MaybeUninit::<rcl_clock_t>::uninit();
924 let ret = unsafe {
925 rcl_ros_clock_init(clock_handle.as_mut_ptr(), &mut rcutils_get_default_allocator())
926 };
927 if ret != RCL_RET_OK as i32 {
928 log::error!("could not create steady clock: {}", ret);
929 return Err(Error::from_rcl_error(ret));
930 }
931 let mut clock_handle = Box::new(unsafe { clock_handle.assume_init() });
932
933 let (goal_request_sender, goal_request_receiver) =
934 mpsc::channel::<ActionServerGoalRequest<T>>(10);
935
936 let server_handle = create_action_server_helper(
937 self.node_handle.as_mut(),
938 action_name,
939 clock_handle.as_mut(),
940 T::get_ts(),
941 )?;
942 let server = WrappedActionServer::<T> {
943 rcl_handle: server_handle,
944 clock_handle,
945 goal_request_sender,
946 active_cancel_requests: Vec::new(),
947 cancel_senders: HashMap::new(),
948 goals: HashMap::new(),
949 result_msgs: HashMap::new(),
950 result_requests: HashMap::new(),
951 };
952
953 let server_arc = Arc::new(Mutex::new(server));
954 self.action_servers.push(server_arc);
955 Ok(goal_request_receiver)
956 }
957
958 pub fn create_publisher<T>(
960 &mut self, topic: &str, qos_profile: QosProfile,
961 ) -> Result<Publisher<T>>
962 where
963 T: WrappedTypesupport,
964 {
965 let publisher_handle =
966 create_publisher_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
967 let arc = Arc::new(publisher_handle);
968 let p = make_publisher(Arc::downgrade(&arc));
969 self.pubs.push(arc);
970 Ok(p)
971 }
972
973 pub fn create_publisher_untyped(
977 &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
978 ) -> Result<PublisherUntyped> {
979 let dummy = WrappedNativeMsgUntyped::new_from(topic_type)?;
980 let publisher_handle =
981 create_publisher_helper(self.node_handle.as_mut(), topic, dummy.ts, qos_profile)?;
982 let arc = Arc::new(publisher_handle);
983 let p = make_publisher_untyped(Arc::downgrade(&arc), topic_type.to_owned());
984 self.pubs.push(arc);
985 Ok(p)
986 }
987
988 pub fn destroy_publisher<T: WrappedTypesupport>(&mut self, p: Publisher<T>) {
990 if let Some(handle) = p.handle.upgrade() {
991 self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
993 .map(|i| self.pubs.swap_remove(i));
994
995 let handle = wait_until_unwrapped(handle);
996 handle.destroy(self.node_handle.as_mut());
997 }
998 }
999
1000 pub fn destroy_publisher_untyped(&mut self, p: PublisherUntyped) {
1002 if let Some(handle) = p.handle.upgrade() {
1003 self.pubs.iter().position(|p| Arc::ptr_eq(p, &handle))
1005 .map(|i| self.pubs.swap_remove(i));
1006
1007 let handle = wait_until_unwrapped(handle);
1008 handle.destroy(self.node_handle.as_mut());
1009 }
1010 }
1011
1012 pub fn spin_once(&mut self, timeout: Duration) {
1022 for a in &mut self.action_servers {
1024 a.lock().unwrap().send_completed_cancel_requests();
1025 }
1026
1027 for c in &mut self.clients {
1029 c.lock().unwrap().poll_available(self.node_handle.as_mut());
1030 }
1031
1032 for c in &mut self.action_clients {
1033 c.lock().unwrap().poll_available(self.node_handle.as_mut());
1034 }
1035
1036 for p in &self.pubs {
1037 p.poll_has_inter_process_subscribers();
1038 }
1039
1040 let timeout = timeout.as_nanos() as i64;
1041 let mut ws = unsafe { rcl_get_zero_initialized_wait_set() };
1042
1043 let mut total_action_subs = 0;
1051 let mut total_action_clients = 0;
1052 for c in &self.action_clients {
1053 let mut num_subs = 0;
1054 let mut num_gc = 0;
1055 let mut num_timers = 0;
1056 let mut num_clients = 0;
1057 let mut num_services = 0;
1058
1059 action_client_get_num_waits(
1060 c.lock().unwrap().handle(),
1061 &mut num_subs,
1062 &mut num_gc,
1063 &mut num_timers,
1064 &mut num_clients,
1065 &mut num_services,
1066 )
1067 .expect("could not get action client wait sets");
1068 assert_eq!(num_subs, 2);
1070 assert_eq!(num_clients, 3);
1071 assert_eq!(num_gc, 0);
1072 assert_eq!(num_timers, 0);
1073 assert_eq!(num_services, 0);
1074
1075 total_action_subs += num_subs;
1076 total_action_clients += num_clients;
1077 }
1078
1079 let mut total_action_timers = 0;
1081 let mut total_action_services = 0;
1082 for s in &self.action_servers {
1083 let mut num_subs = 0;
1084 let mut num_gc = 0;
1085 let mut num_timers = 0;
1086 let mut num_clients = 0;
1087 let mut num_services = 0;
1088
1089 action_server_get_num_waits(
1090 s.lock().unwrap().handle(),
1091 &mut num_subs,
1092 &mut num_gc,
1093 &mut num_timers,
1094 &mut num_clients,
1095 &mut num_services,
1096 )
1097 .expect("could not get action client wait sets");
1098 assert_eq!(num_subs, 0);
1100 assert_eq!(num_clients, 0);
1101 assert_eq!(num_gc, 0);
1102 assert_eq!(num_timers, 1);
1103 assert_eq!(num_services, 3);
1104
1105 total_action_timers += num_timers;
1106 total_action_services += num_services;
1107 }
1108
1109 {
1110 let mut ctx = self.context.context_handle.lock().unwrap();
1111
1112 unsafe {
1113 rcl_wait_set_init(
1114 &mut ws,
1115 self.subscribers.len() + total_action_subs,
1116 0,
1117 self.timers.len() + total_action_timers,
1118 self.clients.len() + total_action_clients,
1119 self.services.len() + total_action_services,
1120 0,
1121 ctx.as_mut(),
1122 rcutils_get_default_allocator(),
1123 );
1124 }
1125 }
1126 unsafe {
1127 rcl_wait_set_clear(&mut ws);
1128 }
1129
1130 for s in &self.subscribers {
1131 unsafe {
1132 rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
1133 }
1134 }
1135
1136 for s in &self.timers {
1137 unsafe {
1138 rcl_wait_set_add_timer(&mut ws, s.get_handle(), std::ptr::null_mut());
1139 }
1140 }
1141
1142 for s in &self.clients {
1143 unsafe {
1144 rcl_wait_set_add_client(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut());
1145 }
1146 }
1147
1148 for s in &self.services {
1149 unsafe {
1150 rcl_wait_set_add_service(&mut ws, s.lock().unwrap().handle(), std::ptr::null_mut());
1151 }
1152 }
1153
1154 for ac in &self.action_clients {
1160 unsafe {
1161 rcl_action_wait_set_add_action_client(
1162 &mut ws,
1163 ac.lock().unwrap().handle(),
1164 std::ptr::null_mut(),
1165 std::ptr::null_mut(),
1166 );
1167 }
1168 }
1169 for acs in &self.action_servers {
1170 unsafe {
1171 rcl_action_wait_set_add_action_server(
1172 &mut ws,
1173 acs.lock().unwrap().handle(),
1174 std::ptr::null_mut(),
1175 );
1176 }
1177 }
1178
1179 let ret = unsafe { rcl_wait(&mut ws, timeout) };
1180
1181 if ret == RCL_RET_TIMEOUT as i32 {
1182 unsafe {
1183 rcl_wait_set_fini(&mut ws);
1184 }
1185 return;
1186 }
1187
1188 let mut subs_to_remove = vec![];
1189 if ws.subscriptions != std::ptr::null_mut() {
1190 let ws_subs =
1191 unsafe { std::slice::from_raw_parts(ws.subscriptions, self.subscribers.len()) };
1192 for (s, ws_s) in self.subscribers.iter_mut().zip(ws_subs) {
1193 if ws_s != &std::ptr::null() {
1194 let dropped = s.handle_incoming();
1195 if dropped {
1196 s.destroy(&mut self.node_handle);
1197 subs_to_remove.push(*s.handle());
1198 }
1199 }
1200 }
1201 }
1202 self.subscribers
1203 .retain(|s| !subs_to_remove.contains(s.handle()));
1204
1205 let mut timers_to_remove = vec![];
1206 if ws.timers != std::ptr::null_mut() {
1207 let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) };
1208 for (s, ws_s) in self.timers.iter_mut().zip(ws_timers) {
1209 if ws_s != &std::ptr::null() {
1210 let dropped = s.handle_incoming();
1212 if dropped {
1213 timers_to_remove.push((*s.timer_handle).to_owned());
1214 }
1215 }
1216 }
1217 }
1218 self.timers
1220 .retain(|t| !timers_to_remove.contains(&*t.timer_handle));
1221
1222 if ws.clients != std::ptr::null_mut() {
1223 let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) };
1224 for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) {
1225 if ws_s != &std::ptr::null() {
1226 let mut s = s.lock().unwrap();
1227 s.handle_response();
1228 }
1229 }
1230 }
1231
1232 let mut services_to_remove = vec![];
1233 if ws.services != std::ptr::null_mut() {
1234 let ws_services =
1235 unsafe { std::slice::from_raw_parts(ws.services, self.services.len()) };
1236 for (s, ws_s) in self.services.iter_mut().zip(ws_services) {
1237 if ws_s != &std::ptr::null() {
1238 let mut service = s.lock().unwrap();
1239 let dropped = service.handle_request(s.clone());
1240 if dropped {
1241 service.destroy(&mut self.node_handle);
1242 services_to_remove.push(*service.handle());
1243 }
1244 }
1245 }
1246 }
1247 self.services
1248 .retain(|s| !services_to_remove.contains(s.lock().unwrap().handle()));
1249
1250 for ac in &self.action_clients {
1251 let mut is_feedback_ready = false;
1252 let mut is_status_ready = false;
1253 let mut is_goal_response_ready = false;
1254 let mut is_cancel_response_ready = false;
1255 let mut is_result_response_ready = false;
1256
1257 let ret = unsafe {
1258 rcl_action_client_wait_set_get_entities_ready(
1259 &ws,
1260 ac.lock().unwrap().handle(),
1261 &mut is_feedback_ready,
1262 &mut is_status_ready,
1263 &mut is_goal_response_ready,
1264 &mut is_cancel_response_ready,
1265 &mut is_result_response_ready,
1266 )
1267 };
1268
1269 if ret != RCL_RET_OK as i32 {
1270 continue;
1271 }
1272
1273 if is_feedback_ready {
1274 let mut acs = ac.lock().unwrap();
1275 acs.handle_feedback_msg();
1276 }
1277
1278 if is_status_ready {
1279 let mut acs = ac.lock().unwrap();
1280 acs.handle_status_msg();
1281 }
1282
1283 if is_goal_response_ready {
1284 let mut acs = ac.lock().unwrap();
1285 acs.handle_goal_response();
1286 }
1287
1288 if is_cancel_response_ready {
1289 let mut acs = ac.lock().unwrap();
1290 acs.handle_cancel_response();
1291 }
1292
1293 if is_result_response_ready {
1294 let mut acs = ac.lock().unwrap();
1295 acs.handle_result_response();
1296 }
1297 }
1298
1299 for s in &self.action_servers {
1300 let mut is_goal_request_ready = false;
1301 let mut is_cancel_request_ready = false;
1302 let mut is_result_request_ready = false;
1303 let mut is_goal_expired = false;
1304
1305 let ret = unsafe {
1306 rcl_action_server_wait_set_get_entities_ready(
1307 &ws,
1308 s.lock().unwrap().handle(),
1309 &mut is_goal_request_ready,
1310 &mut is_cancel_request_ready,
1311 &mut is_result_request_ready,
1312 &mut is_goal_expired,
1313 )
1314 };
1315
1316 if ret != RCL_RET_OK as i32 {
1317 continue;
1318 }
1319
1320 if is_goal_request_ready {
1321 let mut acs = s.lock().unwrap();
1322 acs.handle_goal_request(s.clone());
1323 }
1324
1325 if is_cancel_request_ready {
1326 let mut acs = s.lock().unwrap();
1327 acs.handle_cancel_request();
1328 }
1329
1330 if is_result_request_ready {
1331 let mut acs = s.lock().unwrap();
1332 acs.handle_result_request();
1333 }
1334
1335 if is_goal_expired {
1336 let mut acs = s.lock().unwrap();
1337 acs.handle_goal_expired();
1338 }
1339 }
1340
1341 unsafe {
1342 rcl_wait_set_fini(&mut ws);
1343 }
1344 }
1345
1346 pub fn get_topic_names_and_types(&self) -> Result<HashMap<String, Vec<String>>> {
1349 let mut tnat = unsafe { rmw_get_zero_initialized_names_and_types() };
1350 let ret = unsafe {
1351 rcl_get_topic_names_and_types(
1352 self.node_handle.as_ref(),
1353 &mut rcutils_get_default_allocator(),
1354 false,
1355 &mut tnat,
1356 )
1357 };
1358 if ret != RCL_RET_OK as i32 {
1359 log::error!("could not get topic names and types {}", ret);
1360 return Err(Error::from_rcl_error(ret));
1361 }
1362
1363 let mut res = HashMap::new();
1364 if tnat.names.data != std::ptr::null_mut() && tnat.types != std::ptr::null_mut() {
1365 let names = unsafe { std::slice::from_raw_parts(tnat.names.data, tnat.names.size) };
1366 let types = unsafe { std::slice::from_raw_parts(tnat.types, tnat.names.size) };
1367
1368 for (n, t) in names.iter().zip(types) {
1369 assert!(!(*n).is_null());
1370 let topic_name = unsafe { CStr::from_ptr(*n).to_str().unwrap().to_owned() };
1371 assert!(!t.data.is_null());
1372 let topic_types = unsafe { std::slice::from_raw_parts(t, t.size) };
1373 let topic_types: Vec<String> = unsafe {
1374 topic_types
1375 .iter()
1376 .map(|t| {
1377 assert!(*(t.data) != std::ptr::null_mut());
1378 CStr::from_ptr(*(t.data)).to_str().unwrap().to_owned()
1379 })
1380 .collect()
1381 };
1382 res.insert(topic_name, topic_types);
1383 }
1384 }
1385 unsafe {
1386 rmw_names_and_types_fini(&mut tnat);
1387 } Ok(res)
1389 }
1390
1391 pub fn get_publishers_info_by_topic(
1392 &self, topic_name: &str, no_mangle: bool,
1393 ) -> Result<Vec<TopicEndpointInfo>> {
1394 let node = self.node_handle.as_ref();
1395
1396 let topic_c_string =
1397 CString::new(topic_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
1398
1399 let mut allocator = unsafe { rcutils_get_default_allocator() };
1400
1401 let mut info_array: rcl_topic_endpoint_info_array_t =
1402 unsafe { rmw_get_zero_initialized_topic_endpoint_info_array() };
1403
1404 let result = unsafe {
1405 rcl_get_publishers_info_by_topic(
1406 node,
1407 &mut allocator,
1408 topic_c_string.as_ptr(),
1409 no_mangle,
1410 &mut info_array,
1411 )
1412 };
1413
1414 if result != RCL_RET_OK as i32 {
1415 unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
1416 return Err(Error::from_rcl_error(result));
1417 }
1418
1419 let topic_info_list = convert_info_array_to_vec(&info_array);
1421
1422 let result = unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
1423
1424 if result != RCL_RET_OK as i32 {
1425 return Err(Error::from_rcl_error(result));
1426 }
1427
1428 Ok(topic_info_list)
1429 }
1430
1431 pub fn create_wall_timer(&mut self, period: Duration) -> Result<Timer> {
1437 let mut clock = Clock::create(ClockType::SteadyTime)?;
1438 let timer_handle = self.create_timer_helper(&mut clock, period)?;
1439
1440 let (tx, rx) = mpsc::channel::<Duration>(1);
1441
1442 let timer = Timer_ {
1443 timer_handle,
1444 _clock: Some(clock), sender: tx,
1446 };
1447 self.timers.push(timer);
1448
1449 let out_timer = Timer { receiver: rx };
1450
1451 Ok(out_timer)
1452 }
1453
1454 pub fn create_timer(&mut self, period: Duration) -> Result<Timer> {
1460 let mut clock = self.ros_clock.lock().unwrap();
1461 let timer_handle = self.create_timer_helper(&mut clock, period)?;
1462
1463 let (tx, rx) = mpsc::channel::<Duration>(1);
1464
1465 let timer = Timer_ {
1466 timer_handle,
1467 _clock: None, sender: tx,
1469 };
1470 self.timers.push(timer);
1471
1472 let out_timer = Timer { receiver: rx };
1473
1474 Ok(out_timer)
1475 }
1476
1477 fn create_timer_helper(
1478 &self, clock: &mut Clock, period: Duration,
1479 ) -> Result<Pin<Box<RclTimer>>> {
1480 let mut timer_handle = unsafe { Box::pin(RclTimer::new()) };
1482
1483 let mut ctx = self.context.context_handle.lock().unwrap();
1484 let ret = unsafe {
1485 rcl_timer_init(
1486 &mut timer_handle.as_mut().get_unchecked_mut().handle,
1487 clock.clock_handle.as_mut(),
1488 ctx.as_mut(),
1489 period.as_nanos() as i64,
1490 None,
1491 rcutils_get_default_allocator(),
1492 )
1493 };
1494
1495 if ret != RCL_RET_OK as i32 {
1496 log::error!("could not create timer: {}", ret);
1497 return Err(Error::from_rcl_error(ret));
1498 }
1499
1500 Ok(timer_handle)
1501 }
1502
1503 pub fn logger(&self) -> &str {
1505 let ptr = unsafe { rcl_node_get_logger_name(self.node_handle.as_ref()) };
1506 if ptr.is_null() {
1507 return "";
1508 }
1509 let s = unsafe { CStr::from_ptr(ptr) };
1510 s.to_str().unwrap_or("")
1511 }
1512
1513 #[cfg(r2r__rosgraph_msgs__msg__Clock)]
1517 pub fn get_time_source(&self) -> TimeSource {
1518 self.time_source.clone()
1519 }
1520
1521 pub fn get_ros_clock(&self) -> Arc<Mutex<Clock>> {
1525 self.ros_clock.clone()
1526 }
1527}
1528
1529#[derive(Debug, Clone, PartialEq)]
1530struct RclTimer {
1531 handle: rcl_timer_t,
1532 _pin: PhantomPinned, }
1534
1535impl RclTimer {
1536 unsafe fn new() -> Self {
1537 Self {
1538 handle: rcl_get_zero_initialized_timer(),
1539 _pin: PhantomPinned,
1540 }
1541 }
1542}
1543
1544struct Timer_ {
1545 timer_handle: Pin<Box<RclTimer>>,
1546 _clock: Option<Clock>, sender: mpsc::Sender<Duration>,
1548}
1549
1550impl Timer_ {
1551 fn get_handle(&self) -> *const rcl_timer_t {
1552 &self.timer_handle.handle
1553 }
1554
1555 unsafe fn get_handle_mut(&mut self) -> *mut rcl_timer_t {
1561 self.timer_handle
1562 .as_mut()
1563 .map_unchecked_mut(|s| &mut s.handle)
1564 .get_unchecked_mut()
1565 }
1566
1567 fn handle_incoming(&mut self) -> bool {
1568 let mut is_ready = false;
1569 let ret = unsafe { rcl_timer_is_ready(self.get_handle(), &mut is_ready) };
1570 if ret == RCL_RET_OK as i32 && is_ready {
1571 let mut nanos = 0i64;
1572 let ret = unsafe { rcl_timer_get_time_since_last_call(self.get_handle(), &mut nanos) };
1574 if ret == RCL_RET_OK as i32 {
1575 let ret = unsafe { rcl_timer_call(self.get_handle_mut()) };
1576 if ret == RCL_RET_OK as i32 {
1577 if let Err(e) = self.sender.try_send(Duration::from_nanos(nanos as u64)) {
1578 if e.is_disconnected() {
1579 return true;
1581 }
1582 if e.is_full() {
1583 log::debug!(
1584 "Warning: timer tick not handled in time - no wakeup will occur"
1585 );
1586 }
1587 }
1588 }
1589 }
1590 }
1591 false
1592 }
1593}
1594
1595impl Drop for Timer_ {
1596 fn drop(&mut self) {
1597 let _ret = unsafe { rcl_timer_fini(self.get_handle_mut()) };
1598 }
1599}
1600
1601pub struct Timer {
1603 receiver: mpsc::Receiver<Duration>,
1604}
1605
1606impl Timer {
1607 pub async fn tick(&mut self) -> Result<Duration> {
1611 let next = self.receiver.next().await;
1612 if let Some(elapsed) = next {
1613 Ok(elapsed)
1614 } else {
1615 Err(Error::RCL_RET_TIMER_INVALID)
1616 }
1617 }
1618}
1619
1620fn wait_until_unwrapped<T>(mut a: Arc<T>) -> T {
1626 loop {
1627 match Arc::try_unwrap(a) {
1628 Ok(b) => return b,
1629 Err(t) => a = t,
1630 }
1631 }
1632}
1633
1634impl Drop for Node {
1635 fn drop(&mut self) {
1636 let _ctx_handle = self.context.context_handle.lock().unwrap();
1638
1639 for s in &mut self.subscribers {
1640 s.destroy(&mut self.node_handle);
1641 }
1642 for c in &mut self.clients {
1643 c.lock().unwrap().destroy(&mut self.node_handle);
1644 }
1645 for s in &mut self.services {
1646 s.lock().unwrap().destroy(&mut self.node_handle);
1647 }
1648 for c in &mut self.action_clients {
1649 c.lock().unwrap().destroy(&mut self.node_handle);
1650 }
1651 for s in &mut self.action_servers {
1652 s.lock().unwrap().destroy(&mut self.node_handle);
1653 }
1654 while let Some(p) = self.pubs.pop() {
1655 let p = wait_until_unwrapped(p);
1656
1657 p.destroy(self.node_handle.as_mut());
1658 }
1659 unsafe {
1660 rcl_node_fini(self.node_handle.as_mut());
1661 }
1662 }
1663}
1664
1665pub trait IsAvailablePollable {
1666 fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>;
1667}
1668
1669pub struct TopicEndpointInfo {
1670 pub node_name: String,
1671 pub node_namespace: String,
1672 pub topic_type: String,
1673 pub endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize],
1674 pub qos_profile: QosProfile,
1675}
1676
1677impl From<rmw_topic_endpoint_info_t> for TopicEndpointInfo {
1678 fn from(info: rmw_topic_endpoint_info_t) -> Self {
1679 let node_name = unsafe { CStr::from_ptr(info.node_name) }
1681 .to_string_lossy()
1682 .into_owned();
1683 let node_namespace = unsafe { CStr::from_ptr(info.node_namespace) }
1684 .to_string_lossy()
1685 .into_owned();
1686 let topic_type = unsafe { CStr::from_ptr(info.topic_type) }
1687 .to_string_lossy()
1688 .into_owned();
1689
1690 let endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize] = info.endpoint_gid;
1692
1693 let qos_profile = QosProfile::from(info.qos_profile); TopicEndpointInfo {
1697 node_name,
1698 node_namespace,
1699 topic_type,
1700 endpoint_gid,
1701 qos_profile,
1702 }
1703 }
1704}
1705
1706fn convert_info_array_to_vec(
1707 info_array: &rcl_topic_endpoint_info_array_t,
1708) -> Vec<TopicEndpointInfo> {
1709 let mut topic_info_list = Vec::with_capacity(info_array.size);
1710
1711 if info_array.info_array != std::ptr::null_mut() {
1712 unsafe {
1713 let infos = std::slice::from_raw_parts(info_array.info_array, info_array.size);
1714 for &info in infos {
1715 let endpoint_info = TopicEndpointInfo::from(info);
1716 topic_info_list.push(endpoint_info);
1717 }
1718 }
1719 }
1720
1721 topic_info_list
1722}