hyper_util/server/conn/auto/
mod.rs

1//! Http1 or Http2 connection.
2
3pub mod upgrade;
4
5use futures_util::ready;
6use hyper::service::HttpService;
7use std::future::Future;
8use std::marker::PhantomPinned;
9use std::mem::MaybeUninit;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::{error::Error as StdError, io, time::Duration};
13
14use bytes::Bytes;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18    body::Incoming,
19    rt::{Read, ReadBuf, Timer, Write},
20    service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
/// Boxed, thread-safe error type produced by the connection futures in this module.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// `Result` alias whose error type is this module's boxed [`Error`].
type Result<T> = std::result::Result<T, Error>;
39
/// Byte sequence that `ReadVersion` compares the first bytes of a connection against: a full
/// match selects HTTP/2, anything else selects HTTP/1.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
/// Executor bound used by this module; exactly equivalent to [`Http2ServerConnExec`].
///
/// It adds no methods of its own and only re-exports the HTTP/2 executor requirement under a
/// name that also exists when the `http2` feature is disabled.
#[cfg(feature = "http2")]
pub trait HttpServerConnExec<A, B>: Http2ServerConnExec<A, B>
where
    B: Body,
{
}
45
// Blanket impl: anything usable as an HTTP/2 connection executor is an `HttpServerConnExec`.
#[cfg(feature = "http2")]
impl<A, B, T> HttpServerConnExec<A, B> for T
where
    B: Body,
    T: Http2ServerConnExec<A, B>,
{
}
48
49/// Exactly equivalent to [`Http2ServerConnExec`].
50#[cfg(not(feature = "http2"))]
51pub trait HttpServerConnExec<A, B: Body> {}
52
53#[cfg(not(feature = "http2"))]
54impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
/// Http1 or Http2 connection builder.
///
/// Wraps the protocol-specific hyper builders that are enabled by the `http1` / `http2`
/// features, plus an optional restriction to a single protocol version.
#[derive(Clone, Debug)]
pub struct Builder<E> {
    // HTTP/1 settings; configured through `Builder::http1`.
    #[cfg(feature = "http1")]
    http1: http1::Builder,
    // HTTP/2 settings, constructed from the executor; configured through `Builder::http2`.
    #[cfg(feature = "http2")]
    http2: http2::Builder<E>,
    // `Some` once `http1_only` / `http2_only` was called; `None` means the version is detected
    // from the first bytes of each connection.
    #[cfg(any(feature = "http1", feature = "http2"))]
    version: Option<Version>,
    // With HTTP/2 disabled there is no `http2::Builder` to take the executor, so it is kept here.
    #[cfg(not(feature = "http2"))]
    _executor: E,
}
68
69impl<E: Default> Default for Builder<E> {
70    fn default() -> Self {
71        Self::new(E::default())
72    }
73}
74
75impl<E> Builder<E> {
76    /// Create a new auto connection builder.
77    ///
78    /// `executor` parameter should be a type that implements
79    /// [`Executor`](hyper::rt::Executor) trait.
80    ///
81    /// # Example
82    ///
83    /// ```
84    /// use hyper_util::{
85    ///     rt::TokioExecutor,
86    ///     server::conn::auto,
87    /// };
88    ///
89    /// auto::Builder::new(TokioExecutor::new());
90    /// ```
91    pub fn new(executor: E) -> Self {
92        Self {
93            #[cfg(feature = "http1")]
94            http1: http1::Builder::new(),
95            #[cfg(feature = "http2")]
96            http2: http2::Builder::new(executor),
97            #[cfg(any(feature = "http1", feature = "http2"))]
98            version: None,
99            #[cfg(not(feature = "http2"))]
100            _executor: executor,
101        }
102    }
103
104    /// Http1 configuration.
105    #[cfg(feature = "http1")]
106    pub fn http1(&mut self) -> Http1Builder<'_, E> {
107        Http1Builder { inner: self }
108    }
109
110    /// Http2 configuration.
111    #[cfg(feature = "http2")]
112    pub fn http2(&mut self) -> Http2Builder<'_, E> {
113        Http2Builder { inner: self }
114    }
115
116    /// Only accepts HTTP/2
117    ///
118    /// Does not do anything if used with [`serve_connection_with_upgrades`]
119    ///
120    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
121    #[cfg(feature = "http2")]
122    pub fn http2_only(mut self) -> Self {
123        assert!(self.version.is_none());
124        self.version = Some(Version::H2);
125        self
126    }
127
128    /// Only accepts HTTP/1
129    ///
130    /// Does not do anything if used with [`serve_connection_with_upgrades`]
131    ///
132    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
133    #[cfg(feature = "http1")]
134    pub fn http1_only(mut self) -> Self {
135        assert!(self.version.is_none());
136        self.version = Some(Version::H1);
137        self
138    }
139
140    /// Returns `true` if this builder can serve an HTTP/1.1-based connection.
141    pub fn is_http1_available(&self) -> bool {
142        match self.version {
143            #[cfg(feature = "http1")]
144            Some(Version::H1) => true,
145            #[cfg(feature = "http2")]
146            Some(Version::H2) => false,
147            #[cfg(any(feature = "http1", feature = "http2"))]
148            _ => true,
149        }
150    }
151
152    /// Returns `true` if this builder can serve an HTTP/2-based connection.
153    pub fn is_http2_available(&self) -> bool {
154        match self.version {
155            #[cfg(feature = "http1")]
156            Some(Version::H1) => false,
157            #[cfg(feature = "http2")]
158            Some(Version::H2) => true,
159            #[cfg(any(feature = "http1", feature = "http2"))]
160            _ => true,
161        }
162    }
163
164    /// Bind a connection together with a [`Service`].
165    pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
166    where
167        S: Service<Request<Incoming>, Response = Response<B>>,
168        S::Future: 'static,
169        S::Error: Into<Box<dyn StdError + Send + Sync>>,
170        B: Body + 'static,
171        B::Error: Into<Box<dyn StdError + Send + Sync>>,
172        I: Read + Write + Unpin + 'static,
173        E: HttpServerConnExec<S::Future, B>,
174    {
175        let state = match self.version {
176            #[cfg(feature = "http1")]
177            Some(Version::H1) => {
178                let io = Rewind::new_buffered(io, Bytes::new());
179                let conn = self.http1.serve_connection(io, service);
180                ConnState::H1 { conn }
181            }
182            #[cfg(feature = "http2")]
183            Some(Version::H2) => {
184                let io = Rewind::new_buffered(io, Bytes::new());
185                let conn = self.http2.serve_connection(io, service);
186                ConnState::H2 { conn }
187            }
188            #[cfg(any(feature = "http1", feature = "http2"))]
189            _ => ConnState::ReadVersion {
190                read_version: read_version(io),
191                builder: Cow::Borrowed(self),
192                service: Some(service),
193            },
194        };
195
196        Connection { state }
197    }
198
199    /// Bind a connection together with a [`Service`], with the ability to
200    /// handle HTTP upgrades. This requires that the IO object implements
201    /// `Send`.
202    ///
203    /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
204    /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
205    /// instead. See the documentation of the latter to understand why.
206    ///
207    /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast
208    pub fn serve_connection_with_upgrades<I, S, B>(
209        &self,
210        io: I,
211        service: S,
212    ) -> UpgradeableConnection<'_, I, S, E>
213    where
214        S: Service<Request<Incoming>, Response = Response<B>>,
215        S::Future: 'static,
216        S::Error: Into<Box<dyn StdError + Send + Sync>>,
217        B: Body + 'static,
218        B::Error: Into<Box<dyn StdError + Send + Sync>>,
219        I: Read + Write + Unpin + Send + 'static,
220        E: HttpServerConnExec<S::Future, B>,
221    {
222        UpgradeableConnection {
223            state: UpgradeableConnState::ReadVersion {
224                read_version: read_version(io),
225                builder: Cow::Borrowed(self),
226                service: Some(service),
227            },
228        }
229    }
230}
231
/// HTTP protocol version of a connection, either pinned on the builder or detected by
/// `ReadVersion`.
#[derive(Copy, Clone, Debug)]
enum Version {
    /// HTTP/1.
    H1,
    /// HTTP/2.
    H2,
}
237
238impl Version {
239    #[must_use]
240    #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
241    pub fn unsupported(self) -> Error {
242        match self {
243            Version::H1 => Error::from("HTTP/1 is not supported"),
244            Version::H2 => Error::from("HTTP/2 is not supported"),
245        }
246    }
247}
248
249fn read_version<I>(io: I) -> ReadVersion<I>
250where
251    I: Read + Unpin,
252{
253    ReadVersion {
254        io: Some(io),
255        buf: [MaybeUninit::uninit(); 24],
256        filled: 0,
257        version: Version::H2,
258        cancelled: false,
259        _pin: PhantomPinned,
260    }
261}
262
pin_project! {
    // Future that reads the first bytes of a connection to tell HTTP/1 from HTTP/2.
    struct ReadVersion<I> {
        // The IO object; taken out (leaving `None`) when the future resolves successfully.
        io: Option<I>,
        // Scratch space for the bytes read so far (24 bytes).
        buf: [MaybeUninit<u8>; 24],
        // The amount of `buf` that has been filled.
        filled: usize,
        // Current guess; starts as `H2` and is switched to `H1` on a preface mismatch.
        version: Version,
        // Set by `cancel`; makes the next poll fail with an `Interrupted` error.
        cancelled: bool,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
276
277impl<I> ReadVersion<I> {
278    pub fn cancel(self: Pin<&mut Self>) {
279        *self.project().cancelled = true;
280    }
281}
282
// Reads at most `H2_PREFACE.len()` bytes from the IO object and resolves to the detected version
// together with the IO object wrapped in a `Rewind` that carries the bytes consumed so far.
impl<I> Future for ReadVersion<I>
where
    I: Read + Unpin,
{
    type Output = io::Result<(Version, Rewind<I>)>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // `cancel` was called: stop without reading anything further.
        if *this.cancelled {
            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
        }

        // Wrap the persistent buffer again; progress from earlier polls is restored just below.
        let mut buf = ReadBuf::uninit(&mut *this.buf);
        // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
        // we're only advancing by that many.
        unsafe {
            buf.unfilled().advance(*this.filled);
        };

        // We start as H2 and switch to H1 as soon as we don't have the preface.
        while buf.filled().len() < H2_PREFACE.len() {
            let len = buf.filled().len();
            // Returns early on `Pending` or on an I/O error; `this.filled` keeps earlier progress.
            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
            *this.filled = buf.filled().len();

            // A read that added no bytes, or new bytes that differ from the corresponding part
            // of the preface, means this is not treated as HTTP/2.
            if buf.filled().len() == len
                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
            {
                *this.version = Version::H1;
                break;
            }
        }

        // The decision is made: hand the IO object back along with the bytes already consumed.
        // `io` is `None` from here on, so polling again after completion panics on `unwrap`.
        let io = this.io.take().unwrap();
        let buf = buf.filled().to_vec();
        Poll::Ready(Ok((
            *this.version,
            Rewind::new_buffered(io, Bytes::from(buf)),
        )))
    }
}
325
pin_project! {
    /// A [`Future`](core::future::Future) representing an HTTP/1 or HTTP/2 connection, returned
    /// from [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
    ///
    /// To drive HTTP on this connection this future **must be polled**, typically with
    /// `.await`. If it isn't polled, no progress will be made on this connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current phase: detecting the version, or driving the HTTP/1 or HTTP/2 connection.
        #[pin]
        state: ConnState<'a, I, S, E>,
    }
}
341
// A custom COW, since the libstd one has `ToOwned` bounds that are too eager.
enum Cow<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<'a, T> std::ops::Deref for Cow<'a, T> {
    type Target = T;

