zbus/
lib.rs

1#![deny(rust_2018_idioms)]
2#![doc(
3    html_logo_url = "https://raw.githubusercontent.com/dbus2/zbus/9f7a90d2b594ddc48b7a5f39fda5e00cd56a7dfb/logo.png"
4)]
5#![doc = include_str!("../README.md")]
6#![doc(test(attr(
7    warn(unused),
8    deny(warnings),
9    allow(dead_code),
10    // W/o this, we seem to get some bogus warning about `extern crate zbus`.
11    allow(unused_extern_crates),
12)))]
13
14#[cfg(doctest)]
15mod doctests {
16    // Repo README.
17    doc_comment::doctest!("../../README.md");
18    // Book markdown checks
19    doc_comment::doctest!("../../book/src/client.md");
20    doc_comment::doctest!("../../book/src/concepts.md");
21    // The connection chapter contains a p2p example.
22    #[cfg(feature = "p2p")]
23    doc_comment::doctest!("../../book/src/connection.md");
24    doc_comment::doctest!("../../book/src/contributors.md");
25    doc_comment::doctest!("../../book/src/introduction.md");
26    doc_comment::doctest!("../../book/src/service.md");
27    doc_comment::doctest!("../../book/src/blocking.md");
28    doc_comment::doctest!("../../book/src/faq.md");
29}
30
31#[cfg(all(not(feature = "async-io"), not(feature = "tokio")))]
32mod error_message {
33    #[cfg(windows)]
34    compile_error!("Either \"async-io\" (default) or \"tokio\" must be enabled. On Windows \"async-io\" is (currently) required for UNIX socket support");
35
36    #[cfg(not(windows))]
37    compile_error!("Either \"async-io\" (default) or \"tokio\" must be enabled.");
38}
39
40#[cfg(windows)]
41mod win32;
42
43mod dbus_error;
44pub use dbus_error::*;
45
46mod error;
47pub use error::*;
48
49pub mod address;
50pub use address::Address;
51
52mod guid;
53pub use guid::*;
54
55pub mod message;
56pub use message::Message;
57
58#[deprecated(since = "4.0.0", note = "Use `message::Builder` instead")]
59#[doc(hidden)]
60pub use message::Builder as MessageBuilder;
61#[deprecated(since = "4.0.0", note = "Use `message::EndianSig` instead")]
62#[doc(hidden)]
63pub use message::EndianSig;
64#[doc(hidden)]
65pub use message::Flags as MessageFlags;
66#[deprecated(since = "4.0.0", note = "Use `message::Header` instead")]
67#[doc(hidden)]
68pub use message::Header as MessageHeader;
69#[deprecated(since = "4.0.0", note = "Use `message::PrimaryHeader` instead")]
70#[doc(hidden)]
71pub use message::PrimaryHeader as MessagePrimaryHeader;
72#[deprecated(since = "4.0.0", note = "Use `message::Sequence` instead")]
73#[doc(hidden)]
74pub use message::Sequence as MessageSequence;
75#[deprecated(since = "4.0.0", note = "Use `message::Type` instead")]
76#[doc(hidden)]
77pub use message::Type as MessageType;
78#[deprecated(since = "4.0.0", note = "Use `message::NATIVE_ENDIAN_SIG` instead")]
79#[doc(hidden)]
80pub use message::NATIVE_ENDIAN_SIG;
81
82pub mod connection;
83/// Alias for `connection` module, for convenience.
84pub use connection as conn;
85pub use connection::{handshake::AuthMechanism, Connection};
86
87#[deprecated(since = "4.0.0", note = "Use `connection::Builder` instead")]
88#[doc(hidden)]
89pub use connection::Builder as ConnectionBuilder;
90
91mod message_stream;
92pub use message_stream::*;
93mod abstractions;
94pub use abstractions::*;
95
96pub mod match_rule;
97pub use match_rule::{MatchRule, OwnedMatchRule};
98
99#[deprecated(since = "4.0.0", note = "Use `match_rule::Builder` instead")]
100#[doc(hidden)]
101pub use match_rule::Builder as MatchRuleBuilder;
102#[deprecated(since = "4.0.0", note = "Use `match_rule::PathSpec` instead")]
103#[doc(hidden)]
104pub use match_rule::PathSpec as MatchRulePathSpec;
105
106pub mod proxy;
107pub use proxy::Proxy;
108
109#[deprecated(since = "4.0.0", note = "Use `proxy::Builder` instead")]
110#[doc(hidden)]
111pub use proxy::Builder as ProxyBuilder;
112#[deprecated(since = "4.0.0", note = "Use `proxy::CacheProperties` instead")]
113#[doc(hidden)]
114pub use proxy::CacheProperties;
115#[deprecated(since = "4.0.0", note = "Use `proxy::MethodFlags` instead")]
116#[doc(hidden)]
117pub use proxy::MethodFlags;
118#[deprecated(since = "4.0.0", note = "Use `proxy::OwnerChangedStream` instead")]
119#[doc(hidden)]
120pub use proxy::OwnerChangedStream;
121#[deprecated(since = "4.0.0", note = "Use `proxy::PropertyChanged` instead")]
122#[doc(hidden)]
123pub use proxy::PropertyChanged;
124#[deprecated(since = "4.0.0", note = "Use `proxy::PropertyStream` instead")]
125#[doc(hidden)]
126pub use proxy::PropertyStream;
127#[deprecated(since = "4.0.0", note = "Use `proxy::ProxyDefault` instead")]
128#[doc(hidden)]
129pub use proxy::ProxyDefault;
130
131pub mod object_server;
132pub use object_server::ObjectServer;
133
134#[deprecated(since = "4.0.0", note = "Use `object_server::DispatchResult` instead")]
135#[doc(hidden)]
136pub use object_server::DispatchResult;
137#[deprecated(since = "4.0.0", note = "Use `object_server::Interface` instead")]
138#[doc(hidden)]
139pub use object_server::Interface;
140#[deprecated(since = "4.0.0", note = "Use `object_server::InterfaceDeref` instead")]
141#[doc(hidden)]
142pub use object_server::InterfaceDeref;
143#[deprecated(
144    since = "4.0.0",
145    note = "Use `object_server::InterfaceDerefMut` instead"
146)]
147#[doc(hidden)]
148pub use object_server::InterfaceDerefMut;
149#[deprecated(since = "4.0.0", note = "Use `object_server::InterfaceRef` instead")]
150#[doc(hidden)]
151pub use object_server::InterfaceRef;
152#[deprecated(
153    since = "4.0.0",
154    note = "Use `object_server::ResponseDispatchNotifier` instead"
155)]
156#[doc(hidden)]
157pub use object_server::ResponseDispatchNotifier;
158#[deprecated(since = "4.0.0", note = "Use `object_server::SignalContext` instead")]
159#[doc(hidden)]
160pub use object_server::SignalContext;
161
162mod utils;
163pub use utils::*;
164
165#[macro_use]
166pub mod fdo;
167
168#[deprecated(since = "4.0.0", note = "Use `connection::Socket` instead")]
169#[doc(hidden)]
170pub use connection::Socket;
171
172pub mod blocking;
173
174pub use zbus_macros::{interface, proxy, DBusError};
175// Old names used for backwards compatibility
176pub use zbus_macros::{dbus_interface, dbus_proxy};
177
178// Required for the macros to function within this crate.
179extern crate self as zbus;
180
181// Macro support module, not part of the public API.
182#[doc(hidden)]
183pub mod export {
184    pub use async_trait;
185    pub use futures_core;
186    pub use futures_util;
187    pub use ordered_stream;
188    pub use serde;
189    pub use static_assertions;
190}
191
192pub use zbus_names as names;
193pub use zvariant;
194
195#[cfg(test)]
196mod tests {
197    use std::{
198        collections::HashMap,
199        sync::{mpsc::channel, Arc, Condvar, Mutex},
200    };
201
202    use crate::utils::block_on;
203    use enumflags2::BitFlags;
204    use event_listener::Event;
205    use ntest::timeout;
206    use test_log::test;
207    use tracing::{debug, instrument, trace};
208
209    use zbus_names::UniqueName;
210    use zvariant::{OwnedObjectPath, OwnedValue, Type};
211
212    use crate::{
213        blocking::{self, MessageIterator},
214        fdo::{RequestNameFlags, RequestNameReply},
215        message::Message,
216        object_server::SignalContext,
217        Connection, Result,
218    };
219
220    #[test]
221    fn msg() {
222        let m = Message::method("/org/freedesktop/DBus", "GetMachineId")
223            .unwrap()
224            .destination("org.freedesktop.DBus")
225            .unwrap()
226            .interface("org.freedesktop.DBus.Peer")
227            .unwrap()
228            .build(&())
229            .unwrap();
230        let hdr = m.header();
231        assert_eq!(hdr.path().unwrap(), "/org/freedesktop/DBus");
232        assert_eq!(hdr.interface().unwrap(), "org.freedesktop.DBus.Peer");
233        assert_eq!(hdr.member().unwrap(), "GetMachineId");
234    }
235
236    #[test]
237    #[timeout(15000)]
238    #[instrument]
239    fn basic_connection() {
240        let connection = blocking::Connection::session()
241            .map_err(|e| {
242                debug!("error: {}", e);
243
244                e
245            })
246            .unwrap();
247        // Hello method is already called during connection creation so subsequent calls are
248        // expected to fail but only with a D-Bus error.
249        match connection.call_method(
250            Some("org.freedesktop.DBus"),
251            "/org/freedesktop/DBus",
252            Some("org.freedesktop.DBus"),
253            "Hello",
254            &(),
255        ) {
256            Err(crate::Error::MethodError(_, _, _)) => (),
257            Err(e) => panic!("{}", e),
258
259            _ => panic!(),
260        };
261    }
262
263    #[test]
264    #[timeout(15000)]
265    fn basic_connection_async() {
266        block_on(test_basic_connection()).unwrap();
267    }
268
269    async fn test_basic_connection() -> Result<()> {
270        let connection = Connection::session().await?;
271
272        match connection
273            .call_method(
274                Some("org.freedesktop.DBus"),
275                "/org/freedesktop/DBus",
276                Some("org.freedesktop.DBus"),
277                "Hello",
278                &(),
279            )
280            .await
281        {
282            Err(crate::Error::MethodError(_, _, _)) => (),
283            Err(e) => panic!("{}", e),
284
285            _ => panic!(),
286        };
287
288        Ok(())
289    }
290
291    #[cfg(all(unix, not(target_os = "macos")))]
292    #[test]
293    #[timeout(15000)]
294    fn fdpass_systemd() {
295        use std::{fs::File, os::unix::io::AsRawFd};
296        use zvariant::OwnedFd;
297
298        let connection = blocking::Connection::system().unwrap();
299
300        let reply = connection
301            .call_method(
302                Some("org.freedesktop.systemd1"),
303                "/org/freedesktop/systemd1",
304                Some("org.freedesktop.systemd1.Manager"),
305                "DumpByFileDescriptor",
306                &(),
307            )
308            .unwrap();
309
310        let fd: OwnedFd = reply.body().deserialize().unwrap();
311        assert!(fd.as_raw_fd() >= 0);
312        let f = File::from(std::os::fd::OwnedFd::from(fd));
313        f.metadata().unwrap();
314    }
315
316    #[test]
317    #[instrument]
318    #[timeout(15000)]
319    fn freedesktop_api() {
320        let connection = blocking::Connection::session()
321            .map_err(|e| {
322                debug!("error: {}", e);
323
324                e
325            })
326            .unwrap();
327
328        let reply = connection
329            .call_method(
330                Some("org.freedesktop.DBus"),
331                "/org/freedesktop/DBus",
332                Some("org.freedesktop.DBus"),
333                "RequestName",
334                &(
335                    "org.freedesktop.zbus.sync",
336                    BitFlags::from(RequestNameFlags::ReplaceExisting),
337                ),
338            )
339            .unwrap();
340
341        let body = reply.body();
342        assert!(body.signature().map(|s| s == "u").unwrap());
343        let reply: RequestNameReply = body.deserialize().unwrap();
344        assert_eq!(reply, RequestNameReply::PrimaryOwner);
345
346        let reply = connection
347            .call_method(
348                Some("org.freedesktop.DBus"),
349                "/org/freedesktop/DBus",
350                Some("org.freedesktop.DBus"),
351                "GetId",
352                &(),
353            )
354            .unwrap();
355
356        let body = reply.body();
357        assert!(body.signature().map(|s| s == <&str>::signature()).unwrap());
358        let id: &str = body.deserialize().unwrap();
359        debug!("Unique ID of the bus: {}", id);
360
361        let reply = connection
362            .call_method(
363                Some("org.freedesktop.DBus"),
364                "/org/freedesktop/DBus",
365                Some("org.freedesktop.DBus"),
366                "NameHasOwner",
367                &"org.freedesktop.zbus.sync",
368            )
369            .unwrap();
370
371        let body = reply.body();
372        assert!(body.signature().map(|s| s == bool::signature()).unwrap());
373        assert!(body.deserialize::<bool>().unwrap());
374
375        let reply = connection
376            .call_method(
377                Some("org.freedesktop.DBus"),
378                "/org/freedesktop/DBus",
379                Some("org.freedesktop.DBus"),
380                "GetNameOwner",
381                &"org.freedesktop.zbus.sync",
382            )
383            .unwrap();
384
385        let body = reply.body();
386        assert!(body.signature().map(|s| s == <&str>::signature()).unwrap());
387        assert_eq!(
388            body.deserialize::<UniqueName<'_>>().unwrap(),
389            *connection.unique_name().unwrap(),
390        );
391
392        let reply = connection
393            .call_method(
394                Some("org.freedesktop.DBus"),
395                "/org/freedesktop/DBus",
396                Some("org.freedesktop.DBus"),
397                "GetConnectionCredentials",
398                &"org.freedesktop.DBus",
399            )
400            .unwrap();
401
402        let body = reply.body();
403        assert!(body.signature().map(|s| s == "a{sv}").unwrap());
404        let hashmap: HashMap<&str, OwnedValue> = body.deserialize().unwrap();
405
406        let pid: u32 = (&hashmap["ProcessID"]).try_into().unwrap();
407        debug!("DBus bus PID: {}", pid);
408
409        #[cfg(unix)]
410        {
411            let uid: u32 = (&hashmap["UnixUserID"]).try_into().unwrap();
412            debug!("DBus bus UID: {}", uid);
413        }
414    }
415
416    #[test]
417    #[timeout(15000)]
418    fn freedesktop_api_async() {
419        block_on(test_freedesktop_api()).unwrap();
420    }
421
422    #[instrument]
423    async fn test_freedesktop_api() -> Result<()> {
424        let connection = Connection::session().await?;
425
426        let reply = connection
427            .call_method(
428                Some("org.freedesktop.DBus"),
429                "/org/freedesktop/DBus",
430                Some("org.freedesktop.DBus"),
431                "RequestName",
432                &(
433                    "org.freedesktop.zbus.async",
434                    BitFlags::from(RequestNameFlags::ReplaceExisting),
435                ),
436            )
437            .await
438            .unwrap();
439
440        let body = reply.body();
441        assert!(body.signature().map(|s| s == "u").unwrap());
442        let reply: RequestNameReply = body.deserialize().unwrap();
443        assert_eq!(reply, RequestNameReply::PrimaryOwner);
444
445        let reply = connection
446            .call_method(
447                Some("org.freedesktop.DBus"),
448                "/org/freedesktop/DBus",
449                Some("org.freedesktop.DBus"),
450                "GetId",
451                &(),
452            )
453            .await
454            .unwrap();
455
456        let body = reply.body();
457        assert!(body.signature().map(|s| s == <&str>::signature()).unwrap());
458        let id: &str = body.deserialize().unwrap();
459        debug!("Unique ID of the bus: {}", id);
460
461        let reply = connection
462            .call_method(
463                Some("org.freedesktop.DBus"),
464                "/org/freedesktop/DBus",
465                Some("org.freedesktop.DBus"),
466                "NameHasOwner",
467                &"org.freedesktop.zbus.async",
468            )
469            .await
470            .unwrap();
471
472        let body = reply.body();
473        assert!(body.signature().map(|s| s == bool::signature()).unwrap());
474        assert!(body.deserialize::<bool>().unwrap());
475
476        let reply = connection
477            .call_method(
478                Some("org.freedesktop.DBus"),
479                "/org/freedesktop/DBus",
480                Some("org.freedesktop.DBus"),
481                "GetNameOwner",
482                &"org.freedesktop.zbus.async",
483            )
484            .await
485            .unwrap();
486
487        let body = reply.body();
488        assert!(body.signature().map(|s| s == <&str>::signature()).unwrap());
489        assert_eq!(
490            body.deserialize::<UniqueName<'_>>().unwrap(),
491            *connection.unique_name().unwrap(),
492        );
493
494        let reply = connection
495            .call_method(
496                Some("org.freedesktop.DBus"),
497                "/org/freedesktop/DBus",
498                Some("org.freedesktop.DBus"),
499                "GetConnectionCredentials",
500                &"org.freedesktop.DBus",
501            )
502            .await
503            .unwrap();
504
505        let body = reply.body();
506        assert!(body.signature().map(|s| s == "a{sv}").unwrap());
507        let hashmap: HashMap<&str, OwnedValue> = body.deserialize().unwrap();
508
509        let pid: u32 = (&hashmap["ProcessID"]).try_into().unwrap();
510        debug!("DBus bus PID: {}", pid);
511
512        #[cfg(unix)]
513        {
514            let uid: u32 = (&hashmap["UnixUserID"]).try_into().unwrap();
515            debug!("DBus bus UID: {}", uid);
516        }
517
518        Ok(())
519    }
520
521    #[test]
522    #[timeout(15000)]
523    fn issue_68() {
524        // Tests the fix for https://github.com/dbus2/zbus/issues/68
525        //
526        // While this is not an exact reproduction of the issue 68, the underlying problem it
527        // produces is exactly the same: `Connection::call_method` dropping all incoming messages
528        // while waiting for the reply to the method call.
529        let conn = blocking::Connection::session().unwrap();
530        let stream = MessageIterator::from(&conn);
531
532        // Send a message as client before service starts to process messages
533        let client_conn = blocking::Connection::session().unwrap();
534        let destination = conn.unique_name().map(UniqueName::<'_>::from).unwrap();
535        let msg = Message::method("/org/freedesktop/Issue68", "Ping")
536            .unwrap()
537            .destination(destination)
538            .unwrap()
539            .interface("org.freedesktop.Issue68")
540            .unwrap()
541            .build(&())
542            .unwrap();
543        let serial = msg.primary_header().serial_num();
544        client_conn.send(&msg).unwrap();
545
546        crate::blocking::fdo::DBusProxy::new(&conn)
547            .unwrap()
548            .get_id()
549            .unwrap();
550
551        for m in stream {
552            let msg = m.unwrap();
553
554            if msg.primary_header().serial_num() == serial {
555                break;
556            }
557        }
558    }
559
560    #[test]
561    #[timeout(15000)]
562    fn issue104() {
563        // Tests the fix for https://github.com/dbus2/zbus/issues/104
564        //
565        // The issue is caused by `proxy` macro adding `()` around the return value of methods
566        // with multiple out arguments, ending up with double parenthesis around the signature of
567        // the return type and zbus only removing the outer `()` only and then it not matching the
568        // signature we receive on the reply message.
569        use zvariant::{ObjectPath, Value};
570
571        struct Secret;
572        #[super::interface(name = "org.freedesktop.Secret.Service")]
573        impl Secret {
574            fn open_session(
575                &self,
576                _algorithm: &str,
577                input: Value<'_>,
578            ) -> zbus::fdo::Result<(OwnedValue, OwnedObjectPath)> {
579                Ok((
580                    OwnedValue::try_from(input).unwrap(),
581                    ObjectPath::try_from("/org/freedesktop/secrets/Blah")
582                        .unwrap()
583                        .into(),
584                ))
585            }
586        }
587
588        let secret = Secret;
589        let conn = blocking::connection::Builder::session()
590            .unwrap()
591            .serve_at("/org/freedesktop/secrets", secret)
592            .unwrap()
593            .build()
594            .unwrap();
595        let service_name = conn.unique_name().unwrap().clone();
596
597        {
598            let conn = blocking::Connection::session().unwrap();
599            #[super::proxy(
600                interface = "org.freedesktop.Secret.Service",
601                assume_defaults = true,
602                gen_async = false
603            )]
604            trait Secret {
605                fn open_session(
606                    &self,
607                    algorithm: &str,
608                    input: &zvariant::Value<'_>,
609                ) -> zbus::Result<(OwnedValue, OwnedObjectPath)>;
610            }
611
612            let proxy = SecretProxy::builder(&conn)
613                .destination(UniqueName::from(service_name))
614                .unwrap()
615                .path("/org/freedesktop/secrets")
616                .unwrap()
617                .build()
618                .unwrap();
619
620            trace!("Calling open_session");
621            proxy.open_session("plain", &Value::from("")).unwrap();
622            trace!("Called open_session");
623        };
624    }
625
626    // This one we just want to see if it builds, no need to run it. For details see:
627    //
628    // https://github.com/dbus2/zbus/issues/121
629    #[test]
630    #[ignore]
631    fn issue_121() {
632        use crate::proxy;
633
634        #[proxy(interface = "org.freedesktop.IBus", assume_defaults = true)]
635        trait IBus {
636            /// CurrentInputContext property
637            #[zbus(property)]
638            fn current_input_context(&self) -> zbus::Result<OwnedObjectPath>;
639
640            /// Engines property
641            #[zbus(property)]
642            fn engines(&self) -> zbus::Result<Vec<zvariant::OwnedValue>>;
643        }
644    }
645
646    #[test]
647    #[timeout(15000)]
648    fn issue_122() {
649        let conn = blocking::Connection::session().unwrap();
650        let stream = MessageIterator::from(&conn);
651
652        #[allow(clippy::mutex_atomic)]
653        let pair = Arc::new((Mutex::new(false), Condvar::new()));
654        let pair2 = Arc::clone(&pair);
655
656        let child = std::thread::spawn(move || {
657            {
658                let (lock, cvar) = &*pair2;
659                let mut started = lock.lock().unwrap();
660                *started = true;
661                cvar.notify_one();
662            }
663
664            for m in stream {
665                let msg = m.unwrap();
666                let hdr = msg.header();
667
668                if hdr.member().map(|m| m.as_str()) == Some("ZBusIssue122") {
669                    break;
670                }
671            }
672        });
673
674        // Wait for the receiving thread to start up.
675        let (lock, cvar) = &*pair;
676        let mut started = lock.lock().unwrap();
677        while !*started {
678            started = cvar.wait(started).unwrap();
679        }
680        // Still give it some milliseconds to ensure it's already blocking on receive_message call
681        // when we send a message.
682        std::thread::sleep(std::time::Duration::from_millis(100));
683
684        let destination = conn.unique_name().map(UniqueName::<'_>::from).unwrap();
685        let msg = Message::method("/does/not/matter", "ZBusIssue122")
686            .unwrap()
687            .destination(destination)
688            .unwrap()
689            .build(&())
690            .unwrap();
691        conn.send(&msg).unwrap();
692
693        child.join().unwrap();
694    }
695
696    #[test]
697    #[ignore]
698    fn issue_81() {
699        use zbus::proxy;
700        use zvariant::{OwnedValue, Type};
701
702        #[derive(
703            Debug, PartialEq, Eq, Clone, Type, OwnedValue, serde::Serialize, serde::Deserialize,
704        )]
705        pub struct DbusPath {
706            id: String,
707            path: OwnedObjectPath,
708        }
709
710        #[proxy(assume_defaults = true)]
711        trait Session {
712            #[zbus(property)]
713            fn sessions_tuple(&self) -> zbus::Result<(String, String)>;
714
715            #[zbus(property)]
716            fn sessions_struct(&self) -> zbus::Result<DbusPath>;
717        }
718    }
719
720    #[test]
721    #[timeout(15000)]
722    fn issue173() {
723        // Tests the fix for https://github.com/dbus2/zbus/issues/173
724        //
725        // The issue is caused by proxy not keeping track of its destination's owner changes
726        // (service restart) and failing to receive signals as a result.
727        let (tx, rx) = channel();
728        let child = std::thread::spawn(move || {
729            let conn = blocking::Connection::session().unwrap();
730            #[super::proxy(
731                interface = "org.freedesktop.zbus.ComeAndGo",
732                default_service = "org.freedesktop.zbus.ComeAndGo",
733                default_path = "/org/freedesktop/zbus/ComeAndGo"
734            )]
735            trait ComeAndGo {
736                #[zbus(signal)]
737                fn the_signal(&self) -> zbus::Result<()>;
738            }
739
740            let proxy = ComeAndGoProxyBlocking::new(&conn).unwrap();
741            let signals = proxy.receive_the_signal().unwrap();
742            tx.send(()).unwrap();
743
744            // We receive two signals, each time from different unique names. W/o the fix for
745            // issue#173, the second iteration hangs.
746            for _ in signals.take(2) {
747                tx.send(()).unwrap();
748            }
749        });
750
751        struct ComeAndGo;
752        #[super::interface(name = "org.freedesktop.zbus.ComeAndGo")]
753        impl ComeAndGo {
754            #[zbus(signal)]
755            async fn the_signal(signal_ctxt: &SignalContext<'_>) -> zbus::Result<()>;
756        }
757
758        rx.recv().unwrap();
759        for _ in 0..2 {
760            let conn = blocking::connection::Builder::session()
761                .unwrap()
762                .serve_at("/org/freedesktop/zbus/ComeAndGo", ComeAndGo)
763                .unwrap()
764                .name("org.freedesktop.zbus.ComeAndGo")
765                .unwrap()
766                .build()
767                .unwrap();
768
769            let iface_ref = conn
770                .object_server()
771                .interface::<_, ComeAndGo>("/org/freedesktop/zbus/ComeAndGo")
772                .unwrap();
773            block_on(ComeAndGo::the_signal(iface_ref.signal_context())).unwrap();
774
775            rx.recv().unwrap();
776
777            // Now we release the name ownership to use a different connection (i-e new unique
778            // name).
779            conn.release_name("org.freedesktop.zbus.ComeAndGo").unwrap();
780        }
781
782        child.join().unwrap();
783    }
784
785    #[test]
786    #[timeout(15000)]
787    fn uncached_property() {
788        block_on(test_uncached_property()).unwrap();
789    }
790
791    async fn test_uncached_property() -> Result<()> {
792        // A dummy boolean test service. It starts as `false` and can be
793        // flipped to `true`. Two properties can access the inner value, with
794        // and without caching.
795        #[derive(Default)]
796        struct ServiceUncachedPropertyTest(bool);
797        #[crate::interface(name = "org.freedesktop.zbus.UncachedPropertyTest")]
798        impl ServiceUncachedPropertyTest {
799            #[zbus(property)]
800            fn cached_prop(&self) -> bool {
801                self.0
802            }
803            #[zbus(property)]
804            fn uncached_prop(&self) -> bool {
805                self.0
806            }
807            async fn set_inner_to_true(&mut self) -> zbus::fdo::Result<()> {
808                self.0 = true;
809                Ok(())
810            }
811        }
812
813        #[crate::proxy(
814            interface = "org.freedesktop.zbus.UncachedPropertyTest",
815            default_service = "org.freedesktop.zbus.UncachedPropertyTest",
816            default_path = "/org/freedesktop/zbus/UncachedPropertyTest"
817        )]
818        trait UncachedPropertyTest {
819            #[zbus(property)]
820            fn cached_prop(&self) -> zbus::Result<bool>;
821
822            #[zbus(property(emits_changed_signal = "false"))]
823            fn uncached_prop(&self) -> zbus::Result<bool>;
824
825            fn set_inner_to_true(&self) -> zbus::Result<()>;
826        }
827
828        let service = crate::connection::Builder::session()
829            .unwrap()
830            .serve_at(
831                "/org/freedesktop/zbus/UncachedPropertyTest",
832                ServiceUncachedPropertyTest(false),
833            )
834            .unwrap()
835            .build()
836            .await
837            .unwrap();
838
839        let dest = service.unique_name().unwrap();
840
841        let client_conn = crate::Connection::session().await.unwrap();
842        let client = UncachedPropertyTestProxy::builder(&client_conn)
843            .destination(dest)
844            .unwrap()
845            .build()
846            .await
847            .unwrap();
848
849        // Query properties; this populates the cache too.
850        assert!(!client.cached_prop().await.unwrap());
851        assert!(!client.uncached_prop().await.unwrap());
852
853        // Flip the inner value so we can observe the different semantics of
854        // the two properties.
855        client.set_inner_to_true().await.unwrap();
856
857        // Query properties again; the first one should incur a stale read from
858        // cache, while the second one should be able to read the live/updated
859        // value.
860        assert!(!client.cached_prop().await.unwrap());
861        assert!(client.uncached_prop().await.unwrap());
862
863        Ok(())
864    }
865
866    #[test]
867    #[timeout(15000)]
868    fn issue_260() {
869        // Low-level server example in the book doesn't work. The reason was that
870        // `Connection::request_name` implicitly created the associated `ObjectServer` to avoid
871        // #68. This meant that the `ObjectServer` ended up replying to the incoming method call
872        // with an error, before the service code could do so.
873        block_on(async {
874            let connection = Connection::session().await?;
875
876            connection.request_name("org.zbus.Issue260").await?;
877
878            futures_util::try_join!(
879                issue_260_service(&connection),
880                issue_260_client(&connection),
881            )?;
882
883            Ok::<(), zbus::Error>(())
884        })
885        .unwrap();
886    }
887
888    async fn issue_260_service(connection: &Connection) -> Result<()> {
889        use futures_util::stream::TryStreamExt;
890
891        let mut stream = zbus::MessageStream::from(connection);
892        while let Some(msg) = stream.try_next().await? {
893            let msg_header = msg.header();
894
895            match msg_header.message_type() {
896                zbus::message::Type::MethodCall => {
897                    connection.reply(&msg, &()).await?;
898
899                    break;
900                }
901                _ => continue,
902            }
903        }
904
905        Ok(())
906    }
907
908    async fn issue_260_client(connection: &Connection) -> Result<()> {
909        zbus::Proxy::new(
910            connection,
911            "org.zbus.Issue260",
912            "/org/zbus/Issue260",
913            "org.zbus.Issue260",
914        )
915        .await?
916        .call("Whatever", &())
917        .await?;
918        Ok(())
919    }
920
921    #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
922    // Issue specific to tokio runtime.
923    #[cfg(all(unix, feature = "tokio", feature = "p2p"))]
924    #[instrument]
925    async fn issue_279() {
926        // On failure to read from the socket, we were closing the error channel from the sender
927        // side and since the underlying tokio API doesn't provide a `close` method on the sender,
928        // the async-channel abstraction was achieving this through calling `close` on receiver,
929        // which is behind an async mutex and we end up with a deadlock.
930        use crate::{connection::Builder, MessageStream};
931        use futures_util::{stream::TryStreamExt, try_join};
932        use tokio::net::UnixStream;
933
934        let guid = crate::Guid::generate();
935        let (p0, p1) = UnixStream::pair().unwrap();
936
937        let server = Builder::unix_stream(p0).server(guid).unwrap().p2p().build();
938        let client = Builder::unix_stream(p1).p2p().build();
939        let (client, server) = try_join!(client, server).unwrap();
940        let mut stream = MessageStream::from(client);
941        let next_msg_fut = stream.try_next();
942
943        drop(server);
944
945        assert!(matches!(next_msg_fut.await, Err(_)));
946    }
947
948    #[test(tokio::test(flavor = "multi_thread"))]
949    // Issue specific to tokio runtime.
950    #[cfg(all(unix, feature = "tokio"))]
951    #[instrument]
952    async fn issue_310() {
953        // The issue was we were deadlocking on fetching the new property value after invalidation.
954        // This turned out to be caused by us trying to grab a read lock on resource while holding
955        // a write lock. Thanks to connman for being weird and invalidating the property just before
956        // updating it, so this issue could be exposed.
957        use futures_util::StreamExt;
958        use zbus::connection::Builder;
959
960        struct Station(u64);
961
962        #[zbus::interface(name = "net.connman.iwd.Station")]
963        impl Station {
964            #[zbus(property)]
965            fn connected_network(&self) -> OwnedObjectPath {
966                format!("/net/connman/iwd/0/33/Network/{}", self.0)
967                    .try_into()
968                    .unwrap()
969            }
970        }
971
972        #[zbus::proxy(
973            interface = "net.connman.iwd.Station",
974            default_service = "net.connman.iwd"
975        )]
976        trait Station {
977            #[zbus(property)]
978            fn connected_network(&self) -> zbus::Result<OwnedObjectPath>;
979        }
980        let connection = Builder::session()
981            .unwrap()
982            .serve_at("/net/connman/iwd/0/33", Station(0))
983            .unwrap()
984            .name("net.connman.iwd")
985            .unwrap()
986            .build()
987            .await
988            .unwrap();
989        let event = Arc::new(event_listener::Event::new());
990        let conn_clone = connection.clone();
991        let event_clone = event.clone();
992        tokio::spawn(async move {
993            for _ in 0..10 {
994                let listener = event_clone.listen();
995                let iface_ref = conn_clone
996                    .object_server()
997                    .interface::<_, Station>("/net/connman/iwd/0/33")
998                    .await
999                    .unwrap();
1000
1001                {
1002                    let iface = iface_ref.get().await;
1003                    iface
1004                        .connected_network_invalidate(iface_ref.signal_context())
1005                        .await
1006                        .unwrap();
1007                    iface
1008                        .connected_network_changed(iface_ref.signal_context())
1009                        .await
1010                        .unwrap();
1011                }
1012                listener.await;
1013                iface_ref.get_mut().await.0 += 1;
1014            }
1015        });
1016
1017        let station = StationProxy::builder(&connection)
1018            .path("/net/connman/iwd/0/33")
1019            .unwrap()
1020            .build()
1021            .await
1022            .unwrap();
1023
1024        let mut changes = station.receive_connected_network_changed().await;
1025
1026        let mut last_received = 0;
1027        while last_received < 9 {
1028            let change = changes.next().await.unwrap();
1029            let path = change.get().await.unwrap();
1030            let received: u64 = path
1031                .split('/')
1032                .last()
1033                .unwrap()
1034                .parse()
1035                .expect("invalid path");
1036            assert!(received >= last_received);
1037            last_received = received;
1038            event.notify(1);
1039        }
1040    }
1041
1042    #[test]
1043    #[ignore]
1044    fn issue_466() {
1045        #[crate::proxy(interface = "org.Some.Thing1", assume_defaults = true)]
1046        trait MyGreeter {
1047            fn foo(
1048                &self,
1049                arg: &(u32, zbus::zvariant::Value<'_>),
1050            ) -> zbus::Result<(u32, zbus::zvariant::OwnedValue)>;
1051
1052            #[zbus(property)]
1053            fn bar(&self) -> zbus::Result<(u32, zbus::zvariant::OwnedValue)>;
1054        }
1055    }
1056
1057    #[instrument]
1058    #[test]
1059    fn concurrent_interface_methods() {
1060        // This is  test case for ensuring the regression of #799 doesn't come back.
1061        block_on(async {
1062            struct Iface(Event);
1063
1064            #[zbus::interface(name = "org.zbus.test.issue799")]
1065            impl Iface {
1066                async fn method1(&self) {
1067                    self.0.notify(1);
1068                    // Never return
1069                    std::future::pending::<()>().await;
1070                }
1071
1072                async fn method2(&self) {}
1073            }
1074
1075            let event = Event::new();
1076            let listener = event.listen();
1077            let iface = Iface(event);
1078            let conn = zbus::connection::Builder::session()
1079                .unwrap()
1080                .name("org.zbus.test.issue799")
1081                .unwrap()
1082                .serve_at("/org/zbus/test/issue799", iface)
1083                .unwrap()
1084                .build()
1085                .await
1086                .unwrap();
1087
1088            #[zbus::proxy(
1089                default_service = "org.zbus.test.issue799",
1090                default_path = "/org/zbus/test/issue799",
1091                interface = "org.zbus.test.issue799"
1092            )]
1093            trait Iface {
1094                async fn method1(&self) -> Result<()>;
1095                async fn method2(&self) -> Result<()>;
1096            }
1097
1098            let proxy = IfaceProxy::new(&conn).await.unwrap();
1099            let proxy_clone = proxy.clone();
1100            conn.executor()
1101                .spawn(
1102                    async move {
1103                        proxy_clone.method1().await.unwrap();
1104                    },
1105                    "method1",
1106                )
1107                .detach();
1108            // Wait till the `method1`` is called.
1109            listener.await;
1110
1111            // Now while the `method1` is in progress, a call to `method2` should just work.
1112            proxy.method2().await.unwrap();
1113        })
1114    }
1115
1116    #[cfg(all(unix, feature = "p2p"))]
1117    #[instrument]
1118    #[test]
1119    #[timeout(15000)]
1120    fn issue_813() {
1121        // Our server-side handshake code was unable to handle FDs being sent in the first messages
1122        // if the client sent them too quickly after sending `BEGIN` command.
1123        //
1124        // We test this by manually sending out the auth commands together with 2 method calls with
1125        // 1 FD each. Before a fix for this issue, the server handshake would fail with an
1126        // `Unexpected FDs during handshake` error.
1127        use crate::{conn::socket::WriteHalf, connection::Builder};
1128        use futures_util::try_join;
1129        use nix::unistd::Uid;
1130        #[cfg(not(feature = "tokio"))]
1131        use std::os::unix::net::UnixStream;
1132        use std::{os::fd::AsFd, vec};
1133        #[cfg(feature = "tokio")]
1134        use tokio::net::UnixStream;
1135        use zvariant::Fd;
1136
1137        #[derive(Debug)]
1138        struct Issue813Iface {
1139            event: event_listener::Event,
1140            call_count: u8,
1141        }
1142        #[crate::interface(interface = "org.zbus.Issue813")]
1143        impl Issue813Iface {
1144            #[instrument]
1145            fn pass_fd(&mut self, fd: Fd<'_>) {
1146                self.call_count += 1;
1147                debug!("`PassFd` called with {} {} times", fd, self.call_count);
1148                if self.call_count == 2 {
1149                    self.event.notify(1);
1150                }
1151            }
1152        }
1153        #[crate::proxy(
1154            gen_blocking = false,
1155            default_path = "/org/zbus/Issue813",
1156            interface = "org.zbus.Issue813"
1157        )]
1158        trait Issue813 {
1159            fn pass_fd(&self, fd: Fd<'_>) -> zbus::Result<()>;
1160        }
1161
1162        block_on(async move {
1163            let guid = crate::Guid::generate();
1164            let (p0, p1) = UnixStream::pair().unwrap();
1165
1166            let client_event = event_listener::Event::new();
1167            let client_listener = client_event.listen();
1168            let server_event = event_listener::Event::new();
1169            let server_listener = server_event.listen();
1170            let server = async move {
1171                let _conn = Builder::unix_stream(p0)
1172                    .server(guid)?
1173                    .p2p()
1174                    .serve_at(
1175                        "/org/zbus/Issue813",
1176                        Issue813Iface {
1177                            event: server_event,
1178                            call_count: 0,
1179                        },
1180                    )?
1181                    .name("org.zbus.Issue813")?
1182                    .build()
1183                    .await?;
1184                client_listener.await;
1185
1186                Result::<()>::Ok(())
1187            };
1188            let client = async move {
1189                let commands = format!(
1190                    "\0AUTH EXTERNAL {}\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n",
1191                    hex::encode(Uid::effective().to_string())
1192                );
1193                let mut bytes: Vec<u8> = commands.bytes().collect();
1194                let fd = std::io::stdin();
1195                let msg = crate::message::Message::method("/org/zbus/Issue813", "PassFd")?
1196                    .destination("org.zbus.Issue813")?
1197                    .interface("org.zbus.Issue813")?
1198                    .build(&(Fd::from(fd.as_fd())))?;
1199                let msg_data = msg.data();
1200                let mut fds = vec![];
1201                for _ in 0..2 {
1202                    bytes.extend_from_slice(&*msg_data);
1203                    fds.push(fd.as_fd());
1204                }
1205
1206                #[cfg(feature = "tokio")]
1207                let mut split = crate::conn::Socket::split(p1);
1208                #[cfg(not(feature = "tokio"))]
1209                let mut split = crate::conn::Socket::split(async_io::Async::new(p1)?);
1210                split.write_mut().sendmsg(&bytes, &fds).await?;
1211
1212                server_listener.await;
1213                client_event.notify(1);
1214
1215                Ok(())
1216            };
1217            let (_, _) = try_join!(client, server)?;
1218
1219            Result::<()>::Ok(())
1220        })
1221        .unwrap();
1222    }
1223}