1//! Blocking connection API.
23use enumflags2::BitFlags;
4use event_listener::EventListener;
5use static_assertions::assert_impl_all;
6use std::{io, ops::Deref};
7use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
8use zvariant::ObjectPath;
910use crate::{
11 blocking::ObjectServer,
12 fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
13 message::Message,
14 utils::block_on,
15 DBusError, Error, Result,
16};
1718mod builder;
19pub use builder::Builder;
2021/// A blocking wrapper of [`zbus::Connection`].
22///
23/// Most of the API is very similar to [`zbus::Connection`], except it's blocking.
24#[derive(Debug, Clone)]
25#[must_use = "Dropping a `Connection` will close the underlying socket."]
26pub struct Connection {
27 inner: crate::Connection,
28}
2930assert_impl_all!(Connection: Send, Sync, Unpin);
3132impl Connection {
33/// Create a `Connection` to the session/user message bus.
34pub fn session() -> Result<Self> {
35 block_on(crate::Connection::session()).map(Self::from)
36 }
3738/// Create a `Connection` to the system-wide message bus.
39pub fn system() -> Result<Self> {
40 block_on(crate::Connection::system()).map(Self::from)
41 }
4243/// The capacity of the main (unfiltered) queue.
44pub fn max_queued(&self) -> usize {
45self.inner.max_queued()
46 }
4748/// Set the capacity of the main (unfiltered) queue.
49pub fn set_max_queued(mut self, max: usize) {
50self.inner.set_max_queued(max)
51 }
5253/// The server's GUID.
54pub fn server_guid(&self) -> &str {
55self.inner.server_guid()
56 }
5758/// The unique name as assigned by the message bus or `None` if not a message bus connection.
59pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
60self.inner.unique_name()
61 }
6263/// Send `msg` to the peer.
64pub fn send(&self, msg: &Message) -> Result<()> {
65 block_on(self.inner.send(msg))
66 }
6768/// Send a method call.
69 ///
70 /// Create a method-call message, send it over the connection, then wait for the reply.
71 ///
72 /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
73 /// error replies are returned as [`Error::MethodError`].
74pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
75&self,
76 destination: Option<D>,
77 path: P,
78 iface: Option<I>,
79 method_name: M,
80 body: &B,
81 ) -> Result<Message>
82where
83D: TryInto<BusName<'d>>,
84 P: TryInto<ObjectPath<'p>>,
85 I: TryInto<InterfaceName<'i>>,
86 M: TryInto<MemberName<'m>>,
87 D::Error: Into<Error>,
88 P::Error: Into<Error>,
89 I::Error: Into<Error>,
90 M::Error: Into<Error>,
91 B: serde::ser::Serialize + zvariant::DynamicType,
92 {
93 block_on(
94self.inner
95 .call_method(destination, path, iface, method_name, body),
96 )
97 }
9899/// Emit a signal.
100 ///
101 /// Create a signal message, and send it over the connection.
102pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
103&self,
104 destination: Option<D>,
105 path: P,
106 iface: I,
107 signal_name: M,
108 body: &B,
109 ) -> Result<()>
110where
111D: TryInto<BusName<'d>>,
112 P: TryInto<ObjectPath<'p>>,
113 I: TryInto<InterfaceName<'i>>,
114 M: TryInto<MemberName<'m>>,
115 D::Error: Into<Error>,
116 P::Error: Into<Error>,
117 I::Error: Into<Error>,
118 M::Error: Into<Error>,
119 B: serde::ser::Serialize + zvariant::DynamicType,
120 {
121 block_on(
122self.inner
123 .emit_signal(destination, path, iface, signal_name, body),
124 )
125 }
126127/// Reply to a message.
128 ///
129 /// Given an existing message (likely a method call), send a reply back to the caller with the
130 /// given `body`.
131pub fn reply<B>(&self, call: &Message, body: &B) -> Result<()>
132where
133B: serde::ser::Serialize + zvariant::DynamicType,
134 {
135 block_on(self.inner.reply(call, body))
136 }
137138/// Reply an error to a message.
139 ///
140 /// Given an existing message (likely a method call), send an error reply back to the caller
141 /// with the given `error_name` and `body`.
142 ///
143 /// Returns the message serial number.
144pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()>
145where
146B: serde::ser::Serialize + zvariant::DynamicType,
147 E: TryInto<ErrorName<'e>>,
148 E::Error: Into<Error>,
149 {
150 block_on(self.inner.reply_error(call, error_name, body))
151 }
152153/// Reply to a method call with an error.
154 ///
155 /// Given an existing method call message header, send an error reply back to the caller
156 /// using one of the standard interface reply types.
157 ///
158 /// Returns the message serial number.
159pub fn reply_dbus_error(
160&self,
161 call: &zbus::message::Header<'_>,
162 err: impl DBusError,
163 ) -> Result<()> {
164 block_on(self.inner.reply_dbus_error(call, err))
165 }
166167/// Register a well-known name for this service on the bus.
168 ///
169 /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details
170 /// and caveats.
171pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
172where
173W: TryInto<WellKnownName<'w>>,
174 W::Error: Into<Error>,
175 {
176 block_on(self.inner.request_name(well_known_name))
177 }
178179/// Register a well-known name for this service on the bus.
180 ///
181 /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more
182 /// details and caveats.
183pub fn request_name_with_flags<'w, W>(
184&self,
185 well_known_name: W,
186 flags: BitFlags<RequestNameFlags>,
187 ) -> Result<RequestNameReply>
188where
189W: TryInto<WellKnownName<'w>>,
190 W::Error: Into<Error>,
191 {
192 block_on(self.inner.request_name_with_flags(well_known_name, flags))
193 }
194195/// Deregister a previously registered well-known name for this service on the bus.
196 ///
197 /// Use this method to deregister a well-known name, registered through
198 /// [`Connection::request_name`].
199 ///
200 /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
201 /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
202 /// was not previously registered or already deregistered.
203pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
204where
205W: TryInto<WellKnownName<'w>>,
206 W::Error: Into<Error>,
207 {
208 block_on(self.inner.release_name(well_known_name))
209 }
210211/// Checks if `self` is a connection to a message bus.
212 ///
213 /// This will return `false` for p2p connections.
214pub fn is_bus(&self) -> bool {
215self.inner.is_bus()
216 }
217218/// Get a reference to the associated [`ObjectServer`].
219 ///
220 /// The `ObjectServer` is created on-demand.
221pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
222self.inner.sync_object_server(true, None)
223 }
224225/// Get a reference to the underlying async Connection.
226pub fn inner(&self) -> &crate::Connection {
227&self.inner
228 }
229230/// Get the underlying async Connection, consuming `self`.
231pub fn into_inner(self) -> crate::Connection {
232self.inner
233 }
234235/// Returns a listener, notified on various connection activity.
236 ///
237 /// This function is meant for the caller to implement idle or timeout on inactivity.
238pub fn monitor_activity(&self) -> EventListener {
239self.inner.monitor_activity()
240 }
241242/// Returns the peer credentials.
243 ///
244 /// The fields are populated on the best effort basis. Some or all fields may not even make
245 /// sense for certain sockets or on certain platforms and hence will be set to `None`.
246 ///
247 /// # Caveats
248 ///
249 /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
250pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
251 block_on(self.inner.peer_credentials())
252 }
253254/// Close the connection.
255 ///
256 /// After this call, all reading and writing operations will fail.
257pub fn close(self) -> Result<()> {
258 block_on(self.inner.close())
259 }
260}
261262impl From<crate::Connection> for Connection {
263fn from(conn: crate::Connection) -> Self {
264Self { inner: conn }
265 }
266}
#[cfg(feature = "p2p")]
#[cfg(all(test, unix))]
mod tests {
    use event_listener::Listener;
    use ntest::timeout;
    #[cfg(all(unix, not(feature = "tokio")))]
    use std::os::unix::net::UnixStream;
    use std::thread;
    use test_log::test;
    #[cfg(all(unix, feature = "tokio"))]
    use tokio::net::UnixStream;
    #[cfg(all(windows, not(feature = "tokio")))]
    use uds_windows::UnixStream;

