1use event_listener::{Event, EventListener};
4use serde::{Deserialize, Serialize};
5use std::{
6 collections::{hash_map::Entry, HashMap},
7 fmt::Write,
8 marker::PhantomData,
9 ops::{Deref, DerefMut},
10 sync::Arc,
11};
12use tracing::{debug, instrument, trace, trace_span, Instrument};
13
14use static_assertions::assert_impl_all;
15use zbus_names::InterfaceName;
16use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Signature, Type, Value};
17
18use crate::{
19 async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard},
20 connection::WeakConnection,
21 fdo,
22 fdo::{Introspectable, ManagedObjects, ObjectManager, Peer, Properties},
23 message::{Header, Message},
24 Connection, Error, Result,
25};
26
27mod interface;
28pub(crate) use interface::ArcInterface;
29pub use interface::{DispatchResult, Interface};
30
31mod signal_context;
32pub use signal_context::SignalContext;
33
/// Shared (read-only) access to an interface, viewed as the concrete type `I`.
///
/// Wraps the read guard of the type-erased interface lock; the lock stays held for as long as
/// this value is alive.
pub struct InterfaceDeref<'d, I> {
    // Read guard on the type-erased interface.
    iface: RwLockReadGuard<'d, dyn Interface>,
    // Zero-sized marker carrying the concrete interface type to downcast to.
    phantom: PhantomData<I>,
}
39
40impl<I> Deref for InterfaceDeref<'_, I>
41where
42 I: Interface,
43{
44 type Target = I;
45
46 fn deref(&self) -> &I {
47 self.iface.downcast_ref::<I>().unwrap()
48 }
49}
50
/// Exclusive (mutable) access to an interface, viewed as the concrete type `I`.
///
/// Wraps the write guard of the type-erased interface lock; the lock stays held for as long as
/// this value is alive.
pub struct InterfaceDerefMut<'d, I> {
    // Write guard on the type-erased interface.
    iface: RwLockWriteGuard<'d, dyn Interface>,
    // Zero-sized marker carrying the concrete interface type to downcast to.
    phantom: PhantomData<I>,
}
56
57impl<I> Deref for InterfaceDerefMut<'_, I>
58where
59 I: Interface,
60{
61 type Target = I;
62
63 fn deref(&self) -> &I {
64 self.iface.downcast_ref::<I>().unwrap()
65 }
66}
67
68impl<I> DerefMut for InterfaceDerefMut<'_, I>
69where
70 I: Interface,
71{
72 fn deref_mut(&mut self) -> &mut Self::Target {
73 self.iface.downcast_mut::<I>().unwrap()
74 }
75}
76
/// Cloneable handle to an interface of concrete type `I`.
///
/// Bundles the shared lock around the type-erased interface with a signal context.
pub struct InterfaceRef<I> {
    // Signal context handed out by `signal_context()`.
    ctxt: SignalContext<'static>,
    // Shared lock around the type-erased interface instance.
    lock: Arc<RwLock<dyn Interface>>,
    // Zero-sized marker carrying the concrete interface type.
    phantom: PhantomData<I>,
}
85
86impl<I> InterfaceRef<I>
87where
88 I: 'static,
89{
90 pub async fn get(&self) -> InterfaceDeref<'_, I> {
97 let iface = self.lock.read().await;
98
99 iface
100 .downcast_ref::<I>()
101 .expect("Unexpected interface type");
102
103 InterfaceDeref {
104 iface,
105 phantom: PhantomData,
106 }
107 }
108
109 pub async fn get_mut(&self) -> InterfaceDerefMut<'_, I> {
154 let mut iface = self.lock.write().await;
155
156 iface
157 .downcast_ref::<I>()
158 .expect("Unexpected interface type");
159 iface
160 .downcast_mut::<I>()
161 .expect("Unexpected interface type");
162
163 InterfaceDerefMut {
164 iface,
165 phantom: PhantomData,
166 }
167 }
168
169 pub fn signal_context(&self) -> &SignalContext<'static> {
170 &self.ctxt
171 }
172}
173
174impl<I> Clone for InterfaceRef<I> {
175 fn clone(&self) -> Self {
176 Self {
177 ctxt: self.ctxt.clone(),
178 lock: self.lock.clone(),
179 phantom: PhantomData,
180 }
181 }
182}
183
/// One node of the object tree: an object path, its child nodes and the interfaces on it.
#[derive(Default, Debug)]
pub(crate) struct Node {
    // Full object path of this node.
    path: OwnedObjectPath,
    // Child nodes, keyed by their single path segment (no slashes).
    children: HashMap<String, Node>,
    // Interfaces on this node, keyed by interface name.
    interfaces: HashMap<InterfaceName<'static>, ArcInterface>,
}
190
191impl Node {
192 pub(crate) fn new(path: OwnedObjectPath) -> Self {
193 let mut node = Self {
194 path,
195 ..Default::default()
196 };
197 assert!(node.add_interface(Peer));
198 assert!(node.add_interface(Introspectable));
199 assert!(node.add_interface(Properties));
200
201 node
202 }
203
204 pub(crate) fn get_child(&self, path: &ObjectPath<'_>) -> Option<&Node> {
206 let mut node = self;
207
208 for i in path.split('/').skip(1) {
209 if i.is_empty() {
210 continue;
211 }
212 match node.children.get(i) {
213 Some(n) => node = n,
214 None => return None,
215 }
216 }
217
218 Some(node)
219 }
220
221 fn get_child_mut(
225 &mut self,
226 path: &ObjectPath<'_>,
227 create: bool,
228 ) -> (Option<&mut Node>, Option<ObjectPath<'_>>) {
229 let mut node = self;
230 let mut node_path = String::new();
231 let mut obj_manager_path = None;
232
233 for i in path.split('/').skip(1) {
234 if i.is_empty() {
235 continue;
236 }
237
238 if node.interfaces.contains_key(&ObjectManager::name()) {
239 obj_manager_path = Some((*node.path).clone());
240 }
241
242 write!(&mut node_path, "/{i}").unwrap();
243 match node.children.entry(i.into()) {
244 Entry::Vacant(e) => {
245 if create {
246 let path = node_path.as_str().try_into().expect("Invalid Object Path");
247 node = e.insert(Node::new(path));
248 } else {
249 return (None, obj_manager_path);
250 }
251 }
252 Entry::Occupied(e) => node = e.into_mut(),
253 }
254 }
255
256 (Some(node), obj_manager_path)
257 }
258
259 pub(crate) fn interface_lock(&self, interface_name: InterfaceName<'_>) -> Option<ArcInterface> {
260 self.interfaces.get(&interface_name).cloned()
261 }
262
263 fn remove_interface(&mut self, interface_name: InterfaceName<'static>) -> bool {
264 self.interfaces.remove(&interface_name).is_some()
265 }
266
267 fn is_empty(&self) -> bool {
268 !self.interfaces.keys().any(|k| {
269 *k != Peer::name()
270 && *k != Introspectable::name()
271 && *k != Properties::name()
272 && *k != ObjectManager::name()
273 })
274 }
275
276 fn remove_node(&mut self, node: &str) -> bool {
277 self.children.remove(node).is_some()
278 }
279
280 fn add_arc_interface(&mut self, name: InterfaceName<'static>, arc_iface: ArcInterface) -> bool {
281 match self.interfaces.entry(name) {
282 Entry::Vacant(e) => {
283 e.insert(arc_iface);
284 true
285 }
286 Entry::Occupied(_) => false,
287 }
288 }
289
290 fn add_interface<I>(&mut self, iface: I) -> bool
291 where
292 I: Interface,
293 {
294 self.add_arc_interface(I::name(), ArcInterface::new(iface))
295 }
296
    /// Writes the introspection XML for this node and all of its descendants into `writer`.
    ///
    /// The tree is walked iteratively with an explicit stack instead of recursion. This node is
    /// written as the root (`<!DOCTYPE …>` header followed by an unnamed `<node>`); every
    /// descendant is written as `<node name="…">`, indented two columns deeper than its parent.
    /// Children and interfaces are emitted in the iteration order of their hash maps.
    ///
    /// # Panics
    ///
    /// Panics if writing to `writer` fails.
    async fn introspect_to_writer<W: Write + Send>(&self, writer: &mut W) {
        // Unit of pending work on the traversal stack.
        enum Fragment<'a> {
            // Open a node: write its start tag and interfaces, schedule its children.
            Node {
                name: &'a str,
                node: &'a Node,
                level: usize,
            },
            // Close the node that was opened at indentation `level`.
            End { level: usize },
        }

        let mut stack = Vec::new();
        stack.push(Fragment::Node {
            name: "",
            node: self,
            level: 0,
        });

        while let Some(fragment) = stack.pop() {
            match fragment {
                Fragment::Node { name, node, level } => {
                    // The stack is LIFO: push the closing tag first so that it is popped only
                    // after every child pushed below has been fully written.
                    stack.push(Fragment::End { level });

                    for (name, node) in &node.children {
                        stack.push(Fragment::Node {
                            name,
                            node,
                            level: level + 2,
                        })
                    }

                    if level == 0 {
                        // Only the starting node sits at level 0; it gets the document header
                        // and no `name` attribute. The raw string starts with a newline.
                        writeln!(
                            writer,
                            r#"
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
 "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<node>"#
                        )
                        .unwrap();
                    } else {
                        // `{:indent$}` with an empty string emits `level` spaces of indentation.
                        writeln!(
                            writer,
                            "{:indent$}<node name=\"{}\">",
                            "",
                            name,
                            indent = level
                        )
                        .unwrap();
                    }

                    // Each interface writes its own XML, under its read lock, one level deeper.
                    for iface in node.interfaces.values() {
                        iface
                            .instance
                            .read()
                            .await
                            .introspect_to_writer(writer, level + 2);
                    }
                }
                Fragment::End { level } => {
                    writeln!(writer, "{:indent$}</node>", "", indent = level).unwrap();
                }
            }
        }
    }
