arci_ros/
rosrust_utils.rs

1use std::{
2    sync::{Arc, Mutex},
3    time::{Duration, SystemTime},
4};
5
6use flume::{Receiver, Sender};
7use rosrust::Time;
8type MessageBuffer<T> = Arc<Mutex<Option<T>>>;
9
10fn set_message_buffer<T>(buffer: &MessageBuffer<T>, message: T) {
11    buffer.lock().unwrap().replace(message);
12}
13
14fn subscribe_with_message_buffer<T: rosrust::Message>(
15    topic: &str,
16    queue_size: usize,
17) -> (MessageBuffer<T>, rosrust::Subscriber) {
18    let buffer: MessageBuffer<T> = Arc::new(Mutex::new(None));
19    let buffer_for_callback = buffer.clone();
20    let subscriber = rosrust::subscribe(topic, queue_size, move |message: T| {
21        set_message_buffer(&buffer_for_callback, message);
22    })
23    .unwrap();
24    (buffer, subscriber)
25}
26
27pub struct SubscriberHandler<T> {
28    topic: String,
29    buffer: MessageBuffer<T>,
30    _subscriber: rosrust::Subscriber,
31}
32
33impl<T> SubscriberHandler<T>
34where
35    T: rosrust::Message,
36{
37    pub fn new(topic: &str, queue_size: usize) -> Self {
38        let (buffer, _subscriber) = subscribe_with_message_buffer::<T>(topic, queue_size);
39        Self {
40            topic: topic.to_string(),
41            buffer,
42            _subscriber,
43        }
44    }
45
46    pub fn take(&self) -> Result<Option<T>, arci::Error> {
47        Ok(self.buffer.lock().unwrap().take())
48    }
49
50    pub fn get(&self) -> Result<Option<T>, arci::Error> {
51        Ok(self.buffer.lock().unwrap().clone())
52    }
53
54    pub fn wait_message(&self, loop_millis: u64) {
55        while rosrust::is_ok() && self.get().unwrap().is_none() {
56            rosrust::ros_info!("Waiting {}", self.topic);
57            std::thread::sleep(Duration::from_millis(loop_millis));
58        }
59    }
60}
61
62pub fn wait_subscriber<T>(publisher: &rosrust::Publisher<T>)
63where
64    T: rosrust::Message,
65{
66    let rate = rosrust::rate(10.0);
67    while rosrust::is_ok() && publisher.subscriber_count() == 0 {
68        rate.sleep();
69    }
70    // one more to avoid `rostopic echo`
71    rate.sleep();
72}
73
74type ReceiverStampedBuffer<T> = Arc<Mutex<Receiver<(rosrust::Time, T)>>>;
75type SenderStampedBuffer<T> = Arc<Mutex<Sender<(rosrust::Time, T)>>>;
76
77pub struct ServiceHandler<T>
78where
79    T: rosrust::ServicePair,
80{
81    request_receiver: ReceiverStampedBuffer<T::Request>,
82    response_sender: SenderStampedBuffer<T::Response>,
83    _service_name: Arc<Mutex<String>>,
84    _server: rosrust::Service,
85}
86
87impl<T> ServiceHandler<T>
88where
89    T: rosrust::ServicePair,
90{
91    pub fn new(service_name: &str) -> Self {
92        let (request_sender, request_receiver) = flume::unbounded::<(rosrust::Time, T::Request)>();
93        let (response_sender, response_receiver) =
94            flume::unbounded::<(rosrust::Time, T::Response)>();
95
96        let request_sender = Arc::new(Mutex::new(request_sender));
97        let request_receiver = Arc::new(Mutex::new(request_receiver));
98        let response_sender = Arc::new(Mutex::new(response_sender));
99        let response_receiver = Arc::new(Mutex::new(response_receiver));
100
101        let _service_name = Arc::new(Mutex::new(service_name.to_string()));
102        let service_name_for_callback = _service_name.clone();
103
104        let _server = rosrust::service::<T, _>(&_service_name.lock().unwrap(), move |req| {
105            let req_time = rosrust::now();
106            request_sender
107                .lock()
108                .unwrap()
109                .send((req_time, req))
110                .unwrap();
111            let (res_time, response) = response_receiver.lock().unwrap().recv().unwrap();
112            if res_time == req_time {
113                Ok(response)
114            } else {
115                Err(format!(
116                    "Mismatch time stamp in ServiceHandler for {}",
117                    service_name_for_callback.lock().unwrap()
118                ))
119            }
120        })
121        .unwrap();
122        Self {
123            request_receiver,
124            response_sender,
125            _service_name,
126            _server,
127        }
128    }
129
130    pub fn get_request(&self, timeout_millis: u32) -> Option<(rosrust::Time, T::Request)> {
131        self.request_receiver
132            .lock()
133            .unwrap()
134            .recv_timeout(Duration::from_millis(timeout_millis as u64))
135            .ok()
136    }
137
138    pub fn set_response(&self, time: rosrust::Time, res: T::Response) {
139        self.response_sender
140            .lock()
141            .unwrap()
142            .send((time, res))
143            .unwrap();
144    }
145}
146
147#[derive(Debug)]
148pub struct ActionResultWait {
149    goal_id: String,
150    action_result_receiver: Receiver<Result<(), crate::Error>>,
151}
152
153impl ActionResultWait {
154    pub fn new(
155        goal_id: String,
156        action_result_receiver: Receiver<Result<(), crate::Error>>,
157    ) -> Self {
158        Self {
159            goal_id,
160            action_result_receiver,
161        }
162    }
163
164    pub fn wait(&mut self, timeout: Duration) -> Result<(), crate::Error> {
165        self.action_result_receiver
166            .recv_timeout(timeout)
167            .map_err(|_| crate::Error::ActionResultTimeout)?
168    }
169
170    pub fn goal_id(&self) -> &str {
171        &self.goal_id
172    }
173}
174
175#[macro_export]
176macro_rules! define_action_client {
177    ($name:ident, $namespace:path, $action_base:expr) => {
178        $crate::paste::item! {
179            pub struct $name {
180                goal_publisher: $crate::rosrust::Publisher<$namespace::[<$action_base ActionGoal>]>,
181                _result_subscriber: $crate::rosrust::Subscriber,
182                cancel_publisher: $crate::rosrust::Publisher<msg::actionlib_msgs::GoalID>,
183                goal_id_to_sender: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, $crate::flume::Sender<std::result::Result<(), $crate::Error>>>>>
184            }
185            impl $name {
186                pub fn new(base_topic: &str, queue_size: usize) -> Self {
187                    use std::sync::{Arc, Mutex};
188                    use std::collections::HashMap;
189                    use $crate::Error;
190                    use $crate::{flume, rosrust};
191                    let goal_topic = format!("{}/goal", base_topic);
192                    let cancel_topic = format!("{}/cancel", base_topic);
193                    let goal_publisher = rosrust::publish(&goal_topic, queue_size).unwrap();
194                    let goal_id_to_sender: Arc<Mutex<HashMap<String, flume::Sender<std::result::Result<(), Error>> >>> = Arc::new(Mutex::new(HashMap::new()));
195                    let goal_id_to_sender_cloned = goal_id_to_sender.clone();
196                    let _result_subscriber = rosrust::subscribe(
197                        &format!("{}/result", base_topic),
198                        queue_size, move |result: $namespace::[<$action_base ActionResult>]| {
199                            if let Some(sender) = goal_id_to_sender_cloned.lock().unwrap().remove(&result.status.goal_id.id) {
200                                let _ = sender.send(
201                                // TODO more detailed error / status handling
202                                match result.status.status {
203                                    msg::actionlib_msgs::GoalStatus::SUCCEEDED => {
204                                        Ok(())
205                                    },
206                                    msg::actionlib_msgs::GoalStatus::PREEMPTED => {
207                                        Err(Error::ActionResultPreempted(format!("{:?}", result)))
208                                    },
209                                    _ => {
210                                        Err(Error::ActionResultNotSuccess(format!("{:?}", result)))
211                                    }
212                                });
213                            }
214                        }
215                    ).unwrap();
216                    let cancel_publisher =
217                        rosrust::publish(&cancel_topic, queue_size).unwrap();
218                    rosrust::ros_info!("Waiting {} ....", goal_topic);
219                    $crate::wait_subscriber(&goal_publisher);
220                    rosrust::ros_info!("Waiting {} Done", goal_topic);
221                    rosrust::ros_info!("Waiting {} ....", cancel_topic);
222                    $crate::wait_subscriber(&cancel_publisher);
223                    rosrust::ros_info!("Waiting {} Done", cancel_topic);
224                    Self {
225                        goal_publisher,
226                        _result_subscriber,
227                        cancel_publisher,
228                        goal_id_to_sender
229                     }
230                }
231                #[allow(dead_code)]
232                pub fn send_goal_and_wait(&self, goal: $namespace::[<$action_base Goal>], timeout: std::time::Duration) -> Result<(), $crate::Error> {
233                    self.send_goal(goal)?.wait(timeout)
234                }
235                #[allow(clippy::field_reassign_with_default)]
236                pub fn send_goal(&self, goal: $namespace::[<$action_base Goal>]) -> Result<$crate::ActionResultWait, $crate::Error> {
237                    use $crate::{flume, rosrust};
238                    let mut action_goal = <$namespace::[<$action_base ActionGoal>]>::default();
239                    action_goal.goal = goal;
240                    let goal_id = format!("{}-{}", rosrust::name(), rosrust::now().seconds());
241                    action_goal.goal_id.id = goal_id.clone();
242                    let (sender, receiver) = flume::unbounded();
243                    self.goal_id_to_sender.lock().unwrap().insert(goal_id.clone(), sender);
244                    if self.goal_publisher.send(action_goal).is_err() {
245                        let _ = self.goal_id_to_sender.lock().unwrap().remove(&goal_id);
246                        return Err($crate::Error::ActionGoalSendingFailure);
247                    }
248                    Ok($crate::ActionResultWait::new(goal_id, receiver))
249                }
250                #[allow(dead_code)]
251                pub fn cancel_goal(&self, goal_id: &str) -> Result<(), $crate::Error> {
252                    let mut goal_id_to_sender = self.goal_id_to_sender.lock().unwrap();
253                    if goal_id.is_empty() {
254                        goal_id_to_sender.clear();
255                    } else {
256                        let _ = goal_id_to_sender.remove(goal_id);
257                    }
258                    if self.cancel_publisher.send(
259                    msg::actionlib_msgs::GoalID { id: goal_id.to_owned(), ..Default::default()}).is_err() {
260                        return Err($crate::Error::ActionCancelSendingFailure);
261                    }
262                    Ok(())
263                }
264                #[allow(dead_code)]
265                pub fn cancel_all_goal(&self) -> Result<(), $crate::Error> {
266                    self.cancel_goal("")
267                }
268            }
269        }
270    };
271}
272
273pub fn convert_system_time_to_ros_time(time: &SystemTime) -> Time {
274    let ros_now = rosrust::now();
275    let system_now = SystemTime::now();
276
277    // compare time to avoid SystemTimeError
278    // https://doc.rust-lang.org/std/time/struct.SystemTime.html#method.duration_since
279    if system_now < *time {
280        Time::from_nanos(
281            time.duration_since(system_now).unwrap().as_nanos() as i64 + ros_now.nanos(),
282        )
283    } else {
284        Time::from_nanos(
285            ros_now.nanos() - system_now.duration_since(*time).unwrap().as_nanos() as i64,
286        )
287    }
288}
289
290pub fn convert_ros_time_to_system_time(time: &Time) -> SystemTime {
291    let ros_now = rosrust::now();
292    let system_now = SystemTime::now();
293    let ros_time_nanos = time.nanos() as u64;
294    let ros_now_nanos = ros_now.nanos() as u64;
295    // from_nanos needs u64 as input
296    // https://doc.rust-lang.org/stable/std/time/struct.Duration.html#method.from_nanos
297    if ros_now_nanos < ros_time_nanos {
298        system_now
299            .checked_add(std::time::Duration::from_nanos(
300                ros_time_nanos - ros_now_nanos,
301            ))
302            .unwrap()
303    } else {
304        system_now
305            .checked_sub(std::time::Duration::from_nanos(
306                ros_now_nanos - ros_time_nanos,
307            ))
308            .unwrap()
309    }
310}
311
312/// # subscribe ROS message helper
313///
314/// using for inspect specific massage type.
315/// Message is displayed on screen and sent to `mpsc receiver`
316///
317/// # Panic!
318///
319/// If subscriber can't be construct, this function is panic.
320/// Or if ``Roscore`` is not up, could be panic.
321///
322pub fn subscribe_with_channel<T: rosrust::Message>(
323    topic_name: &str,
324    queue_size: usize,
325) -> (flume::Receiver<T>, rosrust::Subscriber) {
326    let (tx, rx) = flume::unbounded::<T>();
327
328    let sub = rosrust::subscribe(topic_name, queue_size, move |v: T| {
329        tx.send(v).unwrap();
330    })
331    .unwrap();
332
333    (rx, sub)
334}