1use std::{
2 sync::{Arc, Mutex},
3 time::{Duration, SystemTime},
4};
5
6use flume::{Receiver, Sender};
7use rosrust::Time;
8type MessageBuffer<T> = Arc<Mutex<Option<T>>>;
9
10fn set_message_buffer<T>(buffer: &MessageBuffer<T>, message: T) {
11 buffer.lock().unwrap().replace(message);
12}
13
14fn subscribe_with_message_buffer<T: rosrust::Message>(
15 topic: &str,
16 queue_size: usize,
17) -> (MessageBuffer<T>, rosrust::Subscriber) {
18 let buffer: MessageBuffer<T> = Arc::new(Mutex::new(None));
19 let buffer_for_callback = buffer.clone();
20 let subscriber = rosrust::subscribe(topic, queue_size, move |message: T| {
21 set_message_buffer(&buffer_for_callback, message);
22 })
23 .unwrap();
24 (buffer, subscriber)
25}
26
27pub struct SubscriberHandler<T> {
28 topic: String,
29 buffer: MessageBuffer<T>,
30 _subscriber: rosrust::Subscriber,
31}
32
33impl<T> SubscriberHandler<T>
34where
35 T: rosrust::Message,
36{
37 pub fn new(topic: &str, queue_size: usize) -> Self {
38 let (buffer, _subscriber) = subscribe_with_message_buffer::<T>(topic, queue_size);
39 Self {
40 topic: topic.to_string(),
41 buffer,
42 _subscriber,
43 }
44 }
45
46 pub fn take(&self) -> Result<Option<T>, arci::Error> {
47 Ok(self.buffer.lock().unwrap().take())
48 }
49
50 pub fn get(&self) -> Result<Option<T>, arci::Error> {
51 Ok(self.buffer.lock().unwrap().clone())
52 }
53
54 pub fn wait_message(&self, loop_millis: u64) {
55 while rosrust::is_ok() && self.get().unwrap().is_none() {
56 rosrust::ros_info!("Waiting {}", self.topic);
57 std::thread::sleep(Duration::from_millis(loop_millis));
58 }
59 }
60}
61
62pub fn wait_subscriber<T>(publisher: &rosrust::Publisher<T>)
63where
64 T: rosrust::Message,
65{
66 let rate = rosrust::rate(10.0);
67 while rosrust::is_ok() && publisher.subscriber_count() == 0 {
68 rate.sleep();
69 }
70 rate.sleep();
72}
73
74type ReceiverStampedBuffer<T> = Arc<Mutex<Receiver<(rosrust::Time, T)>>>;
75type SenderStampedBuffer<T> = Arc<Mutex<Sender<(rosrust::Time, T)>>>;
76
77pub struct ServiceHandler<T>
78where
79 T: rosrust::ServicePair,
80{
81 request_receiver: ReceiverStampedBuffer<T::Request>,
82 response_sender: SenderStampedBuffer<T::Response>,
83 _service_name: Arc<Mutex<String>>,
84 _server: rosrust::Service,
85}
86
87impl<T> ServiceHandler<T>
88where
89 T: rosrust::ServicePair,
90{
91 pub fn new(service_name: &str) -> Self {
92 let (request_sender, request_receiver) = flume::unbounded::<(rosrust::Time, T::Request)>();
93 let (response_sender, response_receiver) =
94 flume::unbounded::<(rosrust::Time, T::Response)>();
95
96 let request_sender = Arc::new(Mutex::new(request_sender));
97 let request_receiver = Arc::new(Mutex::new(request_receiver));
98 let response_sender = Arc::new(Mutex::new(response_sender));
99 let response_receiver = Arc::new(Mutex::new(response_receiver));
100
101 let _service_name = Arc::new(Mutex::new(service_name.to_string()));
102 let service_name_for_callback = _service_name.clone();
103
104 let _server = rosrust::service::<T, _>(&_service_name.lock().unwrap(), move |req| {
105 let req_time = rosrust::now();
106 request_sender
107 .lock()
108 .unwrap()
109 .send((req_time, req))
110 .unwrap();
111 let (res_time, response) = response_receiver.lock().unwrap().recv().unwrap();
112 if res_time == req_time {
113 Ok(response)
114 } else {
115 Err(format!(
116 "Mismatch time stamp in ServiceHandler for {}",
117 service_name_for_callback.lock().unwrap()
118 ))
119 }
120 })
121 .unwrap();
122 Self {
123 request_receiver,
124 response_sender,
125 _service_name,
126 _server,
127 }
128 }
129
130 pub fn get_request(&self, timeout_millis: u32) -> Option<(rosrust::Time, T::Request)> {
131 self.request_receiver
132 .lock()
133 .unwrap()
134 .recv_timeout(Duration::from_millis(timeout_millis as u64))
135 .ok()
136 }
137
138 pub fn set_response(&self, time: rosrust::Time, res: T::Response) {
139 self.response_sender
140 .lock()
141 .unwrap()
142 .send((time, res))
143 .unwrap();
144 }
145}
146
147#[derive(Debug)]
148pub struct ActionResultWait {
149 goal_id: String,
150 action_result_receiver: Receiver<Result<(), crate::Error>>,
151}
152
153impl ActionResultWait {
154 pub fn new(
155 goal_id: String,
156 action_result_receiver: Receiver<Result<(), crate::Error>>,
157 ) -> Self {
158 Self {
159 goal_id,
160 action_result_receiver,
161 }
162 }
163
164 pub fn wait(&mut self, timeout: Duration) -> Result<(), crate::Error> {
165 self.action_result_receiver
166 .recv_timeout(timeout)
167 .map_err(|_| crate::Error::ActionResultTimeout)?
168 }
169
170 pub fn goal_id(&self) -> &str {
171 &self.goal_id
172 }
173}
174
175#[macro_export]
176macro_rules! define_action_client {
177 ($name:ident, $namespace:path, $action_base:expr) => {
178 $crate::paste::item! {
179 pub struct $name {
180 goal_publisher: $crate::rosrust::Publisher<$namespace::[<$action_base ActionGoal>]>,
181 _result_subscriber: $crate::rosrust::Subscriber,
182 cancel_publisher: $crate::rosrust::Publisher<msg::actionlib_msgs::GoalID>,
183 goal_id_to_sender: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, $crate::flume::Sender<std::result::Result<(), $crate::Error>>>>>
184 }
185 impl $name {
186 pub fn new(base_topic: &str, queue_size: usize) -> Self {
187 use std::sync::{Arc, Mutex};
188 use std::collections::HashMap;
189 use $crate::Error;
190 use $crate::{flume, rosrust};
191 let goal_topic = format!("{}/goal", base_topic);
192 let cancel_topic = format!("{}/cancel", base_topic);
193 let goal_publisher = rosrust::publish(&goal_topic, queue_size).unwrap();
194 let goal_id_to_sender: Arc<Mutex<HashMap<String, flume::Sender<std::result::Result<(), Error>> >>> = Arc::new(Mutex::new(HashMap::new()));
195 let goal_id_to_sender_cloned = goal_id_to_sender.clone();
196 let _result_subscriber = rosrust::subscribe(
197 &format!("{}/result", base_topic),
198 queue_size, move |result: $namespace::[<$action_base ActionResult>]| {
199 if let Some(sender) = goal_id_to_sender_cloned.lock().unwrap().remove(&result.status.goal_id.id) {
200 let _ = sender.send(
201 match result.status.status {
203 msg::actionlib_msgs::GoalStatus::SUCCEEDED => {
204 Ok(())
205 },
206 msg::actionlib_msgs::GoalStatus::PREEMPTED => {
207 Err(Error::ActionResultPreempted(format!("{:?}", result)))
208 },
209 _ => {
210 Err(Error::ActionResultNotSuccess(format!("{:?}", result)))
211 }
212 });
213 }
214 }
215 ).unwrap();
216 let cancel_publisher =
217 rosrust::publish(&cancel_topic, queue_size).unwrap();
218 rosrust::ros_info!("Waiting {} ....", goal_topic);
219 $crate::wait_subscriber(&goal_publisher);
220 rosrust::ros_info!("Waiting {} Done", goal_topic);
221 rosrust::ros_info!("Waiting {} ....", cancel_topic);
222 $crate::wait_subscriber(&cancel_publisher);
223 rosrust::ros_info!("Waiting {} Done", cancel_topic);
224 Self {
225 goal_publisher,
226 _result_subscriber,
227 cancel_publisher,
228 goal_id_to_sender
229 }
230 }
231 #[allow(dead_code)]
232 pub fn send_goal_and_wait(&self, goal: $namespace::[<$action_base Goal>], timeout: std::time::Duration) -> Result<(), $crate::Error> {
233 self.send_goal(goal)?.wait(timeout)
234 }
235 #[allow(clippy::field_reassign_with_default)]
236 pub fn send_goal(&self, goal: $namespace::[<$action_base Goal>]) -> Result<$crate::ActionResultWait, $crate::Error> {
237 use $crate::{flume, rosrust};
238 let mut action_goal = <$namespace::[<$action_base ActionGoal>]>::default();
239 action_goal.goal = goal;
240 let goal_id = format!("{}-{}", rosrust::name(), rosrust::now().seconds());
241 action_goal.goal_id.id = goal_id.clone();
242 let (sender, receiver) = flume::unbounded();
243 self.goal_id_to_sender.lock().unwrap().insert(goal_id.clone(), sender);
244 if self.goal_publisher.send(action_goal).is_err() {
245 let _ = self.goal_id_to_sender.lock().unwrap().remove(&goal_id);
246 return Err($crate::Error::ActionGoalSendingFailure);
247 }
248 Ok($crate::ActionResultWait::new(goal_id, receiver))
249 }
250 #[allow(dead_code)]
251 pub fn cancel_goal(&self, goal_id: &str) -> Result<(), $crate::Error> {
252 let mut goal_id_to_sender = self.goal_id_to_sender.lock().unwrap();
253 if goal_id.is_empty() {
254 goal_id_to_sender.clear();
255 } else {
256 let _ = goal_id_to_sender.remove(goal_id);
257 }
258 if self.cancel_publisher.send(
259 msg::actionlib_msgs::GoalID { id: goal_id.to_owned(), ..Default::default()}).is_err() {
260 return Err($crate::Error::ActionCancelSendingFailure);
261 }
262 Ok(())
263 }
264 #[allow(dead_code)]
265 pub fn cancel_all_goal(&self) -> Result<(), $crate::Error> {
266 self.cancel_goal("")
267 }
268 }
269 }
270 };
271}
272
273pub fn convert_system_time_to_ros_time(time: &SystemTime) -> Time {
274 let ros_now = rosrust::now();
275 let system_now = SystemTime::now();
276
277 if system_now < *time {
280 Time::from_nanos(
281 time.duration_since(system_now).unwrap().as_nanos() as i64 + ros_now.nanos(),
282 )
283 } else {
284 Time::from_nanos(
285 ros_now.nanos() - system_now.duration_since(*time).unwrap().as_nanos() as i64,
286 )
287 }
288}
289
290pub fn convert_ros_time_to_system_time(time: &Time) -> SystemTime {
291 let ros_now = rosrust::now();
292 let system_now = SystemTime::now();
293 let ros_time_nanos = time.nanos() as u64;
294 let ros_now_nanos = ros_now.nanos() as u64;
295 if ros_now_nanos < ros_time_nanos {
298 system_now
299 .checked_add(std::time::Duration::from_nanos(
300 ros_time_nanos - ros_now_nanos,
301 ))
302 .unwrap()
303 } else {
304 system_now
305 .checked_sub(std::time::Duration::from_nanos(
306 ros_now_nanos - ros_time_nanos,
307 ))
308 .unwrap()
309 }
310}
311
312pub fn subscribe_with_channel<T: rosrust::Message>(
323 topic_name: &str,
324 queue_size: usize,
325) -> (flume::Receiver<T>, rosrust::Subscriber) {
326 let (tx, rx) = flume::unbounded::<T>();
327
328 let sub = rosrust::subscribe(topic_name, queue_size, move |v: T| {
329 tx.send(v).unwrap();
330 })
331 .unwrap();
332
333 (rx, sub)
334}