366
367 pub(crate) async fn introspect(&self) -> String {
368 let mut xml = String::with_capacity(1024);
369
370 self.introspect_to_writer(&mut xml).await;
371
372 xml
373 }
374
375 pub(crate) async fn get_managed_objects(&self) -> fdo::Result<ManagedObjects> {
376 let mut managed_objects = ManagedObjects::new();
377
378 let mut node_list: Vec<_> = self.children.values().collect();
380 while let Some(node) = node_list.pop() {
381 let mut interfaces = HashMap::new();
382 for iface_name in node.interfaces.keys().filter(|n| {
383 *n != &Peer::name()
385 && *n != &Introspectable::name()
386 && *n != &Properties::name()
387 && *n != &ObjectManager::name()
388 }) {
389 let props = node.get_properties(iface_name.clone()).await?;
390 interfaces.insert(iface_name.clone().into(), props);
391 }
392 managed_objects.insert(node.path.clone(), interfaces);
393 node_list.extend(node.children.values());
394 }
395
396 Ok(managed_objects)
397 }
398
399 async fn get_properties(
400 &self,
401 interface_name: InterfaceName<'_>,
402 ) -> fdo::Result<HashMap<String, OwnedValue>> {
403 self.interface_lock(interface_name)
404 .expect("Interface was added but not found")
405 .instance
406 .read()
407 .await
408 .get_all()
409 .await
410 }
411}
412
/// Tree of objects and their interfaces, tied to a connection.
#[derive(Debug)]
pub struct ObjectServer {
    // Weak handle to the owning connection; upgraded on demand by `connection()`.
    conn: WeakConnection,
    // Root (`/`) of the object tree, behind an async read-write lock.
    root: RwLock<Node>,
}

