rosrust/api/
ros.rs

1use super::super::rosxmlrpc::Response;
2use super::clock::{Clock, Rate, RealClock, SimulatedClock};
3use super::error::{Error, ErrorKind, Result, ResultExt};
4use super::master::{self, Master, Topic};
5use super::naming::{self, Resolver};
6use super::raii::{Publisher, Service, Subscriber};
7use super::resolve;
8use super::slave::Slave;
9use crate::api::clock::Delay;
10use crate::api::handlers::CallbackSubscriptionHandler;
11use crate::api::slave::ParamCache;
12use crate::api::ShutdownManager;
13use crate::msg::rosgraph_msgs::{Clock as ClockMsg, Log};
14use crate::msg::std_msgs::Header;
15use crate::rosxmlrpc::client::bad_response_structure;
16use crate::tcpros::{Client, Message, ServicePair, ServiceResult};
17use crate::util::FAILED_TO_LOCK;
18use crate::{RawMessage, RawMessageDescription, SubscriptionHandler};
19use error_chain::bail;
20use lazy_static::lazy_static;
21use log::error;
22use ros_message::{Duration, Time};
23use serde::{Deserialize, Serialize};
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::sync::Arc;
27use std::sync::Mutex;
28use std::thread::sleep;
29use std::time::Instant;
30use xml_rpc;
31use yaml_rust::{Yaml, YamlLoader};
32
33pub struct Ros {
34    master: Arc<Master>,
35    slave: Arc<Slave>,
36    param_cache: ParamCache,
37    hostname: String,
38    bind_address: String,
39    resolver: Resolver,
40    name: String,
41    clock: Arc<dyn Clock>,
42    static_subs: Vec<Subscriber>,
43    logger: Arc<Mutex<Option<Publisher<Log>>>>,
44    shutdown_manager: Arc<ShutdownManager>,
45}
46
47impl Ros {
48    pub fn new(name: &str) -> Result<Ros> {
49        let mut namespace = resolve::namespace();
50        if !namespace.starts_with('/') {
51            namespace = format!("/{}", namespace);
52        }
53        let master_uri = resolve::master();
54        let hostname = resolve::hostname();
55        let name = resolve::name(name);
56        let mut ros = Ros::new_raw(&master_uri, &hostname, &namespace, &name)?;
57        for (src, dest) in resolve::mappings() {
58            ros.map(&src, &dest)?;
59        }
60        for (src, dest) in resolve::params() {
61            let data = YamlLoader::load_from_str(&dest)
62                .chain_err(|| ErrorKind::BadYamlData(dest.clone()))?
63                .into_iter()
64                .next()
65                .ok_or_else(|| ErrorKind::BadYamlData(dest.clone()))?;
66            let param = ros.param(&src).ok_or(ErrorKind::CannotResolveName(src))?;
67            param.set_raw(yaml_to_xmlrpc(data)?)?;
68        }
69
70        if ros
71            .param("/use_sim_time")
72            .and_then(|v| v.get().ok())
73            .unwrap_or(false)
74        {
75            let clock = Arc::new(SimulatedClock::default());
76            let ros_clock = Arc::clone(&clock);
77            let sub = ros
78                .subscribe::<ClockMsg, _>("/clock", 1, move |v| clock.trigger(v.clock))
79                .chain_err(|| {
80                    ErrorKind::CommunicationIssue("Failed to subscribe to simulated clock".into())
81                })?;
82            ros.static_subs.push(sub);
83            ros.clock = ros_clock;
84        }
85
86        *ros.logger.lock().unwrap() = Some(ros.publish("/rosout", 100)?);
87
88        Ok(ros)
89    }
90
91    fn new_raw(master_uri: &str, hostname: &str, namespace: &str, name: &str) -> Result<Ros> {
92        let namespace = namespace.trim_end_matches('/');
93
94        if name.contains('/') {
95            bail!(ErrorKind::Naming(
96                naming::error::ErrorKind::IllegalCharacter(name.into()),
97            ));
98        }
99
100        let bind_host = {
101            if hostname == "localhost" || hostname.starts_with("127.") {
102                hostname
103            } else {
104                "0.0.0.0"
105            }
106        };
107
108        let name = format!("{}/{}", namespace, name);
109        let resolver = Resolver::new(&name)?;
110
111        let logger = Arc::new(Mutex::new(None));
112        let shutdown_manager = Arc::new(ShutdownManager::new({
113            let logger = Arc::clone(&logger);
114            move || drop(logger.lock().unwrap().take())
115        }));
116
117        let param_cache = Arc::new(Mutex::new(Default::default()));
118        let slave = Slave::new(
119            master_uri,
120            hostname,
121            bind_host,
122            0,
123            &name,
124            Arc::clone(&param_cache),
125            Arc::clone(&shutdown_manager),
126        )?;
127        let master = Master::new(master_uri, &name, slave.uri())?;
128
129        Ok(Ros {
130            master: Arc::new(master),
131            slave: Arc::new(slave),
132            param_cache,
133            hostname: String::from(hostname),
134            bind_address: String::from(bind_host),
135            resolver,
136            name,
137            clock: Arc::new(RealClock::default()),
138            static_subs: Vec::new(),
139            logger,
140            shutdown_manager,
141        })
142    }
143
144    fn map(&mut self, source: &str, destination: &str) -> Result<()> {
145        self.resolver.map(source, destination).map_err(Into::into)
146    }
147
148    #[inline]
149    pub fn uri(&self) -> &str {
150        self.slave.uri()
151    }
152
153    #[inline]
154    pub fn name(&self) -> &str {
155        &self.name
156    }
157
158    #[inline]
159    pub fn hostname(&self) -> &str {
160        &self.hostname
161    }
162
163    #[inline]
164    pub fn bind_address(&self) -> &str {
165        &self.bind_address
166    }
167
168    #[inline]
169    pub fn now(&self) -> Time {
170        self.clock.now()
171    }
172
173    #[inline]
174    pub fn delay(&self, d: Duration) -> Delay {
175        self.clock.await_init();
176        Delay::new(Arc::clone(&self.clock), d)
177    }
178
179    #[inline]
180    pub fn shutdown_sender(&self) -> Arc<ShutdownManager> {
181        Arc::clone(&self.shutdown_manager)
182    }
183
184    pub fn rate(&self, rate: f64) -> Rate {
185        self.clock.await_init();
186        let nanos = 1_000_000_000.0 / rate;
187        Rate::new(Arc::clone(&self.clock), Duration::from_nanos(nanos as i64))
188    }
189
190    #[inline]
191    pub fn is_ok(&self) -> bool {
192        !self.shutdown_manager.awaiting_shutdown()
193    }
194
195    #[inline]
196    pub fn spin(&self) -> Spinner {
197        Spinner {
198            shutdown_manager: Arc::clone(&self.shutdown_manager),
199        }
200    }
201
202    pub fn param(&self, name: &str) -> Option<Parameter> {
203        self.resolver.translate(name).ok().map(|v| Parameter {
204            param_cache: Arc::clone(&self.param_cache),
205            master: Arc::clone(&self.master),
206            name: v,
207        })
208    }
209
210    pub fn parameters(&self) -> Response<Vec<String>> {
211        self.master.get_param_names()
212    }
213
214    pub fn state(&self) -> Response<master::SystemState> {
215        self.master.get_system_state().map(Into::into)
216    }
217
218    pub fn topics(&self) -> Response<Vec<Topic>> {
219        self.master
220            .get_topic_types()
221            .map(|v| v.into_iter().map(Into::into).collect())
222    }
223
224    pub fn client<T: ServicePair>(&self, service: &str) -> Result<Client<T>> {
225        let name = self.resolver.translate(service)?;
226        Ok(Client::new(Arc::clone(&self.master), &self.name, &name))
227    }
228
229    pub fn wait_for_service(
230        &self,
231        service: &str,
232        timeout: Option<std::time::Duration>,
233    ) -> Result<()> {
234        let timeout = timeout.map(|v| std::time::Instant::now() + v);
235        let client = self.client::<RawMessage>(service)?;
236
237        loop {
238            let iteration_limit = std::time::Duration::from_secs(10);
239            let leftover_timeout = match timeout {
240                Some(t) => t
241                    .checked_duration_since(Instant::now())
242                    .ok_or_else(|| Error::from(ErrorKind::TimeoutError))?,
243                None => iteration_limit,
244            }
245            .min(iteration_limit);
246            if client.probe(leftover_timeout).is_ok() {
247                return Ok(());
248            }
249            sleep(std::time::Duration::from_millis(100));
250        }
251    }
252
253    pub fn service<T, F>(&self, service: &str, handler: F) -> Result<Service>
254    where
255        T: ServicePair,
256        F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
257    {
258        let name = self.resolver.translate(service)?;
259        Service::new::<T, F>(
260            Arc::clone(&self.master),
261            Arc::clone(&self.slave),
262            &self.hostname,
263            &self.bind_address,
264            &name,
265            handler,
266        )
267    }
268
269    #[inline]
270    pub fn subscribe<T, F>(&self, topic: &str, queue_size: usize, callback: F) -> Result<Subscriber>
271    where
272        T: Message,
273        F: Fn(T) + Send + 'static,
274    {
275        self.subscribe_with_ids(topic, queue_size, move |data, _| callback(data))
276    }
277
278    pub fn subscribe_with_ids<T, F>(
279        &self,
280        topic: &str,
281        queue_size: usize,
282        callback: F,
283    ) -> Result<Subscriber>
284    where
285        T: Message,
286        F: Fn(T, &str) + Send + 'static,
287    {
288        self.subscribe_with_ids_and_headers(
289            topic,
290            queue_size,
291            callback,
292            |_: HashMap<String, String>| (),
293        )
294    }
295
296    pub fn subscribe_with_ids_and_headers<T, F, G>(
297        &self,
298        topic: &str,
299        mut queue_size: usize,
300        on_message: F,
301        on_connect: G,
302    ) -> Result<Subscriber>
303    where
304        T: Message,
305        F: Fn(T, &str) + Send + 'static,
306        G: Fn(HashMap<String, String>) + Send + 'static,
307    {
308        if queue_size == 0 {
309            queue_size = usize::max_value();
310        }
311        let name = self.resolver.translate(topic)?;
312        Subscriber::new::<T, _>(
313            Arc::clone(&self.master),
314            Arc::clone(&self.slave),
315            &name,
316            queue_size,
317            CallbackSubscriptionHandler::new(on_message, on_connect),
318        )
319    }
320
321    pub fn subscribe_with<T, H>(
322        &self,
323        topic: &str,
324        mut queue_size: usize,
325        handler: H,
326    ) -> Result<Subscriber>
327    where
328        T: Message,
329        H: SubscriptionHandler<T>,
330    {
331        if queue_size == 0 {
332            queue_size = usize::max_value();
333        }
334        let name = self.resolver.translate(topic)?;
335        Subscriber::new::<T, H>(
336            Arc::clone(&self.master),
337            Arc::clone(&self.slave),
338            &name,
339            queue_size,
340            handler,
341        )
342    }
343
344    pub fn publish<T>(&self, topic: &str, queue_size: usize) -> Result<Publisher<T>>
345    where
346        T: Message,
347    {
348        self.publish_common(topic, queue_size, None)
349    }
350
351    pub fn publish_with_description<T>(
352        &self,
353        topic: &str,
354        queue_size: usize,
355        message_description: RawMessageDescription,
356    ) -> Result<Publisher<T>>
357    where
358        T: Message,
359    {
360        self.publish_common(topic, queue_size, Some(message_description))
361    }
362
363    fn publish_common<T>(
364        &self,
365        topic: &str,
366        mut queue_size: usize,
367        message_description: Option<RawMessageDescription>,
368    ) -> Result<Publisher<T>>
369    where
370        T: Message,
371    {
372        if queue_size == 0 {
373            queue_size = usize::max_value();
374        }
375        let name = self.resolver.translate(topic)?;
376        Publisher::new(
377            Arc::clone(&self.master),
378            Arc::clone(&self.slave),
379            Arc::clone(&self.clock),
380            &self.bind_address,
381            &name,
382            queue_size,
383            message_description,
384        )
385    }
386
387    fn log_to_terminal(&self, level: i8, msg: &str, file: &str, line: u32) {
388        use colored::{Color, Colorize};
389
390        let format_string =
391            |prefix, color| format!("[{} @ {}:{}]: {}", prefix, file, line, msg).color(color);
392
393        match level {
394            Log::DEBUG => println!("{}", format_string("DEBUG", Color::White)),
395            Log::INFO => println!("{}", format_string("INFO", Color::White)),
396            Log::WARN => eprintln!("{}", format_string("WARN", Color::Yellow)),
397            Log::ERROR => eprintln!("{}", format_string("ERROR", Color::Red)),
398            Log::FATAL => eprintln!("{}", format_string("FATAL", Color::Red)),
399            _ => {}
400        }
401    }
402
403    pub fn log(&self, level: i8, msg: String, file: &str, line: u32) {
404        self.log_to_terminal(level, &msg, file, line);
405        let topics = self.slave.publications.get_topic_names();
406        let message = Log {
407            header: Header::default(),
408            level,
409            msg,
410            name: self.name.clone(),
411            line,
412            file: file.into(),
413            function: String::default(),
414            topics,
415        };
416        let maybe_logger = self.logger.lock().unwrap();
417        if let Some(logger) = maybe_logger.deref() {
418            if let Err(err) = logger.send(message) {
419                error!("Logging error: {}", err);
420            }
421        }
422    }
423
424    pub fn log_once(&self, level: i8, msg: String, file: &str, line: u32) {
425        lazy_static! {
426            static ref UNIQUE_LOGS: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
427        }
428        let key = format!("{}:{}", file, line);
429        let mut unique_logs = UNIQUE_LOGS.lock().expect(FAILED_TO_LOCK);
430        if !unique_logs.contains(&key) {
431            unique_logs.insert(key);
432            self.log(level, msg, file, line);
433        }
434    }
435
436    pub fn log_throttle(&self, period: f64, level: i8, msg: String, file: &str, line: u32) {
437        lazy_static! {
438            static ref PERIODIC_LOGS: Mutex<HashMap<String, Time>> = Mutex::new(HashMap::new());
439        }
440        let now = self.now();
441        let key = format!("{}:{}", file, line);
442        let get_next_log_time = |now, period| now + Duration::from_nanos((period * 1e9) as i64);
443        let mut period_logs = PERIODIC_LOGS.lock().expect(FAILED_TO_LOCK);
444        match period_logs.get_mut(&key) {
445            Some(next_log_time) => {
446                if now >= *next_log_time {
447                    *next_log_time = get_next_log_time(*next_log_time, period);
448                    self.log(level, msg, file, line);
449                }
450            }
451            None => {
452                period_logs.insert(key, get_next_log_time(now, period));
453                self.log(level, msg, file, line);
454            }
455        }
456    }
457
458    pub fn log_throttle_identical(
459        &self,
460        period: f64,
461        level: i8,
462        msg: String,
463        file: &str,
464        line: u32,
465    ) {
466        lazy_static! {
467            static ref IDENTICAL_LOGS: Mutex<HashMap<String, (Time, String)>> =
468                Mutex::new(HashMap::new());
469        }
470        let now = self.now();
471        let key = format!("{}:{}", file, line);
472        let get_next_log_time = |now, period| now + Duration::from_nanos((period * 1e9) as i64);
473        let mut identical_logs = IDENTICAL_LOGS.lock().expect(FAILED_TO_LOCK);
474        match identical_logs.get_mut(&key) {
475            Some((next_log_time, previous_msg)) => {
476                if &msg != previous_msg {
477                    *previous_msg = msg.clone();
478                    *next_log_time = get_next_log_time(now, period);
479                    self.log(level, msg, file, line);
480                } else if now >= *next_log_time {
481                    *next_log_time = get_next_log_time(*next_log_time, period);
482                    self.log(level, msg, file, line);
483                }
484            }
485            None => {
486                identical_logs.insert(key, (get_next_log_time(now, period), msg.clone()));
487                self.log(level, msg, file, line);
488            }
489        }
490    }
491}
492
493pub struct Parameter {
494    param_cache: ParamCache,
495    master: Arc<Master>,
496    name: String,
497}
498
499impl Parameter {
500    pub fn name(&self) -> &str {
501        &self.name
502    }
503
504    pub fn get<'b, T: Deserialize<'b>>(&self) -> Response<T> {
505        let data = self.get_raw()?;
506        Deserialize::deserialize(data).map_err(bad_response_structure)
507    }
508
509    pub fn get_raw(&self) -> Response<xml_rpc::Value> {
510        let subscribed;
511        {
512            let cache = self.param_cache.lock().expect(FAILED_TO_LOCK);
513            if let Some(data) = cache.data.get(&self.name) {
514                return data.clone();
515            }
516            subscribed = cache.subscribed;
517        }
518        if !subscribed {
519            self.master.subscribe_param_any("/")?;
520            self.param_cache.lock().expect(FAILED_TO_LOCK).subscribed = true;
521        }
522        let data = self.master.get_param_any(&self.name);
523        self.param_cache
524            .lock()
525            .expect(FAILED_TO_LOCK)
526            .data
527            .insert(self.name.clone(), data.clone());
528        data
529    }
530
531    pub fn set<T: Serialize>(&self, value: &T) -> Response<()> {
532        self.master.set_param::<T>(&self.name, value)?;
533        self.clear_param_cache();
534        Ok(())
535    }
536
537    pub fn set_raw(&self, value: xml_rpc::Value) -> Response<()> {
538        self.master.set_param_any(&self.name, value)?;
539        self.clear_param_cache();
540        Ok(())
541    }
542
543    pub fn delete(&self) -> Response<()> {
544        self.master.delete_param(&self.name)?;
545        self.clear_param_cache();
546        Ok(())
547    }
548
549    pub fn exists(&self) -> Response<bool> {
550        self.master.has_param(&self.name)
551    }
552
553    pub fn search(&self) -> Response<String> {
554        self.master.search_param(&self.name)
555    }
556
557    fn clear_param_cache(&self) {
558        self.param_cache.lock().expect(FAILED_TO_LOCK).data.clear();
559    }
560}
561
562fn yaml_to_xmlrpc(val: Yaml) -> Result<xml_rpc::Value> {
563    Ok(match val {
564        Yaml::Real(v) => xml_rpc::Value::Double(
565            v.parse()
566                .chain_err(|| ErrorKind::BadYamlData("Failed to parse float".into()))?,
567        ),
568        Yaml::Integer(v) => xml_rpc::Value::Int(v as i32),
569        Yaml::String(v) => xml_rpc::Value::String(v),
570        Yaml::Boolean(v) => xml_rpc::Value::Bool(v),
571        Yaml::Array(v) => {
572            xml_rpc::Value::Array(v.into_iter().map(yaml_to_xmlrpc).collect::<Result<_>>()?)
573        }
574        Yaml::Hash(v) => xml_rpc::Value::Struct(
575            v.into_iter()
576                .map(|(k, v)| Ok((yaml_to_string(k)?, yaml_to_xmlrpc(v)?)))
577                .collect::<Result<_>>()?,
578        ),
579        Yaml::Alias(_) => bail!(ErrorKind::BadYamlData("Alias is not supported".into())),
580        Yaml::Null => bail!(ErrorKind::BadYamlData("Illegal null value".into())),
581        Yaml::BadValue => bail!(ErrorKind::BadYamlData("Bad value provided".into())),
582    })
583}
584
585fn yaml_to_string(val: Yaml) -> Result<String> {
586    Ok(match val {
587        Yaml::Real(v) | Yaml::String(v) => v,
588        Yaml::Integer(v) => v.to_string(),
589        Yaml::Boolean(true) => "true".into(),
590        Yaml::Boolean(false) => "false".into(),
591        _ => bail!(ErrorKind::BadYamlData(
592            "Hash keys need to be strings".into()
593        )),
594    })
595}
596
597pub struct Spinner {
598    shutdown_manager: Arc<ShutdownManager>,
599}
600
601impl Drop for Spinner {
602    fn drop(&mut self) {
603        while !self.shutdown_manager.awaiting_shutdown() {
604            sleep(std::time::Duration::from_millis(100));
605        }
606    }
607}