rosrust/api/slave/
subscriptions.rs

1use crate::api::error::{self, ErrorKind, Result};
2use crate::tcpros::{SubscriberRosConnection, Topic};
3use crate::util::FAILED_TO_LOCK;
4use crate::{Message, SubscriptionHandler};
5use error_chain::bail;
6use log::error;
7use std::collections::{BTreeSet, HashMap};
8use std::iter::FromIterator;
9use std::sync::{Arc, Mutex};
10
11#[derive(Clone, Default)]
12pub struct SubscriptionsTracker {
13    mapping: Arc<Mutex<HashMap<String, SubscriberRosConnection>>>,
14}
15
16impl SubscriptionsTracker {
17    pub fn add_publishers<T>(&self, topic: &str, name: &str, publishers: T) -> Result<()>
18    where
19        T: Iterator<Item = String>,
20    {
21        let mut last_error_message = None;
22        if let Some(subscription) = self.mapping.lock().expect(FAILED_TO_LOCK).get_mut(topic) {
23            let publisher_set: BTreeSet<String> = publishers.collect();
24            subscription.limit_publishers_to(&publisher_set);
25            for publisher in publisher_set {
26                if let Err(err) = connect_to_publisher(subscription, name, &publisher, topic) {
27                    let info = err
28                        .iter()
29                        .map(|v| format!("{}", v))
30                        .collect::<Vec<_>>()
31                        .join("\nCaused by:");
32                    error!("Failed to connect to publisher '{}': {}", publisher, info);
33                    last_error_message = Some(err);
34                }
35            }
36        }
37        match last_error_message {
38            None => Ok(()),
39            Some(err) => Err(err),
40        }
41    }
42
43    #[inline]
44    pub fn get_topics<T: FromIterator<Topic>>(&self) -> T {
45        self.mapping
46            .lock()
47            .expect(FAILED_TO_LOCK)
48            .values()
49            .map(SubscriberRosConnection::get_topic)
50            .cloned()
51            .collect()
52    }
53
54    pub fn add<T, H>(&self, name: &str, topic: &str, queue_size: usize, handler: H) -> Result<usize>
55    where
56        T: Message,
57        H: SubscriptionHandler<T>,
58    {
59        let msg_definition = T::msg_definition();
60        let msg_type = T::msg_type();
61        let md5sum = T::md5sum();
62        let mut mapping = self.mapping.lock().expect(FAILED_TO_LOCK);
63        let connection = mapping.entry(String::from(topic)).or_insert_with(|| {
64            SubscriberRosConnection::new(
65                name,
66                topic,
67                msg_definition,
68                msg_type.clone(),
69                md5sum.clone(),
70            )
71        });
72        let connection_topic = connection.get_topic();
73        if !header_matches(&connection_topic.msg_type, &msg_type)
74            || !header_matches(&connection_topic.md5sum, &md5sum)
75        {
76            error!(
77                "Attempted to connect to {} topic '{}' with message type {}",
78                connection_topic.msg_type, topic, msg_type
79            );
80            Err(ErrorKind::MismatchedType(
81                topic.into(),
82                connection_topic.msg_type.clone(),
83                msg_type,
84            )
85            .into())
86        } else {
87            Ok(connection.add_subscriber(queue_size, handler))
88        }
89    }
90
91    #[inline]
92    pub fn remove(&self, topic: &str, id: usize) {
93        let mut mapping = self.mapping.lock().expect(FAILED_TO_LOCK);
94        let has_subs = match mapping.get_mut(topic) {
95            None => return,
96            Some(val) => {
97                val.remove_subscriber(id);
98                val.has_subscribers()
99            }
100        };
101        if !has_subs {
102            mapping.remove(topic);
103        }
104    }
105
106    #[inline]
107    pub fn publisher_count(&self, topic: &str) -> usize {
108        self.mapping
109            .lock()
110            .expect(FAILED_TO_LOCK)
111            .get(topic)
112            .map_or(0, SubscriberRosConnection::publisher_count)
113    }
114
115    #[inline]
116    pub fn publisher_uris(&self, topic: &str) -> Vec<String> {
117        self.mapping
118            .lock()
119            .expect(FAILED_TO_LOCK)
120            .get(topic)
121            .map_or_else(Vec::new, SubscriberRosConnection::publisher_uris)
122    }
123}
124
/// Reports whether two connection-header values are compatible.
///
/// The values are compatible when either one is the wildcard `"*"` or when
/// both are exactly equal.
fn header_matches(first: &str, second: &str) -> bool {
    match (first, second) {
        ("*", _) | (_, "*") => true,
        (lhs, rhs) => lhs == rhs,
    }
}
128
129fn connect_to_publisher(
130    subscriber: &mut SubscriberRosConnection,
131    caller_id: &str,
132    publisher: &str,
133    topic: &str,
134) -> Result<()> {
135    if subscriber.is_connected_to(publisher) {
136        return Ok(());
137    }
138    let (protocol, hostname, port) = request_topic(publisher, caller_id, topic)?;
139    if protocol != "TCPROS" {
140        bail!(ErrorKind::CommunicationIssue(format!(
141            "Publisher responded with a non-TCPROS protocol: {}",
142            protocol
143        )))
144    }
145    subscriber
146        .connect_to(publisher, (hostname.as_str(), port as u16))
147        .map_err(|err| ErrorKind::Io(err).into())
148}
149
150fn request_topic(
151    publisher_uri: &str,
152    caller_id: &str,
153    topic: &str,
154) -> error::rosxmlrpc::Result<(String, String, i32)> {
155    use crate::rosxmlrpc::error::ResultExt;
156    let (_code, _message, protocols): (i32, String, (String, String, i32)) = xml_rpc::Client::new()
157        .map_err(error::rosxmlrpc::ErrorKind::ForeignXmlRpc)?
158        .call(
159            &publisher_uri
160                .parse()
161                .chain_err(|| error::rosxmlrpc::ErrorKind::BadUri(publisher_uri.into()))?,
162            "requestTopic",
163            (caller_id, topic, [["TCPROS"]]),
164        )
165        .chain_err(|| error::rosxmlrpc::ErrorKind::TopicConnectionError(topic.to_owned()))?
166        .map_err(|_| "error")?;
167    Ok(protocols)
168}