zbus/proxy/
mod.rs

1//! The client-side proxy API.
2
3use enumflags2::{bitflags, BitFlags};
4use event_listener::{Event, EventListener};
5use futures_core::{ready, stream};
6use futures_util::{future::Either, stream::Map};
7use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult};
8use static_assertions::assert_impl_all;
9use std::{
10    collections::{HashMap, HashSet},
11    fmt,
12    future::Future,
13    ops::Deref,
14    pin::Pin,
15    sync::{Arc, OnceLock, RwLock, RwLockReadGuard},
16    task::{Context, Poll},
17};
18use tracing::{debug, info_span, instrument, trace, Instrument};
19
20use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
21use zvariant::{ObjectPath, OwnedValue, Str, Value};
22
23use crate::{
24    fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
25    message::{Flags, Message, Sequence, Type},
26    AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task,
27};
28
29mod builder;
30pub use builder::{Builder, CacheProperties, ProxyDefault};
31
/// A client-side interface proxy.
///
/// A `Proxy` is a helper to interact with an interface on a remote object.
///
/// # Example
///
/// ```
/// use std::result::Result;
/// use std::error::Error;
/// use zbus::{Connection, Proxy};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
///     let connection = Connection::session().await?;
///     let p = Proxy::new(
///         &connection,
///         "org.freedesktop.DBus",
///         "/org/freedesktop/DBus",
///         "org.freedesktop.DBus",
///     ).await?;
///     // owned return value
///     let _id: String = p.call("GetId", &()).await?;
///     // borrowed return value
///     let body = p.call_method("GetId", &()).await?.body();
///     let _id: &str = body.deserialize()?;
///
///     Ok(())
/// }
/// ```
///
/// # Note
///
/// It is recommended to use the [`proxy`] macro, which provides a more convenient and
/// type-safe *façade* `Proxy` derived from a Rust trait.
///
/// [`futures` crate]: https://crates.io/crates/futures
/// [`proxy`]: attr.proxy.html
#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    /// Shared proxy state. It lives behind an `Arc`, so cloning a `Proxy` only clones the
    /// pointer and every clone refers to the same `ProxyInner`.
    pub(crate) inner: Arc<ProxyInner<'a>>,
}

