rosrust/api/
raii.rs

1use super::clock::Clock;
2use super::error::Result;
3use super::master::Master;
4use super::slave::Slave;
5use crate::api::SystemState;
6use crate::error::ErrorKind;
7use crate::rosxmlrpc::Response;
8use crate::tcpros::{Message, PublisherStream, ServicePair, ServiceResult};
9use crate::{RawMessageDescription, SubscriptionHandler};
10use log::error;
11use std::sync::atomic::AtomicUsize;
12use std::sync::Arc;
13
/// Handle for publishing messages of type `T` on a single topic.
///
/// Clones share the same sequence counter and the same registration guard,
/// since both live behind an `Arc`.
#[derive(Clone)]
pub struct Publisher<T: Message> {
    /// Clock passed to `set_header` on every outgoing message.
    clock: Arc<dyn Clock>,
    /// Counter passed to `set_header` alongside the clock.
    seq: Arc<AtomicUsize>,
    /// Outgoing stream obtained from `Slave::add_publication`.
    stream: PublisherStream<T>,
    /// Guard that unregisters the publication when it is dropped.
    raii: Arc<InteractorRaii<PublisherInfo>>,
}
21
22impl<T: Message> Publisher<T> {
23    pub(crate) fn new(
24        master: Arc<Master>,
25        slave: Arc<Slave>,
26        clock: Arc<dyn Clock>,
27        hostname: &str,
28        name: &str,
29        queue_size: usize,
30        message_description: Option<RawMessageDescription>,
31    ) -> Result<Self> {
32        let message_description =
33            message_description.unwrap_or_else(RawMessageDescription::from_message::<T>);
34        let stream =
35            slave.add_publication::<T>(hostname, name, queue_size, message_description.clone())?;
36
37        let raii = Arc::new(InteractorRaii::new(PublisherInfo {
38            master,
39            slave,
40            name: name.into(),
41        }));
42
43        raii.interactor
44            .master
45            .register_publisher(name, &message_description.msg_type)
46            .map_err(|err| {
47                error!("Failed to register publisher for topic '{}': {}", name, err);
48                err
49            })?;
50
51        Ok(Self {
52            stream,
53            clock,
54            seq: Arc::new(AtomicUsize::new(0)),
55            raii,
56        })
57    }
58
59    #[inline]
60    pub fn subscriber_count(&self) -> usize {
61        self.stream.subscriber_count()
62    }
63
64    #[inline]
65    pub fn subscriber_names(&self) -> Vec<String> {
66        self.stream.subscriber_names()
67    }
68
69    #[inline]
70    pub fn set_latching(&mut self, latching: bool) {
71        self.stream.set_latching(latching);
72    }
73
74    #[inline]
75    pub fn set_queue_size(&mut self, queue_size: usize) {
76        self.stream.set_queue_size(queue_size);
77    }
78
79    /// Wait until all the subscribers reported by rosmaster have connected
80    #[inline]
81    pub fn wait_for_subscribers(&self, timeout: Option<std::time::Duration>) -> Result<()> {
82        let timeout = timeout.map(|v| std::time::Instant::now() + v);
83        let iteration_time = std::time::Duration::from_millis(50);
84        loop {
85            let system_state: SystemState = self.raii.interactor.master.get_system_state()?.into();
86            let mut master_subs = system_state
87                .subscribers
88                .into_iter()
89                .find(|v| v.name == self.raii.interactor.name)
90                .map(|v| v.connections)
91                .unwrap_or_default();
92            master_subs.sort();
93            let mut local_subs = self.subscriber_names();
94            local_subs.sort();
95            if master_subs == local_subs {
96                return Ok(());
97            }
98            let now = std::time::Instant::now();
99            let mut wait_time = iteration_time;
100            if let Some(timeout) = &timeout {
101                let time_left = timeout
102                    .checked_duration_since(now)
103                    .ok_or(ErrorKind::TimeoutError)?;
104                wait_time = wait_time.min(time_left);
105            }
106            std::thread::sleep(wait_time);
107        }
108    }
109
110    #[inline]
111    pub fn send(&self, mut message: T) -> Result<()> {
112        message.set_header(&self.clock, &self.seq);
113        self.stream.send(&message).map_err(Into::into)
114    }
115}
116
/// Data needed to undo a publisher registration for one topic.
struct PublisherInfo {
    /// Master client used to register and unregister the publisher.
    master: Arc<Master>,
    /// Local slave that holds the publication.
    slave: Arc<Slave>,
    /// Topic name the publisher was created with.
    name: String,
}
122
123impl Interactor for PublisherInfo {
124    fn unregister(&mut self) -> Response<()> {
125        self.slave.remove_publication(&self.name);
126        self.master.unregister_publisher(&self.name).map(|_| ())
127    }
128}
129
/// Handle for a topic subscription.
///
/// Clones share one registration guard; the subscription is unregistered when
/// the guard is dropped.
#[derive(Clone)]
pub struct Subscriber {
    /// Guard holding the subscription data; unregisters on drop.
    info: Arc<InteractorRaii<SubscriberInfo>>,
}
134
135impl Subscriber {
136    pub(crate) fn new<T, H>(
137        master: Arc<Master>,
138        slave: Arc<Slave>,
139        name: &str,
140        queue_size: usize,
141        handler: H,
142    ) -> Result<Self>
143    where
144        T: Message,
145        H: SubscriptionHandler<T>,
146    {
147        let id = slave.add_subscription::<T, H>(name, queue_size, handler)?;
148
149        let info = Arc::new(InteractorRaii::new(SubscriberInfo {
150            master,
151            slave,
152            name: name.into(),
153            id,
154        }));
155
156        let publishers = info
157            .interactor
158            .master
159            .register_subscriber(name, &T::msg_type())?;
160
161        if let Err(err) = info
162            .interactor
163            .slave
164            .add_publishers_to_subscription(name, publishers.into_iter())
165        {
166            error!(
167                "Failed to subscribe to all publishers of topic '{}': {}",
168                name, err
169            );
170        }
171
172        Ok(Self { info })
173    }
174
175    #[inline]
176    pub fn publisher_count(&self) -> usize {
177        self.info
178            .interactor
179            .slave
180            .get_publisher_count_of_subscription(&self.info.interactor.name)
181    }
182
183    #[inline]
184    pub fn publisher_uris(&self) -> Vec<String> {
185        self.info
186            .interactor
187            .slave
188            .get_publisher_uris_of_subscription(&self.info.interactor.name)
189    }
190}
191
/// Data needed to undo a subscriber registration for one topic.
struct SubscriberInfo {
    /// Master client used to register and unregister the subscriber.
    master: Arc<Master>,
    /// Local slave that holds the subscription.
    slave: Arc<Slave>,
    /// Topic name the subscriber was created with.
    name: String,
    /// Identifier returned by `Slave::add_subscription`; passed back on removal.
    id: usize,
}
198
199impl Interactor for SubscriberInfo {
200    fn unregister(&mut self) -> Response<()> {
201        self.slave.remove_subscription(&self.name, self.id);
202        self.master.unregister_subscriber(&self.name).map(|_| ())
203    }
204}
205
/// Handle for an advertised service.
///
/// Clones share one registration guard; the service is unregistered when the
/// guard is dropped.
#[derive(Clone)]
pub struct Service {
    /// Held only for its drop behavior; never read.
    _raii: Arc<InteractorRaii<ServiceInfo>>,
}
210
211impl Service {
212    pub(crate) fn new<T, F>(
213        master: Arc<Master>,
214        slave: Arc<Slave>,
215        hostname: &str,
216        bind_address: &str,
217        name: &str,
218        handler: F,
219    ) -> Result<Self>
220    where
221        T: ServicePair,
222        F: Fn(T::Request) -> ServiceResult<T::Response> + Send + Sync + 'static,
223    {
224        let api = slave.add_service::<T, F>(hostname, bind_address, name, handler)?;
225
226        let raii = Arc::new(InteractorRaii::new(ServiceInfo {
227            master,
228            slave,
229            api,
230            name: name.into(),
231        }));
232
233        raii.interactor
234            .master
235            .register_service(name, &raii.interactor.api)?;
236        Ok(Self { _raii: raii })
237    }
238}
239
/// Data needed to undo a service registration.
struct ServiceInfo {
    /// Master client used to register and unregister the service.
    master: Arc<Master>,
    /// Local slave that hosts the service.
    slave: Arc<Slave>,
    /// Service name the service was created with.
    name: String,
    /// API string returned by `Slave::add_service`; sent to the master on
    /// registration and unregistration.
    api: String,
}
246
247impl Interactor for ServiceInfo {
248    fn unregister(&mut self) -> Response<()> {
249        self.slave.remove_service(&self.name);
250        self.master
251            .unregister_service(&self.name, &self.api)
252            .map(|_| ())
253    }
254}
255
/// Something that can undo its own registration.
trait Interactor {
    /// Undoes the registration and returns the resulting response.
    fn unregister(&mut self) -> Response<()>;
}
259
/// Wrapper that calls `Interactor::unregister` on the wrapped value when it is
/// dropped.
struct InteractorRaii<I: Interactor> {
    /// The wrapped registration data.
    pub interactor: I,
}
263
264impl<I: Interactor> InteractorRaii<I> {
265    pub fn new(interactor: I) -> InteractorRaii<I> {
266        Self { interactor }
267    }
268}
269
270impl<I: Interactor> Drop for InteractorRaii<I> {
271    fn drop(&mut self) {
272        if let Err(e) = self.interactor.unregister() {
273            error!("Error while unloading: {:?}", e);
274        }
275    }
276}