1use super::publications::PublicationsTracker;
2use super::subscriptions::SubscriptionsTracker;
3use crate::rosxmlrpc::{self, Response, ResponseError, Server};
4use crate::tcpros::Service;
5use crate::util::{kill, FAILED_TO_LOCK};
6use log::{error, info};
7use std::collections::HashMap;
8use std::net::SocketAddr;
9use std::sync::{Arc, Mutex};
10use xml_rpc::{self, rouille, Params, Value};
11
12pub struct SlaveHandler {
13 pub subscriptions: SubscriptionsTracker,
14 pub publications: PublicationsTracker,
15 pub services: Arc<Mutex<HashMap<String, Service>>>,
16 server: Server,
17}
18
19fn unwrap_array_case(params: Params) -> Params {
20 if let Some(Value::Array(items)) = params.get(0) {
21 return items.clone();
22 }
23 params
24}
25
26#[derive(Default)]
27pub struct ParamCacheState {
28 pub data: HashMap<String, Response<Value>>,
29 pub subscribed: bool,
30}
31
32pub type ParamCache = Arc<Mutex<ParamCacheState>>;
33
34impl SlaveHandler {
35 pub fn new(
36 master_uri: &str,
37 hostname: &str,
38 name: &str,
39 param_cache: ParamCache,
40 shutdown_signal: kill::Sender,
41 ) -> SlaveHandler {
42 let mut server = Server::default();
43
44 server.register_value("getBusStats", "Bus stats", |_args| {
45 Err(ResponseError::Server("Method not implemented".into()))
47 });
48
49 server.register_value("getBusInfo", "Bus info", |_args| {
50 Err(ResponseError::Server("Method not implemented".into()))
52 });
53
54 let master_uri_string = String::from(master_uri);
55
56 server.register_value("getMasterUri", "Master URI", move |_args| {
57 Ok(Value::String(master_uri_string.clone()))
58 });
59
60 server.register_value("shutdown", "Shutdown", move |args| {
61 let mut args = unwrap_array_case(args).into_iter();
62 let _caller_id = args
63 .next()
64 .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
65 let message = match args.next() {
66 Some(Value::String(message)) => message,
67 _ => return Err(ResponseError::Client("Missing argument 'message'".into())),
68 };
69 info!("Server is shutting down because: {}", message);
70 match shutdown_signal.send() {
71 Ok(()) => Ok(Value::Int(0)),
72 Err(err) => {
73 error!("Shutdown error: {:?}", err);
74 Err(ResponseError::Server("Failed to shut down".into()))
75 }
76 }
77 });
78
79 server.register_value("getPid", "PID", |_args| {
80 Ok(Value::Int(std::process::id() as i32))
81 });
82
83 let subscriptions = SubscriptionsTracker::default();
84 let subs = subscriptions.clone();
85
86 server.register_value("getSubscriptions", "List of subscriptions", move |_args| {
87 Ok(Value::Array(
88 subs.get_topics::<Vec<_>>()
89 .into_iter()
90 .map(|topic| {
91 Value::Array(vec![
92 Value::String(topic.name),
93 Value::String(topic.msg_type),
94 ])
95 })
96 .collect(),
97 ))
98 });
99
100 let publications = PublicationsTracker::default();
101 let pubs = publications.clone();
102
103 server.register_value("getPublications", "List of publications", move |_args| {
104 Ok(Value::Array(
105 pubs.get_topics::<Vec<_>>()
106 .into_iter()
107 .map(|topic| {
108 Value::Array(vec![
109 Value::String(topic.name),
110 Value::String(topic.msg_type),
111 ])
112 })
113 .collect(),
114 ))
115 });
116
117 server.register_value("paramUpdate", "Parameter updated", move |args| {
118 let mut args = unwrap_array_case(args).into_iter();
119 let _caller_id = args
120 .next()
121 .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
122 let parameter_key = match args.next() {
123 Some(Value::String(parameter_key)) => parameter_key,
124 _ => {
125 return Err(ResponseError::Client(
126 "Missing argument 'parameter_key'".into(),
127 ))
128 }
129 };
130 let _parameter_value = match args.next() {
131 Some(parameter_value) => parameter_value,
132 _ => {
133 return Err(ResponseError::Client(
134 "Missing argument 'parameter_key'".into(),
135 ))
136 }
137 };
138 let key = parameter_key.trim_end_matches('/');
139 param_cache
140 .lock()
141 .expect(FAILED_TO_LOCK)
142 .data
143 .retain(|k, _| !k.starts_with(key) && !key.starts_with(k));
144 Ok(Value::Int(0))
145 });
146
147 let name_string = String::from(name);
148 let subs = subscriptions.clone();
149
150 server.register_value("publisherUpdate", "Publishers updated", move |args| {
151 let mut args = unwrap_array_case(args).into_iter();
152 let _caller_id = args
153 .next()
154 .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
155 let topic = match args.next() {
156 Some(Value::String(topic)) => topic,
157 _ => return Err(ResponseError::Client("Missing argument 'topic'".into())),
158 };
159 let publishers = match args.next() {
160 Some(Value::Array(publishers)) => publishers,
161 _ => {
162 return Err(ResponseError::Client(
163 "Missing argument 'publishers'".into(),
164 ));
165 }
166 };
167 let publishers = publishers
168 .into_iter()
169 .map(|v| match v {
170 Value::String(x) => Ok(x),
171 _ => Err(ResponseError::Client(
172 "Publishers need to be strings".into(),
173 )),
174 })
175 .collect::<Response<Vec<String>>>()?;
176
177 subs.add_publishers(&topic, &name_string, publishers.into_iter())
178 .map_err(|v| {
179 ResponseError::Server(format!("Failed to handle publishers: {}", v))
180 })?;
181 Ok(Value::Int(0))
182 });
183
184 let hostname_string = String::from(hostname);
185 let pubs = publications.clone();
186
187 server.register_value("requestTopic", "Chosen protocol", move |args| {
188 let mut args = unwrap_array_case(args).into_iter();
189 let _caller_id = args
190 .next()
191 .ok_or_else(|| ResponseError::Client("Missing argument 'caller_id'".into()))?;
192 let topic = match args.next() {
193 Some(Value::String(topic)) => topic,
194 _ => return Err(ResponseError::Client("Missing argument 'topic'".into())),
195 };
196 let protocols = match args.next() {
197 Some(Value::Array(protocols)) => protocols,
198 Some(_) => {
199 return Err(ResponseError::Client(
200 "Protocols need to be provided as [ [String, XmlRpcLegalValue] ]".into(),
201 ));
202 }
203 None => return Err(ResponseError::Client("Missing argument 'protocols'".into())),
204 };
205 let port = pubs.get_port(&topic).ok_or_else(|| {
206 ResponseError::Client("Requested topic not published by node".into())
207 })?;
208 let ip = hostname_string.clone();
209 let mut has_tcpros = false;
210 for protocol in protocols {
211 if let Value::Array(protocol) = protocol {
212 if let Some(Value::String(name)) = protocol.get(0) {
213 has_tcpros |= name == "TCPROS";
214 }
215 }
216 }
217 if has_tcpros {
218 Ok(Value::Array(vec![
219 Value::String("TCPROS".into()),
220 Value::String(ip),
221 Value::Int(port),
222 ]))
223 } else {
224 Err(ResponseError::Server(
225 "No matching protocols available".into(),
226 ))
227 }
228 });
229
230 SlaveHandler {
231 subscriptions,
232 publications,
233 services: Arc::new(Mutex::new(HashMap::new())),
234 server,
235 }
236 }
237
238 pub fn bind(
239 self,
240 addr: &SocketAddr,
241 ) -> rosxmlrpc::error::Result<
242 xml_rpc::server::BoundServer<
243 impl Fn(&rouille::Request) -> rouille::Response + Send + Sync + 'static,
244 >,
245 > {
246 self.server.bind(addr).map_err(Into::into)
247 }
248}
249
250#[allow(dead_code)]
251pub struct BusStats {
252 pub publish: Vec<PublishStats>,
253 pub subscribe: Vec<SubscribeStats>,
254 pub service: ServiceStats,
255}
256
257#[allow(dead_code)]
258pub struct PublishStats {
259 pub name: String,
260 pub data_sent: String,
261 pub connection_data: PublishConnectionData,
262}
263
/// Details of one publishing connection.
// Not constructed by any code visible in this file, hence the lint override.
#[allow(dead_code)]
pub struct PublishConnectionData {
    /// Identifier of the connection.
    pub connection_id: String,
    /// Byte counter for sent data.
    pub bytes_sent: i32,
    /// Counter of sent items (presumably messages — confirm).
    pub number_sent: i32,
    /// Connection flag.
    pub connected: bool,
}
271
272#[allow(dead_code)]
273pub struct SubscribeStats {
274 pub name: String,
275 pub connection_data: SubscribeConnectionData,
276}
277
/// Details of one subscribing connection.
// Not constructed by any code visible in this file, hence the lint override.
#[allow(dead_code)]
pub struct SubscribeConnectionData {
    /// Identifier of the connection.
    pub connection_id: String,
    /// Byte counter for received data.
    pub bytes_received: i32,
    /// Estimate of dropped items (presumably messages — confirm).
    pub drop_estimate: i32,
    /// Connection flag.
    pub connected: bool,
}
285
/// Counters describing service traffic.
// Not constructed by any code visible in this file, hence the lint override.
#[allow(dead_code)]
pub struct ServiceStats {
    /// Request counter.
    pub number_of_requests: i32,
    /// Byte counter for received data.
    pub bytes_received: i32,
    /// Byte counter for sent data.
    pub bytes_sent: i32,
}
292
/// Description of a single connection.
///
/// NOTE(review): presumably the payload intended for the `getBusInfo` RPC,
/// which currently answers "Method not implemented" — confirm.
// Not constructed by any code visible in this file, hence the lint override.
#[allow(dead_code)]
pub struct BusInfo {
    /// Identifier of the connection.
    pub connection_id: String,
    /// Identifier of the peer on the other end.
    pub destination_id: String,
    /// Direction of the connection, stored as a string.
    pub direction: String,
    /// Transport in use, stored as a string.
    pub transport: String,
    /// Topic the connection belongs to.
    pub topic: String,
    /// Connection flag.
    pub connected: bool,
}