r2r/
clients.rs

1use futures::{channel::oneshot, TryFutureExt};
2use std::{
3    ffi::CString,
4    future::Future,
5    mem::MaybeUninit,
6    sync::{Mutex, Weak},
7};
8
9use crate::{error::*, msg_types::*, QosProfile};
10use r2r_rcl::*;
11
12/// ROS service client.
13///
14/// This is a handle to a service client wrapped in a `Mutex` inside a
15/// `Weak` `Arc`. As such you can pass it between threads safely.
16pub struct Client<T>
17where
18    T: WrappedServiceTypeSupport,
19{
20    client: Weak<Mutex<TypedClient<T>>>,
21}
22
23impl<T: 'static> Client<T>
24where
25    T: WrappedServiceTypeSupport,
26{
27    /// Make a service request.
28    ///
29    /// Returns a `Future` of the `Response` type.
30    pub fn request(&self, msg: &T::Request) -> Result<impl Future<Output = Result<T::Response>>>
31    where
32        T: WrappedServiceTypeSupport,
33    {
34        // upgrade to actual ref. if still alive
35        let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
36        let mut client = client.lock().unwrap();
37        client.request(msg)
38    }
39}
40
41/// ROS "untyped" service client.
42///
43/// The untyped client is useful when you don't know the concrete type
44/// at compile time. Messages are represented by `serde_json::Value`.
45///
46/// This is a handle to a service client wrapped in a `Mutex` inside a
47/// `Weak` `Arc`. As such you can pass it between threads safely.
48pub struct ClientUntyped {
49    client: Weak<Mutex<UntypedClient_>>,
50}
51
52impl ClientUntyped {
53    /// Make an "untyped" service request.
54    ///
55    /// The request is a `serde_json::Value`. It is up to the user to
56    /// make sure the fields in the json object are correct.
57    ///
58    /// Returns a `Future` of Result<serde_json::Value>.
59    pub fn request(
60        &self, msg: serde_json::Value,
61    ) -> Result<impl Future<Output = Result<Result<serde_json::Value>>>> {
62        // upgrade to actual ref. if still alive
63        let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
64        let mut client = client.lock().unwrap();
65        client.request(msg)
66    }
67}
68
69pub fn make_client<T>(client: Weak<Mutex<TypedClient<T>>>) -> Client<T>
70where
71    T: WrappedServiceTypeSupport,
72{
73    Client { client }
74}
75
76pub fn make_untyped_client(client: Weak<Mutex<UntypedClient_>>) -> ClientUntyped {
77    ClientUntyped { client }
78}
79
80unsafe impl<T> Send for TypedClient<T> where T: WrappedServiceTypeSupport {}
81
82impl<T: 'static> TypedClient<T>
83where
84    T: WrappedServiceTypeSupport,
85{
86    pub fn request(&mut self, msg: &T::Request) -> Result<impl Future<Output = Result<T::Response>>>
87    where
88        T: WrappedServiceTypeSupport,
89    {
90        let native_msg: WrappedNativeMsg<T::Request> = WrappedNativeMsg::<T::Request>::from(msg);
91        let mut seq_no = 0i64;
92        let result =
93            unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
94
95        let (sender, receiver) = oneshot::channel::<T::Response>();
96
97        if result == RCL_RET_OK as i32 {
98            self.response_channels.push((seq_no, sender));
99            // instead of "canceled" we return invalid client.
100            Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
101        } else {
102            log::error!("could not send request {}", result);
103            Err(Error::from_rcl_error(result))
104        }
105    }
106}
107
108unsafe impl Send for UntypedClient_ {}
109
110impl UntypedClient_ {
111    pub fn request(
112        &mut self, msg: serde_json::Value,
113    ) -> Result<impl Future<Output = Result<Result<serde_json::Value>>>> {
114        let native_msg = (self.service_type.make_request_msg)();
115        native_msg.from_json(msg)?;
116
117        let mut seq_no = 0i64;
118        let result =
119            unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
120
121        let (sender, receiver) = oneshot::channel::<Result<serde_json::Value>>();
122
123        if result == RCL_RET_OK as i32 {
124            self.response_channels.push((seq_no, sender));
125            // instead of "canceled" we return invalid client.
126            Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
127        } else {
128            log::error!("could not send request {}", result);
129            Err(Error::from_rcl_error(result))
130        }
131    }
132}
133
134pub trait Client_ {
135    fn handle(&self) -> &rcl_client_t;
136    fn handle_response(&mut self);
137    fn register_poll_available(&mut self, s: oneshot::Sender<()>);
138    fn poll_available(&mut self, node: &mut rcl_node_t);
139    fn destroy(&mut self, node: &mut rcl_node_t);
140}
141
142pub struct TypedClient<T>
143where
144    T: WrappedServiceTypeSupport,
145{
146    pub rcl_handle: rcl_client_t,
147    pub response_channels: Vec<(i64, oneshot::Sender<T::Response>)>,
148    pub poll_available_channels: Vec<oneshot::Sender<()>>,
149}
150
151impl<T: 'static> Client_ for TypedClient<T>
152where
153    T: WrappedServiceTypeSupport,
154{
155    fn handle(&self) -> &rcl_client_t {
156        &self.rcl_handle
157    }
158
159    fn handle_response(&mut self) {
160        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
161        let mut response_msg = WrappedNativeMsg::<T::Response>::new();
162
163        let ret = unsafe {
164            rcl_take_response(
165                &self.rcl_handle,
166                request_id.as_mut_ptr(),
167                response_msg.void_ptr_mut(),
168            )
169        };
170        if ret == RCL_RET_OK as i32 {
171            let request_id = unsafe { request_id.assume_init() };
172            if let Some(idx) = self
173                .response_channels
174                .iter()
175                .position(|(id, _)| id == &request_id.sequence_number)
176            {
177                let (_, sender) = self.response_channels.swap_remove(idx);
178                let response = T::Response::from_native(&response_msg);
179                match sender.send(response) {
180                    Ok(()) => {}
181                    Err(e) => {
182                        log::debug!("error sending to client: {:?}", e);
183                    }
184                }
185            } else {
186                let we_have: String = self
187                    .response_channels
188                    .iter()
189                    .map(|(id, _)| id.to_string())
190                    .collect::<Vec<_>>()
191                    .join(",");
192                log::error!(
193                    "no such req id: {}, we have [{}], ignoring",
194                    request_id.sequence_number,
195                    we_have
196                );
197            }
198        } // TODO handle failure.
199    }
200
201    fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
202        self.poll_available_channels.push(s);
203    }
204
205    fn poll_available(&mut self, node: &mut rcl_node_t) {
206        if self.poll_available_channels.is_empty() {
207            return;
208        }
209        let available = service_available_helper(node, self.handle());
210        match available {
211            Ok(true) => {
212                // send ok and close channels
213                while let Some(sender) = self.poll_available_channels.pop() {
214                    let _res = sender.send(()); // we ignore if receiver dropped.
215                }
216            }
217            Ok(false) => {
218                // not available...
219            }
220            Err(_) => {
221                // error, close all channels
222                self.poll_available_channels.clear();
223            }
224        }
225    }
226
227    fn destroy(&mut self, node: &mut rcl_node_t) {
228        unsafe {
229            rcl_client_fini(&mut self.rcl_handle, node);
230        }
231    }
232}
233
234pub struct UntypedClient_ {
235    pub service_type: UntypedServiceSupport,
236    pub rcl_handle: rcl_client_t,
237    pub response_channels: Vec<(i64, oneshot::Sender<Result<serde_json::Value>>)>,
238    pub poll_available_channels: Vec<oneshot::Sender<()>>,
239}
240
241impl Client_ for UntypedClient_ {
242    fn handle(&self) -> &rcl_client_t {
243        &self.rcl_handle
244    }
245
246    fn handle_response(&mut self) {
247        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
248        let mut response_msg = (self.service_type.make_response_msg)();
249
250        let ret = unsafe {
251            rcl_take_response(
252                &self.rcl_handle,
253                request_id.as_mut_ptr(),
254                response_msg.void_ptr_mut(),
255            )
256        };
257        if ret == RCL_RET_OK as i32 {
258            let request_id = unsafe { request_id.assume_init() };
259            if let Some(idx) = self
260                .response_channels
261                .iter()
262                .position(|(id, _)| id == &request_id.sequence_number)
263            {
264                let (_, sender) = self.response_channels.swap_remove(idx);
265                let response = response_msg.to_json();
266                match sender.send(response) {
267                    Ok(()) => {}
268                    Err(e) => {
269                        log::debug!("error sending to client: {:?}", e);
270                    }
271                }
272            } else {
273                let we_have: String = self
274                    .response_channels
275                    .iter()
276                    .map(|(id, _)| id.to_string())
277                    .collect::<Vec<_>>()
278                    .join(",");
279                log::error!(
280                    "no such req id: {}, we have [{}], ignoring",
281                    request_id.sequence_number,
282                    we_have
283                );
284            }
285        } // TODO handle failure.
286    }
287
288    fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
289        self.poll_available_channels.push(s);
290    }
291
292    fn poll_available(&mut self, node: &mut rcl_node_t) {
293        if self.poll_available_channels.is_empty() {
294            return;
295        }
296        let available = service_available_helper(node, self.handle());
297        match available {
298            Ok(true) => {
299                // send ok and close channels
300                while let Some(sender) = self.poll_available_channels.pop() {
301                    let _res = sender.send(()); // we ignore if receiver dropped.
302                }
303            }
304            Ok(false) => {
305                // not available...
306            }
307            Err(_) => {
308                // error, close all channels
309                self.poll_available_channels.clear();
310            }
311        }
312    }
313
314    fn destroy(&mut self, node: &mut rcl_node_t) {
315        unsafe {
316            rcl_client_fini(&mut self.rcl_handle, node);
317        }
318    }
319}
320
321pub fn create_client_helper(
322    node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
323    qos_profile: QosProfile,
324) -> Result<rcl_client_t> {
325    let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
326    let service_name_c_string =
327        CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
328
329    let result = unsafe {
330        let mut client_options = rcl_client_get_default_options();
331        client_options.qos = qos_profile.into();
332        rcl_client_init(
333            &mut client_handle,
334            node,
335            service_ts,
336            service_name_c_string.as_ptr(),
337            &client_options,
338        )
339    };
340    if result == RCL_RET_OK as i32 {
341        Ok(client_handle)
342    } else {
343        Err(Error::from_rcl_error(result))
344    }
345}
346
347pub fn service_available_helper(node: &mut rcl_node_t, client: &rcl_client_t) -> Result<bool> {
348    let mut avail = false;
349    let result = unsafe { rcl_service_server_is_available(node, client, &mut avail) };
350
351    if result == RCL_RET_OK as i32 {
352        Ok(avail)
353    } else {
354        Err(Error::from_rcl_error(result))
355    }
356}
357
358use crate::nodes::IsAvailablePollable;
359
360impl<T: 'static> IsAvailablePollable for Client<T>
361where
362    T: WrappedServiceTypeSupport,
363{
364    fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
365        let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
366        let mut client = client.lock().unwrap();
367        client.register_poll_available(sender);
368        Ok(())
369    }
370}
371
372impl IsAvailablePollable for ClientUntyped {
373    fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
374        let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
375        let mut client = client.lock().unwrap();
376        client.register_poll_available(sender);
377        Ok(())
378    }
379}