1use futures::{
2 channel::{mpsc, oneshot},
3 future::{FutureExt, TryFutureExt},
4 stream::Stream,
5};
6use std::{
7 collections::HashMap,
8 ffi::CString,
9 future::Future,
10 mem::MaybeUninit,
11 sync::{Mutex, Weak},
12};
13
14use crate::{
15 action_common::*,
16 error::*,
17 msg_types::{
18 generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs},
19 *,
20 },
21};
22use r2r_actions::*;
23use r2r_rcl::*;
24
25unsafe impl<T> Send for ActionClient<T> where T: WrappedActionTypeSupport {}
26
27#[derive(Clone)]
31pub struct ActionClient<T>
32where
33 T: WrappedActionTypeSupport,
34{
35 client: Weak<Mutex<WrappedActionClient<T>>>,
36}
37
38unsafe impl<T> Send for ActionClientGoal<T> where T: WrappedActionTypeSupport {}
39
40#[derive(Clone)]
44pub struct ActionClientGoal<T>
45where
46 T: WrappedActionTypeSupport,
47{
48 client: Weak<Mutex<WrappedActionClient<T>>>,
49 pub uuid: uuid::Uuid,
50}
51
52impl<T: 'static> ActionClientGoal<T>
53where
54 T: WrappedActionTypeSupport,
55{
56 pub fn get_status(&self) -> Result<GoalStatus> {
58 let client = self
59 .client
60 .upgrade()
61 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
62 let client = client.lock().unwrap();
63
64 Ok(client.get_goal_status(&self.uuid))
65 }
66
67 pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
75 let client = self
77 .client
78 .upgrade()
79 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
80 let mut client = client.lock().unwrap();
81
82 client.send_cancel_request(&self.uuid)
83 }
84}
85
86impl<T: 'static> ActionClient<T>
87where
88 T: WrappedActionTypeSupport,
89{
90 pub fn send_goal_request(
97 &self, goal: T::Goal,
98 ) -> Result<
99 impl Future<
100 Output = Result<(
101 ActionClientGoal<T>,
102 impl Future<Output = Result<(GoalStatus, T::Result)>>,
103 impl Stream<Item = T::Feedback> + Unpin,
104 )>,
105 >,
106 >
107 where
108 T: WrappedActionTypeSupport,
109 {
110 let client = self
112 .client
113 .upgrade()
114 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
115 let mut client = client.lock().unwrap();
116
117 let uuid = uuid::Uuid::new_v4();
118 let uuid_msg = unique_identifier_msgs::msg::UUID {
119 uuid: uuid.as_bytes().to_vec(),
120 };
121 let request_msg = T::make_goal_request_msg(uuid_msg, goal);
122 let native_msg = WrappedNativeMsg::<
123 <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Request,
124 >::from(&request_msg);
125 let mut seq_no = 0i64;
126 let result = unsafe {
127 rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
128 };
129
130 let (goal_req_sender, goal_req_receiver) =
132 oneshot::channel::<(bool, builtin_interfaces::msg::Time)>();
133 let (feedback_sender, feedback_receiver) = mpsc::channel::<T::Feedback>(10);
134 client.feedback_senders.push((uuid, feedback_sender));
135 let (result_sender, result_receiver) = oneshot::channel::<(GoalStatus, T::Result)>();
136 client.result_senders.push((uuid, result_sender));
137
138 if result == RCL_RET_OK as i32 {
139 client
140 .goal_response_channels
141 .push((seq_no, goal_req_sender));
142 let fut_client = Weak::clone(&self.client);
144 let future = goal_req_receiver
145 .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
146 .map(move |r| match r {
147 Ok((accepted, _stamp)) => {
148 if accepted {
149 {
151 let c = fut_client
152 .upgrade()
153 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
154 let mut c = c.lock().unwrap();
155 c.send_result_request(uuid);
156 }
157
158 Ok((
159 ActionClientGoal {
160 client: fut_client,
161 uuid,
162 },
163 result_receiver.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID),
164 feedback_receiver,
165 ))
166 } else {
167 Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
168 }
169 }
170 Err(e) => Err(e),
171 });
172 Ok(future)
173 } else {
174 log::error!("could not send goal request {}", result);
175 Err(Error::from_rcl_error(result))
176 }
177 }
178}
179
180pub fn make_action_client<T>(client: Weak<Mutex<WrappedActionClient<T>>>) -> ActionClient<T>
181where
182 T: WrappedActionTypeSupport,
183{
184 ActionClient { client }
185}
186
187pub type ResultSender<R> = (uuid::Uuid, oneshot::Sender<(GoalStatus, R)>);
188pub struct WrappedActionClient<T>
189where
190 T: WrappedActionTypeSupport,
191{
192 pub rcl_handle: rcl_action_client_t,
193 pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>,
194 pub cancel_response_channels:
195 Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
196 pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<T::Feedback>)>,
197 pub result_requests: Vec<(i64, uuid::Uuid)>,
198 pub result_senders: Vec<ResultSender<T::Result>>,
199 pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
200
201 pub poll_available_channels: Vec<oneshot::Sender<()>>,
202}
203
204pub trait ActionClient_ {
205 fn handle(&self) -> &rcl_action_client_t;
206 fn destroy(&mut self, node: &mut rcl_node_t);
207
208 fn handle_goal_response(&mut self);
209 fn handle_cancel_response(&mut self);
210 fn handle_feedback_msg(&mut self);
211 fn handle_status_msg(&mut self);
212 fn handle_result_response(&mut self);
213
214 fn send_result_request(&mut self, uuid: uuid::Uuid);
215
216 fn register_poll_available(&mut self, s: oneshot::Sender<()>);
217 fn poll_available(&mut self, node: &mut rcl_node_t);
218}
219
220impl<T> WrappedActionClient<T>
221where
222 T: WrappedActionTypeSupport,
223{
224 pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus {
225 *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown)
226 }
227
228 pub fn send_cancel_request(
229 &mut self, goal: &uuid::Uuid,
230 ) -> Result<impl Future<Output = Result<()>>>
231 where
232 T: WrappedActionTypeSupport,
233 {
234 let msg = action_msgs::srv::CancelGoal::Request {
235 goal_info: action_msgs::msg::GoalInfo {
236 goal_id: unique_identifier_msgs::msg::UUID {
237 uuid: goal.as_bytes().to_vec(),
238 },
239 ..action_msgs::msg::GoalInfo::default()
240 },
241 };
242 let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
243 let mut seq_no = 0i64;
244 let result = unsafe {
245 rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
246 };
247
248 if result == RCL_RET_OK as i32 {
249 let (cancel_req_sender, cancel_req_receiver) =
250 oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
251
252 self.cancel_response_channels
253 .push((seq_no, cancel_req_sender));
254 let future = cancel_req_receiver
256 .map_err(|_| Error::RCL_RET_CLIENT_INVALID)
257 .map(|r| match r {
258 Ok(r) => match r.return_code {
259 e if e == action_msgs::srv::CancelGoal::Response::ERROR_NONE as i8 => {
260 Ok(())
261 }
262 e if e == action_msgs::srv::CancelGoal::Response::ERROR_REJECTED as i8 => {
263 Err(Error::GoalCancelRejected)
264 }
265 e if e
266 == action_msgs::srv::CancelGoal::Response::ERROR_UNKNOWN_GOAL_ID
267 as i8 =>
268 {
269 Err(Error::GoalCancelUnknownGoalID)
270 }
271 e if e
272 == action_msgs::srv::CancelGoal::Response::ERROR_GOAL_TERMINATED
273 as i8 =>
274 {
275 Err(Error::GoalCancelAlreadyTerminated)
276 }
277 x => panic!("unknown error code return from action server: {}", x),
278 },
279 Err(e) => Err(e),
280 });
281 Ok(future)
282 } else {
283 log::error!("could not send goal request {}", result);
284 Err(Error::from_rcl_error(result))
285 }
286 }
287}
288
289impl<T: 'static> ActionClient_ for WrappedActionClient<T>
290where
291 T: WrappedActionTypeSupport,
292{
293 fn handle(&self) -> &rcl_action_client_t {
294 &self.rcl_handle
295 }
296
297 fn handle_goal_response(&mut self) {
298 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
299 let mut response_msg = WrappedNativeMsg::<
300 <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response,
301 >::new();
302
303 let ret = unsafe {
304 rcl_action_take_goal_response(
305 &self.rcl_handle,
306 request_id.as_mut_ptr(),
307 response_msg.void_ptr_mut(),
308 )
309 };
310 if ret == RCL_RET_OK as i32 {
311 let request_id = unsafe { request_id.assume_init() };
312 if let Some(idx) = self
313 .goal_response_channels
314 .iter()
315 .position(|(id, _)| id == &request_id.sequence_number)
316 {
317 let (_, sender) = self.goal_response_channels.swap_remove(idx);
318 let response = <<T as WrappedActionTypeSupport>::SendGoal as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
319 let (accept, stamp) = T::destructure_goal_response_msg(response);
320 match sender.send((accept, stamp)) {
321 Ok(()) => {}
322 Err(e) => {
323 log::debug!("error sending to action client: {:?}", e);
324 }
325 }
326 } else {
327 let we_have: String = self
328 .goal_response_channels
329 .iter()
330 .map(|(id, _)| id.to_string())
331 .collect::<Vec<_>>()
332 .join(",");
333 log::error!(
334 "no such req id: {}, we have [{}], ignoring",
335 request_id.sequence_number,
336 we_have
337 );
338 }
339 }
340 }
341
342 fn handle_cancel_response(&mut self) {
343 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
344 let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
345
346 let ret = unsafe {
347 rcl_action_take_cancel_response(
348 &self.rcl_handle,
349 request_id.as_mut_ptr(),
350 response_msg.void_ptr_mut(),
351 )
352 };
353 if ret == RCL_RET_OK as i32 {
354 let request_id = unsafe { request_id.assume_init() };
355 if let Some(idx) = self
356 .cancel_response_channels
357 .iter()
358 .position(|(id, _)| id == &request_id.sequence_number)
359 {
360 let (_, sender) = self.cancel_response_channels.swap_remove(idx);
361 let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
362 if let Err(e) = sender.send(response) {
363 log::error!("warning: could not send cancel response msg ({:?})", e)
364 }
365 } else {
366 let we_have: String = self
367 .goal_response_channels
368 .iter()
369 .map(|(id, _)| id.to_string())
370 .collect::<Vec<_>>()
371 .join(",");
372 log::error!(
373 "no such req id: {}, we have [{}], ignoring",
374 request_id.sequence_number,
375 we_have
376 );
377 }
378 }
379 }
380
381 fn handle_feedback_msg(&mut self) {
382 let mut feedback_msg = WrappedNativeMsg::<T::FeedbackMessage>::new();
383 let ret =
384 unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) };
385 if ret == RCL_RET_OK as i32 {
386 let msg = T::FeedbackMessage::from_native(&feedback_msg);
387 let (uuid, feedback) = T::destructure_feedback_msg(msg);
388 let msg_uuid = uuid_msg_to_uuid(&uuid);
389 if let Some((_, sender)) = self
390 .feedback_senders
391 .iter_mut()
392 .find(|(uuid, _)| uuid == &msg_uuid)
393 {
394 if let Err(e) = sender.try_send(feedback) {
395 log::error!("warning: could not send feedback msg ({})", e)
396 }
397 }
398 }
399 }
400
401 fn handle_status_msg(&mut self) {
402 let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
403 let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) };
404 if ret == RCL_RET_OK as i32 {
405 let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
406 for a in &arr.status_list {
407 let uuid = uuid_msg_to_uuid(&a.goal_info.goal_id);
408 if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) {
409 continue;
410 }
411 let status = GoalStatus::from_rcl(a.status);
412 *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status;
413 }
414 }
415 }
416
417 fn handle_result_response(&mut self) {
418 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
419 let mut response_msg = WrappedNativeMsg::<
420 <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response,
421 >::new();
422
423 let ret = unsafe {
424 rcl_action_take_result_response(
425 &self.rcl_handle,
426 request_id.as_mut_ptr(),
427 response_msg.void_ptr_mut(),
428 )
429 };
430
431 if ret == RCL_RET_OK as i32 {
432 let request_id = unsafe { request_id.assume_init() };
433 if let Some(idx) = self
434 .result_requests
435 .iter()
436 .position(|(id, _)| id == &request_id.sequence_number)
437 {
438 let (_, uuid) = self.result_requests.swap_remove(idx);
439 if let Some(idx) = self
440 .result_senders
441 .iter()
442 .position(|(suuid, _)| suuid == &uuid)
443 {
444 let (_, sender) = self.result_senders.swap_remove(idx);
445 let response = <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Response::from_native(&response_msg);
446 let (status, result) = T::destructure_result_response_msg(response);
447 let status = GoalStatus::from_rcl(status);
448 match sender.send((status, result)) {
449 Ok(()) => {}
450 Err(e) => {
451 log::debug!("error sending result to action client: {:?}", e);
452 }
453 }
454 }
455 } else {
456 let we_have: String = self
457 .result_requests
458 .iter()
459 .map(|(id, _)| id.to_string())
460 .collect::<Vec<_>>()
461 .join(",");
462 log::error!(
463 "no such req id: {}, we have [{}], ignoring",
464 request_id.sequence_number,
465 we_have
466 );
467 }
468 }
469 }
470
471 fn send_result_request(&mut self, uuid: uuid::Uuid) {
472 let uuid_msg = unique_identifier_msgs::msg::UUID {
473 uuid: uuid.as_bytes().to_vec(),
474 };
475 let request_msg = T::make_result_request_msg(uuid_msg);
476 let native_msg = WrappedNativeMsg::<
477 <<T as WrappedActionTypeSupport>::GetResult as WrappedServiceTypeSupport>::Request,
478 >::from(&request_msg);
479 let mut seq_no = 0i64;
480 let result = unsafe {
481 rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
482 };
483
484 if result == RCL_RET_OK as i32 {
485 self.result_requests.push((seq_no, uuid));
486 } else {
487 log::error!("could not send request {}", result);
488 }
489 }
490
491 fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
492 self.poll_available_channels.push(s);
493 }
494
495 fn poll_available(&mut self, node: &mut rcl_node_t) {
496 if self.poll_available_channels.is_empty() {
497 return;
498 }
499 let available = action_server_available_helper(node, self.handle());
500 match available {
501 Ok(true) => {
502 while let Some(sender) = self.poll_available_channels.pop() {
504 let _res = sender.send(()); }
506 }
507 Ok(false) => {
508 }
510 Err(_) => {
511 self.poll_available_channels.clear();
513 }
514 }
515 }
516
517 fn destroy(&mut self, node: &mut rcl_node_t) {
518 unsafe {
519 rcl_action_client_fini(&mut self.rcl_handle, node);
520 }
521 }
522}
523
524pub fn create_action_client_helper(
525 node: &mut rcl_node_t, action_name: &str, action_ts: *const rosidl_action_type_support_t,
526) -> Result<rcl_action_client_t> {
527 let mut client_handle = unsafe { rcl_action_get_zero_initialized_client() };
528 let action_name_c_string =
529 CString::new(action_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
530
531 let result = unsafe {
532 let client_options = rcl_action_client_get_default_options();
533 rcl_action_client_init(
534 &mut client_handle,
535 node,
536 action_ts,
537 action_name_c_string.as_ptr(),
538 &client_options,
539 )
540 };
541 if result == RCL_RET_OK as i32 {
542 Ok(client_handle)
543 } else {
544 Err(Error::from_rcl_error(result))
545 }
546}
547
548pub fn action_client_get_num_waits(
549 rcl_handle: &rcl_action_client_t, num_subs: &mut usize, num_gc: &mut usize,
550 num_timers: &mut usize, num_clients: &mut usize, num_services: &mut usize,
551) -> Result<()> {
552 unsafe {
553 let result = rcl_action_client_wait_set_get_num_entities(
554 rcl_handle,
555 num_subs,
556 num_gc,
557 num_timers,
558 num_clients,
559 num_services,
560 );
561 if result == RCL_RET_OK as i32 {
562 Ok(())
563 } else {
564 Err(Error::from_rcl_error(result))
565 }
566 }
567}
568
569use crate::nodes::IsAvailablePollable;
570
571impl<T: 'static> IsAvailablePollable for ActionClient<T>
572where
573 T: WrappedActionTypeSupport,
574{
575 fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
576 let client = self
577 .client
578 .upgrade()
579 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
580 let mut client = client.lock().unwrap();
581 client.register_poll_available(sender);
582 Ok(())
583 }
584}
585
586pub fn action_server_available_helper(
587 node: &rcl_node_t, client: &rcl_action_client_t,
588) -> Result<bool> {
589 let mut avail = false;
590 let result = unsafe { rcl_action_server_is_available(node, client, &mut avail) };
591
592 if result == RCL_RET_OK as i32 {
593 Ok(avail)
594 } else {
595 log::error!("could not check if action server is available {}", result);
596 Err(Error::from_rcl_error(result))
597 }
598}