// Compile-time check that `Proxy` remains `Send`, `Sync` and `Unpin`.
assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);
75
/// The part of a proxy's state that does not borrow for the proxy lifetime `'a`.
///
/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
/// (and possibly other crates).
pub(crate) struct ProxyInnerStatic {
    /// Connection the proxy talks over; also used on drop to queue removal of the match rule.
    pub(crate) conn: Connection,
    /// Match rule for the destination's `NameOwnerChanged` signal, stored at most once after it
    /// has been added to the connection.
    dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
}
82
83impl fmt::Debug for ProxyInnerStatic {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("ProxyInnerStatic")
86            .field(
87                "dest_owner_change_match_rule",
88                &self.dest_owner_change_match_rule,
89            )
90            .finish_non_exhaustive()
91    }
92}
93
/// State shared by all clones of a [`Proxy`].
#[derive(Debug)]
pub(crate) struct ProxyInner<'a> {
    /// Connection and match-rule bookkeeping, kept in a separate type that has no `'a` borrow.
    inner_without_borrows: ProxyInnerStatic,
    /// Bus name of the peer this proxy addresses.
    pub(crate) destination: BusName<'a>,
    /// Object path of the remote object.
    pub(crate) path: ObjectPath<'a>,
    /// Interface this proxy operates on.
    pub(crate) interface: InterfaceName<'a>,

    /// Cache of property values.
    ///
    /// `None` when caching is disabled for the proxy. Otherwise the cell is filled with the cache
    /// and its background task the first time the cache is requested.
    property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
    /// Set of properties which do not get cached, by name.
    /// This overrides proxy-level caching behavior.
    uncached_properties: HashSet<Str<'a>>,
}
107
108impl Drop for ProxyInnerStatic {
109    fn drop(&mut self) {
110        if let Some(rule) = self.dest_owner_change_match_rule.take() {
111            self.conn.queue_remove_match(rule);
112        }
113    }
114}
115
/// A property changed event.
///
/// The property changed event generated by [`PropertyStream`].
pub struct PropertyChanged<'a, T> {
    /// Name of the property this event is about.
    name: &'a str,
    /// The property cache the value is read from (and written back to after a refetch).
    properties: Arc<PropertiesCache>,
    /// Proxy used to fetch the value from the peer when the cached one was invalidated.
    proxy: Proxy<'a>,
    /// Ties the event to the value type `T` without storing a `T`.
    phantom: std::marker::PhantomData<T>,
}
125
126impl<'a, T> PropertyChanged<'a, T> {
127    // The name of the property that changed.
128    pub fn name(&self) -> &str {
129        self.name
130    }
131
132    // Get the raw value of the property that changed.
133    //
134    // If the notification signal contained the new value, it has been cached already and this call
135    // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch
136    // and cache the new value.
137    pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> {
138        struct Wrapper<'w> {
139            name: &'w str,
140            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
141        }
142
143        impl<'w> Deref for Wrapper<'w> {
144            type Target = Value<'static>;
145
146            fn deref(&self) -> &Self::Target {
147                self.values
148                    .get(self.name)
149                    .expect("PropertyStream with no corresponding property")
150                    .value
151                    .as_ref()
152                    .expect("PropertyStream with no corresponding property")
153            }
154        }
155
156        {
157            let values = self.properties.values.read().expect("lock poisoned");
158            if values
159                .get(self.name)
160                .expect("PropertyStream with no corresponding property")
161                .value
162                .is_some()
163            {
164                return Ok(Wrapper {
165                    name: self.name,
166                    values,
167                });
168            }
169        }
170
171        // The property was invalidated, so we need to fetch the new value.
172        let properties_proxy = self.proxy.properties_proxy();
173        let value = properties_proxy
174            .get(self.proxy.inner.interface.clone(), self.name)
175            .await
176            .map_err(crate::Error::from)?;
177
178        // Save the new value
179        {
180            let mut values = self.properties.values.write().expect("lock poisoned");
181
182            values
183                .get_mut(self.name)
184                .expect("PropertyStream with no corresponding property")
185                .value = Some(value);
186        }
187
188        Ok(Wrapper {
189            name: self.name,
190            values: self.properties.values.read().expect("lock poisoned"),
191        })
192    }
193}
194
195impl<T> PropertyChanged<'_, T>
196where
197    T: TryFrom<zvariant::OwnedValue>,
198    T::Error: Into<crate::Error>,
199{
200    // Get the value of the property that changed.
201    //
202    // If the notification signal contained the new value, it has been cached already and this call
203    // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch
204    // and cache the new value.
205    pub async fn get(&self) -> Result<T> {
206        self.get_raw()
207            .await
208            .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
209    }
210}
211
/// A [`stream::Stream`] implementation that yields property change notifications.
///
/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
#[derive(Debug)]
pub struct PropertyStream<'a, T> {
    /// Name of the property being watched.
    name: &'a str,
    /// Proxy whose property cache is observed.
    proxy: Proxy<'a>,
    /// Listener on the watched property's change event; replaced after every notification.
    changed_listener: EventListener,
    /// Ties the stream to the value type `T` without storing a `T`.
    phantom: std::marker::PhantomData<T>,
}
222
223impl<'a, T> stream::Stream for PropertyStream<'a, T>
224where
225    T: Unpin,
226{
227    type Item = PropertyChanged<'a, T>;
228
229    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
230        let m = self.get_mut();
231        let properties = match m.proxy.get_property_cache() {
232            Some(properties) => properties.clone(),
233            // With no cache, we will get no updates; return immediately
234            None => return Poll::Ready(None),
235        };
236        ready!(Pin::new(&mut m.changed_listener).poll(cx));
237
238        m.changed_listener = properties
239            .values
240            .read()
241            .expect("lock poisoned")
242            .get(m.name)
243            .expect("PropertyStream with no corresponding property")
244            .event
245            .listen();
246
247        Poll::Ready(Some(PropertyChanged {
248            name: m.name,
249            properties,
250            proxy: m.proxy.clone(),
251            phantom: std::marker::PhantomData,
252        }))
253    }
254}
255
/// Cache of an interface's property values, together with the state of its initial population.
#[derive(Debug)]
pub(crate) struct PropertiesCache {
    /// Cached entries, keyed by property name.
    values: RwLock<HashMap<String, PropertyValue>>,
    /// Whether the initial population is still running or has finished (and with what result).
    caching_result: RwLock<CachingResult>,
}
261
/// Progress of the initial population of a [`PropertiesCache`].
#[derive(Debug)]
enum CachingResult {
    /// Population is still in progress; `ready` is notified once it finishes.
    Caching { ready: Event },
    /// Population has finished; `result` records whether it succeeded.
    Cached { result: Result<()> },
}
267
268impl PropertiesCache {
269    #[instrument(skip_all)]
270    fn new(
271        proxy: PropertiesProxy<'static>,
272        interface: InterfaceName<'static>,
273        executor: &Executor<'_>,
274        uncached_properties: HashSet<zvariant::Str<'static>>,
275    ) -> (Arc<Self>, Task<()>) {
276        let cache = Arc::new(PropertiesCache {
277            values: Default::default(),
278            caching_result: RwLock::new(CachingResult::Caching {
279                ready: Event::new(),
280            }),
281        });
282
283        let cache_clone = cache.clone();
284        let task_name = format!("{interface} proxy caching");
285        let proxy_caching = async move {
286            let result = cache_clone
287                .init(proxy, interface, uncached_properties)
288                .await;
289            let (prop_changes, interface, uncached_properties) = {
290                let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
291                let ready = match &*caching_result {
292                    CachingResult::Caching { ready } => ready,
293                    // SAFETY: This is the only part of the code that changes this state and it's
294                    // only run once.
295                    _ => unreachable!(),
296                };
297                match result {
298                    Ok((prop_changes, interface, uncached_properties)) => {
299                        ready.notify(usize::MAX);
300                        *caching_result = CachingResult::Cached { result: Ok(()) };
301
302                        (prop_changes, interface, uncached_properties)
303                    }
304                    Err(e) => {
305                        ready.notify(usize::MAX);
306                        *caching_result = CachingResult::Cached { result: Err(e) };
307
308                        return;
309                    }
310                }
311            };
312
313            if let Err(e) = cache_clone
314                .keep_updated(prop_changes, interface, uncached_properties)
315                .await
316            {
317                debug!("Error keeping properties cache updated: {e}");
318            }
319        }
320        .instrument(info_span!("{}", task_name));
321        let task = executor.spawn(proxy_caching, &task_name);
322
323        (cache, task)
324    }
325
    /// Performs the initial population of the cache.
    ///
    /// new() runs this in a task it spawns for initialization of properties cache.
    ///
    /// It subscribes to property-change signals first, then issues a `GetAll` call and joins both
    /// as ordered streams. Updates that come out of the join before the `GetAll` reply are
    /// discarded, the reply populates the cache, and an update left buffered in the join is
    /// applied afterwards.
    ///
    /// On success the change stream, interface name and uncached-property set are handed back so
    /// the caller can keep using them.
    async fn init(
        &self,
        proxy: PropertiesProxy<'static>,
        interface: InterfaceName<'static>,
        uncached_properties: HashSet<zvariant::Str<'static>>,
    ) -> Result<(
        PropertiesChangedStream<'static>,
        InterfaceName<'static>,
        HashSet<zvariant::Str<'static>>,
    )> {
        use ordered_stream::OrderedStreamExt;

        // Subscribe before calling `GetAll`; change signals are tagged `Left` in the join.
        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);

        // The `GetAll` reply is wrapped as a single-item ordered stream and tagged `Right`.
        // NOTE(review): the `expect` presumably cannot fire because no flags (in particular no
        // "no reply expected" flag) are passed — confirm against `call_method_raw`.
        let get_all = proxy
            .inner()
            .connection()
            .call_method_raw(
                Some(proxy.inner().destination()),
                proxy.inner().path(),
                Some(proxy.inner().interface()),
                "GetAll",
                BitFlags::empty(),
                &interface,
            )
            .await
            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

        let mut join = join_streams(prop_changes, get_all);

        loop {
            match join.next().await {
                Some(Either::Left(_update)) => {
                    // discard updates prior to the initial population
                }
                Some(Either::Right(populate)) => {
                    // Populate the cache from the reply; nothing is invalidated at this point.
                    populate?.body().deserialize().map(|values| {
                        self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
                    })?;
                    break;
                }
                None => break,
            }
        }
        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
            // if an update was buffered, then it happened after the get_all returned and needs to
            // be applied before we discard the join
            if let Ok(args) = update.args() {
                if args.interface_name == interface {
                    self.update_cache(
                        &uncached_properties,
                        &args.changed_properties,
                        args.invalidated_properties,
                        &interface,
                    );
                }
            }
        }
        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
        // of directly to the stream.
        let prop_changes = join.into_inner().0.into_inner();

        Ok((prop_changes, interface, uncached_properties))
    }
