zbus/connection/
builder.rs

1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use event_listener::Event;
4use static_assertions::assert_impl_all;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10    collections::{HashMap, HashSet, VecDeque},
11    vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::{ObjectPath, Str};
25
26use crate::{
27    address::{self, Address},
28    names::{InterfaceName, WellKnownName},
29    object_server::{ArcInterface, Interface},
30    Connection, Error, Executor, Guid, OwnedGuid, Result,
31};
32
33use super::{
34    handshake::{AuthMechanism, Authenticated},
35    socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
36};
37
/// Capacity of the main (unfiltered) message queue that is applied to a new connection when
/// `Builder::max_queued` was not called.
const DEFAULT_MAX_QUEUED: usize = 64;
39
/// What a [`Builder`] is going to build the connection on top of.
///
/// The stream variants wrap a caller-supplied stream whose concrete type depends on the enabled
/// features (see the `use` items at the top of this file).
#[derive(Debug)]
enum Target {
    /// A caller-supplied Unix domain socket stream.
    #[cfg(any(unix, not(feature = "tokio")))]
    UnixStream(UnixStream),
    /// A caller-supplied TCP stream.
    TcpStream(TcpStream),
    /// A caller-supplied VSOCK stream.
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    VsockStream(VsockStream),
    /// A D-Bus address; the transport is only connected when the connection is being built.
    Address(Address),
    /// An already split socket that still has to go through the authentication handshake.
    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
    /// An already split socket that is treated as authenticated, so the handshake is skipped.
    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
}
54
/// Interfaces queued up through `Builder::serve_at`: object path -> (interface name -> interface).
type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
56
/// A builder for [`zbus::Connection`].
///
/// Obtain one from a constructor such as [`Builder::session`], [`Builder::system`] or
/// [`Builder::address`], chain the option methods you need and finish with [`Builder::build`].
#[derive(Debug)]
#[must_use]
pub struct Builder<'a> {
    // What to connect to. Set by every constructor and taken (left as `None`) by
    // `target_connect` while building.
    target: Option<Target>,
    // Requested capacity of the main queue; `DEFAULT_MAX_QUEUED` is used when `None`.
    max_queued: Option<usize>,
    // This is only set for p2p server case or pre-authenticated sockets.
    guid: Option<Guid<'a>>,
    // `true` once `Builder::p2p` was called; otherwise the connection is treated as a bus
    // connection.
    #[cfg(feature = "p2p")]
    p2p: bool,
    // Whether `build` should start the internal executor thread (only consulted when the
    // `tokio` feature is disabled). Defaults to `true`.
    internal_executor: bool,
    // Interfaces to register on the object server as part of `build`.
    interfaces: Interfaces<'a>,
    // Well-known names to request as part of `build`.
    names: HashSet<WellKnownName<'a>>,
    // Authentication mechanisms handed to the handshake; `None` leaves the choice to the
    // handshake code.
    auth_mechanisms: Option<VecDeque<AuthMechanism>>,
    // Unique name to assign to the connection (bus implementations only).
    #[cfg(feature = "bus-impl")]
    unique_name: Option<crate::names::UniqueName<'a>>,
    // Cookie context passed to the server-side handshake; the type's default is used when `None`.
    cookie_context: Option<super::handshake::CookieContext<'a>>,
    // ID of the cookie passed to the server-side handshake.
    cookie_id: Option<usize>,
}
76
// Compile-time check that `Builder` implements `Send`, `Sync` and `Unpin`.
assert_impl_all!(Builder<'_>: Send, Sync, Unpin);
78
79impl<'a> Builder<'a> {
80    /// Create a builder for the session/user message bus connection.
81    pub fn session() -> Result<Self> {
82        Ok(Self::new(Target::Address(Address::session()?)))
83    }
84
85    /// Create a builder for the system-wide message bus connection.
86    pub fn system() -> Result<Self> {
87        Ok(Self::new(Target::Address(Address::system()?)))
88    }
89
90    /// Create a builder for connection that will use the given [D-Bus bus address].
91    ///
92    /// # Example
93    ///
94    /// Here is an example of connecting to an IBus service:
95    ///
96    /// ```no_run
97    /// # use std::error::Error;
98    /// # use zbus::connection::Builder;
99    /// # use zbus::block_on;
100    /// #
101    /// # block_on(async {
102    /// let addr = "unix:\
103    ///     path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
104    ///     guid=fdd08e811a6c7ebe1fef0d9e647230da";
105    /// let conn = Builder::address(addr)?
106    ///     .build()
107    ///     .await?;
108    ///
109    /// // Do something useful with `conn`..
110    /// #     drop(conn);
111    /// #     Ok::<(), zbus::Error>(())
112    /// # }).unwrap();
113    /// #
114    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
115    /// ```
116    ///
117    /// **Note:** The IBus address is different for each session. You can find the address for your
118    /// current session using `ibus address` command.
119    ///
120    /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
121    pub fn address<A>(address: A) -> Result<Self>
122    where
123        A: TryInto<Address>,
124        A::Error: Into<Error>,
125    {
126        Ok(Self::new(Target::Address(
127            address.try_into().map_err(Into::into)?,
128        )))
129    }
130
131    /// Create a builder for connection that will use the given unix stream.
132    ///
133    /// If the default `async-io` feature is disabled, this method will expect
134    /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
135    /// argument.
136    ///
137    /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
138    /// is not available when the `tokio` feature is enabled and building for Windows target.
139    ///
140    /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
141    #[cfg(any(unix, not(feature = "tokio")))]
142    pub fn unix_stream(stream: UnixStream) -> Self {
143        Self::new(Target::UnixStream(stream))
144    }
145
146    /// Create a builder for connection that will use the given TCP stream.
147    ///
148    /// If the default `async-io` feature is disabled, this method will expect
149    /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
150    /// argument.
151    pub fn tcp_stream(stream: TcpStream) -> Self {
152        Self::new(Target::TcpStream(stream))
153    }
154
155    /// Create a builder for connection that will use the given VSOCK stream.
156    ///
157    /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
158    /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
159    /// with `tokio-vsock` feature.
160    #[cfg(any(
161        all(feature = "vsock", not(feature = "tokio")),
162        feature = "tokio-vsock"
163    ))]
164    pub fn vsock_stream(stream: VsockStream) -> Self {
165        Self::new(Target::VsockStream(stream))
166    }
167
168    /// Create a builder for connection that will use the given socket.
169    pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
170        Self::new(Target::Socket(socket.into()))
171    }
172
173    /// Create a builder for a connection that will use the given pre-authenticated socket.
174    ///
175    /// This is similar to [`Builder::socket`], except that the socket is either already
176    /// authenticated or does not require authentication.
177    pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
178    where
179        S: Into<BoxedSplit>,
180        G: TryInto<Guid<'a>>,
181        G::Error: Into<Error>,
182    {
183        let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
184        builder.guid = Some(guid.try_into().map_err(Into::into)?);
185
186        Ok(builder)
187    }
188
189    /// Specify the mechanism to use during authentication.
190    pub fn auth_mechanism(self, auth_mechanism: AuthMechanism) -> Self {
191        #[allow(deprecated)]
192        self.auth_mechanisms(&[auth_mechanism])
193    }
194
195    /// Specify the mechanisms to use during authentication.
196    #[deprecated(since = "4.1.3", note = "Use `auth_mechanism` instead.")]
197    pub fn auth_mechanisms(mut self, auth_mechanisms: &[AuthMechanism]) -> Self {
198        self.auth_mechanisms = Some(VecDeque::from(auth_mechanisms.to_vec()));
199
200        self
201    }
202
203    /// The cookie context to use during authentication.
204    ///
205    /// This is only used when the `cookie` authentication mechanism is enabled and only valid for
206    /// server connection.
207    ///
208    /// If not specified, the default cookie context of `org_freedesktop_general` will be used.
209    ///
210    /// # Errors
211    ///
212    /// If the given string is not a valid cookie context.
213    pub fn cookie_context<C>(mut self, context: C) -> Result<Self>
214    where
215        C: Into<Str<'a>>,
216    {
217        self.cookie_context = Some(context.into().try_into()?);
218
219        Ok(self)
220    }
221
222    /// The ID of the cookie to use during authentication.
223    ///
224    /// This is only used when the `cookie` authentication mechanism is enabled and only valid for
225    /// server connection.
226    ///
227    /// If not specified, the first cookie found in the cookie context file will be used.
228    pub fn cookie_id(mut self, id: usize) -> Self {
229        self.cookie_id = Some(id);
230
231        self
232    }
233
234    /// The to-be-created connection will be a peer-to-peer connection.
235    ///
236    /// This method is only available when the `p2p` feature is enabled.
237    #[cfg(feature = "p2p")]
238    pub fn p2p(mut self) -> Self {
239        self.p2p = true;
240
241        self
242    }
243
244    /// The to-be-created connection will be a server using the given GUID.
245    ///
246    /// The to-be-created connection will wait for incoming client authentication handshake and
247    /// negotiation messages, for peer-to-peer communications after successful creation.
248    ///
249    /// This method is only available when the `p2p` feature is enabled.
250    ///
251    /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
252    /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
253    /// server and a client connection, except for authentication.
254    #[cfg(feature = "p2p")]
255    pub fn server<G>(mut self, guid: G) -> Result<Self>
256    where
257        G: TryInto<Guid<'a>>,
258        G::Error: Into<Error>,
259    {
260        self.guid = Some(guid.try_into().map_err(Into::into)?);
261
262        Ok(self)
263    }
264
265    /// Set the capacity of the main (unfiltered) queue.
266    ///
267    /// Since typically you'd want to set this at instantiation time, you can set it through the
268    /// builder.
269    ///
270    /// # Example
271    ///
272    /// ```
273    /// # use std::error::Error;
274    /// # use zbus::connection::Builder;
275    /// # use zbus::block_on;
276    /// #
277    /// # block_on(async {
278    /// let conn = Builder::session()?
279    ///     .max_queued(30)
280    ///     .build()
281    ///     .await?;
282    /// assert_eq!(conn.max_queued(), 30);
283    ///
284    /// #     Ok::<(), zbus::Error>(())
285    /// # }).unwrap();
286    /// #
287    /// // Do something useful with `conn`..
288    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
289    /// ```
290    pub fn max_queued(mut self, max: usize) -> Self {
291        self.max_queued = Some(max);
292
293        self
294    }
295
296    /// Enable or disable the internal executor thread.
297    ///
298    /// The thread is enabled by default.
299    ///
300    /// See [Connection::executor] for more details.
301    pub fn internal_executor(mut self, enabled: bool) -> Self {
302        self.internal_executor = enabled;
303
304        self
305    }
306
307    /// Register a D-Bus [`Interface`] to be served at a given path.
308    ///
309    /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
310    /// interfaces available immediately after the connection is established. Typically, this is
311    /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
312    /// replace any previously added interface with the same name at the same path.
313    ///
314    /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
315    /// attempt to add yours, [`Builder::build()`] will fail.
316    pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
317    where
318        I: Interface,
319        P: TryInto<ObjectPath<'a>>,
320        P::Error: Into<Error>,
321    {
322        let path = path.try_into().map_err(Into::into)?;
323        let entry = self.interfaces.entry(path).or_default();
324        entry.insert(I::name(), ArcInterface::new(iface));
325        Ok(self)
326    }
327
328    /// Register a well-known name for this connection on the bus.
329    ///
330    /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
331    /// of the connection setup ([`Builder::build`]), immediately after interfaces
332    /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
333    /// exactly what you want.
334    pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
335    where
336        W: TryInto<WellKnownName<'a>>,
337        W::Error: Into<Error>,
338    {
339        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
340        self.names.insert(well_known_name);
341
342        Ok(self)
343    }
344
345    /// Sets the unique name of the connection.
346    ///
347    /// This is mainly provided for bus implementations. All other users should not need to use this
348    /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
349    ///
350    /// # Panics
351    ///
352    /// It will panic if the connection is to a message bus as it's the bus that assigns
353    /// peers their unique names.
354    #[cfg(feature = "bus-impl")]
355    pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
356    where
357        U: TryInto<crate::names::UniqueName<'a>>,
358        U::Error: Into<Error>,
359    {
360        if !self.p2p {
361            panic!("unique name can only be set for peer-to-peer connections");
362        }
363        let name = unique_name.try_into().map_err(Into::into)?;
364        self.unique_name = Some(name);
365
366        Ok(self)
367    }
368
369    /// Build the connection, consuming the builder.
370    ///
371    /// # Errors
372    ///
373    /// Until server-side bus connection is supported, attempting to build such a connection will
374    /// result in [`Error::Unsupported`] error.
375    pub async fn build(self) -> Result<Connection> {
376        let executor = Executor::new();
377        #[cfg(not(feature = "tokio"))]
378        let internal_executor = self.internal_executor;
379        // Box the future as it's large and can cause stack overflow.
380        let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
381
382        #[cfg(not(feature = "tokio"))]
383        start_internal_executor(&executor, internal_executor)?;
384
385        Ok(conn)
386    }
387
388    async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
389        #[cfg(feature = "p2p")]
390        let is_bus_conn = !self.p2p;
391        #[cfg(not(feature = "p2p"))]
392        let is_bus_conn = true;
393
394        #[cfg(not(feature = "bus-impl"))]
395        let unique_name = None;
396        #[cfg(feature = "bus-impl")]
397        let unique_name = self.unique_name.take().map(Into::into);
398
399        #[allow(unused_mut)]
400        let (mut stream, server_guid, authenticated) = self.target_connect().await?;
401        let mut auth = if authenticated {
402            let (socket_read, socket_write) = stream.take();
403            Authenticated {
404                #[cfg(unix)]
405                cap_unix_fd: socket_read.can_pass_unix_fd(),
406                socket_read: Some(socket_read),
407                socket_write,
408                // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
409                server_guid: server_guid.unwrap(),
410                already_received_bytes: vec![],
411                unique_name,
412                #[cfg(unix)]
413                already_received_fds: vec![],
414            }
415        } else {
416            #[cfg(feature = "p2p")]
417            match self.guid {
418                None => {
419                    // SASL Handshake
420                    Authenticated::client(stream, server_guid, self.auth_mechanisms, is_bus_conn)
421                        .await?
422                }
423                Some(guid) => {
424                    if !self.p2p {
425                        return Err(Error::Unsupported);
426                    }
427
428                    let creds = stream.read_mut().peer_credentials().await?;
429                    #[cfg(unix)]
430                    let client_uid = creds.unix_user_id();
431                    #[cfg(windows)]
432                    let client_sid = creds.into_windows_sid();
433
434                    Authenticated::server(
435                        stream,
436                        guid.to_owned().into(),
437                        #[cfg(unix)]
438                        client_uid,
439                        #[cfg(windows)]
440                        client_sid,
441                        self.auth_mechanisms,
442                        self.cookie_id,
443                        self.cookie_context.unwrap_or_default(),
444                        unique_name,
445                    )
446                    .await?
447                }
448            }
449
450            #[cfg(not(feature = "p2p"))]
451            Authenticated::client(stream, server_guid, self.auth_mechanisms, is_bus_conn).await?
452        };
453
454        // SAFETY: `Authenticated` is always built with these fields set to `Some`.
455        let socket_read = auth.socket_read.take().unwrap();
456        let already_received_bytes = auth.already_received_bytes.drain(..).collect();
457        #[cfg(unix)]
458        let already_received_fds = auth.already_received_fds.drain(..).collect();
459
460        let mut conn = Connection::new(auth, is_bus_conn, executor).await?;
461        conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
462
463        if !self.interfaces.is_empty() {
464            let object_server = conn.sync_object_server(false, None);
465            for (path, interfaces) in self.interfaces {
466                for (name, iface) in interfaces {
467                    let added = object_server
468                        .inner()
469                        .add_arc_interface(path.clone(), name.clone(), iface.clone())
470                        .await?;
471                    if !added {
472                        return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
473                    }
474                }
475            }
476
477            let started_event = Event::new();
478            let listener = started_event.listen();
479            conn.start_object_server(Some(started_event));
480
481            listener.await;
482        }
483
484        // Start the socket reader task.
485        conn.init_socket_reader(
486            socket_read,
487            already_received_bytes,
488            #[cfg(unix)]
489            already_received_fds,
490        );
491
492        for name in self.names {
493            conn.request_name(name).await?;
494        }
495
496        Ok(conn)
497    }
498
499    fn new(target: Target) -> Self {
500        Self {
501            target: Some(target),
502            #[cfg(feature = "p2p")]
503            p2p: false,
504            max_queued: None,
505            guid: None,
506            internal_executor: true,
507            interfaces: HashMap::new(),
508            names: HashSet::new(),
509            auth_mechanisms: None,
510            #[cfg(feature = "bus-impl")]
511            unique_name: None,
512            cookie_id: None,
513            cookie_context: None,
514        }
515    }
516
517    async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
518        let mut authenticated = false;
519        let mut guid = None;
520        // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
521        // once.
522        let split = match self.target.take().unwrap() {
523            #[cfg(not(feature = "tokio"))]
524            Target::UnixStream(stream) => Async::new(stream)?.into(),
525            #[cfg(all(unix, feature = "tokio"))]
526            Target::UnixStream(stream) => stream.into(),
527            #[cfg(not(feature = "tokio"))]
528            Target::TcpStream(stream) => Async::new(stream)?.into(),
529            #[cfg(feature = "tokio")]
530            Target::TcpStream(stream) => stream.into(),
531            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
532            Target::VsockStream(stream) => Async::new(stream)?.into(),
533            #[cfg(feature = "tokio-vsock")]
534            Target::VsockStream(stream) => stream.into(),
535            Target::Address(address) => {
536                guid = address.guid().map(|g| g.to_owned().into());
537                match address.connect().await? {
538                    #[cfg(any(unix, not(feature = "tokio")))]
539                    address::transport::Stream::Unix(stream) => stream.into(),
540                    address::transport::Stream::Tcp(stream) => stream.into(),
541                    #[cfg(any(
542                        all(feature = "vsock", not(feature = "tokio")),
543                        feature = "tokio-vsock"
544                    ))]
545                    address::transport::Stream::Vsock(stream) => stream.into(),
546                }
547            }
548            Target::Socket(stream) => stream,
549            Target::AuthenticatedSocket(stream) => {
550                authenticated = true;
551                guid = self.guid.take().map(Into::into);
552                stream
553            }
554        };
555
556        Ok((split, guid, authenticated))
557    }
558}
559
560/// Start the internal executor thread.
561///
562/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
563/// tasks until socket reader task kicks in.
564#[cfg(not(feature = "tokio"))]
565fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
566    if internal_executor {
567        let executor = executor.clone();
568        std::thread::Builder::new()
569            .name("zbus::Connection executor".into())
570            .spawn(move || {
571                crate::utils::block_on(async move {
572                    // Run as long as there is a task to run.
573                    while !executor.is_empty() {
574                        executor.tick().await;
575                    }
576                })
577            })?;
578    }
579
580    Ok(())
581}