zbus/connection/socket/
vsock.rs
1#[cfg(feature = "tokio-vsock")]
2use super::{Socket, Split};
3
/// Receiving side of a `vsock::VsockStream` wrapped in `async_io::Async`.
///
/// Compiled only when the `vsock` feature is enabled and the `tokio` feature is not.
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::ReadHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Reads bytes from the stream into `buf`.
    ///
    /// On success the number of bytes read is returned. On Unix that count is paired with an
    /// empty `Vec`, since this implementation never produces anything for the second slot of
    /// the result tuple.
    ///
    /// # Errors
    ///
    /// Any I/O error reported by the underlying read is returned unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        let bytes_read = futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await?;

        // The shape of the success value differs between Unix and other platforms.
        #[cfg(unix)]
        let outcome = (bytes_read, vec![]);
        #[cfg(not(unix))]
        let outcome = bytes_read;

        Ok(outcome)
    }
}
20
/// Sending side of a `vsock::VsockStream` wrapped in `async_io::Async`.
///
/// Compiled only when the `vsock` feature is enabled and the `tokio` feature is not.
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::WriteHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Writes `buf` to the stream and returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// On Unix, a non-empty `fds` slice is rejected with
    /// [`std::io::ErrorKind::InvalidInput`] before any write is attempted. Errors from the
    /// write itself are returned unchanged.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        // Guard clause: file descriptors are refused outright on this transport.
        #[cfg(unix)]
        if !fds.is_empty() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            ));
        }

        futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
    }

    /// Shuts down both directions of the underlying stream.
    ///
    /// The shutdown call is handed to `crate::Task::spawn_blocking` and its result is awaited.
    async fn close(&mut self) -> std::io::Result<()> {
        // The closure must own its handle, so give it a clone of the `Arc`.
        let handle = self.clone();
        let shutdown_both = move || handle.get_ref().shutdown(std::net::Shutdown::Both);

        crate::Task::spawn_blocking(shutdown_both, "close socket").await
    }
}
51
/// Allows a `tokio_vsock::VsockStream` to be used as a [`Socket`].
///
/// Compiled only when the `tokio-vsock` feature is enabled.
#[cfg(feature = "tokio-vsock")]
impl Socket for tokio_vsock::VsockStream {
    type ReadHalf = tokio_vsock::ReadHalf;
    type WriteHalf = tokio_vsock::WriteHalf;

    /// Consumes the stream and returns its two halves bundled in a [`Split`].
    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
        // NOTE(review): presumably this resolves to the inherent `VsockStream::split` rather
        // than recursing into this trait method — confirm against the tokio-vsock version in use.
        let (rx, tx) = self.split();

        Split {
            read: rx,
            write: tx,
        }
    }
}
63
/// Receiving side of a tokio-based vsock stream.
///
/// Compiled only when the `tokio-vsock` feature is enabled.
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::ReadHalf for tokio_vsock::ReadHalf {
    /// Reads bytes from the stream into `buf`.
    ///
    /// The reported byte count is the length of the filled region of the `ReadBuf` wrapped
    /// around `buf` once the read completes. On Unix that count is paired with an empty `Vec`.
    ///
    /// # Errors
    ///
    /// Any I/O error reported by the underlying read is returned unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        use tokio::io::{AsyncReadExt, ReadBuf};

        let mut window = ReadBuf::new(buf);
        self.read_buf(&mut window).await?;
        let bytes_read = window.filled().len();

        // The shape of the success value differs between Unix and other platforms.
        #[cfg(unix)]
        let outcome = (bytes_read, vec![]);
        #[cfg(not(unix))]
        let outcome = bytes_read;

        Ok(outcome)
    }
}
80
/// Sending side of a tokio-based vsock stream.
///
/// Compiled only when the `tokio-vsock` feature is enabled.
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::WriteHalf for tokio_vsock::WriteHalf {
    /// Writes `buf` to the stream and returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// On Unix, a non-empty `fds` slice is rejected with
    /// [`std::io::ErrorKind::InvalidInput`] before any write is attempted. Errors from the
    /// write itself are returned unchanged.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        use tokio::io::AsyncWriteExt;

        // Guard clause: file descriptors are refused outright on this transport.
        #[cfg(unix)]
        if !fds.is_empty() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            ));
        }

        self.write(buf).await
    }

    /// Shuts down the write half via `AsyncWriteExt::shutdown`.
    async fn close(&mut self) -> std::io::Result<()> {
        use tokio::io::AsyncWriteExt;

        self.shutdown().await
    }
}