392
393    // new() runs this in a task it spawns for keeping the cache in sync.
394    #[instrument(skip_all)]
395    async fn keep_updated(
396        &self,
397        mut prop_changes: PropertiesChangedStream<'static>,
398        interface: InterfaceName<'static>,
399        uncached_properties: HashSet<zvariant::Str<'static>>,
400    ) -> Result<()> {
401        use futures_util::StreamExt;
402
403        trace!("Listening for property changes on {interface}...");
404        while let Some(update) = prop_changes.next().await {
405            if let Ok(args) = update.args() {
406                if args.interface_name == interface {
407                    self.update_cache(
408                        &uncached_properties,
409                        &args.changed_properties,
410                        args.invalidated_properties,
411                        &interface,
412                    );
413                }
414            }
415        }
416
417        Ok(())
418    }
419
420    fn update_cache(
421        &self,
422        uncached_properties: &HashSet<Str<'_>>,
423        changed: &HashMap<&str, Value<'_>>,
424        invalidated: Vec<&str>,
425        interface: &InterfaceName<'_>,
426    ) {
427        let mut values = self.values.write().expect("lock poisoned");
428
429        for inval in invalidated {
430            if uncached_properties.contains(&Str::from(inval)) {
431                debug!(
432                    "Ignoring invalidation of uncached property `{}.{}`",
433                    interface, inval
434                );
435                continue;
436            }
437            trace!("Property `{interface}.{inval}` invalidated");
438
439            if let Some(entry) = values.get_mut(inval) {
440                entry.value = None;
441                entry.event.notify(usize::MAX);
442            }
443        }
444
445        for (property_name, value) in changed {
446            if uncached_properties.contains(&Str::from(*property_name)) {
447                debug!(
448                    "Ignoring update of uncached property `{}.{}`",
449                    interface, property_name
450                );
451                continue;
452            }
453            trace!("Property `{interface}.{property_name}` updated");
454
455            let entry = values.entry(property_name.to_string()).or_default();
456
457            let value = match OwnedValue::try_from(value) {
458                Ok(value) => value,
459                Err(e) => {
460                    debug!(
461                        "Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}"
462                    );
463                    continue;
464                }
465            };
466            entry.value = Some(value);
467            entry.event.notify(usize::MAX);
468        }
469    }
470
471    /// Wait for the cache to be populated and return any error encountered during population
472    pub(crate) async fn ready(&self) -> Result<()> {
473        let listener = match &*self.caching_result.read().expect("lock poisoned") {
474            CachingResult::Caching { ready } => ready.listen(),
475            CachingResult::Cached { result } => return result.clone(),
476        };
477        listener.await;
478
479        // It must be ready now.
480        match &*self.caching_result.read().expect("lock poisoned") {
481            // SAFETY: We were just notified that state has changed to `Cached` and we never go back
482            // to `Caching` once in `Cached`.
483            CachingResult::Caching { .. } => unreachable!(),
484            CachingResult::Cached { result } => result.clone(),
485        }
486    }
487}
488
489impl<'a> ProxyInner<'a> {
490    pub(crate) fn new(
491        conn: Connection,
492        destination: BusName<'a>,
493        path: ObjectPath<'a>,
494        interface: InterfaceName<'a>,
495        cache: CacheProperties,
496        uncached_properties: HashSet<Str<'a>>,
497    ) -> Self {
498        let property_cache = match cache {
499            CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
500            CacheProperties::No => None,
501        };
502        Self {
503            inner_without_borrows: ProxyInnerStatic {
504                conn,
505                dest_owner_change_match_rule: OnceLock::new(),
506            },
507            destination,
508            path,
509            interface,
510            property_cache,
511            uncached_properties,
512        }
513    }
514
515    /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
516    ///
517    /// If the destination is a unique name, we will not subscribe to the signal.
518    pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
519        if !self.inner_without_borrows.conn.is_bus() {
520            // Names don't mean much outside the bus context.
521            return Ok(());
522        }
523
524        let well_known_name = match &self.destination {
525            BusName::WellKnown(well_known_name) => well_known_name,
526            BusName::Unique(_) => return Ok(()),
527        };
528
529        if self
530            .inner_without_borrows
531            .dest_owner_change_match_rule
532            .get()
533            .is_some()
534        {
535            // Already watching over the bus for any name updates so nothing to do here.
536            return Ok(());
537        }
538
539        let conn = &self.inner_without_borrows.conn;
540        let signal_rule: OwnedMatchRule = MatchRule::builder()
541            .msg_type(Type::Signal)
542            .sender("org.freedesktop.DBus")?
543            .path("/org/freedesktop/DBus")?
544            .interface("org.freedesktop.DBus")?
545            .member("NameOwnerChanged")?
546            .add_arg(well_known_name.as_str())?
547            .build()
548            .to_owned()
549            .into();
550
551        conn.add_match(
552            signal_rule.clone(),
553            Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
554        )
555        .await?;
556
557        if self
558            .inner_without_borrows
559            .dest_owner_change_match_rule
560            .set(signal_rule.clone())
561            .is_err()
562        {
563            // we raced another destination_unique_name call and added it twice
564            conn.remove_match(signal_rule).await?;
565        }
566
567        Ok(())
568    }
569}
570
/// Queue limit passed to `Connection::add_match` when subscribing to the destination's
/// `NameOwnerChanged` signal.
// NOTE(review): presumably the maximum number of such signals kept queued — confirm against
// `Connection::add_match`.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
572
573impl<'a> Proxy<'a> {
574    /// Create a new `Proxy` for the given destination/path/interface.
575    pub async fn new<D, P, I>(
576        conn: &Connection,
577        destination: D,
578        path: P,
579        interface: I,
580    ) -> Result<Proxy<'a>>
581    where
582        D: TryInto<BusName<'a>>,
583        P: TryInto<ObjectPath<'a>>,
584        I: TryInto<InterfaceName<'a>>,
585        D::Error: Into<Error>,
586        P::Error: Into<Error>,
587        I::Error: Into<Error>,
588    {
589        Builder::new(conn)
590            .destination(destination)?
591            .path(path)?
592            .interface(interface)?
593            .build()
594            .await
595    }
596
597    /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
598    /// passed arguments.
599    pub async fn new_owned<D, P, I>(
600        conn: Connection,
601        destination: D,
602        path: P,
603        interface: I,
604    ) -> Result<Proxy<'a>>
605    where
606        D: TryInto<BusName<'static>>,
607        P: TryInto<ObjectPath<'static>>,
608        I: TryInto<InterfaceName<'static>>,
609        D::Error: Into<Error>,
610        P::Error: Into<Error>,
611        I::Error: Into<Error>,
612    {
613        Builder::new(&conn)
614            .destination(destination)?
615            .path(path)?
616            .interface(interface)?
617            .build()
618            .await
619    }
620
621    /// Get a reference to the associated connection.
622    pub fn connection(&self) -> &Connection {
623        &self.inner.inner_without_borrows.conn
624    }
625
626    /// Get a reference to the destination service name.
627    pub fn destination(&self) -> &BusName<'_> {
628        &self.inner.destination
629    }
630
631    /// Get a reference to the object path.
632    pub fn path(&self) -> &ObjectPath<'_> {
633        &self.inner.path
634    }
635
636    /// Get a reference to the interface.
637    pub fn interface(&self) -> &InterfaceName<'_> {
638        &self.inner.interface
639    }
640
641    /// Introspect the associated object, and return the XML description.
642    ///
643    /// See the [xml](xml/index.html) module for parsing the
644    /// result.
645    pub async fn introspect(&self) -> fdo::Result<String> {
646        let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
647            .destination(&self.inner.destination)?
648            .path(&self.inner.path)?
649            .build()
650            .await?;
651
652        proxy.introspect().await
653    }
654
655    fn properties_proxy(&self) -> PropertiesProxy<'_> {
656        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
657            // Safe because already checked earlier
658            .destination(self.inner.destination.as_ref())
659            .unwrap()
660            // Safe because already checked earlier
661            .path(self.inner.path.as_ref())
662            .unwrap()
663            // does not have properties
664            .cache_properties(CacheProperties::No)
665            .build_internal()
666            .unwrap()
667            .into()
668    }
669
670    fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
671        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
672            // Safe because already checked earlier
673            .destination(self.inner.destination.to_owned())
674            .unwrap()
675            // Safe because already checked earlier
676            .path(self.inner.path.to_owned())
677            .unwrap()
678            // does not have properties
679            .cache_properties(CacheProperties::No)
680            .build_internal()
681            .unwrap()
682            .into()
683    }
684
685    /// Get the cache, starting it in the background if needed.
686    ///
687    /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
688    /// encountered in the population.
689    pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
690        let cache = match &self.inner.property_cache {
691            Some(cache) => cache,
692            None => return None,
693        };
694        let (cache, _) = &cache.get_or_init(|| {
695            let proxy = self.owned_properties_proxy();
696            let interface = self.interface().to_owned();
697            let uncached_properties: HashSet<zvariant::Str<'static>> = self
698                .inner
699                .uncached_properties
700                .iter()
701                .map(|s| s.to_owned())
702                .collect();
703            let executor = self.connection().executor();
704
705            PropertiesCache::new(proxy, interface, executor, uncached_properties)
706        });
707
708        Some(cache)
709    }
710
711    /// Get the cached value of the property `property_name`.
712    ///
713    /// This returns `None` if the property is not in the cache.  This could be because the cache
714    /// was invalidated by an update, because caching was disabled for this property or proxy, or
715    /// because the cache has not yet been populated.  Use `get_property` to fetch the value from
716    /// the peer.
717    pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
718    where
719        T: TryFrom<OwnedValue>,
720        T::Error: Into<Error>,
721    {
722        self.cached_property_raw(property_name)
723            .as_deref()
724            .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
725            .transpose()
726    }
727
728    /// Get the cached value of the property `property_name`.
729    ///
730    /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
731    /// is useful if you want to avoid allocations and cloning.
732    pub fn cached_property_raw<'p>(
733        &'p self,
734        property_name: &'p str,
735    ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
736        if let Some(values) = self
737            .inner
738            .property_cache
739            .as_ref()
740            .and_then(OnceLock::get)
741            .map(|c| c.0.values.read().expect("lock poisoned"))
742        {
743            // ensure that the property is in the cache.
744            values
745                .get(property_name)
746                // if the property value has not yet been cached, this will return None.
747                .and_then(|e| e.value.as_ref())?;
748
749            struct Wrapper<'a> {
750                values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
751                property_name: &'a str,
752            }
753
754            impl Deref for Wrapper<'_> {
755                type Target = Value<'static>;
756
757                fn deref(&self) -> &Self::Target {
758                    self.values
759                        .get(self.property_name)
760                        .and_then(|e| e.value.as_ref())
761                        .map(|v| v.deref())
762                        .expect("inexistent property")
763                }
764            }
765
766            Some(Wrapper {
767                values,
768                property_name,
769            })
770        } else {
771            None
772        }
773    }
774
775    async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
776        Ok(self
777            .properties_proxy()
778            .get(self.inner.interface.as_ref(), property_name)
779            .await?)
780    }
781
782    /// Get the property `property_name`.
783    ///
784    /// Get the property value from the cache (if caching is enabled) or call the
785    /// `Get` method of the `org.freedesktop.DBus.Properties` interface.
786    pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
787    where
788        T: TryFrom<OwnedValue>,
789        T::Error: Into<Error>,
790    {
791        if let Some(cache) = self.get_property_cache() {
792            cache.ready().await?;
793        }
794        if let Some(value) = self.cached_property(property_name)? {
795            return Ok(value);
796        }
797
798        let value = self.get_proxy_property(property_name).await?;
799        value.try_into().map_err(Into::into)
800    }
801
802    /// Set the property `property_name`.
803    ///
804    /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
805    pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
806    where
807        T: 't + Into<Value<'t>>,
808    {
809        self.properties_proxy()
810            .set(self.inner.interface.as_ref(), property_name, &value.into())
811            .await
812    }
813
814    /// Call a method and return the reply.
815    ///
816    /// Typically, you would want to use [`call`] method instead. Use this method if you need to
817    /// deserialize the reply message manually (this way, you can avoid the memory
818    /// allocation/copying, by deserializing the reply to an unowned type).
819    ///
820    /// [`call`]: struct.Proxy.html#method.call
821    pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
822    where
823        M: TryInto<MemberName<'m>>,
824        M::Error: Into<Error>,
825        B: serde::ser::Serialize + zvariant::DynamicType,
826    {
827        self.inner
828            .inner_without_borrows
829            .conn
830            .call_method(
831                Some(&self.inner.destination),
832                self.inner.path.as_str(),
833                Some(&self.inner.interface),
834                method_name,
835                body,
836            )
837            .await
838    }
839
840    /// Call a method and return the reply body.
841    ///
842    /// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
843    ///
844    /// [`call_method`]: struct.Proxy.html#method.call_method
845    pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
846    where
847        M: TryInto<MemberName<'m>>,
848        M::Error: Into<Error>,
849        B: serde::ser::Serialize + zvariant::DynamicType,
850        R: for<'d> zvariant::DynamicDeserialize<'d>,
851    {
852        let reply = self.call_method(method_name, body).await?;
853
854        reply.body().deserialize()
855    }
856
857    /// Call a method and return the reply body, optionally supplying a set of
858    /// method flags to control the way the method call message is sent and handled.
859    ///
860    /// Use [`call`] instead if you do not need any special handling via additional flags.
861    /// If the `NoReplyExpected` flag is passed , this will return None immediately
862    /// after sending the message, similar to [`call_noreply`]
863    ///
864    /// [`call`]: struct.Proxy.html#method.call
865    /// [`call_noreply`]: struct.Proxy.html#method.call_noreply
866    pub async fn call_with_flags<'m, M, B, R>(
867        &self,
868        method_name: M,
869        flags: BitFlags<MethodFlags>,
870        body: &B,
871    ) -> Result<Option<R>>
872    where
873        M: TryInto<MemberName<'m>>,
874        M::Error: Into<Error>,
875        B: serde::ser::Serialize + zvariant::DynamicType,
876        R: for<'d> zvariant::DynamicDeserialize<'d>,
877    {
878        let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
879        match self
880            .inner
881            .inner_without_borrows
882            .conn
883            .call_method_raw(
884                Some(self.destination()),
885                self.path(),
886                Some(self.interface()),
887                method_name,
888                flags,
889                body,
890            )
891            .await?
892        {
893            Some(reply) => reply.await?.body().deserialize().map(Some),
894            None => Ok(None),
895        }
896    }
897
898    /// Call a method without expecting a reply
899    ///
900    /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
901    pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
902    where
903        M: TryInto<MemberName<'m>>,
904        M::Error: Into<Error>,
905        B: serde::ser::Serialize + zvariant::DynamicType,
906    {
907        self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
908            .await?;
909        Ok(())
910    }
911
912    /// Create a stream for signal named `signal_name`.
913    pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
914    where
915        M: TryInto<MemberName<'m>>,
916        M::Error: Into<Error>,
917    {
918        self.receive_signal_with_args(signal_name, &[]).await
919    }
920
921    /// Same as [`Proxy::receive_signal`] but with a filter.
922    ///
923    /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
924    /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
925    /// this method where possible. Note that this filtering is limited to arguments of string
926    /// types.
927    ///
928    /// The arguments are passed as a tuples of argument index and expected value.
929    pub async fn receive_signal_with_args<'m, M>(
930        &self,
931        signal_name: M,
932        args: &[(u8, &str)],
933    ) -> Result<SignalStream<'m>>
934    where
935        M: TryInto<MemberName<'m>>,
936        M::Error: Into<Error>,
937    {
938        let signal_name = signal_name.try_into().map_err(Into::into)?;
939        self.receive_signals(Some(signal_name), args).await
940    }
941
942    async fn receive_signals<'m>(
943        &self,
944        signal_name: Option<MemberName<'m>>,
945        args: &[(u8, &str)],
946    ) -> Result<SignalStream<'m>> {
947        self.inner.subscribe_dest_owner_change().await?;
948
949        SignalStream::new(self.clone(), signal_name, args).await
950    }
951
952    /// Create a stream for all signals emitted by this service.
953    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
954        self.receive_signals(None, &[]).await
955    }
956
957    /// Get a stream to receive property changed events.
958    ///
959    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
960    /// will only receive the last update.
961    ///
962    /// If caching is not enabled on this proxy, the resulting stream will not return any events.
963    pub async fn receive_property_changed<'name: 'a, T>(
964        &self,
965        name: &'name str,
966    ) -> PropertyStream<'a, T> {
967        let properties = self.get_property_cache();
968        let changed_listener = if let Some(properties) = &properties {
969            let mut values = properties.values.write().expect("lock poisoned");
970            let entry = values
971                .entry(name.to_string())
972                .or_insert_with(PropertyValue::default);
973            entry.event.listen()
974        } else {
975            Event::new().listen()
976        };
977
978        PropertyStream {
979            name,
980            proxy: self.clone(),
981            changed_listener,
982            phantom: std::marker::PhantomData,
983        }
984    }
985
986    /// Get a stream to receive destination owner changed events.
987    ///
988    /// If the proxy destination is a unique name, the stream will be notified of the peer
989    /// disconnection from the bus (with a `None` value).
990    ///
991    /// If the proxy destination is a well-known name, the stream will be notified whenever the name
992    /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
993    /// name is released (with a `None` value).
994    ///
995    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
996    /// will only receive the last update.
997    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> {
998        use futures_util::StreamExt;
999        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
1000            .cache_properties(CacheProperties::No)
1001            .build()
1002            .await?;
1003        Ok(OwnerChangedStream {
1004            stream: dbus_proxy
1005                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
1006                .await?
1007                .map(Box::new(move |signal| {
1008                    let args = signal.args().unwrap();
1009                    let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());
1010
1011                    new_owner
1012                })),
1013            name: self.destination().clone(),
1014        })
1015    }
1016}
1017
/// A single entry of the property cache.
///
/// `Proxy::receive_property_changed` inserts a default (value-less) entry for a property that has
/// no entry yet, so that it has an `event` to listen on.
#[derive(Debug, Default)]
struct PropertyValue {
    // `None` for an entry created through `Default`.
    // NOTE(review): presumably the last known value of the property — confirm against the code
    // that fills the cache.
    value: Option<OwnedValue>,
    // Event that `Proxy::receive_property_changed` registers its listeners on.
    // NOTE(review): presumably notified whenever `value` changes — confirm against the code that
    // updates the cache.
    event: Event,
}
1023
/// Flags to use with [`Proxy::call_with_flags`].
///
/// Each flag is converted to the message-level [`Flags`] variant of the same name before the call
/// is made (see the `From<MethodFlags> for Flags` implementation).
#[bitflags]
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MethodFlags {
    /// No response is expected from this method call, regardless of whether the
    /// signature for the interface method indicates a reply type. When passed,
    /// `call_with_flags` will return `Ok(None)` immediately after successfully
    /// sending the method call.
    ///
    /// Errors encountered while *making* the call will still be returned as
    /// an `Err` variant, but any errors that are triggered by the receiver's
    /// handling of the call will not be delivered.
    NoReplyExpected = 0x1,

    /// When set on a call whose destination is a message bus, this flag will instruct
    /// the bus not to [launch][al] a service to handle the call if no application
    /// on the bus owns the requested name.
    ///
    /// This flag is ignored when using a peer-to-peer connection.
    ///
    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
    NoAutoStart = 0x2,

    /// Indicates to the receiver that this client is prepared to wait for interactive
    /// authorization, which might take a considerable time to complete. For example, the receiver
    /// may query the user for confirmation via [polkit] or a similar framework.
    ///
    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
    AllowInteractiveAuth = 0x4,
}