    // Gives shared access to the value regardless of whether it is borrowed or owned.
    fn deref(&self) -> &Self::Target {
        match *self {
            Cow::Borrowed(borrowed) => borrowed,
            Cow::Owned(ref owned) => owned,
        }
    }
}
357
// HTTP/1 connection type over a rewindable IO object.
#[cfg(feature = "http1")]
type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;

// Placeholder with the same type parameters so the state enums still compile without `http1`.
#[cfg(not(feature = "http1"))]
type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
363
// HTTP/2 connection type over a rewindable IO object.
#[cfg(feature = "http2")]
type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;

// Placeholder with the same type parameters so the state enums still compile without `http2`.
#[cfg(not(feature = "http2"))]
type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
369
pin_project! {
    // State machine behind `Connection`: version detection first, then one protocol connection.
    #[project = ConnStateProj]
    enum ConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Still detecting the HTTP version; holds what is needed to build the real connection.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            // `Some` until the service is moved into the protocol connection.
            service: Option<S>,
        },
        // Serving as HTTP/1.
        H1 {
            #[pin]
            conn: Http1Connection<I, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
392
393impl<I, S, E, B> Connection<'_, I, S, E>
394where
395    S: HttpService<Incoming, ResBody = B>,
396    S::Error: Into<Box<dyn StdError + Send + Sync>>,
397    I: Read + Write + Unpin,
398    B: Body + 'static,
399    B::Error: Into<Box<dyn StdError + Send + Sync>>,
400    E: HttpServerConnExec<S::Future, B>,
401{
402    /// Start a graceful shutdown process for this connection.
403    ///
404    /// This `Connection` should continue to be polled until shutdown can finish.
405    ///
406    /// # Note
407    ///
408    /// This should only be called while the `Connection` future is still pending. If called after
409    /// `Connection::poll` has resolved, this does nothing.
410    pub fn graceful_shutdown(self: Pin<&mut Self>) {
411        match self.project().state.project() {
412            ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
413            #[cfg(feature = "http1")]
414            ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
415            #[cfg(feature = "http2")]
416            ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
417            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
418            _ => unreachable!(),
419        }
420    }
421
422    /// Make this Connection static, instead of borrowing from Builder.
423    pub fn into_owned(self) -> Connection<'static, I, S, E>
424    where
425        Builder<E>: Clone,
426    {
427        Connection {
428            state: match self.state {
429                ConnState::ReadVersion {
430                    read_version,
431                    builder,
432                    service,
433                } => ConnState::ReadVersion {
434                    read_version,
435                    service,
436                    builder: Cow::Owned(builder.clone()),
437                },
438                #[cfg(feature = "http1")]
439                ConnState::H1 { conn } => ConnState::H1 { conn },
440                #[cfg(feature = "http2")]
441                ConnState::H2 { conn } => ConnState::H2 { conn },
442                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
443                _ => unreachable!(),
444            },
445        }
446    }
447}
448
// Drives the connection: first resolves the version detection, then replaces the state with the
// matching protocol connection and forwards every poll to it.
impl<I, S, E, B> Future for Connection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that a freshly installed protocol connection is polled in the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                ConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    // Returns `Pending` until the version is known; I/O errors (including a
                    // cancelled read) are converted into the boxed error and returned.
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is moved into the protocol connection exactly once.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service);
                            this.state.set(ConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(ConnState::H2 { conn });
                        }
                        // The detected version belongs to a protocol that is compiled out.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                ConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                ConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
