r2r/
publishers.rs

1use futures::{channel::oneshot, Future, TryFutureExt};
2use std::{
3    ffi::{c_void, CString},
4    fmt::Debug,
5    marker::PhantomData,
6    sync::{Mutex, Once, Weak},
7};
8
9use crate::{error::*, msg_types::*, qos::QosProfile};
10use r2r_rcl::*;
11
12// The publish function is thread safe. ROS2 docs state:
13// =============
14//
15// This function is thread safe so long as access to both the
16// publisher and the" `ros_message` is synchronized."  That means that
17// calling rcl_publish() from multiple threads is allowed, but"
18// calling rcl_publish() at the same time as non-thread safe
19// publisher" functions is not, e.g. calling rcl_publish() and
20// rcl_publisher_fini()" concurrently is not allowed."  Before calling
21// rcl_publish() the message can change and after calling"
22// rcl_publish() the message can change, but it cannot be changed
23// during the" publish call."  The same `ros_message`, however, can be
24// passed to multiple calls of" rcl_publish() simultaneously, even if
25// the publishers differ."  The `ros_message` is unmodified by
26// rcl_publish()."
27//
28// TODO: I guess there is a potential error source in destructuring
29// while calling publish. I don't think its worth to protect with a
30// mutex/rwlock for this though...
31//
32// Methods that mutate need to called from the thread owning the Node.
33// I don't think we can count on Node being generally thread-safe.
34// So keep pub/sub management and polling contained to one thread
35// and send out publishers.
36
37unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
38
39pub(crate) struct Publisher_ {
40    handle: rcl_publisher_t,
41
42    // TODO use a mpsc to avoid the mutex?
43    poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>,
44}
45
46impl Publisher_ {
47    fn get_inter_process_subscription_count(&self) -> Result<usize> {
48        // See https://github.com/ros2/rclcpp/issues/623
49
50        let mut inter_process_subscription_count = 0;
51
52        let result = unsafe {
53            rcl_publisher_get_subscription_count(
54                &self.handle as *const rcl_publisher_t,
55                &mut inter_process_subscription_count as *mut usize,
56            )
57        };
58
59        if result == RCL_RET_OK as i32 {
60            Ok(inter_process_subscription_count)
61        } else {
62            Err(Error::from_rcl_error(result))
63        }
64    }
65
66    pub(crate) fn poll_has_inter_process_subscribers(&self) {
67        let mut poll_inter_process_subscriber_channels =
68            self.poll_inter_process_subscriber_channels.lock().unwrap();
69
70        if poll_inter_process_subscriber_channels.is_empty() {
71            return;
72        }
73        let inter_process_subscription_count = self.get_inter_process_subscription_count();
74        match inter_process_subscription_count {
75            Ok(0) => {
76                // not available...
77            }
78            Ok(_) => {
79                // send ok and close channels
80                while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
81                    let _res = sender.send(()); // we ignore if receiver dropped.
82                }
83            }
84            Err(_) => {
85                // error, close all channels
86                poll_inter_process_subscriber_channels.clear();
87            }
88        }
89    }
90
91    pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
92        let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };
93
94        // TODO: check ret
95    }
96}
97
98/// A ROS (typed) publisher.
99///
100/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
101/// move between threads.
102#[derive(Debug, Clone)]
103pub struct Publisher<T>
104where
105    T: WrappedTypesupport,
106{
107    pub(crate) handle: Weak<Publisher_>,
108    type_: PhantomData<T>,
109}
110
111unsafe impl Send for PublisherUntyped {}
112
113/// A ROS (untyped) publisher.
114///
115/// This contains a `Weak Arc` to an "untyped" publisher. As such it is safe to
116/// move between threads.
117#[derive(Debug, Clone)]
118pub struct PublisherUntyped {
119    pub(crate) handle: Weak<Publisher_>,
120    type_: String,
121}
122
123pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
124where
125    T: WrappedTypesupport,
126{
127    Publisher {
128        handle,
129        type_: PhantomData,
130    }
131}
132
133pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
134    PublisherUntyped { handle, type_ }
135}
136
137pub fn create_publisher_helper(
138    node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
139    qos_profile: QosProfile,
140) -> Result<Publisher_> {
141    let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
142    let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
143
144    let result = unsafe {
145        let mut publisher_options = rcl_publisher_get_default_options();
146        publisher_options.qos = qos_profile.into();
147        rcl_publisher_init(
148            &mut publisher_handle,
149            node,
150            typesupport,
151            topic_c_string.as_ptr(),
152            &publisher_options,
153        )
154    };
155    if result == RCL_RET_OK as i32 {
156        Ok(Publisher_ {
157            handle: publisher_handle,
158            poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
159        })
160    } else {
161        Err(Error::from_rcl_error(result))
162    }
163}
164
165impl PublisherUntyped {
166    /// Publish an "untyped" ROS message represented by a `serde_json::Value`.
167    ///
168    /// It is up to the user to make sure the fields are correct.
169    pub fn publish(&self, msg: serde_json::Value) -> Result<()> {
170        // upgrade to actual ref. if still alive
171        let publisher = self
172            .handle
173            .upgrade()
174            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
175
176        let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
177        native_msg.from_json(msg)?;
178
179        let result = unsafe {
180            rcl_publish(
181                &publisher.handle as *const rcl_publisher_t,
182                native_msg.void_ptr(),
183                std::ptr::null_mut(),
184            )
185        };
186
187        if result == RCL_RET_OK as i32 {
188            Ok(())
189        } else {
190            log::error!("could not publish {}", result);
191            Err(Error::from_rcl_error(result))
192        }
193    }
194
195    /// Publish an pre-serialized ROS message represented by a `&[u8]`.
196    ///
197    /// It is up to the user to make sure data is a valid ROS serialized message.
198    pub fn publish_raw(&self, data: &[u8]) -> Result<()> {
199        // TODO should this be an unsafe function? I'm not sure what happens if the data is malformed ..
200
201        // upgrade to actual ref. if still alive
202        let publisher = self
203            .handle
204            .upgrade()
205            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
206
207        // Safety: Not retained beyond this function
208        let msg_buf = rcl_serialized_message_t {
209            buffer: data.as_ptr() as *mut u8,
210            buffer_length: data.len(),
211            buffer_capacity: data.len(),
212
213            // Since its read only, this should never be used ..
214            allocator: unsafe { rcutils_get_default_allocator() },
215        };
216
217        let result = unsafe {
218            rcl_publish_serialized_message(
219                &publisher.handle,
220                &msg_buf as *const rcl_serialized_message_t,
221                std::ptr::null_mut(),
222            )
223        };
224
225        if result == RCL_RET_OK as i32 {
226            Ok(())
227        } else {
228            log::error!("could not publish {}", result);
229            Err(Error::from_rcl_error(result))
230        }
231    }
232
233    /// Gets the number of external subscribers (i.e. it doesn't
234    /// count subscribers from the same process).
235    pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
236        self.handle
237            .upgrade()
238            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
239            .get_inter_process_subscription_count()
240    }
241
242    /// Waits for at least one external subscriber to begin subscribing to the
243    /// topic. It doesn't count subscribers from the same process.
244    pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
245        let (sender, receiver) = oneshot::channel();
246
247        self.handle
248            .upgrade()
249            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
250            .poll_inter_process_subscriber_channels
251            .lock()
252            .unwrap()
253            .push(sender);
254
255        Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
256    }
257}
258
259impl<T: 'static> Publisher<T>
260where
261    T: WrappedTypesupport,
262{
263    /// Publish a ROS message.
264    pub fn publish(&self, msg: &T) -> Result<()>
265    where
266        T: WrappedTypesupport,
267    {
268        // upgrade to actual ref. if still alive
269        let publisher = self
270            .handle
271            .upgrade()
272            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
273        let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
274        let result = unsafe {
275            rcl_publish(
276                &publisher.handle as *const rcl_publisher_t,
277                native_msg.void_ptr(),
278                std::ptr::null_mut(),
279            )
280        };
281
282        if result == RCL_RET_OK as i32 {
283            Ok(())
284        } else {
285            log::error!("could not publish {}", result);
286            Err(Error::from_rcl_error(result))
287        }
288    }
289
290    pub fn borrow_loaned_message(&self) -> Result<WrappedNativeMsg<T>>
291    where
292        T: WrappedTypesupport,
293    {
294        // upgrade to actual ref. if still alive
295        let publisher = self
296            .handle
297            .upgrade()
298            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
299
300        if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } {
301            let mut loaned_msg: *mut c_void = std::ptr::null_mut();
302            let ret = unsafe {
303                rcl_borrow_loaned_message(
304                    &publisher.handle as *const rcl_publisher_t,
305                    T::get_ts(),
306                    &mut loaned_msg,
307                )
308            };
309            if ret != RCL_RET_OK as i32 {
310                log::error!("Failed getting loaned message");
311                return Err(Error::from_rcl_error(ret));
312            }
313
314            let handle_box = Box::new(publisher.handle);
315            let msg = WrappedNativeMsg::<T>::from_loaned(
316                loaned_msg as *mut T::CStruct,
317                Box::new(|msg: *mut T::CStruct| {
318                    let ret = unsafe {
319                        let handle_ptr = Box::into_raw(handle_box);
320                        let ret = rcl_return_loaned_message_from_publisher(
321                            handle_ptr,
322                            msg as *mut c_void,
323                        );
324                        drop(Box::from_raw(handle_ptr));
325                        ret
326                    };
327
328                    if ret != RCL_RET_OK as i32 {
329                        panic!("rcl_deallocate_loaned_message failed");
330                    }
331                }),
332            );
333            Ok(msg)
334        } else {
335            static LOG_LOANED_ERROR: Once = Once::new();
336            LOG_LOANED_ERROR.call_once(|| {
337                log::error!(
338                    "Currently used middleware can't loan messages. Local allocator will be used."
339                );
340            });
341
342            Ok(WrappedNativeMsg::<T>::new())
343        }
344    }
345
346    /// Publish a "native" ROS message.
347    ///
348    /// This function is useful if you want to bypass the generated
349    /// rust types as it lets you work with the raw C struct.
350    pub fn publish_native(&self, msg: &mut WrappedNativeMsg<T>) -> Result<()>
351    where
352        T: WrappedTypesupport,
353    {
354        // upgrade to actual ref. if still alive
355        let publisher = self
356            .handle
357            .upgrade()
358            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
359
360        let result = if msg.is_loaned {
361            unsafe {
362                // signal that we are relinquishing responsibility of the memory
363                msg.release();
364
365                // publish and return loaned message to middleware
366                rcl_publish_loaned_message(
367                    &publisher.handle as *const rcl_publisher_t,
368                    msg.void_ptr_mut(),
369                    std::ptr::null_mut(),
370                )
371            }
372        } else {
373            unsafe {
374                rcl_publish(
375                    &publisher.handle as *const rcl_publisher_t,
376                    msg.void_ptr(),
377                    std::ptr::null_mut(),
378                )
379            }
380        };
381
382        if result == RCL_RET_OK as i32 {
383            Ok(())
384        } else {
385            log::error!("could not publish native {}", result);
386            Err(Error::from_rcl_error(result))
387        }
388    }
389
390    /// Gets the number of external subscribers (i.e. it doesn't
391    /// count subscribers from the same process).
392    pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
393        self.handle
394            .upgrade()
395            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
396            .get_inter_process_subscription_count()
397    }
398
399    /// Waits for at least one external subscriber to begin subscribing to the
400    /// topic. It doesn't count subscribers from the same process.
401    pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
402        let (sender, receiver) = oneshot::channel();
403
404        self.handle
405            .upgrade()
406            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
407            .poll_inter_process_subscriber_channels
408            .lock()
409            .unwrap()
410            .push(sender);
411
412        Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
413    }
414}