r2r/
subscribers.rs
1use futures::channel::mpsc;
2use std::ffi::CString;
3
4use crate::{error::*, msg_types::*, qos::QosProfile};
5use r2r_rcl::*;
6use std::ffi::{c_void, CStr};
7
8pub trait Subscriber_ {
9 fn handle(&self) -> &rcl_subscription_t;
10 fn handle_incoming(&mut self) -> bool;
12 fn destroy(&mut self, node: &mut rcl_node_t);
13}
14
15pub struct TypedSubscriber<T>
16where
17 T: WrappedTypesupport,
18{
19 pub rcl_handle: rcl_subscription_t,
20 pub sender: mpsc::Sender<T>,
21}
22
23pub struct NativeSubscriber<T>
24where
25 T: WrappedTypesupport,
26{
27 pub rcl_handle: rcl_subscription_t,
28 pub sender: mpsc::Sender<WrappedNativeMsg<T>>,
29}
30
31pub struct UntypedSubscriber {
32 pub rcl_handle: rcl_subscription_t,
33 pub topic_type: String,
34 pub sender: mpsc::Sender<Result<serde_json::Value>>,
35}
36
37pub struct RawSubscriber {
38 pub rcl_handle: rcl_subscription_t,
39 pub msg_buf: rcl_serialized_message_t,
40 pub sender: mpsc::Sender<Vec<u8>>,
41}
42
43impl<T: 'static> Subscriber_ for TypedSubscriber<T>
44where
45 T: WrappedTypesupport,
46{
47 fn handle(&self) -> &rcl_subscription_t {
48 &self.rcl_handle
49 }
50
51 fn handle_incoming(&mut self) -> bool {
52 let mut msg_info = rmw_message_info_t::default(); let mut msg = WrappedNativeMsg::<T>::new();
54 let ret = unsafe {
55 rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
56 };
57 if ret == RCL_RET_OK as i32 {
58 let msg = T::from_native(&msg);
59 if let Err(e) = self.sender.try_send(msg) {
60 if e.is_disconnected() {
61 return true;
63 }
64 log::debug!("error {:?}", e)
65 }
66 }
67 false
68 }
69
70 fn destroy(&mut self, node: &mut rcl_node_t) {
71 unsafe {
72 rcl_subscription_fini(&mut self.rcl_handle, node);
73 }
74 }
75}
76
77impl<T: 'static> Subscriber_ for NativeSubscriber<T>
78where
79 T: WrappedTypesupport,
80{
81 fn handle(&self) -> &rcl_subscription_t {
82 &self.rcl_handle
83 }
84
85 fn handle_incoming(&mut self) -> bool {
86 let mut msg_info = rmw_message_info_t::default(); let msg = unsafe {
88 if rcl_subscription_can_loan_messages(&self.rcl_handle) {
89 let mut loaned_msg: *mut c_void = std::ptr::null_mut();
90 let ret = rcl_take_loaned_message(
91 &self.rcl_handle,
92 &mut loaned_msg,
93 &mut msg_info,
94 std::ptr::null_mut(),
95 );
96 if ret != RCL_RET_OK as i32 {
97 return false;
98 }
99 let handle_box = Box::new(self.rcl_handle);
100 let deallocator = Box::new(|msg: *mut T::CStruct| {
101 let handle_ptr = Box::into_raw(handle_box);
102 let ret =
103 rcl_return_loaned_message_from_subscription(handle_ptr, msg as *mut c_void);
104 if ret == RCL_RET_OK as i32 {
105 drop(Box::from_raw(handle_ptr));
106 } else {
107 let topic_str = rcl_subscription_get_topic_name(handle_ptr);
108 let topic = CStr::from_ptr(topic_str).to_str().expect("to_str() call failed").to_owned();
109 drop(Box::from_raw(handle_ptr));
110
111 let err_str = rcutils_get_error_string();
112 let err_str_ptr = &(err_str.str_) as *const std::os::raw::c_char;
113 let error_msg = CStr::from_ptr(err_str_ptr);
114
115 panic!(
119 "rcl_return_loaned_message_from_subscription() \
120 failed for subscription on topic {}: {}",
121 topic,
122 error_msg.to_str().expect("to_str() call failed")
123 );
124 }
125 });
126 WrappedNativeMsg::<T>::from_loaned(loaned_msg as *mut T::CStruct, deallocator)
127 } else {
128 let mut new_msg = WrappedNativeMsg::<T>::new();
129 let ret = rcl_take(
130 &self.rcl_handle,
131 new_msg.void_ptr_mut(),
132 &mut msg_info,
133 std::ptr::null_mut(),
134 );
135 if ret != RCL_RET_OK as i32 {
136 return false;
137 }
138 new_msg
139 }
140 };
141 if let Err(e) = self.sender.try_send(msg) {
142 if e.is_disconnected() {
143 return true;
145 }
146 log::error!("error {:?}", e)
147 }
148 false
149 }
150
151 fn destroy(&mut self, node: &mut rcl_node_t) {
152 unsafe {
153 rcl_subscription_fini(&mut self.rcl_handle, node);
154 }
155 }
156}
157
158impl Subscriber_ for UntypedSubscriber {
159 fn handle(&self) -> &rcl_subscription_t {
160 &self.rcl_handle
161 }
162
163 fn handle_incoming(&mut self) -> bool {
164 let mut msg_info = rmw_message_info_t::default(); let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type)
166 .unwrap_or_else(|_| panic!("no typesupport for {}", self.topic_type));
167 let ret = unsafe {
168 rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
169 };
170 if ret == RCL_RET_OK as i32 {
171 let json = msg.to_json();
172 if let Err(e) = self.sender.try_send(json) {
173 if e.is_disconnected() {
174 return true;
176 }
177 log::debug!("error {:?}", e)
178 }
179 }
180 false
181 }
182
183 fn destroy(&mut self, node: &mut rcl_node_t) {
184 unsafe {
185 rcl_subscription_fini(&mut self.rcl_handle, node);
186 }
187 }
188}
189
190impl Subscriber_ for RawSubscriber {
191 fn handle(&self) -> &rcl_subscription_t {
192 &self.rcl_handle
193 }
194
195 fn handle_incoming(&mut self) -> bool {
196 let mut msg_info = rmw_message_info_t::default(); let ret = unsafe {
198 rcl_take_serialized_message(
199 &self.rcl_handle,
200 &mut self.msg_buf as *mut rcl_serialized_message_t,
201 &mut msg_info,
202 std::ptr::null_mut(),
203 )
204 };
205 if ret != RCL_RET_OK as i32 {
206 log::error!("failed to take serialized message");
207 return false;
208 }
209
210 let data_bytes = if self.msg_buf.buffer == std::ptr::null_mut() {
211 Vec::new()
212 } else {
213 unsafe {
214 std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec()
215 }
216 };
217
218 if let Err(e) = self.sender.try_send(data_bytes) {
219 if e.is_disconnected() {
220 return true;
222 }
223 log::debug!("error {:?}", e)
224 }
225
226 false
227 }
228
229 fn destroy(&mut self, node: &mut rcl_node_t) {
230 unsafe {
231 rcl_subscription_fini(&mut self.rcl_handle, node);
232 rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t);
233 }
234 }
235}
236
237pub fn create_subscription_helper(
238 node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
239 qos_profile: QosProfile,
240) -> Result<rcl_subscription_t> {
241 let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
242 let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
243
244 let result = unsafe {
245 let mut subscription_options = rcl_subscription_get_default_options();
246 subscription_options.qos = qos_profile.into();
247 rcl_subscription_init(
248 &mut subscription_handle,
249 node,
250 ts,
251 topic_c_string.as_ptr(),
252 &subscription_options,
253 )
254 };
255 if result == RCL_RET_OK as i32 {
256 Ok(subscription_handle)
257 } else {
258 Err(Error::from_rcl_error(result))
259 }
260}