// tokio/process/unix/pidfd_reaper.rs

1use crate::{
2    io::{interest::Interest, PollEvented},
3    process::{
4        imp::{orphan::Wait, OrphanQueue},
5        kill::Kill,
6    },
7    util::error::RUNTIME_SHUTTING_DOWN_ERROR,
8};
9
10use libc::{syscall, SYS_pidfd_open, ENOSYS, PIDFD_NONBLOCK};
11use mio::{event::Source, unix::SourceFd};
12use std::{
13    fs::File,
14    future::Future,
15    io,
16    marker::Unpin,
17    ops::Deref,
18    os::unix::io::{AsRawFd, FromRawFd, RawFd},
19    pin::Pin,
20    process::ExitStatus,
21    sync::atomic::{AtomicBool, Ordering::Relaxed},
22    task::{Context, Poll},
23};
24
/// An owned pidfd (process file descriptor) obtained from `pidfd_open(2)`.
///
/// The descriptor is stored as a [`File`], so it is closed when this value
/// is dropped.
#[derive(Debug)]
struct Pidfd {
    /// The owned pidfd descriptor.
    fd: File,
}
29
30impl Pidfd {
31    fn open(pid: u32) -> Option<Pidfd> {
32        // Store false (0) to reduce executable size
33        static NO_PIDFD_SUPPORT: AtomicBool = AtomicBool::new(false);
34
35        if NO_PIDFD_SUPPORT.load(Relaxed) {
36            return None;
37        }
38
39        // Safety: The following function calls invovkes syscall pidfd_open,
40        // which takes two parameter: pidfd_open(fd: c_int, flag: c_int)
41        let fd = unsafe { syscall(SYS_pidfd_open, pid, PIDFD_NONBLOCK) };
42        if fd == -1 {
43            let errno = io::Error::last_os_error().raw_os_error().unwrap();
44
45            if errno == ENOSYS {
46                NO_PIDFD_SUPPORT.store(true, Relaxed)
47            }
48
49            None
50        } else {
51            // Safety: pidfd_open returns -1 on error or a valid fd with ownership.
52            Some(Pidfd {
53                fd: unsafe { File::from_raw_fd(fd as i32) },
54            })
55        }
56    }
57}
58
59impl AsRawFd for Pidfd {
60    fn as_raw_fd(&self) -> RawFd {
61        self.fd.as_raw_fd()
62    }
63}
64
65impl Source for Pidfd {
66    fn register(
67        &mut self,
68        registry: &mio::Registry,
69        token: mio::Token,
70        interest: mio::Interest,
71    ) -> io::Result<()> {
72        SourceFd(&self.as_raw_fd()).register(registry, token, interest)
73    }
74
75    fn reregister(
76        &mut self,
77        registry: &mio::Registry,
78        token: mio::Token,
79        interest: mio::Interest,
80    ) -> io::Result<()> {
81        SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
82    }
83
84    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
85        SourceFd(&self.as_raw_fd()).deregister(registry)
86    }
87}
88
89#[derive(Debug)]
90struct PidfdReaperInner<W>
91where
92    W: Unpin,
93{
94    inner: W,
95    pidfd: PollEvented<Pidfd>,
96}
97
/// Returns `true` if the `Display` output of `d` is exactly `s`.
///
/// The formatted text is compared chunk by chunk through a `fmt::Write`
/// adapter, so no intermediate `String` is allocated.
fn display_eq(d: impl std::fmt::Display, s: &str) -> bool {
    use std::fmt::{self, Write};

    /// Holds the part of the expected text not yet matched; becomes `None`
    /// at the first chunk that does not line up.
    struct Matcher<'a> {
        expected: Option<&'a str>,
    }

    impl Write for Matcher<'_> {
        fn write_str(&mut self, chunk: &str) -> fmt::Result {
            self.expected = self.expected.and_then(|rest| rest.strip_prefix(chunk));
            Ok(())
        }
    }

    let mut matcher = Matcher { expected: Some(s) };
    let _ = write!(matcher, "{d}");
    matcher.expected == Some("")
}
126
127#[allow(deprecated)]
128fn is_rt_shutdown_err(err: &io::Error) -> bool {
129    if let Some(inner) = err.get_ref() {
130        err.kind() == io::ErrorKind::Other
131            && inner.source().is_none()
132            && display_eq(inner, RUNTIME_SHUTTING_DOWN_ERROR)
133    } else {
134        false
135    }
136}
137
138impl<W> Future for PidfdReaperInner<W>
139where
140    W: Wait + Unpin,
141{
142    type Output = io::Result<ExitStatus>;
143
144    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145        let this = Pin::into_inner(self);
146
147        match this.pidfd.registration().poll_read_ready(cx) {
148            Poll::Ready(Ok(evt)) => {
149                if let Some(exit_code) = this.inner.try_wait()? {
150                    return Poll::Ready(Ok(exit_code));
151                }
152                this.pidfd.registration().clear_readiness(evt);
153            }
154            Poll::Ready(Err(err)) if is_rt_shutdown_err(&err) => {}
155            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
156            Poll::Pending => return Poll::Pending,
157        };
158
159        this.pidfd.reregister(Interest::READABLE)?;
160        cx.waker().wake_by_ref();
161        Poll::Pending
162    }
163}
164
165#[derive(Debug)]
166pub(crate) struct PidfdReaper<W, Q>
167where
168    W: Wait + Unpin,
169    Q: OrphanQueue<W> + Unpin,
170{
171    inner: Option<PidfdReaperInner<W>>,
172    orphan_queue: Q,
173}
174
175impl<W, Q> Deref for PidfdReaper<W, Q>
176where
177    W: Wait + Unpin,
178    Q: OrphanQueue<W> + Unpin,
179{
180    type Target = W;
181
182    fn deref(&self) -> &Self::Target {
183        &self.inner.as_ref().expect("inner has gone away").inner
184    }
185}
186
187impl<W, Q> PidfdReaper<W, Q>
188where
189    W: Wait + Unpin,
190    Q: OrphanQueue<W> + Unpin,
191{
192    pub(crate) fn new(inner: W, orphan_queue: Q) -> Result<Self, (Option<io::Error>, W)> {
193        if let Some(pidfd) = Pidfd::open(inner.id()) {
194            match PollEvented::new_with_interest(pidfd, Interest::READABLE) {
195                Ok(pidfd) => Ok(Self {
196                    inner: Some(PidfdReaperInner { pidfd, inner }),
197                    orphan_queue,
198                }),
199                Err(io_error) => Err((Some(io_error), inner)),
200            }
201        } else {
202            Err((None, inner))
203        }
204    }
205
206    pub(crate) fn inner_mut(&mut self) -> &mut W {
207        &mut self.inner.as_mut().expect("inner has gone away").inner
208    }
209}
210
211impl<W, Q> Future for PidfdReaper<W, Q>
212where
213    W: Wait + Unpin,
214    Q: OrphanQueue<W> + Unpin,
215{
216    type Output = io::Result<ExitStatus>;
217
218    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219        Pin::new(
220            Pin::into_inner(self)
221                .inner
222                .as_mut()
223                .expect("inner has gone away"),
224        )
225        .poll(cx)
226    }
227}
228
229impl<W, Q> Kill for PidfdReaper<W, Q>
230where
231    W: Wait + Unpin + Kill,
232    Q: OrphanQueue<W> + Unpin,
233{
234    fn kill(&mut self) -> io::Result<()> {
235        self.inner_mut().kill()
236    }
237}
238
239impl<W, Q> Drop for PidfdReaper<W, Q>
240where
241    W: Wait + Unpin,
242    Q: OrphanQueue<W> + Unpin,
243{
244    fn drop(&mut self) {
245        let mut orphan = self.inner.take().expect("inner has gone away").inner;
246        if let Ok(Some(_)) = orphan.try_wait() {
247            return;
248        }
249
250        self.orphan_queue.push_orphan(orphan);
251    }
252}
253
#[cfg(all(test, not(loom), not(miri)))]
mod test {
    use super::*;
    use crate::{
        process::unix::orphan::test::MockQueue,
        runtime::{Builder as RuntimeBuilder, Runtime},
    };
    use std::process::{Command, Output};