// Compile-time check that the server can be shared and moved across threads.
assert_impl_all!(ObjectServer: Send, Sync, Unpin);
477
478impl ObjectServer {
479 pub(crate) fn new(conn: &Connection) -> Self {
481 Self {
482 conn: conn.into(),
483 root: RwLock::new(Node::new("/".try_into().expect("zvariant bug"))),
484 }
485 }
486
    /// The lock guarding the root node of the object tree.
    pub(crate) fn root(&self) -> &RwLock<Node> {
        &self.root
    }
490
491 pub async fn at<'p, P, I>(&self, path: P, iface: I) -> Result<bool>
500 where
501 I: Interface,
502 P: TryInto<ObjectPath<'p>>,
503 P::Error: Into<Error>,
504 {
505 self.add_arc_interface(path, I::name(), ArcInterface::new(iface))
506 .await
507 }
508
    /// Registers an already wrapped interface under `name` at `path`.
    ///
    /// Missing nodes along `path` are created. Returns `Ok(true)` if the interface was added
    /// and `Ok(false)` if the node already had an interface called `name` (nothing is changed
    /// or signalled in that case). The root write lock is held for the whole call.
    ///
    /// On success an `interfaces_added` signal is emitted when applicable:
    /// - if the added interface is `ObjectManager`, once for every object returned by the
    ///   node's `get_managed_objects()`;
    /// - otherwise, if an ancestor node has `ObjectManager`, once for the new interface and
    ///   its properties, using that ancestor's path as the signal context.
    ///
    /// # Errors
    ///
    /// Fails if `path` cannot be converted, or if building the signal context, reading
    /// properties or emitting a signal fails. In the latter cases the interface has already
    /// been added and is not removed again.
    ///
    /// # Panics
    ///
    /// Panics if `Connection` has been dropped (see `connection()`).
    pub(crate) async fn add_arc_interface<'p, P>(
        &self,
        path: P,
        name: InterfaceName<'static>,
        arc_iface: ArcInterface,
    ) -> Result<bool>
    where
        P: TryInto<ObjectPath<'p>>,
        P::Error: Into<Error>,
    {
        let path = path.try_into().map_err(Into::into)?;
        let mut root = self.root().write().await;
        let (node, manager_path) = root.get_child_mut(&path, true);
        // With `create == true` the lookup creates missing nodes, so a node is always returned.
        let node = node.unwrap();
        let added = node.add_arc_interface(name.clone(), arc_iface);
        if added {
            if name == ObjectManager::name() {
                // A new manager announces every object that now falls under it.
                let ctxt = SignalContext::new(&self.connection(), path)?;
                let objects = node.get_managed_objects().await?;
                for (path, owned_interfaces) in objects {
                    // Re-borrow the owned property values as `Value`s for the signal body.
                    let interfaces = owned_interfaces
                        .iter()
                        .map(|(i, props)| {
                            let props = props
                                .iter()
                                .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
                                .collect::<Result<_>>();
                            Ok((i.into(), props?))
                        })
                        .collect::<Result<_>>()?;
                    ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?;
                }
            } else if let Some(manager_path) = manager_path {
                // An ancestor manages this object: announce just the new interface.
                let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?;
                let mut interfaces = HashMap::new();
                let owned_props = node.get_properties(name.clone()).await?;
                let props = owned_props
                    .iter()
                    .map(|(k, v)| Ok((k.as_str(), Value::try_from(v)?)))
                    .collect::<Result<_>>()?;
                interfaces.insert(name, props);

                ObjectManager::interfaces_added(&ctxt, &path, &interfaces).await?;
            }
        }

        Ok(added)
    }
