r2r/
subscribers.rs

1use futures::channel::mpsc;
2use std::ffi::CString;
3
4use crate::{error::*, msg_types::*, qos::QosProfile};
5use r2r_rcl::*;
6use std::ffi::{c_void, CStr};
7
8pub trait Subscriber_ {
9    fn handle(&self) -> &rcl_subscription_t;
10    /// Returns true if the subscriber stream has been dropped.
11    fn handle_incoming(&mut self) -> bool;
12    fn destroy(&mut self, node: &mut rcl_node_t);
13}
14
15pub struct TypedSubscriber<T>
16where
17    T: WrappedTypesupport,
18{
19    pub rcl_handle: rcl_subscription_t,
20    pub sender: mpsc::Sender<T>,
21}
22
23pub struct NativeSubscriber<T>
24where
25    T: WrappedTypesupport,
26{
27    pub rcl_handle: rcl_subscription_t,
28    pub sender: mpsc::Sender<WrappedNativeMsg<T>>,
29}
30
31pub struct UntypedSubscriber {
32    pub rcl_handle: rcl_subscription_t,
33    pub topic_type: String,
34    pub sender: mpsc::Sender<Result<serde_json::Value>>,
35}
36
37pub struct RawSubscriber {
38    pub rcl_handle: rcl_subscription_t,
39    pub msg_buf: rcl_serialized_message_t,
40    pub sender: mpsc::Sender<Vec<u8>>,
41}
42
43impl<T: 'static> Subscriber_ for TypedSubscriber<T>
44where
45    T: WrappedTypesupport,
46{
47    fn handle(&self) -> &rcl_subscription_t {
48        &self.rcl_handle
49    }
50
51    fn handle_incoming(&mut self) -> bool {
52        let mut msg_info = rmw_message_info_t::default(); // we dont care for now
53        let mut msg = WrappedNativeMsg::<T>::new();
54        let ret = unsafe {
55            rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
56        };
57        if ret == RCL_RET_OK as i32 {
58            let msg = T::from_native(&msg);
59            if let Err(e) = self.sender.try_send(msg) {
60                if e.is_disconnected() {
61                    // user dropped the handle to the stream, signal removal.
62                    return true;
63                }
64                log::debug!("error {:?}", e)
65            }
66        }
67        false
68    }
69
70    fn destroy(&mut self, node: &mut rcl_node_t) {
71        unsafe {
72            rcl_subscription_fini(&mut self.rcl_handle, node);
73        }
74    }
75}
76
77impl<T: 'static> Subscriber_ for NativeSubscriber<T>
78where
79    T: WrappedTypesupport,
80{
81    fn handle(&self) -> &rcl_subscription_t {
82        &self.rcl_handle
83    }
84
85    fn handle_incoming(&mut self) -> bool {
86        let mut msg_info = rmw_message_info_t::default(); // we dont care for now
87        let msg = unsafe {
88            if rcl_subscription_can_loan_messages(&self.rcl_handle) {
89                let mut loaned_msg: *mut c_void = std::ptr::null_mut();
90                let ret = rcl_take_loaned_message(
91                    &self.rcl_handle,
92                    &mut loaned_msg,
93                    &mut msg_info,
94                    std::ptr::null_mut(),
95                );
96                if ret != RCL_RET_OK as i32 {
97                    return false;
98                }
99                let handle_box = Box::new(self.rcl_handle);
100                let deallocator = Box::new(|msg: *mut T::CStruct| {
101                    let handle_ptr = Box::into_raw(handle_box);
102                    let ret =
103                        rcl_return_loaned_message_from_subscription(handle_ptr, msg as *mut c_void);
104                    if ret == RCL_RET_OK as i32 {
105                        drop(Box::from_raw(handle_ptr));
106                    } else {
107                        let topic_str = rcl_subscription_get_topic_name(handle_ptr);
108                        let topic = CStr::from_ptr(topic_str).to_str().expect("to_str() call failed").to_owned();
109                        drop(Box::from_raw(handle_ptr));
110
111                        let err_str = rcutils_get_error_string();
112                        let err_str_ptr = &(err_str.str_) as *const std::os::raw::c_char;
113                        let error_msg = CStr::from_ptr(err_str_ptr);
114
115                        // Returning a loan shouldn't fail unless one of the handles or pointers
116                        // is invalid, both of which indicate a severe bug. Panicking is therefore
117                        // more appropriate than leaking the loaned message.
118                        panic!(
119                            "rcl_return_loaned_message_from_subscription() \
120                            failed for subscription on topic {}: {}",
121                            topic,
122                            error_msg.to_str().expect("to_str() call failed")
123                        );
124                    }
125                });
126                WrappedNativeMsg::<T>::from_loaned(loaned_msg as *mut T::CStruct, deallocator)
127            } else {
128                let mut new_msg = WrappedNativeMsg::<T>::new();
129                let ret = rcl_take(
130                    &self.rcl_handle,
131                    new_msg.void_ptr_mut(),
132                    &mut msg_info,
133                    std::ptr::null_mut(),
134                );
135                if ret != RCL_RET_OK as i32 {
136                    return false;
137                }
138                new_msg
139            }
140        };
141        if let Err(e) = self.sender.try_send(msg) {
142            if e.is_disconnected() {
143                // user dropped the handle to the stream, signal removal.
144                return true;
145            }
146            log::error!("error {:?}", e)
147        }
148        false
149    }
150
151    fn destroy(&mut self, node: &mut rcl_node_t) {
152        unsafe {
153            rcl_subscription_fini(&mut self.rcl_handle, node);
154        }
155    }
156}
157
158impl Subscriber_ for UntypedSubscriber {
159    fn handle(&self) -> &rcl_subscription_t {
160        &self.rcl_handle
161    }
162
163    fn handle_incoming(&mut self) -> bool {
164        let mut msg_info = rmw_message_info_t::default(); // we dont care for now
165        let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type)
166            .unwrap_or_else(|_| panic!("no typesupport for {}", self.topic_type));
167        let ret = unsafe {
168            rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
169        };
170        if ret == RCL_RET_OK as i32 {
171            let json = msg.to_json();
172            if let Err(e) = self.sender.try_send(json) {
173                if e.is_disconnected() {
174                    // user dropped the handle to the stream, signal removal.
175                    return true;
176                }
177                log::debug!("error {:?}", e)
178            }
179        }
180        false
181    }
182
183    fn destroy(&mut self, node: &mut rcl_node_t) {
184        unsafe {
185            rcl_subscription_fini(&mut self.rcl_handle, node);
186        }
187    }
188}
189
190impl Subscriber_ for RawSubscriber {
191    fn handle(&self) -> &rcl_subscription_t {
192        &self.rcl_handle
193    }
194
195    fn handle_incoming(&mut self) -> bool {
196        let mut msg_info = rmw_message_info_t::default(); // we dont care for now
197        let ret = unsafe {
198            rcl_take_serialized_message(
199                &self.rcl_handle,
200                &mut self.msg_buf as *mut rcl_serialized_message_t,
201                &mut msg_info,
202                std::ptr::null_mut(),
203            )
204        };
205        if ret != RCL_RET_OK as i32 {
206            log::error!("failed to take serialized message");
207            return false;
208        }
209
210        let data_bytes = if self.msg_buf.buffer == std::ptr::null_mut() {
211            Vec::new()
212        } else {
213            unsafe {
214                std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec()
215            }
216        };
217
218        if let Err(e) = self.sender.try_send(data_bytes) {
219            if e.is_disconnected() {
220                // user dropped the handle to the stream, signal removal.
221                return true;
222            }
223            log::debug!("error {:?}", e)
224        }
225
226        false
227    }
228
229    fn destroy(&mut self, node: &mut rcl_node_t) {
230        unsafe {
231            rcl_subscription_fini(&mut self.rcl_handle, node);
232            rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t);
233        }
234    }
235}
236
237pub fn create_subscription_helper(
238    node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
239    qos_profile: QosProfile,
240) -> Result<rcl_subscription_t> {
241    let mut subscription_handle = unsafe { rcl_get_zero_initialized_subscription() };
242    let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
243
244    let result = unsafe {
245        let mut subscription_options = rcl_subscription_get_default_options();
246        subscription_options.qos = qos_profile.into();
247        rcl_subscription_init(
248            &mut subscription_handle,
249            node,
250            ts,
251            topic_c_string.as_ptr(),
252            &subscription_options,
253        )
254    };
255    if result == RCL_RET_OK as i32 {
256        Ok(subscription_handle)
257    } else {
258        Err(Error::from_rcl_error(result))
259    }
260}