1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use event_listener::Event;
4use static_assertions::assert_impl_all;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10 collections::{HashMap, HashSet, VecDeque},
11 vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::{ObjectPath, Str};
25
26use crate::{
27 address::{self, Address},
28 names::{InterfaceName, WellKnownName},
29 object_server::{ArcInterface, Interface},
30 Connection, Error, Executor, Guid, OwnedGuid, Result,
31};
32
33use super::{
34 handshake::{AuthMechanism, Authenticated},
35 socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
36};
37
/// Queue limit handed to `Connection::set_max_queued` by `Builder::build_` when the caller never
/// called `Builder::max_queued`.
const DEFAULT_MAX_QUEUED: usize = 64;
39
/// What a [`Builder`] connects to.
///
/// Each variant corresponds to one of the `Builder` constructors and is turned into a
/// `BoxedSplit` by `Builder::target_connect`.
#[derive(Debug)]
enum Target {
    /// An already-connected Unix domain socket stream (`Builder::unix_stream`).
    #[cfg(any(unix, not(feature = "tokio")))]
    UnixStream(UnixStream),
    /// An already-connected TCP stream (`Builder::tcp_stream`).
    TcpStream(TcpStream),
    /// An already-connected vsock stream (`Builder::vsock_stream`).
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    VsockStream(VsockStream),
    /// An address that still has to be connected to (`Builder::session`, `Builder::system`,
    /// `Builder::address`).
    Address(Address),
    /// A caller-provided socket, already split into read and write halves, on which the
    /// handshake still has to be performed (`Builder::socket`).
    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
    /// A caller-provided socket for which the handshake is skipped
    /// (`Builder::authenticated_socket`).
    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
}
54
/// Interfaces queued through `Builder::serve_at`: object path -> (interface name -> interface).
type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
56
/// A builder for [`Connection`].
///
/// Create one with a constructor such as [`Builder::session`] or [`Builder::address`], adjust it
/// with the chainable setters and finish with [`Builder::build`].
#[derive(Debug)]
#[must_use]
pub struct Builder<'a> {
    /// What to connect to. Set to `Some` by `new` and taken by `target_connect`.
    target: Option<Target>,
    /// Queue limit requested through `max_queued`; `DEFAULT_MAX_QUEUED` is used when `None`.
    max_queued: Option<usize>,
    /// GUID set by `server` or `authenticated_socket`.
    guid: Option<Guid<'a>>,
    /// Whether this is a peer-to-peer (as opposed to a bus) connection.
    #[cfg(feature = "p2p")]
    p2p: bool,
    /// Whether `build` should start the internal executor thread (non-`tokio` builds only).
    internal_executor: bool,
    /// Interfaces to add to the object server during `build`.
    interfaces: Interfaces<'a>,
    /// Well-known names to request once the connection is set up.
    names: HashSet<WellKnownName<'a>>,
    /// Authentication mechanisms handed to the handshake; `None` leaves the choice to it.
    auth_mechanisms: Option<VecDeque<AuthMechanism>>,
    /// Unique name to assign to the connection (peer-to-peer only).
    #[cfg(feature = "bus-impl")]
    unique_name: Option<crate::names::UniqueName<'a>>,
    /// Cookie context handed to the server-side handshake.
    cookie_context: Option<super::handshake::CookieContext<'a>>,
    /// Cookie ID handed to the server-side handshake.
    cookie_id: Option<usize>,
}
76
77assert_impl_all!(Builder<'_>: Send, Sync, Unpin);
78
79impl<'a> Builder<'a> {
80 pub fn session() -> Result<Self> {
82 Ok(Self::new(Target::Address(Address::session()?)))
83 }
84
85 pub fn system() -> Result<Self> {
87 Ok(Self::new(Target::Address(Address::system()?)))
88 }
89
90 pub fn address<A>(address: A) -> Result<Self>
122 where
123 A: TryInto<Address>,
124 A::Error: Into<Error>,
125 {
126 Ok(Self::new(Target::Address(
127 address.try_into().map_err(Into::into)?,
128 )))
129 }
130
131 #[cfg(any(unix, not(feature = "tokio")))]
142 pub fn unix_stream(stream: UnixStream) -> Self {
143 Self::new(Target::UnixStream(stream))
144 }
145
146 pub fn tcp_stream(stream: TcpStream) -> Self {
152 Self::new(Target::TcpStream(stream))
153 }
154
155 #[cfg(any(
161 all(feature = "vsock", not(feature = "tokio")),
162 feature = "tokio-vsock"
163 ))]
164 pub fn vsock_stream(stream: VsockStream) -> Self {
165 Self::new(Target::VsockStream(stream))
166 }
167
168 pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
170 Self::new(Target::Socket(socket.into()))
171 }
172
173 pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
178 where
179 S: Into<BoxedSplit>,
180 G: TryInto<Guid<'a>>,
181 G::Error: Into<Error>,
182 {
183 let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
184 builder.guid = Some(guid.try_into().map_err(Into::into)?);
185
186 Ok(builder)
187 }
188
189 pub fn auth_mechanism(self, auth_mechanism: AuthMechanism) -> Self {
191 #[allow(deprecated)]
192 self.auth_mechanisms(&[auth_mechanism])
193 }
194
195 #[deprecated(since = "4.1.3", note = "Use `auth_mechanism` instead.")]
197 pub fn auth_mechanisms(mut self, auth_mechanisms: &[AuthMechanism]) -> Self {
198 self.auth_mechanisms = Some(VecDeque::from(auth_mechanisms.to_vec()));
199
200 self
201 }
202
203 pub fn cookie_context<C>(mut self, context: C) -> Result<Self>
214 where
215 C: Into<Str<'a>>,
216 {
217 self.cookie_context = Some(context.into().try_into()?);
218
219 Ok(self)
220 }
221
222 pub fn cookie_id(mut self, id: usize) -> Self {
229 self.cookie_id = Some(id);
230
231 self
232 }
233
234 #[cfg(feature = "p2p")]
238 pub fn p2p(mut self) -> Self {
239 self.p2p = true;
240
241 self
242 }
243
244 #[cfg(feature = "p2p")]
255 pub fn server<G>(mut self, guid: G) -> Result<Self>
256 where
257 G: TryInto<Guid<'a>>,
258 G::Error: Into<Error>,
259 {
260 self.guid = Some(guid.try_into().map_err(Into::into)?);
261
262 Ok(self)
263 }
264
265 pub fn max_queued(mut self, max: usize) -> Self {
291 self.max_queued = Some(max);
292
293 self
294 }
295
296 pub fn internal_executor(mut self, enabled: bool) -> Self {
302 self.internal_executor = enabled;
303
304 self
305 }
306
307 pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
317 where
318 I: Interface,
319 P: TryInto<ObjectPath<'a>>,
320 P::Error: Into<Error>,
321 {
322 let path = path.try_into().map_err(Into::into)?;
323 let entry = self.interfaces.entry(path).or_default();
324 entry.insert(I::name(), ArcInterface::new(iface));
325 Ok(self)
326 }
327
328 pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
335 where
336 W: TryInto<WellKnownName<'a>>,
337 W::Error: Into<Error>,
338 {
339 let well_known_name = well_known_name.try_into().map_err(Into::into)?;
340 self.names.insert(well_known_name);
341
342 Ok(self)
343 }
344
345 #[cfg(feature = "bus-impl")]
355 pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
356 where
357 U: TryInto<crate::names::UniqueName<'a>>,
358 U::Error: Into<Error>,
359 {
360 if !self.p2p {
361 panic!("unique name can only be set for peer-to-peer connections");
362 }
363 let name = unique_name.try_into().map_err(Into::into)?;
364 self.unique_name = Some(name);
365
366 Ok(self)
367 }
368
369 pub async fn build(self) -> Result<Connection> {
376 let executor = Executor::new();
377 #[cfg(not(feature = "tokio"))]
378 let internal_executor = self.internal_executor;
379 let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
381
382 #[cfg(not(feature = "tokio"))]
383 start_internal_executor(&executor, internal_executor)?;
384
385 Ok(conn)
386 }
387
    /// Perform the actual connection setup on behalf of [`Builder::build`].
    ///
    /// Steps, in order:
    /// 1. connect to the target (`target_connect`);
    /// 2. obtain an `Authenticated` socket: built directly for a pre-authenticated socket,
    ///    otherwise through the client handshake or, when a GUID was set via `server`, the
    ///    server handshake;
    /// 3. create the [`Connection`] and apply the queue limit;
    /// 4. add the queued interfaces and wait for the object server to start;
    /// 5. start the socket reader;
    /// 6. request the queued well-known names.
    ///
    /// # Errors
    ///
    /// Any failing step aborts the build. In particular [`Error::Unsupported`] is returned when
    /// a server GUID was set without `p2p`, and [`Error::InterfaceExists`] when the object
    /// server reports that a queued interface was not added.
    async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
        // Without the `p2p` feature every connection is a bus connection.
        #[cfg(feature = "p2p")]
        let is_bus_conn = !self.p2p;
        #[cfg(not(feature = "p2p"))]
        let is_bus_conn = true;

        // A custom unique name can only be set with the `bus-impl` feature.
        #[cfg(not(feature = "bus-impl"))]
        let unique_name = None;
        #[cfg(feature = "bus-impl")]
        let unique_name = self.unique_name.take().map(Into::into);

        #[allow(unused_mut)]
        let (mut stream, server_guid, authenticated) = self.target_connect().await?;
        let mut auth = if authenticated {
            // Pre-authenticated socket: skip the handshake and assemble the result by hand.
            let (socket_read, socket_write) = stream.take();
            Authenticated {
                #[cfg(unix)]
                cap_unix_fd: socket_read.can_pass_unix_fd(),
                socket_read: Some(socket_read),
                socket_write,
                // `authenticated` is only set for `Target::AuthenticatedSocket`, and
                // `Builder::authenticated_socket` always stores a GUID.
                server_guid: server_guid.unwrap(),
                already_received_bytes: vec![],
                unique_name,
                #[cfg(unix)]
                already_received_fds: vec![],
            }
        } else {
            #[cfg(feature = "p2p")]
            match self.guid {
                // No GUID set via `server`: act as the client side of the handshake.
                None => {
                    Authenticated::client(stream, server_guid, self.auth_mechanisms, is_bus_conn)
                        .await?
                }
                // GUID set via `server`: act as the server side of the handshake.
                Some(guid) => {
                    if !self.p2p {
                        return Err(Error::Unsupported);
                    }

                    // The peer's credentials are handed to the server-side handshake.
                    let creds = stream.read_mut().peer_credentials().await?;
                    #[cfg(unix)]
                    let client_uid = creds.unix_user_id();
                    #[cfg(windows)]
                    let client_sid = creds.into_windows_sid();

                    Authenticated::server(
                        stream,
                        guid.to_owned().into(),
                        #[cfg(unix)]
                        client_uid,
                        #[cfg(windows)]
                        client_sid,
                        self.auth_mechanisms,
                        self.cookie_id,
                        self.cookie_context.unwrap_or_default(),
                        unique_name,
                    )
                    .await?
                }
            }

            #[cfg(not(feature = "p2p"))]
            Authenticated::client(stream, server_guid, self.auth_mechanisms, is_bus_conn).await?
        };

        // NOTE(review): the `unwrap` assumes the handshake constructors always leave
        // `socket_read` set to `Some`, as the hand-built value above does - confirm.
        let socket_read = auth.socket_read.take().unwrap();
        // Data received during the handshake is forwarded to the socket reader below.
        let already_received_bytes = auth.already_received_bytes.drain(..).collect();
        #[cfg(unix)]
        let already_received_fds = auth.already_received_fds.drain(..).collect();

        let mut conn = Connection::new(auth, is_bus_conn, executor).await?;
        conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));

        // Interfaces are added, and the object server started, before the socket reader is
        // started - presumably so that no incoming call is read before it can be dispatched.
        if !self.interfaces.is_empty() {
            let object_server = conn.sync_object_server(false, None);
            for (path, interfaces) in self.interfaces {
                for (name, iface) in interfaces {
                    let added = object_server
                        .inner()
                        .add_arc_interface(path.clone(), name.clone(), iface.clone())
                        .await?;
                    if !added {
                        return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
                    }
                }
            }

            // Wait until the object server signals that it has started.
            let started_event = Event::new();
            let listener = started_event.listen();
            conn.start_object_server(Some(started_event));

            listener.await;
        }

        // Start reading from the socket, beginning with whatever the handshake already received.
        conn.init_socket_reader(
            socket_read,
            already_received_bytes,
            #[cfg(unix)]
            already_received_fds,
        );

        // Names are requested last, after interfaces are in place and the reader is running.
        for name in self.names {
            conn.request_name(name).await?;
        }

        Ok(conn)
    }