    /// Builds a current-thread runtime with only the I/O driver enabled.
    fn create_runtime() -> Runtime {
        RuntimeBuilder::new_current_thread()
            .enable_io()
            .build()
            .unwrap()
    }

    /// Drives `fut` to completion on a freshly built runtime.
    fn run_test(fut: impl Future<Output = ()>) {
        let rt = create_runtime();
        rt.block_on(fut)
    }

    /// Reports whether the running kernel is recent enough for the pidfd reaper.
    ///
    /// Parses `uname -r`, ignoring everything after the first `-`, and
    /// requires version 5.10 or newer.
    fn is_pidfd_available() -> bool {
        let Output { stdout, status, .. } = Command::new("uname").arg("-r").output().unwrap();
        assert!(status.success());
        let release = String::from_utf8_lossy(&stdout);

        let version = release
            .split_once('-')
            .map_or(&*release, |(version, _suffix)| version);
        let mut components = version.split('.');

        let major: u32 = components.next().unwrap().parse().unwrap();
        let minor: u32 = components.next().unwrap().trim().parse().unwrap();

        major > 5 || (major == 5 && minor >= 10)
    }

    /// Logs a notice and returns `true` when the pidfd tests cannot run here.
    fn skip_without_pidfd() -> bool {
        if is_pidfd_available() {
            return false;
        }
        eprintln!("pidfd is not available on this linux kernel, skip this test");
        true
    }

    #[test]
    fn test_pidfd_reaper_poll() {
        if skip_without_pidfd() {
            return;
        }

        let queue = MockQueue::new();

        run_test(async {
            let child = Command::new("true").spawn().unwrap();
            let reaper = PidfdReaper::new(child, &queue).unwrap();

            let status = reaper.await.unwrap();
            assert!(status.success());
        });

        // The child was reaped by the future itself, so nothing was orphaned.
        assert!(queue.all_enqueued.borrow().is_empty());
    }

    #[test]
    fn test_pidfd_reaper_kill() {
        if skip_without_pidfd() {
            return;
        }

        let queue = MockQueue::new();

        run_test(async {
            let child = Command::new("sleep").arg("1800").spawn().unwrap();
            let mut reaper = PidfdReaper::new(child, &queue).unwrap();

            reaper.kill().unwrap();

            let status = reaper.await.unwrap();
            assert!(!status.success());
        });

        assert!(queue.all_enqueued.borrow().is_empty());
    }

    #[test]
    fn test_pidfd_reaper_drop() {
        if skip_without_pidfd() {
            return;
        }

        let queue = MockQueue::new();

        let mut child = Command::new("sleep").arg("1800").spawn().unwrap();

        run_test(async {
            // Bound to a named `_`-prefixed variable so it lives to the end of the block.
            let _reaper = PidfdReaper::new(&mut child, &queue).unwrap();
        });

        // Dropping a reaper whose child is still running must orphan it.
        assert_eq!(queue.all_enqueued.borrow().len(), 1);

        child.kill().unwrap();
        child.wait().unwrap();
    }
}