rosrust/api/slave/
mod.rs

1mod handler;
2mod publications;
3mod subscriptions;
4
5pub use self::handler::ParamCache;
6use self::handler::SlaveHandler;
7use super::error::{self, ErrorKind, Result};
8use crate::api::ShutdownManager;
9use crate::tcpros::{Message, PublisherStream, Service, ServicePair, ServiceResult};
10use crate::util::{kill, FAILED_TO_LOCK};
11use crate::{RawMessageDescription, SubscriptionHandler};
12use crossbeam::channel::TryRecvError;
13use error_chain::bail;
14use log::error;
15use std::collections::HashMap;
16use std::sync::{Arc, Mutex};
17use std::thread;
18
19pub struct Slave {
20    name: String,
21    uri: String,
22    pub publications: publications::PublicationsTracker,
23    pub subscriptions: subscriptions::SubscriptionsTracker,
24    pub services: Arc<Mutex<HashMap<String, Service>>>,
25    pub shutdown_tx: kill::Sender,
26}
27
28type SerdeResult<T> = Result<T>;
29
30impl Slave {
31    pub fn new(
32        master_uri: &str,
33        hostname: &str,
34        bind_address: &str,
35        port: u16,
36        name: &str,
37        param_cache: ParamCache,
38        shutdown_manager: Arc<ShutdownManager>,
39    ) -> Result<Slave> {
40        use std::net::ToSocketAddrs;
41
42        let (shutdown_tx, shutdown_rx) = kill::channel(kill::KillMode::Sync);
43        let handler =
44            SlaveHandler::new(master_uri, hostname, name, param_cache, shutdown_tx.clone());
45        let publications = handler.publications.clone();
46        let subscriptions = handler.subscriptions.clone();
47        let services = Arc::clone(&handler.services);
48        let socket_addr = match (bind_address, port).to_socket_addrs()?.next() {
49            Some(socket_addr) => socket_addr,
50            None => bail!(error::ErrorKind::from(error::rosxmlrpc::ErrorKind::BadUri(
51                format!("{}:{}", hostname, port)
52            ))),
53        };
54
55        let bound_handler = handler.bind(&socket_addr)?;
56
57        let port = bound_handler.local_addr().port();
58        let uri = format!("http://{}:{}/", hostname, port);
59
60        thread::spawn(move || {
61            loop {
62                match shutdown_rx.try_recv() {
63                    Ok(_) | Err(TryRecvError::Disconnected) => break,
64                    Err(TryRecvError::Empty) => {}
65                }
66                bound_handler.poll();
67                // TODO: use a timed out poll once rouille provides it
68                std::thread::sleep(std::time::Duration::from_millis(5));
69            }
70            shutdown_manager.shutdown();
71        });
72
73        Ok(Slave {
74            name: String::from(name),
75            uri,
76            publications,
77            subscriptions,
78            services,
79            shutdown_tx,
80        })
81    }
82
83    #[inline]
84    pub fn uri(&self) -> &str {
85        &self.uri
86    }
87
88    pub fn add_publishers_to_subscription<T>(&self, topic: &str, publishers: T) -> SerdeResult<()>
89    where
90        T: Iterator<Item = String>,
91    {
92        self.subscriptions
93            .add_publishers(topic, &self.name, publishers)
94    }
95
96    pub fn add_service<T, F>(
97        &self,
98        hostname: &str,
99        bind_address: &str,
100        service: &str,
101        handler: F,
102    ) -> SerdeResult<String>
103    where
104        T: ServicePair,
105        F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
106    {
107        use std::collections::hash_map::Entry;
108        match self
109            .services
110            .lock()
111            .expect(FAILED_TO_LOCK)
112            .entry(String::from(service))
113        {
114            Entry::Occupied(..) => {
115                error!("Duplicate initiation of service '{}' attempted", service);
116                Err(ErrorKind::Duplicate("service".into()).into())
117            }
118            Entry::Vacant(entry) => {
119                let service =
120                    Service::new::<T, _>(hostname, bind_address, 0, service, &self.name, handler)?;
121                let api = service.api.clone();
122                entry.insert(service);
123                Ok(api)
124            }
125        }
126    }
127
128    #[inline]
129    pub fn remove_service(&self, service: &str) {
130        self.services.lock().expect(FAILED_TO_LOCK).remove(service);
131    }
132
133    #[inline]
134    pub fn add_publication<T>(
135        &self,
136        hostname: &str,
137        topic: &str,
138        queue_size: usize,
139        message_description: RawMessageDescription,
140    ) -> error::tcpros::Result<PublisherStream<T>>
141    where
142        T: Message,
143    {
144        self.publications
145            .add(hostname, topic, queue_size, &self.name, message_description)
146    }
147
148    #[inline]
149    pub fn remove_publication(&self, topic: &str) {
150        self.publications.remove(topic)
151    }
152
153    #[inline]
154    pub fn add_subscription<T, H>(
155        &self,
156        topic: &str,
157        queue_size: usize,
158        handler: H,
159    ) -> Result<usize>
160    where
161        T: Message,
162        H: SubscriptionHandler<T>,
163    {
164        self.subscriptions
165            .add(&self.name, topic, queue_size, handler)
166    }
167
168    #[inline]
169    pub fn remove_subscription(&self, topic: &str, id: usize) {
170        self.subscriptions.remove(topic, id)
171    }
172
173    #[inline]
174    pub fn get_publisher_count_of_subscription(&self, topic: &str) -> usize {
175        self.subscriptions.publisher_count(topic)
176    }
177
178    #[inline]
179    pub fn get_publisher_uris_of_subscription(&self, topic: &str) -> Vec<String> {
180        self.subscriptions.publisher_uris(topic)
181    }
182}