r2r/
action_clients.rs

1use futures::{
2    channel::{mpsc, oneshot},
3    future::{FutureExt, TryFutureExt},
4    stream::Stream,
5};
6use std::{
7    collections::HashMap,
8    ffi::CString,
9    future::Future,
10    mem::MaybeUninit,
11    sync::{Mutex, Weak},
12};
13
14use crate::{
15    action_common::*,
16    error::*,
17    msg_types::{
18        generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs},
19        *,
20    },
21};
22use r2r_actions::*;
23use r2r_rcl::*;
24
25unsafe impl<T> Send for ActionClient<T> where T: WrappedActionTypeSupport {}
26
27/// Action client
28///
29/// Use this to make goal requests to an action server.
30#[derive(Clone)]
31pub struct ActionClient<T>
32where
33    T: WrappedActionTypeSupport,
34{
35    client: Weak<Mutex<WrappedActionClient<T>>>,
36}
37
38unsafe impl<T> Send for ActionClientGoal<T> where T: WrappedActionTypeSupport {}
39
40/// Action client goal handle
41///
42/// This can be used to cancel goals and query the status of goals.
43#[derive(Clone)]
44pub struct ActionClientGoal<T>
45where
46    T: WrappedActionTypeSupport,
47{
48    client: Weak<Mutex<WrappedActionClient<T>>>,
49    pub uuid: uuid::Uuid,
50}
51
52impl<T: 'static> ActionClientGoal<T>
53where
54    T: WrappedActionTypeSupport,
55{
56    /// Get the current status of this goal.
57    pub fn get_status(&self) -> Result<GoalStatus> {
58        let client = self
59            .client
60            .upgrade()
61            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
62        let client = client.lock().unwrap();
63
64        Ok(client.get_goal_status(&self.uuid))
65    }
66
67    /// Send a cancel request for this goal to the server.
68    ///
69    /// If the server accepts and completes the request, the future completes without error.
70    /// Otherwise, one of these errors can be returned:
71    /// - `GoalCancelRejected`
72    /// - `GoalCancelUnknownGoalID`
73    /// - `GoalCancelAlreadyTerminated`
74    pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
75        // upgrade to actual ref. if still alive
76        let client = self
77            .client
78            .upgrade()
79            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
80        let mut client = client.lock().unwrap();
81
82        client.send_cancel_request(&self.uuid)
83    }
84}
85
86impl<T: 'static> ActionClient<T>
87where
88    T: WrappedActionTypeSupport,
89{
90    /// Make a new goal request.
91    ///
92    /// If the server accepts the new goal, the future resolves to a triple of:
93    /// - A goal handle.
94    /// - A new future for the eventual result.
95    /// - A stream of feedback messages.
96    pub fn send_goal_request(
97        &self, goal: T::Goal,
98    ) -> Result<
99        impl Future<
100            Output = Result<(
101                ActionClientGoal<T>,
102                impl Future<Output = Result<(GoalStatus, T::Result)>>,
103                impl Stream<Item = T::Feedback> + Unpin,
104            )>,
105        >,
106    >
107    where
108        T: WrappedActionTypeSupport,
109    {
110        // upgrade to actual ref. if still alive
111        let client = self
112            .client
113            .upgrade()
114            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
115        let mut client = client.lock().unwrap();
116
117        let uuid = uuid::Uuid::new_v4();
118        let uuid_msg = unique_identifier_msgs::msg::UUID {
119            uuid: uuid.as_bytes().to_vec(),
120        };
121        let request_msg = T::make_goal_request_msg(uuid_msg, goal);
122        let native_msg = WrappedNativeMsg::<
123            <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
124        >::from(&request_msg);
125        let mut seq_no = 0i64;
126        let result = unsafe {
127            rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
128        };
129
130        // set up channels
131        let (goal_req_sender, goal_req_receiver) =
132            oneshot::channel::<(bool, builtin_interfaces::msg::Time)>();
133        let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(10);
134        client.feedback_senders.push((uuid, feedback_sender));
135        let (result_sender, result_receiver) = oneshot::channel::<(GoalStatus, T::Result)>();
136        client.result_senders.push((uuid, result_sender));
137
138        if result == RCL_RET_OK as i32 {
139            client
140                .goal_response_channels
141                .push((seq_no, goal_req_sender));
142            // instead of "canceled" we return invalid client.
143            let fut_client = Weak::clone(&self.client);
144            let future = goal_req_receiver
145                .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
146                .map(move |r| match r {
147                    Ok((accepted, _stamp)) => {
148                        if accepted {
149                            // on goal accept we immediately send the result request
150                            {
151                                let c = fut_client
152                                    .upgrade()
153                                    .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
154                                let mut c = c.lock().unwrap();
155                                c.send_result_request(uuid);
156                            }
157
158                            Ok((
159                                ActionClientGoal {
160                                    client: fut_client,
161                                    uuid,
162                                },
163                                result_receiver.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID),
164                                feedback_receiver,
165                            ))
166                        } else {
167                            Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
168                        }
169                    }
170                    Err(e) => Err(e),
171                });
172            Ok(future)
173        } else {
174            log::error!("could not send goal request {}", result);
175            Err(Error::from_rcl_error(result))
176        }
177    }
178}
179
180pub fn make_action_client<T>(client: Weak<Mutex<WrappedActionClient<T>>>) -> ActionClient<T>
181where
182    T: WrappedActionTypeSupport,
183{
184    ActionClient { client }
185}
186
187pub type ResultSender<R> = (uuid::Uuid, oneshot::Sender<(GoalStatus, R)>);
188pub struct WrappedActionClient<T>
189where
190    T: WrappedActionTypeSupport,
191{
192    pub rcl_handle: rcl_action_client_t,
193    pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>,
194    pub cancel_response_channels:
195        Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
196    pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
197    pub result_requests: Vec<(i64, uuid::Uuid)>,
198    pub result_senders: Vec<ResultSender<T::Result>>,
199    pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
200
201    pub poll_available_channels: Vec<oneshot::Sender<()>>,
202}
203
204pub trait ActionClient_ {
205    fn handle(&self) -> &rcl_action_client_t;
206    fn destroy(&mut self, node: &mut rcl_node_t);
207
208    fn handle_goal_response(&mut self);
209    fn handle_cancel_response(&mut self);
210    fn handle_feedback_msg(&mut self);
211    fn handle_status_msg(&mut self);
212    fn handle_result_response(&mut self);
213
214    fn send_result_request(&mut self, uuid: uuid::Uuid);
215
216    fn register_poll_available(&mut self, s: oneshot::Sender<()>);
217    fn poll_available(&mut self, node: &mut rcl_node_t);
218}
219
220impl<T> WrappedActionClient<T>
221where
222    T: WrappedActionTypeSupport,
223{
224    pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus {
225        *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown)
226    }
227
228    pub fn send_cancel_request(
229        &mut self, goal: &uuid::Uuid,
230    ) -> Result<impl Future<Output = Result<()>>>
231    where
232        T: WrappedActionTypeSupport,
233    {
234        let msg = action_msgs::srv::CancelGoal::Request {
235            goal_info: action_msgs::msg::GoalInfo {
236                goal_id: unique_identifier_msgs::msg::UUID {
237                    uuid: goal.as_bytes().to_vec(),
238                },
239                ..action_msgs::msg::GoalInfo::default()
240            },
241        };
242        let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
243        let mut seq_no = 0i64;
244        let result = unsafe {
245            rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
246        };
247
248        if result == RCL_RET_OK as i32 {
249            let (cancel_req_sender, cancel_req_receiver) =
250                oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
251
252            self.cancel_response_channels
253                .push((seq_no, cancel_req_sender));
254            // instead of "canceled" we return invalid client.
255            let future = cancel_req_receiver
256                .map_err(|_| Error::RCL_RET_CLIENT_INVALID)
257                .map(|r| match r {
258                    Ok(r) => match r.return_code {
259                        e if e == action_msgs::srv::CancelGoal::Response::ERROR_NONE as i8 => {
260                            Ok(())
261                        }
262                        e if e == action_msgs::srv::CancelGoal::Response::ERROR_REJECTED as i8 => {
263                            Err(Error::GoalCancelRejected)
264                        }
265                        e if e
266                            == action_msgs::srv::CancelGoal::Response::ERROR_UNKNOWN_GOAL_ID
267                                as i8 =>
268                        {
269                            Err(Error::GoalCancelUnknownGoalID)
270                        }
271                        e if e
272                            == action_msgs::srv::CancelGoal::Response::ERROR_GOAL_TERMINATED
273                                as i8 =>
274                        {
275                            Err(Error::GoalCancelAlreadyTerminated)
276                        }
277                        x => panic!("unknown error code return from action server: {}", x),
278                    },
279                    Err(e) => Err(e),
280                });
281            Ok(future)
282        } else {
283            log::error!("could not send goal request {}", result);
284            Err(Error::from_rcl_error(result))
285        }
286    }
287}
288
289impl<T: 'static> ActionClient_ for WrappedActionClient<T>
290where
291    T: WrappedActionTypeSupport,
292{
293    fn handle(&self) -> &rcl_action_client_t {
294        &self.rcl_handle
295    }
296
297    fn handle_goal_response(&mut self) {
298        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
299        let mut response_msg = WrappedNativeMsg::<
300            <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
301        >::new();
302
303        let ret = unsafe {
304            rcl_action_take_goal_response(
305                &self.rcl_handle,
306                request_id.as_mut_ptr(),
307                response_msg.void_ptr_mut(),
308            )
309        };
310        if ret == RCL_RET_OK as i32 {
311            let request_id = unsafe { request_id.assume_init() };
312            if let Some(idx) = self
313                .goal_response_channels
314                .iter()
315                .position(|(id, _)| id == &request_id.sequence_number)
316            {
317                let (_, sender) = self.goal_response_channels.swap_remove(idx);
318                let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
319                let (accept, stamp) = T::destructure_goal_response_msg(response);
320                match sender.send((accept, stamp)) {
321                    Ok(()) => {}
322                    Err(e) => {
323                        log::debug!("error sending to action client: {:?}", e);
324                    }
325                }
326            } else {
327                let we_have: String = self
328                    .goal_response_channels
329                    .iter()
330                    .map(|(id, _)| id.to_string())
331                    .collect::<Vec<_>>()
332                    .join(",");
333                log::error!(
334                    "no such req id: {}, we have [{}], ignoring",
335                    request_id.sequence_number,
336                    we_have
337                );
338            }
339        }
340    }
341
342    fn handle_cancel_response(&mut self) {
343        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
344        let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
345
346        let ret = unsafe {
347            rcl_action_take_cancel_response(
348                &self.rcl_handle,
349                request_id.as_mut_ptr(),
350                response_msg.void_ptr_mut(),
351            )
352        };
353        if ret == RCL_RET_OK as i32 {
354            let request_id = unsafe { request_id.assume_init() };
355            if let Some(idx) = self
356                .cancel_response_channels
357                .iter()
358                .position(|(id, _)| id == &request_id.sequence_number)
359            {
360                let (_, sender) = self.cancel_response_channels.swap_remove(idx);
361                let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
362                if let Err(e) = sender.send(response) {
363                    log::error!("warning: could not send cancel response msg ({:?})", e)
364                }
365            } else {
366                let we_have: String = self
367                    .goal_response_channels
368                    .iter()
369                    .map(|(id, _)| id.to_string())
370                    .collect::<Vec<_>>()
371                    .join(",");
372                log::error!(
373                    "no such req id: {}, we have [{}], ignoring",
374                    request_id.sequence_number,
375                    we_have
376                );
377            }
378        }
379    }
380
381    fn handle_feedback_msg(&mut self) {
382        let mut feedback_msg = WrappedNativeMsg::<T::FeedbackMessage>::new();
383        let ret =
384            unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) };
385        if ret == RCL_RET_OK as i32 {
386            let msg = T::FeedbackMessage::from_native(&feedback_msg);
387            let (uuid, feedback) = T::destructure_feedback_msg(msg);
388            let msg_uuid = uuid_msg_to_uuid(&uuid);
389            if let Some((_, sender)) = self
390                .feedback_senders
391                .iter_mut()
392                .find(|(uuid, _)| uuid == &msg_uuid)
393            {
394                if let Err(e) = sender.try_send(feedback) {
395                    log::error!("warning: could not send feedback msg ({})", e)
396                }
397            }
398        }
399    }
400
401    fn handle_status_msg(&mut self) {
402        let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
403        let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) };
404        if ret == RCL_RET_OK as i32 {
405            let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
406            for a in &arr.status_list {
407                let uuid = uuid_msg_to_uuid(&a.goal_info.goal_id);
408                if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) {
409                    continue;
410                }
411                let status = GoalStatus::from_rcl(a.status);
412                *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status;
413            }
414        }
415    }
416
417    fn handle_result_response(&mut self) {
418        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
419        let mut response_msg = WrappedNativeMsg::<
420            <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
421        >::new();
422
423        let ret = unsafe {
424            rcl_action_take_result_response(
425                &self.rcl_handle,
426                request_id.as_mut_ptr(),
427                response_msg.void_ptr_mut(),
428            )
429        };
430
431        if ret == RCL_RET_OK as i32 {
432            let request_id = unsafe { request_id.assume_init() };
433            if let Some(idx) = self
434                .result_requests
435                .iter()
436                .position(|(id, _)| id == &request_id.sequence_number)
437            {
438                let (_, uuid) = self.result_requests.swap_remove(idx);
439                if let Some(idx) = self
440                    .result_senders
441                    .iter()
442                    .position(|(suuid, _)| suuid == &uuid)
443                {
444                    let (_, sender) = self.result_senders.swap_remove(idx);
445                    let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
446                    let (status, result) = T::destructure_result_response_msg(response);
447                    let status = GoalStatus::from_rcl(status);
448                    match sender.send((status, result)) {
449                        Ok(()) => {}
450                        Err(e) => {
451                            log::debug!("error sending result to action client: {:?}", e);
452                        }
453                    }
454                }
455            } else {
456                let we_have: String = self
457                    .result_requests
458                    .iter()
459                    .map(|(id, _)| id.to_string())
460                    .collect::<Vec<_>>()
461                    .join(",");
462                log::error!(
463                    "no such req id: {}, we have [{}], ignoring",
464                    request_id.sequence_number,
465                    we_have
466                );
467            }
468        }
469    }
470
471    fn send_result_request(&mut self, uuid: uuid::Uuid) {
472        let uuid_msg = unique_identifier_msgs::msg::UUID {
473            uuid: uuid.as_bytes().to_vec(),
474        };
475        let request_msg = T::make_result_request_msg(uuid_msg);
476        let native_msg = WrappedNativeMsg::<
477            <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request,
478        >::from(&request_msg);
479        let mut seq_no = 0i64;
480        let result = unsafe {
481            rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
482        };
483
484        if result == RCL_RET_OK as i32 {
485            self.result_requests.push((seq_no, uuid));
486        } else {
487            log::error!("could not send request {}", result);
488        }
489    }
490
491    fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
492        self.poll_available_channels.push(s);
493    }
494
495    fn poll_available(&mut self, node: &mut rcl_node_t) {
496        if self.poll_available_channels.is_empty() {
497            return;
498        }
499        let available = action_server_available_helper(node, self.handle());
500        match available {
501            Ok(true) => {
502                // send ok and close channels
503                while let Some(sender) = self.poll_available_channels.pop() {
504                    let _res = sender.send(()); // we ignore if receiver dropped.
505                }
506            }
507            Ok(false) => {
508                // not available...
509            }
510            Err(_) => {
511                // error, close all channels
512                self.poll_available_channels.clear();
513            }
514        }
515    }
516
517    fn destroy(&mut self, node: &mut rcl_node_t) {
518        unsafe {
519            rcl_action_client_fini(&mut self.rcl_handle, node);
520        }
521    }
522}
523
524pub fn create_action_client_helper(
525    node: &mut rcl_node_t, action_name: &str, action_ts: *const rosidl_action_type_support_t,
526) -> Result<rcl_action_client_t> {
527    let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() };
528    let action_name_c_string =
529        CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
530
531    let result = unsafe {
532        let client_options = rcl_action_client_get_default_options();
533        rcl_action_client_init(
534            &mut client_handle,
535            node,
536            action_ts,
537            action_name_c_string.as_ptr(),
538            &client_options,
539        )
540    };
541    if result == RCL_RET_OK as i32 {
542        Ok(client_handle)
543    } else {
544        Err(Error::from_rcl_error(result))
545    }
546}
547
548pub fn action_client_get_num_waits(
549    rcl_handle: &rcl_action_client_t, num_subs: &mut usize, num_gc: &mut usize,
550    num_timers: &mut usize, num_clients: &mut usize, num_services: &mut usize,
551) -> Result<()> {
552    unsafe {
553        let result = rcl_action_client_wait_set_get_num_entities(
554            rcl_handle,
555            num_subs,
556            num_gc,
557            num_timers,
558            num_clients,
559            num_services,
560        );
561        if result == RCL_RET_OK as i32 {
562            Ok(())
563        } else {
564            Err(Error::from_rcl_error(result))
565        }
566    }
567}
568
569use crate::nodes::IsAvailablePollable;
570
571impl<T: 'static> IsAvailablePollable for ActionClient<T>
572where
573    T: WrappedActionTypeSupport,
574{
575    fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
576        let client = self
577            .client
578            .upgrade()
579            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
580        let mut client = client.lock().unwrap();
581        client.register_poll_available(sender);
582        Ok(())
583    }
584}
585
586pub fn action_server_available_helper(
587    node: &rcl_node_t, client: &rcl_action_client_t,
588) -> Result<bool> {
589    let mut avail = false;
590    let result = unsafe { rcl_action_server_is_available(node, client, &mut avail) };
591
592    if result == RCL_RET_OK as i32 {
593        Ok(avail)
594    } else {
595        log::error!("could not check if action server is available {}", result);
596        Err(Error::from_rcl_error(result))
597    }
598}