// Compile-time check that the flags can be freely shared and moved between threads.
assert_impl_all!(MethodFlags: Send, Sync, Unpin);
1057
1058impl From<MethodFlags> for Flags {
1059    fn from(method_flag: MethodFlags) -> Self {
1060        match method_flag {
1061            MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1062            MethodFlags::NoAutoStart => Self::NoAutoStart,
1063            MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1064        }
1065    }
1066}
1067
/// The concrete stream type wrapped by [`OwnerChangedStream`].
///
/// It is a `NameOwnerChanged` signal stream mapped through a boxed closure that turns each signal
/// into the unique name of the new owner, or `None` if the signal carries no new owner. The
/// closure is boxed as a trait object so that the type can be named in a struct field.
type OwnerChangedStreamMap<'a> = Map<
    fdo::NameOwnerChangedStream<'a>,
    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
>;
1072
/// A [`stream::Stream`] implementation that yields an item whenever the owner of the tracked bus
/// name changes.
///
/// Each item is an `Option<UniqueName>`: `Some` with the unique name of the new owner, or `None`
/// when the signal reports no new owner.
///
/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
pub struct OwnerChangedStream<'a> {
    // `NameOwnerChanged` signals, already mapped to the new owner they announce.
    stream: OwnerChangedStreamMap<'a>,
    // The bus name being tracked, i.e. the destination of the proxy that created this stream.
    name: BusName<'a>,
}

