calloop/sources/ping/
eventfd.rs

1//! Eventfd based implementation of the ping event source.
2//!
3//! # Implementation notes
4//!
5//! The eventfd is a much lighter signalling mechanism provided by the Linux
6//! kernel. Rather than write an arbitrary sequence of bytes, it only has a
7//! 64-bit counter.
8//!
//! There is only a single file descriptor for the eventfd, so it must not be
//! closed while either end still uses it. `make_ping()` therefore stores it in
//! an `Arc<OwnedFd>` that is shared by the senders and the event source, and
//! the fd is only closed once the last holder has been dropped. When all the
//! senders are dropped, the wrapper `FlagOnDrop` handles signalling this to
//! the event source by writing a close event to the eventfd. If the source is
//! dropped before the senders, pings are still written to the eventfd but
//! nothing reads them any more.
15//!
16//! To differentiate between regular ping events and close ping events, we add 2
17//! to the counter for regular events and 1 for close events. In the source we
18//! can then check the LSB and if it's set, we know it was a close event. This
19//! only works if a close event never fires more than once.
20
21use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
22use std::sync::Arc;
23
24use rustix::event::{eventfd, EventfdFlags};
25use rustix::io::{read, write, Errno};
26
27use super::PingError;
28use crate::{
29    generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
30};
31
// Amounts added to the eventfd counter; these are increments, not bitfields.
// A close event is only ever written once, so the lowest bit of the counter
// read back tells us whether a close happened, which lets INCREMENT_CLOSE
// double as a mask when checking. Regular pings add 2 and therefore never
// disturb that bit.
const INCREMENT_CLOSE: u64 = 1;
const INCREMENT_PING: u64 = 2;
37
38#[inline]
39pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
40    let read = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
41
42    // We only have one fd for the eventfd. If the sending end closes it when
43    // all copies are dropped, the receiving end will be closed as well. We need
44    // to make sure the fd is not closed until all holders of it have dropped
45    // it.
46
47    let fd = Arc::new(read);
48
49    let ping = Ping {
50        event: Arc::new(FlagOnDrop(Arc::clone(&fd))),
51    };
52
53    let source = PingSource {
54        event: Generic::new(ArcAsFd(fd), Interest::READ, Mode::Level),
55    };
56
57    Ok((ping, source))
58}
59
60// Helper functions for the event source IO.
61
62#[inline]
63fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
64    assert!(count > 0);
65    match write(fd, &count.to_ne_bytes()) {
66        // The write succeeded, the ping will wake up the loop.
67        Ok(_) => Ok(()),
68
69        // The counter hit its cap, which means previous calls to write() will
70        // wake up the loop.
71        Err(Errno::AGAIN) => Ok(()),
72
73        // Anything else is a real error.
74        Err(e) => Err(e.into()),
75    }
76}
77
78#[inline]
79fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
80    // The eventfd counter is effectively a u64.
81    const NBYTES: usize = 8;
82    let mut buf = [0u8; NBYTES];
83
84    match read(fd, &mut buf) {
85        // Reading from an eventfd should only ever produce 8 bytes. No looping
86        // is required.
87        Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)),
88
89        Ok(_) => unreachable!(),
90
91        // Any other error can be propagated.
92        Err(e) => Err(e.into()),
93    }
94}
95
// Newtype that lets a shared `Arc<OwnedFd>` be handed to code that requires an
// `AsFd` implementor. Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so
// this won't be needed once that implementation can be relied upon.
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    // The returned handle borrows from `self`; the `'_` makes that borrow
    // visible in the signature, matching how `BorrowedFd<'_>` is written in
    // the rest of this file.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}
105
106// The event source is simply a generic source with one of the eventfds.
107#[derive(Debug)]
108pub struct PingSource {
109    event: Generic<ArcAsFd>,
110}
111
112impl EventSource for PingSource {
113    type Event = ();
114    type Metadata = ();
115    type Ret = ();
116    type Error = PingError;
117
118    fn process_events<C>(
119        &mut self,
120        readiness: Readiness,
121        token: Token,
122        mut callback: C,
123    ) -> Result<PostAction, Self::Error>
124    where
125        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
126    {
127        self.event
128            .process_events(readiness, token, |_, fd| {
129                let counter = drain_ping(fd.as_fd())?;
130
131                // If the LSB is set, it means we were closed. If anything else
132                // is also set, it means we were pinged. The two are not
133                // mutually exclusive.
134                let close = (counter & INCREMENT_CLOSE) != 0;
135                let ping = (counter & (u64::MAX - 1)) != 0;
136
137                if ping {
138                    callback((), &mut ());
139                }
140
141                if close {
142                    Ok(PostAction::Remove)
143                } else {
144                    Ok(PostAction::Continue)
145                }
146            })
147            .map_err(|e| PingError(e.into()))
148    }
149
150    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
151        self.event.register(poll, token_factory)
152    }
153
154    fn reregister(
155        &mut self,
156        poll: &mut Poll,
157        token_factory: &mut TokenFactory,
158    ) -> crate::Result<()> {
159        self.event.reregister(poll, token_factory)
160    }
161
162    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
163        self.event.unregister(poll)
164    }
165}
166
167#[derive(Clone, Debug)]
168pub struct Ping {
169    // This is an Arc because it's potentially shared with clones. The last one
170    // dropped needs to signal to the event source via the eventfd.
171    event: Arc<FlagOnDrop>,
172}
173
174impl Ping {
175    /// Send a ping to the `PingSource`.
176    pub fn ping(&self) {
177        if let Err(e) = send_ping(self.event.0.as_fd(), INCREMENT_PING) {
178            log::warn!("[calloop] Failed to write a ping: {:?}", e);
179        }
180    }
181}
182
183/// This manages signalling to the PingSource when it's dropped. There should
184/// only ever be one of these per PingSource.
185#[derive(Debug)]
186struct FlagOnDrop(Arc<OwnedFd>);
187
188impl Drop for FlagOnDrop {
189    fn drop(&mut self) {
190        if let Err(e) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) {
191            log::warn!("[calloop] Failed to send close ping: {:?}", e);
192        }
193    }
194}