zbus/blocking/connection/
mod.rs

1//! Blocking connection API.
2
3use enumflags2::BitFlags;
4use event_listener::EventListener;
5use static_assertions::assert_impl_all;
6use std::{io, ops::Deref};
7use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
8use zvariant::ObjectPath;
9
10use crate::{
11    blocking::ObjectServer,
12    fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
13    message::Message,
14    utils::block_on,
15    DBusError, Error, Result,
16};
17
18mod builder;
19pub use builder::Builder;
20
21/// A blocking wrapper of [`zbus::Connection`].
22///
23/// Most of the API is very similar to [`zbus::Connection`], except it's blocking.
24#[derive(Debug, Clone)]
25#[must_use = "Dropping a `Connection` will close the underlying socket."]
26pub struct Connection {
27    inner: crate::Connection,
28}
29
30assert_impl_all!(Connection: Send, Sync, Unpin);
31
32impl Connection {
33    /// Create a `Connection` to the session/user message bus.
34    pub fn session() -> Result<Self> {
35        block_on(crate::Connection::session()).map(Self::from)
36    }
37
38    /// Create a `Connection` to the system-wide message bus.
39    pub fn system() -> Result<Self> {
40        block_on(crate::Connection::system()).map(Self::from)
41    }
42
43    /// The capacity of the main (unfiltered) queue.
44    pub fn max_queued(&self) -> usize {
45        self.inner.max_queued()
46    }
47
48    /// Set the capacity of the main (unfiltered) queue.
49    pub fn set_max_queued(mut self, max: usize) {
50        self.inner.set_max_queued(max)
51    }
52
53    /// The server's GUID.
54    pub fn server_guid(&self) -> &str {
55        self.inner.server_guid()
56    }
57
58    /// The unique name as assigned by the message bus or `None` if not a message bus connection.
59    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
60        self.inner.unique_name()
61    }
62
63    /// Send `msg` to the peer.
64    pub fn send(&self, msg: &Message) -> Result<()> {
65        block_on(self.inner.send(msg))
66    }
67
68    /// Send a method call.
69    ///
70    /// Create a method-call message, send it over the connection, then wait for the reply.
71    ///
72    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
73    /// error replies are returned as [`Error::MethodError`].
74    pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
75        &self,
76        destination: Option<D>,
77        path: P,
78        iface: Option<I>,
79        method_name: M,
80        body: &B,
81    ) -> Result<Message>
82    where
83        D: TryInto<BusName<'d>>,
84        P: TryInto<ObjectPath<'p>>,
85        I: TryInto<InterfaceName<'i>>,
86        M: TryInto<MemberName<'m>>,
87        D::Error: Into<Error>,
88        P::Error: Into<Error>,
89        I::Error: Into<Error>,
90        M::Error: Into<Error>,
91        B: serde::ser::Serialize + zvariant::DynamicType,
92    {
93        block_on(
94            self.inner
95                .call_method(destination, path, iface, method_name, body),
96        )
97    }
98
99    /// Emit a signal.
100    ///
101    /// Create a signal message, and send it over the connection.
102    pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
103        &self,
104        destination: Option<D>,
105        path: P,
106        iface: I,
107        signal_name: M,
108        body: &B,
109    ) -> Result<()>
110    where
111        D: TryInto<BusName<'d>>,
112        P: TryInto<ObjectPath<'p>>,
113        I: TryInto<InterfaceName<'i>>,
114        M: TryInto<MemberName<'m>>,
115        D::Error: Into<Error>,
116        P::Error: Into<Error>,
117        I::Error: Into<Error>,
118        M::Error: Into<Error>,
119        B: serde::ser::Serialize + zvariant::DynamicType,
120    {
121        block_on(
122            self.inner
123                .emit_signal(destination, path, iface, signal_name, body),
124        )
125    }
126
127    /// Reply to a message.
128    ///
129    /// Given an existing message (likely a method call), send a reply back to the caller with the
130    /// given `body`.
131    pub fn reply<B>(&self, call: &Message, body: &B) -> Result<()>
132    where
133        B: serde::ser::Serialize + zvariant::DynamicType,
134    {
135        block_on(self.inner.reply(call, body))
136    }
137
138    /// Reply an error to a message.
139    ///
140    /// Given an existing message (likely a method call), send an error reply back to the caller
141    /// with the given `error_name` and `body`.
142    ///
143    /// Returns the message serial number.
144    pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()>
145    where
146        B: serde::ser::Serialize + zvariant::DynamicType,
147        E: TryInto<ErrorName<'e>>,
148        E::Error: Into<Error>,
149    {
150        block_on(self.inner.reply_error(call, error_name, body))
151    }
152
153    /// Reply to a method call with an error.
154    ///
155    /// Given an existing method call message header, send an error reply back to the caller
156    /// using one of the standard interface reply types.
157    ///
158    /// Returns the message serial number.
159    pub fn reply_dbus_error(
160        &self,
161        call: &zbus::message::Header<'_>,
162        err: impl DBusError,
163    ) -> Result<()> {
164        block_on(self.inner.reply_dbus_error(call, err))
165    }
166
167    /// Register a well-known name for this service on the bus.
168    ///
169    /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details
170    /// and caveats.
171    pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
172    where
173        W: TryInto<WellKnownName<'w>>,
174        W::Error: Into<Error>,
175    {
176        block_on(self.inner.request_name(well_known_name))
177    }
178
179    /// Register a well-known name for this service on the bus.
180    ///
181    /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more
182    /// details and caveats.
183    pub fn request_name_with_flags<'w, W>(
184        &self,
185        well_known_name: W,
186        flags: BitFlags<RequestNameFlags>,
187    ) -> Result<RequestNameReply>
188    where
189        W: TryInto<WellKnownName<'w>>,
190        W::Error: Into<Error>,
191    {
192        block_on(self.inner.request_name_with_flags(well_known_name, flags))
193    }
194
195    /// Deregister a previously registered well-known name for this service on the bus.
196    ///
197    /// Use this method to deregister a well-known name, registered through
198    /// [`Connection::request_name`].
199    ///
200    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
201    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
202    /// was not previously registered or already deregistered.
203    pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
204    where
205        W: TryInto<WellKnownName<'w>>,
206        W::Error: Into<Error>,
207    {
208        block_on(self.inner.release_name(well_known_name))
209    }
210
211    /// Checks if `self` is a connection to a message bus.
212    ///
213    /// This will return `false` for p2p connections.
214    pub fn is_bus(&self) -> bool {
215        self.inner.is_bus()
216    }
217
218    /// Get a reference to the associated [`ObjectServer`].
219    ///
220    /// The `ObjectServer` is created on-demand.
221    pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
222        self.inner.sync_object_server(true, None)
223    }
224
225    /// Get a reference to the underlying async Connection.
226    pub fn inner(&self) -> &crate::Connection {
227        &self.inner
228    }
229
230    /// Get the underlying async Connection, consuming `self`.
231    pub fn into_inner(self) -> crate::Connection {
232        self.inner
233    }
234
235    /// Returns a listener, notified on various connection activity.
236    ///
237    /// This function is meant for the caller to implement idle or timeout on inactivity.
238    pub fn monitor_activity(&self) -> EventListener {
239        self.inner.monitor_activity()
240    }
241
242    /// Returns the peer credentials.
243    ///
244    /// The fields are populated on the best effort basis. Some or all fields may not even make
245    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
246    ///
247    /// # Caveats
248    ///
249    /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
250    pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
251        block_on(self.inner.peer_credentials())
252    }
253
254    /// Close the connection.
255    ///
256    /// After this call, all reading and writing operations will fail.
257    pub fn close(self) -> Result<()> {
258        block_on(self.inner.close())
259    }
260}
261
262impl From<crate::Connection> for Connection {
263    fn from(conn: crate::Connection) -> Self {
264        Self { inner: conn }
265    }
266}
267
268#[cfg(feature = "p2p")]
269#[cfg(all(test, unix))]
270mod tests {
271    use event_listener::Listener;
272    use ntest::timeout;
273    #[cfg(all(unix, not(feature = "tokio")))]
274    use std::os::unix::net::UnixStream;
275    use std::thread;
276    use test_log::test;
277    #[cfg(all(unix, feature = "tokio"))]
278    use tokio::net::UnixStream;
279    #[cfg(all(windows, not(feature = "tokio")))]
280    use uds_windows::UnixStream;
281
282    use crate::{
283        blocking::{connection::Builder, MessageIterator},
284        Guid,
285    };
286
287    #[test]
288    #[timeout(15000)]
289    fn unix_p2p() {
290        let guid = Guid::generate();
291
292        // Tokio needs us to call the sync function from async context. :shrug:
293        let (p0, p1) = crate::utils::block_on(async { UnixStream::pair().unwrap() });
294
295        let (tx, rx) = std::sync::mpsc::channel();
296        let server_thread = thread::spawn(move || {
297            let c = Builder::unix_stream(p0)
298                .server(guid)
299                .unwrap()
300                .p2p()
301                .build()
302                .unwrap();
303            rx.recv().unwrap();
304            let reply = c
305                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
306                .unwrap();
307            assert_eq!(reply.to_string(), "Method return");
308            let val: String = reply.body().deserialize().unwrap();
309            val
310        });
311
312        let c = Builder::unix_stream(p1).p2p().build().unwrap();
313
314        let listener = c.monitor_activity();
315
316        let mut s = MessageIterator::from(&c);
317        tx.send(()).unwrap();
318        let m = s.next().unwrap().unwrap();
319        assert_eq!(m.to_string(), "Method call Test");
320        c.reply(&m, &("yay")).unwrap();
321
322        for _ in s {}
323
324        let val = server_thread.join().expect("failed to join server thread");
325        assert_eq!(val, "yay");
326
327        // there was some activity
328        listener.wait();
329        // eventually, nothing happens and it will timeout
330        loop {
331            let listener = c.monitor_activity();
332            if listener
333                .wait_timeout(std::time::Duration::from_millis(10))
334                .is_none()
335            {
336                break;
337            }
338        }
339    }
340}