// rosrust/api/slave/handler.rs

1use super::publications::PublicationsTracker;
2use super::subscriptions::SubscriptionsTracker;
3use crate::rosxmlrpc::{self, Response, ResponseError, Server};
4use crate::tcpros::Service;
5use crate::util::{kill, FAILED_TO_LOCK};
6use log::{error, info};
7use std::collections::HashMap;
8use std::net::SocketAddr;
9use std::sync::{Arc, Mutex};
10use xml_rpc::{self, rouille, Params, Value};
11
12pub struct SlaveHandler {
13    pub subscriptions: SubscriptionsTracker,
14    pub publications: PublicationsTracker,
15    pub services: Arc<Mutex<HashMap<String, Service>>>,
16    server: Server,
17}
18
19fn unwrap_array_case(params: Params) -> Params {
20    if let Some(Value::Array(items)) = params.get(0) {
21        return items.clone();
22    }
23    params
24}
25
26#[derive(Default)]
27pub struct ParamCacheState {
28    pub data: HashMap<String, Response<Value>>,
29    pub subscribed: bool,
30}
31
32pub type ParamCache = Arc<Mutex<ParamCacheState>>;
33
34impl SlaveHandler {
35    pub fn new(
36        master_uri: &str,
37        hostname: &str,
38        name: &str,
39        param_cache: ParamCache,
40        shutdown_signal: kill::Sender,
41    ) -> SlaveHandler {
42        let mut server = Server::default();
43
44        server.register_value("getBusStats", "Bus stats", |_args| {
45            // TODO: implement actual stats displaying
46            Err(ResponseError::Server("Method not implemented".into()))
47        });
48
49        server.register_value("getBusInfo", "Bus info", |_args| {
50            // TODO: implement actual info displaying
51            Err(ResponseError::Server("Method not implemented".into()))
52        });
53
54        let master_uri_string = String::from(master_uri);
55
56        server.register_value("getMasterUri", "Master URI", move |_args| {
57            Ok(Value::String(master_uri_string.clone()))
58        });
59
60        server.register_value("shutdown", "Shutdown", move |args| {
61            let mut args = unwrap_array_case(args).into_iter();
62            let _caller_id = args
63                .next()
64                .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
65            let message = match args.next() {
66                Some(Value::String(message)) => message,
67                _ => return Err(ResponseError::Client("Missing argument 'message'".into())),
68            };
69            info!("Server is shutting down because: {}", message);
70            match shutdown_signal.send() {
71                Ok(()) => Ok(Value::Int(0)),
72                Err(err) => {
73                    error!("Shutdown error: {:?}", err);
74                    Err(ResponseError::Server("Failed to shut down".into()))
75                }
76            }
77        });
78
79        server.register_value("getPid", "PID", |_args| {
80            Ok(Value::Int(std::process::id() as i32))
81        });
82
83        let subscriptions = SubscriptionsTracker::default();
84        let subs = subscriptions.clone();
85
86        server.register_value("getSubscriptions", "List of subscriptions", move |_args| {
87            Ok(Value::Array(
88                subs.get_topics::<Vec<_>>()
89                    .into_iter()
90                    .map(|topic| {
91                        Value::Array(vec![
92                            Value::String(topic.name),
93                            Value::String(topic.msg_type),
94                        ])
95                    })
96                    .collect(),
97            ))
98        });
99
100        let publications = PublicationsTracker::default();
101        let pubs = publications.clone();
102
103        server.register_value("getPublications", "List of publications", move |_args| {
104            Ok(Value::Array(
105                pubs.get_topics::<Vec<_>>()
106                    .into_iter()
107                    .map(|topic| {
108                        Value::Array(vec![
109                            Value::String(topic.name),
110                            Value::String(topic.msg_type),
111                        ])
112                    })
113                    .collect(),
114            ))
115        });
116
117        server.register_value("paramUpdate", "Parameter updated", move |args| {
118            let mut args = unwrap_array_case(args).into_iter();
119            let _caller_id = args
120                .next()
121                .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
122            let parameter_key = match args.next() {
123                Some(Value::String(parameter_key)) => parameter_key,
124                _ => {
125                    return Err(ResponseError::Client(
126                        "Missing argument 'parameter_key'".into(),
127                    ))
128                }
129            };
130            let _parameter_value = match args.next() {
131                Some(parameter_value) => parameter_value,
132                _ => {
133                    return Err(ResponseError::Client(
134                        "Missing argument 'parameter_key'".into(),
135                    ))
136                }
137            };
138            let key = parameter_key.trim_end_matches('/');
139            param_cache
140                .lock()
141                .expect(FAILED_TO_LOCK)
142                .data
143                .retain(|k, _| !k.starts_with(key) && !key.starts_with(k));
144            Ok(Value::Int(0))
145        });
146
147        let name_string = String::from(name);
148        let subs = subscriptions.clone();
149
150        server.register_value("publisherUpdate", "Publishers updated", move |args| {
151            let mut args = unwrap_array_case(args).into_iter();
152            let _caller_id = args
153                .next()
154                .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
155            let topic = match args.next() {
156                Some(Value::String(topic)) => topic,
157                _ => return Err(ResponseError::Client("Missing argument 'topic'".into())),
158            };
159            let publishers = match args.next() {
160                Some(Value::Array(publishers)) => publishers,
161                _ => {
162                    return Err(ResponseError::Client(
163                        "Missing argument 'publishers'".into(),
164                    ));
165                }
166            };
167            let publishers = publishers
168                .into_iter()
169                .map(|v| match v {
170                    Value::String(x) => Ok(x),
171                    _ => Err(ResponseError::Client(
172                        "Publishers need to be strings".into(),
173                    )),
174                })
175                .collect::<Response<Vec<String>>>()?;
176
177            subs.add_publishers(&topic, &name_string, publishers.into_iter())
178                .map_err(|v| {
179                    ResponseError::Server(format!("Failed to handle publishers: {}", v))
180                })?;
181            Ok(Value::Int(0))
182        });
183
184        let hostname_string = String::from(hostname);
185        let pubs = publications.clone();
186
187        server.register_value("requestTopic", "Chosen protocol", move |args| {
188            let mut args = unwrap_array_case(args).into_iter();
189            let _caller_id = args
190                .next()
191                .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
192            let topic = match args.next() {
193                Some(Value::String(topic)) => topic,
194                _ => return Err(ResponseError::Client("Missing argument 'topic'".into())),
195            };
196            let protocols = match args.next() {
197                Some(Value::Array(protocols)) => protocols,
198                Some(_) => {
199                    return Err(ResponseError::Client(
200                        "Protocols need to be provided as [ [String, XmlRpcLegalValue] ]".into(),
201                    ));
202                }
203                None => return Err(ResponseError::Client("Missing argument 'protocols'".into())),
204            };
205            let port = pubs.get_port(&topic).ok_or_else(|| {
206                ResponseError::Client("Requested topic not published by node".into())
207            })?;
208            let ip = hostname_string.clone();
209            let mut has_tcpros = false;
210            for protocol in protocols {
211                if let Value::Array(protocol) = protocol {
212                    if let Some(Value::String(name)) = protocol.get(0) {
213                        has_tcpros |= name == "TCPROS";
214                    }
215                }
216            }
217            if has_tcpros {
218                Ok(Value::Array(vec![
219                    Value::String("TCPROS".into()),
220                    Value::String(ip),
221                    Value::Int(port),
222                ]))
223            } else {
224                Err(ResponseError::Server(
225                    "No matching protocols available".into(),
226                ))
227            }
228        });
229
230        SlaveHandler {
231            subscriptions,
232            publications,
233            services: Arc::new(Mutex::new(HashMap::new())),
234            server,
235        }
236    }
237
238    pub fn bind(
239        self,
240        addr: &SocketAddr,
241    ) -> rosxmlrpc::error::Result<
242        xml_rpc::server::BoundServer<
243            impl Fn(&rouille::Request) -> rouille::Response + Send + Sync + 'static,
244        >,
245    > {
246        self.server.bind(addr).map_err(Into::into)
247    }
248}
249
250#[allow(dead_code)]
251pub struct BusStats {
252    pub publish: Vec<PublishStats>,
253    pub subscribe: Vec<SubscribeStats>,
254    pub service: ServiceStats,
255}
256
257#[allow(dead_code)]
258pub struct PublishStats {
259    pub name: String,
260    pub data_sent: String,
261    pub connection_data: PublishConnectionData,
262}
263
/// Per-connection counters for the publishing side; the payload of
/// `PublishStats::connection_data`.
// Nothing in this file constructs the type yet, hence the lint override.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishConnectionData {
    /// Identifier of the connection.
    pub connection_id: String,
    /// Bytes sent over the connection.
    pub bytes_sent: i32,
    /// Count of items sent (presumably messages - confirm).
    pub number_sent: i32,
    /// Whether the connection is currently established.
    pub connected: bool,
}
271
272#[allow(dead_code)]
273pub struct SubscribeStats {
274    pub name: String,
275    pub connection_data: SubscribeConnectionData,
276}
277
/// Per-connection counters for the subscribing side; the payload of
/// `SubscribeStats::connection_data`.
// Nothing in this file constructs the type yet, hence the lint override.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeConnectionData {
    /// Identifier of the connection.
    pub connection_id: String,
    /// Bytes received over the connection.
    pub bytes_received: i32,
    /// Estimated number of drops (what is counted is not established by this
    /// file - presumably dropped messages).
    pub drop_estimate: i32,
    /// Whether the connection is currently established.
    pub connected: bool,
}
285
/// Service-level counters; the payload of `BusStats::service`.
// Nothing in this file constructs the type yet, hence the lint override.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceStats {
    /// Number of requests handled.
    pub number_of_requests: i32,
    /// Bytes received.
    pub bytes_received: i32,
    /// Bytes sent.
    pub bytes_sent: i32,
}
292
/// Description of a single connection. Presumably the intended element type
/// of the `getBusInfo` reply, whose handler in this file is still a stub -
/// confirm before relying on it.
// Nothing in this file constructs the type yet, hence the lint override.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BusInfo {
    /// Identifier of the connection.
    pub connection_id: String,
    /// Identifier of the peer on the other end.
    pub destination_id: String,
    /// Direction of the connection, kept as a string (the accepted values are
    /// not established by this file).
    pub direction: String,
    /// Transport name, kept as a string.
    pub transport: String,
    /// Topic the connection carries.
    pub topic: String,
    /// Whether the connection is currently established.
    pub connected: bool,
}