1use log::debug;
2use std::io::{self, BufRead, BufReader, Read, Write};
3use std::net::SocketAddr;
4use std::net::TcpStream;
5use std::ops::Div;
6use std::time::Duration;
7use std::time::Instant;
8use std::{fmt, io::Cursor};
9
10#[cfg(feature = "socks-proxy")]
11use socks::{TargetAddr, ToTargetAddr};
12
13use crate::chunked::Decoder as ChunkDecoder;
14use crate::error::ErrorKind;
15use crate::pool::{PoolKey, PoolReturner};
16use crate::proxy::Proxy;
17use crate::unit::Unit;
18use crate::Response;
19use crate::{error::Error, proxy::Proto};
20
/// A bidirectional byte transport that a `Stream` can be built on.
///
/// Implementors must be readable, writable, debuggable, and safe to move and
/// share across threads.
pub trait ReadWrite: Read + Write + Send + Sync + fmt::Debug + 'static {
    /// Returns the underlying [`TcpStream`] when this transport has one, or
    /// `None` otherwise (for example an in-memory stream).
    fn socket(&self) -> Option<&TcpStream>;
}
25
26impl ReadWrite for TcpStream {
27 fn socket(&self) -> Option<&TcpStream> {
28 Some(self)
29 }
30}
31
/// Pluggable TLS implementation: wraps an already-connected transport in a
/// TLS session.
pub trait TlsConnector: Send + Sync {
    /// Wraps `io` in a TLS session for the host named `dns_name`.
    ///
    /// Takes ownership of `io` and returns the wrapped transport, or an error
    /// if the connector could not establish the session.
    fn connect(
        &self,
        dns_name: &str,
        io: Box<dyn ReadWrite>,
    ) -> Result<Box<dyn ReadWrite>, crate::error::Error>;
}
39
/// A buffered, bidirectional connection built on top of a [`ReadWrite`]
/// transport.
pub(crate) struct Stream {
    // Buffered reader around the boxed transport. Reads go through the
    // buffer; writes are sent straight to the transport underneath it.
    inner: BufReader<Box<dyn ReadWrite>>,
    /// Socket address recorded for this stream when it was created.
    pub(crate) remote_addr: SocketAddr,
    // Handle used by `return_to_pool` to hand this stream back for reuse.
    pool_returner: PoolReturner,
}
46
47impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
48 fn socket(&self) -> Option<&TcpStream> {
49 ReadWrite::socket(self.as_ref())
50 }
51}
52
/// A [`Stream`] paired with an optional deadline.
///
/// When a deadline is set, buffered reads first refresh the socket's read and
/// write timeouts to the time remaining until that deadline.
pub(crate) struct DeadlineStream {
    // The wrapped stream.
    stream: Stream,
    // Instant after which reads fail with a timeout; `None` applies no deadline.
    deadline: Option<Instant>,
}
62
63impl DeadlineStream {
64 pub(crate) fn new(stream: Stream, deadline: Option<Instant>) -> Self {
65 DeadlineStream { stream, deadline }
66 }
67
68 pub(crate) fn inner_ref(&self) -> &Stream {
69 &self.stream
70 }
71
72 pub(crate) fn inner_mut(&mut self) -> &mut Stream {
73 &mut self.stream
74 }
75}
76
77impl From<DeadlineStream> for Stream {
78 fn from(deadline_stream: DeadlineStream) -> Stream {
79 deadline_stream.stream
80 }
81}
82
83impl BufRead for DeadlineStream {
84 fn fill_buf(&mut self) -> io::Result<&[u8]> {
85 if let Some(deadline) = self.deadline {
86 let timeout = time_until_deadline(deadline)?;
87 if let Some(socket) = self.stream.socket() {
88 socket.set_read_timeout(Some(timeout))?;
89 socket.set_write_timeout(Some(timeout))?;
90 }
91 }
92 self.stream.fill_buf().map_err(|e| {
93 if e.kind() == io::ErrorKind::WouldBlock {
98 return io_err_timeout("timed out reading response".to_string());
99 }
100 e
101 })
102 }
103
104 fn consume(&mut self, amt: usize) {
105 self.stream.consume(amt)
106 }
107}
108
109impl Read for DeadlineStream {
110 #[allow(clippy::unused_io_amount)]
111 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
112 if !self.stream.inner.buffer().is_empty() {
117 let n = self.stream.inner.buffer().read(buf)?;
118 self.stream.inner.consume(n);
119 return Ok(n);
120 }
121 let nread = {
126 let mut rem = self.fill_buf()?;
127 rem.read(buf)?
128 };
129 self.consume(nread);
130 Ok(nread)
131 }
132}
133
/// Returns how much time is left until `deadline`.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::TimedOut`] error when the deadline has been
/// reached. A remaining time of exactly zero counts as expired: the result is
/// fed to socket timeout setters and `TcpStream::connect_timeout`, which
/// reject a zero `Duration` with an unrelated `InvalidInput` error.
fn time_until_deadline(deadline: Instant) -> io::Result<Duration> {
    match deadline.checked_duration_since(Instant::now()) {
        Some(remaining) if remaining > Duration::from_secs(0) => Ok(remaining),
        _ => Err(io_err_timeout("timed out reading response".to_string())),
    }
}

