zbus/connection/socket/
vsock.rs

1#[cfg(feature = "tokio-vsock")]
2use super::{Socket, Split};
3
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::ReadHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Reads bytes from the shared vsock stream into `buf`.
    ///
    /// On success the number of bytes read is returned. On unix that count is paired with an
    /// empty vector as the second tuple element; on other platforms the count is returned alone.
    /// Any I/O error from the underlying read is passed through unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        // Reading goes through a shared reference to the `Async` wrapper, so the `Arc` is only
        // borrowed here, never cloned.
        let outcome = futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await;

        outcome.map(|received| {
            // Only bytes are read here, so the unix variant carries an empty vector.
            #[cfg(unix)]
            let received = (received, vec![]);

            received
        })
    }
}
20
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::WriteHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Writes bytes from `buf` to the shared vsock stream and returns how many were written.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` slice is rejected with [`std::io::ErrorKind::InvalidInput`]
    /// before anything is written. Any I/O error from the underlying write is passed through.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        #[cfg(unix)]
        if !fds.is_empty() {
            let refusal = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(refusal);
        }

        // A single `write` call: the returned count may be smaller than `buf.len()`.
        futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
    }

    /// Shuts down both directions of the underlying stream.
    ///
    /// The shutdown call is handed to `crate::Task::spawn_blocking`, so the closure needs its
    /// own handle to the stream; cloning the `Arc` provides that.
    async fn close(&mut self) -> std::io::Result<()> {
        let stream = std::sync::Arc::clone(self);
        let shutdown = move || stream.get_ref().shutdown(std::net::Shutdown::Both);

        crate::Task::spawn_blocking(shutdown, "close socket").await
    }
}
51
#[cfg(feature = "tokio-vsock")]
impl Socket for tokio_vsock::VsockStream {
    type ReadHalf = tokio_vsock::ReadHalf;
    type WriteHalf = tokio_vsock::WriteHalf;

    /// Consumes the stream and wraps its two halves in a [`Split`].
    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
        // The stream's own `split` yields the halves in (read, write) order.
        let (reader, writer) = self.split();

        Split {
            read: reader,
            write: writer,
        }
    }
}
63
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::ReadHalf for tokio_vsock::ReadHalf {
    /// Reads bytes from the vsock read half into `buf`.
    ///
    /// On success the number of bytes placed in `buf` is returned. On unix that count is paired
    /// with an empty vector as the second tuple element; elsewhere the count is returned alone.
    /// Any I/O error from the underlying read is passed through unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        use tokio::io::{AsyncReadExt, ReadBuf};

        let mut window = ReadBuf::new(buf);
        self.read_buf(&mut window).await?;

        // The window started out empty, so its filled length is what this call read.
        let received = window.filled().len();
        #[cfg(unix)]
        let received = (received, vec![]);

        Ok(received)
    }
}
80
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::WriteHalf for tokio_vsock::WriteHalf {
    /// Writes bytes from `buf` to the vsock write half and returns how many were written.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` slice is rejected with [`std::io::ErrorKind::InvalidInput`]
    /// before anything is written. Any I/O error from the underlying write is passed through.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        use tokio::io::AsyncWriteExt;

        #[cfg(unix)]
        if !fds.is_empty() {
            let refusal = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(refusal);
        }

        // A single `write` call: the returned count may be smaller than `buf.len()`.
        self.write(buf).await
    }

    /// Shuts down the write half through tokio's `AsyncWriteExt::shutdown`.
    async fn close(&mut self) -> std::io::Result<()> {
        let shutdown = tokio::io::AsyncWriteExt::shutdown(self);

        shutdown.await
    }
}