zbus/
message_stream.rs

1use std::{
2    pin::Pin,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use async_broadcast::Receiver as ActiveReceiver;
8use futures_core::stream;
9use futures_util::stream::FusedStream;
10use ordered_stream::{OrderedStream, PollResult};
11use static_assertions::assert_impl_all;
12use tracing::warn;
13
14use crate::{
15    connection::ConnectionInner,
16    message::{Message, Sequence},
17    AsyncDrop, Connection, MatchRule, OwnedMatchRule, Result,
18};
19
20/// A [`stream::Stream`] implementation that yields [`Message`] items.
21///
22/// You can convert a [`Connection`] to this type and back to [`Connection`].
23///
24/// **NOTE**: You must ensure a `MessageStream` is continuously polled or you will experience hangs.
25/// If you don't need to continuously poll the `MessageStream` but need to keep it around for later
26/// use, keep the connection around and convert it into a `MessageStream` when needed. The
27/// conversion is not an expensive operation so you don't need to  worry about performance, unless
28/// you do it very frequently. If you need to convert back and forth frequently, you may want to
29/// consider keeping both a connection and stream around.
30#[derive(Clone, Debug)]
31#[must_use = "streams do nothing unless polled"]
32pub struct MessageStream {
33    inner: Inner,
34}
35
36assert_impl_all!(MessageStream: Send, Sync, Unpin);
37
38impl MessageStream {
39    /// Create a message stream for the given match rule.
40    ///
41    /// If `conn` is a bus connection and match rule is for a signal, the match rule will be
42    /// registered with the bus and queued for deregistration when the stream is dropped. If you'd
43    /// like immediate deregistration, use [`AsyncDrop::async_drop`]. The reason match rules are
44    /// only registered with the bus for signals is that D-Bus specification only allows signals to
45    /// be broadcasted and unicast messages are always sent to their destination (regardless of any
46    /// match rules registered by the destination) by the bus. Hence there is no need to register
47    /// match rules for non-signal messages with the bus.
48    ///
49    /// Having said that, stream created by this method can still very useful as it allows you to
50    /// avoid needless task wakeups and simplify your stream consuming code.
51    ///
52    /// You can optionally also request the capacity of the underlying message queue through
53    /// `max_queued`. If specified, the capacity is guaranteed to be at least `max_queued`. If not
54    /// specified, the default of 64 is assumed. The capacity can also be changed later through
55    /// [`MessageStream::set_max_queued`].
56    ///
57    /// # Example
58    ///
59    /// ```
60    /// use async_io::Timer;
61    /// use zbus::{AsyncDrop, Connection, MatchRule, MessageStream, fdo::NameOwnerChanged};
62    /// use futures_util::{TryStreamExt, future::select, future::Either::{Left, Right}, pin_mut};
63    ///
64    /// # zbus::block_on(async {
65    /// let conn = Connection::session().await?;
66    /// let rule = MatchRule::builder()
67    ///     .msg_type(zbus::message::Type::Signal)
68    ///     .sender("org.freedesktop.DBus")?
69    ///     .interface("org.freedesktop.DBus")?
70    ///     .member("NameOwnerChanged")?
71    ///     .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42")?
72    ///     .build();
73    /// let mut stream = MessageStream::for_match_rule(
74    ///     rule,
75    ///     &conn,
76    ///     // For such a specific match rule, we don't need a big queue.
77    ///     Some(1),
78    /// ).await?;
79    ///
80    /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
81    ///                 interface='org.freedesktop.DBus',member='NameOwnerChanged',\
82    ///                 arg0='org.freedesktop.zbus.MatchRuleStreamTest42'";
83    /// assert_eq!(
84    ///     stream.match_rule().map(|r| r.to_string()).as_deref(),
85    ///     Some(rule_str),
86    /// );
87    ///
88    /// // We register 2 names, starting with the uninteresting one. If `stream` wasn't filtering
89    /// // messages based on the match rule, we'd receive method return call for each of these 2
90    /// // calls first.
91    /// //
92    /// // Note that the `NameOwnerChanged` signal will not be sent by the bus  for the first name
93    /// // we register since we setup an arg filter.
94    /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest44")
95    ///     .await?;
96    /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest42")
97    ///     .await?;
98    ///
99    /// let msg = stream.try_next().await?.unwrap();
100    /// let signal = NameOwnerChanged::from_message(msg).unwrap();
101    /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleStreamTest42");
102    /// stream.async_drop().await;
103    ///
104    /// // Ensure the match rule is deregistered and this connection doesn't receive
105    /// // `NameOwnerChanged` signals.
106    /// let stream = MessageStream::from(&conn).try_filter_map(|msg| async move {
107    ///     Ok(NameOwnerChanged::from_message(msg))
108    /// });
109    /// conn.release_name("org.freedesktop.zbus.MatchRuleStreamTest42").await?;
110    ///
111    /// pin_mut!(stream);
112    /// let next = stream.try_next();
113    /// pin_mut!(next);
114    /// let timeout = Timer::after(std::time::Duration::from_millis(50));
115    /// pin_mut!(timeout);
116    /// match select(next, timeout).await {
117    ///    Left((msg, _)) => unreachable!("unexpected message: {:?}", msg),
118    ///    Right((_, _)) => (),
119    /// }
120    ///
121    /// # Ok::<(), zbus::Error>(())
122    /// # }).unwrap();
123    /// ```
124    ///
125    /// # Caveats
126    ///
127    /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
128    pub async fn for_match_rule<R>(
129        rule: R,
130        conn: &Connection,
131        max_queued: Option<usize>,
132    ) -> Result<Self>
133    where
134        R: TryInto<OwnedMatchRule>,
135        R::Error: Into<crate::Error>,
136    {
137        let rule = rule.try_into().map_err(Into::into)?;
138        let msg_receiver = conn.add_match(rule.clone(), max_queued).await?;
139
140        Ok(Self::for_subscription_channel(
141            msg_receiver,
142            Some(rule),
143            conn,
144        ))
145    }
146
147    /// The associated match rule, if any.
148    pub fn match_rule(&self) -> Option<MatchRule<'_>> {
149        self.inner.match_rule.as_deref().cloned()
150    }
151
152    /// The maximum number of messages to queue for this stream.
153    pub fn max_queued(&self) -> usize {
154        self.inner.msg_receiver.capacity()
155    }
156
157    /// Set maximum number of messages to queue for this stream.
158    ///
159    /// After this call, the capacity is guaranteed to be at least `max_queued`.
160    pub fn set_max_queued(&mut self, max_queued: usize) {
161        if max_queued <= self.max_queued() {
162            return;
163        }
164        self.inner.msg_receiver.set_capacity(max_queued);
165    }
166
167    pub(crate) fn for_subscription_channel(
168        msg_receiver: ActiveReceiver<Result<Message>>,
169        rule: Option<OwnedMatchRule>,
170        conn: &Connection,
171    ) -> Self {
172        let conn_inner = conn.inner.clone();
173
174        Self {
175            inner: Inner {
176                conn_inner,
177                msg_receiver,
178                match_rule: rule,
179            },
180        }
181    }
182}
183
184impl stream::Stream for MessageStream {
185    type Item = Result<Message>;
186
187    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188        let this = self.get_mut();
189
190        Pin::new(&mut this.inner.msg_receiver).poll_next(cx)
191    }
192}
193
194impl OrderedStream for MessageStream {
195    type Data = Result<Message>;
196    type Ordering = Sequence;
197
198    fn poll_next_before(
199        self: Pin<&mut Self>,
200        cx: &mut Context<'_>,
201        before: Option<&Self::Ordering>,
202    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
203        let this = self.get_mut();
204
205        match stream::Stream::poll_next(Pin::new(this), cx) {
206            Poll::Pending if before.is_some() => {
207                // Assume the provided Sequence in before was obtained from a Message
208                // associated with our Connection (because that's the only supported use case).
209                // Because there is only one socket-reader task, any messages that would have been
210                // ordered before that message would have already been sitting in the broadcast
211                // queue (and we would have seen Ready in our poll).  Because we didn't, we can
212                // guarantee that we won't ever produce a message whose sequence is before that
213                // provided value, and so we can return NoneBefore.
214                //
215                // This ensures that ordered_stream::Join will never return Pending while it
216                // has a message buffered.
217                Poll::Ready(PollResult::NoneBefore)
218            }
219            Poll::Pending => Poll::Pending,
220            Poll::Ready(Some(Ok(msg))) => Poll::Ready(PollResult::Item {
221                ordering: msg.recv_position(),
222                data: Ok(msg),
223            }),
224            Poll::Ready(Some(Err(e))) => Poll::Ready(PollResult::Item {
225                ordering: Sequence::LAST,
226                data: Err(e),
227            }),
228            Poll::Ready(None) => Poll::Ready(PollResult::Terminated),
229        }
230    }
231}
232
233impl FusedStream for MessageStream {
234    fn is_terminated(&self) -> bool {
235        self.inner.msg_receiver.is_terminated()
236    }
237}
238
239impl From<Connection> for MessageStream {
240    fn from(conn: Connection) -> Self {
241        let conn_inner = conn.inner;
242        let msg_receiver = conn_inner.msg_receiver.activate_cloned();
243
244        Self {
245            inner: Inner {
246                conn_inner,
247                msg_receiver,
248                match_rule: None,
249            },
250        }
251    }
252}
253
254impl From<&Connection> for MessageStream {
255    fn from(conn: &Connection) -> Self {
256        Self::from(conn.clone())
257    }
258}
259
260impl From<MessageStream> for Connection {
261    fn from(stream: MessageStream) -> Connection {
262        Connection::from(&stream)
263    }
264}
265
266impl From<&MessageStream> for Connection {
267    fn from(stream: &MessageStream) -> Connection {
268        Connection {
269            inner: stream.inner.conn_inner.clone(),
270        }
271    }
272}
273
274#[derive(Clone, Debug)]
275struct Inner {
276    conn_inner: Arc<ConnectionInner>,
277    msg_receiver: ActiveReceiver<Result<Message>>,
278    match_rule: Option<OwnedMatchRule>,
279}
280
281impl Drop for Inner {
282    fn drop(&mut self) {
283        let conn = Connection {
284            inner: self.conn_inner.clone(),
285        };
286
287        if let Some(rule) = self.match_rule.take() {
288            conn.queue_remove_match(rule);
289        }
290    }
291}
292
293#[async_trait::async_trait]
294impl AsyncDrop for MessageStream {
295    async fn async_drop(mut self) {
296        let conn = Connection {
297            inner: self.inner.conn_inner.clone(),
298        };
299
300        if let Some(rule) = self.inner.match_rule.take() {
301            if let Err(e) = conn.remove_match(rule).await {
302                warn!("Failed to remove match rule: {}", e);
303            }
304        }
305    }
306}