rosrust/api/slave/
subscriptions.rs
1use crate::api::error::{self, ErrorKind, Result};
2use crate::tcpros::{SubscriberRosConnection, Topic};
3use crate::util::FAILED_TO_LOCK;
4use crate::{Message, SubscriptionHandler};
5use error_chain::bail;
6use log::error;
7use std::collections::{BTreeSet, HashMap};
8use std::iter::FromIterator;
9use std::sync::{Arc, Mutex};
10
11#[derive(Clone, Default)]
12pub struct SubscriptionsTracker {
13 mapping: Arc<Mutex<HashMap<String, SubscriberRosConnection>>>,
14}
15
16impl SubscriptionsTracker {
17 pub fn add_publishers<T>(&self, topic: &str, name: &str, publishers: T) -> Result<()>
18 where
19 T: Iterator<Item = String>,
20 {
21 let mut last_error_message = None;
22 if let Some(subscription) = self.mapping.lock().expect(FAILED_TO_LOCK).get_mut(topic) {
23 let publisher_set: BTreeSet<String> = publishers.collect();
24 subscription.limit_publishers_to(&publisher_set);
25 for publisher in publisher_set {
26 if let Err(err) = connect_to_publisher(subscription, name, &publisher, topic) {
27 let info = err
28 .iter()
29 .map(|v| format!("{}", v))
30 .collect::<Vec<_>>()
31 .join("\nCaused by:");
32 error!("Failed to connect to publisher '{}': {}", publisher, info);
33 last_error_message = Some(err);
34 }
35 }
36 }
37 match last_error_message {
38 None => Ok(()),
39 Some(err) => Err(err),
40 }
41 }
42
43 #[inline]
44 pub fn get_topics<T: FromIterator<Topic>>(&self) -> T {
45 self.mapping
46 .lock()
47 .expect(FAILED_TO_LOCK)
48 .values()
49 .map(SubscriberRosConnection::get_topic)
50 .cloned()
51 .collect()
52 }
53
54 pub fn add<T, H>(&self, name: &str, topic: &str, queue_size: usize, handler: H) -> Result<usize>
55 where
56 T: Message,
57 H: SubscriptionHandler<T>,
58 {
59 let msg_definition = T::msg_definition();
60 let msg_type = T::msg_type();
61 let md5sum = T::md5sum();
62 let mut mapping = self.mapping.lock().expect(FAILED_TO_LOCK);
63 let connection = mapping.entry(String::from(topic)).or_insert_with(|| {
64 SubscriberRosConnection::new(
65 name,
66 topic,
67 msg_definition,
68 msg_type.clone(),
69 md5sum.clone(),
70 )
71 });
72 let connection_topic = connection.get_topic();
73 if !header_matches(&connection_topic.msg_type, &msg_type)
74 || !header_matches(&connection_topic.md5sum, &md5sum)
75 {
76 error!(
77 "Attempted to connect to {} topic '{}' with message type {}",
78 connection_topic.msg_type, topic, msg_type
79 );
80 Err(ErrorKind::MismatchedType(
81 topic.into(),
82 connection_topic.msg_type.clone(),
83 msg_type,
84 )
85 .into())
86 } else {
87 Ok(connection.add_subscriber(queue_size, handler))
88 }
89 }
90
91 #[inline]
92 pub fn remove(&self, topic: &str, id: usize) {
93 let mut mapping = self.mapping.lock().expect(FAILED_TO_LOCK);
94 let has_subs = match mapping.get_mut(topic) {
95 None => return,
96 Some(val) => {
97 val.remove_subscriber(id);
98 val.has_subscribers()
99 }
100 };
101 if !has_subs {
102 mapping.remove(topic);
103 }
104 }
105
106 #[inline]
107 pub fn publisher_count(&self, topic: &str) -> usize {
108 self.mapping
109 .lock()
110 .expect(FAILED_TO_LOCK)
111 .get(topic)
112 .map_or(0, SubscriberRosConnection::publisher_count)
113 }
114
115 #[inline]
116 pub fn publisher_uris(&self, topic: &str) -> Vec<String> {
117 self.mapping
118 .lock()
119 .expect(FAILED_TO_LOCK)
120 .get(topic)
121 .map_or_else(Vec::new, SubscriberRosConnection::publisher_uris)
122 }
123}
124
125fn header_matches(first: &str, second: &str) -> bool {
126 first == "*" || second == "*" || first == second
127}
128
129fn connect_to_publisher(
130 subscriber: &mut SubscriberRosConnection,
131 caller_id: &str,
132 publisher: &str,
133 topic: &str,
134) -> Result<()> {
135 if subscriber.is_connected_to(publisher) {
136 return Ok(());
137 }
138 let (protocol, hostname, port) = request_topic(publisher, caller_id, topic)?;
139 if protocol != "TCPROS" {
140 bail!(ErrorKind::CommunicationIssue(format!(
141 "Publisher responded with a non-TCPROS protocol: {}",
142 protocol
143 )))
144 }
145 subscriber
146 .connect_to(publisher, (hostname.as_str(), port as u16))
147 .map_err(|err| ErrorKind::Io(err).into())
148}
149
150fn request_topic(
151 publisher_uri: &str,
152 caller_id: &str,
153 topic: &str,
154) -> error::rosxmlrpc::Result<(String, String, i32)> {
155 use crate::rosxmlrpc::error::ResultExt;
156 let (_code, _message, protocols): (i32, String, (String, String, i32)) = xml_rpc::Client::new()
157 .map_err(error::rosxmlrpc::ErrorKind::ForeignXmlRpc)?
158 .call(
159 &publisher_uri
160 .parse()
161 .chain_err(|| error::rosxmlrpc::ErrorKind::BadUri(publisher_uri.into()))?,
162 "requestTopic",
163 (caller_id, topic, [["TCPROS"]]),
164 )
165 .chain_err(|| error::rosxmlrpc::ErrorKind::TopicConnectionError(topic.to_owned()))?
166 .map_err(|_| "error")?;
167 Ok(protocols)
168}