zbus/blocking/connection/
mod.rs1use enumflags2::BitFlags;
4use event_listener::EventListener;
5use static_assertions::assert_impl_all;
6use std::{io, ops::Deref};
7use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
8use zvariant::ObjectPath;
9
10use crate::{
11 blocking::ObjectServer,
12 fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
13 message::Message,
14 utils::block_on,
15 DBusError, Error, Result,
16};
17
18mod builder;
19pub use builder::Builder;
20
21#[derive(Debug, Clone)]
25#[must_use = "Dropping a `Connection` will close the underlying socket."]
26pub struct Connection {
27 inner: crate::Connection,
28}
29
30assert_impl_all!(Connection: Send, Sync, Unpin);
31
32impl Connection {
33 pub fn session() -> Result<Self> {
35 block_on(crate::Connection::session()).map(Self::from)
36 }
37
38 pub fn system() -> Result<Self> {
40 block_on(crate::Connection::system()).map(Self::from)
41 }
42
43 pub fn max_queued(&self) -> usize {
45 self.inner.max_queued()
46 }
47
48 pub fn set_max_queued(mut self, max: usize) {
50 self.inner.set_max_queued(max)
51 }
52
53 pub fn server_guid(&self) -> &str {
55 self.inner.server_guid()
56 }
57
58 pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
60 self.inner.unique_name()
61 }
62
63 pub fn send(&self, msg: &Message) -> Result<()> {
65 block_on(self.inner.send(msg))
66 }
67
68 pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
75 &self,
76 destination: Option<D>,
77 path: P,
78 iface: Option<I>,
79 method_name: M,
80 body: &B,
81 ) -> Result<Message>
82 where
83 D: TryInto<BusName<'d>>,
84 P: TryInto<ObjectPath<'p>>,
85 I: TryInto<InterfaceName<'i>>,
86 M: TryInto<MemberName<'m>>,
87 D::Error: Into<Error>,
88 P::Error: Into<Error>,
89 I::Error: Into<Error>,
90 M::Error: Into<Error>,
91 B: serde::ser::Serialize + zvariant::DynamicType,
92 {
93 block_on(
94 self.inner
95 .call_method(destination, path, iface, method_name, body),
96 )
97 }
98
99 pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
103 &self,
104 destination: Option<D>,
105 path: P,
106 iface: I,
107 signal_name: M,
108 body: &B,
109 ) -> Result<()>
110 where
111 D: TryInto<BusName<'d>>,
112 P: TryInto<ObjectPath<'p>>,
113 I: TryInto<InterfaceName<'i>>,
114 M: TryInto<MemberName<'m>>,
115 D::Error: Into<Error>,
116 P::Error: Into<Error>,
117 I::Error: Into<Error>,
118 M::Error: Into<Error>,
119 B: serde::ser::Serialize + zvariant::DynamicType,
120 {
121 block_on(
122 self.inner
123 .emit_signal(destination, path, iface, signal_name, body),
124 )
125 }
126
127 pub fn reply<B>(&self, call: &Message, body: &B) -> Result<()>
132 where
133 B: serde::ser::Serialize + zvariant::DynamicType,
134 {
135 block_on(self.inner.reply(call, body))
136 }
137
138 pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()>
145 where
146 B: serde::ser::Serialize + zvariant::DynamicType,
147 E: TryInto<ErrorName<'e>>,
148 E::Error: Into<Error>,
149 {
150 block_on(self.inner.reply_error(call, error_name, body))
151 }
152
153 pub fn reply_dbus_error(
160 &self,
161 call: &zbus::message::Header<'_>,
162 err: impl DBusError,
163 ) -> Result<()> {
164 block_on(self.inner.reply_dbus_error(call, err))
165 }
166
167 pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
172 where
173 W: TryInto<WellKnownName<'w>>,
174 W::Error: Into<Error>,
175 {
176 block_on(self.inner.request_name(well_known_name))
177 }
178
179 pub fn request_name_with_flags<'w, W>(
184 &self,
185 well_known_name: W,
186 flags: BitFlags<RequestNameFlags>,
187 ) -> Result<RequestNameReply>
188 where
189 W: TryInto<WellKnownName<'w>>,
190 W::Error: Into<Error>,
191 {
192 block_on(self.inner.request_name_with_flags(well_known_name, flags))
193 }
194
195 pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
204 where
205 W: TryInto<WellKnownName<'w>>,
206 W::Error: Into<Error>,
207 {
208 block_on(self.inner.release_name(well_known_name))
209 }
210
211 pub fn is_bus(&self) -> bool {
215 self.inner.is_bus()
216 }
217
218 pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
222 self.inner.sync_object_server(true, None)
223 }
224
225 pub fn inner(&self) -> &crate::Connection {
227 &self.inner
228 }
229
230 pub fn into_inner(self) -> crate::Connection {
232 self.inner
233 }
234
235 pub fn monitor_activity(&self) -> EventListener {
239 self.inner.monitor_activity()
240 }
241
242 pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
251 block_on(self.inner.peer_credentials())
252 }
253
254 pub fn close(self) -> Result<()> {
258 block_on(self.inner.close())
259 }
260}
261
262impl From<crate::Connection> for Connection {
263 fn from(conn: crate::Connection) -> Self {
264 Self { inner: conn }
265 }
266}
267
// The whole module is only built for tests on Unix with the `p2p` feature, so the
// imports inside need no platform predicates of their own.
#[cfg(all(test, unix, feature = "p2p"))]
mod tests {
    use event_listener::Listener;
    use ntest::timeout;
    #[cfg(not(feature = "tokio"))]
    use std::os::unix::net::UnixStream;
    use std::thread;
    use test_log::test;
    #[cfg(feature = "tokio")]
    use tokio::net::UnixStream;