// Compile-time check that the stream can be moved between threads and polled without pinning.
assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);
1082
1083impl OwnerChangedStream<'_> {
1084    /// The bus name being tracked.
1085    pub fn name(&self) -> &BusName<'_> {
1086        &self.name
1087    }
1088}
1089
1090impl<'a> stream::Stream for OwnerChangedStream<'a> {
1091    type Item = Option<UniqueName<'static>>;
1092
1093    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1094        use futures_util::StreamExt;
1095        self.get_mut().stream.poll_next_unpin(cx)
1096    }
1097}
1098
/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
///
/// Use [`Proxy::receive_signal`] to create an instance of this type.
///
/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
#[derive(Debug)]
pub struct SignalStream<'a> {
    // The stream of signals matching the proxy's rule, joined with a `NameOwnerChanged` stream.
    // The latter is only present (`Some`) when the proxy destination is a well-known name.
    stream: Join<MessageStream, Option<MessageStream>>,
    // Unique name of the peer whose messages are passed on; `None` if no owner is known. Kept up
    // to date from the `NameOwnerChanged` signals seen while polling.
    src_unique_name: Option<UniqueName<'static>>,
    // The signal (member) name the stream was created for, `None` if it is not restricted to one.
    signal_name: Option<MemberName<'a>>,
}
1111
impl<'a> SignalStream<'a> {
    /// The signal name.
    ///
    /// Returns `None` if the stream was created without restricting it to a single signal.
    pub fn name(&self) -> Option<&MemberName<'a>> {
        self.signal_name.as_ref()
    }

    /// Create a signal stream for `proxy`.
    ///
    /// The match rule selects signals sent by the proxy's destination, on the proxy's path and
    /// interface, optionally restricted to `signal_name` and to the given `args` (pairs of
    /// argument index and expected value).
    ///
    /// For a well-known destination name the current owner is resolved first (see the inline
    /// comments), and a `NameOwnerChanged` stream is kept alongside the signal stream.
    ///
    /// # Errors
    ///
    /// Fails if the match rule cannot be built, if a message stream cannot be set up, if the
    /// `GetNameOwner` call cannot be made or its reply cannot be deserialized, or if the streams
    /// end before the owner could be determined.
    ///
    /// # Panics
    ///
    /// Panics if `call_method_raw` returns no pending reply for `GetNameOwner`, or if the first
    /// message from the `NameOwnerChanged` stream is not a valid `NameOwnerChanged` signal.
    async fn new(
        proxy: Proxy<'_>,
        signal_name: Option<MemberName<'a>>,
        args: &[(u8, &str)],
    ) -> Result<SignalStream<'a>> {
        // The rule for the signals the caller is actually interested in.
        let mut rule_builder = MatchRule::builder()
            .msg_type(Type::Signal)
            .sender(proxy.destination())?
            .path(proxy.path())?
            .interface(proxy.interface())?;
        if let Some(name) = &signal_name {
            rule_builder = rule_builder.member(name)?;
        }
        for (i, arg) in args {
            rule_builder = rule_builder.arg(*i, *arg)?;
        }
        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
        let conn = proxy.connection();

        let (src_unique_name, stream) = match proxy.destination().to_owned() {
            // The destination is a unique name: it is the sender to expect, and no
            // `NameOwnerChanged` stream is joined.
            BusName::Unique(name) => (
                Some(name),
                join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    None,
                ),
            ),
            BusName::WellKnown(name) => {
                use ordered_stream::OrderedStreamExt;

                // Subscribe to ownership changes of the well-known name *before* asking for its
                // current owner below.
                let name_owner_changed_rule = MatchRule::builder()
                    .msg_type(Type::Signal)
                    .sender("org.freedesktop.DBus")?
                    .path("/org/freedesktop/DBus")?
                    .interface("org.freedesktop.DBus")?
                    .member("NameOwnerChanged")?
                    .add_arg(name.as_str())?
                    .build();
                let name_owner_changed_stream = MessageStream::for_match_rule(
                    name_owner_changed_rule,
                    conn,
                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
                )
                .await?
                .map(Either::Left);

                // NOTE(review): the `expect` presumably relies on `call_method_raw` only returning
                // `None` when the `NoReplyExpected` flag is set, which it is not here — confirm.
                let get_name_owner = conn
                    .call_method_raw(
                        Some("org.freedesktop.DBus"),
                        "/org/freedesktop/DBus",
                        Some("org.freedesktop.DBus"),
                        "GetNameOwner",
                        BitFlags::empty(),
                        &name,
                    )
                    .await
                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;

                // Join the signal stream (`Left`) with the `GetNameOwner` reply (`Right`).
                let mut join = join_streams(name_owner_changed_stream, get_name_owner);

                // The first usable item from the join decides the initial owner.
                let mut src_unique_name = loop {
                    match join.next().await {
                        Some(Either::Left(Ok(msg))) => {
                            let signal = NameOwnerChanged::from_message(msg)
                                .expect("`NameOwnerChanged` signal stream got wrong message");
                            {
                                break signal
                                    .args()
                                    // The filtering code couldn't have let this through if args
                                    // were not in order, hence the `expect`.
                                    .expect("`NameOwnerChanged` signal has no args")
                                    .new_owner()
                                    .as_ref()
                                    .map(UniqueName::to_owned);
                            }
                        }
                        // Errors on the signal stream are skipped; keep waiting.
                        Some(Either::Left(Err(_))) => (),
                        Some(Either::Right(Ok(response))) => {
                            break Some(response.body().deserialize::<UniqueName<'_>>()?.to_owned())
                        }
                        Some(Either::Right(Err(e))) => {
                            // Probably the name is not owned. Not a problem but let's still log it.
                            debug!("Failed to get owner of {name}: {e}");

                            break None;
                        }
                        None => {
                            return Err(Error::InputOutput(
                                std::io::Error::new(
                                    std::io::ErrorKind::BrokenPipe,
                                    "connection closed",
                                )
                                .into(),
                            ))
                        }
                    }
                };

                // Let's take into account any buffered NameOwnerChanged signal.
                let (stream, _, queued) = join.into_inner();
                if let Some(msg) = queued.and_then(|e| match e.0 {
                    Either::Left(Ok(msg)) => Some(msg),
                    Either::Left(Err(_)) | Either::Right(_) => None,
                }) {
                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
                        if let Ok(args) = signal.args() {
                            // Only a buffered signal that names a new owner overrides the owner
                            // determined above; anything else leaves it untouched.
                            match (args.name(), args.new_owner().deref()) {
                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
                                    src_unique_name = Some(new_owner.to_owned());
                                }
                                _ => (),
                            }
                        }
                    }
                }
                // Undo the `Either::Left` mapping to get the plain message stream back.
                let name_owner_changed_stream = stream.into_inner();

                let stream = join_streams(
                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
                    Some(name_owner_changed_stream),
                );

                (src_unique_name, stream)
            }
        };

        Ok(SignalStream {
            stream,
            src_unique_name,
            signal_name,
        })
    }

    /// Decide whether `msg` should be yielded to the user of the stream.
    ///
    /// Returns `Ok(true)` if the sender of `msg` equals the currently tracked unique name.
    /// Otherwise, if `msg` is a `NameOwnerChanged` signal, the tracked unique name is updated
    /// from its new owner. In both of these other cases the result is `Ok(false)`.
    ///
    /// # Errors
    ///
    /// Fails if the arguments of a `NameOwnerChanged` signal cannot be extracted.
    fn filter(&mut self, msg: &Message) -> Result<bool> {
        let header = msg.header();
        let sender = header.sender();
        if sender == self.src_unique_name.as_ref() {
            return Ok(true);
        }

        // The src_unique_name must be maintained in lock-step with the applied filter
        if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
            let args = signal.args()?;
            self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
        }

        Ok(false)
    }
}

