1use std::io;
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
5use std::time::{Duration, Instant};
6
7#[cfg(not(target_os = "redox"))]
8use rustix::event::{eventfd, EventfdFlags};
9#[cfg(not(target_os = "redox"))]
10use rustix::time::{
11 timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
12};
13
14use rustix::buffer::spare_capacity;
15use rustix::event::{epoll, Timespec};
16use rustix::fd::OwnedFd;
17use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
18use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
19use rustix::pipe::{pipe, pipe_with, PipeFlags};
20
21use crate::{Event, PollMode};
22
23#[derive(Debug)]
25pub struct Poller {
26 epoll_fd: OwnedFd,
28
29 notifier: Notifier,
31
32 #[cfg(not(target_os = "redox"))]
36 timer_fd: Option<OwnedFd>,
37}
38
39impl Poller {
40 pub fn new() -> io::Result<Poller> {
42 let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
46
47 let notifier = Notifier::new()?;
49 #[cfg(not(target_os = "redox"))]
50 let timer_fd = timerfd_create(
51 TimerfdClockId::Monotonic,
52 TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
53 )
54 .ok();
55
56 let poller = Poller {
57 epoll_fd,
58 notifier,
59 #[cfg(not(target_os = "redox"))]
60 timer_fd,
61 };
62
63 unsafe {
64 #[cfg(not(target_os = "redox"))]
65 if let Some(ref timer_fd) = poller.timer_fd {
66 poller.add(
67 timer_fd.as_raw_fd(),
68 Event::none(crate::NOTIFY_KEY),
69 PollMode::Oneshot,
70 )?;
71 }
72
73 poller.add(
74 poller.notifier.as_fd().as_raw_fd(),
75 Event::readable(crate::NOTIFY_KEY),
76 PollMode::Oneshot,
77 )?;
78 }
79
80 #[cfg(feature = "tracing")]
81 tracing::trace!(
82 epoll_fd = ?poller.epoll_fd.as_raw_fd(),
83 notifier = ?poller.notifier,
84 "new",
85 );
86 Ok(poller)
87 }
88
89 pub fn supports_level(&self) -> bool {
91 true
92 }
93
94 pub fn supports_edge(&self) -> bool {
96 true
97 }
98
99 pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
106 #[cfg(feature = "tracing")]
107 let span = tracing::trace_span!(
108 "add",
109 epoll_fd = ?self.epoll_fd.as_raw_fd(),
110 ?fd,
111 ?ev,
112 );
113 #[cfg(feature = "tracing")]
114 let _enter = span.enter();
115
116 epoll::add(
117 &self.epoll_fd,
118 unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
119 epoll::EventData::new_u64(ev.key as u64),
120 epoll_flags(&ev, mode) | ev.extra.flags,
121 )?;
122
123 Ok(())
124 }
125
126 pub fn modify(&self, fd: BorrowedFd<'_>, ev: Event, mode: PollMode) -> io::Result<()> {
128 #[cfg(feature = "tracing")]
129 let span = tracing::trace_span!(
130 "modify",
131 epoll_fd = ?self.epoll_fd.as_raw_fd(),
132 ?fd,
133 ?ev,
134 );
135 #[cfg(feature = "tracing")]
136 let _enter = span.enter();
137
138 epoll::modify(
139 &self.epoll_fd,
140 fd,
141 epoll::EventData::new_u64(ev.key as u64),
142 epoll_flags(&ev, mode) | ev.extra.flags,
143 )?;
144
145 Ok(())
146 }
147
148 #[cfg_attr(not(feature = "tracing"), inline(always))]
150 pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
151 #[cfg(feature = "tracing")]
152 let span = tracing::trace_span!(
153 "delete",
154 epoll_fd = ?self.epoll_fd.as_raw_fd(),
155 ?fd,
156 );
157 #[cfg(feature = "tracing")]
158 let _enter = span.enter();
159
160 epoll::delete(&self.epoll_fd, fd)?;
161
162 Ok(())
163 }
164
165 #[allow(clippy::needless_update)]
167 pub fn wait_deadline(&self, events: &mut Events, deadline: Option<Instant>) -> io::Result<()> {
168 #[cfg(feature = "tracing")]
169 let span = tracing::trace_span!(
170 "wait",
171 epoll_fd = ?self.epoll_fd.as_raw_fd(),
172 ?deadline,
173 );
174 #[cfg(feature = "tracing")]
175 let _enter = span.enter();
176
177 let timeout = deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
178
179 #[cfg(not(target_os = "redox"))]
180 if let Some(ref timer_fd) = self.timer_fd {
181 let new_val = Itimerspec {
183 it_interval: TS_ZERO,
184 it_value: match timeout {
185 None => TS_ZERO,
186 Some(t) => {
187 let mut ts = TS_ZERO;
188 ts.tv_sec = t.as_secs() as _;
189 ts.tv_nsec = t.subsec_nanos() as _;
190 ts
191 }
192 },
193 ..unsafe { std::mem::zeroed() }
194 };
195
196 timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
197
198 self.modify(
200 timer_fd.as_fd(),
201 Event::readable(crate::NOTIFY_KEY),
202 PollMode::Oneshot,
203 )?;
204 }
205
206 #[cfg(not(target_os = "redox"))]
207 let timer_fd = &self.timer_fd;
208 #[cfg(target_os = "redox")]
209 let timer_fd: Option<core::convert::Infallible> = None;
210
211 let timeout = match (timer_fd, timeout) {
213 (_, Some(t)) if t == Duration::from_secs(0) => Some(Timespec::default()),
214 (None, Some(t)) => Timespec::try_from(t).ok(),
215 _ => None,
216 };
217
218 epoll::wait(
220 &self.epoll_fd,
221 spare_capacity(&mut events.list),
222 timeout.as_ref(),
223 )?;
224 #[cfg(feature = "tracing")]
225 tracing::trace!(
226 epoll_fd = ?self.epoll_fd.as_raw_fd(),
227 res = ?events.list.len(),
228 "new events",
229 );
230
231 self.notifier.clear();
233 self.modify(
234 self.notifier.as_fd(),
235 Event::readable(crate::NOTIFY_KEY),
236 PollMode::Oneshot,
237 )?;
238 Ok(())
239 }
240
241 pub fn notify(&self) -> io::Result<()> {
243 #[cfg(feature = "tracing")]
244 let span = tracing::trace_span!(
245 "notify",
246 epoll_fd = ?self.epoll_fd.as_raw_fd(),
247 notifier = ?self.notifier,
248 );
249 #[cfg(feature = "tracing")]
250 let _enter = span.enter();
251
252 self.notifier.notify();
253 Ok(())
254 }
255}
256
257impl AsRawFd for Poller {
258 fn as_raw_fd(&self) -> RawFd {
259 self.epoll_fd.as_raw_fd()
260 }
261}
262
263impl AsFd for Poller {
264 fn as_fd(&self) -> BorrowedFd<'_> {
265 self.epoll_fd.as_fd()
266 }
267}
268
269impl Drop for Poller {
270 fn drop(&mut self) {
271 #[cfg(feature = "tracing")]
272 let span = tracing::trace_span!(
273 "drop",
274 epoll_fd = ?self.epoll_fd.as_raw_fd(),
275 notifier = ?self.notifier,
276 );
277 #[cfg(feature = "tracing")]
278 let _enter = span.enter();
279
280 #[cfg(not(target_os = "redox"))]
281 if let Some(timer_fd) = self.timer_fd.take() {
282 let _ = self.delete(timer_fd.as_fd());
283 }
284 let _ = self.delete(self.notifier.as_fd());
285 }
286}
287
288#[cfg(not(target_os = "redox"))]
290const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
291
292fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
294 let mut flags = match mode {
295 PollMode::Oneshot => epoll::EventFlags::ONESHOT,
296 PollMode::Level => epoll::EventFlags::empty(),
297 PollMode::Edge => epoll::EventFlags::ET,
298 PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
299 };
300 if interest.readable {
301 flags |= read_flags();
302 }
303 if interest.writable {
304 flags |= write_flags();
305 }
306 flags
307}
308
309fn read_flags() -> epoll::EventFlags {
311 use epoll::EventFlags as Epoll;
312 Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
313}
314
315fn write_flags() -> epoll::EventFlags {
317 use epoll::EventFlags as Epoll;
318 Epoll::OUT | Epoll::HUP | Epoll::ERR
319}
320
321pub struct Events {
323 list: Vec<epoll::Event>,
324}
325
326unsafe impl Send for Events {}
327
328impl Events {
329 pub fn with_capacity(cap: usize) -> Events {
331 Events {
332 list: Vec::with_capacity(cap),
333 }
334 }
335
336 pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
338 self.list.iter().map(|ev| {
339 let flags = ev.flags;
340 Event {
341 key: ev.data.u64() as usize,
342 readable: flags.intersects(read_flags()),
343 writable: flags.intersects(write_flags()),
344 extra: EventExtra { flags },
345 }
346 })
347 }
348
349 pub fn clear(&mut self) {
351 self.list.clear();
352 }
353
354 pub fn capacity(&self) -> usize {
356 self.list.capacity()
357 }
358}
359
360#[derive(Debug, Clone, Copy, PartialEq, Eq)]
362pub struct EventExtra {
363 flags: epoll::EventFlags,
364}
365
366impl EventExtra {
367 #[inline]
369 pub const fn empty() -> EventExtra {
370 EventExtra {
371 flags: epoll::EventFlags::empty(),
372 }
373 }
374
375 #[inline]
377 pub fn set_hup(&mut self, active: bool) {
378 self.flags.set(epoll::EventFlags::HUP, active);
379 }
380
381 #[inline]
383 pub fn set_pri(&mut self, active: bool) {
384 self.flags.set(epoll::EventFlags::PRI, active);
385 }
386
387 #[inline]
389 pub fn is_hup(&self) -> bool {
390 self.flags.contains(epoll::EventFlags::HUP)
391 }
392
393 #[inline]
395 pub fn is_pri(&self) -> bool {
396 self.flags.contains(epoll::EventFlags::PRI)
397 }
398
399 #[inline]
400 pub fn is_connect_failed(&self) -> Option<bool> {
401 Some(
402 self.flags.contains(epoll::EventFlags::ERR)
403 && self.flags.contains(epoll::EventFlags::HUP),
404 )
405 }
406
407 #[inline]
408 pub fn is_err(&self) -> Option<bool> {
409 Some(self.flags.contains(epoll::EventFlags::ERR))
410 }
411}
412
413#[derive(Debug)]
421enum Notifier {
422 #[cfg(not(target_os = "redox"))]
424 EventFd(OwnedFd),
425
426 Pipe {
428 read_pipe: OwnedFd,
430
431 write_pipe: OwnedFd,
433 },
434}
435
436impl Notifier {
437 fn new() -> io::Result<Self> {
439 #[cfg(not(target_os = "redox"))]
441 {
442 if !cfg!(polling_test_epoll_pipe) {
443 match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
445 Ok(fd) => {
446 #[cfg(feature = "tracing")]
447 tracing::trace!("created eventfd for notifier");
448 return Ok(Notifier::EventFd(fd));
449 }
450
451 Err(_err) => {
452 #[cfg(feature = "tracing")]
453 tracing::warn!(
454 "eventfd() failed with error ({}), falling back to pipe",
455 _err
456 );
457 }
458 }
459 }
460 }
461
462 let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
463 let (read, write) = pipe()?;
464 fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
465 fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
466 io::Result::Ok((read, write))
467 })?;
468
469 fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
470 Ok(Notifier::Pipe {
471 read_pipe: read,
472 write_pipe: write,
473 })
474 }
475
476 fn as_fd(&self) -> BorrowedFd<'_> {
478 match self {
479 #[cfg(not(target_os = "redox"))]
480 Notifier::EventFd(fd) => fd.as_fd(),
481 Notifier::Pipe {
482 read_pipe: read, ..
483 } => read.as_fd(),
484 }
485 }
486
487 fn notify(&self) {
489 match self {
490 #[cfg(not(target_os = "redox"))]
491 Self::EventFd(fd) => {
492 let buf: [u8; 8] = 1u64.to_ne_bytes();
493 let _ = write(fd, &buf);
494 }
495
496 Self::Pipe { write_pipe, .. } => {
497 write(write_pipe, &[0; 1]).ok();
498 }
499 }
500 }
501
502 fn clear(&self) {
504 match self {
505 #[cfg(not(target_os = "redox"))]
506 Self::EventFd(fd) => {
507 let mut buf = [0u8; 8];
508 let _ = read(fd, &mut buf);
509 }
510
511 Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
512 }
513 }
514}