zbus/connection/
mod.rs

1//! Connection API.
2use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender as Broadcaster};
3use enumflags2::BitFlags;
4use event_listener::{Event, EventListener};
5use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
6use static_assertions::assert_impl_all;
7use std::{
8    collections::HashMap,
9    io::{self, ErrorKind},
10    num::NonZeroU32,
11    ops::Deref,
12    pin::Pin,
13    sync::{Arc, OnceLock, Weak},
14    task::{Context, Poll},
15};
16use tracing::{debug, info_span, instrument, trace, trace_span, warn, Instrument};
17use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
18use zvariant::ObjectPath;
19
20use futures_core::Future;
21use futures_util::StreamExt;
22
23use crate::{
24    async_lock::{Mutex, Semaphore, SemaphorePermit},
25    blocking,
26    fdo::{self, ConnectionCredentials, RequestNameFlags, RequestNameReply},
27    is_flatpak,
28    message::{Flags, Message, Type},
29    proxy::CacheProperties,
30    DBusError, Error, Executor, MatchRule, MessageStream, ObjectServer, OwnedGuid, OwnedMatchRule,
31    Result, Task,
32};
33
34mod builder;
35pub use builder::Builder;
36
37pub mod socket;
38pub use socket::Socket;
39
40mod socket_reader;
41use socket_reader::SocketReader;
42
43pub(crate) mod handshake;
44use handshake::Authenticated;
45
46const DEFAULT_MAX_QUEUED: usize = 64;
47const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
48
49/// Inner state shared by Connection and WeakConnection
50#[derive(Debug)]
51pub(crate) struct ConnectionInner {
52    server_guid: OwnedGuid,
53    #[cfg(unix)]
54    cap_unix_fd: bool,
55    #[cfg(feature = "p2p")]
56    bus_conn: bool,
57    unique_name: OnceLock<OwnedUniqueName>,
58    registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,
59
60    activity_event: Arc<Event>,
61    socket_write: Mutex<Box<dyn socket::WriteHalf>>,
62
63    // Our executor
64    executor: Executor<'static>,
65
66    // Socket reader task
67    #[allow(unused)]
68    socket_reader_task: OnceLock<Task<()>>,
69
70    pub(crate) msg_receiver: InactiveReceiver<Result<Message>>,
71    pub(crate) method_return_receiver: InactiveReceiver<Result<Message>>,
72    msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
73
74    subscriptions: Mutex<Subscriptions>,
75
76    object_server: OnceLock<blocking::ObjectServer>,
77    object_server_dispatch_task: OnceLock<Task<()>>,
78}
79
80type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Message>>)>;
81
82pub(crate) type MsgBroadcaster = Broadcaster<Result<Message>>;
83
84/// A D-Bus connection.
85///
86/// A connection to a D-Bus bus, or a direct peer.
87///
88/// Once created, the connection is authenticated and negotiated and messages can be sent or
89/// received, such as [method calls] or [signals].
90///
91/// For higher-level message handling (typed functions, introspection, documentation reasons etc),
92/// it is recommended to wrap the low-level D-Bus messages into Rust functions with the
93/// [`proxy`] and [`interface`] macros instead of doing it directly on a `Connection`.
94///
95/// Typically, a connection is made to the session bus with [`Connection::session`], or to the
96/// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
97/// instances or the on-demand [`ObjectServer`] instance that can be accessed through
98/// [`Connection::object_server`].
99///
100/// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
101/// data is not cloned. This makes it very convenient to share the connection between different
102/// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`]
103/// so you can send and share a connection instance across threads as well.
104///
105/// `Connection` keeps internal queues of incoming message. The default capacity of each of these is
106/// 64. The capacity of the main (unfiltered) queue is configurable through the [`set_max_queued`]
107/// method. When the queue is full, no more messages can be received until room is created for more.
108/// This is why it's important to ensure that all [`crate::MessageStream`] and
109/// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on,
110/// respectively.
111///
112/// For sending messages you can either use [`Connection::send`] method.
113///
114/// [method calls]: struct.Connection.html#method.call_method
115/// [signals]: struct.Connection.html#method.emit_signal
116/// [`proxy`]: attr.proxy.html
117/// [`interface`]: attr.interface.html
118/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
119/// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
120///
121/// ### Examples
122///
123/// #### Get the session bus ID
124///
125/// ```
126/// # zbus::block_on(async {
127/// use zbus::Connection;
128///
129/// let connection = Connection::session().await?;
130///
131/// let reply_body = connection
132///     .call_method(
133///         Some("org.freedesktop.DBus"),
134///         "/org/freedesktop/DBus",
135///         Some("org.freedesktop.DBus"),
136///         "GetId",
137///         &(),
138///     )
139///     .await?
140///     .body();
141///
142/// let id: &str = reply_body.deserialize()?;
143/// println!("Unique ID of the bus: {}", id);
144/// # Ok::<(), zbus::Error>(())
145/// # }).unwrap();
146/// ```
147///
148/// #### Monitoring all messages
149///
150/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
151///
152/// ```rust,no_run
153/// # zbus::block_on(async {
154/// use futures_util::stream::TryStreamExt;
155/// use zbus::{Connection, MessageStream};
156///
157/// let connection = Connection::session().await?;
158///
159/// connection
160///     .call_method(
161///         Some("org.freedesktop.DBus"),
162///         "/org/freedesktop/DBus",
163///         Some("org.freedesktop.DBus.Monitoring"),
164///         "BecomeMonitor",
165///         &(&[] as &[&str], 0u32),
166///     )
167///     .await?;
168///
169/// let mut stream = MessageStream::from(connection);
170/// while let Some(msg) = stream.try_next().await? {
171///     println!("Got message: {}", msg);
172/// }
173///
174/// # Ok::<(), zbus::Error>(())
175/// # }).unwrap();
176/// ```
177///
178/// This should print something like:
179///
180/// ```console
181/// Got message: Signal NameAcquired from org.freedesktop.DBus
182/// Got message: Signal NameLost from org.freedesktop.DBus
183/// Got message: Method call GetConnectionUnixProcessID from :1.1324
184/// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
185///              Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
186/// Got message: Method call AddMatch from :1.918
187/// Got message: Method return from org.freedesktop.DBus
188/// ```
189///
190/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
191#[derive(Clone, Debug)]
192#[must_use = "Dropping a `Connection` will close the underlying socket."]
193pub struct Connection {
194    pub(crate) inner: Arc<ConnectionInner>,
195}
196
197assert_impl_all!(Connection: Send, Sync, Unpin);
198
199/// A method call whose completion can be awaited or joined with other streams.
200///
201/// This is useful for cache population method calls, where joining the [`JoinableStream`] with
202/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
203/// population whose task is scheduled later.
204#[derive(Debug)]
205pub(crate) struct PendingMethodCall {
206    stream: Option<MessageStream>,
207    serial: NonZeroU32,
208}
209
210impl Future for PendingMethodCall {
211    type Output = Result<Message>;
212
213    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214        self.poll_before(cx, None).map(|ret| {
215            ret.map(|(_, r)| r).unwrap_or_else(|| {
216                Err(crate::Error::InputOutput(
217                    io::Error::new(ErrorKind::BrokenPipe, "socket closed").into(),
218                ))
219            })
220        })
221    }
222}
223
224impl OrderedFuture for PendingMethodCall {
225    type Output = Result<Message>;
226    type Ordering = zbus::message::Sequence;
227
228    fn poll_before(
229        self: Pin<&mut Self>,
230        cx: &mut Context<'_>,
231        before: Option<&Self::Ordering>,
232    ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
233        let this = self.get_mut();
234        if let Some(stream) = &mut this.stream {
235            loop {
236                match Pin::new(&mut *stream).poll_next_before(cx, before) {
237                    Poll::Ready(PollResult::Item {
238                        data: Ok(msg),
239                        ordering,
240                    }) => {
241                        if msg.header().reply_serial() != Some(this.serial) {
242                            continue;
243                        }
244                        let res = match msg.message_type() {
245                            Type::Error => Err(msg.into()),
246                            Type::MethodReturn => Ok(msg),
247                            _ => continue,
248                        };
249                        this.stream = None;
250                        return Poll::Ready(Some((ordering, res)));
251                    }
252                    Poll::Ready(PollResult::Item {
253                        data: Err(e),
254                        ordering,
255                    }) => {
256                        return Poll::Ready(Some((ordering, Err(e))));
257                    }
258
259                    Poll::Ready(PollResult::NoneBefore) => {
260                        return Poll::Ready(None);
261                    }
262                    Poll::Ready(PollResult::Terminated) => {
263                        return Poll::Ready(None);
264                    }
265                    Poll::Pending => return Poll::Pending,
266                }
267            }
268        }
269        Poll::Ready(None)
270    }
271}
272
273impl Connection {
274    /// Send `msg` to the peer.
275    pub async fn send(&self, msg: &Message) -> Result<()> {
276        #[cfg(unix)]
277        if !msg.data().fds().is_empty() && !self.inner.cap_unix_fd {
278            return Err(Error::Unsupported);
279        }
280
281        self.inner.activity_event.notify(usize::MAX);
282        let mut write = self.inner.socket_write.lock().await;
283
284        write.send_message(msg).await
285    }
286
287    /// Send a method call.
288    ///
289    /// Create a method-call message, send it over the connection, then wait for the reply.
290    ///
291    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
292    /// error replies are returned as [`Error::MethodError`].
293    pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
294        &self,
295        destination: Option<D>,
296        path: P,
297        interface: Option<I>,
298        method_name: M,
299        body: &B,
300    ) -> Result<Message>
301    where
302        D: TryInto<BusName<'d>>,
303        P: TryInto<ObjectPath<'p>>,
304        I: TryInto<InterfaceName<'i>>,
305        M: TryInto<MemberName<'m>>,
306        D::Error: Into<Error>,
307        P::Error: Into<Error>,
308        I::Error: Into<Error>,
309        M::Error: Into<Error>,
310        B: serde::ser::Serialize + zvariant::DynamicType,
311    {
312        self.call_method_raw(
313            destination,
314            path,
315            interface,
316            method_name,
317            BitFlags::empty(),
318            body,
319        )
320        .await?
321        .expect("no reply")
322        .await
323    }
324
325    /// Send a method call.
326    ///
327    /// Send the given message, which must be a method call, over the connection and return an
328    /// object that allows the reply to be retrieved.  Typically you'd want to use
329    /// [`Connection::call_method`] instead.
330    ///
331    /// If the `flags` do not contain `MethodFlags::NoReplyExpected`, the return value is
332    /// guaranteed to be `Ok(Some(_))`, if there was no error encountered.
333    ///
334    /// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`.
335    pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>(
336        &self,
337        destination: Option<D>,
338        path: P,
339        interface: Option<I>,
340        method_name: M,
341        flags: BitFlags<Flags>,
342        body: &B,
343    ) -> Result<Option<PendingMethodCall>>
344    where
345        D: TryInto<BusName<'d>>,
346        P: TryInto<ObjectPath<'p>>,
347        I: TryInto<InterfaceName<'i>>,
348        M: TryInto<MemberName<'m>>,
349        D::Error: Into<Error>,
350        P::Error: Into<Error>,
351        I::Error: Into<Error>,
352        M::Error: Into<Error>,
353        B: serde::ser::Serialize + zvariant::DynamicType,
354    {
355        let _permit = acquire_serial_num_semaphore().await;
356
357        let mut builder = Message::method(path, method_name)?;
358        if let Some(sender) = self.unique_name() {
359            builder = builder.sender(sender)?
360        }
361        if let Some(destination) = destination {
362            builder = builder.destination(destination)?
363        }
364        if let Some(interface) = interface {
365            builder = builder.interface(interface)?
366        }
367        for flag in flags {
368            builder = builder.with_flags(flag)?;
369        }
370        let msg = builder.build(body)?;
371
372        let msg_receiver = self.inner.method_return_receiver.activate_cloned();
373        let stream = Some(MessageStream::for_subscription_channel(
374            msg_receiver,
375            // This is a lie but we only use the stream internally so it's fine.
376            None,
377            self,
378        ));
379        let serial = msg.primary_header().serial_num();
380        self.send(&msg).await?;
381        if flags.contains(Flags::NoReplyExpected) {
382            Ok(None)
383        } else {
384            Ok(Some(PendingMethodCall { stream, serial }))
385        }
386    }
387
388    /// Emit a signal.
389    ///
390    /// Create a signal message, and send it over the connection.
391    pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
392        &self,
393        destination: Option<D>,
394        path: P,
395        interface: I,
396        signal_name: M,
397        body: &B,
398    ) -> Result<()>
399    where
400        D: TryInto<BusName<'d>>,
401        P: TryInto<ObjectPath<'p>>,
402        I: TryInto<InterfaceName<'i>>,
403        M: TryInto<MemberName<'m>>,
404        D::Error: Into<Error>,
405        P::Error: Into<Error>,
406        I::Error: Into<Error>,
407        M::Error: Into<Error>,
408        B: serde::ser::Serialize + zvariant::DynamicType,
409    {
410        let _permit = acquire_serial_num_semaphore().await;
411
412        let mut b = Message::signal(path, interface, signal_name)?;
413        if let Some(sender) = self.unique_name() {
414            b = b.sender(sender)?;
415        }
416        if let Some(destination) = destination {
417            b = b.destination(destination)?;
418        }
419        let m = b.build(body)?;
420
421        self.send(&m).await
422    }
423
424    /// Reply to a message.
425    ///
426    /// Given an existing message (likely a method call), send a reply back to the caller with the
427    /// given `body`.
428    pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<()>
429    where
430        B: serde::ser::Serialize + zvariant::DynamicType,
431    {
432        let _permit = acquire_serial_num_semaphore().await;
433
434        let mut b = Message::method_reply(call)?;
435        if let Some(sender) = self.unique_name() {
436            b = b.sender(sender)?;
437        }
438        let m = b.build(body)?;
439        self.send(&m).await
440    }
441
442    /// Reply an error to a message.
443    ///
444    /// Given an existing message (likely a method call), send an error reply back to the caller
445    /// with the given `error_name` and `body`.
446    pub async fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()>
447    where
448        B: serde::ser::Serialize + zvariant::DynamicType,
449        E: TryInto<ErrorName<'e>>,
450        E::Error: Into<Error>,
451    {
452        let _permit = acquire_serial_num_semaphore().await;
453
454        let mut b = Message::method_error(call, error_name)?;
455        if let Some(sender) = self.unique_name() {
456            b = b.sender(sender)?;
457        }
458        let m = b.build(body)?;
459        self.send(&m).await
460    }
461
462    /// Reply an error to a message.
463    ///
464    /// Given an existing message (likely a method call), send an error reply back to the caller
465    /// using one of the standard interface reply types.
466    pub async fn reply_dbus_error(
467        &self,
468        call: &zbus::message::Header<'_>,
469        err: impl DBusError,
470    ) -> Result<()> {
471        let _permit = acquire_serial_num_semaphore().await;
472
473        let m = err.create_reply(call)?;
474        self.send(&m).await
475    }
476
477    /// Register a well-known name for this connection.
478    ///
479    /// When connecting to a bus, the name is requested from the bus. In case of p2p connection, the
480    /// name (if requested) is used of self-identification.
481    ///
482    /// You can request multiple names for the same connection. Use [`Connection::release_name`] for
483    /// deregistering names registered through this method.
484    ///
485    /// Note that exclusive ownership without queueing is requested (using
486    /// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that
487    /// is the most typical case. If that is not what you want, you should use
488    /// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested
489    /// **after** you've setup your service implementation with the `ObjectServer`).
490    ///
491    /// # Caveats
492    ///
493    /// The associated `ObjectServer` will only handle method calls destined for the unique name of
494    /// this connection or any of the registered well-known names. If no well-known name is
495    /// registered, the method calls destined to all well-known names will be handled.
496    ///
497    /// Since names registered through any other means than `Connection` or [`Builder`]
498    /// API are not known to the connection, method calls destined to those names will only be
499    /// handled by the associated `ObjectServer` if none of the names are registered through
500    /// `Connection*` API. Simply put, either register all the names through `Connection*` API or
501    /// none of them.
502    ///
503    /// # Errors
504    ///
505    /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
506    pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
507    where
508        W: TryInto<WellKnownName<'w>>,
509        W::Error: Into<Error>,
510    {
511        self.request_name_with_flags(
512            well_known_name,
513            RequestNameFlags::ReplaceExisting | RequestNameFlags::DoNotQueue,
514        )
515        .await
516        .map(|_| ())
517    }
518
519    /// Register a well-known name for this connection.
520    ///
521    /// This is the same as [`Connection::request_name`] but allows to specify the flags to use when
522    /// requesting the name.
523    ///
524    /// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and request ends up in the
525    /// queue, you can use [`fdo::NameAcquiredStream`] to be notified when the name is acquired. A
526    /// queued name request can be cancelled using [`Connection::release_name`].
527    ///
528    /// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be
529    /// lost if another peer requests the same name. You can use [`fdo::NameLostStream`] to be
530    /// notified when the name is lost
531    ///
532    /// # Example
533    ///
534    /// ```
535    /// #
536    /// # zbus::block_on(async {
537    /// use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}};
538    /// use enumflags2::BitFlags;
539    /// use futures_util::stream::StreamExt;
540    ///
541    /// let name = "org.freedesktop.zbus.QueuedNameTest";
542    /// let conn1 = Connection::session().await?;
543    /// // This should just work right away.
544    /// conn1.request_name(name).await?;
545    ///
546    /// let conn2 = Connection::session().await?;
547    /// // A second request from the another connection will fail with `DoNotQueue` flag, which is
548    /// // implicit with `request_name` method.
549    /// assert!(conn2.request_name(name).await.is_err());
550    ///
551    /// // Now let's try w/o `DoNotQueue` and we should be queued.
552    /// let reply = conn2
553    ///     .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
554    ///     .await?;
555    /// assert_eq!(reply, RequestNameReply::InQueue);
556    /// // Another request should just give us the same response.
557    /// let reply = conn2
558    ///     // The flags on subsequent requests will however be ignored.
559    ///     .request_name_with_flags(name, BitFlags::empty())
560    ///     .await?;
561    /// assert_eq!(reply, RequestNameReply::InQueue);
562    /// let mut acquired_stream = DBusProxy::new(&conn2)
563    ///     .await?
564    ///     .receive_name_acquired()
565    ///     .await?;
566    /// assert!(conn1.release_name(name).await?);
567    /// // This would have waited forever if `conn1` hadn't just release the name.
568    /// let acquired = acquired_stream.next().await.unwrap();
569    /// assert_eq!(acquired.args().unwrap().name, name);
570    ///
571    /// // conn2 made the mistake of being too nice and allowed name replacemnt, so conn1 should be
572    /// // able to take it back.
573    /// let mut lost_stream = DBusProxy::new(&conn2)
574    ///     .await?
575    ///     .receive_name_lost()
576    ///     .await?;
577    /// conn1.request_name(name).await?;
578    /// let lost = lost_stream.next().await.unwrap();
579    /// assert_eq!(lost.args().unwrap().name, name);
580    ///
581    /// # Ok::<(), zbus::Error>(())
582    /// # }).unwrap();
583    /// ```
584    ///
585    /// # Caveats
586    ///
587    /// * Same as that of [`Connection::request_name`].
588    /// * If you wish to track changes to name ownership after this call, make sure that the
589    /// [`fdo::NameAcquired`] and/or [`fdo::NameLostStream`] instance(s) are created **before**
590    /// calling this method. Otherwise, you may loose the signal if it's emitted after this call but
591    /// just before the stream instance get created.
592    pub async fn request_name_with_flags<'w, W>(
593        &self,
594        well_known_name: W,
595        flags: BitFlags<RequestNameFlags>,
596    ) -> Result<RequestNameReply>
597    where
598        W: TryInto<WellKnownName<'w>>,
599        W::Error: Into<Error>,
600    {
601        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
602        // We keep the lock until the end of this function so that the (possibly) spawned task
603        // doesn't end up accessing the name entry before it's inserted.
604        let mut names = self.inner.registered_names.lock().await;
605
606        match names.get(&well_known_name) {
607            Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
608            Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
609            None => (),
610        }
611
612        if !self.is_bus() {
613            names.insert(well_known_name.to_owned(), NameStatus::Owner(None));
614
615            return Ok(RequestNameReply::PrimaryOwner);
616        }
617
618        let dbus_proxy = fdo::DBusProxy::builder(self)
619            .cache_properties(CacheProperties::No)
620            .build()
621            .await?;
622        let mut acquired_stream = dbus_proxy.receive_name_acquired().await?;
623        let mut lost_stream = dbus_proxy.receive_name_lost().await?;
624        let reply = dbus_proxy
625            .request_name(well_known_name.clone(), flags)
626            .await?;
627        let lost_task_name = format!("monitor name {well_known_name} lost");
628        let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
629            let weak_conn = WeakConnection::from(self);
630            let well_known_name = well_known_name.to_owned();
631            Some(
632                async move {
633                    loop {
634                        let signal = lost_stream.next().await;
635                        let inner = match weak_conn.upgrade() {
636                            Some(conn) => conn.inner.clone(),
637                            None => break,
638                        };
639
640                        match signal {
641                            Some(signal) => match signal.args() {
642                                Ok(args) if args.name == well_known_name => {
643                                    tracing::info!(
644                                        "Connection `{}` lost name `{}`",
645                                        // SAFETY: This is bus connection so unique name can't be
646                                        // None.
647                                        inner.unique_name.get().unwrap(),
648                                        well_known_name
649                                    );
650                                    inner.registered_names.lock().await.remove(&well_known_name);
651
652                                    break;
653                                }
654                                Ok(_) => (),
655                                Err(e) => warn!("Failed to parse `NameLost` signal: {}", e),
656                            },
657                            None => {
658                                trace!("`NameLost` signal stream closed");
659                                // This is a very strange state we end up in. Now the name is
660                                // question remains in the queue
661                                // forever. Maybe we can do better here but I
662                                // think it's a very unlikely scenario anyway.
663                                //
664                                // Can happen if the connection is lost/dropped but then the whole
665                                // `Connection` instance will go away soon anyway and hence this
666                                // strange state along with it.
667                                break;
668                            }
669                        }
670                    }
671                }
672                .instrument(info_span!("{}", lost_task_name)),
673            )
674        } else {
675            None
676        };
677        let status = match reply {
678            RequestNameReply::InQueue => {
679                let weak_conn = WeakConnection::from(self);
680                let well_known_name = well_known_name.to_owned();
681                let task_name = format!("monitor name {well_known_name} acquired");
682                let task = self.executor().spawn(
683                    async move {
684                        loop {
685                            let signal = acquired_stream.next().await;
686                            let inner = match weak_conn.upgrade() {
687                                Some(conn) => conn.inner.clone(),
688                                None => break,
689                            };
690                            match signal {
691                                Some(signal) => match signal.args() {
692                                    Ok(args) if args.name == well_known_name => {
693                                        let mut names = inner.registered_names.lock().await;
694                                        if let Some(status) = names.get_mut(&well_known_name) {
695                                            let task = name_lost_fut.map(|fut| {
696                                                inner.executor.spawn(fut, &lost_task_name)
697                                            });
698                                            *status = NameStatus::Owner(task);
699
700                                            break;
701                                        }
702                                        // else the name was released in the meantime. :shrug:
703                                    }
704                                    Ok(_) => (),
705                                    Err(e) => warn!("Failed to parse `NameAcquired` signal: {}", e),
706                                },
707                                None => {
708                                    trace!("`NameAcquired` signal stream closed");
709                                    // See comment above for similar state in case of `NameLost`
710                                    // stream.
711                                    break;
712                                }
713                            }
714                        }
715                    }
716                    .instrument(info_span!("{}", task_name)),
717                    &task_name,
718                );
719
720                NameStatus::Queued(task)
721            }
722            RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
723                let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));
724
725                NameStatus::Owner(task)
726            }
727            RequestNameReply::Exists => return Err(Error::NameTaken),
728        };
729
730        names.insert(well_known_name.to_owned(), status);
731
732        Ok(reply)
733    }
734
735    /// Deregister a previously registered well-known name for this service on the bus.
736    ///
737    /// Use this method to deregister a well-known name, registered through
738    /// [`Connection::request_name`].
739    ///
740    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
741    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
742    /// was not previously registered or already deregistered.
743    pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
744    where
745        W: TryInto<WellKnownName<'w>>,
746        W::Error: Into<Error>,
747    {
748        let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
749        let mut names = self.inner.registered_names.lock().await;
750        // FIXME: Should be possible to avoid cloning/allocation here
751        if names.remove(&well_known_name.to_owned()).is_none() {
752            return Ok(false);
753        };
754
755        if !self.is_bus() {
756            return Ok(true);
757        }
758
759        fdo::DBusProxy::builder(self)
760            .cache_properties(CacheProperties::No)
761            .build()
762            .await?
763            .release_name(well_known_name)
764            .await
765            .map(|_| true)
766            .map_err(Into::into)
767    }
768
769    /// Checks if `self` is a connection to a message bus.
770    ///
771    /// This will return `false` for p2p connections. When the `p2p` feature is enabled, this will
772    /// always return `true`.
773    pub fn is_bus(&self) -> bool {
774        #[cfg(feature = "p2p")]
775        {
776            self.inner.bus_conn
777        }
778        #[cfg(not(feature = "p2p"))]
779        {
780            true
781        }
782    }
783
784    /// The unique name of the connection, if set/applicable.
785    ///
786    /// The unique name is assigned by the message bus or set manually using
787    /// [`Connection::set_unique_name`].
788    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
789        self.inner.unique_name.get()
790    }
791
792    /// Sets the unique name of the connection (if not already set).
793    ///
794    /// This is mainly provided for bus implementations. All other users should not need to use this
795    /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
796    ///
797    /// # Panics
798    ///
799    /// This method panics if the unique name is already set. It will always panic if the connection
800    /// is to a message bus as it's the bus that assigns peers their unique names.
801    #[cfg(feature = "bus-impl")]
802    pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()>
803    where
804        U: TryInto<OwnedUniqueName>,
805        U::Error: Into<Error>,
806    {
807        let name = unique_name.try_into().map_err(Into::into)?;
808        self.set_unique_name_(name);
809
810        Ok(())
811    }
812
813    /// The capacity of the main (unfiltered) queue.
814    pub fn max_queued(&self) -> usize {
815        self.inner.msg_receiver.capacity()
816    }
817
818    /// Set the capacity of the main (unfiltered) queue.
819    pub fn set_max_queued(&mut self, max: usize) {
820        self.inner.msg_receiver.clone().set_capacity(max);
821    }
822
823    /// The server's GUID.
824    pub fn server_guid(&self) -> &OwnedGuid {
825        &self.inner.server_guid
826    }
827
828    /// The underlying executor.
829    ///
830    /// When a connection is built with internal_executor set to false, zbus will not spawn a
831    /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
832    /// Failure to do so will result in hangs.
833    ///
834    /// # Examples
835    ///
836    /// Here is how one would typically run the zbus executor through tokio's scheduler:
837    ///
838    /// ```
839    /// # // Disable on windows because somehow it triggers a stack overflow there:
840    /// # // https://gitlab.freedesktop.org/zeenix/zbus/-/jobs/34023494
841    /// # #[cfg(not(target_os = "unix"))]
842    /// # {
843    /// use zbus::connection::Builder;
844    /// use tokio::task::spawn;
845    ///
846    /// # struct SomeIface;
847    /// #
848    /// # #[zbus::interface]
849    /// # impl SomeIface {
850    /// # }
851    /// #
852    /// #[tokio::main]
853    /// async fn main() {
854    ///     let conn = Builder::session()
855    ///         .unwrap()
856    ///         .internal_executor(false)
857    /// #         // This is only for testing a deadlock that used to happen with this combo.
858    /// #         .serve_at("/some/iface", SomeIface)
859    /// #         .unwrap()
860    ///         .build()
861    ///         .await
862    ///         .unwrap();
863    ///     {
864    ///        let conn = conn.clone();
865    ///        spawn(async move {
866    ///            loop {
867    ///                conn.executor().tick().await;
868    ///            }
869    ///        });
870    ///     }
871    ///
872    ///     // All your other async code goes here.
873    /// }
874    /// # }
875    /// ```
876    ///
877    /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
878    /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
879    /// `tokio` feature. You should also disable the (default) `async-io` feature in your
880    /// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
881    /// `async-io` was required to enable tight `tokio` integration.
882    ///
883    /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
884    pub fn executor(&self) -> &Executor<'static> {
885        &self.inner.executor
886    }
887
888    /// Get a reference to the associated [`ObjectServer`].
889    ///
890    /// The `ObjectServer` is created on-demand.
891    ///
892    /// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls
893    /// received on `self`. If you want to manually reply to method calls, do not use this
894    /// method (or any of the `ObjectServer` related API).
895    pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
896        // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
897        // crate::ObjectServer instead of this wrapper?
898        struct Wrapper<'a>(&'a blocking::ObjectServer);
899        impl<'a> Deref for Wrapper<'a> {
900            type Target = ObjectServer;
901
902            fn deref(&self) -> &Self::Target {
903                self.0.inner()
904            }
905        }
906
907        Wrapper(self.sync_object_server(true, None))
908    }
909
910    pub(crate) fn sync_object_server(
911        &self,
912        start: bool,
913        started_event: Option<Event>,
914    ) -> &blocking::ObjectServer {
915        self.inner
916            .object_server
917            .get_or_init(move || self.setup_object_server(start, started_event))
918    }
919
920    fn setup_object_server(
921        &self,
922        start: bool,
923        started_event: Option<Event>,
924    ) -> blocking::ObjectServer {
925        if start {
926            self.start_object_server(started_event);
927        }
928
929        blocking::ObjectServer::new(self)
930    }
931
932    #[instrument(skip(self))]
933    pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
934        self.inner.object_server_dispatch_task.get_or_init(|| {
935            trace!("starting ObjectServer task");
936            let weak_conn = WeakConnection::from(self);
937
938            let obj_server_task_name = "ObjectServer task";
939            self.inner.executor.spawn(
940                async move {
941                    let mut stream = match weak_conn.upgrade() {
942                        Some(conn) => {
943                            let mut builder = MatchRule::builder().msg_type(Type::MethodCall);
944                            if let Some(unique_name) = conn.unique_name() {
945                                builder = builder.destination(&**unique_name).expect("unique name");
946                            }
947                            let rule = builder.build();
948                            match conn.add_match(rule.into(), None).await {
949                                Ok(stream) => stream,
950                                Err(e) => {
951                                    // Very unlikely but can happen I guess if connection is closed.
952                                    debug!("Failed to create message stream: {}", e);
953
954                                    return;
955                                }
956                            }
957                        }
958                        None => {
959                            trace!("Connection is gone, stopping associated object server task");
960
961                            return;
962                        }
963                    };
964                    if let Some(started_event) = started_event {
965                        started_event.notify(1);
966                    }
967
968                    trace!("waiting for incoming method call messages..");
969                    while let Some(msg) = stream.next().await.and_then(|m| {
970                        if let Err(e) = &m {
971                            debug!("Error while reading from object server stream: {:?}", e);
972                        }
973                        m.ok()
974                    }) {
975                        if let Some(conn) = weak_conn.upgrade() {
976                            let hdr = msg.header();
977                            // If we're connected to a bus, skip the destination check as the
978                            // server will only send us method calls destined to us.
979                            if !conn.is_bus() {
980                                match hdr.destination() {
981                                    // Unique name is already checked by the match rule.
982                                    Some(BusName::Unique(_)) | None => (),
983                                    Some(BusName::WellKnown(dest)) => {
984                                        let names = conn.inner.registered_names.lock().await;
985                                        // destination doesn't matter if no name has been registered
986                                        // (probably means the name is registered through external
987                                        // means).
988                                        if !names.is_empty() && !names.contains_key(dest) {
989                                            trace!(
990                                                "Got a method call for a different destination: {}",
991                                                dest
992                                            );
993
994                                            continue;
995                                        }
996                                    }
997                                }
998                            }
999                            let server = conn.object_server();
1000                            if let Err(e) = server.dispatch_call(&msg, &hdr).await {
1001                                debug!(
1002                                    "Error dispatching message. Message: {:?}, error: {:?}",
1003                                    msg, e
1004                                );
1005                            }
1006                        } else {
1007                            // If connection is completely gone, no reason to keep running the task
1008                            // anymore.
1009                            trace!("Connection is gone, stopping associated object server task");
1010                            break;
1011                        }
1012                    }
1013                }
1014                .instrument(info_span!("{}", obj_server_task_name)),
1015                obj_server_task_name,
1016            )
1017        });
1018    }
1019
1020    pub(crate) async fn add_match(
1021        &self,
1022        rule: OwnedMatchRule,
1023        max_queued: Option<usize>,
1024    ) -> Result<Receiver<Result<Message>>> {
1025        use std::collections::hash_map::Entry;
1026
1027        if self.inner.msg_senders.lock().await.is_empty() {
1028            // This only happens if socket reader task has errored out.
1029            return Err(Error::InputOutput(Arc::new(io::Error::new(
1030                io::ErrorKind::BrokenPipe,
1031                "Socket reader task has errored out",
1032            ))));
1033        }
1034
1035        let mut subscriptions = self.inner.subscriptions.lock().await;
1036        let msg_type = rule.msg_type().unwrap_or(Type::Signal);
1037        match subscriptions.entry(rule.clone()) {
1038            Entry::Vacant(e) => {
1039                let max_queued = max_queued.unwrap_or(DEFAULT_MAX_QUEUED);
1040                let (sender, mut receiver) = broadcast(max_queued);
1041                receiver.set_await_active(false);
1042                if self.is_bus() && msg_type == Type::Signal {
1043                    fdo::DBusProxy::builder(self)
1044                        .cache_properties(CacheProperties::No)
1045                        .build()
1046                        .await?
1047                        .add_match_rule(e.key().inner().clone())
1048                        .await?;
1049                }
1050                e.insert((1, receiver.clone().deactivate()));
1051                self.inner
1052                    .msg_senders
1053                    .lock()
1054                    .await
1055                    .insert(Some(rule), sender);
1056
1057                Ok(receiver)
1058            }
1059            Entry::Occupied(mut e) => {
1060                let (num_subscriptions, receiver) = e.get_mut();
1061                *num_subscriptions += 1;
1062                if let Some(max_queued) = max_queued {
1063                    if max_queued > receiver.capacity() {
1064                        receiver.set_capacity(max_queued);
1065                    }
1066                }
1067
1068                Ok(receiver.activate_cloned())
1069            }
1070        }
1071    }
1072
1073    pub(crate) async fn remove_match(&self, rule: OwnedMatchRule) -> Result<bool> {
1074        use std::collections::hash_map::Entry;
1075        let mut subscriptions = self.inner.subscriptions.lock().await;
1076        // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
1077        // (both here and in add_match)
1078        let msg_type = rule.msg_type().unwrap_or(Type::Signal);
1079        match subscriptions.entry(rule) {
1080            Entry::Vacant(_) => Ok(false),
1081            Entry::Occupied(mut e) => {
1082                let rule = e.key().inner().clone();
1083                e.get_mut().0 -= 1;
1084                if e.get().0 == 0 {
1085                    if self.is_bus() && msg_type == Type::Signal {
1086                        fdo::DBusProxy::builder(self)
1087                            .cache_properties(CacheProperties::No)
1088                            .build()
1089                            .await?
1090                            .remove_match_rule(rule.clone())
1091                            .await?;
1092                    }
1093                    e.remove();
1094                    self.inner
1095                        .msg_senders
1096                        .lock()
1097                        .await
1098                        .remove(&Some(rule.into()));
1099                }
1100                Ok(true)
1101            }
1102        }
1103    }
1104
1105    pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) {
1106        let conn = self.clone();
1107        let task_name = format!("Remove match `{}`", *rule);
1108        let remove_match =
1109            async move { conn.remove_match(rule).await }.instrument(trace_span!("{}", task_name));
1110        self.inner.executor.spawn(remove_match, &task_name).detach()
1111    }
1112
1113    pub(crate) async fn new(
1114        auth: Authenticated,
1115        #[allow(unused)] bus_connection: bool,
1116        executor: Executor<'static>,
1117    ) -> Result<Self> {
1118        #[cfg(unix)]
1119        let cap_unix_fd = auth.cap_unix_fd;
1120
1121        macro_rules! create_msg_broadcast_channel {
1122            ($size:expr) => {{
1123                let (msg_sender, msg_receiver) = broadcast($size);
1124                let mut msg_receiver = msg_receiver.deactivate();
1125                msg_receiver.set_await_active(false);
1126
1127                (msg_sender, msg_receiver)
1128            }};
1129        }
1130        // The unfiltered message channel.
1131        let (msg_sender, msg_receiver) = create_msg_broadcast_channel!(DEFAULT_MAX_QUEUED);
1132        let mut msg_senders = HashMap::new();
1133        msg_senders.insert(None, msg_sender);
1134
1135        // The special method return & error channel.
1136        let (method_return_sender, method_return_receiver) =
1137            create_msg_broadcast_channel!(DEFAULT_MAX_METHOD_RETURN_QUEUED);
1138        let rule = MatchRule::builder()
1139            .msg_type(Type::MethodReturn)
1140            .build()
1141            .into();
1142        msg_senders.insert(Some(rule), method_return_sender.clone());
1143        let rule = MatchRule::builder().msg_type(Type::Error).build().into();
1144        msg_senders.insert(Some(rule), method_return_sender);
1145        let msg_senders = Arc::new(Mutex::new(msg_senders));
1146        let subscriptions = Mutex::new(HashMap::new());
1147
1148        let connection = Self {
1149            inner: Arc::new(ConnectionInner {
1150                activity_event: Arc::new(Event::new()),
1151                socket_write: Mutex::new(auth.socket_write),
1152                server_guid: auth.server_guid,
1153                #[cfg(unix)]
1154                cap_unix_fd,
1155                #[cfg(feature = "p2p")]
1156                bus_conn: bus_connection,
1157                unique_name: OnceLock::new(),
1158                subscriptions,
1159                object_server: OnceLock::new(),
1160                object_server_dispatch_task: OnceLock::new(),
1161                executor,
1162                socket_reader_task: OnceLock::new(),
1163                msg_senders,
1164                msg_receiver,
1165                method_return_receiver,
1166                registered_names: Mutex::new(HashMap::new()),
1167            }),
1168        };
1169
1170        if let Some(unique_name) = auth.unique_name {
1171            connection.set_unique_name_(unique_name);
1172        }
1173
1174        Ok(connection)
1175    }
1176
1177    /// Create a `Connection` to the session/user message bus.
1178    pub async fn session() -> Result<Self> {
1179        Builder::session()?.build().await
1180    }
1181
1182    /// Create a `Connection` to the system-wide message bus.
1183    pub async fn system() -> Result<Self> {
1184        Builder::system()?.build().await
1185    }
1186
1187    /// Returns a listener, notified on various connection activity.
1188    ///
1189    /// This function is meant for the caller to implement idle or timeout on inactivity.
1190    pub fn monitor_activity(&self) -> EventListener {
1191        self.inner.activity_event.listen()
1192    }
1193
1194    /// Returns the peer credentials.
1195    ///
1196    /// The fields are populated on the best effort basis. Some or all fields may not even make
1197    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
1198    ///
1199    /// # Caveats
1200    ///
1201    /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
1202    pub async fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
1203        self.inner
1204            .socket_write
1205            .lock()
1206            .await
1207            .peer_credentials()
1208            .await
1209    }
1210
1211    /// Close the connection.
1212    ///
1213    /// After this call, all reading and writing operations will fail.
1214    pub async fn close(self) -> Result<()> {
1215        self.inner.activity_event.notify(usize::MAX);
1216        self.inner
1217            .socket_write
1218            .lock()
1219            .await
1220            .close()
1221            .await
1222            .map_err(Into::into)
1223    }
1224
1225    pub(crate) fn init_socket_reader(
1226        &self,
1227        socket_read: Box<dyn socket::ReadHalf>,
1228        already_read: Vec<u8>,
1229        #[cfg(unix)] already_received_fds: Vec<std::os::fd::OwnedFd>,
1230    ) {
1231        let inner = &self.inner;
1232        inner
1233            .socket_reader_task
1234            .set(
1235                SocketReader::new(
1236                    socket_read,
1237                    inner.msg_senders.clone(),
1238                    already_read,
1239                    #[cfg(unix)]
1240                    already_received_fds,
1241                    inner.activity_event.clone(),
1242                )
1243                .spawn(&inner.executor),
1244            )
1245            .expect("Attempted to set `socket_reader_task` twice");
1246    }
1247
1248    fn set_unique_name_(&self, name: OwnedUniqueName) {
1249        self.inner
1250            .unique_name
1251            .set(name)
1252            // programmer (probably our) error if this fails.
1253            .expect("unique name already set");
1254    }
1255}
1256
1257impl From<crate::blocking::Connection> for Connection {
1258    fn from(conn: crate::blocking::Connection) -> Self {
1259        conn.into_inner()
1260    }
1261}
1262
1263// Internal API that allows keeping a weak connection ref around.
1264#[derive(Debug)]
1265pub(crate) struct WeakConnection {
1266    inner: Weak<ConnectionInner>,
1267}
1268
1269impl WeakConnection {
1270    /// Upgrade to a Connection.
1271    pub fn upgrade(&self) -> Option<Connection> {
1272        self.inner.upgrade().map(|inner| Connection { inner })
1273    }
1274}
1275
1276impl From<&Connection> for WeakConnection {
1277    fn from(conn: &Connection) -> Self {
1278        Self {
1279            inner: Arc::downgrade(&conn.inner),
1280        }
1281    }
1282}
1283
1284#[derive(Debug)]
1285enum NameStatus {
1286    // The task waits for name lost signal if owner allows replacement.
1287    Owner(#[allow(unused)] Option<Task<()>>),
1288    // The task waits for name acquisition signal.
1289    Queued(#[allow(unused)] Task<()>),
1290}
1291
1292static SERIAL_NUM_SEMAPHORE: Semaphore = Semaphore::new(1);
1293
1294// Make message creation and sending an atomic operation, using an async
1295// semaphore if flatpak portal is detected to workaround an xdg-dbus-proxy issue:
1296//
1297// https://github.com/flatpak/xdg-dbus-proxy/issues/46
1298async fn acquire_serial_num_semaphore() -> Option<SemaphorePermit<'static>> {
1299    if is_flatpak() {
1300        Some(SERIAL_NUM_SEMAPHORE.acquire().await)
1301    } else {
1302        None
1303    }
1304}
1305
1306#[cfg(test)]
1307mod tests {
1308    use super::*;
1309    use crate::fdo::DBusProxy;
1310    use ntest::timeout;
1311    use test_log::test;
1312
1313    #[cfg(windows)]
1314    #[test]
1315    fn connect_autolaunch_session_bus() {
1316        let addr =
1317            crate::win32::autolaunch_bus_address().expect("Unable to get session bus address");
1318
1319        crate::block_on(async { addr.connect().await }).expect("Unable to connect to session bus");
1320    }
1321
1322    #[cfg(target_os = "macos")]
1323    #[test]
1324    fn connect_launchd_session_bus() {
1325        use crate::address::{transport::Launchd, Address, Transport};
1326        crate::block_on(async {
1327            let addr = Address::from(Transport::Launchd(Launchd::new(
1328                "DBUS_LAUNCHD_SESSION_BUS_SOCKET",
1329            )));
1330            addr.connect().await
1331        })
1332        .expect("Unable to connect to session bus");
1333    }
1334
1335    #[test]
1336    #[timeout(15000)]
1337    fn disconnect_on_drop() {
1338        // Reproducer for https://github.com/dbus2/zbus/issues/308 where setting up the
1339        // objectserver would cause the connection to not disconnect on drop.
1340        crate::utils::block_on(test_disconnect_on_drop());
1341    }
1342
1343    async fn test_disconnect_on_drop() {
1344        #[derive(Default)]
1345        struct MyInterface {}
1346
1347        #[crate::interface(name = "dev.peelz.FooBar.Baz")]
1348        impl MyInterface {
1349            fn do_thing(&self) {}
1350        }
1351        let name = "dev.peelz.foobar";
1352        let connection = Builder::session()
1353            .unwrap()
1354            .name(name)
1355            .unwrap()
1356            .serve_at("/dev/peelz/FooBar", MyInterface::default())
1357            .unwrap()
1358            .build()
1359            .await
1360            .unwrap();
1361
1362        let connection2 = Connection::session().await.unwrap();
1363        let dbus = DBusProxy::new(&connection2).await.unwrap();
1364        let mut stream = dbus
1365            .receive_name_owner_changed_with_args(&[(0, name), (2, "")])
1366            .await
1367            .unwrap();
1368
1369        drop(connection);
1370
1371        // If the connection is not dropped, this will hang forever.
1372        stream.next().await.unwrap();
1373
1374        // Let's still make sure the name is gone.
1375        let name_has_owner = dbus.name_has_owner(name.try_into().unwrap()).await.unwrap();
1376        assert!(!name_has_owner);
1377    }
1378}
1379
1380#[cfg(feature = "p2p")]
1381#[cfg(test)]
1382mod p2p_tests {
1383    use futures_util::stream::TryStreamExt;
1384    use ntest::timeout;
1385    use test_log::test;
1386    use zvariant::{Endian, NATIVE_ENDIAN};
1387
1388    use crate::{AuthMechanism, Guid};
1389
1390    use super::*;
1391
1392    // Same numbered client and server are already paired up.
1393    async fn test_p2p(
1394        server1: Connection,
1395        client1: Connection,
1396        server2: Connection,
1397        client2: Connection,
1398    ) -> Result<()> {
1399        let forward1 = {
1400            let stream = MessageStream::from(server1.clone());
1401            let sink = client2.clone();
1402
1403            stream.try_for_each(move |msg| {
1404                let sink = sink.clone();
1405                async move { sink.send(&msg).await }
1406            })
1407        };
1408        let forward2 = {
1409            let stream = MessageStream::from(client2.clone());
1410            let sink = server1.clone();
1411
1412            stream.try_for_each(move |msg| {
1413                let sink = sink.clone();
1414                async move { sink.send(&msg).await }
1415            })
1416        };
1417        let _forward_task = client1.executor().spawn(
1418            async move { futures_util::try_join!(forward1, forward2) },
1419            "forward_task",
1420        );
1421
1422        let server_ready = Event::new();
1423        let server_ready_listener = server_ready.listen();
1424        let client_done = Event::new();
1425        let client_done_listener = client_done.listen();
1426
1427        let server_future = async move {
1428            let mut stream = MessageStream::from(&server2);
1429            server_ready.notify(1);
1430            let method = loop {
1431                let m = stream.try_next().await?.unwrap();
1432                if m.to_string() == "Method call Test" {
1433                    assert_eq!(m.body().deserialize::<u64>().unwrap(), 64);
1434                    break m;
1435                }
1436            };
1437
1438            // Send another message first to check the queueing function on client side.
1439            server2
1440                .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
1441                .await?;
1442            server2.reply(&method, &("yay")).await?;
1443            client_done_listener.await;
1444
1445            Ok(())
1446        };
1447
1448        let client_future = async move {
1449            let mut stream = MessageStream::from(&client1);
1450            server_ready_listener.await;
1451            // We want to set non-native endian to ensure that:
1452            // 1. the message is actually encoded with the specified endian.
1453            // 2. the server side is able to decode it and replies in the same encoding.
1454            let endian = match NATIVE_ENDIAN {
1455                Endian::Little => Endian::Big,
1456                Endian::Big => Endian::Little,
1457            };
1458            let method = Message::method("/", "Test")?
1459                .interface("org.zbus.p2p")?
1460                .endian(endian)
1461                .build(&64u64)?;
1462            client1.send(&method).await?;
1463            // Check we didn't miss the signal that was sent during the call.
1464            let m = stream.try_next().await?.unwrap();
1465            client_done.notify(1);
1466            assert_eq!(m.to_string(), "Signal ASignalForYou");
1467            let reply = stream.try_next().await?.unwrap();
1468            assert_eq!(reply.to_string(), "Method return");
1469            // Check if the reply was in the non-native endian.
1470            assert_eq!(Endian::from(reply.primary_header().endian_sig()), endian);
1471            reply.body().deserialize::<String>()
1472        };
1473
1474        let (val, _) = futures_util::try_join!(client_future, server_future,)?;
1475        assert_eq!(val, "yay");
1476
1477        Ok(())
1478    }
1479
1480    #[test]
1481    #[timeout(15000)]
1482    fn tcp_p2p() {
1483        crate::utils::block_on(test_tcp_p2p()).unwrap();
1484    }
1485
1486    async fn test_tcp_p2p() -> Result<()> {
1487        let (server1, client1) = tcp_p2p_pipe().await?;
1488        let (server2, client2) = tcp_p2p_pipe().await?;
1489
1490        test_p2p(server1, client1, server2, client2).await
1491    }
1492
1493    async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
1494        let guid = Guid::generate();
1495
1496        #[cfg(not(feature = "tokio"))]
1497        let (server_conn_builder, client_conn_builder) = {
1498            let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1499            let addr = listener.local_addr().unwrap();
1500            let p1 = std::net::TcpStream::connect(addr).unwrap();
1501            let p0 = listener.incoming().next().unwrap().unwrap();
1502
1503            (
1504                Builder::tcp_stream(p0)
1505                    .server(guid)
1506                    .unwrap()
1507                    .p2p()
1508                    .auth_mechanism(AuthMechanism::Anonymous),
1509                Builder::tcp_stream(p1).p2p(),
1510            )
1511        };
1512
1513        #[cfg(feature = "tokio")]
1514        let (server_conn_builder, client_conn_builder) = {
1515            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1516            let addr = listener.local_addr().unwrap();
1517            let p1 = tokio::net::TcpStream::connect(addr).await.unwrap();
1518            let p0 = listener.accept().await.unwrap().0;
1519
1520            (
1521                Builder::tcp_stream(p0)
1522                    .server(guid)
1523                    .unwrap()
1524                    .p2p()
1525                    .auth_mechanism(AuthMechanism::Anonymous),
1526                Builder::tcp_stream(p1).p2p(),
1527            )
1528        };
1529
1530        futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
1531    }
1532
1533    #[cfg(unix)]
1534    #[test]
1535    #[timeout(15000)]
1536    fn unix_p2p() {
1537        crate::utils::block_on(test_unix_p2p()).unwrap();
1538    }
1539
1540    #[cfg(unix)]
1541    async fn test_unix_p2p() -> Result<()> {
1542        let (server1, client1) = unix_p2p_pipe().await?;
1543        let (server2, client2) = unix_p2p_pipe().await?;
1544
1545        test_p2p(server1, client1, server2, client2).await
1546    }
1547
1548    #[cfg(unix)]
1549    async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
1550        #[cfg(not(feature = "tokio"))]
1551        use std::os::unix::net::UnixStream;
1552        #[cfg(feature = "tokio")]
1553        use tokio::net::UnixStream;
1554        #[cfg(all(windows, not(feature = "tokio")))]
1555        use uds_windows::UnixStream;
1556
1557        let guid = Guid::generate();
1558
1559        let (p0, p1) = UnixStream::pair().unwrap();
1560
1561        futures_util::try_join!(
1562            Builder::unix_stream(p1).p2p().build(),
1563            Builder::unix_stream(p0).server(guid).unwrap().p2p().build(),
1564        )
1565    }
1566
1567    // Compile-test only since we don't have a VM setup to run this with/in.
1568    #[cfg(any(
1569        all(feature = "vsock", not(feature = "tokio")),
1570        feature = "tokio-vsock"
1571    ))]
1572    #[test]
1573    #[timeout(15000)]
1574    #[ignore]
1575    fn vsock_p2p() {
1576        crate::utils::block_on(test_vsock_p2p()).unwrap();
1577    }
1578
1579    #[cfg(any(
1580        all(feature = "vsock", not(feature = "tokio")),
1581        feature = "tokio-vsock"
1582    ))]
1583    async fn test_vsock_p2p() -> Result<()> {
1584        let (server1, client1) = vsock_p2p_pipe().await?;
1585        let (server2, client2) = vsock_p2p_pipe().await?;
1586
1587        test_p2p(server1, client1, server2, client2).await
1588    }
1589
1590    #[cfg(all(feature = "vsock", not(feature = "tokio")))]
1591    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
1592        let guid = Guid::generate();
1593
1594        let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, 42).unwrap();
1595        let addr = listener.local_addr().unwrap();
1596        let client = vsock::VsockStream::connect(&addr).unwrap();
1597        let server = listener.incoming().next().unwrap().unwrap();
1598
1599        futures_util::try_join!(
1600            Builder::vsock_stream(server)
1601                .server(guid)
1602                .unwrap()
1603                .p2p()
1604                .auth_mechanism(AuthMechanism::Anonymous)
1605                .build(),
1606            Builder::vsock_stream(client).p2p().build(),
1607        )
1608    }
1609
1610    #[cfg(feature = "tokio-vsock")]
1611    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
1612        let guid = Guid::generate();
1613
1614        let listener = tokio_vsock::VsockListener::bind(2, 42).unwrap();
1615        let client = tokio_vsock::VsockStream::connect(3, 42).await.unwrap();
1616        let server = listener.incoming().next().await.unwrap().unwrap();
1617
1618        futures_util::try_join!(
1619            Builder::vsock_stream(server)
1620                .server(guid)
1621                .unwrap()
1622                .p2p()
1623                .auth_mechanism(AuthMechanism::Anonymous)
1624                .build(),
1625            Builder::vsock_stream(client).p2p().build(),
1626        )
1627    }
1628    #[cfg(any(unix, not(feature = "tokio")))]
1629    #[test]
1630    #[timeout(15000)]
1631    fn unix_p2p_cookie_auth() {
1632        use crate::utils::block_on;
1633        use std::{
1634            fs::{create_dir_all, remove_file, write},
1635            time::{SystemTime as Time, UNIX_EPOCH},
1636        };
1637        #[cfg(unix)]
1638        use std::{
1639            fs::{set_permissions, Permissions},
1640            os::unix::fs::PermissionsExt,
1641        };
1642        use xdg_home::home_dir;
1643
1644        let cookie_context = "zbus-test-cookie-context";
1645        let cookie_id = 123456789;
1646        let cookie = hex::encode(b"our cookie");
1647
1648        // Ensure cookie directory exists.
1649        let cookie_dir = home_dir().unwrap().join(".dbus-keyrings");
1650        create_dir_all(&cookie_dir).unwrap();
1651        #[cfg(unix)]
1652        set_permissions(&cookie_dir, Permissions::from_mode(0o700)).unwrap();
1653
1654        // Create a cookie file.
1655        let cookie_file = cookie_dir.join(cookie_context);
1656        let ts = Time::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
1657        let cookie_entry = format!("{cookie_id} {ts} {cookie}");
1658        write(&cookie_file, cookie_entry).unwrap();
1659
1660        // Explicit cookie ID.
1661        let res1 = block_on(test_unix_p2p_cookie_auth(cookie_context, Some(cookie_id)));
1662        // Implicit cookie ID (first one should be picked).
1663        let res2 = block_on(test_unix_p2p_cookie_auth(cookie_context, None));
1664
1665        // Remove the cookie file.
1666        remove_file(&cookie_file).unwrap();
1667
1668        res1.unwrap();
1669        res2.unwrap();
1670    }
1671
1672    #[cfg(any(unix, not(feature = "tokio")))]
1673    async fn test_unix_p2p_cookie_auth(
1674        cookie_context: &'static str,
1675        cookie_id: Option<usize>,
1676    ) -> Result<()> {
1677        #[cfg(all(unix, not(feature = "tokio")))]
1678        use std::os::unix::net::UnixStream;
1679        #[cfg(all(unix, feature = "tokio"))]
1680        use tokio::net::UnixStream;
1681        #[cfg(all(windows, not(feature = "tokio")))]
1682        use uds_windows::UnixStream;
1683
1684        let guid = Guid::generate();
1685
1686        let (p0, p1) = UnixStream::pair().unwrap();
1687        let mut server_builder = Builder::unix_stream(p0)
1688            .server(guid)
1689            .unwrap()
1690            .p2p()
1691            .auth_mechanism(AuthMechanism::Cookie)
1692            .cookie_context(cookie_context)
1693            .unwrap();
1694        if let Some(cookie_id) = cookie_id {
1695            server_builder = server_builder.cookie_id(cookie_id);
1696        }
1697
1698        futures_util::try_join!(
1699            Builder::unix_stream(p1).p2p().build(),
1700            server_builder.build(),
1701        )
1702        .map(|_| ())
1703    }
1704
1705    #[test]
1706    #[timeout(15000)]
1707    fn channel_pair() {
1708        crate::utils::block_on(test_channel_pair()).unwrap();
1709    }
1710
1711    async fn test_channel_pair() -> Result<()> {
1712        let (server1, client1) = create_channel_pair().await;
1713        let (server2, client2) = create_channel_pair().await;
1714
1715        test_p2p(server1, client1, server2, client2).await
1716    }
1717
1718    async fn create_channel_pair() -> (Connection, Connection) {
1719        let (a, b) = socket::Channel::pair();
1720
1721        let guid = crate::Guid::generate();
1722        let conn1 = Builder::authenticated_socket(a, guid.clone())
1723            .unwrap()
1724            .p2p()
1725            .build()
1726            .await
1727            .unwrap();
1728        let conn2 = Builder::authenticated_socket(b, guid)
1729            .unwrap()
1730            .p2p()
1731            .build()
1732            .await
1733            .unwrap();
1734
1735        (conn1, conn2)
1736    }
1737}