1use futures::{
2 channel::{mpsc, oneshot},
3 future::{FutureExt, TryFutureExt},
4 stream::Stream,
5};
6use std::{
7 collections::HashMap,
8 future::Future,
9 mem::MaybeUninit,
10 sync::{Mutex, Weak},
11};
12
13use crate::{
14 action_clients::*,
15 action_common::*,
16 error::*,
17 msg_types::{
18 generated_msgs::{action_msgs, builtin_interfaces, unique_identifier_msgs},
19 *,
20 },
21};
22use r2r_actions::*;
23use r2r_rcl::*;
24unsafe impl Send for ActionClientUntyped {}
29
30#[derive(Clone)]
35pub struct ActionClientUntyped {
36 client: Weak<Mutex<WrappedActionClientUntyped>>,
37}
38
39unsafe impl Send for ActionClientGoalUntyped {}
40
41#[derive(Clone)]
45pub struct ActionClientGoalUntyped {
46 client: Weak<Mutex<WrappedActionClientUntyped>>,
47 pub uuid: uuid::Uuid,
48}
49
50impl ActionClientGoalUntyped {
51 pub fn get_status(&self) -> Result<GoalStatus> {
53 let client = self
54 .client
55 .upgrade()
56 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
57 let client = client.lock().unwrap();
58
59 Ok(client.get_goal_status(&self.uuid))
60 }
61
62 pub fn cancel(&self) -> Result<impl Future<Output = Result<()>>> {
70 let client = self
72 .client
73 .upgrade()
74 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
75 let mut client = client.lock().unwrap();
76
77 client.send_cancel_request(&self.uuid)
78 }
79}
80
81impl ActionClientUntyped {
82 pub fn send_goal_request(
89 &self,
90 goal: serde_json::Value, ) -> Result<
92 impl Future<
93 Output = Result<(
94 ActionClientGoalUntyped,
95 impl Future<Output = Result<(GoalStatus, Result<serde_json::Value>)>>, impl Stream<Item = Result<serde_json::Value>> + Unpin, )>,
98 >,
99 > {
100 let client = self
102 .client
103 .upgrade()
104 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
105 let mut client = client.lock().unwrap();
106
107 let uuid = uuid::Uuid::new_v4();
108 let uuid_msg = unique_identifier_msgs::msg::UUID {
109 uuid: uuid.as_bytes().to_vec(),
110 };
111
112 let native_msg = (client.action_type_support.make_goal_request_msg)(uuid_msg, goal);
113
114 let mut seq_no = 0i64;
115 let result = unsafe {
116 rcl_action_send_goal_request(&client.rcl_handle, native_msg.void_ptr(), &mut seq_no)
117 };
118
119 let (goal_req_sender, goal_req_receiver) =
121 oneshot::channel::<(bool, builtin_interfaces::msg::Time)>();
122 let (feedback_sender, feedback_receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
123 client.feedback_senders.push((uuid, feedback_sender));
124 let (result_sender, result_receiver) =
125 oneshot::channel::<(GoalStatus, Result<serde_json::Value>)>();
126 client.result_senders.push((uuid, result_sender));
127
128 if result == RCL_RET_OK as i32 {
129 client
130 .goal_response_channels
131 .push((seq_no, goal_req_sender));
132 let fut_client = Weak::clone(&self.client);
134 let future = goal_req_receiver
135 .map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID)
136 .map(move |r| match r {
137 Ok((accepted, _stamp)) => {
138 if accepted {
139 {
141 let c = fut_client
142 .upgrade()
143 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
144 let mut c = c.lock().unwrap();
145 c.send_result_request(uuid);
146 }
147
148 Ok((
149 ActionClientGoalUntyped {
150 client: fut_client,
151 uuid,
152 },
153 result_receiver.map_err(|_| Error::RCL_RET_ACTION_CLIENT_INVALID),
154 feedback_receiver,
155 ))
156 } else {
157 Err(Error::RCL_RET_ACTION_GOAL_REJECTED)
158 }
159 }
160 Err(e) => Err(e),
161 });
162 Ok(future)
163 } else {
164 log::error!("could not send goal request {}", result);
165 Err(Error::from_rcl_error(result))
166 }
167 }
168}
169
170pub fn make_action_client_untyped(
171 client: Weak<Mutex<WrappedActionClientUntyped>>,
172) -> ActionClientUntyped {
173 ActionClientUntyped { client }
174}
175
176pub type ResultSender = (uuid::Uuid, oneshot::Sender<(GoalStatus, Result<serde_json::Value>)>);
177pub struct WrappedActionClientUntyped {
178 pub action_type_support: UntypedActionSupport,
179 pub rcl_handle: rcl_action_client_t,
180 pub goal_response_channels: Vec<(i64, oneshot::Sender<(bool, builtin_interfaces::msg::Time)>)>,
181 pub cancel_response_channels:
182 Vec<(i64, oneshot::Sender<action_msgs::srv::CancelGoal::Response>)>,
183 pub feedback_senders: Vec<(uuid::Uuid, mpsc::Sender<Result<serde_json::Value>>)>,
184 pub result_requests: Vec<(i64, uuid::Uuid)>,
185 pub result_senders: Vec<ResultSender>,
186 pub goal_status: HashMap<uuid::Uuid, GoalStatus>,
187
188 pub poll_available_channels: Vec<oneshot::Sender<()>>,
189}
190
191impl WrappedActionClientUntyped {
192 pub fn get_goal_status(&self, uuid: &uuid::Uuid) -> GoalStatus {
193 *self.goal_status.get(uuid).unwrap_or(&GoalStatus::Unknown)
194 }
195
196 pub fn send_cancel_request(
197 &mut self, goal: &uuid::Uuid,
198 ) -> Result<impl Future<Output = Result<()>>> {
199 let msg = action_msgs::srv::CancelGoal::Request {
200 goal_info: action_msgs::msg::GoalInfo {
201 goal_id: unique_identifier_msgs::msg::UUID {
202 uuid: goal.as_bytes().to_vec(),
203 },
204 ..action_msgs::msg::GoalInfo::default()
205 },
206 };
207 let native_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Request>::from(&msg);
208 let mut seq_no = 0i64;
209 let result = unsafe {
210 rcl_action_send_cancel_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
211 };
212
213 if result == RCL_RET_OK as i32 {
214 let (cancel_req_sender, cancel_req_receiver) =
215 oneshot::channel::<action_msgs::srv::CancelGoal::Response>();
216
217 self.cancel_response_channels
218 .push((seq_no, cancel_req_sender));
219 let future = cancel_req_receiver
221 .map_err(|_| Error::RCL_RET_CLIENT_INVALID)
222 .map(|r| match r {
223 Ok(r) => match r.return_code {
224 e if e == action_msgs::srv::CancelGoal::Response::ERROR_NONE as i8 => {
225 Ok(())
226 }
227 e if e == action_msgs::srv::CancelGoal::Response::ERROR_REJECTED as i8 => {
228 Err(Error::GoalCancelRejected)
229 }
230 e if e
231 == action_msgs::srv::CancelGoal::Response::ERROR_UNKNOWN_GOAL_ID
232 as i8 =>
233 {
234 Err(Error::GoalCancelUnknownGoalID)
235 }
236 e if e
237 == action_msgs::srv::CancelGoal::Response::ERROR_GOAL_TERMINATED
238 as i8 =>
239 {
240 Err(Error::GoalCancelAlreadyTerminated)
241 }
242 x => panic!("unknown error code return from action server: {}", x),
243 },
244 Err(e) => Err(e),
245 });
246 Ok(future)
247 } else {
248 log::error!("could not send goal request {}", result);
249 Err(Error::from_rcl_error(result))
250 }
251 }
252}
253
254impl ActionClient_ for WrappedActionClientUntyped {
255 fn handle(&self) -> &rcl_action_client_t {
256 &self.rcl_handle
257 }
258
259 fn handle_goal_response(&mut self) {
260 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
261 let mut response_msg = (self.action_type_support.make_goal_response_msg)();
262
263 let ret = unsafe {
264 rcl_action_take_goal_response(
265 &self.rcl_handle,
266 request_id.as_mut_ptr(),
267 response_msg.void_ptr_mut(),
268 )
269 };
270 if ret == RCL_RET_OK as i32 {
271 let request_id = unsafe { request_id.assume_init() };
272 if let Some(idx) = self
273 .goal_response_channels
274 .iter()
275 .position(|(id, _)| id == &request_id.sequence_number)
276 {
277 let (_, sender) = self.goal_response_channels.swap_remove(idx);
278 let (accept, stamp) =
279 (self.action_type_support.destructure_goal_response_msg)(response_msg);
280 match sender.send((accept, stamp)) {
281 Ok(()) => {}
282 Err(e) => {
283 log::debug!("error sending to action client: {:?}", e);
284 }
285 }
286 } else {
287 let we_have: String = self
288 .goal_response_channels
289 .iter()
290 .map(|(id, _)| id.to_string())
291 .collect::<Vec<_>>()
292 .join(",");
293 log::error!(
294 "no such req id: {}, we have [{}], ignoring",
295 request_id.sequence_number,
296 we_have
297 );
298 }
299 }
300 }
301
302 fn handle_cancel_response(&mut self) {
303 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
304 let mut response_msg = WrappedNativeMsg::<action_msgs::srv::CancelGoal::Response>::new();
305
306 let ret = unsafe {
307 rcl_action_take_cancel_response(
308 &self.rcl_handle,
309 request_id.as_mut_ptr(),
310 response_msg.void_ptr_mut(),
311 )
312 };
313 if ret == RCL_RET_OK as i32 {
314 let request_id = unsafe { request_id.assume_init() };
315 if let Some(idx) = self
316 .cancel_response_channels
317 .iter()
318 .position(|(id, _)| id == &request_id.sequence_number)
319 {
320 let (_, sender) = self.cancel_response_channels.swap_remove(idx);
321 let response = action_msgs::srv::CancelGoal::Response::from_native(&response_msg);
322 if let Err(e) = sender.send(response) {
323 log::error!("warning: could not send cancel response msg ({:?})", e)
324 }
325 } else {
326 let we_have: String = self
327 .goal_response_channels
328 .iter()
329 .map(|(id, _)| id.to_string())
330 .collect::<Vec<_>>()
331 .join(",");
332 log::error!(
333 "no such req id: {}, we have [{}], ignoring",
334 request_id.sequence_number,
335 we_have
336 );
337 }
338 }
339 }
340
341 fn handle_feedback_msg(&mut self) {
342 let mut feedback_msg = (self.action_type_support.make_feedback_msg)();
343 let ret =
344 unsafe { rcl_action_take_feedback(&self.rcl_handle, feedback_msg.void_ptr_mut()) };
345 if ret == RCL_RET_OK as i32 {
346 let (uuid, feedback) =
347 (self.action_type_support.destructure_feedback_msg)(feedback_msg);
348 let msg_uuid = uuid_msg_to_uuid(&uuid);
349 if let Some((_, sender)) = self
350 .feedback_senders
351 .iter_mut()
352 .find(|(uuid, _)| uuid == &msg_uuid)
353 {
354 if let Err(e) = sender.try_send(feedback) {
355 log::error!("warning: could not send feedback msg ({})", e)
356 }
357 }
358 }
359 }
360
361 fn handle_status_msg(&mut self) {
362 let mut status_array = WrappedNativeMsg::<action_msgs::msg::GoalStatusArray>::new();
363 let ret = unsafe { rcl_action_take_status(&self.rcl_handle, status_array.void_ptr_mut()) };
364 if ret == RCL_RET_OK as i32 {
365 let arr = action_msgs::msg::GoalStatusArray::from_native(&status_array);
366 for a in &arr.status_list {
367 let uuid = uuid_msg_to_uuid(&a.goal_info.goal_id);
368 if !self.result_senders.iter().any(|(suuid, _)| suuid == &uuid) {
369 continue;
370 }
371 let status = GoalStatus::from_rcl(a.status);
372 *self.goal_status.entry(uuid).or_insert(GoalStatus::Unknown) = status;
373 }
374 }
375 }
376
377 fn handle_result_response(&mut self) {
378 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
379 let mut response_msg = (self.action_type_support.make_result_response_msg)();
380 let ret = unsafe {
381 rcl_action_take_result_response(
382 &self.rcl_handle,
383 request_id.as_mut_ptr(),
384 response_msg.void_ptr_mut(),
385 )
386 };
387
388 if ret == RCL_RET_OK as i32 {
389 let request_id = unsafe { request_id.assume_init() };
390 if let Some(idx) = self
391 .result_requests
392 .iter()
393 .position(|(id, _)| id == &request_id.sequence_number)
394 {
395 let (_, uuid) = self.result_requests.swap_remove(idx);
396 if let Some(idx) = self
397 .result_senders
398 .iter()
399 .position(|(suuid, _)| suuid == &uuid)
400 {
401 let (_, sender) = self.result_senders.swap_remove(idx);
402 let (status, result) =
403 (self.action_type_support.destructure_result_response_msg)(response_msg);
404 let status = GoalStatus::from_rcl(status);
405 match sender.send((status, result)) {
406 Ok(()) => {}
407 Err(e) => {
408 log::debug!("error sending result to action client: {:?}", e);
409 }
410 }
411 }
412 } else {
413 let we_have: String = self
414 .result_requests
415 .iter()
416 .map(|(id, _)| id.to_string())
417 .collect::<Vec<_>>()
418 .join(",");
419 log::error!(
420 "no such req id: {}, we have [{}], ignoring",
421 request_id.sequence_number,
422 we_have
423 );
424 }
425 }
426 }
427
428 fn send_result_request(&mut self, uuid: uuid::Uuid) {
429 let uuid_msg = unique_identifier_msgs::msg::UUID {
430 uuid: uuid.as_bytes().to_vec(),
431 };
432 let native_msg = (self.action_type_support.make_result_request_msg)(uuid_msg);
433 let mut seq_no = 0i64;
434 let result = unsafe {
435 rcl_action_send_result_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no)
436 };
437
438 if result == RCL_RET_OK as i32 {
439 self.result_requests.push((seq_no, uuid));
440 } else {
441 log::error!("could not send request {}", result);
442 }
443 }
444
445 fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
446 self.poll_available_channels.push(s);
447 }
448
449 fn poll_available(&mut self, node: &mut rcl_node_t) {
450 if self.poll_available_channels.is_empty() {
451 return;
452 }
453 let available = action_server_available_helper(node, self.handle());
454 match available {
455 Ok(true) => {
456 while let Some(sender) = self.poll_available_channels.pop() {
458 let _res = sender.send(()); }
460 }
461 Ok(false) => {
462 }
464 Err(_) => {
465 self.poll_available_channels.clear();
467 }
468 }
469 }
470
471 fn destroy(&mut self, node: &mut rcl_node_t) {
472 unsafe {
473 rcl_action_client_fini(&mut self.rcl_handle, node);
474 }
475 }
476}
477
478use crate::nodes::IsAvailablePollable;
479
480impl IsAvailablePollable for ActionClientUntyped {
481 fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
482 let client = self
483 .client
484 .upgrade()
485 .ok_or(Error::RCL_RET_ACTION_CLIENT_INVALID)?;
486 let mut client = client.lock().unwrap();
487 client.register_poll_available(sender);
488 Ok(())
489 }
490}