1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
use futures::{channel::oneshot, Future, TryFutureExt};
use std::{
    ffi::{c_void, CString},
    fmt::Debug,
    marker::PhantomData,
    sync::{Mutex, Once, Weak},
};

use crate::{error::*, msg_types::*, qos::QosProfile};
use r2r_rcl::*;

// The publish function is thread safe. ROS2 docs state:
// =============
//
// This function is thread safe so long as access to both the
// publisher and the" `ros_message` is synchronized."  That means that
// calling rcl_publish() from multiple threads is allowed, but"
// calling rcl_publish() at the same time as non-thread safe
// publisher" functions is not, e.g. calling rcl_publish() and
// rcl_publisher_fini()" concurrently is not allowed."  Before calling
// rcl_publish() the message can change and after calling"
// rcl_publish() the message can change, but it cannot be changed
// during the" publish call."  The same `ros_message`, however, can be
// passed to multiple calls of" rcl_publish() simultaneously, even if
// the publishers differ."  The `ros_message` is unmodified by
// rcl_publish()."
//
// TODO: I guess there is a potential error source in destructuring
// while calling publish. I don't think its worth to protect with a
// mutex/rwlock for this though...
//
// Methods that mutate need to called from the thread owning the Node.
// I don't think we can count on Node being generally thread-safe.
// So keep pub/sub management and polling contained to one thread
// and send out publishers.

unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}

pub(crate) struct Publisher_ {
    handle: rcl_publisher_t,

    // TODO use a mpsc to avoid the mutex?
    poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>,
}

impl Publisher_ {
    fn get_inter_process_subscription_count(&self) -> Result<usize> {
        // See https://github.com/ros2/rclcpp/issues/623

        let mut inter_process_subscription_count = 0;

        let result = unsafe {
            rcl_publisher_get_subscription_count(
                &self.handle as *const rcl_publisher_t,
                &mut inter_process_subscription_count as *mut usize,
            )
        };

        if result == RCL_RET_OK as i32 {
            Ok(inter_process_subscription_count)
        } else {
            Err(Error::from_rcl_error(result))
        }
    }

    pub(crate) fn poll_has_inter_process_subscribers(&self) {
        let mut poll_inter_process_subscriber_channels =
            self.poll_inter_process_subscriber_channels.lock().unwrap();

        if poll_inter_process_subscriber_channels.is_empty() {
            return;
        }
        let inter_process_subscription_count = self.get_inter_process_subscription_count();
        match inter_process_subscription_count {
            Ok(0) => {
                // not available...
            }
            Ok(_) => {
                // send ok and close channels
                while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
                    let _res = sender.send(()); // we ignore if receiver dropped.
                }
            }
            Err(_) => {
                // error, close all channels
                poll_inter_process_subscriber_channels.clear();
            }
        }
    }

    pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
        let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };

        // TODO: check ret
    }
}

/// A ROS (typed) publisher.
///
/// This contains a `Weak Arc` to a typed publisher. As such it is safe to
/// move between threads.
#[derive(Debug, Clone)]
pub struct Publisher<T>
where
    T: WrappedTypesupport,
{
    pub(crate) handle: Weak<Publisher_>,
    type_: PhantomData<T>,
}

unsafe impl Send for PublisherUntyped {}

/// A ROS (untyped) publisher.
///
/// This contains a `Weak Arc` to an "untyped" publisher. As such it is safe to
/// move between threads.
#[derive(Debug, Clone)]
pub struct PublisherUntyped {
    pub(crate) handle: Weak<Publisher_>,
    type_: String,
}

pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
where
    T: WrappedTypesupport,
{
    Publisher {
        handle,
        type_: PhantomData,
    }
}

pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
    PublisherUntyped { handle, type_ }
}

pub fn create_publisher_helper(
    node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
    qos_profile: QosProfile,
) -> Result<Publisher_> {
    let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
    let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;

    let result = unsafe {
        let mut publisher_options = rcl_publisher_get_default_options();
        publisher_options.qos = qos_profile.into();
        rcl_publisher_init(
            &mut publisher_handle,
            node,
            typesupport,
            topic_c_string.as_ptr(),
            &publisher_options,
        )
    };
    if result == RCL_RET_OK as i32 {
        Ok(Publisher_ {
            handle: publisher_handle,
            poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
        })
    } else {
        Err(Error::from_rcl_error(result))
    }
}

impl PublisherUntyped {
    /// Publish an "untyped" ROS message represented by a `serde_json::Value`.
    ///
    /// It is up to the user to make sure the fields are correct.
    pub fn publish(&self, msg: serde_json::Value) -> Result<()> {
        // upgrade to actual ref. if still alive
        let publisher = self
            .handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

        let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
        native_msg.from_json(msg)?;

        let result = unsafe {
            rcl_publish(
                &publisher.handle as *const rcl_publisher_t,
                native_msg.void_ptr(),
                std::ptr::null_mut(),
            )
        };

        if result == RCL_RET_OK as i32 {
            Ok(())
        } else {
            log::error!("could not publish {}", result);
            Err(Error::from_rcl_error(result))
        }
    }

