zbus/connection/
socket_reader.rs
1use std::{collections::HashMap, sync::Arc};
2
3use event_listener::Event;
4use tracing::{debug, instrument, trace};
5
6use crate::{
7 async_lock::Mutex, connection::MsgBroadcaster, Executor, Message, OwnedMatchRule, Task,
8};
9
10use super::socket::ReadHalf;
11
12#[derive(Debug)]
13pub(crate) struct SocketReader {
14 socket: Box<dyn ReadHalf>,
15 senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
16 already_received_bytes: Vec<u8>,
17 #[cfg(unix)]
18 already_received_fds: Vec<std::os::fd::OwnedFd>,
19 prev_seq: u64,
20 activity_event: Arc<Event>,
21}
22
23impl SocketReader {
24 pub fn new(
25 socket: Box<dyn ReadHalf>,
26 senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
27 already_received_bytes: Vec<u8>,
28 #[cfg(unix)] already_received_fds: Vec<std::os::fd::OwnedFd>,
29 activity_event: Arc<Event>,
30 ) -> Self {
31 Self {
32 socket,
33 senders,
34 already_received_bytes,
35 #[cfg(unix)]
36 already_received_fds,
37 prev_seq: 0,
38 activity_event,
39 }
40 }
41
42 pub fn spawn(self, executor: &Executor<'_>) -> Task<()> {
43 executor.spawn(self.receive_msg(), "socket reader")
44 }
45
46 #[instrument(name = "socket reader", skip(self))]
48 async fn receive_msg(mut self) {
49 loop {
50 trace!("Waiting for message on the socket..");
51 let msg = self.read_socket().await;
52 match &msg {
53 Ok(msg) => trace!("Message received on the socket: {:?}", msg),
54 Err(e) => trace!("Error reading from the socket: {:?}", e),
55 };
56
57 let mut senders = self.senders.lock().await;
58 for (rule, sender) in &*senders {
59 if let Ok(msg) = &msg {
60 if let Some(rule) = rule.as_ref() {
61 match rule.matches(msg) {
62 Ok(true) => (),
63 Ok(false) => continue,
64 Err(e) => {
65 debug!("Error matching message against rule: {:?}", e);
66
67 continue;
68 }
69 }
70 }
71 }
72
73 if let Err(e) = sender.broadcast_direct(msg.clone()).await {
74 trace!(
81 "Error broadcasting message to stream for `{:?}`: {:?}",
82 rule,
83 e
84 );
85 }
86 }
87 trace!("Broadcasted to all streams: {:?}", msg);
88
89 if msg.is_err() {
90 senders.clear();
91 trace!("Socket reading task stopped");
92
93 return;
94 }
95 }
96 }
97
98 #[instrument]
99 async fn read_socket(&mut self) -> crate::Result<Message> {
100 self.activity_event.notify(usize::MAX);
101 let seq = self.prev_seq + 1;
102 let msg = self
103 .socket
104 .receive_message(
105 seq,
106 &mut self.already_received_bytes,
107 #[cfg(unix)]
108 &mut self.already_received_fds,
109 )
110 .await?;
111 self.prev_seq = seq;
112
113 Ok(msg)
114 }
115}