1use crate::{
2 io::{interest::Interest, PollEvented},
3 process::{
4 imp::{orphan::Wait, OrphanQueue},
5 kill::Kill,
6 },
7 util::error::RUNTIME_SHUTTING_DOWN_ERROR,
8};
9
10use libc::{syscall, SYS_pidfd_open, ENOSYS, PIDFD_NONBLOCK};
11use mio::{event::Source, unix::SourceFd};
12use std::{
13 fs::File,
14 future::Future,
15 io,
16 marker::Unpin,
17 ops::Deref,
18 os::unix::io::{AsRawFd, FromRawFd, RawFd},
19 pin::Pin,
20 process::ExitStatus,
21 sync::atomic::{AtomicBool, Ordering::Relaxed},
22 task::{Context, Poll},
23};
24
25#[derive(Debug)]
26struct Pidfd {
27 fd: File,
28}
29
30impl Pidfd {
31 fn open(pid: u32) -> Option<Pidfd> {
32 static NO_PIDFD_SUPPORT: AtomicBool = AtomicBool::new(false);
34
35 if NO_PIDFD_SUPPORT.load(Relaxed) {
36 return None;
37 }
38
39 let fd = unsafe { syscall(SYS_pidfd_open, pid, PIDFD_NONBLOCK) };
42 if fd == -1 {
43 let errno = io::Error::last_os_error().raw_os_error().unwrap();
44
45 if errno == ENOSYS {
46 NO_PIDFD_SUPPORT.store(true, Relaxed)
47 }
48
49 None
50 } else {
51 Some(Pidfd {
53 fd: unsafe { File::from_raw_fd(fd as i32) },
54 })
55 }
56 }
57}
58
59impl AsRawFd for Pidfd {
60 fn as_raw_fd(&self) -> RawFd {
61 self.fd.as_raw_fd()
62 }
63}
64
65impl Source for Pidfd {
66 fn register(
67 &mut self,
68 registry: &mio::Registry,
69 token: mio::Token,
70 interest: mio::Interest,
71 ) -> io::Result<()> {
72 SourceFd(&self.as_raw_fd()).register(registry, token, interest)
73 }
74
75 fn reregister(
76 &mut self,
77 registry: &mio::Registry,
78 token: mio::Token,
79 interest: mio::Interest,
80 ) -> io::Result<()> {
81 SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
82 }
83
84 fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
85 SourceFd(&self.as_raw_fd()).deregister(registry)
86 }
87}
88
89#[derive(Debug)]
90struct PidfdReaperInner<W>
91where
92 W: Unpin,
93{
94 inner: W,
95 pidfd: PollEvented<Pidfd>,
96}
97
98fn display_eq(d: impl std::fmt::Display, s: &str) -> bool {
99 use std::fmt::Write;
100
101 struct FormatEq<'r> {
102 remainder: &'r str,
103 unequal: bool,
104 }
105
106 impl<'r> Write for FormatEq<'r> {
107 fn write_str(&mut self, s: &str) -> std::fmt::Result {
108 if !self.unequal {
109 if let Some(new_remainder) = self.remainder.strip_prefix(s) {
110 self.remainder = new_remainder;
111 } else {
112 self.unequal = true;
113 }
114 }
115 Ok(())
116 }
117 }
118
119 let mut fmt_eq = FormatEq {
120 remainder: s,
121 unequal: false,
122 };
123 let _ = write!(fmt_eq, "{d}");
124 fmt_eq.remainder.is_empty() && !fmt_eq.unequal
125}
126
127#[allow(deprecated)]
128fn is_rt_shutdown_err(err: &io::Error) -> bool {
129 if let Some(inner) = err.get_ref() {
130 err.kind() == io::ErrorKind::Other
131 && inner.source().is_none()
132 && display_eq(inner, RUNTIME_SHUTTING_DOWN_ERROR)
133 } else {
134 false
135 }
136}
137
138impl<W> Future for PidfdReaperInner<W>
139where
140 W: Wait + Unpin,
141{
142 type Output = io::Result<ExitStatus>;
143
144 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
145 let this = Pin::into_inner(self);
146
147 match this.pidfd.registration().poll_read_ready(cx) {
148 Poll::Ready(Ok(evt)) => {
149 if let Some(exit_code) = this.inner.try_wait()? {
150 return Poll::Ready(Ok(exit_code));
151 }
152 this.pidfd.registration().clear_readiness(evt);
153 }
154 Poll::Ready(Err(err)) if is_rt_shutdown_err(&err) => {}
155 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
156 Poll::Pending => return Poll::Pending,
157 };
158
159 this.pidfd.reregister(Interest::READABLE)?;
160 cx.waker().wake_by_ref();
161 Poll::Pending
162 }
163}
164
165#[derive(Debug)]
166pub(crate) struct PidfdReaper<W, Q>
167where
168 W: Wait + Unpin,
169 Q: OrphanQueue<W> + Unpin,
170{
171 inner: Option<PidfdReaperInner<W>>,
172 orphan_queue: Q,
173}
174
175impl<W, Q> Deref for PidfdReaper<W, Q>
176where
177 W: Wait + Unpin,
178 Q: OrphanQueue<W> + Unpin,
179{
180 type Target = W;
181
182 fn deref(&self) -> &Self::Target {
183 &self.inner.as_ref().expect("inner has gone away").inner
184 }
185}
186
187impl<W, Q> PidfdReaper<W, Q>
188where
189 W: Wait + Unpin,
190 Q: OrphanQueue<W> + Unpin,
191{
192 pub(crate) fn new(inner: W, orphan_queue: Q) -> Result<Self, (Option<io::Error>, W)> {
193 if let Some(pidfd) = Pidfd::open(inner.id()) {
194 match PollEvented::new_with_interest(pidfd, Interest::READABLE) {
195 Ok(pidfd) => Ok(Self {
196 inner: Some(PidfdReaperInner { pidfd, inner }),
197 orphan_queue,
198 }),
199 Err(io_error) => Err((Some(io_error), inner)),
200 }
201 } else {
202 Err((None, inner))
203 }
204 }
205
206 pub(crate) fn inner_mut(&mut self) -> &mut W {
207 &mut self.inner.as_mut().expect("inner has gone away").inner
208 }
209}
210
211impl<W, Q> Future for PidfdReaper<W, Q>
212where
213 W: Wait + Unpin,
214 Q: OrphanQueue<W> + Unpin,
215{
216 type Output = io::Result<ExitStatus>;
217
218 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
219 Pin::new(
220 Pin::into_inner(self)
221 .inner
222 .as_mut()
223 .expect("inner has gone away"),
224 )
225 .poll(cx)
226 }
227}
228
229impl<W, Q> Kill for PidfdReaper<W, Q>
230where
231 W: Wait + Unpin + Kill,
232 Q: OrphanQueue<W> + Unpin,
233{
234 fn kill(&mut self) -> io::Result<()> {
235 self.inner_mut().kill()
236 }
237}
238
239impl<W, Q> Drop for PidfdReaper<W, Q>
240where
241 W: Wait + Unpin,
242 Q: OrphanQueue<W> + Unpin,
243{
244 fn drop(&mut self) {
245 let mut orphan = self.inner.take().expect("inner has gone away").inner;
246 if let Ok(Some(_)) = orphan.try_wait() {
247 return;
248 }
249
250 self.orphan_queue.push_orphan(orphan);
251 }
252}
253
254#[cfg(all(test, not(loom), not(miri)))]
255mod test {
256 use super::*;
257 use crate::{
258 process::unix::orphan::test::MockQueue,
259 runtime::{Builder as RuntimeBuilder, Runtime},
260 };
261 use std::process::{Command, Output};
262
263 fn create_runtime() -> Runtime {
264 RuntimeBuilder::new_current_thread()
265 .enable_io()
266 .build()
267 .unwrap()
268 }
269
270 fn run_test(fut: impl Future<Output = ()>) {
271 create_runtime().block_on(fut)
272 }
273
274 fn is_pidfd_available() -> bool {
275 let Output { stdout, status, .. } = Command::new("uname").arg("-r").output().unwrap();
276 assert!(status.success());
277 let stdout = String::from_utf8_lossy(&stdout);
278
279 let mut kernel_version_iter = match stdout.split_once('-') {
280 Some((version, _)) => version,
281 _ => &stdout,
282 }
283 .split('.');
284
285 let major: u32 = kernel_version_iter.next().unwrap().parse().unwrap();
286 let minor: u32 = kernel_version_iter.next().unwrap().trim().parse().unwrap();
287
288 major >= 6 || (major == 5 && minor >= 10)
289 }
290
291 #[test]
292 fn test_pidfd_reaper_poll() {
293 if !is_pidfd_available() {
294 eprintln!("pidfd is not available on this linux kernel, skip this test");
295 return;
296 }
297
298 let queue = MockQueue::new();
299
300 run_test(async {
301 let child = Command::new("true").spawn().unwrap();
302 let pidfd_reaper = PidfdReaper::new(child, &queue).unwrap();
303
304 let exit_status = pidfd_reaper.await.unwrap();
305 assert!(exit_status.success());
306 });
307
308 assert!(queue.all_enqueued.borrow().is_empty());
309 }
310
311 #[test]
312 fn test_pidfd_reaper_kill() {
313 if !is_pidfd_available() {
314 eprintln!("pidfd is not available on this linux kernel, skip this test");
315 return;
316 }
317
318 let queue = MockQueue::new();
319
320 run_test(async {
321 let child = Command::new("sleep").arg("1800").spawn().unwrap();
322 let mut pidfd_reaper = PidfdReaper::new(child, &queue).unwrap();
323
324 pidfd_reaper.kill().unwrap();
325
326 let exit_status = pidfd_reaper.await.unwrap();
327 assert!(!exit_status.success());
328 });
329
330 assert!(queue.all_enqueued.borrow().is_empty());
331 }
332
333 #[test]
334 fn test_pidfd_reaper_drop() {
335 if !is_pidfd_available() {
336 eprintln!("pidfd is not available on this linux kernel, skip this test");
337 return;
338 }
339
340 let queue = MockQueue::new();
341
342 let mut child = Command::new("sleep").arg("1800").spawn().unwrap();
343
344 run_test(async {
345 let _pidfd_reaper = PidfdReaper::new(&mut child, &queue).unwrap();
346 });
347
348 assert_eq!(queue.all_enqueued.borrow().len(), 1);
349
350 child.kill().unwrap();
351 child.wait().unwrap();
352 }
353}