    use crate::{
        blocking::{connection::Builder, MessageIterator},
        Guid,
    };

    /// Peer-to-peer round trip over a Unix socket pair: the "server" side issues a method
    /// call from a separate thread and the "client" side answers it from the test thread.
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        let guid = Guid::generate();

        // The pair is created inside `block_on`; with the `tokio` feature the stream type
        // is tokio's, which presumably needs to be created from within a runtime context.
        let (p0, p1) = crate::utils::block_on(async { UnixStream::pair().unwrap() });

        // Used to hold the server back until the client is ready to receive.
        let (tx, rx) = std::sync::mpsc::channel();
        let server_thread = thread::spawn(move || {
            let c = Builder::unix_stream(p0)
                .server(guid)
                .unwrap()
                .p2p()
                .build()
                .unwrap();
            // Wait for the go-ahead from the test thread before calling the method.
            rx.recv().unwrap();
            let reply = c
                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
                .unwrap();
            assert_eq!(reply.to_string(), "Method return");
            let val: String = reply.body().deserialize().unwrap();
            val
        });

        let c = Builder::unix_stream(p1).p2p().build().unwrap();

        // Taken before any traffic so that the exchange below counts as activity.
        let listener = c.monitor_activity();

        // Create the iterator before releasing the server so the call is not missed.
        let mut s = MessageIterator::from(&c);
        tx.send(()).unwrap();
        let m = s.next().unwrap().unwrap();
        assert_eq!(m.to_string(), "Method call Test");
        c.reply(&m, &("yay")).unwrap();

        // Drain the iterator until it ends (presumably once the server thread drops its
        // connection and the socket is closed).
        for _ in s {}

        let val = server_thread.join().expect("failed to join server thread");
        assert_eq!(val, "yay");

        // There has been activity since the listener was created.
        listener.wait();
        // Keep listening until a 10 ms window passes without any notification.
        loop {
            let listener = c.monitor_activity();
            if listener
                .wait_timeout(std::time::Duration::from_millis(10))
                .is_none()
            {
                break;
            }
        }
    }
}