// Compile-time check that the stream can be moved between threads and polled without pinning.
assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin);
1269
1270impl<'a> stream::Stream for SignalStream<'a> {
1271    type Item = Message;
1272
1273    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1274        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1275    }
1276}
1277
1278impl<'a> OrderedStream for SignalStream<'a> {
1279    type Data = Message;
1280    type Ordering = Sequence;
1281
1282    fn poll_next_before(
1283        self: Pin<&mut Self>,
1284        cx: &mut Context<'_>,
1285        before: Option<&Self::Ordering>,
1286    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1287        let this = self.get_mut();
1288        loop {
1289            match ready!(OrderedStream::poll_next_before(
1290                Pin::new(&mut this.stream),
1291                cx,
1292                before
1293            )) {
1294                PollResult::Item { data, ordering } => {
1295                    if let Ok(msg) = data {
1296                        if let Ok(true) = this.filter(&msg) {
1297                            return Poll::Ready(PollResult::Item {
1298                                data: msg,
1299                                ordering,
1300                            });
1301                        }
1302                    }
1303                }
1304                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1305                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1306            }
1307        }
1308    }
1309}
1310
1311impl<'a> stream::FusedStream for SignalStream<'a> {
1312    fn is_terminated(&self) -> bool {
1313        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1314    }
1315}
1316
1317#[async_trait::async_trait]
1318impl AsyncDrop for SignalStream<'_> {
1319    async fn async_drop(self) {
1320        let (signals, names, _buffered) = self.stream.into_inner();
1321        signals.async_drop().await;
1322        if let Some(names) = names {
1323            names.async_drop().await;
1324        }
1325    }
1326}
1327
1328impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
1329    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
1330        proxy.into_inner()
1331    }
1332}
1333
/// This trait is implemented by all async proxies, which are generated with the
/// [`dbus_proxy`](zbus::dbus_proxy) macro.
///
/// It gives generic code access to the builder of a generated proxy and to the low-level
/// `zbus::Proxy` it wraps. The `'c` lifetime is the lifetime parameter of that underlying proxy
/// and of its builder.
// NOTE(review): the tests in this file use the macro under the name `proxy` (`#[proxy(...)]`);
// confirm whether the link above should point to that name instead of `dbus_proxy`.
pub trait ProxyImpl<'c>
where
    Self: Sized,
{
    /// Returns a customizable builder for this proxy.
    fn builder(conn: &Connection) -> Builder<'c, Self>;

    /// Consumes `self`, returning the underlying `zbus::Proxy`.
    fn into_inner(self) -> Proxy<'c>;

    /// The reference to the underlying `zbus::Proxy`.
    fn inner(&self) -> &Proxy<'c>;
}
1349
// Integration tests for the proxy signal and owner-change streams. They connect to the session
// bus, so a running session bus is required.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{connection, interface, object_server::SignalContext, proxy, utils::block_on};
    use futures_util::StreamExt;
    use ntest::timeout;
    use test_log::test;

    // Synchronous entry point for `test_signal`, bounded by a 15 second timeout.
    #[test]
    #[timeout(15000)]
    fn signal() {
        block_on(test_signal()).unwrap();
    }

    /// Checks that owner-change and `NameAcquired` streams report a well-known name being
    /// acquired, and that owner-change streams report the owner going away afterwards.
    async fn test_signal() -> Result<()> {
        // Register a well-known name with the session bus and ensure we get the appropriate
        // signals called for that.
        let conn = Connection::session().await?;
        let dest_conn = Connection::session().await?;
        let unique_name = dest_conn.unique_name().unwrap().clone();

        // Track ownership of the (not yet requested) well-known name from the first connection.
        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
        let proxy: Proxy<'_> = Builder::new(&conn)
            .destination(well_known)?
            .path("/does/not/matter")?
            .interface("does.not.matter")?
            .build()
            .await?;
        let mut owner_changed_stream = proxy.receive_owner_changed().await?;

        // On the connection that will own the name, wait for the matching `NameAcquired` signal.
        let proxy = fdo::DBusProxy::new(&dest_conn).await?;
        let mut name_acquired_stream = proxy
            .inner()
            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
            .await?;

        // Create a property stream and drop it, together with its proxy, right away; the signal
        // stream created above is still used afterwards.
        let prop_stream = proxy
            .inner()
            .receive_property_changed("SomeProp")
            .await
            .filter_map(|changed| async move {
                let v: Option<u32> = changed.get().await.ok();
                dbg!(v)
            });
        drop(proxy);
        drop(prop_stream);

        dest_conn.request_name(well_known).await?;

        // Both streams must report the acquisition.
        let (new_owner, acquired_signal) =
            futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);

        assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);

        let acquired_signal = acquired_signal.unwrap();
        assert_eq!(
            acquired_signal.body().deserialize::<&str>().unwrap(),
            well_known
        );

        // Also track the owning connection through its unique name.
        let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
        let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;

        // Drop the connection that owns the name, and the stream that was created on it.
        drop(dest_conn);
        name_acquired_stream.async_drop().await;

        // There shouldn't be an owner anymore.
        let new_owner = owner_changed_stream.next().await;
        assert!(new_owner.unwrap().is_none());

        let new_unique_owner = unique_name_changed_stream.next().await;
        assert!(new_unique_owner.unwrap().is_none());

        Ok(())
    }

    // Synchronous entry point for `test_signal_stream_deadlock`, bounded by a 15 second timeout.
    #[test]
    #[timeout(15000)]
    fn signal_stream_deadlock() {
        block_on(test_signal_stream_deadlock()).unwrap();
    }

    /// Tests deadlocking in signal reception when the message queue is full.
    ///
    /// Creates a connection with a small message queue, and a service that
    /// emits signals at a high rate. First a listener is created that listens
    /// for that signal which should fill the small queue. Then another
    /// signal listener is created against another signal. Previously, this second
    /// call to add the match rule never resolved and resulted in a deadlock.
    async fn test_signal_stream_deadlock() -> Result<()> {
        // Client-side proxy for the test interface; only the signal is of interest.
        #[proxy(
            gen_blocking = false,
            default_path = "/org/zbus/Test",
            default_service = "org.zbus.Test.MR501",
            interface = "org.zbus.Test"
        )]
        trait Test {
            #[zbus(signal)]
            fn my_signal(&self, msg: &str) -> Result<()>;
        }

        // Service-side implementation of the same interface, used to emit the signal.
        struct TestIface;

        #[interface(name = "org.zbus.Test")]
        impl TestIface {
            #[zbus(signal)]
            async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>;
        }

        let test_iface = TestIface;
        let server_conn = connection::Builder::session()?
            .name("org.zbus.Test.MR501")?
            .serve_at("/org/zbus/Test", test_iface)?
            .build()
            .await?;

        // The client connection with the small message queue.
        let client_conn = connection::Builder::session()?
            .max_queued(1)
            .build()
            .await?;

        let test_proxy = TestProxy::new(&client_conn).await?;
        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
            .destination("org.zbus.Test.MR501")?
            .path("/org/zbus/Test")?
            .build()
            .await?;

        // Channel used to tell the second listener that the first one has been set up.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        // Emit the signal in bursts of ten, with a short pause in between, until the channel is
        // closed.
        let handle = {
            let tx = tx.clone();
            let conn = server_conn.clone();
            let server_fut = async move {
                use std::time::Duration;

                #[cfg(not(feature = "tokio"))]
                use async_io::Timer;

                #[cfg(feature = "tokio")]
                use tokio::time::sleep;

                let iface_ref = conn
                    .object_server()
                    .interface::<_, TestIface>("/org/zbus/Test")
                    .await
                    .unwrap();

                let context = iface_ref.signal_context();
                while !tx.is_closed() {
                    for _ in 0..10 {
                        TestIface::my_signal(context, "This is a test")
                            .await
                            .unwrap();
                    }

                    #[cfg(not(feature = "tokio"))]
                    Timer::after(Duration::from_millis(5)).await;

                    #[cfg(feature = "tokio")]
                    sleep(Duration::from_millis(5)).await;
                }
            };
            server_conn.executor().spawn(server_fut, "server_task")
        };

        // First listener: subscribe to the signal, report readiness, then consume the signals.
        let signal_fut = async {
            let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();

            tx.send(()).await.unwrap();

            while let Some(_signal) = signal_stream.next().await {}
        };

        // Second listener: once the first one is ready, add another match rule. This is the call
        // that used to never resolve.
        let prop_fut = async move {
            rx.recv().await.unwrap();
            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
        };

        futures_util::pin_mut!(signal_fut);
        futures_util::pin_mut!(prop_fut);

        // Finish as soon as either future completes.
        futures_util::future::select(signal_fut, prop_fut).await;

        handle.await;

        Ok(())
    }
}
1536        Ok(())
1537    }
1538}