polling/
epoll.rs

1//! Bindings to epoll (Linux, Android).
2
3use std::io;
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
5use std::time::{Duration, Instant};
6
7#[cfg(not(target_os = "redox"))]
8use rustix::event::{eventfd, EventfdFlags};
9#[cfg(not(target_os = "redox"))]
10use rustix::time::{
11    timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
12};
13
14use rustix::buffer::spare_capacity;
15use rustix::event::{epoll, Timespec};
16use rustix::fd::OwnedFd;
17use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
18use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
19use rustix::pipe::{pipe, pipe_with, PipeFlags};
20
21use crate::{Event, PollMode};
22
/// Interface to epoll.
///
/// Owns the epoll instance together with the auxiliary file descriptors that are registered in
/// it under `crate::NOTIFY_KEY`: a notifier used to wake up a blocking wait and, where
/// available, a timerfd used to implement wait timeouts.
#[derive(Debug)]
pub struct Poller {
    /// File descriptor for the epoll instance.
    epoll_fd: OwnedFd,

    /// Notifier used to wake up epoll.
    ///
    /// Backed by an eventfd or, as a fallback, by a pipe.
    notifier: Notifier,

    /// File descriptor for the timerfd that produces timeouts.
    ///
    /// `None` when `timerfd_create` failed at construction time; the remaining time is then
    /// passed to epoll's own timeout argument instead.
    ///
    /// Redox does not support timerfd.
    #[cfg(not(target_os = "redox"))]
    timer_fd: Option<OwnedFd>,
}
38
39impl Poller {
40    /// Creates a new poller.
41    pub fn new() -> io::Result<Poller> {
42        // Create an epoll instance.
43        //
44        // Use `epoll_create1` with `EPOLL_CLOEXEC`.
45        let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
46
47        // Set up notifier and timerfd.
48        let notifier = Notifier::new()?;
49        #[cfg(not(target_os = "redox"))]
50        let timer_fd = timerfd_create(
51            TimerfdClockId::Monotonic,
52            TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
53        )
54        .ok();
55
56        let poller = Poller {
57            epoll_fd,
58            notifier,
59            #[cfg(not(target_os = "redox"))]
60            timer_fd,
61        };
62
63        unsafe {
64            #[cfg(not(target_os = "redox"))]
65            if let Some(ref timer_fd) = poller.timer_fd {
66                poller.add(
67                    timer_fd.as_raw_fd(),
68                    Event::none(crate::NOTIFY_KEY),
69                    PollMode::Oneshot,
70                )?;
71            }
72
73            poller.add(
74                poller.notifier.as_fd().as_raw_fd(),
75                Event::readable(crate::NOTIFY_KEY),
76                PollMode::Oneshot,
77            )?;
78        }
79
80        #[cfg(feature = "tracing")]
81        tracing::trace!(
82            epoll_fd = ?poller.epoll_fd.as_raw_fd(),
83            notifier = ?poller.notifier,
84            "new",
85        );
86        Ok(poller)
87    }
88
89    /// Whether this poller supports level-triggered events.
90    pub fn supports_level(&self) -> bool {
91        true
92    }
93
94    /// Whether the poller supports edge-triggered events.
95    pub fn supports_edge(&self) -> bool {
96        true
97    }
98
99    /// Adds a new file descriptor.
100    ///
101    /// # Safety
102    ///
103    /// The `fd` must be a valid file descriptor. The usual condition of remaining registered in
104    /// the `Poller` doesn't apply to `epoll`.
105    pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
106        #[cfg(feature = "tracing")]
107        let span = tracing::trace_span!(
108            "add",
109            epoll_fd = ?self.epoll_fd.as_raw_fd(),
110            ?fd,
111            ?ev,
112        );
113        #[cfg(feature = "tracing")]
114        let _enter = span.enter();
115
116        epoll::add(
117            &self.epoll_fd,
118            unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
119            epoll::EventData::new_u64(ev.key as u64),
120            epoll_flags(&ev, mode) | ev.extra.flags,
121        )?;
122
123        Ok(())
124    }
125
126    /// Modifies an existing file descriptor.
127    pub fn modify(&self, fd: BorrowedFd<'_>, ev: Event, mode: PollMode) -> io::Result<()> {
128        #[cfg(feature = "tracing")]
129        let span = tracing::trace_span!(
130            "modify",
131            epoll_fd = ?self.epoll_fd.as_raw_fd(),
132            ?fd,
133            ?ev,
134        );
135        #[cfg(feature = "tracing")]
136        let _enter = span.enter();
137
138        epoll::modify(
139            &self.epoll_fd,
140            fd,
141            epoll::EventData::new_u64(ev.key as u64),
142            epoll_flags(&ev, mode) | ev.extra.flags,
143        )?;
144
145        Ok(())
146    }
147
148    /// Deletes a file descriptor.
149    #[cfg_attr(not(feature = "tracing"), inline(always))]
150    pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
151        #[cfg(feature = "tracing")]
152        let span = tracing::trace_span!(
153            "delete",
154            epoll_fd = ?self.epoll_fd.as_raw_fd(),
155            ?fd,
156        );
157        #[cfg(feature = "tracing")]
158        let _enter = span.enter();
159
160        epoll::delete(&self.epoll_fd, fd)?;
161
162        Ok(())
163    }
164
165    /// Waits for I/O events with an optional deadline.
166    #[allow(clippy::needless_update)]
167    pub fn wait_deadline(&self, events: &mut Events, deadline: Option<Instant>) -> io::Result<()> {
168        #[cfg(feature = "tracing")]
169        let span = tracing::trace_span!(
170            "wait",
171            epoll_fd = ?self.epoll_fd.as_raw_fd(),
172            ?deadline,
173        );
174        #[cfg(feature = "tracing")]
175        let _enter = span.enter();
176
177        let timeout = deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
178
179        #[cfg(not(target_os = "redox"))]
180        if let Some(ref timer_fd) = self.timer_fd {
181            // Configure the timeout using timerfd.
182            let new_val = Itimerspec {
183                it_interval: TS_ZERO,
184                it_value: match timeout {
185                    None => TS_ZERO,
186                    Some(t) => {
187                        let mut ts = TS_ZERO;
188                        ts.tv_sec = t.as_secs() as _;
189                        ts.tv_nsec = t.subsec_nanos() as _;
190                        ts
191                    }
192                },
193                ..unsafe { std::mem::zeroed() }
194            };
195
196            timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
197
198            // Set interest in timerfd.
199            self.modify(
200                timer_fd.as_fd(),
201                Event::readable(crate::NOTIFY_KEY),
202                PollMode::Oneshot,
203            )?;
204        }
205
206        #[cfg(not(target_os = "redox"))]
207        let timer_fd = &self.timer_fd;
208        #[cfg(target_os = "redox")]
209        let timer_fd: Option<core::convert::Infallible> = None;
210
211        // Timeout for epoll. In case of overflow, use no timeout.
212        let timeout = match (timer_fd, timeout) {
213            (_, Some(t)) if t == Duration::from_secs(0) => Some(Timespec::default()),
214            (None, Some(t)) => Timespec::try_from(t).ok(),
215            _ => None,
216        };
217
218        // Wait for I/O events.
219        epoll::wait(
220            &self.epoll_fd,
221            spare_capacity(&mut events.list),
222            timeout.as_ref(),
223        )?;
224        #[cfg(feature = "tracing")]
225        tracing::trace!(
226            epoll_fd = ?self.epoll_fd.as_raw_fd(),
227            res = ?events.list.len(),
228            "new events",
229        );
230
231        // Clear the notification (if received) and re-register interest in it.
232        self.notifier.clear();
233        self.modify(
234            self.notifier.as_fd(),
235            Event::readable(crate::NOTIFY_KEY),
236            PollMode::Oneshot,
237        )?;
238        Ok(())
239    }
240
241    /// Sends a notification to wake up the current or next `wait()` call.
242    pub fn notify(&self) -> io::Result<()> {
243        #[cfg(feature = "tracing")]
244        let span = tracing::trace_span!(
245            "notify",
246            epoll_fd = ?self.epoll_fd.as_raw_fd(),
247            notifier = ?self.notifier,
248        );
249        #[cfg(feature = "tracing")]
250        let _enter = span.enter();
251
252        self.notifier.notify();
253        Ok(())
254    }
255}
256
257impl AsRawFd for Poller {
258    fn as_raw_fd(&self) -> RawFd {
259        self.epoll_fd.as_raw_fd()
260    }
261}
262
263impl AsFd for Poller {
264    fn as_fd(&self) -> BorrowedFd<'_> {
265        self.epoll_fd.as_fd()
266    }
267}
268
269impl Drop for Poller {
270    fn drop(&mut self) {
271        #[cfg(feature = "tracing")]
272        let span = tracing::trace_span!(
273            "drop",
274            epoll_fd = ?self.epoll_fd.as_raw_fd(),
275            notifier = ?self.notifier,
276        );
277        #[cfg(feature = "tracing")]
278        let _enter = span.enter();
279
280        #[cfg(not(target_os = "redox"))]
281        if let Some(timer_fd) = self.timer_fd.take() {
282            let _ = self.delete(timer_fd.as_fd());
283        }
284        let _ = self.delete(self.notifier.as_fd());
285    }
286}
287
/// `timespec` value that equals zero.
///
/// Used to build the `Itimerspec` values programmed into the timerfd (a zero `it_value`
/// disarms the timer). It is produced by transmuting an all-zero byte array rather than with a
/// struct literal. NOTE(review): presumably because `Timespec` may contain extra (e.g. padding)
/// fields on some targets — confirm.
#[cfg(not(target_os = "redox"))]
// SAFETY: assumes the all-zero bit pattern is a valid `Timespec` (zero seconds, zero
// nanoseconds); `transmute` itself checks at compile time that the sizes match.
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
291
292/// Get the EPOLL flags for the interest.
293fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
294    let mut flags = match mode {
295        PollMode::Oneshot => epoll::EventFlags::ONESHOT,
296        PollMode::Level => epoll::EventFlags::empty(),
297        PollMode::Edge => epoll::EventFlags::ET,
298        PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
299    };
300    if interest.readable {
301        flags |= read_flags();
302    }
303    if interest.writable {
304        flags |= write_flags();
305    }
306    flags
307}
308
309/// Epoll flags for all possible readability events.
310fn read_flags() -> epoll::EventFlags {
311    use epoll::EventFlags as Epoll;
312    Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
313}
314
315/// Epoll flags for all possible writability events.
316fn write_flags() -> epoll::EventFlags {
317    use epoll::EventFlags as Epoll;
318    Epoll::OUT | Epoll::HUP | Epoll::ERR
319}
320
/// A list of reported I/O events.
///
/// The wait fills it in place by writing into the vector's spare capacity.
pub struct Events {
    /// Raw events as reported by epoll.
    list: Vec<epoll::Event>,
}