502
pin_project! {
    /// An upgradable [`Connection`], returned by
    /// [`Builder::serve_connection_with_upgrades`](struct.Builder.html#method.serve_connection_with_upgrades).
    ///
    /// To drive HTTP on this connection this future **must be polled**, typically with
    /// `.await`. If it isn't polled, no progress will be made on this connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct UpgradeableConnection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current phase: detecting the version, or driving the HTTP/1 or HTTP/2 connection.
        #[pin]
        state: UpgradeableConnState<'a, I, S, E>,
    }
}
518
// HTTP/1 connection type with upgrade support.
#[cfg(feature = "http1")]
type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;

// Placeholder with the same type parameters so the state enum still compiles without `http1`.
#[cfg(not(feature = "http1"))]
type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
524
pin_project! {
    // State machine behind `UpgradeableConnection`: version detection first, then one protocol
    // connection.
    #[project = UpgradeableConnStateProj]
    enum UpgradeableConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Still detecting the HTTP version; holds what is needed to build the real connection.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            // `Some` until the service is moved into the protocol connection.
            service: Option<S>,
        },
        // Serving as HTTP/1 with upgrade support, over the rewindable IO object.
        H1 {
            #[pin]
            conn: Http1UpgradeableConnection<Rewind<I>, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
547
548impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
549where
550    S: HttpService<Incoming, ResBody = B>,
551    S::Error: Into<Box<dyn StdError + Send + Sync>>,
552    I: Read + Write + Unpin,
553    B: Body + 'static,
554    B::Error: Into<Box<dyn StdError + Send + Sync>>,
555    E: HttpServerConnExec<S::Future, B>,
556{
557    /// Start a graceful shutdown process for this connection.
558    ///
559    /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
560    ///
561    /// # Note
562    ///
563    /// This should only be called while the `Connection` future is still nothing. pending. If
564    /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
565    pub fn graceful_shutdown(self: Pin<&mut Self>) {
566        match self.project().state.project() {
567            UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
568            #[cfg(feature = "http1")]
569            UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
570            #[cfg(feature = "http2")]
571            UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
572            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
573            _ => unreachable!(),
574        }
575    }
576
577    /// Make this Connection static, instead of borrowing from Builder.
578    pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
579    where
580        Builder<E>: Clone,
581    {
582        UpgradeableConnection {
583            state: match self.state {
584                UpgradeableConnState::ReadVersion {
585                    read_version,
586                    builder,
587                    service,
588                } => UpgradeableConnState::ReadVersion {
589                    read_version,
590                    service,
591                    builder: Cow::Owned(builder.clone()),
592                },
593                #[cfg(feature = "http1")]
594                UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
595                #[cfg(feature = "http2")]
596                UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
597                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
598                _ => unreachable!(),
599            },
600        }
601    }
602}
603
// Drives the connection: first resolves the version detection, then replaces the state with the
// matching protocol connection (HTTP/1 with upgrades enabled) and forwards every poll to it.
impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + Send + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that a freshly installed protocol connection is polled in the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                UpgradeableConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    // Returns `Pending` until the version is known; I/O errors (including a
                    // cancelled read) are converted into the boxed error and returned.
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is moved into the protocol connection exactly once.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
                            this.state.set(UpgradeableConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(UpgradeableConnState::H2 { conn });
                        }
                        // The detected version belongs to a protocol that is compiled out.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                UpgradeableConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                UpgradeableConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
657
/// Http1 part of builder.
///
/// A mutable view of a [`Builder`] whose methods adjust its HTTP/1 settings.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    // The builder being configured.
    inner: &'a mut Builder<E>,
}
663
664#[cfg(feature = "http1")]
665impl<E> Http1Builder<'_, E> {
666    /// Http2 configuration.
667    #[cfg(feature = "http2")]
668    pub fn http2(&mut self) -> Http2Builder<'_, E> {
669        Http2Builder { inner: self.inner }
670    }
671
672    /// Set whether the `date` header should be included in HTTP responses.
673    ///
674    /// Note that including the `date` header is recommended by RFC 7231.
675    ///
676    /// Default is true.
677    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
678        self.inner.http1.auto_date_header(enabled);
679        self
680    }
681
682    /// Set whether HTTP/1 connections should support half-closures.
683    ///
684    /// Clients can chose to shutdown their write-side while waiting
685    /// for the server to respond. Setting this to `true` will
686    /// prevent closing the connection immediately if `read`
687    /// detects an EOF in the middle of a request.
688    ///
689    /// Default is `false`.
690    pub fn half_close(&mut self, val: bool) -> &mut Self {
691        self.inner.http1.half_close(val);
692        self
693    }
694
695    /// Enables or disables HTTP/1 keep-alive.
696    ///
697    /// Default is true.
698    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
699        self.inner.http1.keep_alive(val);
700        self
701    }
702
703    /// Set whether HTTP/1 connections will write header names as title case at
704    /// the socket level.
705    ///
706    /// Note that this setting does not affect HTTP/2.
707    ///
708    /// Default is false.
709    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
710        self.inner.http1.title_case_headers(enabled);
711        self
712    }
713
714    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
715    ///
716    /// If this is enabled and a header line does not start with a valid header
717    /// name, or does not include a colon at all, the line will be silently ignored
718    /// and no error will be reported.
719    ///
720    /// Default is false.
721    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
722        self.inner.http1.ignore_invalid_headers(enabled);
723        self
724    }
725
726    /// Set whether to support preserving original header cases.
727    ///
728    /// Currently, this will record the original cases received, and store them
729    /// in a private extension on the `Request`. It will also look for and use
730    /// such an extension in any provided `Response`.
731    ///
732    /// Since the relevant extension is still private, there is no way to
733    /// interact with the original cases. The only effect this can have now is
734    /// to forward the cases in a proxy-like fashion.
735    ///
736    /// Note that this setting does not affect HTTP/2.
737    ///
738    /// Default is false.
739    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
740        self.inner.http1.preserve_header_case(enabled);
741        self
742    }
743
744    /// Set the maximum number of headers.
745    ///
746    /// When a request is received, the parser will reserve a buffer to store headers for optimal
747    /// performance.
748    ///
749    /// If server receives more headers than the buffer size, it responds to the client with
750    /// "431 Request Header Fields Too Large".
751    ///
752    /// The headers is allocated on the stack by default, which has higher performance. After
753    /// setting this value, headers will be allocated in heap memory, that is, heap memory
754    /// allocation will occur for each request, and there will be a performance drop of about 5%.
755    ///
756    /// Note that this setting does not affect HTTP/2.
757    ///
758    /// Default is 100.
759    pub fn max_headers(&mut self, val: usize) -> &mut Self {
760        self.inner.http1.max_headers(val);
761        self
762    }
763
764    /// Set a timeout for reading client request headers. If a client does not
765    /// transmit the entire header within this time, the connection is closed.
766    ///
767    /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if `header_read_timeout` is configured
768    /// without a [`Timer`].
769    ///
770    /// Pass `None` to disable.
771    ///
772    /// Default is currently 30 seconds, but do not depend on that.
773    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
774        self.inner.http1.header_read_timeout(read_timeout);
775        self
776    }
777
778    /// Set whether HTTP/1 connections should try to use vectored writes,
779    /// or always flatten into a single buffer.
780    ///
781    /// Note that setting this to false may mean more copies of body data,
782    /// but may also improve performance when an IO transport doesn't
783    /// support vectored writes well, such as most TLS implementations.
784    ///
785    /// Setting this to true will force hyper to use queued strategy
786    /// which may eliminate unnecessary cloning on some TLS backends
787    ///
788    /// Default is `auto`. In this mode hyper will try to guess which
789    /// mode to use
790    pub fn writev(&mut self, val: bool) -> &mut Self {
791        self.inner.http1.writev(val);
792        self
793    }
794
795    /// Set the maximum buffer size for the connection.
796    ///
797    /// Default is ~400kb.
798    ///
799    /// # Panics
800    ///
801    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
802    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
803        self.inner.http1.max_buf_size(max);
804        self
805    }
806
807    /// Aggregates flushes to better support pipelined responses.
808    ///
809    /// Experimental, may have bugs.
810    ///
811    /// Default is false.
812    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
813        self.inner.http1.pipeline_flush(enabled);
814        self
815    }
816
817    /// Set the timer used in background tasks.
818    pub fn timer<M>(&mut self, timer: M) -> &mut Self
819    where
820        M: Timer + Send + Sync + 'static,
821    {
822        self.inner.http1.timer(timer);
823        self
824    }
825
826    /// Bind a connection together with a [`Service`].
827    #[cfg(feature = "http2")]
828    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
829    where
830        S: Service<Request<Incoming>, Response = Response<B>>,
831        S::Future: 'static,
832        S::Error: Into<Box<dyn StdError + Send + Sync>>,
833        B: Body + 'static,
834        B::Error: Into<Box<dyn StdError + Send + Sync>>,
835        I: Read + Write + Unpin + 'static,
836        E: HttpServerConnExec<S::Future, B>,
837    {
838        self.inner.serve_connection(io, service).await
839    }
840
841    /// Bind a connection together with a [`Service`].
842    #[cfg(not(feature = "http2"))]
843    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
844    where
845        S: Service<Request<Incoming>, Response = Response<B>>,
846        S::Future: 'static,
847        S::Error: Into<Box<dyn StdError + Send + Sync>>,
848        B: Body + 'static,
849        B::Error: Into<Box<dyn StdError + Send + Sync>>,
850        I: Read + Write + Unpin + 'static,
851    {
852        self.inner.serve_connection(io, service).await
853    }
854
855    /// Bind a connection together with a [`Service`], with the ability to
856    /// handle HTTP upgrades. This requires that the IO object implements
857    /// `Send`.
858    #[cfg(feature = "http2")]
859    pub fn serve_connection_with_upgrades<I, S, B>(
860        &self,
861        io: I,
862        service: S,
863    ) -> UpgradeableConnection<'_, I, S, E>
864    where
865        S: Service<Request<Incoming>, Response = Response<B>>,
866        S::Future: 'static,
867        S::Error: Into<Box<dyn StdError + Send + Sync>>,
868        B: Body + 'static,
869        B::Error: Into<Box<dyn StdError + Send + Sync>>,
870        I: Read + Write + Unpin + Send + 'static,
871        E: HttpServerConnExec<S::Future, B>,
872    {
873        self.inner.serve_connection_with_upgrades(io, service)
874    }
875}
876
/// Http2 part of builder.
///
/// A mutable view of a [`Builder`] whose methods adjust its HTTP/2 settings.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    // The builder being configured.
    inner: &'a mut Builder<E>,
}
882
#[cfg(feature = "http2")]
impl<E> Http2Builder<'_, E> {
    /// Returns the HTTP/1 view of the same underlying builder.
    #[cfg(feature = "http1")]
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        let inner = &mut *self.inner;
        Http1Builder { inner }
    }

    /// Sets how many pending reset streams are tolerated before a GOAWAY is sent.
    ///
    /// When unset, the default of the [`h2` crate](https://crates.io/crates/h2)
    /// applies. As of v0.4.0, it is 20.
    ///
    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_pending_accept_reset_streams(max);
        self
    }

    /// Sets how many local reset streams are tolerated before a GOAWAY is sent.
    ///
    /// If not set, hyper will use a default, currently of 1024.
    ///
    /// Supplying `None` removes the limit entirely. This is not advised, as it
    /// can potentially expose servers to DOS vulnerabilities.
    ///
    /// See <https://rustsec.org/advisories/RUSTSEC-2024-0003.html> for more information.
    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_local_error_reset_streams(max);
        self
    }

    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option used for HTTP2
    /// stream-level flow control.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, hyper will use a default.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.initial_stream_window_size(sz);
        self
    }

    /// Sets the maximum connection-level flow control window for HTTP2.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, hyper will use a default.
    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.initial_connection_window_size(sz);
        self
    }

    /// Sets whether adaptive flow control is used.
    ///
    /// Enabling this will override the limits set through
    /// [`initial_stream_window_size`](Self::initial_stream_window_size) and
    /// [`initial_connection_window_size`](Self::initial_connection_window_size).
    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.adaptive_window(enabled);
        self
    }

    /// Sets the maximum frame size to use for HTTP2.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, hyper will use a default.
    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_frame_size(sz);
        self
    }

    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
    /// connections.
    ///
    /// Default is 200. Passing `None` will remove any limit.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_concurrent_streams(max);
        self
    }

    /// Sets the interval at which HTTP2 Ping frames are sent to keep a
    /// connection alive.
    ///
    /// Pass `None` to disable HTTP2 keep-alive.
    ///
    /// Default is currently disabled.
    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.keep_alive_interval(interval);
        self
    }

    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
    ///
    /// If the ping is not acknowledged within the timeout, the connection will
    /// be closed. Does nothing if
    /// [`keep_alive_interval`](Self::keep_alive_interval) is disabled.
    ///
    /// Default is 20 seconds.
    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.keep_alive_timeout(timeout);
        self
    }

    /// Sets the maximum write buffer size for each HTTP/2 stream.
    ///
    /// Default is currently ~400KB, but may change.
    ///
    /// # Panics
    ///
    /// The value must be no larger than `u32::MAX`.
    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_send_buf_size(max);
        self
    }

    /// Enables the [extended CONNECT protocol].
    ///
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.enable_connect_protocol();
        self
    }

    /// Sets the max size of received header frames.
    ///
    /// This wrapper sets no default of its own; when unset, the value in effect
    /// comes from hyper. It has been documented here as ~16MB, but may change.
    // NOTE(review): confirm the default against the hyper version in use.
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_header_list_size(max);
        self
    }

    /// Sets the timer used in background tasks.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        let h2 = &mut self.inner.http2;
        h2.timer(timer);
        self
    }

    /// Sets whether the `date` header should be included in HTTP responses.
    ///
    /// Note that including the `date` header is recommended by RFC 7231.
    ///
    /// Default is true.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.auto_date_header(enabled);
        self
    }

    /// Binds the connection `io` to `service` and awaits its completion.
    ///
    /// Forwards both arguments to the wrapped builder's `serve_connection`
    /// and resolves with that call's result.
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        let connection = self.inner.serve_connection(io, service);
        connection.await
    }

    /// Binds the connection `io` to `service`, keeping the ability to handle
    /// HTTP upgrades.
    ///
    /// Compared with [`serve_connection`](Self::serve_connection), the IO
    /// object must additionally implement `Send`. The returned
    /// [`UpgradeableConnection`] borrows this builder and is handed back
    /// without being awaited here.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        // Shared reborrow of the wrapped builder; the result's lifetime is tied to `&self`.
        let builder = &*self.inner;
        builder.serve_connection_with_upgrades(io, service)
    }
}
1078
#[cfg(test)]
mod tests {
    use crate::{
        rt::{TokioExecutor, TokioIo},
        server::conn::auto,
    };
    use http::{Request, Response};
    use http_body::Body;
    use http_body_util::{BodyExt, Empty, Full};
    use hyper::{body, body::Bytes, client, service::service_fn};
    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
    use tokio::{
        net::{TcpListener, TcpStream},
        pin,
    };

