1use super::super::rosxmlrpc::Response;
2use super::clock::{Clock, Rate, RealClock, SimulatedClock};
3use super::error::{Error, ErrorKind, Result, ResultExt};
4use super::master::{self, Master, Topic};
5use super::naming::{self, Resolver};
6use super::raii::{Publisher, Service, Subscriber};
7use super::resolve;
8use super::slave::Slave;
9use crate::api::clock::Delay;
10use crate::api::handlers::CallbackSubscriptionHandler;
11use crate::api::slave::ParamCache;
12use crate::api::ShutdownManager;
13use crate::msg::rosgraph_msgs::{Clock as ClockMsg, Log};
14use crate::msg::std_msgs::Header;
15use crate::rosxmlrpc::client::bad_response_structure;
16use crate::tcpros::{Client, Message, ServicePair, ServiceResult};
17use crate::util::FAILED_TO_LOCK;
18use crate::{RawMessage, RawMessageDescription, SubscriptionHandler};
19use error_chain::bail;
20use lazy_static::lazy_static;
21use log::error;
22use ros_message::{Duration, Time};
23use serde::{Deserialize, Serialize};
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::thread::sleep;
29use std::time::Instant;
30use xml_rpc;
31use yaml_rust::{Yaml, YamlLoader};
32
33pub struct Ros {
34 master: Arc<Master>,
35 slave: Arc<Slave>,
36 param_cache: ParamCache,
37 hostname: String,
38 bind_address: String,
39 resolver: Resolver,
40 name: String,
41 clock: Arc<dyn Clock>,
42 static_subs: Vec<Subscriber>,
43 logger: Arc<Mutex<Option<Publisher<Log>>>>,
44 shutdown_manager: Arc<ShutdownManager>,
45}
46
47impl Ros {
48 pub fn new(name: &str) -> Result<Ros> {
49 let mut namespace = resolve::namespace();
50 if !namespace.starts_with('/') {
51 namespace = format!("/{}", namespace);
52 }
53 let master_uri = resolve::master();
54 let hostname = resolve::hostname();
55 let name = resolve::name(name);
56 let mut ros = Ros::new_raw(&master_uri, &hostname, &namespace, &name)?;
57 for (src, dest) in resolve::mappings() {
58 ros.map(&src, &dest)?;
59 }
60 for (src, dest) in resolve::params() {
61 let data = YamlLoader::load_from_str(&dest)
62 .chain_err(|| ErrorKind::BadYamlData(dest.clone()))?
63 .into_iter()
64 .next()
65 .ok_or_else(|| ErrorKind::BadYamlData(dest.clone()))?;
66 let param = ros.param(&src).ok_or(ErrorKind::CannotResolveName(src))?;
67 param.set_raw(yaml_to_xmlrpc(data)?)?;
68 }
69
70 if ros
71 .param("/use_sim_time")
72 .and_then(|v| v.get().ok())
73 .unwrap_or(false)
74 {
75 let clock = Arc::new(SimulatedClock::default());
76 let ros_clock = Arc::clone(&clock);
77 let sub = ros
78 .subscribe::<ClockMsg, _>("/clock", 1, move |v| clock.trigger(v.clock))
79 .chain_err(|| {
80 ErrorKind::CommunicationIssue("Failed to subscribe to simulated clock".into())
81 })?;
82 ros.static_subs.push(sub);
83 ros.clock = ros_clock;
84 }
85
86 *ros.logger.lock().unwrap() = Some(ros.publish("/rosout", 100)?);
87
88 Ok(ros)
89 }
90
91 fn new_raw(master_uri: &str, hostname: &str, namespace: &str, name: &str) -> Result<Ros> {
92 let namespace = namespace.trim_end_matches('/');
93
94 if name.contains('/') {
95 bail!(ErrorKind::Naming(
96 naming::error::ErrorKind::IllegalCharacter(name.into()),
97 ));
98 }
99
100 let bind_host = {
101 if hostname == "localhost" || hostname.starts_with("127.") {
102 hostname
103 } else {
104 "0.0.0.0"
105 }
106 };
107
108 let name = format!("{}/{}", namespace, name);
109 let resolver = Resolver::new(&name)?;
110
111 let logger = Arc::new(Mutex::new(None));
112 let shutdown_manager = Arc::new(ShutdownManager::new({
113 let logger = Arc::clone(&logger);
114 move || drop(logger.lock().unwrap().take())
115 }));
116
117 let param_cache = Arc::new(Mutex::new(Default::default()));
118 let slave = Slave::new(
119 master_uri,
120 hostname,
121 bind_host,
122 0,
123 &name,
124 Arc::clone(¶m_cache),
125 Arc::clone(&shutdown_manager),
126 )?;
127 let master = Master::new(master_uri, &name, slave.uri())?;
128
129 Ok(Ros {
130 master: Arc::new(master),
131 slave: Arc::new(slave),
132 param_cache,
133 hostname: String::from(hostname),
134 bind_address: String::from(bind_host),
135 resolver,
136 name,
137 clock: Arc::new(RealClock::default()),
138 static_subs: Vec::new(),
139 logger,
140 shutdown_manager,
141 })
142 }
143
144 fn map(&mut self, source: &str, destination: &str) -> Result<()> {
145 self.resolver.map(source, destination).map_err(Into::into)
146 }
147
148 #[inline]
149 pub fn uri(&self) -> &str {
150 self.slave.uri()
151 }
152
153 #[inline]
154 pub fn name(&self) -> &str {
155 &self.name
156 }
157
158 #[inline]
159 pub fn hostname(&self) -> &str {
160 &self.hostname
161 }
162
163 #[inline]
164 pub fn bind_address(&self) -> &str {
165 &self.bind_address
166 }
167
168 #[inline]
169 pub fn now(&self) -> Time {
170 self.clock.now()
171 }
172
173 #[inline]
174 pub fn delay(&self, d: Duration) -> Delay {
175 self.clock.await_init();
176 Delay::new(Arc::clone(&self.clock), d)
177 }
178
179 #[inline]
180 pub fn shutdown_sender(&self) -> Arc<ShutdownManager> {
181 Arc::clone(&self.shutdown_manager)
182 }
183
184 pub fn rate(&self, rate: f64) -> Rate {
185 self.clock.await_init();
186 let nanos = 1_000_000_000.0 / rate;
187 Rate::new(Arc::clone(&self.clock), Duration::from_nanos(nanos as i64))
188 }
189
190 #[inline]
191 pub fn is_ok(&self) -> bool {
192 !self.shutdown_manager.awaiting_shutdown()
193 }
194
195 #[inline]
196 pub fn spin(&self) -> Spinner {
197 Spinner {
198 shutdown_manager: Arc::clone(&self.shutdown_manager),
199 }
200 }
201
202 pub fn param(&self, name: &str) -> Option<Parameter> {
203 self.resolver.translate(name).ok().map(|v| Parameter {
204 param_cache: Arc::clone(&self.param_cache),
205 master: Arc::clone(&self.master),
206 name: v,
207 })
208 }
209
210 pub fn parameters(&self) -> Response<Vec<String>> {
211 self.master.get_param_names()
212 }
213
214 pub fn state(&self) -> Response<master::SystemState> {
215 self.master.get_system_state().map(Into::into)
216 }
217
218 pub fn topics(&self) -> Response<Vec<Topic>> {
219 self.master
220 .get_topic_types()
221 .map(|v| v.into_iter().map(Into::into).collect())
222 }
223
224 pub fn client<T: ServicePair>(&self, service: &str) -> Result<Client<T>> {
225 let name = self.resolver.translate(service)?;
226 Ok(Client::new(Arc::clone(&self.master), &self.name, &name))
227 }
228
229 pub fn wait_for_service(
230 &self,
231 service: &str,
232 timeout: Option<std::time::Duration>,
233 ) -> Result<()> {
234 let timeout = timeout.map(|v| std::time::Instant::now() + v);
235 let client = self.client::<RawMessage>(service)?;
236
237 loop {
238 let iteration_limit = std::time::Duration::from_secs(10);
239 let leftover_timeout = match timeout {
240 Some(t) => t
241 .checked_duration_since(Instant::now())
242 .ok_or_else(|| Error::from(ErrorKind::TimeoutError))?,
243 None => iteration_limit,
244 }
245 .min(iteration_limit);
246 if client.probe(leftover_timeout).is_ok() {
247 return Ok(());
248 }
249 sleep(std::time::Duration::from_millis(100));
250 }
251 }
252
253 pub fn service<T, F>(&self, service: &str, handler: F) -> Result<Service>
254 where
255 T: ServicePair,
256 F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
257 {
258 let name = self.resolver.translate(service)?;
259 Service::new::<T, F>(
260 Arc::clone(&self.master),
261 Arc::clone(&self.slave),
262 &self.hostname,
263 &self.bind_address,
264 &name,
265 handler,
266 )
267 }
268
269 #[inline]
270 pub fn subscribe<T, F>(&self, topic: &str, queue_size: usize, callback: F) -> Result<Subscriber>
271 where
272 T: Message,
273 F: Fn(T) + Send + 'static,
274 {
275 self.subscribe_with_ids(topic, queue_size, move |data, _| callback(data))
276 }
277
278 pub fn subscribe_with_ids<T, F>(
279 &self,
280 topic: &str,
281 queue_size: usize,
282 callback: F,
283 ) -> Result<Subscriber>
284 where
285 T: Message,
286 F: Fn(T, &str) + Send + 'static,
287 {
288 self.subscribe_with_ids_and_headers(
289 topic,
290 queue_size,
291 callback,
292 |_: HashMap<String, String>| (),
293 )
294 }
295
296 pub fn subscribe_with_ids_and_headers<T, F, G>(
297 &self,
298 topic: &str,
299 mut queue_size: usize,
300 on_message: F,
301 on_connect: G,
302 ) -> Result<Subscriber>
303 where
304 T: Message,
305 F: Fn(T, &str) + Send + 'static,
306 G: Fn(HashMap<String, String>) + Send + 'static,
307 {
308 if queue_size == 0 {
309 queue_size = usize::max_value();
310 }
311 let name = self.resolver.translate(topic)?;
312 Subscriber::new::<T, _>(
313 Arc::clone(&self.master),
314 Arc::clone(&self.slave),
315 &name,
316 queue_size,
317 CallbackSubscriptionHandler::new(on_message, on_connect),
318 )
319 }
320
321 pub fn subscribe_with<T, H>(
322 &self,
323 topic: &str,
324 mut queue_size: usize,
325 handler: H,
326 ) -> Result<Subscriber>
327 where
328 T: Message,
329 H: SubscriptionHandler<T>,
330 {
331 if queue_size == 0 {
332 queue_size = usize::max_value();
333 }
334 let name = self.resolver.translate(topic)?;
335 Subscriber::new::<T, H>(
336 Arc::clone(&self.master),
337 Arc::clone(&self.slave),
338 &name,
339 queue_size,
340 handler,
341 )
342 }
343
344 pub fn publish<T>(&self, topic: &str, queue_size: usize) -> Result<Publisher<T>>
345 where
346 T: Message,
347 {
348 self.publish_common(topic, queue_size, None)
349 }
350
351 pub fn publish_with_description<T>(
352 &self,
353 topic: &str,
354 queue_size: usize,
355 message_description: RawMessageDescription,
356 ) -> Result<Publisher<T>>
357 where
358 T: Message,
359 {
360 self.publish_common(topic, queue_size, Some(message_description))
361 }
362
363 fn publish_common<T>(
364 &self,
365 topic: &str,
366 mut queue_size: usize,
367 message_description: Option<RawMessageDescription>,
368 ) -> Result<Publisher<T>>
369 where
370 T: Message,
371 {
372 if queue_size == 0 {
373 queue_size = usize::max_value();
374 }
375 let name = self.resolver.translate(topic)?;
376 Publisher::new(
377 Arc::clone(&self.master),
378 Arc::clone(&self.slave),
379 Arc::clone(&self.clock),
380 &self.bind_address,
381 &name,
382 queue_size,
383 message_description,
384 )
385 }
386
387 fn log_to_terminal(&self, level: i8, msg: &str, file: &str, line: u32) {
388 use colored::{Color, Colorize};
389
390 let format_string =
391 |prefix, color| format!("[{} @ {}:{}]: {}", prefix, file, line, msg).color(color);
392
393 match level {
394 Log::DEBUG => println!("{}", format_string("DEBUG", Color::White)),
395 Log::INFO => println!("{}", format_string("INFO", Color::White)),
396 Log::WARN => eprintln!("{}", format_string("WARN", Color::Yellow)),
397 Log::ERROR => eprintln!("{}", format_string("ERROR", Color::Red)),
398 Log::FATAL => eprintln!("{}", format_string("FATAL", Color::Red)),
399 _ => {}
400 }
401 }
402
403 pub fn log(&self, level: i8, msg: String, file: &str, line: u32) {
404 self.log_to_terminal(level, &msg, file, line);
405 let topics = self.slave.publications.get_topic_names();
406 let message = Log {
407 header: Header::default(),
408 level,
409 msg,
410 name: self.name.clone(),
411 line,
412 file: file.into(),
413 function: String::default(),
414 topics,
415 };
416 let maybe_logger = self.logger.lock().unwrap();
417 if let Some(logger) = maybe_logger.deref() {
418 if let Err(err) = logger.send(message) {
419 error!("Logging error: {}", err);
420 }
421 }
422 }
423
424 pub fn log_once(&self, level: i8, msg: String, file: &str, line: u32) {
425 lazy_static! {
426 static ref UNIQUE_LOGS: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
427 }
428 let key = format!("{}:{}", file, line);
429 let mut unique_logs = UNIQUE_LOGS.lock().expect(FAILED_TO_LOCK);
430 if !unique_logs.contains(&key) {
431 unique_logs.insert(key);
432 self.log(level, msg, file, line);
433 }
434 }
435
436 pub fn log_throttle(&self, period: f64, level: i8, msg: String, file: &str, line: u32) {
437 lazy_static! {
438 static ref PERIODIC_LOGS: Mutex<HashMap<String, Time>> = Mutex::new(HashMap::new());
439 }
440 let now = self.now();
441 let key = format!("{}:{}", file, line);
442 let get_next_log_time = |now, period| now + Duration::from_nanos((period * 1e9) as i64);
443 let mut period_logs = PERIODIC_LOGS.lock().expect(FAILED_TO_LOCK);
444 match period_logs.get_mut(&key) {
445 Some(next_log_time) => {
446 if now >= *next_log_time {
447 *next_log_time = get_next_log_time(*next_log_time, period);
448 self.log(level, msg, file, line);
449 }
450 }
451 None => {
452 period_logs.insert(key, get_next_log_time(now, period));
453 self.log(level, msg, file, line);
454 }
455 }
456 }
457
458 pub fn log_throttle_identical(
459 &self,
460 period: f64,
461 level: i8,
462 msg: String,
463 file: &str,
464 line: u32,
465 ) {
466 lazy_static! {
467 static ref IDENTICAL_LOGS: Mutex<HashMap<String, (Time, String)>> =
468 Mutex::new(HashMap::new());
469 }
470 let now = self.now();
471 let key = format!("{}:{}", file, line);
472 let get_next_log_time = |now, period| now + Duration::from_nanos((period * 1e9) as i64);
473 let mut identical_logs = IDENTICAL_LOGS.lock().expect(FAILED_TO_LOCK);
474 match identical_logs.get_mut(&key) {
475 Some((next_log_time, previous_msg)) => {
476 if &msg != previous_msg {
477 *previous_msg = msg.clone();
478 *next_log_time = get_next_log_time(now, period);
479 self.log(level, msg, file, line);
480 } else if now >= *next_log_time {
481 *next_log_time = get_next_log_time(*next_log_time, period);
482 self.log(level, msg, file, line);
483 }
484 }
485 None => {
486 identical_logs.insert(key, (get_next_log_time(now, period), msg.clone()));
487 self.log(level, msg, file, line);
488 }
489 }
490 }
491}
492
493pub struct Parameter {
494 param_cache: ParamCache,
495 master: Arc<Master>,
496 name: String,
497}
498
499impl Parameter {
500 pub fn name(&self) -> &str {
501 &self.name
502 }
503
504 pub fn get<'b, T: Deserialize<'b>>(&self) -> Response<T> {
505 let data = self.get_raw()?;
506 Deserialize::deserialize(data).map_err(bad_response_structure)
507 }
508
509 pub fn get_raw(&self) -> Response<xml_rpc::Value> {
510 let subscribed;
511 {
512 let cache = self.param_cache.lock().expect(FAILED_TO_LOCK);
513 if let Some(data) = cache.data.get(&self.name) {
514 return data.clone();
515 }
516 subscribed = cache.subscribed;
517 }
518 if !subscribed {
519 self.master.subscribe_param_any("/")?;
520 self.param_cache.lock().expect(FAILED_TO_LOCK).subscribed = true;
521 }
522 let data = self.master.get_param_any(&self.name);
523 self.param_cache
524 .lock()
525 .expect(FAILED_TO_LOCK)
526 .data
527 .insert(self.name.clone(), data.clone());
528 data
529 }
530
531 pub fn set<T: Serialize>(&self, value: &T) -> Response<()> {
532 self.master.set_param::<T>(&self.name, value)?;
533 self.clear_param_cache();
534 Ok(())
535 }
536
537 pub fn set_raw(&self, value: xml_rpc::Value) -> Response<()> {
538 self.master.set_param_any(&self.name, value)?;
539 self.clear_param_cache();
540 Ok(())
541 }
542
543 pub fn delete(&self) -> Response<()> {
544 self.master.delete_param(&self.name)?;
545 self.clear_param_cache();
546 Ok(())
547 }
548
549 pub fn exists(&self) -> Response<bool> {
550 self.master.has_param(&self.name)
551 }
552
553 pub fn search(&self) -> Response<String> {
554 self.master.search_param(&self.name)
555 }
556
557 fn clear_param_cache(&self) {
558 self.param_cache.lock().expect(FAILED_TO_LOCK).data.clear();
559 }
560}
561
562fn yaml_to_xmlrpc(val: Yaml) -> Result<xml_rpc::Value> {
563 Ok(match val {
564 Yaml::Real(v) => xml_rpc::Value::Double(
565 v.parse()
566 .chain_err(|| ErrorKind::BadYamlData("Failed to parse float".into()))?,
567 ),
568 Yaml::Integer(v) => xml_rpc::Value::Int(v as i32),
569 Yaml::String(v) => xml_rpc::Value::String(v),
570 Yaml::Boolean(v) => xml_rpc::Value::Bool(v),
571 Yaml::Array(v) => {
572 xml_rpc::Value::Array(v.into_iter().map(yaml_to_xmlrpc).collect::<Result<_>>()?)
573 }
574 Yaml::Hash(v) => xml_rpc::Value::Struct(
575 v.into_iter()
576 .map(|(k, v)| Ok((yaml_to_string(k)?, yaml_to_xmlrpc(v)?)))
577 .collect::<Result<_>>()?,
578 ),
579 Yaml::Alias(_) => bail!(ErrorKind::BadYamlData("Alias is not supported".into())),
580 Yaml::Null => bail!(ErrorKind::BadYamlData("Illegal null value".into())),
581 Yaml::BadValue => bail!(ErrorKind::BadYamlData("Bad value provided".into())),
582 })
583}
584
585fn yaml_to_string(val: Yaml) -> Result<String> {
586 Ok(match val {
587 Yaml::Real(v) | Yaml::String(v) => v,
588 Yaml::Integer(v) => v.to_string(),
589 Yaml::Boolean(true) => "true".into(),
590 Yaml::Boolean(false) => "false".into(),
591 _ => bail!(ErrorKind::BadYamlData(
592 "Hash keys need to be strings".into()
593 )),
594 })
595}
596
597pub struct Spinner {
598 shutdown_manager: Arc<ShutdownManager>,
599}
600
601impl Drop for Spinner {
602 fn drop(&mut self) {
603 while !self.shutdown_manager.awaiting_shutdown() {
604 sleep(std::time::Duration::from_millis(100));
605 }
606 }
607}