// SAFETY: NOTE(review): presumably required because `epoll::Event`'s user data can hold a raw
// pointer, which makes the type `!Send` automatically. This module only ever stores and reads
// that data as a `u64` key, so no pointer is dereferenced — confirm before relying on this.
unsafe impl Send for Events {}
327
328impl Events {
329    /// Creates an empty list.
330    pub fn with_capacity(cap: usize) -> Events {
331        Events {
332            list: Vec::with_capacity(cap),
333        }
334    }
335
336    /// Iterates over I/O events.
337    pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
338        self.list.iter().map(|ev| {
339            let flags = ev.flags;
340            Event {
341                key: ev.data.u64() as usize,
342                readable: flags.intersects(read_flags()),
343                writable: flags.intersects(write_flags()),
344                extra: EventExtra { flags },
345            }
346        })
347    }
348
349    /// Clear the list.
350    pub fn clear(&mut self) {
351        self.list.clear();
352    }
353
354    /// Get the capacity of the list.
355    pub fn capacity(&self) -> usize {
356        self.list.capacity()
357    }
358}
359
/// Extra information about this event.
///
/// On epoll this is a raw set of epoll event flags.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventExtra {
    /// For reported events, the flags epoll returned. For registrations, extra flags OR-ed
    /// into the interest set.
    flags: epoll::EventFlags,
}
365
366impl EventExtra {
367    /// Create an empty version of the data.
368    #[inline]
369    pub const fn empty() -> EventExtra {
370        EventExtra {
371            flags: epoll::EventFlags::empty(),
372        }
373    }
374
375    /// Add the interrupt flag to this event.
376    #[inline]
377    pub fn set_hup(&mut self, active: bool) {
378        self.flags.set(epoll::EventFlags::HUP, active);
379    }
380
381    /// Add the priority flag to this event.
382    #[inline]
383    pub fn set_pri(&mut self, active: bool) {
384        self.flags.set(epoll::EventFlags::PRI, active);
385    }
386
387    /// Tell if the interrupt flag is set.
388    #[inline]
389    pub fn is_hup(&self) -> bool {
390        self.flags.contains(epoll::EventFlags::HUP)
391    }
392
393    /// Tell if the priority flag is set.
394    #[inline]
395    pub fn is_pri(&self) -> bool {
396        self.flags.contains(epoll::EventFlags::PRI)
397    }
398
399    #[inline]
400    pub fn is_connect_failed(&self) -> Option<bool> {
401        Some(
402            self.flags.contains(epoll::EventFlags::ERR)
403                && self.flags.contains(epoll::EventFlags::HUP),
404        )
405    }
406
407    #[inline]
408    pub fn is_err(&self) -> Option<bool> {
409        Some(self.flags.contains(epoll::EventFlags::ERR))
410    }
411}
412
413/// The notifier for Linux.
414///
415/// Certain container runtimes do not expose eventfd to the client, as it relies on the host and
416/// can be used to "escape" the container under certain conditions. Gramine is the prime example,
417/// see [here](gramine). In this case, fall back to using a pipe.
418///
419/// [gramine]: https://gramine.readthedocs.io/en/stable/manifest-syntax.html#allowing-eventfd
420#[derive(Debug)]
421enum Notifier {
422    /// The primary notifier, using eventfd.
423    #[cfg(not(target_os = "redox"))]
424    EventFd(OwnedFd),
425
426    /// The fallback notifier, using a pipe.
427    Pipe {
428        /// The read end of the pipe.
429        read_pipe: OwnedFd,
430
431        /// The write end of the pipe.
432        write_pipe: OwnedFd,
433    },
434}
435
436impl Notifier {
437    /// Create a new notifier.
438    fn new() -> io::Result<Self> {
439        // Skip eventfd for testing if necessary.
440        #[cfg(not(target_os = "redox"))]
441        {
442            if !cfg!(polling_test_epoll_pipe) {
443                // Try to create an eventfd.
444                match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
445                    Ok(fd) => {
446                        #[cfg(feature = "tracing")]
447                        tracing::trace!("created eventfd for notifier");
448                        return Ok(Notifier::EventFd(fd));
449                    }
450
451                    Err(_err) => {
452                        #[cfg(feature = "tracing")]
453                        tracing::warn!(
454                            "eventfd() failed with error ({}), falling back to pipe",
455                            _err
456                        );
457                    }
458                }
459            }
460        }
461
462        let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
463            let (read, write) = pipe()?;
464            fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
465            fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
466            io::Result::Ok((read, write))
467        })?;
468
469        fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
470        Ok(Notifier::Pipe {
471            read_pipe: read,
472            write_pipe: write,
473        })
474    }
475
476    /// The file descriptor to register in the poller.
477    fn as_fd(&self) -> BorrowedFd<'_> {
478        match self {
479            #[cfg(not(target_os = "redox"))]
480            Notifier::EventFd(fd) => fd.as_fd(),
481            Notifier::Pipe {
482                read_pipe: read, ..
483            } => read.as_fd(),
484        }
485    }
486
487    /// Notify the poller.
488    fn notify(&self) {
489        match self {
490            #[cfg(not(target_os = "redox"))]
491            Self::EventFd(fd) => {
492                let buf: [u8; 8] = 1u64.to_ne_bytes();
493                let _ = write(fd, &buf);
494            }
495
496            Self::Pipe { write_pipe, .. } => {
497                write(write_pipe, &[0; 1]).ok();
498            }
499        }
500    }
501
502    /// Clear the notification.
503    fn clear(&self) {
504        match self {
505            #[cfg(not(target_os = "redox"))]
506            Self::EventFd(fd) => {
507                let mut buf = [0u8; 8];
508                let _ = read(fd, &mut buf);
509            }
510
511            Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
512        }
513    }
514}