zbus/object_server/
mod.rs

1//! The object server API.
2
3use event_listener::{Event, EventListener};
4use serde::{Deserialize, Serialize};
5use std::{
6    collections::{hash_map::Entry, HashMap},
7    fmt::Write,
8    marker::PhantomData,
9    ops::{Deref, DerefMut},
10    sync::Arc,
11};
12use tracing::{debug, instrument, trace, trace_span, Instrument};
13
14use static_assertions::assert_impl_all;
15use zbus_names::InterfaceName;
16use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Signature, Type, Value};
17
18use crate::{
19    async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard},
20    connection::WeakConnection,
21    fdo,
22    fdo::{Introspectable, ManagedObjects, ObjectManager, Peer, Properties},
23    message::{Header, Message},
24    Connection, Error, Result,
25};
26
27mod interface;
28pub(crate) use interface::ArcInterface;
29pub use interface::{DispatchResult, Interface};
30
31mod signal_context;
32pub use signal_context::SignalContext;
33
34/// Opaque structure that derefs to an `Interface` type.
35pub struct InterfaceDeref<'d, I> {
36    iface: RwLockReadGuard<'d, dyn Interface>,
37    phantom: PhantomData<I>,
38}
39
40impl<I> Deref for InterfaceDeref<'_, I>
41where
42    I: Interface,
43{
44    type Target = I;
45
46    fn deref(&self) -> &I {
47        self.iface.downcast_ref::<I>().unwrap()
48    }
49}
50
51/// Opaque structure that mutably derefs to an `Interface` type.
52pub struct InterfaceDerefMut<'d, I> {
53    iface: RwLockWriteGuard<'d, dyn Interface>,
54    phantom: PhantomData<I>,
55}
56
57impl<I> Deref for InterfaceDerefMut<'_, I>
58where
59    I: Interface,
60{
61    type Target = I;
62
63    fn deref(&self) -> &I {
64        self.iface.downcast_ref::<I>().unwrap()
65    }
66}
67
68impl<I> DerefMut for InterfaceDerefMut<'_, I>
69where
70    I: Interface,
71{
72    fn deref_mut(&mut self) -> &mut Self::Target {
73        self.iface.downcast_mut::<I>().unwrap()
74    }
75}
76
77/// Wrapper over an interface, along with its corresponding `SignalContext`
78/// instance. A reference to the underlying interface may be obtained via
79/// [`InterfaceRef::get`] and [`InterfaceRef::get_mut`].
80pub struct InterfaceRef<I> {
81    ctxt: SignalContext<'static>,
82    lock: Arc<RwLock<dyn Interface>>,
83    phantom: PhantomData<I>,
84}
85
86impl<I> InterfaceRef<I>
87where
88    I: 'static,
89{
90    /// Get a reference to the underlying interface.
91    ///
92    /// **WARNING:** If methods (e.g property setters) in `ObjectServer` require `&mut self`
93    /// `ObjectServer` will not be able to access the interface in question until all references
94    /// of this method are dropped, it is highly recommended that the scope of the interface
95    /// returned is restricted.
96    pub async fn get(&self) -> InterfaceDeref<'_, I> {
97        let iface = self.lock.read().await;
98
99        iface
100            .downcast_ref::<I>()
101            .expect("Unexpected interface type");
102
103        InterfaceDeref {
104            iface,
105            phantom: PhantomData,
106        }
107    }
108
109    /// Get a reference to the underlying interface.
110    ///
111    /// **WARNINGS:** Since the `ObjectServer` will not be able to access the interface in question
112    /// until the return value of this method is dropped, it is highly recommended that the scope
113    /// of the interface returned is restricted.
114    ///
115    /// # Errors
116    ///
117    /// If the interface at this instance's path is not valid, `Error::InterfaceNotFound` error is
118    /// returned.
119    ///
120    /// # Examples
121    ///
122    /// ```no_run
123    /// # use std::error::Error;
124    /// # use async_io::block_on;
125    /// # use zbus::{Connection, interface};
126    ///
127    /// struct MyIface(u32);
128    ///
129    /// #[interface(name = "org.myiface.MyIface")]
130    /// impl MyIface {
131    ///    #[zbus(property)]
132    ///    async fn count(&self) -> u32 {
133    ///        self.0
134    ///    }
135    /// }
136    ///
137    /// # block_on(async {
138    /// // Setup connection and object_server etc here and then in another part of the code:
139    /// # let connection = Connection::session().await?;
140    /// #
141    /// # let path = "/org/zbus/path";
142    /// # connection.object_server().at(path, MyIface(22)).await?;
143    /// let object_server = connection.object_server();
144    /// let iface_ref = object_server.interface::<_, MyIface>(path).await?;
145    /// let mut iface = iface_ref.get_mut().await;
146    /// iface.0 = 42;
147    /// iface.count_changed(iface_ref.signal_context()).await?;
148    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
149    /// # })?;
150    /// #
151    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
152    /// ```
153    pub async fn get_mut(&self) -> InterfaceDerefMut<'_, I> {
154        let mut iface = self.lock.write().await;
155
156        iface
157            .downcast_ref::<I>()
158            .expect("Unexpected interface type");
159        iface
160            .downcast_mut::<I>()
161            .expect("Unexpected interface type");
162
163        InterfaceDerefMut {
164            iface,
165            phantom: PhantomData,
166        }
167    }
168
169    pub fn signal_context(&self) -> &SignalContext<'static> {
170        &self.ctxt
171    }
172}
173
174impl<I> Clone for InterfaceRef<I> {
175    fn clone(&self) -> Self {
176        Self {
177            ctxt: self.ctxt.clone(),
178            lock: self.lock.clone(),
179            phantom: PhantomData,
180        }
181    }
182}
183
184#[derive(Default, Debug)]
185pub(crate) struct Node {
186    path: OwnedObjectPath,
187    children: HashMap<String, Node>,
188    interfaces: HashMap<InterfaceName<'static>, ArcInterface>,
189}
190
191impl Node {
192    pub(crate) fn new(path: OwnedObjectPath) -> Self {
193        let mut node = Self {
194            path,
195            ..Default::default()
196        };
197        assert!(node.add_interface(Peer));
198        assert!(node.add_interface(Introspectable));
199        assert!(node.add_interface(Properties));
200
201        node
202    }
203
204    // Get the child Node at path.
205    pub(crate) fn get_child(&self, path: &ObjectPath<'_>) -> Option<&Node> {
206        let mut node = self;
207
208        for i in path.split('/').skip(1) {
209            if i.is_empty() {
210                continue;
211            }
212            match node.children.get(i) {
213                Some(n) => node = n,
214                None => return None,
215            }
216        }
217
218        Some(node)
219    }
220
221    // Get the child Node at path. Optionally create one if it doesn't exist.
222    // It also returns the path of parent node that implements ObjectManager (if any). If multiple
223    // parents implement it (they shouldn't), then the closest one is returned.
224    fn get_child_mut(
225        &mut self,
226        path: &ObjectPath<'_>,
227        create: bool,
228    ) -> (Option<&mut Node>, Option<ObjectPath<'_>>) {
229        let mut node = self;
230        let mut node_path = String::new();
231        let mut obj_manager_path = None;
232
233        for i in path.split('/').skip(1) {
234            if i.is_empty() {
235                continue;
236            }
237
238            if node.interfaces.contains_key(&ObjectManager::name()) {
239                obj_manager_path = Some((*node.path).clone());
240            }
241
242            write!(&mut node_path, "/{i}").unwrap();
243            match node.children.entry(i.into()) {
244                Entry::Vacant(e) => {
245                    if create {
246                        let path = node_path.as_str().try_into().expect("Invalid Object Path");
247                        node = e.insert(Node::new(path));
248                    } else {
249                        return (None, obj_manager_path);
250                    }
251                }
252                Entry::Occupied(e) => node = e.into_mut(),
253            }
254        }
255
256        (Some(node), obj_manager_path)
257    }
258
259    pub(crate) fn interface_lock(&self, interface_name: InterfaceName<'_>) -> Option<ArcInterface> {
260        self.interfaces.get(&interface_name).cloned()
261    }
262
263    fn remove_interface(&mut self, interface_name: InterfaceName<'static>) -> bool {
264        self.interfaces.remove(&interface_name).is_some()
265    }
266
267    fn is_empty(&self) -> bool {
268        !self.interfaces.keys().any(|k| {
269            *k != Peer::name()
270                && *k != Introspectable::name()
271                && *k != Properties::name()
272                && *k != ObjectManager::name()
273        })
274    }
275
276    fn remove_node(&mut self, node: &str) -> bool {
277        self.children.remove(node).is_some()
278    }
279
280    fn add_arc_interface(&mut self, name: InterfaceName<'static>, arc_iface: ArcInterface) -> bool {
281        match self.interfaces.entry(name) {
282            Entry::Vacant(e) => {
283                e.insert(arc_iface);
284                true
285            }
286            Entry::Occupied(_) => false,
287        }
288    }
289
290    fn add_interface<I>(&mut self, iface: I) -> bool
291    where
292        I: Interface,
293    {
294        self.add_arc_interface(I::name(), ArcInterface::new(iface))
295    }
296
297    async fn introspect_to_writer<W: Write + Send>(&self, writer: &mut W) {
298        enum Fragment<'a> {
299            /// Represent an unclosed node tree, could be further splitted into sub-`Fragment`s
300            Node {
301                name: &'a str,
302                node: &'a Node,
303                level: usize,
304            },
305            /// Represent a closing `</node>`
306            End { level: usize },
307        }
308
309        let mut stack = Vec::new();
310        stack.push(Fragment::Node {
311            name: "",
312            node: self,
313            level: 0,
314        });
315
316        // This can be seen as traversing the fragment tree in pre-order DFS with formatted XML
317        // fragment, splitted `Fragment::Node`s and `Fragment::End` being current node, left
318        // subtree and right leaf respectively.
319        while let Some(fragment) = stack.pop() {
320            match fragment {
321                Fragment::Node { name, node, level } => {
322                    stack.push(Fragment::End { level });
323
324                    for (name, node) in &node.children {
325                        stack.push(Fragment::Node {
326                            name,
327                            node,
328                            level: level + 2,
329                        })
330                    }
331
332                    if level == 0 {
333                        writeln!(
334                            writer,
335                            r#"
336<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
337 "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
338<node>"#
339                        )
340                        .unwrap();
341                    } else {
342                        writeln!(
343                            writer,
344                            "{:indent$}<node name=\"{}\">",
345                            "",
346                            name,
347                            indent = level
348                        )
349                        .unwrap();
350                    }
351
352                    for iface in node.interfaces.values() {
353                        iface
354                            .instance
355                            .read()
356                            .await
357                            .introspect_to_writer(writer, level + 2);
358                    }
359                }
360                Fragment::End { level } => {
361                    writeln!(writer, "{:indent$}</node>", "", indent = level).unwrap();
362                }
363            }
364        }
365    }
366
367    pub(crate) async fn introspect(&self) -> String {
368        let mut xml = String::with_capacity(1024);
369
370        self.introspect_to_writer(&mut xml).await;
371
372        xml
373    }
374
375    pub(crate) async fn get_managed_objects(&self) -> fdo::Result<ManagedObjects> {
376        let mut managed_objects = ManagedObjects::new();
377
378        // Recursively get all properties of all interfaces of descendants.
379        let mut node_list: Vec<_> = self.children.values().collect();
380        while let Some(node) = node_list.pop() {
381            let mut interfaces = HashMap::new();
382            for iface_name in node.interfaces.keys().filter(|n| {
383                // Filter standard interfaces.
384                *n != &Peer::name()
385                    && *n != &Introspectable::name()
386                    && *n != &Properties::name()
387                    && *n != &ObjectManager::name()
388            }) {
389                let props = node.get_properties(iface_name.clone()).await?;
390                interfaces.insert(iface_name.clone().into(), props);
391            }
392            managed_objects.insert(node.path.clone(), interfaces);
393            node_list.extend(node.children.values());
394        }
395
396        Ok(managed_objects)
397    }
398
399    async fn get_properties(
400        &self,
401        interface_name: InterfaceName<'_>,
402    ) -> fdo::Result<HashMap<String, OwnedValue>> {
403        self.interface_lock(interface_name)
404            .expect("Interface was added but not found")
405            .instance
406            .read()
407            .await
408            .get_all()
409            .await
410    }
411}
412
413/// An object server, holding server-side D-Bus objects & interfaces.
414///
415/// Object servers hold interfaces on various object paths, and expose them over D-Bus.
416///
417/// All object paths will have the standard interfaces implemented on your behalf, such as
418/// `org.freedesktop.DBus.Introspectable` or `org.freedesktop.DBus.Properties`.
419///
420/// # Example
421///
422/// This example exposes the `org.myiface.Example.Quit` method on the `/org/zbus/path`
423/// path.
424///
425/// ```no_run
426/// # use std::error::Error;
427/// use zbus::{Connection, interface};
428/// use event_listener::Event;
429/// # use async_io::block_on;
430///
431/// struct Example {
432///     // Interfaces are owned by the ObjectServer. They can have
433///     // `&mut self` methods.
434///     quit_event: Event,
435/// }
436///
437/// impl Example {
438///     fn new(quit_event: Event) -> Self {
439///         Self { quit_event }
440///     }
441/// }
442///
443/// #[interface(name = "org.myiface.Example")]
444/// impl Example {
445///     // This will be the "Quit" D-Bus method.
446///     async fn quit(&mut self) {
447///         self.quit_event.notify(1);
448///     }
449///
450///     // See `interface` documentation to learn
451///     // how to expose properties & signals as well.
452/// }
453///
454/// # block_on(async {
455/// let connection = Connection::session().await?;
456///
457/// let quit_event = Event::new();
458/// let quit_listener = quit_event.listen();
459/// let interface = Example::new(quit_event);
460/// connection
461///     .object_server()
462///     .at("/org/zbus/path", interface)
463///     .await?;
464///
465/// quit_listener.await;
466/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
467/// # })?;
468/// # Ok::<_, Box<dyn Error + Send + Sync>>(())
469/// ```
470#[derive(Debug)]
471pub struct ObjectServer {
472    conn: WeakConnection,
473    root: RwLock<Node>,
474}
475
476assert_impl_all!(ObjectServer: Send, Sync, Unpin);
477
478impl ObjectServer {
479    /// Creates a new D-Bus `ObjectServer`.
480    pub(crate) fn new(conn: &Connection) -> Self {
481        Self {
482            conn: conn.into(),
483            root: RwLock::new(Node::new("/".try_into().expect("zvariant bug"))),
484        }
485    }
486
487    pub(crate) fn root(&self) -> &RwLock<Node> {
488        &self.root
489    }
490
491    /// Register a D-Bus [`Interface`] at a given path. (see the example above)
492    ///
493    /// Typically you'd want your interfaces to be registered immediately after the associated
494    /// connection is established and therefore use [`zbus::connection::Builder::serve_at`] instead.
495    /// However, there are situations where you'd need to register interfaces dynamically and that's
496    /// where this method becomes useful.
497    ///
498    /// If the interface already exists at this path, returns false.
499    pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
500    where
501        I: Interface,
502        P: TryInto<ObjectPath<'p>>,
503        P::Error: Into<Error>,
504    {
505        self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
506            .await
507    }
508
509    pub(crate) async fn add_arc_interface<'p, P>(
510        &self,
511        path: P,
512        name: InterfaceName<'static>,
513        arc_iface: ArcInterface,
514    ) -> Result<bool>
515    where
516        P: TryInto<ObjectPath<'p>>,
517        P::Error: Into<Error>,
518    {
519        let path = path.try_into().map_err(Into::into)?;
520        let mut root = self.root().write().await;
521        let (node, manager_path) = root.get_child_mut(&path, true);
522        let node = node.unwrap();
523        let added = node.add_arc_interface(name.clone(), arc_iface);
524        if added {
525            if name == ObjectManager::name() {
526                // Just added an object manager. Need to signal all managed objects under it.
527                let ctxt = SignalContext::new(&self.connection(), path)?;
528                let objects = node.get_managed_objects().await?;
529                for (path, owned_interfaces) in objects {
530                    let interfaces = owned_interfaces
531                        .iter()
532                        .map(|(i, props)| {
533                            let props = props
534                                .iter()
535                                .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
536                                .collect::<Result<_>>();
537                            Ok((i.into(), props?))
538                        })
539                        .collect::<Result<_>>()?;
540                    ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?;
541                }
542            } else if let Some(manager_path) = manager_path {
543                let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?;
544                let mut interfaces = HashMap::new();
545                let owned_props = node.get_properties(name.clone()).await?;
546                let props = owned_props
547                    .iter()
548                    .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
549                    .collect::<Result<_>>()?;
550                interfaces.insert(name, props);
551
552                ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?;
553            }
554        }
555
556        Ok(added)
557    }
558
559    /// Unregister a D-Bus [`Interface`] at a given path.
560    ///
561    /// If there are no more interfaces left at that path, destroys the object as well.
562    /// Returns whether the object was destroyed.
563    pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
564    where
565        I: Interface,
566        P: TryInto<ObjectPath<'p>>,
567        P::Error: Into<Error>,
568    {
569        let path = path.try_into().map_err(Into::into)?;
570        let mut root = self.root.write().await;
571        let (node, manager_path) = root.get_child_mut(&path, false);
572        let node = node.ok_or(Error::InterfaceNotFound)?;
573        if !node.remove_interface(I::name()) {
574            return Err(Error::InterfaceNotFound);
575        }
576        if let Some(manager_path) = manager_path {
577            let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?;
578            ObjectManager::interfaces_removed(&ctxt, &path, &[I::name()]).await?;
579        }
580        if node.is_empty() {
581            let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
582            let last_part = path_parts.next().unwrap();
583            let ppath = ObjectPath::from_string_unchecked(
584                path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
585            );
586            root.get_child_mut(&ppath, false)
587                .0
588                .unwrap()
589                .remove_node(last_part);
590            return Ok(true);
591        }
592        Ok(false)
593    }
594
595    /// Get the interface at the given path.
596    ///
597    /// # Errors
598    ///
599    /// If the interface is not registered at the given path, `Error::InterfaceNotFound` error is
600    /// returned.
601    ///
602    /// # Examples
603    ///
604    /// The typical use of this is property changes outside of a dispatched handler:
605    ///
606    /// ```no_run
607    /// # use std::error::Error;
608    /// # use zbus::{Connection, interface};
609    /// # use async_io::block_on;
610    /// #
611    /// struct MyIface(u32);
612    ///
613    /// #[interface(name = "org.myiface.MyIface")]
614    /// impl MyIface {
615    ///      #[zbus(property)]
616    ///      async fn count(&self) -> u32 {
617    ///          self.0
618    ///      }
619    /// }
620    ///
621    /// # block_on(async {
622    /// # let connection = Connection::session().await?;
623    /// #
624    /// # let path = "/org/zbus/path";
625    /// # connection.object_server().at(path, MyIface(0)).await?;
626    /// let iface_ref = connection
627    ///     .object_server()
628    ///     .interface::<_, MyIface>(path).await?;
629    /// let mut iface = iface_ref.get_mut().await;
630    /// iface.0 = 42;
631    /// iface.count_changed(iface_ref.signal_context()).await?;
632    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
633    /// # })?;
634    /// #
635    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
636    /// ```
637    pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
638    where
639        I: Interface,
640        P: TryInto<ObjectPath<'p>>,
641        P::Error: Into<Error>,
642    {
643        let path = path.try_into().map_err(Into::into)?;
644        let root = self.root().read().await;
645        let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
646
647        let lock = node
648            .interface_lock(I::name())
649            .ok_or(Error::InterfaceNotFound)?
650            .instance
651            .clone();
652
653        // Ensure what we return can later be dowcasted safely.
654        lock.read()
655            .await
656            .downcast_ref::<I>()
657            .ok_or(Error::InterfaceNotFound)?;
658
659        let conn = self.connection();
660        // SAFETY: We know that there is a valid path on the node as we already converted w/o error.
661        let ctxt = SignalContext::new(&conn, path).unwrap().into_owned();
662
663        Ok(InterfaceRef {
664            ctxt,
665            lock,
666            phantom: PhantomData,
667        })
668    }
669
670    async fn dispatch_call_to_iface(
671        &self,
672        iface: Arc<RwLock<dyn Interface>>,
673        connection: &Connection,
674        msg: &Message,
675        hdr: &Header<'_>,
676    ) -> fdo::Result<()> {
677        let member = hdr
678            .member()
679            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
680        let iface_name = hdr
681            .interface()
682            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
683
684        trace!("acquiring read lock on interface `{}`", iface_name);
685        let read_lock = iface.read().await;
686        trace!("acquired read lock on interface `{}`", iface_name);
687        match read_lock.call(self, connection, msg, member.as_ref()) {
688            DispatchResult::NotFound => {
689                return Err(fdo::Error::UnknownMethod(format!(
690                    "Unknown method '{member}'"
691                )));
692            }
693            DispatchResult::Async(f) => {
694                return f.await.map_err(|e| match e {
695                    Error::FDO(e) => *e,
696                    e => fdo::Error::Failed(format!("{e}")),
697                });
698            }
699            DispatchResult::RequiresMut => {}
700        }
701        drop(read_lock);
702        trace!("acquiring write lock on interface `{}`", iface_name);
703        let mut write_lock = iface.write().await;
704        trace!("acquired write lock on interface `{}`", iface_name);
705        match write_lock.call_mut(self, connection, msg, member.as_ref()) {
706            DispatchResult::NotFound => {}
707            DispatchResult::RequiresMut => {}
708            DispatchResult::Async(f) => {
709                return f.await.map_err(|e| match e {
710                    Error::FDO(e) => *e,
711                    e => fdo::Error::Failed(format!("{e}")),
712                });
713            }
714        }
715        drop(write_lock);
716        Err(fdo::Error::UnknownMethod(format!(
717            "Unknown method '{member}'"
718        )))
719    }
720
721    async fn dispatch_method_call_try(
722        &self,
723        connection: &Connection,
724        msg: &Message,
725        hdr: &Header<'_>,
726    ) -> fdo::Result<()> {
727        let path = hdr
728            .path()
729            .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
730        let iface_name = hdr
731            .interface()
732            // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same
733            // object have a method with the same name, it is undefined which of those
734            // methods will be invoked. Implementations may choose to either return an
735            // error, or deliver the message as though it had an arbitrary one of those
736            // interfaces.
737            .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
738        // Check that the message has a member before spawning.
739        // Note that an unknown member will still spawn a task. We should instead gather
740        // all the details for the call before spawning.
741        // See also https://github.com/dbus2/zbus/issues/674 for future of Interface.
742        let _ = hdr
743            .member()
744            .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
745
746        // Ensure the root lock isn't held while dispatching the message. That
747        // way, the object server can be mutated during that time.
748        let (iface, with_spawn) = {
749            let root = self.root.read().await;
750            let node = root
751                .get_child(path)
752                .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
753
754            let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
755                fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
756            })?;
757            (iface.instance, iface.spawn_tasks_for_methods)
758        };
759
760        if with_spawn {
761            let executor = connection.executor().clone();
762            let task_name = format!("`{msg}` method dispatcher");
763            let connection = connection.clone();
764            let msg = msg.clone();
765            executor
766                .spawn(
767                    async move {
768                        let server = connection.object_server();
769                        let hdr = msg.header();
770                        if let Err(e) = server
771                            .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
772                            .await
773                        {
774                            // When not spawning a task, this error is handled by the caller.
775                            debug!("Returning error: {}", e);
776                            if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
777                                debug!(
778                                    "Error dispatching message. Message: {:?}, error: {:?}",
779                                    msg, e
780                                );
781                            }
782                        }
783                    }
784                    .instrument(trace_span!("{}", task_name)),
785                    &task_name,
786                )
787                .detach();
788            Ok(())
789        } else {
790            self.dispatch_call_to_iface(iface, connection, msg, hdr)
791                .await
792        }
793    }
794
795    /// Dispatch an incoming message to a registered interface.
796    ///
797    /// The object server will handle the message by:
798    ///
799    /// - looking up the called object path & interface,
800    ///
801    /// - calling the associated method if one exists,
802    ///
803    /// - returning a message (responding to the caller with either a return or error message) to
804    ///   the caller through the associated server connection.
805    ///
806    /// Returns an error if the message is malformed.
807    #[instrument(skip(self))]
808    pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
809        let conn = self.connection();
810
811        if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
812            debug!("Returning error: {}", e);
813            conn.reply_dbus_error(hdr, e).await?;
814        }
815        trace!("Handled: {}", msg);
816
817        Ok(())
818    }
819
820    pub(crate) fn connection(&self) -> Connection {
821        self.conn
822            .upgrade()
823            .expect("ObjectServer can't exist w/o an associated Connection")
824    }
825}
826
827impl From<crate::blocking::ObjectServer> for ObjectServer {
828    fn from(server: crate::blocking::ObjectServer) -> Self {
829        server.into_inner()
830    }
831}
832
833/// A response wrapper that notifies after response has been sent.
834///
835/// Sometimes in [`interface`] method implemenations we need to do some other work after the
836/// response has been sent off. This wrapper type allows us to do that. Instead of returning your
837/// intended response type directly, wrap it in this type and return it from your method. The
838/// returned `EventListener` from `new` method will be notified when the response has been sent.
839///
840/// A typical use case is sending off signals after the response has been sent. The easiest way to
841/// do that is to spawn a task from the method that sends the signal but only after being notified
842/// of the response dispatch.
843///
844/// # Caveats
845///
846/// The notification indicates that the response has been sent off, not that destination peer has
847/// received it. That can only be guaranteed for a peer-to-peer connection.
848///
849/// [`interface`]: crate::interface
850#[derive(Debug)]
851pub struct ResponseDispatchNotifier<R> {
852    response: R,
853    event: Option<Event>,
854}
855
856impl<R> ResponseDispatchNotifier<R> {
857    /// Create a new `NotifyResponse`.
858    pub fn new(response: R) -> (Self, EventListener) {
859        let event = Event::new();
860        let listener = event.listen();
861        (
862            Self {
863                response,
864                event: Some(event),
865            },
866            listener,
867        )
868    }
869
870    /// Get the response.
871    pub fn response(&self) -> &R {
872        &self.response
873    }
874}
875
876impl<R> Serialize for ResponseDispatchNotifier<R>
877where
878    R: Serialize,
879{
880    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
881    where
882        S: serde::Serializer,
883    {
884        self.response.serialize(serializer)
885    }
886}
887
888impl<'de, R> Deserialize<'de> for ResponseDispatchNotifier<R>
889where
890    R: Deserialize<'de>,
891{
892    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
893    where
894        D: serde::Deserializer<'de>,
895    {
896        Ok(Self {
897            response: R::deserialize(deserializer)?,
898            event: None,
899        })
900    }
901}
902
903impl<R> Type for ResponseDispatchNotifier<R>
904where
905    R: Type,
906{
907    fn signature() -> Signature<'static> {
908        R::signature()
909    }
910}
911
912impl<T> Drop for ResponseDispatchNotifier<T> {
913    fn drop(&mut self) {
914        if let Some(event) = self.event.take() {
915            event.notify(usize::MAX);
916        }
917    }
918}