zbus/connection/socket/
tcp.rs

1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use std::io;
4#[cfg(unix)]
5use std::os::fd::BorrowedFd;
6#[cfg(not(feature = "tokio"))]
7use std::{net::TcpStream, sync::Arc};
8
9use super::{ReadHalf, RecvmsgResult, WriteHalf};
10#[cfg(feature = "tokio")]
11use super::{Socket, Split};
12
13#[cfg(not(feature = "tokio"))]
14#[async_trait::async_trait]
15impl ReadHalf for Arc<Async<TcpStream>> {
16    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
17        match futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await {
18            Err(e) => Err(e),
19            Ok(len) => {
20                #[cfg(unix)]
21                let ret = (len, vec![]);
22                #[cfg(not(unix))]
23                let ret = len;
24                Ok(ret)
25            }
26        }
27    }
28
29    #[cfg(windows)]
30    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
31        let stream = self.clone();
32        crate::Task::spawn_blocking(
33            move || {
34                use crate::win32::{tcp_stream_get_peer_pid, ProcessToken};
35
36                let pid = tcp_stream_get_peer_pid(stream.get_ref())? as _;
37                let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
38                    .and_then(|process_token| process_token.sid())?;
39                io::Result::Ok(
40                    crate::fdo::ConnectionCredentials::default()
41                        .set_process_id(pid)
42                        .set_windows_sid(sid),
43                )
44            },
45            "peer credentials",
46        )
47        .await
48    }
49}
50
51#[cfg(not(feature = "tokio"))]
52#[async_trait::async_trait]
53impl WriteHalf for Arc<Async<TcpStream>> {
54    async fn sendmsg(
55        &mut self,
56        buf: &[u8],
57        #[cfg(unix)] fds: &[BorrowedFd<'_>],
58    ) -> io::Result<usize> {
59        #[cfg(unix)]
60        if !fds.is_empty() {
61            return Err(io::Error::new(
62                io::ErrorKind::InvalidInput,
63                "fds cannot be sent with a tcp stream",
64            ));
65        }
66
67        futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
68    }
69
70    async fn close(&mut self) -> io::Result<()> {
71        let stream = self.clone();
72        crate::Task::spawn_blocking(
73            move || stream.get_ref().shutdown(std::net::Shutdown::Both),
74            "close socket",
75        )
76        .await
77    }
78
79    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
80        ReadHalf::peer_credentials(self).await
81    }
82}
83
84#[cfg(feature = "tokio")]
85impl Socket for tokio::net::TcpStream {
86    type ReadHalf = tokio::net::tcp::OwnedReadHalf;
87    type WriteHalf = tokio::net::tcp::OwnedWriteHalf;
88
89    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
90        let (read, write) = self.into_split();
91
92        Split { read, write }
93    }
94}
95
96#[cfg(feature = "tokio")]
97#[async_trait::async_trait]
98impl ReadHalf for tokio::net::tcp::OwnedReadHalf {
99    async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
100        use tokio::io::{AsyncReadExt, ReadBuf};
101
102        let mut read_buf = ReadBuf::new(buf);
103        self.read_buf(&mut read_buf).await.map(|_| {
104            let ret = read_buf.filled().len();
105            #[cfg(unix)]
106            let ret = (ret, vec![]);
107
108            ret
109        })
110    }
111
112    #[cfg(windows)]
113    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
114        let peer_addr = self.peer_addr()?.clone();
115        crate::Task::spawn_blocking(
116            move || win32_credentials_from_addr(&peer_addr),
117            "peer credentials",
118        )
119        .await
120    }
121}
122
123#[cfg(feature = "tokio")]
124#[async_trait::async_trait]
125impl WriteHalf for tokio::net::tcp::OwnedWriteHalf {
126    async fn sendmsg(
127        &mut self,
128        buf: &[u8],
129        #[cfg(unix)] fds: &[BorrowedFd<'_>],
130    ) -> io::Result<usize> {
131        use tokio::io::AsyncWriteExt;
132
133        #[cfg(unix)]
134        if !fds.is_empty() {
135            return Err(io::Error::new(
136                io::ErrorKind::InvalidInput,
137                "fds cannot be sent with a tcp stream",
138            ));
139        }
140
141        self.write(buf).await
142    }
143
144    async fn close(&mut self) -> io::Result<()> {
145        tokio::io::AsyncWriteExt::shutdown(self).await
146    }
147
148    #[cfg(windows)]
149    async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
150        let peer_addr = self.peer_addr()?.clone();
151        crate::Task::spawn_blocking(
152            move || win32_credentials_from_addr(&peer_addr),
153            "peer credentials",
154        )
155        .await
156    }
157}
158
159#[cfg(feature = "tokio")]
160#[cfg(windows)]
161fn win32_credentials_from_addr(
162    addr: &std::net::SocketAddr,
163) -> io::Result<crate::fdo::ConnectionCredentials> {
164    use crate::win32::{socket_addr_get_pid, ProcessToken};
165
166    let pid = socket_addr_get_pid(addr)? as _;
167    let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
168        .and_then(|process_token| process_token.sid())?;
169    Ok(crate::fdo::ConnectionCredentials::default()
170        .set_process_id(pid)
171        .set_windows_sid(sid))
172}