1use super::clock::Clock;
2use super::error::Result;
3use super::master::Master;
4use super::slave::Slave;
5use crate::api::SystemState;
6use crate::error::ErrorKind;
7use crate::rosxmlrpc::Response;
8use crate::tcpros::{Message, PublisherStream, ServicePair, ServiceResult};
9use crate::{RawMessageDescription, SubscriptionHandler};
10use log::error;
11use std::sync::atomic::AtomicUsize;
12use std::sync::Arc;
13
/// Handle for publishing messages of type `T` on a topic.
///
/// Clones share the same clock, sequence counter and registration guard
/// (all held behind `Arc`). The guard's `Drop` implementation unregisters the
/// publisher once the last handle sharing it is dropped.
#[derive(Clone)]
pub struct Publisher<T: Message> {
    /// Clock handed to `Message::set_header` on every `send`.
    clock: Arc<dyn Clock>,
    /// Counter handed to `Message::set_header` on every `send`; starts at 0.
    seq: Arc<AtomicUsize>,
    /// Stream obtained from `Slave::add_publication`; carries the outgoing
    /// messages and answers subscriber queries.
    stream: PublisherStream<T>,
    /// Registration guard; declared last, so it is dropped after `stream`.
    raii: Arc<InteractorRaii<PublisherInfo>>,
}
21
22impl<T: Message> Publisher<T> {
23 pub(crate) fn new(
24 master: Arc<Master>,
25 slave: Arc<Slave>,
26 clock: Arc<dyn Clock>,
27 hostname: &str,
28 name: &str,
29 queue_size: usize,
30 message_description: Option<RawMessageDescription>,
31 ) -> Result<Self> {
32 let message_description =
33 message_description.unwrap_or_else(RawMessageDescription::from_message::<T>);
34 let stream =
35 slave.add_publication::<T>(hostname, name, queue_size, message_description.clone())?;
36
37 let raii = Arc::new(InteractorRaii::new(PublisherInfo {
38 master,
39 slave,
40 name: name.into(),
41 }));
42
43 raii.interactor
44 .master
45 .register_publisher(name, &message_description.msg_type)
46 .map_err(|err| {
47 error!("Failed to register publisher for topic '{}': {}", name, err);
48 err
49 })?;
50
51 Ok(Self {
52 stream,
53 clock,
54 seq: Arc::new(AtomicUsize::new(0)),
55 raii,
56 })
57 }
58
59 #[inline]
60 pub fn subscriber_count(&self) -> usize {
61 self.stream.subscriber_count()
62 }
63
64 #[inline]
65 pub fn subscriber_names(&self) -> Vec<String> {
66 self.stream.subscriber_names()
67 }
68
69 #[inline]
70 pub fn set_latching(&mut self, latching: bool) {
71 self.stream.set_latching(latching);
72 }
73
74 #[inline]
75 pub fn set_queue_size(&mut self, queue_size: usize) {
76 self.stream.set_queue_size(queue_size);
77 }
78
79 #[inline]
81 pub fn wait_for_subscribers(&self, timeout: Option<std::time::Duration>) -> Result<()> {
82 let timeout = timeout.map(|v| std::time::Instant::now() + v);
83 let iteration_time = std::time::Duration::from_millis(50);
84 loop {
85 let system_state: SystemState = self.raii.interactor.master.get_system_state()?.into();
86 let mut master_subs = system_state
87 .subscribers
88 .into_iter()
89 .find(|v| v.name == self.raii.interactor.name)
90 .map(|v| v.connections)
91 .unwrap_or_default();
92 master_subs.sort();
93 let mut local_subs = self.subscriber_names();
94 local_subs.sort();
95 if master_subs == local_subs {
96 return Ok(());
97 }
98 let now = std::time::Instant::now();
99 let mut wait_time = iteration_time;
100 if let Some(timeout) = &timeout {
101 let time_left = timeout
102 .checked_duration_since(now)
103 .ok_or(ErrorKind::TimeoutError)?;
104 wait_time = wait_time.min(time_left);
105 }
106 std::thread::sleep(wait_time);
107 }
108 }
109
110 #[inline]
111 pub fn send(&self, mut message: T) -> Result<()> {
112 message.set_header(&self.clock, &self.seq);
113 self.stream.send(&message).map_err(Into::into)
114 }
115}
116
/// Everything needed to undo a publisher registration.
struct PublisherInfo {
    /// Master the publisher was registered with.
    master: Arc<Master>,
    /// Local slave holding the publication.
    slave: Arc<Slave>,
    /// Topic name the publisher was registered under.
    name: String,
}
122
123impl Interactor for PublisherInfo {
124 fn unregister(&mut self) -> Response<()> {
125 self.slave.remove_publication(&self.name);
126 self.master.unregister_publisher(&self.name).map(|_| ())
127 }
128}
129
/// Handle to a topic subscription.
///
/// Clones share one registration guard; the guard's `Drop` implementation
/// unregisters the subscription once the last handle sharing it is dropped.
#[derive(Clone)]
pub struct Subscriber {
    /// Shared registration guard wrapping the subscription's bookkeeping.
    info: Arc<InteractorRaii<SubscriberInfo>>,
}
134
135impl Subscriber {
136 pub(crate) fn new<T, H>(
137 master: Arc<Master>,
138 slave: Arc<Slave>,
139 name: &str,
140 queue_size: usize,
141 handler: H,
142 ) -> Result<Self>
143 where
144 T: Message,
145 H: SubscriptionHandler<T>,
146 {
147 let id = slave.add_subscription::<T, H>(name, queue_size, handler)?;
148
149 let info = Arc::new(InteractorRaii::new(SubscriberInfo {
150 master,
151 slave,
152 name: name.into(),
153 id,
154 }));
155
156 let publishers = info
157 .interactor
158 .master
159 .register_subscriber(name, &T::msg_type())?;
160
161 if let Err(err) = info
162 .interactor
163 .slave
164 .add_publishers_to_subscription(name, publishers.into_iter())
165 {
166 error!(
167 "Failed to subscribe to all publishers of topic '{}': {}",
168 name, err
169 );
170 }
171
172 Ok(Self { info })
173 }
174
175 #[inline]
176 pub fn publisher_count(&self) -> usize {
177 self.info
178 .interactor
179 .slave
180 .get_publisher_count_of_subscription(&self.info.interactor.name)
181 }
182
183 #[inline]
184 pub fn publisher_uris(&self) -> Vec<String> {
185 self.info
186 .interactor
187 .slave
188 .get_publisher_uris_of_subscription(&self.info.interactor.name)
189 }
190}
191
/// Everything needed to undo a subscriber registration.
struct SubscriberInfo {
    /// Master the subscriber was registered with.
    master: Arc<Master>,
    /// Local slave holding the subscription.
    slave: Arc<Slave>,
    /// Topic name the subscription was registered under.
    name: String,
    /// Identifier returned by `Slave::add_subscription`; passed back to
    /// `remove_subscription` on unregistration.
    id: usize,
}
198
199impl Interactor for SubscriberInfo {
200 fn unregister(&mut self) -> Response<()> {
201 self.slave.remove_subscription(&self.name, self.id);
202 self.master.unregister_subscriber(&self.name).map(|_| ())
203 }
204}
205
/// Handle that keeps a service registered.
///
/// Clones share one registration guard; the guard's `Drop` implementation
/// unregisters the service once the last handle sharing it is dropped.
#[derive(Clone)]
pub struct Service {
    /// Shared registration guard; held only for its drop behavior.
    _raii: Arc<InteractorRaii<ServiceInfo>>,
}
210
211impl Service {
212 pub(crate) fn new<T, F>(
213 master: Arc<Master>,
214 slave: Arc<Slave>,
215 hostname: &str,
216 bind_address: &str,
217 name: &str,
218 handler: F,
219 ) -> Result<Self>
220 where
221 T: ServicePair,
222 F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
223 {
224 let api = slave.add_service::<T, F>(hostname, bind_address, name, handler)?;
225
226 let raii = Arc::new(InteractorRaii::new(ServiceInfo {
227 master,
228 slave,
229 api,
230 name: name.into(),
231 }));
232
233 raii.interactor
234 .master
235 .register_service(name, &raii.interactor.api)?;
236 Ok(Self { _raii: raii })
237 }
238}
239
/// Everything needed to undo a service registration.
struct ServiceInfo {
    /// Master the service was registered with.
    master: Arc<Master>,
    /// Local slave hosting the service.
    slave: Arc<Slave>,
    /// Service name it was registered under.
    name: String,
    /// API string returned by `Slave::add_service`; sent to the master on
    /// both registration and unregistration.
    api: String,
}
246
247impl Interactor for ServiceInfo {
248 fn unregister(&mut self) -> Response<()> {
249 self.slave.remove_service(&self.name);
250 self.master
251 .unregister_service(&self.name, &self.api)
252 .map(|_| ())
253 }
254}
255
/// A registration that knows how to undo itself.
///
/// Implemented in this file by the publisher, subscriber and service
/// bookkeeping structs and invoked from `InteractorRaii`'s `Drop`.
trait Interactor {
    /// Undoes the registration, reporting the outcome as a `Response`.
    fn unregister(&mut self) -> Response<()>;
}
259
/// Guard that calls `Interactor::unregister` on the wrapped value when it is
/// dropped.
struct InteractorRaii<I: Interactor> {
    /// The wrapped registration; readable by the handles that own the guard.
    pub interactor: I,
}
263
264impl<I: Interactor> InteractorRaii<I> {
265 pub fn new(interactor: I) -> InteractorRaii<I> {
266 Self { interactor }
267 }
268}
269
270impl<I: Interactor> Drop for InteractorRaii<I> {
271 fn drop(&mut self) {
272 if let Err(e) = self.interactor.unregister() {
273 error!("Error while unloading: {:?}", e);
274 }
275 }
276}