ureq/
stream.rs

1use log::debug;
2use std::io::{self, BufRead, BufReader, Read, Write};
3use std::net::SocketAddr;
4use std::net::TcpStream;
5use std::ops::Div;
6use std::time::Duration;
7use std::time::Instant;
8use std::{fmt, io::Cursor};
9
10#[cfg(feature = "socks-proxy")]
11use socks::{TargetAddr, ToTargetAddr};
12
13use crate::chunked::Decoder as ChunkDecoder;
14use crate::error::ErrorKind;
15use crate::pool::{PoolKey, PoolReturner};
16use crate::proxy::Proxy;
17use crate::unit::Unit;
18use crate::Response;
19use crate::{error::Error, proxy::Proto};
20
/// A bidirectional byte stream: anything that is both [`Read`] and [`Write`].
///
/// Boxed trait objects of this type are what [`TlsConnector::connect`] accepts
/// and returns.
pub trait ReadWrite: Read + Write + fmt::Debug + Send + Sync + 'static {
    /// Returns the `TcpStream` backing this stream, or `None` when the
    /// implementor has no socket to expose.
    fn socket(&self) -> Option<&TcpStream>;
}
25
26impl ReadWrite for TcpStream {
27    fn socket(&self) -> Option<&TcpStream> {
28        Some(self)
29    }
30}
31
32pub trait TlsConnector: Send + Sync {
33    fn connect(
34        &self,
35        dns_name: &str,
36        io: Box<dyn ReadWrite>,
37    ) -> Result<Box<dyn ReadWrite>, crate::error::Error>;
38}
39
40pub(crate) struct Stream {
41    inner: BufReader<Box<dyn ReadWrite>>,
42    /// The remote address the stream is connected to.
43    pub(crate) remote_addr: SocketAddr,
44    pool_returner: PoolReturner,
45}
46
47impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
48    fn socket(&self) -> Option<&TcpStream> {
49        ReadWrite::socket(self.as_ref())
50    }
51}
52
53// DeadlineStream wraps a stream such that read() will return an error
54// after the provided deadline, and sets timeouts on the underlying
55// TcpStream to ensure read() doesn't block beyond the deadline.
56// When the From trait is used to turn a DeadlineStream back into a
57// Stream (by PoolReturnRead), the timeouts are removed.
58pub(crate) struct DeadlineStream {
59    stream: Stream,
60    deadline: Option<Instant>,
61}
62
63impl DeadlineStream {
64    pub(crate) fn new(stream: Stream, deadline: Option<Instant>) -> Self {
65        DeadlineStream { stream, deadline }
66    }
67
68    pub(crate) fn inner_ref(&self) -> &Stream {
69        &self.stream
70    }
71
72    pub(crate) fn inner_mut(&mut self) -> &mut Stream {
73        &mut self.stream
74    }
75}
76
77impl From<DeadlineStream> for Stream {
78    fn from(deadline_stream: DeadlineStream) -> Stream {
79        deadline_stream.stream
80    }
81}
82
83impl BufRead for DeadlineStream {
84    fn fill_buf(&mut self) -> io::Result<&[u8]> {
85        if let Some(deadline) = self.deadline {
86            let timeout = time_until_deadline(deadline)?;
87            if let Some(socket) = self.stream.socket() {
88                socket.set_read_timeout(Some(timeout))?;
89                socket.set_write_timeout(Some(timeout))?;
90            }
91        }
92        self.stream.fill_buf().map_err(|e| {
93            // On unix-y platforms set_read_timeout and set_write_timeout
94            // causes ErrorKind::WouldBlock instead of ErrorKind::TimedOut.
95            // Since the socket most definitely not set_nonblocking(true),
96            // we can safely normalize WouldBlock to TimedOut
97            if e.kind() == io::ErrorKind::WouldBlock {
98                return io_err_timeout("timed out reading response".to_string());
99            }
100            e
101        })
102    }
103
104    fn consume(&mut self, amt: usize) {
105        self.stream.consume(amt)
106    }
107}
108
109impl Read for DeadlineStream {
110    #[allow(clippy::unused_io_amount)]
111    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
112        // If the stream's BufReader has any buffered bytes, return those first.
113        // This avoids calling `fill_buf()` on DeadlineStream unnecessarily,
114        // since that call always does a syscall. This ensures DeadlineStream
115        // can pass through the efficiency we gain by using a BufReader in Stream.
116        if !self.stream.inner.buffer().is_empty() {
117            let n = self.stream.inner.buffer().read(buf)?;
118            self.stream.inner.consume(n);
119            return Ok(n);
120        }
121        // All reads on a DeadlineStream use the BufRead impl. This ensures
122        // that we have a chance to set the correct timeout before each recv
123        // syscall.
124        // Copied from the BufReader implementation of `read()`.
125        let nread = {
126            let mut rem = self.fill_buf()?;
127            rem.read(buf)?
128        };
129        self.consume(nread);
130        Ok(nread)
131    }
132}
133
/// Returns how much time is left until `deadline`.
///
/// # Errors
///
/// Returns a `TimedOut` error when the deadline has passed or is reached
/// exactly. A zero remainder is treated as expired because the callers feed
/// the result to `TcpStream::set_read_timeout`, `set_write_timeout` and
/// `connect_timeout`, all of which reject a zero `Duration`.
fn time_until_deadline(deadline: Instant) -> io::Result<Duration> {
    match deadline.checked_duration_since(Instant::now()) {
        Some(remaining) if remaining > Duration::from_secs(0) => Ok(remaining),
        _ => Err(io_err_timeout("timed out reading response".to_string())),
    }
}