    use crate::{
        blocking::{connection::Builder, MessageIterator},
        Guid,
    };

    /// Peer-to-peer round trip over a Unix socket pair: the server side (on its own thread)
    /// calls `Test`, the client side answers with `"yay"`, and the activity listener is then
    /// checked to fire once and eventually go quiet.
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        let guid = Guid::generate();

        // Tokio needs us to call the sync function from async context. :shrug:
        let (server_end, client_end) =
            crate::utils::block_on(async { UnixStream::pair().unwrap() });

        let (ready_tx, ready_rx) = std::sync::mpsc::channel();
        let server_thread = thread::spawn(move || {
            let server = Builder::unix_stream(server_end)
                .server(guid)
                .unwrap()
                .p2p()
                .build()
                .unwrap();
            // Hold the call back until the main thread signals that it is ready to receive.
            ready_rx.recv().unwrap();
            let reply = server
                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
                .unwrap();
            assert_eq!(reply.to_string(), "Method return");
            let answer: String = reply.body().deserialize().unwrap();
            answer
        });

        let client = Builder::unix_stream(client_end).p2p().build().unwrap();

        // Grab the listener before any message is exchanged; it is waited on further down.
        let activity = client.monitor_activity();

        let mut incoming = MessageIterator::from(&client);
        ready_tx.send(()).unwrap();
        let call = incoming.next().unwrap().unwrap();
        assert_eq!(call.to_string(), "Method call Test");
        client.reply(&call, &("yay")).unwrap();

        // Consume the iterator until it ends, discarding every remaining item.
        for _ in incoming {}

        let answer = server_thread.join().expect("failed to join server thread");
        assert_eq!(answer, "yay");

        // there was some activity
        activity.wait();
        // eventually, nothing happens and it will timeout: keep taking a fresh listener until
        // one of them waits the full 10 ms without being notified.
        while client
            .monitor_activity()
            .wait_timeout(std::time::Duration::from_millis(10))
            .is_some()
        {}
    }
}