/// Builds an [`io::Error`] of kind `TimedOut` carrying `error` as its message.
pub(crate) fn io_err_timeout(error: String) -> io::Error {
    io::Error::new(io::ErrorKind::TimedOut, error)
}
147
/// In-memory stream that serves a fixed byte buffer and ignores writes.
#[derive(Debug)]
pub(crate) struct ReadOnlyStream(Cursor<Vec<u8>>);

impl ReadOnlyStream {
    /// Creates a stream whose reads yield the bytes of `v` from the start.
    pub(crate) fn new(v: Vec<u8>) -> Self {
        let cursor = Cursor::new(v);
        ReadOnlyStream(cursor)
    }
}

impl Read for ReadOnlyStream {
    /// Reads the next bytes of the stored buffer into `buf`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let ReadOnlyStream(cursor) = self;
        cursor.read(buf)
    }
}

impl Write for ReadOnlyStream {
    /// Discards `buf` and reports all of it as written.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = buf.len();
        Ok(accepted)
    }

    /// Nothing is ever buffered, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
172
173impl ReadWrite for ReadOnlyStream {
174 fn socket(&self) -> Option<&std::net::TcpStream> {
175 None
176 }
177}
178
179impl fmt::Debug for Stream {
180 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181 match self.inner.get_ref().socket() {
182 Some(_) => write!(f, "Stream({:?})", self.inner.get_ref()),
183 None => write!(f, "Stream(Test)"),
184 }
185 }
186}
187
188impl Stream {
189 pub(crate) fn new(
190 t: impl ReadWrite,
191 remote_addr: SocketAddr,
192 pool_returner: PoolReturner,
193 ) -> Stream {
194 Stream::logged_create(Stream {
195 inner: BufReader::new(Box::new(t)),
196 remote_addr,
197 pool_returner,
198 })
199 }
200
201 fn logged_create(stream: Stream) -> Stream {
202 debug!("created stream: {:?}", stream);
203 stream
204 }
205
206 pub(crate) fn buffer(&self) -> &[u8] {
207 self.inner.buffer()
208 }
209
210 fn serverclosed_stream(stream: &std::net::TcpStream) -> io::Result<bool> {
224 let mut buf = [0; 1];
225 stream.set_nonblocking(true)?;
226
227 let result = match stream.peek(&mut buf) {
228 Ok(n) => {
229 debug!(
230 "peek on reused connection returned {}, not WouldBlock; discarding",
231 n
232 );
233 Ok(true)
234 }
235 Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false),
236 Err(e) => Err(e),
237 };
238 stream.set_nonblocking(false)?;
239
240 result
241 }
242 pub(crate) fn server_closed(&self) -> io::Result<bool> {
244 match self.socket() {
245 Some(socket) => Stream::serverclosed_stream(socket),
246 None => Ok(false),
247 }
248 }
249
250 pub(crate) fn set_unpoolable(&mut self) {
251 self.pool_returner = PoolReturner::none();
252 }
253
254 pub(crate) fn return_to_pool(mut self) -> io::Result<()> {
255 self.reset()?;
257 self.pool_returner.clone().return_to_pool(self);
258 Ok(())
259 }
260
261 pub(crate) fn reset(&mut self) -> io::Result<()> {
262 if let Some(socket) = self.socket() {
265 socket.set_read_timeout(None)?;
266 socket.set_write_timeout(None)?;
267 }
268
269 Ok(())
270 }
271
272 pub(crate) fn socket(&self) -> Option<&TcpStream> {
273 self.inner.get_ref().socket()
274 }
275
276 pub(crate) fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
277 if let Some(socket) = self.socket() {
278 socket.set_read_timeout(timeout)
279 } else {
280 Ok(())
281 }
282 }
283}
284
285impl Read for Stream {
286 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
287 self.inner.read(buf)
288 }
289}
290
291impl BufRead for Stream {
292 fn fill_buf(&mut self) -> io::Result<&[u8]> {
293 self.inner.fill_buf()
294 }
295
296 fn consume(&mut self, amt: usize) {
297 self.inner.consume(amt)
298 }
299}
300
301impl<R: Read> From<ChunkDecoder<R>> for Stream
302where
303 R: Read,
304 Stream: From<R>,
305{
306 fn from(chunk_decoder: ChunkDecoder<R>) -> Stream {
307 chunk_decoder.into_inner().into()
308 }
309}
310
311impl Write for Stream {
312 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
313 self.inner.get_mut().write(buf)
314 }
315 fn flush(&mut self) -> io::Result<()> {
316 self.inner.get_mut().flush()
317 }
318}
319
320impl Drop for Stream {
321 fn drop(&mut self) {
322 debug!("dropping stream: {:?}", self);
323 }
324}
325
326pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
327 let port = unit.url.port().unwrap_or(80);
329 let pool_key = PoolKey::from_parts("http", hostname, port);
330 let pool_returner = PoolReturner::new(&unit.agent, pool_key);
331 connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r, pool_returner))
332}
333
334pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
335 let port = unit.url.port().unwrap_or(443);
336
337 let (sock, remote_addr) = connect_host(unit, hostname, port)?;
338
339 let tls_conf = &unit.agent.config.tls_config;
340 let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
341 let pool_key = PoolKey::from_parts("https", hostname, port);
342 let pool_returner = PoolReturner::new(&unit.agent, pool_key);
343 Ok(Stream::new(https_stream, remote_addr, pool_returner))
344}
345
346pub(crate) fn connect_host(
348 unit: &Unit,
349 hostname: &str,
350 port: u16,
351) -> Result<(TcpStream, SocketAddr), Error> {
352 let connect_deadline: Option<Instant> =
353 if let Some(timeout_connect) = unit.agent.config.timeout_connect {
354 Instant::now().checked_add(timeout_connect)
355 } else {
356 unit.deadline
357 };
358 let proxy: Option<Proxy> = unit.agent.config.proxy.clone();
359 let netloc = match proxy {
360 Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port),
361 None => format!("{}:{}", hostname, port),
362 };
363
364 let sock_addrs = unit.resolver().resolve(&netloc).map_err(|e| {
366 ErrorKind::Dns
367 .msg(format!("resolve dns name '{}'", netloc))
368 .src(e)
369 })?;
370
371 if sock_addrs.is_empty() {
372 return Err(ErrorKind::Dns.msg(format!("No ip address for {}", hostname)));
373 }
374
375 let proto = proxy.as_ref().map(|proxy| proxy.proto);
376
377 let mut any_err = None;
378 let mut any_stream_and_addr = None;
379 let multiple_addrs = sock_addrs.len() > 1;
381
382 for sock_addr in sock_addrs {
383 let timeout = match connect_deadline {
385 Some(deadline) => {
386 let mut deadline = time_until_deadline(deadline)?;
387 if multiple_addrs {
388 deadline = deadline.div(2);
389 }
390 Some(deadline)
391 }
392 None => None,
393 };
394
395 debug!("connecting to {} at {}", netloc, &sock_addr);
396
397 #[allow(clippy::unnecessary_unwrap)]
399 let stream = if proto.is_some() && Some(Proto::HTTP) != proto {
400 connect_socks(
401 unit,
402 proxy.clone().unwrap(),
403 connect_deadline,
404 sock_addr,
405 hostname,
406 port,
407 proto.unwrap(),
408 )
409 } else if let Some(timeout) = timeout {
410 TcpStream::connect_timeout(&sock_addr, timeout)
411 } else {
412 TcpStream::connect(sock_addr)
413 };
414
415 if let Ok(stream) = stream {
416 any_stream_and_addr = Some((stream, sock_addr));
417 break;
418 } else if let Err(err) = stream {
419 any_err = Some(err);
420 }
421 }
422
423 let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
424 stream_and_addr
425 } else if let Some(e) = any_err {
426 return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
427 } else {
428 panic!("shouldn't happen: failed to connect to all IPs, but no error");
429 };
430
431 stream.set_nodelay(unit.agent.config.no_delay)?;
432
433 if let Some(deadline) = unit.deadline {
434 stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
435 } else {
436 stream.set_read_timeout(unit.agent.config.timeout_read)?;
437 }
438
439 if let Some(deadline) = unit.deadline {
440 stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
441 } else {
442 stream.set_write_timeout(unit.agent.config.timeout_write)?;
443 }
444
445 if proto == Some(Proto::HTTP) && unit.url.scheme() == "https" {
446 if let Some(ref proxy) = proxy {
447 write!(
448 stream,
449 "{}",
450 proxy.connect(hostname, port, &unit.agent.config.user_agent)
451 )
452 .unwrap();
453 stream.flush()?;
454
455 let s = stream.try_clone()?;
456 let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
457 let pool_returner = PoolReturner::new(&unit.agent, pool_key);
458 let s = Stream::new(s, remote_addr, pool_returner);
459 let response = Response::do_from_stream(s, unit.clone())?;
460 Proxy::verify_response(&response)?;
461 }
462 }
463
464 Ok((stream, remote_addr))
465}
466
/// Resolves `hostname:port` locally and converts the first resolved address
/// into a SOCKS target address.
///
/// Every failure is reported as an `io::ErrorKind::NotFound` error with a
/// "DNS failure" message.
#[cfg(feature = "socks-proxy")]
fn socks_local_nslookup(
    unit: &Unit,
    hostname: &str,
    port: u16,
) -> Result<TargetAddr, std::io::Error> {
    let dns_failure = |detail: String| io::Error::new(io::ErrorKind::NotFound, detail);

    let netloc = format!("{}:{}", hostname, port);
    let addrs: Vec<SocketAddr> = unit
        .resolver()
        .resolve(&netloc)
        .map_err(|e| dns_failure(format!("DNS failure: {}.", e)))?;

    if addrs.is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::NotFound,
            "DNS failure: no socket addrs found.",
        ));
    }

    addrs[0]
        .to_target_addr()
        .map_err(|err| dns_failure(format!("DNS failure: {}.", err)))
}
497
/// Connects to `host:port` through the SOCKS proxy listening at `proxy_addr`.
///
/// The target is resolved locally when `host` is already an IP literal or
/// when the protocol is SOCKS4; otherwise the domain name is passed to the
/// proxy for resolution.
///
/// With a `deadline`, the handshake runs on a helper thread and this function
/// waits for it only until the deadline; without one it connects inline.
///
/// # Errors
///
/// Returns the lookup or handshake error, or an `io::ErrorKind::TimedOut`
/// error when the deadline passes before the handshake finishes.
#[cfg(feature = "socks-proxy")]
fn connect_socks(
    unit: &Unit,
    proxy: Proxy,
    deadline: Option<Instant>,
    proxy_addr: SocketAddr,
    host: &str,
    port: u16,
    proto: Proto,
) -> Result<TcpStream, std::io::Error> {
    use socks::TargetAddr::Domain;
    use std::net::{Ipv4Addr, Ipv6Addr};
    use std::str::FromStr;

    let host_addr = if Ipv4Addr::from_str(host).is_ok()
        || Ipv6Addr::from_str(host).is_ok()
        || proto == Proto::SOCKS4
    {
        socks_local_nslookup(unit, host, port)?
    } else {
        Domain(String::from(host), port)
    };

    #[allow(clippy::mutex_atomic)]
    let stream = if let Some(deadline) = deadline {
        use std::sync::mpsc::channel;
        use std::sync::{Arc, Condvar, Mutex};
        use std::thread;

        // Fail before spawning anything if the deadline has already passed,
        // so no detached connection attempt is left behind.
        let timeout_connect = time_until_deadline(deadline)?;

        let waiter_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let worker_signal = waiter_signal.clone();
        let (tx, rx) = channel();
        thread::spawn(move || {
            let (lock, cvar) = &*worker_signal;
            let connected = if proto == Proto::SOCKS5 {
                get_socks5_stream(&proxy, &proxy_addr, host_addr)
            } else {
                get_socks4_stream(&proxy_addr, host_addr)
            };
            // Only signal completion if the result was actually delivered.
            if tx.send(connected).is_ok() {
                let mut done = lock.lock().unwrap();
                *done = true;
                cvar.notify_one();
            }
        });

        let (lock, cvar) = &*waiter_signal;
        let done = lock.lock().unwrap();

        // Waiting on the predicate (rather than a bare `wait_timeout`) keeps
        // spurious wakeups from being mistaken for a timeout, and returns
        // immediately if the worker finished before the wait began.
        let (done, _) = cvar
            .wait_timeout_while(done, timeout_connect, |done| !*done)
            .unwrap();
        if *done {
            // `done` is only set after a successful send, so a value is queued.
            rx.recv().unwrap()?
        } else {
            return Err(io_err_timeout(format!(
                "SOCKS proxy: {}:{} timed out connecting after {}ms.",
                host,
                port,
                timeout_connect.as_millis()
            )));
        }
    } else if proto == Proto::SOCKS5 {
        get_socks5_stream(&proxy, &proxy_addr, host_addr)?
    } else {
        get_socks4_stream(&proxy_addr, host_addr)?
    };

    Ok(stream)
}
587
/// Performs a SOCKS5 handshake with the proxy at `proxy_addr` for the target
/// `host_addr` and returns the resulting TCP stream.
///
/// Username/password authentication is used when
/// `proxy.use_authorization()` says so.
#[cfg(feature = "socks-proxy")]
fn get_socks5_stream(
    proxy: &Proxy,
    proxy_addr: &SocketAddr,
    host_addr: TargetAddr,
) -> Result<TcpStream, std::io::Error> {
    use socks::Socks5Stream;

    let socks_stream = if proxy.use_authorization() {
        Socks5Stream::connect_with_password(
            proxy_addr,
            host_addr,
            proxy.user.as_ref().unwrap(),
            proxy.password.as_ref().unwrap(),
        )?
    } else {
        Socks5Stream::connect(proxy_addr, host_addr)?
    };
    Ok(socks_stream.into_inner())
}
611
/// Performs a SOCKS4 handshake (with an empty user id) with the proxy at
/// `proxy_addr` for the target `host_addr` and returns the TCP stream.
#[cfg(feature = "socks-proxy")]
fn get_socks4_stream(
    proxy_addr: &SocketAddr,
    host_addr: TargetAddr,
) -> Result<TcpStream, std::io::Error> {
    let socks_stream = socks::Socks4Stream::connect(proxy_addr, host_addr, "")?;
    Ok(socks_stream.into_inner())
}
622
623#[cfg(not(feature = "socks-proxy"))]
624fn connect_socks(
625 _unit: &Unit,
626 _proxy: Proxy,
627 _deadline: Option<Instant>,
628 _proxy_addr: SocketAddr,
629 _hostname: &str,
630 _port: u16,
631 _proto: Proto,
632) -> Result<TcpStream, std::io::Error> {
633 Err(std::io::Error::new(
634 io::ErrorKind::Other,
635 "SOCKS feature disabled.",
636 ))
637}
638
/// Test builds: resolves the stream through the crate's test handler.
#[cfg(test)]
pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
    crate::test::resolve_handler(unit)
}
644
645#[cfg(not(test))]
646pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
647 Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
648}
649
/// Placeholder remote address (`0.0.0.0:0`) for streams created in tests.
#[cfg(test)]
pub(crate) fn remote_addr_for_test() -> SocketAddr {
    use std::net::{Ipv4Addr, SocketAddrV4};
    let unspecified = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
    SocketAddr::V4(unspecified)
}
655
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        io::Read,
        sync::{Arc, Mutex},
    };

    /// Fake transport that records the size of every buffer it is asked to
    /// fill, and fills it completely with zeroes.
    struct ReadRecorder {
        reads: Arc<Mutex<Vec<usize>>>,
    }

    impl Read for ReadRecorder {
        fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
            self.reads.lock().unwrap().push(buf.len());
            buf.fill(0);
            Ok(buf.len())
        }
    }

    impl Write for ReadRecorder {
        fn write(&mut self, _: &[u8]) -> io::Result<usize> {
            unimplemented!()
        }

        fn flush(&mut self) -> io::Result<()> {
            unimplemented!()
        }
    }

    // `Stream` formats itself (and asks its transport for `socket()`) inside
    // `debug!` on creation and drop. These two impls therefore must not panic,
    // or the test fails whenever debug logging is enabled in the test process.
    impl fmt::Debug for ReadRecorder {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("ReadRecorder")
        }
    }

    impl ReadWrite for ReadRecorder {
        fn socket(&self) -> Option<&TcpStream> {
            None
        }
    }

    /// Many one-byte reads through a `DeadlineStream` must be served from the
    /// `Stream`'s buffer: 8193 reads should hit the transport only twice,
    /// each time with an 8192-byte buffer.
    #[test]
    fn test_deadline_stream_buffering() {
        let reads = Arc::new(Mutex::new(vec![]));
        let recorder = ReadRecorder {
            reads: reads.clone(),
        };
        let stream = Stream::new(recorder, remote_addr_for_test(), PoolReturner::none());
        let mut deadline_stream = DeadlineStream::new(stream, None);
        let mut buf = [0u8; 1];
        for _ in 0..8193 {
            let _ = deadline_stream.read(&mut buf).unwrap();
        }
        let reads = reads.lock().unwrap();
        assert_eq!(reads.len(), 2);
        assert_eq!(reads[0], 8192);
        assert_eq!(reads[1], 8192);
    }
}