/// Builds an `io::Error` of kind `TimedOut` carrying `error` as its message.
pub(crate) fn io_err_timeout(error: String) -> io::Error {
    io::Error::new(io::ErrorKind::TimedOut, error)
}
147
/// An in-memory stand-in for a connection: reads come from a fixed byte
/// buffer held in a `Cursor`.
#[derive(Debug)]
pub(crate) struct ReadOnlyStream(Cursor<Vec<u8>>);
150
151impl ReadOnlyStream {
152    pub(crate) fn new(v: Vec<u8>) -> Self {
153        Self(Cursor::new(v))
154    }
155}
156
157impl Read for ReadOnlyStream {
158    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
159        self.0.read(buf)
160    }
161}
162
163impl std::io::Write for ReadOnlyStream {
164    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
165        Ok(buf.len())
166    }
167
168    fn flush(&mut self) -> io::Result<()> {
169        Ok(())
170    }
171}
172
173impl ReadWrite for ReadOnlyStream {
174    fn socket(&self) -> Option<&std::net::TcpStream> {
175        None
176    }
177}
178
179impl fmt::Debug for Stream {
180    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181        match self.inner.get_ref().socket() {
182            Some(_) => write!(f, "Stream({:?})", self.inner.get_ref()),
183            None => write!(f, "Stream(Test)"),
184        }
185    }
186}
187
188impl Stream {
189    pub(crate) fn new(
190        t: impl ReadWrite,
191        remote_addr: SocketAddr,
192        pool_returner: PoolReturner,
193    ) -> Stream {
194        Stream::logged_create(Stream {
195            inner: BufReader::new(Box::new(t)),
196            remote_addr,
197            pool_returner,
198        })
199    }
200
201    fn logged_create(stream: Stream) -> Stream {
202        debug!("created stream: {:?}", stream);
203        stream
204    }
205
206    pub(crate) fn buffer(&self) -> &[u8] {
207        self.inner.buffer()
208    }
209
210    // Check if the server has closed a stream by performing a one-byte
211    // non-blocking read. If this returns EOF, the server has closed the
212    // connection: return true. If this returns a successful read, there are
213    // some bytes on the connection even though there was no inflight request.
214    // For plain HTTP streams, that might mean an HTTP 408 was pushed; it
215    // could also mean a buggy server that sent more bytes than a response's
216    // Content-Length. For HTTPS streams, that might mean a close_notify alert,
217    // which is the proper way to shut down an idle stream.
218    // Either way, bytes available on the stream before we've made a request
219    // means the stream is not usable, so we should discard it.
220    // If this returns WouldBlock (aka EAGAIN),
221    // that means the connection is still open: return false. Otherwise
222    // return an error.
223    fn serverclosed_stream(stream: &std::net::TcpStream) -> io::Result<bool> {
224        let mut buf = [0; 1];
225        stream.set_nonblocking(true)?;
226
227        let result = match stream.peek(&mut buf) {
228            Ok(n) => {
229                debug!(
230                    "peek on reused connection returned {}, not WouldBlock; discarding",
231                    n
232                );
233                Ok(true)
234            }
235            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false),
236            Err(e) => Err(e),
237        };
238        stream.set_nonblocking(false)?;
239
240        result
241    }
242    // Return true if the server has closed this connection.
243    pub(crate) fn server_closed(&self) -> io::Result<bool> {
244        match self.socket() {
245            Some(socket) => Stream::serverclosed_stream(socket),
246            None => Ok(false),
247        }
248    }
249
250    pub(crate) fn set_unpoolable(&mut self) {
251        self.pool_returner = PoolReturner::none();
252    }
253
254    pub(crate) fn return_to_pool(mut self) -> io::Result<()> {
255        // ensure stream can be reused
256        self.reset()?;
257        self.pool_returner.clone().return_to_pool(self);
258        Ok(())
259    }
260
261    pub(crate) fn reset(&mut self) -> io::Result<()> {
262        // When we are turning this back into a regular, non-deadline Stream,
263        // remove any timeouts we set.
264        if let Some(socket) = self.socket() {
265            socket.set_read_timeout(None)?;
266            socket.set_write_timeout(None)?;
267        }
268
269        Ok(())
270    }
271
272    pub(crate) fn socket(&self) -> Option<&TcpStream> {
273        self.inner.get_ref().socket()
274    }
275
276    pub(crate) fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
277        if let Some(socket) = self.socket() {
278            socket.set_read_timeout(timeout)
279        } else {
280            Ok(())
281        }
282    }
283}
284
285impl Read for Stream {
286    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
287        self.inner.read(buf)
288    }
289}
290
291impl BufRead for Stream {
292    fn fill_buf(&mut self) -> io::Result<&[u8]> {
293        self.inner.fill_buf()
294    }
295
296    fn consume(&mut self, amt: usize) {
297        self.inner.consume(amt)
298    }
299}
300
301impl<R: Read> From<ChunkDecoder<R>> for Stream
302where
303    R: Read,
304    Stream: From<R>,
305{
306    fn from(chunk_decoder: ChunkDecoder<R>) -> Stream {
307        chunk_decoder.into_inner().into()
308    }
309}
310
311impl Write for Stream {
312    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
313        self.inner.get_mut().write(buf)
314    }
315    fn flush(&mut self) -> io::Result<()> {
316        self.inner.get_mut().flush()
317    }
318}
319
320impl Drop for Stream {
321    fn drop(&mut self) {
322        debug!("dropping stream: {:?}", self);
323    }
324}
325
326pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
327    //
328    let port = unit.url.port().unwrap_or(80);
329    let pool_key = PoolKey::from_parts("http", hostname, port);
330    let pool_returner = PoolReturner::new(&unit.agent, pool_key);
331    connect_host(unit, hostname, port).map(|(t, r)| Stream::new(t, r, pool_returner))
332}
333
334pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
335    let port = unit.url.port().unwrap_or(443);
336
337    let (sock, remote_addr) = connect_host(unit, hostname, port)?;
338
339    let tls_conf = &unit.agent.config.tls_config;
340    let https_stream = tls_conf.connect(hostname, Box::new(sock))?;
341    let pool_key = PoolKey::from_parts("https", hostname, port);
342    let pool_returner = PoolReturner::new(&unit.agent, pool_key);
343    Ok(Stream::new(https_stream, remote_addr, pool_returner))
344}
345
346/// If successful, returns a `TcpStream` and the remote address it is connected to.
347pub(crate) fn connect_host(
348    unit: &Unit,
349    hostname: &str,
350    port: u16,
351) -> Result<(TcpStream, SocketAddr), Error> {
352    let connect_deadline: Option<Instant> =
353        if let Some(timeout_connect) = unit.agent.config.timeout_connect {
354            Instant::now().checked_add(timeout_connect)
355        } else {
356            unit.deadline
357        };
358    let proxy: Option<Proxy> = unit.agent.config.proxy.clone();
359    let netloc = match proxy {
360        Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port),
361        None => format!("{}:{}", hostname, port),
362    };
363
364    // TODO: Find a way to apply deadline to DNS lookup.
365    let sock_addrs = unit.resolver().resolve(&netloc).map_err(|e| {
366        ErrorKind::Dns
367            .msg(format!("resolve dns name '{}'", netloc))
368            .src(e)
369    })?;
370
371    if sock_addrs.is_empty() {
372        return Err(ErrorKind::Dns.msg(format!("No ip address for {}", hostname)));
373    }
374
375    let proto = proxy.as_ref().map(|proxy| proxy.proto);
376
377    let mut any_err = None;
378    let mut any_stream_and_addr = None;
379    // Find the first sock_addr that accepts a connection
380    let multiple_addrs = sock_addrs.len() > 1;
381
382    for sock_addr in sock_addrs {
383        // ensure connect timeout or overall timeout aren't yet hit.
384        let timeout = match connect_deadline {
385            Some(deadline) => {
386                let mut deadline = time_until_deadline(deadline)?;
387                if multiple_addrs {
388                    deadline = deadline.div(2);
389                }
390                Some(deadline)
391            }
392            None => None,
393        };
394
395        debug!("connecting to {} at {}", netloc, &sock_addr);
396
397        // connect with a configured timeout.
398        #[allow(clippy::unnecessary_unwrap)]
399        let stream = if proto.is_some() && Some(Proto::HTTP) != proto {
400            connect_socks(
401                unit,
402                proxy.clone().unwrap(),
403                connect_deadline,
404                sock_addr,
405                hostname,
406                port,
407                proto.unwrap(),
408            )
409        } else if let Some(timeout) = timeout {
410            TcpStream::connect_timeout(&sock_addr, timeout)
411        } else {
412            TcpStream::connect(sock_addr)
413        };
414
415        if let Ok(stream) = stream {
416            any_stream_and_addr = Some((stream, sock_addr));
417            break;
418        } else if let Err(err) = stream {
419            any_err = Some(err);
420        }
421    }
422
423    let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
424        stream_and_addr
425    } else if let Some(e) = any_err {
426        return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
427    } else {
428        panic!("shouldn't happen: failed to connect to all IPs, but no error");
429    };
430
431    stream.set_nodelay(unit.agent.config.no_delay)?;
432
433    if let Some(deadline) = unit.deadline {
434        stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
435    } else {
436        stream.set_read_timeout(unit.agent.config.timeout_read)?;
437    }
438
439    if let Some(deadline) = unit.deadline {
440        stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
441    } else {
442        stream.set_write_timeout(unit.agent.config.timeout_write)?;
443    }
444
445    if proto == Some(Proto::HTTP) && unit.url.scheme() == "https" {
446        if let Some(ref proxy) = proxy {
447            write!(
448                stream,
449                "{}",
450                proxy.connect(hostname, port, &unit.agent.config.user_agent)
451            )
452            .unwrap();
453            stream.flush()?;
454
455            let s = stream.try_clone()?;
456            let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
457            let pool_returner = PoolReturner::new(&unit.agent, pool_key);
458            let s = Stream::new(s, remote_addr, pool_returner);
459            let response = Response::do_from_stream(s, unit.clone())?;
460            Proxy::verify_response(&response)?;
461        }
462    }
463
464    Ok((stream, remote_addr))
465}
466
/// Resolves `hostname:port` with the unit's resolver and converts the first
/// resulting address into a SOCKS `TargetAddr`.
///
/// Every failure (resolver error, empty result, conversion error) is
/// reported as an `io::Error` of kind `NotFound`.
#[cfg(feature = "socks-proxy")]
fn socks_local_nslookup(
    unit: &Unit,
    hostname: &str,
    port: u16,
) -> Result<TargetAddr, std::io::Error> {
    let not_found = |detail: String| io::Error::new(io::ErrorKind::NotFound, detail);

    let netloc = format!("{}:{}", hostname, port);
    let addrs: Vec<SocketAddr> = unit
        .resolver()
        .resolve(&netloc)
        .map_err(|e| not_found(format!("DNS failure: {}.", e)))?;

    let first = match addrs.first() {
        Some(addr) => addr,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                "DNS failure: no socket addrs found.",
            ))
        }
    };

    first
        .to_target_addr()
        .map_err(|err| not_found(format!("DNS failure: {}.", err)))
}
497
/// Connects to `host:port` through the SOCKS proxy listening at `proxy_addr`.
///
/// The target is resolved locally when `host` is already an IP literal or
/// when the protocol is SOCKS4; otherwise the domain name is passed to the
/// proxy for resolution.
///
/// When `deadline` is set, the connection attempt runs on a helper thread and
/// this function waits for it until the deadline at most.
///
/// # Errors
///
/// Returns a `TimedOut` error when the deadline has passed or the helper
/// thread does not finish in time, and otherwise propagates the error from
/// local resolution or from the SOCKS handshake.
#[cfg(feature = "socks-proxy")]
fn connect_socks(
    unit: &Unit,
    proxy: Proxy,
    deadline: Option<Instant>,
    proxy_addr: SocketAddr,
    host: &str,
    port: u16,
    proto: Proto,
) -> Result<TcpStream, std::io::Error> {
    use socks::TargetAddr::Domain;
    use std::net::{Ipv4Addr, Ipv6Addr};
    use std::str::FromStr;

    let host_addr = if Ipv4Addr::from_str(host).is_ok()
        || Ipv6Addr::from_str(host).is_ok()
        || proto == Proto::SOCKS4
    {
        socks_local_nslookup(unit, host, port)?
    } else {
        Domain(String::from(host), port)
    };

    // Since SocksXStream doesn't support set_read_timeout, a suboptimal one is
    // implemented via thread::spawn.
    // # Happy path
    // 1) the worker thread spawns 2) get_socksX_stream returns 3) tx sends the
    // result 4) the worker sets the done flag and notifies the condvar
    // 5) the waiter observes the done flag 6) rx receives the stream and the
    // function exits.
    // # Sad path
    // 1) get_socksX_stream hangs 2) the done flag is never set 3) the wait
    // times out 4) a timeout error is returned.
    // # Defects
    // 1) In the event of a timeout, a thread may be left running in the
    // background.
    // TODO: explore supporting timeouts upstream in Socks5Proxy.
    #[allow(clippy::mutex_atomic)]
    let stream = if let Some(deadline) = deadline {
        use std::sync::mpsc::channel;
        use std::sync::{Arc, Condvar, Mutex};
        use std::thread;
        let waiter_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let worker_signal = waiter_signal.clone();
        let (tx, rx) = channel();
        thread::spawn(move || {
            let (lock, cvar) = &*worker_signal;
            if tx // try to get a socks stream and send it to the parent thread's rx
                .send(if proto == Proto::SOCKS5 {
                    get_socks5_stream(&proxy, &proxy_addr, host_addr)
                } else {
                    get_socks4_stream(&proxy_addr, host_addr)
                })
                .is_ok()
            {
                // if sending the stream has succeeded we need to notify the parent thread
                let mut done = lock.lock().unwrap();
                // set the done signal to true
                *done = true;
                // notify the parent thread
                cvar.notify_one();
            }
        });

        let (lock, cvar) = &*waiter_signal;
        let done = lock.lock().unwrap();

        let timeout_connect = time_until_deadline(deadline)?;
        // Wait on the flag itself rather than on a bare notification. If the
        // worker finished before we got here its notify was already sent, and
        // a plain wait_timeout would sleep for the whole timeout. A spurious
        // wakeup would otherwise be mistaken for a timeout.
        let (done, _) = cvar
            .wait_timeout_while(done, timeout_connect, |done| !*done)
            .unwrap();
        if *done {
            rx.recv().unwrap()?
        } else {
            return Err(io_err_timeout(format!(
                "SOCKS proxy: {}:{} timed out connecting after {}ms.",
                host,
                port,
                timeout_connect.as_millis()
            )));
        }
    } else if proto == Proto::SOCKS5 {
        get_socks5_stream(&proxy, &proxy_addr, host_addr)?
    } else {
        get_socks4_stream(&proxy_addr, host_addr)?
    };

    Ok(stream)
}
587
/// Performs a SOCKS5 connect to `host_addr` via the proxy at `proxy_addr`
/// and returns the raw `TcpStream`.
///
/// Username/password authentication is used when
/// `proxy.use_authorization()` says so; in that case `proxy.user` and
/// `proxy.password` are unwrapped and must be set.
#[cfg(feature = "socks-proxy")]
fn get_socks5_stream(
    proxy: &Proxy,
    proxy_addr: &SocketAddr,
    host_addr: TargetAddr,
) -> Result<TcpStream, std::io::Error> {
    use socks::Socks5Stream;

    let socks_stream = if proxy.use_authorization() {
        Socks5Stream::connect_with_password(
            proxy_addr,
            host_addr,
            proxy.user.as_ref().unwrap(),
            proxy.password.as_ref().unwrap(),
        )?
    } else {
        Socks5Stream::connect(proxy_addr, host_addr)?
    };

    Ok(socks_stream.into_inner())
}
611
/// Performs a SOCKS4 connect to `host_addr` via the proxy at `proxy_addr`
/// with an empty user id, and returns the raw `TcpStream`.
#[cfg(feature = "socks-proxy")]
fn get_socks4_stream(
    proxy_addr: &SocketAddr,
    host_addr: TargetAddr,
) -> Result<TcpStream, std::io::Error> {
    let socks_stream = socks::Socks4Stream::connect(proxy_addr, host_addr, "")?;
    Ok(socks_stream.into_inner())
}
622
623#[cfg(not(feature = "socks-proxy"))]
624fn connect_socks(
625    _unit: &Unit,
626    _proxy: Proxy,
627    _deadline: Option<Instant>,
628    _proxy_addr: SocketAddr,
629    _hostname: &str,
630    _port: u16,
631    _proto: Proto,
632) -> Result<TcpStream, std::io::Error> {
633    Err(std::io::Error::new(
634        io::ErrorKind::Other,
635        "SOCKS feature disabled.",
636    ))
637}
638
/// Test builds only: delegates the connection to the crate's test resolve
/// handler.
#[cfg(test)]
pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
    crate::test::resolve_handler(unit)
}
644
645#[cfg(not(test))]
646pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
647    Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
648}
649
/// Placeholder remote address for tests: the unspecified IPv4 address
/// `0.0.0.0` with port 0.
#[cfg(test)]
pub(crate) fn remote_addr_for_test() -> SocketAddr {
    use std::net::{Ipv4Addr, SocketAddrV4};
    let unspecified = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
    SocketAddr::V4(unspecified)
}
655
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        io::Read,
        sync::{Arc, Mutex},
    };

    // Returns all zeroes to `.read()` and records the size of the buffer
    // passed on every call.
    struct ReadRecorder {
        reads: Arc<Mutex<Vec<usize>>>,
    }

    impl Read for ReadRecorder {
        fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
            self.reads.lock().unwrap().push(buf.len());
            buf.fill(0);
            Ok(buf.len())
        }
    }

    // The test never writes, so these stay unimplemented.
    impl Write for ReadRecorder {
        fn write(&mut self, _: &[u8]) -> io::Result<usize> {
            unimplemented!()
        }

        fn flush(&mut self) -> io::Result<()> {
            unimplemented!()
        }
    }

    // `Stream::new` and `Stream`'s `Drop` log the stream with `{:?}`, which
    // calls `socket()` (and `fmt` when a socket exists). Both need real
    // implementations so the test does not panic when a debug-level logger
    // is installed.
    impl fmt::Debug for ReadRecorder {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("ReadRecorder")
        }
    }

    impl ReadWrite for ReadRecorder {
        fn socket(&self) -> Option<&TcpStream> {
            None
        }
    }

    // Test that when a DeadlineStream wraps a Stream, and the user performs a series of
    // tiny reads, Stream's BufReader is used appropriately.
    #[test]
    fn test_deadline_stream_buffering() {
        let reads = Arc::new(Mutex::new(vec![]));
        let recorder = ReadRecorder {
            reads: reads.clone(),
        };
        let stream = Stream::new(recorder, remote_addr_for_test(), PoolReturner::none());
        let mut deadline_stream = DeadlineStream::new(stream, None);
        let mut buf = [0u8; 1];
        // 8193 one-byte reads drain one full 8192-byte buffer and force
        // exactly one refill.
        for _ in 0..8193 {
            let _ = deadline_stream.read(&mut buf).unwrap();
        }
        let reads = reads.lock().unwrap();
        assert_eq!(reads.len(), 2);
        assert_eq!(reads[0], 8192);
        assert_eq!(reads[1], 8192);
    }
}