r2r/
action_clients_untyped.rs

1use futures::{
2    channel::{mpsc, oneshot},
3    future::{FutureExt, TryFutureExt},
4    stream::Stream,
5};
6use std::{
7    collections::HashMap,
8    future::Future,
9    mem::MaybeUninit,
10    sync::{Mutex, Weak},
11};
12
13use crate::{
14    action_clients::*,
15    action_common::*,
16    error::*,
17    msg_types::{
18        generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs},
19        *,
20    },
21};
22use r2r_actions::*;
23use r2r_rcl::*;
//
// TODO: factor out the code that is shared between the typed action client and this
// untyped one.
//
27
28unsafe impl Send for ActionClientUntyped {}
29
30/// Action client (untyped)
31///
32/// Use this to make goal requests to an action server, without having
33/// the concrete types at compile-time.
34#[derive(Clone)]
35pub struct ActionClientUntyped {
36    client: Weak<Mutex<WrappedActionClientUntyped>>,
37}
38
39unsafe impl Send for ActionClientGoalUntyped {}
40
41/// Action client goal handle (untyped)
42///
43/// This can be used to cancel goals and query the status of goals.
44#[derive(Clone)]
45pub struct ActionClientGoalUntyped {
46    client: Weak<Mutex<WrappedActionClientUntyped>>,
47    pub uuid: uuid::Uuid,
48}
49
50impl ActionClientGoalUntyped {
51    /// Get the current status of this goal.
52    pub fn get_status(&self) -> Result<GoalStatus> {
53        let client = self
54            .client
55            .upgrade()
56            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
57        let client = client.lock().unwrap();
58
59        Ok(client.get_goal_status(&self.uuid))
60    }
61
62    /// Send a cancel request for this goal to the server.
63    ///
64    /// If the server accepts and completes the request, the future completes without error.
65    /// Otherwise, one of these errors can be returned:
66    /// - `GoalCancelRejected`
67    /// - `GoalCancelUnknownGoalID`
68    /// - `GoalCancelAlreadyTerminated`
69    pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
70        // upgrade to actual ref. if still alive
71        let client = self
72            .client
73            .upgrade()
74            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
75        let mut client = client.lock().unwrap();
76
77        client.send_cancel_request(&self.uuid)
78    }
79}
80
81impl ActionClientUntyped {
82    /// Make a new goal request.
83    ///
84    /// If the server accepts the new goal, the future resolves to a triple of:
85    /// - A goal handle.
86    /// - A new future for the eventual result. (as `serde_json::Value`)
87    /// - A stream of feedback messages. (as `serde_json::Value`)
88    pub fn send_goal_request(
89        &self,
90        goal: serde_json::Value, // T::Goal
91    ) -> Result<
92        impl Future<
93            Output = Result<(
94                ActionClientGoalUntyped,
95                impl Future<Output = Result<(GoalStatus, Result<serde_json::Value>)>>, // T::Result
96                impl Stream<Item = Result<serde_json::Value>> + Unpin, // T::Feedback
97            )>,
98        >,
99    > {
100        // upgrade to actual ref. if still alive
101        let client = self
102            .client
103            .upgrade()
104            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
105        let mut client = client.lock().unwrap();
106
107        let uuid = uuid::Uuid::new_v4();
108        let uuid_msg = unique_identifier_msgs::msg::UUID {
109            uuid: uuid.as_bytes().to_vec(),
110        };
111
112        let native_msg = (client.action_type_support.make_goal_request_msg)(uuid_msg, goal);
113
114        let mut seq_no = 0i64;
115        let result = unsafe {
116            rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
117        };
118
119        // set up channels
120        let (goal_req_sender, goal_req_receiver) =
121            oneshot::channel::<(bool, builtin_interfaces::msg::Time)>();
122        let (feedback_sender, feedback_receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
123        client.feedback_senders.push((uuid, feedback_sender));
124        let (result_sender, result_receiver) =
125            oneshot::channel::<(GoalStatus, Result<serde_json::Value>)>();
126        client.result_senders.push((uuid, result_sender));
127
128        if result == RCL_RET_OK as i32 {
129            client
130                .goal_response_channels
131                .push((seq_no, goal_req_sender));
132            // instead of "canceled" we return invalid client.
133            let fut_client = Weak::clone(&self.client);
134            let future = goal_req_receiver
135                .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
136                .map(move |r| match r {
137                    Ok((accepted, _stamp)) => {
138                        if accepted {
139                            // on goal accept we immediately send the result request
140                            {
141                                let c = fut_client
142                                    .upgrade()
143                                    .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
144                                let mut c = c.lock().unwrap();
145                                c.send_result_request(uuid);
146                            }
147
148                            Ok((
149                                ActionClientGoalUntyped {
150                                    client: fut_client,
151                                    uuid,
152                                },
153                                result_receiver.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID),
154                                feedback_receiver,
155                            ))
156                        } else {
157                            Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
158                        }
159                    }
160                    Err(e) => Err(e),
161                });
162            Ok(future)
163        } else {
164            log::error!("could not send goal request {}", result);
165            Err(Error::from_rcl_error(result))
166        }
167    }
168}
169
170pub fn make_action_client_untyped(
171    client: Weak<Mutex<WrappedActionClientUntyped>>,
172) -> ActionClientUntyped {
173    ActionClientUntyped { client }
174}
175
176pub type ResultSender = (uuid::Uuid, oneshot::Sender<(GoalStatus, Result<serde_json::Value>)>);
177pub struct WrappedActionClientUntyped {
178    pub action_type_support: UntypedActionSupport,
179    pub rcl_handle: rcl_action_client_t,
180    pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>,
181    pub cancel_response_channels:
182        Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
183    pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<Result<serde_json::Value>>)>,
184    pub result_requests: Vec<(i64, uuid::Uuid)>,
185    pub result_senders: Vec<ResultSender>,
186    pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
187
188    pub poll_available_channels: Vec<oneshot::Sender<()>>,
189}
190
191impl WrappedActionClientUntyped {
192    pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus {
193        *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown)
194    }
195
196    pub fn send_cancel_request(
197        &mut self, goal: &uuid::Uuid,
198    ) -> Result<impl Future<Output = Result<()>>> {
199        let msg = action_msgs::srv::CancelGoal::Request {
200            goal_info: action_msgs::msg::GoalInfo {
201                goal_id: unique_identifier_msgs::msg::UUID {
202                    uuid: goal.as_bytes().to_vec(),
203                },
204                ..action_msgs::msg::GoalInfo::default()
205            },
206        };
207        let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
208        let mut seq_no = 0i64;
209        let result = unsafe {
210            rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
211        };
212
213        if result == RCL_RET_OK as i32 {
214            let (cancel_req_sender, cancel_req_receiver) =
215                oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
216
217            self.cancel_response_channels
218                .push((seq_no, cancel_req_sender));
219            // instead of "canceled" we return invalid client.
220            let future = cancel_req_receiver
221                .map_err(|_| Error::RCL_RET_CLIENT_INVALID)
222                .map(|r| match r {
223                    Ok(r) => match r.return_code {
224                        e if e == action_msgs::srv::CancelGoal::Response::ERROR_NONE as i8 => {
225                            Ok(())
226                        }
227                        e if e == action_msgs::srv::CancelGoal::Response::ERROR_REJECTED as i8 => {
228                            Err(Error::GoalCancelRejected)
229                        }
230                        e if e
231                            == action_msgs::srv::CancelGoal::Response::ERROR_UNKNOWN_GOAL_ID
232                                as i8 =>
233                        {
234                            Err(Error::GoalCancelUnknownGoalID)
235                        }
236                        e if e
237                            == action_msgs::srv::CancelGoal::Response::ERROR_GOAL_TERMINATED
238                                as i8 =>
239                        {
240                            Err(Error::GoalCancelAlreadyTerminated)
241                        }
242                        x => panic!("unknown error code return from action server: {}", x),
243                    },
244                    Err(e) => Err(e),
245                });
246            Ok(future)
247        } else {
248            log::error!("could not send goal request {}", result);
249            Err(Error::from_rcl_error(result))
250        }
251    }
252}
253
254impl ActionClient_ for WrappedActionClientUntyped {
255    fn handle(&self) -> &rcl_action_client_t {
256        &self.rcl_handle
257    }
258
259    fn handle_goal_response(&mut self) {
260        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
261        let mut response_msg = (self.action_type_support.make_goal_response_msg)();
262
263        let ret = unsafe {
264            rcl_action_take_goal_response(
265                &self.rcl_handle,
266                request_id.as_mut_ptr(),
267                response_msg.void_ptr_mut(),
268            )
269        };
270        if ret == RCL_RET_OK as i32 {
271            let request_id = unsafe { request_id.assume_init() };
272            if let Some(idx) = self
273                .goal_response_channels
274                .iter()
275                .position(|(id, _)| id == &request_id.sequence_number)
276            {
277                let (_, sender) = self.goal_response_channels.swap_remove(idx);
278                let (accept, stamp) =
279                    (self.action_type_support.destructure_goal_response_msg)(response_msg);
280                match sender.send((accept, stamp)) {
281                    Ok(()) => {}
282                    Err(e) => {
283                        log::debug!("error sending to action client: {:?}", e);
284                    }
285                }
286            } else {
287                let we_have: String = self
288                    .goal_response_channels
289                    .iter()
290                    .map(|(id, _)| id.to_string())
291                    .collect::<Vec<_>>()
292                    .join(",");
293                log::error!(
294                    "no such req id: {}, we have [{}], ignoring",
295                    request_id.sequence_number,
296                    we_have
297                );
298            }
299        }
300    }
301
302    fn handle_cancel_response(&mut self) {
303        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
304        let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
305
306        let ret = unsafe {
307            rcl_action_take_cancel_response(
308                &self.rcl_handle,
309                request_id.as_mut_ptr(),
310                response_msg.void_ptr_mut(),
311            )
312        };
313        if ret == RCL_RET_OK as i32 {
314            let request_id = unsafe { request_id.assume_init() };
315            if let Some(idx) = self
316                .cancel_response_channels
317                .iter()
318                .position(|(id, _)| id == &request_id.sequence_number)
319            {
320                let (_, sender) = self.cancel_response_channels.swap_remove(idx);
321                let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
322                if let Err(e) = sender.send(response) {
323                    log::error!("warning: could not send cancel response msg ({:?})", e)
324                }
325            } else {
326                let we_have: String = self
327                    .goal_response_channels
328                    .iter()
329                    .map(|(id, _)| id.to_string())
330                    .collect::<Vec<_>>()
331                    .join(",");
332                log::error!(
333                    "no such req id: {}, we have [{}], ignoring",
334                    request_id.sequence_number,
335                    we_have
336                );
337            }
338        }
339    }
340
341    fn handle_feedback_msg(&mut self) {
342        let mut feedback_msg = (self.action_type_support.make_feedback_msg)();
343        let ret =
344            unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) };
345        if ret == RCL_RET_OK as i32 {
346            let (uuid, feedback) =
347                (self.action_type_support.destructure_feedback_msg)(feedback_msg);
348            let msg_uuid = uuid_msg_to_uuid(&uuid);
349            if let Some((_, sender)) = self
350                .feedback_senders
351                .iter_mut()
352                .find(|(uuid, _)| uuid == &msg_uuid)
353            {
354                if let Err(e) = sender.try_send(feedback) {
355                    log::error!("warning: could not send feedback msg ({})", e)
356                }
357            }
358        }
359    }
360
361    fn handle_status_msg(&mut self) {
362        let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
363        let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) };
364        if ret == RCL_RET_OK as i32 {
365            let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
366            for a in &arr.status_list {
367                let uuid = uuid_msg_to_uuid(&a.goal_info.goal_id);
368                if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) {
369                    continue;
370                }
371                let status = GoalStatus::from_rcl(a.status);
372                *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status;
373            }
374        }
375    }
376
377    fn handle_result_response(&mut self) {
378        let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
379        let mut response_msg = (self.action_type_support.make_result_response_msg)();
380        let ret = unsafe {
381            rcl_action_take_result_response(
382                &self.rcl_handle,
383                request_id.as_mut_ptr(),
384                response_msg.void_ptr_mut(),
385            )
386        };
387
388        if ret == RCL_RET_OK as i32 {
389            let request_id = unsafe { request_id.assume_init() };
390            if let Some(idx) = self
391                .result_requests
392                .iter()
393                .position(|(id, _)| id == &request_id.sequence_number)
394            {
395                let (_, uuid) = self.result_requests.swap_remove(idx);
396                if let Some(idx) = self
397                    .result_senders
398                    .iter()
399                    .position(|(suuid, _)| suuid == &uuid)
400                {
401                    let (_, sender) = self.result_senders.swap_remove(idx);
402                    let (status, result) =
403                        (self.action_type_support.destructure_result_response_msg)(response_msg);
404                    let status = GoalStatus::from_rcl(status);
405                    match sender.send((status, result)) {
406                        Ok(()) => {}
407                        Err(e) => {
408                            log::debug!("error sending result to action client: {:?}", e);
409                        }
410                    }
411                }
412            } else {
413                let we_have: String = self
414                    .result_requests
415                    .iter()
416                    .map(|(id, _)| id.to_string())
417                    .collect::<Vec<_>>()
418                    .join(",");
419                log::error!(
420                    "no such req id: {}, we have [{}], ignoring",
421                    request_id.sequence_number,
422                    we_have
423                );
424            }
425        }
426    }
427
428    fn send_result_request(&mut self, uuid: uuid::Uuid) {
429        let uuid_msg = unique_identifier_msgs::msg::UUID {
430            uuid: uuid.as_bytes().to_vec(),
431        };
432        let native_msg = (self.action_type_support.make_result_request_msg)(uuid_msg);
433        let mut seq_no = 0i64;
434        let result = unsafe {
435            rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
436        };
437
438        if result == RCL_RET_OK as i32 {
439            self.result_requests.push((seq_no, uuid));
440        } else {
441            log::error!("could not send request {}", result);
442        }
443    }
444
445    fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
446        self.poll_available_channels.push(s);
447    }
448
449    fn poll_available(&mut self, node: &mut rcl_node_t) {
450        if self.poll_available_channels.is_empty() {
451            return;
452        }
453        let available = action_server_available_helper(node, self.handle());
454        match available {
455            Ok(true) => {
456                // send ok and close channels
457                while let Some(sender) = self.poll_available_channels.pop() {
458                    let _res = sender.send(()); // we ignore if receiver dropped.
459                }
460            }
461            Ok(false) => {
462                // not available...
463            }
464            Err(_) => {
465                // error, close all channels
466                self.poll_available_channels.clear();
467            }
468        }
469    }
470
471    fn destroy(&mut self, node: &mut rcl_node_t) {
472        unsafe {
473            rcl_action_client_fini(&mut self.rcl_handle, node);
474        }
475    }
476}
477
478use crate::nodes::IsAvailablePollable;
479
480impl IsAvailablePollable for ActionClientUntyped {
481    fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
482        let client = self
483            .client
484            .upgrade()
485            .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
486        let mut client = client.lock().unwrap();
487        client.register_poll_available(sender);
488        Ok(())
489    }
490}