zbus/connection/socket/
tcp.rs
1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use std::io;
4#[cfg(unix)]
5use std::os::fd::BorrowedFd;
6#[cfg(not(feature = "tokio"))]
7use std::{net::TcpStream, sync::Arc};
8
9use super::{ReadHalf, RecvmsgResult, WriteHalf};
10#[cfg(feature = "tokio")]
11use super::{Socket, Split};
12
13#[cfg(not(feature = "tokio"))]
14#[async_trait::async_trait]
15impl ReadHalf for Arc<Async<TcpStream>> {
16 async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
17 match futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await {
18 Err(e) => Err(e),
19 Ok(len) => {
20 #[cfg(unix)]
21 let ret = (len, vec![]);
22 #[cfg(not(unix))]
23 let ret = len;
24 Ok(ret)
25 }
26 }
27 }
28
29 #[cfg(windows)]
30 async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
31 let stream = self.clone();
32 crate::Task::spawn_blocking(
33 move || {
34 use crate::win32::{tcp_stream_get_peer_pid, ProcessToken};
35
36 let pid = tcp_stream_get_peer_pid(stream.get_ref())? as _;
37 let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
38 .and_then(|process_token| process_token.sid())?;
39 io::Result::Ok(
40 crate::fdo::ConnectionCredentials::default()
41 .set_process_id(pid)
42 .set_windows_sid(sid),
43 )
44 },
45 "peer credentials",
46 )
47 .await
48 }
49}
50
51#[cfg(not(feature = "tokio"))]
52#[async_trait::async_trait]
53impl WriteHalf for Arc<Async<TcpStream>> {
54 async fn sendmsg(
55 &mut self,
56 buf: &[u8],
57 #[cfg(unix)] fds: &[BorrowedFd<'_>],
58 ) -> io::Result<usize> {
59 #[cfg(unix)]
60 if !fds.is_empty() {
61 return Err(io::Error::new(
62 io::ErrorKind::InvalidInput,
63 "fds cannot be sent with a tcp stream",
64 ));
65 }
66
67 futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
68 }
69
70 async fn close(&mut self) -> io::Result<()> {
71 let stream = self.clone();
72 crate::Task::spawn_blocking(
73 move || stream.get_ref().shutdown(std::net::Shutdown::Both),
74 "close socket",
75 )
76 .await
77 }
78
79 async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
80 ReadHalf::peer_credentials(self).await
81 }
82}
83
84#[cfg(feature = "tokio")]
85impl Socket for tokio::net::TcpStream {
86 type ReadHalf = tokio::net::tcp::OwnedReadHalf;
87 type WriteHalf = tokio::net::tcp::OwnedWriteHalf;
88
89 fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
90 let (read, write) = self.into_split();
91
92 Split { read, write }
93 }
94}
95
96#[cfg(feature = "tokio")]
97#[async_trait::async_trait]
98impl ReadHalf for tokio::net::tcp::OwnedReadHalf {
99 async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult {
100 use tokio::io::{AsyncReadExt, ReadBuf};
101
102 let mut read_buf = ReadBuf::new(buf);
103 self.read_buf(&mut read_buf).await.map(|_| {
104 let ret = read_buf.filled().len();
105 #[cfg(unix)]
106 let ret = (ret, vec![]);
107
108 ret
109 })
110 }
111
112 #[cfg(windows)]
113 async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
114 let peer_addr = self.peer_addr()?.clone();
115 crate::Task::spawn_blocking(
116 move || win32_credentials_from_addr(&peer_addr),
117 "peer credentials",
118 )
119 .await
120 }
121}
122
123#[cfg(feature = "tokio")]
124#[async_trait::async_trait]
125impl WriteHalf for tokio::net::tcp::OwnedWriteHalf {
126 async fn sendmsg(
127 &mut self,
128 buf: &[u8],
129 #[cfg(unix)] fds: &[BorrowedFd<'_>],
130 ) -> io::Result<usize> {
131 use tokio::io::AsyncWriteExt;
132
133 #[cfg(unix)]
134 if !fds.is_empty() {
135 return Err(io::Error::new(
136 io::ErrorKind::InvalidInput,
137 "fds cannot be sent with a tcp stream",
138 ));
139 }
140
141 self.write(buf).await
142 }
143
144 async fn close(&mut self) -> io::Result<()> {
145 tokio::io::AsyncWriteExt::shutdown(self).await
146 }
147
148 #[cfg(windows)]
149 async fn peer_credentials(&mut self) -> io::Result<crate::fdo::ConnectionCredentials> {
150 let peer_addr = self.peer_addr()?.clone();
151 crate::Task::spawn_blocking(
152 move || win32_credentials_from_addr(&peer_addr),
153 "peer credentials",
154 )
155 .await
156 }
157}
158
159#[cfg(feature = "tokio")]
160#[cfg(windows)]
161fn win32_credentials_from_addr(
162 addr: &std::net::SocketAddr,
163) -> io::Result<crate::fdo::ConnectionCredentials> {
164 use crate::win32::{socket_addr_get_pid, ProcessToken};
165
166 let pid = socket_addr_get_pid(addr)? as _;
167 let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None })
168 .and_then(|process_token| process_token.sid())?;
169 Ok(crate::fdo::ConnectionCredentials::default()
170 .set_process_id(pid)
171 .set_windows_sid(sid))
172}