498
499 fn new(target: Target) -> Self {
500 Self {
501 target: Some(target),
502 #[cfg(feature = "p2p")]
503 p2p: false,
504 max_queued: None,
505 guid: None,
506 internal_executor: true,
507 interfaces: HashMap::new(),
508 names: HashSet::new(),
509 auth_mechanisms: None,
510 #[cfg(feature = "bus-impl")]
511 unique_name: None,
512 cookie_id: None,
513 cookie_context: None,
514 }
515 }
516
    /// Turn the builder's target into a split socket.
    ///
    /// Returns `(socket, guid, authenticated)`:
    /// - `socket`: the boxed read/write halves to use for the connection;
    /// - `guid`: the GUID from the address for `Target::Address`, the GUID given to
    ///   `Builder::authenticated_socket` for `Target::AuthenticatedSocket`, `None` otherwise;
    /// - `authenticated`: `true` only for `Target::AuthenticatedSocket`.
    ///
    /// # Errors
    ///
    /// Fails if wrapping a stream in `Async` fails (non-`tokio` builds) or if connecting to an
    /// address fails.
    ///
    /// # Panics
    ///
    /// Panics if called a second time, since the target is taken on the first call.
    async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
        let mut authenticated = false;
        let mut guid = None;
        // `new` always sets `target` to `Some`, and `build_` calls this method once.
        let split = match self.target.take().unwrap() {
            // Non-`tokio` builds hold blocking streams, which are wrapped in `Async` first;
            // `tokio` streams are converted as they are.
            #[cfg(not(feature = "tokio"))]
            Target::UnixStream(stream) => Async::new(stream)?.into(),
            #[cfg(all(unix, feature = "tokio"))]
            Target::UnixStream(stream) => stream.into(),
            #[cfg(not(feature = "tokio"))]
            Target::TcpStream(stream) => Async::new(stream)?.into(),
            #[cfg(feature = "tokio")]
            Target::TcpStream(stream) => stream.into(),
            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
            Target::VsockStream(stream) => Async::new(stream)?.into(),
            #[cfg(feature = "tokio-vsock")]
            Target::VsockStream(stream) => stream.into(),
            Target::Address(address) => {
                // The GUID, if the address carries one, is read before connecting.
                guid = address.guid().map(|g| g.to_owned().into());
                match address.connect().await? {
                    #[cfg(any(unix, not(feature = "tokio")))]
                    address::transport::Stream::Unix(stream) => stream.into(),
                    address::transport::Stream::Tcp(stream) => stream.into(),
                    #[cfg(any(
                        all(feature = "vsock", not(feature = "tokio")),
                        feature = "tokio-vsock"
                    ))]
                    address::transport::Stream::Vsock(stream) => stream.into(),
                }
            }
            Target::Socket(stream) => stream,
            Target::AuthenticatedSocket(stream) => {
                authenticated = true;
                // Taking the GUID leaves `self.guid` as `None` for the rest of the build.
                guid = self.guid.take().map(Into::into);
                stream
            }
        };

        Ok((split, guid, authenticated))
    }
558}
559
560#[cfg(not(feature = "tokio"))]
565fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
566 if internal_executor {
567 let executor = executor.clone();
568 std::thread::Builder::new()
569 .name("zbus::Connection executor".into())
570 .spawn(move || {
571 crate::utils::block_on(async move {
572 while !executor.is_empty() {
574 executor.tick().await;
575 }
576 })
577 })?;
578 }
579
580 Ok(())
581}