    /// Publish an pre-serialized ROS message represented by a `&[u8]`.
    ///
    /// It is up to the user to make sure data is a valid ROS serialized message.
    pub fn publish_raw(&self, data: &[u8]) -> Result<()> {
        // TODO should this be an unsafe function? I'm not sure what happens if the data is malformed ..

        // upgrade to actual ref. if still alive
        let publisher = self
            .handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

        // Safety: Not retained beyond this function
        let msg_buf = rcl_serialized_message_t {
            buffer: data.as_ptr() as *mut u8,
            buffer_length: data.len(),
            buffer_capacity: data.len(),

            // Since its read only, this should never be used ..
            allocator: unsafe { rcutils_get_default_allocator() },
        };

        let result = unsafe {
            rcl_publish_serialized_message(
                &publisher.handle,
                &msg_buf as *const rcl_serialized_message_t,
                std::ptr::null_mut(),
            )
        };

        if result == RCL_RET_OK as i32 {
            Ok(())
        } else {
            log::error!("could not publish {}", result);
            Err(Error::from_rcl_error(result))
        }
    }

    /// Gets the number of external subscribers (i.e. it doesn't
    /// count subscribers from the same process).
    pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
        self.handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
            .get_inter_process_subscription_count()
    }

    /// Waits for at least one external subscriber to begin subscribing to the
    /// topic. It doesn't count subscribers from the same process.
    pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
        let (sender, receiver) = oneshot::channel();

        self.handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
            .poll_inter_process_subscriber_channels
            .lock()
            .unwrap()
            .push(sender);

        Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
    }
}

impl<T: 'static> Publisher<T>
where
    T: WrappedTypesupport,
{
    /// Publish a ROS message.
    pub fn publish(&self, msg: &T) -> Result<()>
    where
        T: WrappedTypesupport,
    {
        // upgrade to actual ref. if still alive
        let publisher = self
            .handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
        let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
        let result = unsafe {
            rcl_publish(
                &publisher.handle as *const rcl_publisher_t,
                native_msg.void_ptr(),
                std::ptr::null_mut(),
            )
        };

        if result == RCL_RET_OK as i32 {
            Ok(())
        } else {
            log::error!("could not publish {}", result);
            Err(Error::from_rcl_error(result))
        }
    }

    pub fn borrow_loaned_message(&self) -> Result<WrappedNativeMsg<T>>
    where
        T: WrappedTypesupport,
    {
        // upgrade to actual ref. if still alive
        let publisher = self
            .handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

        if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } {
            let mut loaned_msg: *mut c_void = std::ptr::null_mut();
            let ret = unsafe {
                rcl_borrow_loaned_message(
                    &publisher.handle as *const rcl_publisher_t,
                    T::get_ts(),
                    &mut loaned_msg,
                )
            };
            if ret != RCL_RET_OK as i32 {
                log::error!("Failed getting loaned message");
                return Err(Error::from_rcl_error(ret));
            }

            let handle_box = Box::new(publisher.handle);
            let msg = WrappedNativeMsg::<T>::from_loaned(
                loaned_msg as *mut T::CStruct,
                Box::new(|msg: *mut T::CStruct| {
                    let ret = unsafe {
                        let handle_ptr = Box::into_raw(handle_box);
                        let ret = rcl_return_loaned_message_from_publisher(
                            handle_ptr,
                            msg as *mut c_void,
                        );
                        drop(Box::from_raw(handle_ptr));
                        ret
                    };

                    if ret != RCL_RET_OK as i32 {
                        panic!("rcl_deallocate_loaned_message failed");
                    }
                }),
            );
            Ok(msg)
        } else {
            static LOG_LOANED_ERROR: Once = Once::new();
            LOG_LOANED_ERROR.call_once(|| {
                log::error!(
                    "Currently used middleware can't loan messages. Local allocator will be used."
                );
            });

            Ok(WrappedNativeMsg::<T>::new())
        }
    }

    /// Publish a "native" ROS message.
    ///
    /// This function is useful if you want to bypass the generated
    /// rust types as it lets you work with the raw C struct.
    pub fn publish_native(&self, msg: &mut WrappedNativeMsg<T>) -> Result<()>
    where
        T: WrappedTypesupport,
    {
        // upgrade to actual ref. if still alive
        let publisher = self
            .handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

        let result = if msg.is_loaned {
            unsafe {
                // signal that we are relinquishing responsibility of the memory
                msg.release();

                // publish and return loaned message to middleware
                rcl_publish_loaned_message(
                    &publisher.handle as *const rcl_publisher_t,
                    msg.void_ptr_mut(),
                    std::ptr::null_mut(),
                )
            }
        } else {
            unsafe {
                rcl_publish(
                    &publisher.handle as *const rcl_publisher_t,
                    msg.void_ptr(),
                    std::ptr::null_mut(),
                )
            }
        };

        if result == RCL_RET_OK as i32 {
            Ok(())
        } else {
            log::error!("could not publish native {}", result);
            Err(Error::from_rcl_error(result))
        }
    }

    /// Gets the number of external subscribers (i.e. it doesn't
    /// count subscribers from the same process).
    pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
        self.handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
            .get_inter_process_subscription_count()
    }

    /// Waits for at least one external subscriber to begin subscribing to the
    /// topic. It doesn't count subscribers from the same process.
    pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
        let (sender, receiver) = oneshot::channel();

        self.handle
            .upgrade()
            .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
            .poll_inter_process_subscriber_channels
            .lock()
            .unwrap()
            .push(sender);

        Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
    }
}