558
559 pub async fn remove<'p, I, P>(&self, path: P) -> Result<bool>
564 where
565 I: Interface,
566 P: TryInto<ObjectPath<'p>>,
567 P::Error: Into<Error>,
568 {
569 let path = path.try_into().map_err(Into::into)?;
570 let mut root = self.root.write().await;
571 let (node, manager_path) = root.get_child_mut(&path, false);
572 let node = node.ok_or(Error::InterfaceNotFound)?;
573 if !node.remove_interface(I::name()) {
574 return Err(Error::InterfaceNotFound);
575 }
576 if let Some(manager_path) = manager_path {
577 let ctxt = SignalContext::new(&self.connection(), manager_path.clone())?;
578 ObjectManager::interfaces_removed(&ctxt, &path, &[I::name()]).await?;
579 }
580 if node.is_empty() {
581 let mut path_parts = path.rsplit('/').filter(|i| !i.is_empty());
582 let last_part = path_parts.next().unwrap();
583 let ppath = ObjectPath::from_string_unchecked(
584 path_parts.fold(String::new(), |a, p| format!("/{p}{a}")),
585 );
586 root.get_child_mut(&ppath, false)
587 .0
588 .unwrap()
589 .remove_node(last_part);
590 return Ok(true);
591 }
592 Ok(false)
593 }
594
595 pub async fn interface<'p, P, I>(&self, path: P) -> Result<InterfaceRef<I>>
638 where
639 I: Interface,
640 P: TryInto<ObjectPath<'p>>,
641 P::Error: Into<Error>,
642 {
643 let path = path.try_into().map_err(Into::into)?;
644 let root = self.root().read().await;
645 let node = root.get_child(&path).ok_or(Error::InterfaceNotFound)?;
646
647 let lock = node
648 .interface_lock(I::name())
649 .ok_or(Error::InterfaceNotFound)?
650 .instance
651 .clone();
652
653 lock.read()
655 .await
656 .downcast_ref::<I>()
657 .ok_or(Error::InterfaceNotFound)?;
658
659 let conn = self.connection();
660 let ctxt = SignalContext::new(&conn, path).unwrap().into_owned();
662
663 Ok(InterfaceRef {
664 ctxt,
665 lock,
666 phantom: PhantomData,
667 })
668 }
669
670 async fn dispatch_call_to_iface(
671 &self,
672 iface: Arc<RwLock<dyn Interface>>,
673 connection: &Connection,
674 msg: &Message,
675 hdr: &Header<'_>,
676 ) -> fdo::Result<()> {
677 let member = hdr
678 .member()
679 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
680 let iface_name = hdr
681 .interface()
682 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
683
684 trace!("acquiring read lock on interface `{}`", iface_name);
685 let read_lock = iface.read().await;
686 trace!("acquired read lock on interface `{}`", iface_name);
687 match read_lock.call(self, connection, msg, member.as_ref()) {
688 DispatchResult::NotFound => {
689 return Err(fdo::Error::UnknownMethod(format!(
690 "Unknown method '{member}'"
691 )));
692 }
693 DispatchResult::Async(f) => {
694 return f.await.map_err(|e| match e {
695 Error::FDO(e) => *e,
696 e => fdo::Error::Failed(format!("{e}")),
697 });
698 }
699 DispatchResult::RequiresMut => {}
700 }
701 drop(read_lock);
702 trace!("acquiring write lock on interface `{}`", iface_name);
703 let mut write_lock = iface.write().await;
704 trace!("acquired write lock on interface `{}`", iface_name);
705 match write_lock.call_mut(self, connection, msg, member.as_ref()) {
706 DispatchResult::NotFound => {}
707 DispatchResult::RequiresMut => {}
708 DispatchResult::Async(f) => {
709 return f.await.map_err(|e| match e {
710 Error::FDO(e) => *e,
711 e => fdo::Error::Failed(format!("{e}")),
712 });
713 }
714 }
715 drop(write_lock);
716 Err(fdo::Error::UnknownMethod(format!(
717 "Unknown method '{member}'"
718 )))
719 }
720
721 async fn dispatch_method_call_try(
722 &self,
723 connection: &Connection,
724 msg: &Message,
725 hdr: &Header<'_>,
726 ) -> fdo::Result<()> {
727 let path = hdr
728 .path()
729 .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?;
730 let iface_name = hdr
731 .interface()
732 .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?;
738 let _ = hdr
743 .member()
744 .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?;
745
746 let (iface, with_spawn) = {
749 let root = self.root.read().await;
750 let node = root
751 .get_child(path)
752 .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?;
753
754 let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| {
755 fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'"))
756 })?;
757 (iface.instance, iface.spawn_tasks_for_methods)
758 };
759
760 if with_spawn {
761 let executor = connection.executor().clone();
762 let task_name = format!("`{msg}` method dispatcher");
763 let connection = connection.clone();
764 let msg = msg.clone();
765 executor
766 .spawn(
767 async move {
768 let server = connection.object_server();
769 let hdr = msg.header();
770 if let Err(e) = server
771 .dispatch_call_to_iface(iface, &connection, &msg, &hdr)
772 .await
773 {
774 debug!("Returning error: {}", e);
776 if let Err(e) = connection.reply_dbus_error(&hdr, e).await {
777 debug!(
778 "Error dispatching message. Message: {:?}, error: {:?}",
779 msg, e
780 );
781 }
782 }
783 }
784 .instrument(trace_span!("{}", task_name)),
785 &task_name,
786 )
787 .detach();
788 Ok(())
789 } else {
790 self.dispatch_call_to_iface(iface, connection, msg, hdr)
791 .await
792 }
793 }
794
795 #[instrument(skip(self))]
808 pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> {
809 let conn = self.connection();
810
811 if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await {
812 debug!("Returning error: {}", e);
813 conn.reply_dbus_error(hdr, e).await?;
814 }
815 trace!("Handled: {}", msg);
816
817 Ok(())
818 }
819
820 pub(crate) fn connection(&self) -> Connection {
821 self.conn
822 .upgrade()
823 .expect("ObjectServer can't exist w/o an associated Connection")
824 }
825}
826
827impl From<crate::blocking::ObjectServer> for ObjectServer {
828 fn from(server: crate::blocking::ObjectServer) -> Self {
829 server.into_inner()
830 }
831}
832
/// A response wrapper that notifies listeners when it is dropped.
///
/// Serialization, deserialization and the type signature are all delegated to the wrapped
/// response `R`.
#[derive(Debug)]
pub struct ResponseDispatchNotifier<R> {
    // The wrapped response.
    response: R,
    // Notified on drop; `None` for values produced by deserialization.
    event: Option<Event>,
}
855
856impl<R> ResponseDispatchNotifier<R> {
857 pub fn new(response: R) -> (Self, EventListener) {
859 let event = Event::new();
860 let listener = event.listen();
861 (
862 Self {
863 response,
864 event: Some(event),
865 },
866 listener,
867 )
868 }
869
870 pub fn response(&self) -> &R {
872 &self.response
873 }
874}
875
876impl<R> Serialize for ResponseDispatchNotifier<R>
877where
878 R: Serialize,
879{
880 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
881 where
882 S: serde::Serializer,
883 {
884 self.response.serialize(serializer)
885 }
886}
887
888impl<'de, R> Deserialize<'de> for ResponseDispatchNotifier<R>
889where
890 R: Deserialize<'de>,
891{
892 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
893 where
894 D: serde::Deserializer<'de>,
895 {
896 Ok(Self {
897 response: R::deserialize(deserializer)?,
898 event: None,
899 })
900 }
901}
902
903impl<R> Type for ResponseDispatchNotifier<R>
904where
905 R: Type,
906{
907 fn signature() -> Signature<'static> {
908 R::signature()
909 }
910}
911
912impl<T> Drop for ResponseDispatchNotifier<T> {
913 fn drop(&mut self) {
914 if let Some(event) = self.event.take() {
915 event.notify(usize::MAX);
916 }
917 }
918}