1mod handler;
2mod publications;
3mod subscriptions;
4
5pub use self::handler::ParamCache;
6use self::handler::SlaveHandler;
7use super::error::{self, ErrorKind, Result};
8use crate::api::ShutdownManager;
9use crate::tcpros::{Message, PublisherStream, Service, ServicePair, ServiceResult};
10use crate::util::{kill, FAILED_TO_LOCK};
11use crate::{RawMessageDescription, SubscriptionHandler};
12use crossbeam::channel::TryRecvError;
13use error_chain::bail;
14use log::error;
15use std::collections::HashMap;
16use std::sync::{Arc, Mutex};
17use std::thread;
18
19pub struct Slave {
20 name: String,
21 uri: String,
22 pub publications: publications::PublicationsTracker,
23 pub subscriptions: subscriptions::SubscriptionsTracker,
24 pub services: Arc<Mutex<HashMap<String, Service>>>,
25 pub shutdown_tx: kill::Sender,
26}
27
28type SerdeResult<T> = Result<T>;
29
30impl Slave {
31 pub fn new(
32 master_uri: &str,
33 hostname: &str,
34 bind_address: &str,
35 port: u16,
36 name: &str,
37 param_cache: ParamCache,
38 shutdown_manager: Arc<ShutdownManager>,
39 ) -> Result<Slave> {
40 use std::net::ToSocketAddrs;
41
42 let (shutdown_tx, shutdown_rx) = kill::channel(kill::KillMode::Sync);
43 let handler =
44 SlaveHandler::new(master_uri, hostname, name, param_cache, shutdown_tx.clone());
45 let publications = handler.publications.clone();
46 let subscriptions = handler.subscriptions.clone();
47 let services = Arc::clone(&handler.services);
48 let socket_addr = match (bind_address, port).to_socket_addrs()?.next() {
49 Some(socket_addr) => socket_addr,
50 None => bail!(error::ErrorKind::from(error::rosxmlrpc::ErrorKind::BadUri(
51 format!("{}:{}", hostname, port)
52 ))),
53 };
54
55 let bound_handler = handler.bind(&socket_addr)?;
56
57 let port = bound_handler.local_addr().port();
58 let uri = format!("http://{}:{}/", hostname, port);
59
60 thread::spawn(move || {
61 loop {
62 match shutdown_rx.try_recv() {
63 Ok(_) | Err(TryRecvError::Disconnected) => break,
64 Err(TryRecvError::Empty) => {}
65 }
66 bound_handler.poll();
67 std::thread::sleep(std::time::Duration::from_millis(5));
69 }
70 shutdown_manager.shutdown();
71 });
72
73 Ok(Slave {
74 name: String::from(name),
75 uri,
76 publications,
77 subscriptions,
78 services,
79 shutdown_tx,
80 })
81 }
82
83 #[inline]
84 pub fn uri(&self) -> &str {
85 &self.uri
86 }
87
88 pub fn add_publishers_to_subscription<T>(&self, topic: &str, publishers: T) -> SerdeResult<()>
89 where
90 T: Iterator<Item = String>,
91 {
92 self.subscriptions
93 .add_publishers(topic, &self.name, publishers)
94 }
95
96 pub fn add_service<T, F>(
97 &self,
98 hostname: &str,
99 bind_address: &str,
100 service: &str,
101 handler: F,
102 ) -> SerdeResult<String>
103 where
104 T: ServicePair,
105 F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
106 {
107 use std::collections::hash_map::Entry;
108 match self
109 .services
110 .lock()
111 .expect(FAILED_TO_LOCK)
112 .entry(String::from(service))
113 {
114 Entry::Occupied(..) => {
115 error!("Duplicate initiation of service '{}' attempted", service);
116 Err(ErrorKind::Duplicate("service".into()).into())
117 }
118 Entry::Vacant(entry) => {
119 let service =
120 Service::new::<T, _>(hostname, bind_address, 0, service, &self.name, handler)?;
121 let api = service.api.clone();
122 entry.insert(service);
123 Ok(api)
124 }
125 }
126 }
127
128 #[inline]
129 pub fn remove_service(&self, service: &str) {
130 self.services.lock().expect(FAILED_TO_LOCK).remove(service);
131 }
132
133 #[inline]
134 pub fn add_publication<T>(
135 &self,
136 hostname: &str,
137 topic: &str,
138 queue_size: usize,
139 message_description: RawMessageDescription,
140 ) -> error::tcpros::Result<PublisherStream<T>>
141 where
142 T: Message,
143 {
144 self.publications
145 .add(hostname, topic, queue_size, &self.name, message_description)
146 }
147
148 #[inline]
149 pub fn remove_publication(&self, topic: &str) {
150 self.publications.remove(topic)
151 }
152
153 #[inline]
154 pub fn add_subscription<T, H>(
155 &self,
156 topic: &str,
157 queue_size: usize,
158 handler: H,
159 ) -> Result<usize>
160 where
161 T: Message,
162 H: SubscriptionHandler<T>,
163 {
164 self.subscriptions
165 .add(&self.name, topic, queue_size, handler)
166 }
167
168 #[inline]
169 pub fn remove_subscription(&self, topic: &str, id: usize) {
170 self.subscriptions.remove(topic, id)
171 }
172
173 #[inline]
174 pub fn get_publisher_count_of_subscription(&self, topic: &str) -> usize {
175 self.subscriptions.publisher_count(topic)
176 }
177
178 #[inline]
179 pub fn get_publisher_uris_of_subscription(&self, topic: &str) -> Vec<String> {
180 self.subscriptions.publisher_uris(topic)
181 }
182}