    /// Payload returned by the `hello` service and expected by every client test.
    const BODY: &[u8] = b"Hello, world!";

    /// Checks that the builder API can be used both as a single chained
    /// expression and step by step through a named variable.
    #[test]
    fn configuration() {
        // Chained form: hop from the http1 settings to the http2 settings in one expression.
        auto::Builder::new(TokioExecutor::new())
            .http1()
            .keep_alive(true)
            .http2()
            .keep_alive_interval(None);
        // A `.serve_connection(io, service)` call could be appended to the chain above.

        // Named form: configure each protocol separately on the same builder.
        let mut reusable = auto::Builder::new(TokioExecutor::new());

        reusable.http1().keep_alive(true);
        reusable.http2().keep_alive_interval(None);
        // `reusable.serve_connection(io, service)` would follow in real code.
    }

    /// An HTTP/1 client gets the greeting from a server accepting both protocols.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1() {
        let server_addr = start_server(false, false).await;
        let mut client = connect_h1(server_addr).await;

        let request = Request::new(Empty::<Bytes>::new());
        let response = client.send_request(request).await.unwrap();

        assert_hello_body(response).await;
    }

    /// An HTTP/2 client gets the greeting from a server accepting both protocols.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2() {
        let server_addr = start_server(false, false).await;
        let mut client = connect_h2(server_addr).await;

        let request = Request::new(Empty::<Bytes>::new());
        let response = client.send_request(request).await.unwrap();

        assert_hello_body(response).await;
    }

    /// An HTTP/2 client gets the greeting from a server restricted to HTTP/2.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only() {
        let server_addr = start_server(false, true).await;
        let mut client = connect_h2(server_addr).await;

        let request = Request::new(Empty::<Bytes>::new());
        let response = client.send_request(request).await.unwrap();

        assert_hello_body(response).await;
    }

    /// An HTTP/1 request against a server restricted to HTTP/2 must fail.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only_fail_if_client_is_http1() {
        let server_addr = start_server(false, true).await;
        let mut client = connect_h1(server_addr).await;

        let request = Request::new(Empty::<Bytes>::new());
        let _ = client
            .send_request(request)
            .await
            .expect_err("should fail");
    }

    /// An HTTP/1 client gets the greeting from a server restricted to HTTP/1.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only() {
        let server_addr = start_server(true, false).await;
        let mut client = connect_h1(server_addr).await;

        let request = Request::new(Empty::<Bytes>::new());
        let response = client.send_request(request).await.unwrap();

        assert_hello_body(response).await;
    }

    /// An HTTP/2 request against a server restricted to HTTP/1 must fail.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only_fail_if_client_is_http2() {
        let server_addr = start_server(true, false).await;
        let mut client = connect_h2(server_addr).await;

        let request = Request::new(Empty::<Bytes>::new());
        let _ = client
            .send_request(request)
            .await
            .expect_err("should fail");
    }

    /// Requesting a graceful shutdown on a connection whose peer never sent
    /// anything must end the connection promptly with an `Interrupted` I/O error.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn graceful_shutdown() {
        let loopback: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(loopback).await.unwrap();
        let bound_addr = listener.local_addr().unwrap();

        // Accept in a background task so this task is free to make the connection.
        let accept_task = tokio::spawn(async move { listener.accept().await.unwrap() });
        // Open the socket only; no headers or any other bytes are sent. The
        // binding keeps the client end open until the test finishes.
        let _client_stream = TcpStream::connect(bound_addr).await.unwrap();

        let (accepted, _) = accept_task.await.unwrap();
        let io = TokioIo::new(accepted);
        let builder = auto::Builder::new(TokioExecutor::new());
        let connection = builder.serve_connection(io, service_fn(hello));

        pin!(connection);

        connection.as_mut().graceful_shutdown();

        let timed = tokio::time::timeout(Duration::from_millis(200), connection).await;
        let served = timed
            .expect("Connection should have finished in a timely manner after graceful shutdown.");
        let failure = served.expect_err("Connection should have been interrupted.");

        let io_failure = failure
            .downcast_ref::<std::io::Error>()
            .expect("The error should have been `std::io::Error`.");
        assert_eq!(io_failure.kind(), std::io::ErrorKind::Interrupted);
    }

    /// Collects the whole body of `response` and asserts it equals [`BODY`].
    async fn assert_hello_body(response: Response<body::Incoming>) {
        let collected = response.into_body().collect().await.unwrap();

        assert_eq!(collected.to_bytes(), BODY);
    }

    /// Opens a TCP connection to `addr`, performs an HTTP/1 client handshake,
    /// spawns the connection driver and returns the request sender.
    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
    where
        B: Body + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let tcp = TcpStream::connect(addr).await.unwrap();
        let io = TokioIo::new(tcp);
        let (send_request, driver) = client::conn::http1::handshake(io).await.unwrap();

        // The driver is spawned so it is polled independently of the returned sender.
        tokio::spawn(driver);

        send_request
    }

    /// Opens a TCP connection to `addr`, performs an HTTP/2 client handshake,
    /// spawns the connection driver and returns the request sender.
    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
    where
        B: Body + Unpin + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let tcp = TcpStream::connect(addr).await.unwrap();
        let io = TokioIo::new(tcp);
        let h2_client = client::conn::http2::Builder::new(TokioExecutor::new());
        let (send_request, driver) = h2_client.handshake(io).await.unwrap();

        // The driver is spawned so it is polled independently of the returned sender.
        tokio::spawn(driver);

        send_request
    }

    /// Starts a background accept loop on an OS-assigned loopback port and
    /// returns the bound address.
    ///
    /// `h1_only` takes precedence over `h2_only`. When neither is set, the
    /// server accepts both protocols and serves with upgrade support.
    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
        let loopback: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(loopback).await.unwrap();

        let bound_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            loop {
                let (accepted, _) = listener.accept().await.unwrap();
                let io = TokioIo::new(accepted);
                tokio::task::spawn(async move {
                    let mut builder = auto::Builder::new(TokioExecutor::new());
                    let outcome = match (h1_only, h2_only) {
                        (true, _) => {
                            builder = builder.http1_only();
                            builder.serve_connection(io, service_fn(hello)).await
                        }
                        (false, true) => {
                            builder = builder.http2_only();
                            builder.serve_connection(io, service_fn(hello)).await
                        }
                        (false, false) => {
                            builder
                                .http2()
                                .max_header_list_size(4096)
                                .serve_connection_with_upgrades(io, service_fn(hello))
                                .await
                        }
                    };
                    outcome.unwrap();
                });
            }
        });

        bound_addr
    }

    /// Test service: ignores the request and replies with [`BODY`].
    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
        let payload = Full::new(Bytes::from(BODY));
        Ok(Response::new(payload))
    }
}