r2r/
services.rs

1use futures::channel::mpsc;
2use std::{
3    ffi::CString,
4    mem::MaybeUninit,
5    sync::{Arc, Mutex, Weak},
6};
7
8use crate::{error::*, msg_types::*, QosProfile};
9use r2r_rcl::*;
10
11/// Encapsulates a service request.
12///
13/// In contrast to having a callback from Request -> Response
14/// types that is called synchronously, the service request can be
15/// moved around and completed asynchronously.
16///
17/// To complete the request, call the `respond` function.
18#[derive(Clone)]
19pub struct ServiceRequest<T>
20where
21    T: WrappedServiceTypeSupport,
22{
23    pub message: T::Request,
24    request_id: rmw_request_id_t,
25    service: Weak<Mutex<dyn Service_>>,
26}
27
28unsafe impl<T> Send for ServiceRequest<T> where T: WrappedServiceTypeSupport {}
29
30impl<T> ServiceRequest<T>
31where
32    T: 'static + WrappedServiceTypeSupport,
33{
34    /// Complete the service request, consuming the request in the process.
35    pub fn respond(self, msg: T::Response) -> Result<()> {
36        let service = self
37            .service
38            .upgrade()
39            .ok_or(Error::RCL_RET_ACTION_SERVER_INVALID)?;
40        let mut service = service.lock().unwrap();
41        let native_msg = WrappedNativeMsg::<T::Response>::from(&msg);
42        service.send_response(self.request_id, Box::new(native_msg))
43    }
44}
45
46pub trait Service_ {
47    fn handle(&self) -> &rcl_service_t;
48    fn send_response(&mut self, request_id: rmw_request_id_t, msg: Box<dyn VoidPtr>) -> Result<()>;
49    /// Returns true if the service stream has been dropped.
50    fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> bool;
51    fn destroy(&mut self, node: &mut rcl_node_t);
52}
53
54pub struct TypedService<T>
55where
56    T: WrappedServiceTypeSupport,
57{
58    pub rcl_handle: rcl_service_t,
59    pub sender: mpsc::Sender<ServiceRequest<T>>,
60}
61
62impl<T: 'static> Service_ for TypedService<T>
63where
64    T: WrappedServiceTypeSupport,
65{
66    fn handle(&self) -> &rcl_service_t {
67        &self.rcl_handle
68    }
69
70    fn send_response(
71        &mut self, mut request_id: rmw_request_id_t, mut msg: Box<dyn VoidPtr>,
72    ) -> Result<()> {
73        let res =
74            unsafe { rcl_send_response(&self.rcl_handle, &mut request_id, msg.void_ptr_mut()) };
75        if res == RCL_RET_OK as i32 {
76            Ok(())
77        } else {
78            Err(Error::from_rcl_error(res))
79        }
80    }
81
82    fn handle_request(&mut self, service: Arc<Mutex<dyn Service_>>) -> bool {
83        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
84        let mut request_msg = WrappedNativeMsg::<T::Request>::new();
85
86        let ret = unsafe {
87            rcl_take_request(&self.rcl_handle, request_id.as_mut_ptr(), request_msg.void_ptr_mut())
88        };
89        if ret == RCL_RET_OK as i32 {
90            let request_id = unsafe { request_id.assume_init() };
91            let request_msg = T::Request::from_native(&request_msg);
92            let request = ServiceRequest::<T> {
93                message: request_msg,
94                request_id,
95                service: Arc::downgrade(&service),
96            };
97            if let Err(e) = self.sender.try_send(request) {
98                if e.is_disconnected() {
99                    return true;
100                }
101                log::error!("warning: could not send service request ({})", e)
102            }
103        } // TODO handle failure.
104        false
105    }
106
107    fn destroy(&mut self, node: &mut rcl_node_t) {
108        unsafe {
109            rcl_service_fini(&mut self.rcl_handle, node);
110        }
111    }
112}
113
114pub fn create_service_helper(
115    node: &mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
116    qos_profile: QosProfile,
117) -> Result<rcl_service_t> {
118    let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
119    let service_name_c_string =
120        CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
121
122    let result = unsafe {
123        let mut service_options = rcl_service_get_default_options();
124        service_options.qos = qos_profile.into();
125        rcl_service_init(
126            &mut service_handle,
127            node,
128            service_ts,
129            service_name_c_string.as_ptr(),
130            &service_options,
131        )
132    };
133    if result == RCL_RET_OK as i32 {
134        Ok(service_handle)
135    } else {
136        Err(Error::from_rcl_error(result))
137    }
138}