h2/
client.rs

1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
32//! A request body and request trailers are sent using [`SendRequest`] and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
67//! ```rust, no_run
68//!
69//! use h2::client;
70//!
71//! use http::{Request, Method};
72//! use std::error::Error;
73//! use tokio::net::TcpStream;
74//!
75//! #[tokio::main]
76//! pub async fn main() -> Result<(), Box<dyn Error>> {
77//!     // Establish TCP connection to the server.
78//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79//!     let (h2, connection) = client::handshake(tcp).await?;
80//!     tokio::spawn(async move {
81//!         connection.await.unwrap();
82//!     });
83//!
84//!     let mut h2 = h2.ready().await?;
85//!     // Prepare the HTTP request to send to the server.
86//!     let request = Request::builder()
87//!                     .method(Method::GET)
88//!                     .uri("https://www.example.com/")
89//!                     .body(())
90//!                     .unwrap();
91//!
92//!     // Send the request. The second tuple item allows the caller
93//!     // to stream a request body.
94//!     let (response, _) = h2.send_request(request, true).unwrap();
95//!
96//!     let (head, mut body) = response.await?.into_parts();
97//!
98//!     println!("Received response: {:?}", head);
99//!
100//!     // The `flow_control` handle allows the caller to manage
101//!     // flow control.
102//!     //
103//!     // Whenever data is received, the caller is responsible for
104//!     // releasing capacity back to the server once it has freed
105//!     // the data from memory.
106//!     let mut flow_control = body.flow_control().clone();
107//!
108//!     while let Some(chunk) = body.data().await {
109//!         let chunk = chunk?;
110//!         println!("RX: {:?}", chunk);
111//!
112//!         // Let the server send more data.
113//!         let _ = flow_control.release_capacity(chunk.len());
114//!     }
115//!
116//!     Ok(())
117//! }
118//! ```
119//!
120//! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
121//! [`handshake`]: fn.handshake.html
122//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123//! [`SendRequest`]: struct.SendRequest.html
124//! [`SendStream`]: ../struct.SendStream.html
125//! [Making requests]: #making-requests
126//! [Managing the connection]: #managing-the-connection
127//! [`Connection`]: struct.Connection.html
128//! [`Connection::poll`]: struct.Connection.html#method.poll
129//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131//! [`SendRequest`]: struct.SendRequest.html
132//! [`ResponseFuture`]: struct.ResponseFuture.html
133//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135//! [`Builder`]: struct.Builder.html
136//! [`Error`]: ../struct.Error.html
137
138use crate::codec::{Codec, SendError, UserError};
139use crate::ext::Protocol;
140use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141use crate::proto::{self, Error};
142use crate::{FlowControl, PingPong, RecvStream, SendStream};
143
144use bytes::{Buf, Bytes};
145use http::{uri, HeaderMap, Method, Request, Response, Version};
146use std::fmt;
147use std::future::Future;
148use std::pin::Pin;
149use std::task::{Context, Poll};
150use std::time::Duration;
151use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152use tracing::Instrument;
153
154/// Initializes new HTTP/2 streams on a connection by sending a request.
155///
156/// This type does no work itself. Instead, it is a handle to the inner
157/// connection state held by [`Connection`]. If the associated connection
158/// instance is dropped, all `SendRequest` functions will return [`Error`].
159///
160/// [`SendRequest`] instances are able to move to and operate on separate tasks
161/// / threads than their associated [`Connection`] instance. Internally, there
162/// is a buffer used to stage requests before they get written to the
163/// connection. There is no guarantee that requests get written to the
164/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
165///
166/// [`SendRequest`] implements [`Clone`], enabling the creation of many
167/// instances that are backed by a single connection.
168///
169/// See [module] level documentation for more details.
170///
171/// [module]: index.html
172/// [`Connection`]: struct.Connection.html
173/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
174/// [`Error`]: ../struct.Error.html
175pub struct SendRequest<B: Buf> {
176    inner: proto::Streams<B, Peer>,
177    pending: Option<proto::OpaqueStreamRef>,
178}
179
180/// Returns a `SendRequest` instance once it is ready to send at least one
181/// request.
182#[derive(Debug)]
183pub struct ReadySendRequest<B: Buf> {
184    inner: Option<SendRequest<B>>,
185}
186
187/// Manages all state associated with an HTTP/2 client connection.
188///
189/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
190/// implements the HTTP/2 client logic for that connection. It is responsible
191/// for driving the internal state forward, performing the work requested of the
192/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
193/// [`RecvStream`]).
194///
195/// `Connection` values are created by calling [`handshake`]. Once a
196/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
197/// until `Ready` is returned. The easiest way to do this is to submit the
198/// `Connection` instance to an [executor].
199///
200/// [module]: index.html
201/// [`handshake`]: fn.handshake.html
202/// [`SendRequest`]: struct.SendRequest.html
203/// [`ResponseFuture`]: struct.ResponseFuture.html
204/// [`SendStream`]: ../struct.SendStream.html
205/// [`RecvStream`]: ../struct.RecvStream.html
206/// [`poll`]: #method.poll
207/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
208///
209/// # Examples
210///
211/// ```
212/// # use tokio::io::{AsyncRead, AsyncWrite};
213/// # use h2::client;
214/// # use h2::client::*;
215/// #
216/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
217/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
218/// # {
219///     let (send_request, connection) = client::handshake(my_io).await?;
220///     // Submit the connection handle to an executor.
221///     tokio::spawn(async { connection.await.expect("connection failed"); });
222///
223///     // Now, use `send_request` to initialize HTTP/2 streams.
224///     // ...
225/// # Ok(())
226/// # }
227/// #
228/// # pub fn main() {}
229/// ```
230#[must_use = "futures do nothing unless polled"]
231pub struct Connection<T, B: Buf = Bytes> {
232    inner: proto::Connection<T, Peer, B>,
233}
234
235/// A future of an HTTP response.
236#[derive(Debug)]
237#[must_use = "futures do nothing unless polled"]
238pub struct ResponseFuture {
239    inner: proto::OpaqueStreamRef,
240    push_promise_consumed: bool,
241}
242
243/// A future of a pushed HTTP response.
244///
245/// We have to differentiate between pushed and non pushed because of the spec
246/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
247/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
248/// > that is in either the "open" or "half-closed (remote)" state.
249#[derive(Debug)]
250#[must_use = "futures do nothing unless polled"]
251pub struct PushedResponseFuture {
252    inner: ResponseFuture,
253}
254
255/// A pushed response and corresponding request headers
256#[derive(Debug)]
257pub struct PushPromise {
258    /// The request headers
259    request: Request<()>,
260
261    /// The pushed response
262    response: PushedResponseFuture,
263}
264
265/// A stream of pushed responses and corresponding promised requests
266#[derive(Debug)]
267#[must_use = "streams do nothing unless polled"]
268pub struct PushPromises {
269    inner: proto::OpaqueStreamRef,
270}
271
272/// Builds client connections with custom configuration values.
273///
274/// Methods can be chained in order to set the configuration values.
275///
276/// The client is constructed by calling [`handshake`] and passing the I/O
277/// handle that will back the HTTP/2 server.
278///
279/// New instances of `Builder` are obtained via [`Builder::new`].
280///
281/// See function level documentation for details on the various client
282/// configuration settings.
283///
284/// [`Builder::new`]: struct.Builder.html#method.new
285/// [`handshake`]: struct.Builder.html#method.handshake
286///
287/// # Examples
288///
289/// ```
290/// # use tokio::io::{AsyncRead, AsyncWrite};
291/// # use h2::client::*;
292/// # use bytes::Bytes;
293/// #
294/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
295///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
296/// # {
297/// // `client_fut` is a future representing the completion of the HTTP/2
298/// // handshake.
299/// let client_fut = Builder::new()
300///     .initial_window_size(1_000_000)
301///     .max_concurrent_streams(1000)
302///     .handshake(my_io);
303/// # client_fut.await
304/// # }
305/// #
306/// # pub fn main() {}
307/// ```
308#[derive(Clone, Debug)]
309pub struct Builder {
310    /// Time to keep locally reset streams around before reaping.
311    reset_stream_duration: Duration,
312
313    /// Initial maximum number of locally initiated (send) streams.
314    /// After receiving a SETTINGS frame from the remote peer,
315    /// the connection will overwrite this value with the
316    /// MAX_CONCURRENT_STREAMS specified in the frame.
317    /// If no value is advertised by the remote peer in the initial SETTINGS
318    /// frame, it will be set to usize::MAX.
319    initial_max_send_streams: usize,
320
321    /// Initial target window size for new connections.
322    initial_target_connection_window_size: Option<u32>,
323
324    /// Maximum amount of bytes to "buffer" for writing per stream.
325    max_send_buffer_size: usize,
326
327    /// Maximum number of locally reset streams to keep at a time.
328    reset_stream_max: usize,
329
330    /// Maximum number of remotely reset streams to allow in the pending
331    /// accept queue.
332    pending_accept_reset_stream_max: usize,
333
334    /// Initial `Settings` frame to send as part of the handshake.
335    settings: Settings,
336
337    /// The stream ID of the first (lowest) stream. Subsequent streams will use
338    /// monotonically increasing stream IDs.
339    stream_id: StreamId,
340
341    /// Maximum number of locally reset streams due to protocol error across
342    /// the lifetime of the connection.
343    ///
344    /// When this gets exceeded, we issue GOAWAYs.
345    local_max_error_reset_streams: Option<usize>,
346}
347
348#[derive(Debug)]
349pub(crate) struct Peer;
350
351// ===== impl SendRequest =====
352
353impl<B> SendRequest<B>
354where
355    B: Buf,
356{
357    /// Returns `Ready` when the connection can initialize a new HTTP/2
358    /// stream.
359    ///
360    /// This function must return `Ready` before `send_request` is called. When
361    /// `Poll::Pending` is returned, the task will be notified once the readiness
362    /// state changes.
363    ///
364    /// See [module] level docs for more details.
365    ///
366    /// [module]: index.html
367    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
368        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
369        self.pending = None;
370        Poll::Ready(Ok(()))
371    }
372
373    /// Consumes `self`, returning a future that returns `self` back once it is
374    /// ready to send a request.
375    ///
376    /// This function should be called before calling `send_request`.
377    ///
378    /// This is a functional combinator for [`poll_ready`]. The returned future
379    /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
380    /// the caller.
381    ///
382    /// # Examples
383    ///
384    /// ```rust
385    /// # use h2::client::*;
386    /// # use http::*;
387    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
388    /// # {
389    /// // First, wait until the `send_request` handle is ready to send a new
390    /// // request
391    /// let mut send_request = send_request.ready().await.unwrap();
392    /// // Use `send_request` here.
393    /// # }
394    /// # pub fn main() {}
395    /// ```
396    ///
397    /// See [module] level docs for more details.
398    ///
399    /// [`poll_ready`]: #method.poll_ready
400    /// [module]: index.html
401    pub fn ready(self) -> ReadySendRequest<B> {
402        ReadySendRequest { inner: Some(self) }
403    }
404
405    /// Sends a HTTP/2 request to the server.
406    ///
407    /// `send_request` initializes a new HTTP/2 stream on the associated
408    /// connection, then sends the given request using this new stream. Only the
409    /// request head is sent.
410    ///
411    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
412    /// are returned. The [`ResponseFuture`] instance is used to get the
413    /// server's response and the [`SendStream`] instance is used to send a
414    /// request body or trailers to the server over the same HTTP/2 stream.
415    ///
416    /// To send a request body or trailers, set `end_of_stream` to `false`.
417    /// Then, use the returned [`SendStream`] instance to stream request body
418    /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
419    /// then attempting to call [`SendStream::send_data`] or
420    /// [`SendStream::send_trailers`] will result in an error.
421    ///
422    /// If no request body or trailers are to be sent, set `end_of_stream` to
423    /// `true` and drop the returned [`SendStream`] instance.
424    ///
425    /// # A note on HTTP versions
426    ///
427    /// The provided `Request` will be encoded differently depending on the
428    /// value of its version field. If the version is set to 2.0, then the
429    /// request is encoded as per the specification recommends.
430    ///
431    /// If the version is set to a lower value, then the request is encoded to
432    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
433    /// headers are permitted and the `:authority` pseudo header is not
434    /// included.
435    ///
436    /// The caller should always set the request's version field to 2.0 unless
437    /// specifically transmitting an HTTP 1.1 request over 2.0.
438    ///
439    /// # Examples
440    ///
441    /// Sending a request with no body
442    ///
443    /// ```rust
444    /// # use h2::client::*;
445    /// # use http::*;
446    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
447    /// # {
448    /// // First, wait until the `send_request` handle is ready to send a new
449    /// // request
450    /// let mut send_request = send_request.ready().await.unwrap();
451    /// // Prepare the HTTP request to send to the server.
452    /// let request = Request::get("https://www.example.com/")
453    ///     .body(())
454    ///     .unwrap();
455    ///
456    /// // Send the request to the server. Since we are not sending a
457    /// // body or trailers, we can drop the `SendStream` instance.
458    /// let (response, _) = send_request.send_request(request, true).unwrap();
459    /// let response = response.await.unwrap();
460    /// // Process the response
461    /// # }
462    /// # pub fn main() {}
463    /// ```
464    ///
465    /// Sending a request with a body and trailers
466    ///
467    /// ```rust
468    /// # use h2::client::*;
469    /// # use http::*;
470    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
471    /// # {
472    /// // First, wait until the `send_request` handle is ready to send a new
473    /// // request
474    /// let mut send_request = send_request.ready().await.unwrap();
475    ///
476    /// // Prepare the HTTP request to send to the server.
477    /// let request = Request::get("https://www.example.com/")
478    ///     .body(())
479    ///     .unwrap();
480    ///
481    /// // Send the request to the server. If we are not sending a
482    /// // body or trailers, we can drop the `SendStream` instance.
483    /// let (response, mut send_stream) = send_request
484    ///     .send_request(request, false).unwrap();
485    ///
486    /// // At this point, one option would be to wait for send capacity.
487    /// // Doing so would allow us to not hold data in memory that
488    /// // cannot be sent. However, this is not a requirement, so this
489    /// // example will skip that step. See `SendStream` documentation
490    /// // for more details.
491    /// send_stream.send_data(b"hello", false).unwrap();
492    /// send_stream.send_data(b"world", false).unwrap();
493    ///
494    /// // Send the trailers.
495    /// let mut trailers = HeaderMap::new();
496    /// trailers.insert(
497    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
498    ///     header::HeaderValue::from_bytes(b"hello").unwrap());
499    ///
500    /// send_stream.send_trailers(trailers).unwrap();
501    ///
502    /// let response = response.await.unwrap();
503    /// // Process the response
504    /// # }
505    /// # pub fn main() {}
506    /// ```
507    ///
508    /// [`ResponseFuture`]: struct.ResponseFuture.html
509    /// [`SendStream`]: ../struct.SendStream.html
510    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
511    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
512    pub fn send_request(
513        &mut self,
514        request: Request<()>,
515        end_of_stream: bool,
516    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
517        self.inner
518            .send_request(request, end_of_stream, self.pending.as_ref())
519            .map_err(Into::into)
520            .map(|(stream, is_full)| {
521                if stream.is_pending_open() && is_full {
522                    // Only prevent sending another request when the request queue
523                    // is not full.
524                    self.pending = Some(stream.clone_to_opaque());
525                }
526
527                let response = ResponseFuture {
528                    inner: stream.clone_to_opaque(),
529                    push_promise_consumed: false,
530                };
531
532                let stream = SendStream::new(stream);
533
534                (response, stream)
535            })
536    }
537
538    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
539    ///
540    /// This setting is configured by the server peer by sending the
541    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
542    /// This method returns the currently acknowledged value received from the
543    /// remote.
544    ///
545    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
546    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
547    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
548        self.inner.is_extended_connect_protocol_enabled()
549    }
550
551    /// Returns the current max send streams
552    pub fn current_max_send_streams(&self) -> usize {
553        self.inner.current_max_send_streams()
554    }
555
556    /// Returns the current max recv streams
557    pub fn current_max_recv_streams(&self) -> usize {
558        self.inner.current_max_recv_streams()
559    }
560}
561
562impl<B> fmt::Debug for SendRequest<B>
563where
564    B: Buf,
565{
566    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
567        fmt.debug_struct("SendRequest").finish()
568    }
569}
570
571impl<B> Clone for SendRequest<B>
572where
573    B: Buf,
574{
575    fn clone(&self) -> Self {
576        SendRequest {
577            inner: self.inner.clone(),
578            pending: None,
579        }
580    }
581}
582
583#[cfg(feature = "unstable")]
584impl<B> SendRequest<B>
585where
586    B: Buf,
587{
588    /// Returns the number of active streams.
589    ///
590    /// An active stream is a stream that has not yet transitioned to a closed
591    /// state.
592    pub fn num_active_streams(&self) -> usize {
593        self.inner.num_active_streams()
594    }
595
596    /// Returns the number of streams that are held in memory.
597    ///
598    /// A wired stream is a stream that is either active or is closed but must
599    /// stay in memory for some reason. For example, there are still outstanding
600    /// userspace handles pointing to the slot.
601    pub fn num_wired_streams(&self) -> usize {
602        self.inner.num_wired_streams()
603    }
604}
605
606// ===== impl ReadySendRequest =====
607
608impl<B> Future for ReadySendRequest<B>
609where
610    B: Buf,
611{
612    type Output = Result<SendRequest<B>, crate::Error>;
613
614    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
615        match &mut self.inner {
616            Some(send_request) => {
617                ready!(send_request.poll_ready(cx))?;
618            }
619            None => panic!("called `poll` after future completed"),
620        }
621
622        Poll::Ready(Ok(self.inner.take().unwrap()))
623    }
624}
625
626// ===== impl Builder =====
627
628impl Builder {
629    /// Returns a new client builder instance initialized with default
630    /// configuration values.
631    ///
632    /// Configuration methods can be chained on the return value.
633    ///
634    /// # Examples
635    ///
636    /// ```
637    /// # use tokio::io::{AsyncRead, AsyncWrite};
638    /// # use h2::client::*;
639    /// # use bytes::Bytes;
640    /// #
641    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
642    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
643    /// # {
644    /// // `client_fut` is a future representing the completion of the HTTP/2
645    /// // handshake.
646    /// let client_fut = Builder::new()
647    ///     .initial_window_size(1_000_000)
648    ///     .max_concurrent_streams(1000)
649    ///     .handshake(my_io);
650    /// # client_fut.await
651    /// # }
652    /// #
653    /// # pub fn main() {}
654    /// ```
655    pub fn new() -> Builder {
656        Builder {
657            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
658            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
659            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
660            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
661            initial_target_connection_window_size: None,
662            initial_max_send_streams: usize::MAX,
663            settings: Default::default(),
664            stream_id: 1.into(),
665            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
666        }
667    }
668
669    /// Indicates the initial window size (in octets) for stream-level
670    /// flow control for received data.
671    ///
672    /// The initial window of a stream is used as part of flow control. For more
673    /// details, see [`FlowControl`].
674    ///
675    /// The default value is 65,535.
676    ///
677    /// [`FlowControl`]: ../struct.FlowControl.html
678    ///
679    /// # Examples
680    ///
681    /// ```
682    /// # use tokio::io::{AsyncRead, AsyncWrite};
683    /// # use h2::client::*;
684    /// # use bytes::Bytes;
685    /// #
686    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
687    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
688    /// # {
689    /// // `client_fut` is a future representing the completion of the HTTP/2
690    /// // handshake.
691    /// let client_fut = Builder::new()
692    ///     .initial_window_size(1_000_000)
693    ///     .handshake(my_io);
694    /// # client_fut.await
695    /// # }
696    /// #
697    /// # pub fn main() {}
698    /// ```
699    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
700        self.settings.set_initial_window_size(Some(size));
701        self
702    }
703
704    /// Indicates the initial window size (in octets) for connection-level flow control
705    /// for received data.
706    ///
707    /// The initial window of a connection is used as part of flow control. For more details,
708    /// see [`FlowControl`].
709    ///
710    /// The default value is 65,535.
711    ///
712    /// [`FlowControl`]: ../struct.FlowControl.html
713    ///
714    /// # Examples
715    ///
716    /// ```
717    /// # use tokio::io::{AsyncRead, AsyncWrite};
718    /// # use h2::client::*;
719    /// # use bytes::Bytes;
720    /// #
721    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
722    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
723    /// # {
724    /// // `client_fut` is a future representing the completion of the HTTP/2
725    /// // handshake.
726    /// let client_fut = Builder::new()
727    ///     .initial_connection_window_size(1_000_000)
728    ///     .handshake(my_io);
729    /// # client_fut.await
730    /// # }
731    /// #
732    /// # pub fn main() {}
733    /// ```
734    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
735        self.initial_target_connection_window_size = Some(size);
736        self
737    }
738
739    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
740    /// configured client is able to accept.
741    ///
742    /// The sender may send data frames that are **smaller** than this value,
743    /// but any data larger than `max` will be broken up into multiple `DATA`
744    /// frames.
745    ///
746    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
747    ///
748    /// # Examples
749    ///
750    /// ```
751    /// # use tokio::io::{AsyncRead, AsyncWrite};
752    /// # use h2::client::*;
753    /// # use bytes::Bytes;
754    /// #
755    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
756    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
757    /// # {
758    /// // `client_fut` is a future representing the completion of the HTTP/2
759    /// // handshake.
760    /// let client_fut = Builder::new()
761    ///     .max_frame_size(1_000_000)
762    ///     .handshake(my_io);
763    /// # client_fut.await
764    /// # }
765    /// #
766    /// # pub fn main() {}
767    /// ```
768    ///
769    /// # Panics
770    ///
771    /// This function panics if `max` is not within the legal range specified
772    /// above.
773    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
774        self.settings.set_max_frame_size(Some(max));
775        self
776    }
777
778    /// Sets the max size of received header frames.
779    ///
780    /// This advisory setting informs a peer of the maximum size of header list
781    /// that the sender is prepared to accept, in octets. The value is based on
782    /// the uncompressed size of header fields, including the length of the name
783    /// and value in octets plus an overhead of 32 octets for each header field.
784    ///
785    /// This setting is also used to limit the maximum amount of data that is
786    /// buffered to decode HEADERS frames.
787    ///
788    /// # Examples
789    ///
790    /// ```
791    /// # use tokio::io::{AsyncRead, AsyncWrite};
792    /// # use h2::client::*;
793    /// # use bytes::Bytes;
794    /// #
795    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
796    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
797    /// # {
798    /// // `client_fut` is a future representing the completion of the HTTP/2
799    /// // handshake.
800    /// let client_fut = Builder::new()
801    ///     .max_header_list_size(16 * 1024)
802    ///     .handshake(my_io);
803    /// # client_fut.await
804    /// # }
805    /// #
806    /// # pub fn main() {}
807    /// ```
808    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
809        self.settings.set_max_header_list_size(Some(max));
810        self
811    }
812
813    /// Sets the maximum number of concurrent streams.
814    ///
815    /// The maximum concurrent streams setting only controls the maximum number
816    /// of streams that can be initiated by the remote peer. In other words,
817    /// when this setting is set to 100, this does not limit the number of
818    /// concurrent streams that can be created by the caller.
819    ///
820    /// It is recommended that this value be no smaller than 100, so as to not
821    /// unnecessarily limit parallelism. However, any value is legal, including
822    /// 0. If `max` is set to 0, then the remote will not be permitted to
823    /// initiate streams.
824    ///
825    /// Note that streams in the reserved state, i.e., push promises that have
826    /// been reserved but the stream has not started, do not count against this
827    /// setting.
828    ///
829    /// Also note that if the remote *does* exceed the value set here, it is not
830    /// a protocol level error. Instead, the `h2` library will immediately reset
831    /// the stream.
832    ///
833    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
834    ///
835    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
836    ///
837    /// # Examples
838    ///
839    /// ```
840    /// # use tokio::io::{AsyncRead, AsyncWrite};
841    /// # use h2::client::*;
842    /// # use bytes::Bytes;
843    /// #
844    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
845    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
846    /// # {
847    /// // `client_fut` is a future representing the completion of the HTTP/2
848    /// // handshake.
849    /// let client_fut = Builder::new()
850    ///     .max_concurrent_streams(1000)
851    ///     .handshake(my_io);
852    /// # client_fut.await
853    /// # }
854    /// #
855    /// # pub fn main() {}
856    /// ```
857    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
858        self.settings.set_max_concurrent_streams(Some(max));
859        self
860    }
861
862    /// Sets the initial maximum of locally initiated (send) streams.
863    ///
864    /// The initial settings will be overwritten by the remote peer when
865    /// the SETTINGS frame is received. The new value will be set to the
866    /// `max_concurrent_streams()` from the frame. If no value is advertised in
867    /// the initial SETTINGS frame from the remote peer as part of
868    /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
869    ///
870    /// This setting prevents the caller from exceeding this number of
871    /// streams that are counted towards the concurrency limit.
872    ///
873    /// Sending streams past the limit returned by the peer will be treated
874    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
875    ///
876    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
877    ///
878    /// The default value is `usize::MAX`.
879    ///
880    /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
881    /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
882    ///
883    /// # Examples
884    ///
885    /// ```
886    /// # use tokio::io::{AsyncRead, AsyncWrite};
887    /// # use h2::client::*;
888    /// # use bytes::Bytes;
889    /// #
890    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
891    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
892    /// # {
893    /// // `client_fut` is a future representing the completion of the HTTP/2
894    /// // handshake.
895    /// let client_fut = Builder::new()
896    ///     .initial_max_send_streams(1000)
897    ///     .handshake(my_io);
898    /// # client_fut.await
899    /// # }
900    /// #
901    /// # pub fn main() {}
902    /// ```
903    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
904        self.initial_max_send_streams = initial;
905        self
906    }
907
908    /// Sets the maximum number of concurrent locally reset streams.
909    ///
910    /// When a stream is explicitly reset, the HTTP/2 specification requires
911    /// that any further frames received for that stream must be ignored for
912    /// "some time".
913    ///
914    /// In order to satisfy the specification, internal state must be maintained
915    /// to implement the behavior. This state grows linearly with the number of
916    /// streams that are locally reset.
917    ///
918    /// The `max_concurrent_reset_streams` setting configures sets an upper
919    /// bound on the amount of state that is maintained. When this max value is
920    /// reached, the oldest reset stream is purged from memory.
921    ///
922    /// Once the stream has been fully purged from memory, any additional frames
923    /// received for that stream will result in a connection level protocol
924    /// error, forcing the connection to terminate.
925    ///
926    /// The default value is 10.
927    ///
928    /// # Examples
929    ///
930    /// ```
931    /// # use tokio::io::{AsyncRead, AsyncWrite};
932    /// # use h2::client::*;
933    /// # use bytes::Bytes;
934    /// #
935    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
936    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
937    /// # {
938    /// // `client_fut` is a future representing the completion of the HTTP/2
939    /// // handshake.
940    /// let client_fut = Builder::new()
941    ///     .max_concurrent_reset_streams(1000)
942    ///     .handshake(my_io);
943    /// # client_fut.await
944    /// # }
945    /// #
946    /// # pub fn main() {}
947    /// ```
948    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
949        self.reset_stream_max = max;
950        self
951    }
952
953    /// Sets the duration to remember locally reset streams.
954    ///
955    /// When a stream is explicitly reset, the HTTP/2 specification requires
956    /// that any further frames received for that stream must be ignored for
957    /// "some time".
958    ///
959    /// In order to satisfy the specification, internal state must be maintained
960    /// to implement the behavior. This state grows linearly with the number of
961    /// streams that are locally reset.
962    ///
963    /// The `reset_stream_duration` setting configures the max amount of time
964    /// this state will be maintained in memory. Once the duration elapses, the
965    /// stream state is purged from memory.
966    ///
967    /// Once the stream has been fully purged from memory, any additional frames
968    /// received for that stream will result in a connection level protocol
969    /// error, forcing the connection to terminate.
970    ///
971    /// The default value is 30 seconds.
972    ///
973    /// # Examples
974    ///
975    /// ```
976    /// # use tokio::io::{AsyncRead, AsyncWrite};
977    /// # use h2::client::*;
978    /// # use std::time::Duration;
979    /// # use bytes::Bytes;
980    /// #
981    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
982    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
983    /// # {
984    /// // `client_fut` is a future representing the completion of the HTTP/2
985    /// // handshake.
986    /// let client_fut = Builder::new()
987    ///     .reset_stream_duration(Duration::from_secs(10))
988    ///     .handshake(my_io);
989    /// # client_fut.await
990    /// # }
991    /// #
992    /// # pub fn main() {}
993    /// ```
994    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
995        self.reset_stream_duration = dur;
996        self
997    }
998
999    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
1000    ///
1001    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
1002    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
1003    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
1004    ///
1005    /// When the number of local resets exceeds this threshold, the client will close the connection.
1006    ///
1007    /// If you really want to disable this, supply [`Option::None`] here.
1008    /// Disabling this is not recommended and may expose you to DOS attacks.
1009    ///
1010    /// The default value is currently 1024, but could change.
1011    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1012        self.local_max_error_reset_streams = max;
1013        self
1014    }
1015
1016    /// Sets the maximum number of pending-accept remotely-reset streams.
1017    ///
1018    /// Streams that have been received by the peer, but not accepted by the
1019    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1020    /// could send a request and then shortly after, realize it is not needed,
1021    /// sending a CANCEL.
1022    ///
1023    /// However, since those streams are now "closed", they don't count towards
1024    /// the max concurrent streams. So, they will sit in the accept queue,
1025    /// using memory.
1026    ///
1027    /// When the number of remotely-reset streams sitting in the pending-accept
1028    /// queue reaches this maximum value, a connection error with the code of
1029    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1030    /// `Future`.
1031    ///
1032    /// The default value is currently 20, but could change.
1033    ///
1034    /// # Examples
1035    ///
1036    /// ```
1037    /// # use tokio::io::{AsyncRead, AsyncWrite};
1038    /// # use h2::client::*;
1039    /// # use bytes::Bytes;
1040    /// #
1041    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1042    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1043    /// # {
1044    /// // `client_fut` is a future representing the completion of the HTTP/2
1045    /// // handshake.
1046    /// let client_fut = Builder::new()
1047    ///     .max_pending_accept_reset_streams(100)
1048    ///     .handshake(my_io);
1049    /// # client_fut.await
1050    /// # }
1051    /// #
1052    /// # pub fn main() {}
1053    /// ```
1054    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1055        self.pending_accept_reset_stream_max = max;
1056        self
1057    }
1058
1059    /// Sets the maximum send buffer size per stream.
1060    ///
1061    /// Once a stream has buffered up to (or over) the maximum, the stream's
1062    /// flow control will not "poll" additional capacity. Once bytes for the
1063    /// stream have been written to the connection, the send buffer capacity
1064    /// will be freed up again.
1065    ///
1066    /// The default is currently ~400KB, but may change.
1067    ///
1068    /// # Panics
1069    ///
1070    /// This function panics if `max` is larger than `u32::MAX`.
1071    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1072        assert!(max <= u32::MAX as usize);
1073        self.max_send_buffer_size = max;
1074        self
1075    }
1076
1077    /// Enables or disables server push promises.
1078    ///
1079    /// This value is included in the initial SETTINGS handshake.
1080    /// Setting this value to value to
1081    /// false in the initial SETTINGS handshake guarantees that the remote server
1082    /// will never send a push promise.
1083    ///
1084    /// This setting can be changed during the life of a single HTTP/2
1085    /// connection by sending another settings frame updating the value.
1086    ///
1087    /// Default value: `true`.
1088    ///
1089    /// # Examples
1090    ///
1091    /// ```
1092    /// # use tokio::io::{AsyncRead, AsyncWrite};
1093    /// # use h2::client::*;
1094    /// # use std::time::Duration;
1095    /// # use bytes::Bytes;
1096    /// #
1097    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1098    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1099    /// # {
1100    /// // `client_fut` is a future representing the completion of the HTTP/2
1101    /// // handshake.
1102    /// let client_fut = Builder::new()
1103    ///     .enable_push(false)
1104    ///     .handshake(my_io);
1105    /// # client_fut.await
1106    /// # }
1107    /// #
1108    /// # pub fn main() {}
1109    /// ```
1110    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1111        self.settings.set_enable_push(enabled);
1112        self
1113    }
1114
1115    /// Sets the header table size.
1116    ///
1117    /// This setting informs the peer of the maximum size of the header compression
1118    /// table used to encode header blocks, in octets. The encoder may select any value
1119    /// equal to or less than the header table size specified by the sender.
1120    ///
1121    /// The default value is 4,096.
1122    ///
1123    /// # Examples
1124    ///
1125    /// ```
1126    /// # use tokio::io::{AsyncRead, AsyncWrite};
1127    /// # use h2::client::*;
1128    /// # use bytes::Bytes;
1129    /// #
1130    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1131    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1132    /// # {
1133    /// // `client_fut` is a future representing the completion of the HTTP/2
1134    /// // handshake.
1135    /// let client_fut = Builder::new()
1136    ///     .header_table_size(1_000_000)
1137    ///     .handshake(my_io);
1138    /// # client_fut.await
1139    /// # }
1140    /// #
1141    /// # pub fn main() {}
1142    /// ```
1143    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1144        self.settings.set_header_table_size(Some(size));
1145        self
1146    }
1147
1148    /// Sets the first stream ID to something other than 1.
1149    #[cfg(feature = "unstable")]
1150    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1151        self.stream_id = stream_id.into();
1152        assert!(
1153            self.stream_id.is_client_initiated(),
1154            "stream id must be odd"
1155        );
1156        self
1157    }
1158
1159    /// Creates a new configured HTTP/2 client backed by `io`.
1160    ///
1161    /// It is expected that `io` already be in an appropriate state to commence
1162    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1163    /// preface and the initial settings frame is sent by the client.
1164    ///
1165    /// The handshake future does not wait for the initial settings frame from the
1166    /// server.
1167    ///
1168    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1169    /// tuple once the HTTP/2 handshake has been completed.
1170    ///
1171    /// This function also allows the caller to configure the send payload data
1172    /// type. See [Outbound data type] for more details.
1173    ///
1174    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1175    /// [`Connection`]: struct.Connection.html
1176    /// [`SendRequest`]: struct.SendRequest.html
1177    /// [Outbound data type]: ../index.html#outbound-data-type.
1178    ///
1179    /// # Examples
1180    ///
1181    /// Basic usage:
1182    ///
1183    /// ```
1184    /// # use tokio::io::{AsyncRead, AsyncWrite};
1185    /// # use h2::client::*;
1186    /// # use bytes::Bytes;
1187    /// #
1188    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1189    ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1190    /// # {
1191    /// // `client_fut` is a future representing the completion of the HTTP/2
1192    /// // handshake.
1193    /// let client_fut = Builder::new()
1194    ///     .handshake(my_io);
1195    /// # client_fut.await
1196    /// # }
1197    /// #
1198    /// # pub fn main() {}
1199    /// ```
1200    ///
1201    /// Configures the send-payload data type. In this case, the outbound data
1202    /// type will be `&'static [u8]`.
1203    ///
1204    /// ```
1205    /// # use tokio::io::{AsyncRead, AsyncWrite};
1206    /// # use h2::client::*;
1207    /// #
1208    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1209    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1210    /// # {
1211    /// // `client_fut` is a future representing the completion of the HTTP/2
1212    /// // handshake.
1213    /// let client_fut = Builder::new()
1214    ///     .handshake::<_, &'static [u8]>(my_io);
1215    /// # client_fut.await
1216    /// # }
1217    /// #
1218    /// # pub fn main() {}
1219    /// ```
1220    pub fn handshake<T, B>(
1221        &self,
1222        io: T,
1223    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1224    where
1225        T: AsyncRead + AsyncWrite + Unpin,
1226        B: Buf,
1227    {
1228        Connection::handshake2(io, self.clone())
1229    }
1230}
1231
1232impl Default for Builder {
1233    fn default() -> Builder {
1234        Builder::new()
1235    }
1236}
1237
1238/// Creates a new configured HTTP/2 client with default configuration
1239/// values backed by `io`.
1240///
1241/// It is expected that `io` already be in an appropriate state to commence
1242/// the [HTTP/2 handshake]. See [Handshake] for more details.
1243///
1244/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1245/// tuple once the HTTP/2 handshake has been completed. The returned
1246/// [`Connection`] instance will be using default configuration values. Use
1247/// [`Builder`] to customize the configuration values used by a [`Connection`]
1248/// instance.
1249///
1250/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1251/// [Handshake]: ../index.html#handshake
1252/// [`Connection`]: struct.Connection.html
1253/// [`SendRequest`]: struct.SendRequest.html
1254///
1255/// # Examples
1256///
1257/// ```
1258/// # use tokio::io::{AsyncRead, AsyncWrite};
1259/// # use h2::client;
1260/// # use h2::client::*;
1261/// #
1262/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1263/// # {
1264/// let (send_request, connection) = client::handshake(my_io).await?;
1265/// // The HTTP/2 handshake has completed, now start polling
1266/// // `connection` and use `send_request` to send requests to the
1267/// // server.
1268/// # Ok(())
1269/// # }
1270/// #
1271/// # pub fn main() {}
1272/// ```
1273pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1274where
1275    T: AsyncRead + AsyncWrite + Unpin,
1276{
1277    let builder = Builder::new();
1278    builder
1279        .handshake(io)
1280        .instrument(tracing::trace_span!("client_handshake"))
1281        .await
1282}
1283
1284// ===== impl Connection =====
1285
1286async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1287where
1288    T: AsyncRead + AsyncWrite + Unpin,
1289{
1290    tracing::debug!("binding client connection");
1291
1292    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1293    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1294
1295    tracing::debug!("client connection bound");
1296
1297    Ok(())
1298}
1299
1300impl<T, B> Connection<T, B>
1301where
1302    T: AsyncRead + AsyncWrite + Unpin,
1303    B: Buf,
1304{
1305    async fn handshake2(
1306        mut io: T,
1307        builder: Builder,
1308    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1309        bind_connection(&mut io).await?;
1310
1311        // Create the codec
1312        let mut codec = Codec::new(io);
1313
1314        if let Some(max) = builder.settings.max_frame_size() {
1315            codec.set_max_recv_frame_size(max as usize);
1316        }
1317
1318        if let Some(max) = builder.settings.max_header_list_size() {
1319            codec.set_max_recv_header_list_size(max as usize);
1320        }
1321
1322        // Send initial settings frame
1323        codec
1324            .buffer(builder.settings.clone().into())
1325            .expect("invalid SETTINGS frame");
1326
1327        let inner = proto::Connection::new(
1328            codec,
1329            proto::Config {
1330                next_stream_id: builder.stream_id,
1331                initial_max_send_streams: builder.initial_max_send_streams,
1332                max_send_buffer_size: builder.max_send_buffer_size,
1333                reset_stream_duration: builder.reset_stream_duration,
1334                reset_stream_max: builder.reset_stream_max,
1335                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1336                local_error_reset_streams_max: builder.local_max_error_reset_streams,
1337                settings: builder.settings.clone(),
1338            },
1339        );
1340        let send_request = SendRequest {
1341            inner: inner.streams().clone(),
1342            pending: None,
1343        };
1344
1345        let mut connection = Connection { inner };
1346        if let Some(sz) = builder.initial_target_connection_window_size {
1347            connection.set_target_window_size(sz);
1348        }
1349
1350        Ok((send_request, connection))
1351    }
1352
1353    /// Sets the target window size for the whole connection.
1354    ///
1355    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1356    /// frame will be immediately sent to the remote, increasing the connection
1357    /// level window by `size - current_value`.
1358    ///
1359    /// If `size` is less than the current value, nothing will happen
1360    /// immediately. However, as window capacity is released by
1361    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1362    /// out until the number of "in flight" bytes drops below `size`.
1363    ///
1364    /// The default value is 65,535.
1365    ///
1366    /// See [`FlowControl`] documentation for more details.
1367    ///
1368    /// [`FlowControl`]: ../struct.FlowControl.html
1369    /// [library level]: ../index.html#flow-control
1370    pub fn set_target_window_size(&mut self, size: u32) {
1371        assert!(size <= proto::MAX_WINDOW_SIZE);
1372        self.inner.set_target_window_size(size);
1373    }
1374
1375    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1376    /// flow control for received data.
1377    ///
1378    /// The `SETTINGS` will be sent to the remote, and only applied once the
1379    /// remote acknowledges the change.
1380    ///
1381    /// This can be used to increase or decrease the window size for existing
1382    /// streams.
1383    ///
1384    /// # Errors
1385    ///
1386    /// Returns an error if a previous call is still pending acknowledgement
1387    /// from the remote endpoint.
1388    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1389        assert!(size <= proto::MAX_WINDOW_SIZE);
1390        self.inner.set_initial_window_size(size)?;
1391        Ok(())
1392    }
1393
1394    /// Takes a `PingPong` instance from the connection.
1395    ///
1396    /// # Note
1397    ///
1398    /// This may only be called once. Calling multiple times will return `None`.
1399    pub fn ping_pong(&mut self) -> Option<PingPong> {
1400        self.inner.take_user_pings().map(PingPong::new)
1401    }
1402
1403    /// Returns the maximum number of concurrent streams that may be initiated
1404    /// by this client.
1405    ///
1406    /// This limit is configured by the server peer by sending the
1407    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1408    /// This method returns the currently acknowledged value received from the
1409    /// remote.
1410    ///
1411    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1412    pub fn max_concurrent_send_streams(&self) -> usize {
1413        self.inner.max_send_streams()
1414    }
1415    /// Returns the maximum number of concurrent streams that may be initiated
1416    /// by the server on this connection.
1417    ///
1418    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1419    /// parameter][1] sent in a `SETTINGS` frame that has been
1420    /// acknowledged by the remote peer. The value to be sent is configured by
1421    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1422    /// with the remote peer.
1423    ///
1424    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1425    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1426    pub fn max_concurrent_recv_streams(&self) -> usize {
1427        self.inner.max_recv_streams()
1428    }
1429}
1430
1431impl<T, B> Future for Connection<T, B>
1432where
1433    T: AsyncRead + AsyncWrite + Unpin,
1434    B: Buf,
1435{
1436    type Output = Result<(), crate::Error>;
1437
1438    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1439        self.inner.maybe_close_connection_if_no_streams();
1440        let had_streams_or_refs = self.inner.has_streams_or_other_references();
1441        let result = self.inner.poll(cx).map_err(Into::into);
1442        // if we had streams/refs, and don't anymore, wake up one more time to
1443        // ensure proper shutdown
1444        if result.is_pending()
1445            && had_streams_or_refs
1446            && !self.inner.has_streams_or_other_references()
1447        {
1448            tracing::trace!("last stream closed during poll, wake again");
1449            cx.waker().wake_by_ref();
1450        }
1451        result
1452    }
1453}
1454
1455impl<T, B> fmt::Debug for Connection<T, B>
1456where
1457    T: AsyncRead + AsyncWrite,
1458    T: fmt::Debug,
1459    B: fmt::Debug + Buf,
1460{
1461    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1462        fmt::Debug::fmt(&self.inner, fmt)
1463    }
1464}
1465
1466// ===== impl ResponseFuture =====
1467
1468impl Future for ResponseFuture {
1469    type Output = Result<Response<RecvStream>, crate::Error>;
1470
1471    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1472        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1473        let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1474
1475        Poll::Ready(Ok(Response::from_parts(parts, body)))
1476    }
1477}
1478
1479impl ResponseFuture {
1480    /// Returns the stream ID of the response stream.
1481    ///
1482    /// # Panics
1483    ///
1484    /// If the lock on the stream store has been poisoned.
1485    pub fn stream_id(&self) -> crate::StreamId {
1486        crate::StreamId::from_internal(self.inner.stream_id())
1487    }
1488    /// Returns a stream of PushPromises
1489    ///
1490    /// # Panics
1491    ///
1492    /// If this method has been called before
1493    /// or the stream was itself was pushed
1494    pub fn push_promises(&mut self) -> PushPromises {
1495        if self.push_promise_consumed {
1496            panic!("Reference to push promises stream taken!");
1497        }
1498        self.push_promise_consumed = true;
1499        PushPromises {
1500            inner: self.inner.clone(),
1501        }
1502    }
1503}
1504
1505// ===== impl PushPromises =====
1506
1507impl PushPromises {
1508    /// Get the next `PushPromise`.
1509    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1510        crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
1511    }
1512
1513    #[doc(hidden)]
1514    pub fn poll_push_promise(
1515        &mut self,
1516        cx: &mut Context<'_>,
1517    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1518        match self.inner.poll_pushed(cx) {
1519            Poll::Ready(Some(Ok((request, response)))) => {
1520                let response = PushedResponseFuture {
1521                    inner: ResponseFuture {
1522                        inner: response,
1523                        push_promise_consumed: false,
1524                    },
1525                };
1526                Poll::Ready(Some(Ok(PushPromise { request, response })))
1527            }
1528            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1529            Poll::Ready(None) => Poll::Ready(None),
1530            Poll::Pending => Poll::Pending,
1531        }
1532    }
1533}
1534
1535#[cfg(feature = "stream")]
1536impl futures_core::Stream for PushPromises {
1537    type Item = Result<PushPromise, crate::Error>;
1538
1539    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1540        self.poll_push_promise(cx)
1541    }
1542}
1543
1544// ===== impl PushPromise =====
1545
1546impl PushPromise {
1547    /// Returns a reference to the push promise's request headers.
1548    pub fn request(&self) -> &Request<()> {
1549        &self.request
1550    }
1551
1552    /// Returns a mutable reference to the push promise's request headers.
1553    pub fn request_mut(&mut self) -> &mut Request<()> {
1554        &mut self.request
1555    }
1556
1557    /// Consumes `self`, returning the push promise's request headers and
1558    /// response future.
1559    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1560        (self.request, self.response)
1561    }
1562}
1563
1564// ===== impl PushedResponseFuture =====
1565
1566impl Future for PushedResponseFuture {
1567    type Output = Result<Response<RecvStream>, crate::Error>;
1568
1569    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1570        Pin::new(&mut self.inner).poll(cx)
1571    }
1572}
1573
1574impl PushedResponseFuture {
1575    /// Returns the stream ID of the response stream.
1576    ///
1577    /// # Panics
1578    ///
1579    /// If the lock on the stream store has been poisoned.
1580    pub fn stream_id(&self) -> crate::StreamId {
1581        self.inner.stream_id()
1582    }
1583}
1584
1585// ===== impl Peer =====
1586
1587impl Peer {
1588    pub fn convert_send_message(
1589        id: StreamId,
1590        request: Request<()>,
1591        protocol: Option<Protocol>,
1592        end_of_stream: bool,
1593    ) -> Result<Headers, SendError> {
1594        use http::request::Parts;
1595
1596        let (
1597            Parts {
1598                method,
1599                uri,
1600                headers,
1601                version,
1602                ..
1603            },
1604            _,
1605        ) = request.into_parts();
1606
1607        let is_connect = method == Method::CONNECT;
1608
1609        // Build the set pseudo header set. All requests will include `method`
1610        // and `path`.
1611        let mut pseudo = Pseudo::request(method, uri, protocol);
1612
1613        if pseudo.scheme.is_none() {
1614            // If the scheme is not set, then there are a two options.
1615            //
1616            // 1) Authority is not set. In this case, a request was issued with
1617            //    a relative URI. This is permitted **only** when forwarding
1618            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1619            //    this is an error.
1620            //
1621            // 2) Authority is set, then the HTTP method *must* be CONNECT.
1622            //
1623            // It is not possible to have a scheme but not an authority set (the
1624            // `http` crate does not allow it).
1625            //
1626            if pseudo.authority.is_none() {
1627                if version == Version::HTTP_2 {
1628                    return Err(UserError::MissingUriSchemeAndAuthority.into());
1629                } else {
1630                    // This is acceptable as per the above comment. However,
1631                    // HTTP/2 requires that a scheme is set. Since we are
1632                    // forwarding an HTTP 1.1 request, the scheme is set to
1633                    // "http".
1634                    pseudo.set_scheme(uri::Scheme::HTTP);
1635                }
1636            } else if !is_connect {
1637                // TODO: Error
1638            }
1639        }
1640
1641        // Create the HEADERS frame
1642        let mut frame = Headers::new(id, pseudo, headers);
1643
1644        if end_of_stream {
1645            frame.set_end_stream()
1646        }
1647
1648        Ok(frame)
1649    }
1650}
1651
1652impl proto::Peer for Peer {
1653    type Poll = Response<()>;
1654
1655    const NAME: &'static str = "Client";
1656
1657    fn r#dyn() -> proto::DynPeer {
1658        proto::DynPeer::Client
1659    }
1660
1661    /*
1662    fn is_server() -> bool {
1663        false
1664    }
1665    */
1666
1667    fn convert_poll_message(
1668        pseudo: Pseudo,
1669        fields: HeaderMap,
1670        stream_id: StreamId,
1671    ) -> Result<Self::Poll, Error> {
1672        let mut b = Response::builder();
1673
1674        b = b.version(Version::HTTP_2);
1675
1676        if let Some(status) = pseudo.status {
1677            b = b.status(status);
1678        }
1679
1680        let mut response = match b.body(()) {
1681            Ok(response) => response,
1682            Err(_) => {
1683                // TODO: Should there be more specialized handling for different
1684                // kinds of errors
1685                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1686            }
1687        };
1688
1689        *response.headers_mut() = fields;
1690
1691        Ok(response)
1692    }
1693}