1use futures::{channel::oneshot, Future, TryFutureExt};
2use std::{
3 ffi::{c_void, CString},
4 fmt::Debug,
5 marker::PhantomData,
6 sync::{Mutex, Once, Weak},
7};
8
9use crate::{error::*, msg_types::*, qos::QosProfile};
10use r2r_rcl::*;
11
12unsafe impl<T> Send for Publisher<T> where T: WrappedTypesupport {}
38
39pub(crate) struct Publisher_ {
40 handle: rcl_publisher_t,
41
42 poll_inter_process_subscriber_channels: Mutex<Vec<oneshot::Sender<()>>>,
44}
45
46impl Publisher_ {
47 fn get_inter_process_subscription_count(&self) -> Result<usize> {
48 let mut inter_process_subscription_count = 0;
51
52 let result = unsafe {
53 rcl_publisher_get_subscription_count(
54 &self.handle as *const rcl_publisher_t,
55 &mut inter_process_subscription_count as *mut usize,
56 )
57 };
58
59 if result == RCL_RET_OK as i32 {
60 Ok(inter_process_subscription_count)
61 } else {
62 Err(Error::from_rcl_error(result))
63 }
64 }
65
66 pub(crate) fn poll_has_inter_process_subscribers(&self) {
67 let mut poll_inter_process_subscriber_channels =
68 self.poll_inter_process_subscriber_channels.lock().unwrap();
69
70 if poll_inter_process_subscriber_channels.is_empty() {
71 return;
72 }
73 let inter_process_subscription_count = self.get_inter_process_subscription_count();
74 match inter_process_subscription_count {
75 Ok(0) => {
76 }
78 Ok(_) => {
79 while let Some(sender) = poll_inter_process_subscriber_channels.pop() {
81 let _res = sender.send(()); }
83 }
84 Err(_) => {
85 poll_inter_process_subscriber_channels.clear();
87 }
88 }
89 }
90
91 pub(crate) fn destroy(mut self, node: &mut rcl_node_t) {
92 let _ret = unsafe { rcl_publisher_fini(&mut self.handle as *mut _, node) };
93
94 }
96}
97
98#[derive(Debug, Clone)]
103pub struct Publisher<T>
104where
105 T: WrappedTypesupport,
106{
107 pub(crate) handle: Weak<Publisher_>,
108 type_: PhantomData<T>,
109}
110
111unsafe impl Send for PublisherUntyped {}
112
113#[derive(Debug, Clone)]
118pub struct PublisherUntyped {
119 pub(crate) handle: Weak<Publisher_>,
120 type_: String,
121}
122
123pub fn make_publisher<T>(handle: Weak<Publisher_>) -> Publisher<T>
124where
125 T: WrappedTypesupport,
126{
127 Publisher {
128 handle,
129 type_: PhantomData,
130 }
131}
132
133pub fn make_publisher_untyped(handle: Weak<Publisher_>, type_: String) -> PublisherUntyped {
134 PublisherUntyped { handle, type_ }
135}
136
137pub fn create_publisher_helper(
138 node: &mut rcl_node_t, topic: &str, typesupport: *const rosidl_message_type_support_t,
139 qos_profile: QosProfile,
140) -> Result<Publisher_> {
141 let mut publisher_handle = unsafe { rcl_get_zero_initialized_publisher() };
142 let topic_c_string = CString::new(topic).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
143
144 let result = unsafe {
145 let mut publisher_options = rcl_publisher_get_default_options();
146 publisher_options.qos = qos_profile.into();
147 rcl_publisher_init(
148 &mut publisher_handle,
149 node,
150 typesupport,
151 topic_c_string.as_ptr(),
152 &publisher_options,
153 )
154 };
155 if result == RCL_RET_OK as i32 {
156 Ok(Publisher_ {
157 handle: publisher_handle,
158 poll_inter_process_subscriber_channels: Mutex::new(Vec::new()),
159 })
160 } else {
161 Err(Error::from_rcl_error(result))
162 }
163}
164
165impl PublisherUntyped {
166 pub fn publish(&self, msg: serde_json::Value) -> Result<()> {
170 let publisher = self
172 .handle
173 .upgrade()
174 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
175
176 let native_msg = WrappedNativeMsgUntyped::new_from(&self.type_)?;
177 native_msg.from_json(msg)?;
178
179 let result = unsafe {
180 rcl_publish(
181 &publisher.handle as *const rcl_publisher_t,
182 native_msg.void_ptr(),
183 std::ptr::null_mut(),
184 )
185 };
186
187 if result == RCL_RET_OK as i32 {
188 Ok(())
189 } else {
190 log::error!("could not publish {}", result);
191 Err(Error::from_rcl_error(result))
192 }
193 }
194
195 pub fn publish_raw(&self, data: &[u8]) -> Result<()> {
199 let publisher = self
203 .handle
204 .upgrade()
205 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
206
207 let msg_buf = rcl_serialized_message_t {
209 buffer: data.as_ptr() as *mut u8,
210 buffer_length: data.len(),
211 buffer_capacity: data.len(),
212
213 allocator: unsafe { rcutils_get_default_allocator() },
215 };
216
217 let result = unsafe {
218 rcl_publish_serialized_message(
219 &publisher.handle,
220 &msg_buf as *const rcl_serialized_message_t,
221 std::ptr::null_mut(),
222 )
223 };
224
225 if result == RCL_RET_OK as i32 {
226 Ok(())
227 } else {
228 log::error!("could not publish {}", result);
229 Err(Error::from_rcl_error(result))
230 }
231 }
232
233 pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
236 self.handle
237 .upgrade()
238 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
239 .get_inter_process_subscription_count()
240 }
241
242 pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
245 let (sender, receiver) = oneshot::channel();
246
247 self.handle
248 .upgrade()
249 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
250 .poll_inter_process_subscriber_channels
251 .lock()
252 .unwrap()
253 .push(sender);
254
255 Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
256 }
257}
258
259impl<T: 'static> Publisher<T>
260where
261 T: WrappedTypesupport,
262{
263 pub fn publish(&self, msg: &T) -> Result<()>
265 where
266 T: WrappedTypesupport,
267 {
268 let publisher = self
270 .handle
271 .upgrade()
272 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
273 let native_msg: WrappedNativeMsg<T> = WrappedNativeMsg::<T>::from(msg);
274 let result = unsafe {
275 rcl_publish(
276 &publisher.handle as *const rcl_publisher_t,
277 native_msg.void_ptr(),
278 std::ptr::null_mut(),
279 )
280 };
281
282 if result == RCL_RET_OK as i32 {
283 Ok(())
284 } else {
285 log::error!("could not publish {}", result);
286 Err(Error::from_rcl_error(result))
287 }
288 }
289
290 pub fn borrow_loaned_message(&self) -> Result<WrappedNativeMsg<T>>
291 where
292 T: WrappedTypesupport,
293 {
294 let publisher = self
296 .handle
297 .upgrade()
298 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
299
300 if unsafe { rcl_publisher_can_loan_messages(&publisher.handle as *const rcl_publisher_t) } {
301 let mut loaned_msg: *mut c_void = std::ptr::null_mut();
302 let ret = unsafe {
303 rcl_borrow_loaned_message(
304 &publisher.handle as *const rcl_publisher_t,
305 T::get_ts(),
306 &mut loaned_msg,
307 )
308 };
309 if ret != RCL_RET_OK as i32 {
310 log::error!("Failed getting loaned message");
311 return Err(Error::from_rcl_error(ret));
312 }
313
314 let handle_box = Box::new(publisher.handle);
315 let msg = WrappedNativeMsg::<T>::from_loaned(
316 loaned_msg as *mut T::CStruct,
317 Box::new(|msg: *mut T::CStruct| {
318 let ret = unsafe {
319 let handle_ptr = Box::into_raw(handle_box);
320 let ret = rcl_return_loaned_message_from_publisher(
321 handle_ptr,
322 msg as *mut c_void,
323 );
324 drop(Box::from_raw(handle_ptr));
325 ret
326 };
327
328 if ret != RCL_RET_OK as i32 {
329 panic!("rcl_deallocate_loaned_message failed");
330 }
331 }),
332 );
333 Ok(msg)
334 } else {
335 static LOG_LOANED_ERROR: Once = Once::new();
336 LOG_LOANED_ERROR.call_once(|| {
337 log::error!(
338 "Currently used middleware can't loan messages. Local allocator will be used."
339 );
340 });
341
342 Ok(WrappedNativeMsg::<T>::new())
343 }
344 }
345
346 pub fn publish_native(&self, msg: &mut WrappedNativeMsg<T>) -> Result<()>
351 where
352 T: WrappedTypesupport,
353 {
354 let publisher = self
356 .handle
357 .upgrade()
358 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;
359
360 let result = if msg.is_loaned {
361 unsafe {
362 msg.release();
364
365 rcl_publish_loaned_message(
367 &publisher.handle as *const rcl_publisher_t,
368 msg.void_ptr_mut(),
369 std::ptr::null_mut(),
370 )
371 }
372 } else {
373 unsafe {
374 rcl_publish(
375 &publisher.handle as *const rcl_publisher_t,
376 msg.void_ptr(),
377 std::ptr::null_mut(),
378 )
379 }
380 };
381
382 if result == RCL_RET_OK as i32 {
383 Ok(())
384 } else {
385 log::error!("could not publish native {}", result);
386 Err(Error::from_rcl_error(result))
387 }
388 }
389
390 pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
393 self.handle
394 .upgrade()
395 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
396 .get_inter_process_subscription_count()
397 }
398
399 pub fn wait_for_inter_process_subscribers(&self) -> Result<impl Future<Output = Result<()>>> {
402 let (sender, receiver) = oneshot::channel();
403
404 self.handle
405 .upgrade()
406 .ok_or(Error::RCL_RET_PUBLISHER_INVALID)?
407 .poll_inter_process_subscriber_channels
408 .lock()
409 .unwrap()
410 .push(sender);
411
412 Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
413 }
414}