1pub mod upgrade;
4
5use futures_util::ready;
6use hyper::service::HttpService;
7use std::future::Future;
8use std::marker::PhantomPinned;
9use std::mem::MaybeUninit;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::{error::Error as StdError, io, time::Duration};
13
14use bytes::Bytes;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18 body::Incoming,
19 rt::{Read, ReadBuf, Timer, Write},
20 service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
/// Boxed, thread-safe error type returned by the fallible operations in this module.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// Shorthand for a `Result` whose error type is this module's boxed [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// Byte sequence (24 bytes) that the start of a connection is compared against
/// to decide whether the peer speaks HTTP/2.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
42#[cfg(feature = "http2")]
44pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
45
46#[cfg(feature = "http2")]
47impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
48
49#[cfg(not(feature = "http2"))]
51pub trait HttpServerConnExec<A, B: Body> {}
52
53#[cfg(not(feature = "http2"))]
54impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
/// Connection builder that can serve HTTP/1 and/or HTTP/2, depending on the
/// enabled crate features and on an optional version restriction.
#[derive(Clone, Debug)]
pub struct Builder<E> {
    // hyper's HTTP/1 builder; present only with the `http1` feature.
    #[cfg(feature = "http1")]
    http1: http1::Builder,
    // hyper's HTTP/2 builder, which owns the executor; present only with the `http2` feature.
    #[cfg(feature = "http2")]
    http2: http2::Builder<E>,
    // `Some(_)` restricts the builder to a single protocol version; `None`
    // means the version is read from the start of each connection.
    #[cfg(any(feature = "http1", feature = "http2"))]
    version: Option<Version>,
    // Without `http2` no inner builder takes the executor, so it is stored here.
    #[cfg(not(feature = "http2"))]
    _executor: E,
}
68
69impl<E: Default> Default for Builder<E> {
70 fn default() -> Self {
71 Self::new(E::default())
72 }
73}
74
75impl<E> Builder<E> {
76 pub fn new(executor: E) -> Self {
92 Self {
93 #[cfg(feature = "http1")]
94 http1: http1::Builder::new(),
95 #[cfg(feature = "http2")]
96 http2: http2::Builder::new(executor),
97 #[cfg(any(feature = "http1", feature = "http2"))]
98 version: None,
99 #[cfg(not(feature = "http2"))]
100 _executor: executor,
101 }
102 }
103
104 #[cfg(feature = "http1")]
106 pub fn http1(&mut self) -> Http1Builder<'_, E> {
107 Http1Builder { inner: self }
108 }
109
110 #[cfg(feature = "http2")]
112 pub fn http2(&mut self) -> Http2Builder<'_, E> {
113 Http2Builder { inner: self }
114 }
115
116 #[cfg(feature = "http2")]
122 pub fn http2_only(mut self) -> Self {
123 assert!(self.version.is_none());
124 self.version = Some(Version::H2);
125 self
126 }
127
128 #[cfg(feature = "http1")]
134 pub fn http1_only(mut self) -> Self {
135 assert!(self.version.is_none());
136 self.version = Some(Version::H1);
137 self
138 }
139
140 pub fn is_http1_available(&self) -> bool {
142 match self.version {
143 #[cfg(feature = "http1")]
144 Some(Version::H1) => true,
145 #[cfg(feature = "http2")]
146 Some(Version::H2) => false,
147 #[cfg(any(feature = "http1", feature = "http2"))]
148 _ => true,
149 }
150 }
151
152 pub fn is_http2_available(&self) -> bool {
154 match self.version {
155 #[cfg(feature = "http1")]
156 Some(Version::H1) => false,
157 #[cfg(feature = "http2")]
158 Some(Version::H2) => true,
159 #[cfg(any(feature = "http1", feature = "http2"))]
160 _ => true,
161 }
162 }
163
164 pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
166 where
167 S: Service<Request<Incoming>, Response = Response<B>>,
168 S::Future: 'static,
169 S::Error: Into<Box<dyn StdError + Send + Sync>>,
170 B: Body + 'static,
171 B::Error: Into<Box<dyn StdError + Send + Sync>>,
172 I: Read + Write + Unpin + 'static,
173 E: HttpServerConnExec<S::Future, B>,
174 {
175 let state = match self.version {
176 #[cfg(feature = "http1")]
177 Some(Version::H1) => {
178 let io = Rewind::new_buffered(io, Bytes::new());
179 let conn = self.http1.serve_connection(io, service);
180 ConnState::H1 { conn }
181 }
182 #[cfg(feature = "http2")]
183 Some(Version::H2) => {
184 let io = Rewind::new_buffered(io, Bytes::new());
185 let conn = self.http2.serve_connection(io, service);
186 ConnState::H2 { conn }
187 }
188 #[cfg(any(feature = "http1", feature = "http2"))]
189 _ => ConnState::ReadVersion {
190 read_version: read_version(io),
191 builder: Cow::Borrowed(self),
192 service: Some(service),
193 },
194 };
195
196 Connection { state }
197 }
198
199 pub fn serve_connection_with_upgrades<I, S, B>(
209 &self,
210 io: I,
211 service: S,
212 ) -> UpgradeableConnection<'_, I, S, E>
213 where
214 S: Service<Request<Incoming>, Response = Response<B>>,
215 S::Future: 'static,
216 S::Error: Into<Box<dyn StdError + Send + Sync>>,
217 B: Body + 'static,
218 B::Error: Into<Box<dyn StdError + Send + Sync>>,
219 I: Read + Write + Unpin + Send + 'static,
220 E: HttpServerConnExec<S::Future, B>,
221 {
222 UpgradeableConnection {
223 state: UpgradeableConnState::ReadVersion {
224 read_version: read_version(io),
225 builder: Cow::Borrowed(self),
226 service: Some(service),
227 },
228 }
229 }
230}
231
/// Protocol version a connection is served with.
#[derive(Copy, Clone, Debug)]
enum Version {
    /// HTTP/1.
    H1,
    /// HTTP/2.
    H2,
}
237
238impl Version {
239 #[must_use]
240 #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
241 pub fn unsupported(self) -> Error {
242 match self {
243 Version::H1 => Error::from("HTTP/1 is not supported"),
244 Version::H2 => Error::from("HTTP/2 is not supported"),
245 }
246 }
247}
248
249fn read_version<I>(io: I) -> ReadVersion<I>
250where
251 I: Read + Unpin,
252{
253 ReadVersion {
254 io: Some(io),
255 buf: [MaybeUninit::uninit(); 24],
256 filled: 0,
257 version: Version::H2,
258 cancelled: false,
259 _pin: PhantomPinned,
260 }
261}
262
// Future that reads up to 24 bytes from the IO object and compares them with
// `H2_PREFACE` to decide which protocol version the peer is using.
pin_project! {
    struct ReadVersion<I> {
        // The IO object; taken out (left as `None`) once the future completes.
        io: Option<I>,
        // Scratch space for the bytes read so far.
        buf: [MaybeUninit<u8>; 24],
        // The amount of `buf` that has been filled by previous reads.
        filled: usize,
        // Current guess; starts as H2 and switches to H1 on the first mismatch.
        version: Version,
        // Set by `cancel`; makes the next poll fail with `Interrupted`.
        cancelled: bool,
        // Makes this future `!Unpin`.
        #[pin]
        _pin: PhantomPinned,
    }
}
276
277impl<I> ReadVersion<I> {
278 pub fn cancel(self: Pin<&mut Self>) {
279 *self.project().cancelled = true;
280 }
281}
282
impl<I> Future for ReadVersion<I>
where
    I: Read + Unpin,
{
    /// The detected version together with the IO object, wrapped so that the
    /// bytes consumed during detection are replayed to the next reader.
    type Output = io::Result<(Version, Rewind<I>)>;

    /// Reads until either the full HTTP/2 preface has been seen (H2) or the
    /// data read so far diverges from it / the reader returns no new bytes (H1).
    ///
    /// Panics if polled again after it has returned `Ready(Ok(_))`, because the
    /// IO object has been taken out by then.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        if *this.cancelled {
            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
        }

        let mut buf = ReadBuf::uninit(&mut *this.buf);
        // SAFETY: `this.filled` is only ever set to `buf.filled().len()` after a
        // read into this same buffer, so we advance only over bytes that an
        // earlier poll already filled.
        unsafe {
            buf.unfilled().advance(*this.filled);
        };

        // We start as H2 and switch to H1 as soon as the data stops matching the preface.
        while buf.filled().len() < H2_PREFACE.len() {
            let len = buf.filled().len();
            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
            // Remember progress so a later poll (after `Pending`) resumes here.
            *this.filled = buf.filled().len();

            // No new bytes, or the newly read bytes differ from the preface: not H2.
            if buf.filled().len() == len
                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
            {
                *this.version = Version::H1;
                break;
            }
        }

        let io = this.io.take().unwrap();
        // Hand the consumed bytes back so the protocol implementation sees them again.
        let buf = buf.filled().to_vec();
        Poll::Ready(Ok((
            *this.version,
            Rewind::new_buffered(io, Bytes::from(buf)),
        )))
    }
}
325
pin_project! {
    /// A [`Future`] representing an HTTP/1 or HTTP/2 connection.
    ///
    /// Created by [`Builder::serve_connection`]. The lifetime `'a` is the
    /// borrow of the [`Builder`]; use [`Connection::into_owned`] to obtain a
    /// `'static` connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        #[pin]
        state: ConnState<'a, I, S, E>,
    }
}
341
/// Either a borrowed or an owned `T`.
///
/// A local type rather than `std::borrow::Cow`; it carries no `ToOwned`/`Clone`
/// bound on `T`.
enum Cow<'a, T> {
    /// A reference to a `T` owned elsewhere.
    Borrowed(&'a T),
    /// A `T` owned by this value.
    Owned(T),
}
347
348impl<T> std::ops::Deref for Cow<'_, T> {
349 type Target = T;
350 fn deref(&self) -> &T {
351 match self {
352 Cow::Borrowed(t) => &*t,
353 Cow::Owned(ref t) => t,
354 }
355 }
356}
357
// hyper's HTTP/1 connection over the rewindable IO wrapper.
#[cfg(feature = "http1")]
type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;

// Placeholder that keeps the type parameters used when `http1` is disabled.
#[cfg(not(feature = "http1"))]
type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);

// hyper's HTTP/2 connection over the rewindable IO wrapper.
#[cfg(feature = "http2")]
type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;

// Placeholder that keeps the type parameters used when `http2` is disabled.
#[cfg(not(feature = "http2"))]
type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
369
// State machine behind `Connection`: either still detecting the protocol
// version, or driving the hyper connection for the chosen version.
pin_project! {
    #[project = ConnStateProj]
    enum ConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Version not known yet; `service` is `Some` until it is handed to the
        // protocol-specific connection.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            service: Option<S>,
        },
        // Serving as HTTP/1.
        H1 {
            #[pin]
            conn: Http1Connection<I, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
392
393impl<I, S, E, B> Connection<'_, I, S, E>
394where
395 S: HttpService<Incoming, ResBody = B>,
396 S::Error: Into<Box<dyn StdError + Send + Sync>>,
397 I: Read + Write + Unpin,
398 B: Body + 'static,
399 B::Error: Into<Box<dyn StdError + Send + Sync>>,
400 E: HttpServerConnExec<S::Future, B>,
401{
402 pub fn graceful_shutdown(self: Pin<&mut Self>) {
411 match self.project().state.project() {
412 ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
413 #[cfg(feature = "http1")]
414 ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
415 #[cfg(feature = "http2")]
416 ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
417 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
418 _ => unreachable!(),
419 }
420 }
421
422 pub fn into_owned(self) -> Connection<'static, I, S, E>
424 where
425 Builder<E>: Clone,
426 {
427 Connection {
428 state: match self.state {
429 ConnState::ReadVersion {
430 read_version,
431 builder,
432 service,
433 } => ConnState::ReadVersion {
434 read_version,
435 service,
436 builder: Cow::Owned(builder.clone()),
437 },
438 #[cfg(feature = "http1")]
439 ConnState::H1 { conn } => ConnState::H1 { conn },
440 #[cfg(feature = "http2")]
441 ConnState::H2 { conn } => ConnState::H2 { conn },
442 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
443 _ => unreachable!(),
444 },
445 }
446 }
447}
448
impl<I, S, E, B> Future for Connection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    /// Drives version detection (if still pending) and then the chosen
    /// protocol's connection, converting its error into the boxed [`Error`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that, right after switching out of `ReadVersion`, the new
        // state is polled in the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                ConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is moved into the protocol-specific connection.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service);
                            this.state.set(ConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(ConnState::H2 { conn });
                        }
                        // The detected version's feature is not compiled in.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                ConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                ConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                // A variant whose feature is disabled is never constructed.
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
502
pin_project! {
    /// A [`Future`] representing an HTTP/1 or HTTP/2 connection that can
    /// handle HTTP upgrades.
    ///
    /// Created by [`Builder::serve_connection_with_upgrades`]. The lifetime
    /// `'a` is the borrow of the [`Builder`]; use
    /// [`UpgradeableConnection::into_owned`] to obtain a `'static` connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct UpgradeableConnection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        #[pin]
        state: UpgradeableConnState<'a, I, S, E>,
    }
}
518
// hyper's upgrade-capable HTTP/1 connection.
#[cfg(feature = "http1")]
type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;

// Placeholder that keeps the type parameters used when `http1` is disabled.
#[cfg(not(feature = "http1"))]
type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
524
// State machine behind `UpgradeableConnection`: either still detecting the
// protocol version, or driving the hyper connection for the chosen version.
pin_project! {
    #[project = UpgradeableConnStateProj]
    enum UpgradeableConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Version not known yet; `service` is `Some` until it is handed to the
        // protocol-specific connection.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            service: Option<S>,
        },
        // Serving as HTTP/1 with upgrade support.
        H1 {
            #[pin]
            conn: Http1UpgradeableConnection<Rewind<I>, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
547
548impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
549where
550 S: HttpService<Incoming, ResBody = B>,
551 S::Error: Into<Box<dyn StdError + Send + Sync>>,
552 I: Read + Write + Unpin,
553 B: Body + 'static,
554 B::Error: Into<Box<dyn StdError + Send + Sync>>,
555 E: HttpServerConnExec<S::Future, B>,
556{
557 pub fn graceful_shutdown(self: Pin<&mut Self>) {
566 match self.project().state.project() {
567 UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
568 #[cfg(feature = "http1")]
569 UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
570 #[cfg(feature = "http2")]
571 UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
572 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
573 _ => unreachable!(),
574 }
575 }
576
577 pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
579 where
580 Builder<E>: Clone,
581 {
582 UpgradeableConnection {
583 state: match self.state {
584 UpgradeableConnState::ReadVersion {
585 read_version,
586 builder,
587 service,
588 } => UpgradeableConnState::ReadVersion {
589 read_version,
590 service,
591 builder: Cow::Owned(builder.clone()),
592 },
593 #[cfg(feature = "http1")]
594 UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
595 #[cfg(feature = "http2")]
596 UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
597 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
598 _ => unreachable!(),
599 },
600 }
601 }
602}
603
impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + Send + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    /// Drives version detection (if still pending) and then the chosen
    /// protocol's connection, converting its error into the boxed [`Error`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that, right after switching out of `ReadVersion`, the new
        // state is polled in the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                UpgradeableConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is moved into the protocol-specific connection.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            // Only the HTTP/1 connection is wrapped for upgrades.
                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
                            this.state.set(UpgradeableConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(UpgradeableConnState::H2 { conn });
                        }
                        // The detected version's feature is not compiled in.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                UpgradeableConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                UpgradeableConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                // A variant whose feature is disabled is never constructed.
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
657
/// Handle for configuring the HTTP/1 options of a [`Builder`].
///
/// Obtained from [`Builder::http1`] or [`Http2Builder::http1`]; it mutably
/// borrows the builder it configures.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    inner: &'a mut Builder<E>,
}
663
#[cfg(feature = "http1")]
impl<E> Http1Builder<'_, E> {
    /// Switches to configuring the HTTP/2 options of the same [`Builder`].
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self.inner }
    }

    /// Forwards `enabled` to `auto_date_header` on the inner hyper HTTP/1
    /// builder; see hyper's documentation for the exact semantics.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.auto_date_header(enabled);
        self
    }

    /// Forwards `val` to `half_close` on the inner hyper HTTP/1 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.inner.http1.half_close(val);
        self
    }

    /// Forwards `val` to `keep_alive` on the inner hyper HTTP/1 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.inner.http1.keep_alive(val);
        self
    }

    /// Forwards `enabled` to `title_case_headers` on the inner hyper HTTP/1
    /// builder; see hyper's documentation for the exact semantics.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.title_case_headers(enabled);
        self
    }

    /// Forwards `enabled` to `ignore_invalid_headers` on the inner hyper
    /// HTTP/1 builder; see hyper's documentation for the exact semantics.
    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.ignore_invalid_headers(enabled);
        self
    }

    /// Forwards `enabled` to `preserve_header_case` on the inner hyper HTTP/1
    /// builder; see hyper's documentation for the exact semantics.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.preserve_header_case(enabled);
        self
    }

    /// Forwards `val` to `max_headers` on the inner hyper HTTP/1 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn max_headers(&mut self, val: usize) -> &mut Self {
        self.inner.http1.max_headers(val);
        self
    }

    /// Forwards `read_timeout` to `header_read_timeout` on the inner hyper
    /// HTTP/1 builder; see hyper's documentation for the exact semantics.
    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http1.header_read_timeout(read_timeout);
        self
    }

    /// Forwards `val` to `writev` on the inner hyper HTTP/1 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.inner.http1.writev(val);
        self
    }

    /// Forwards `max` to `max_buf_size` on the inner hyper HTTP/1 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http1.max_buf_size(max);
        self
    }

    /// Forwards `enabled` to `pipeline_flush` on the inner hyper HTTP/1
    /// builder; see hyper's documentation for the exact semantics.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.pipeline_flush(enabled);
        self
    }

    /// Sets the [`Timer`] on the inner hyper HTTP/1 builder.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http1.timer(timer);
        self
    }

    /// Binds a connection together with a [`Service`] and drives it to
    /// completion, by delegating to [`Builder::serve_connection`].
    #[cfg(feature = "http2")]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Binds a connection together with a [`Service`] and drives it to
    /// completion, by delegating to [`Builder::serve_connection`].
    ///
    /// This variant, compiled without `http2`, carries no executor bound.
    #[cfg(not(feature = "http2"))]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Binds a connection together with a [`Service`], with the ability to
    /// handle HTTP upgrades, by delegating to
    /// [`Builder::serve_connection_with_upgrades`]. The IO object must be `Send`.
    ///
    /// NOTE(review): only available when the `http2` feature is enabled.
    #[cfg(feature = "http2")]
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
876
/// Handle for configuring the HTTP/2 options of a [`Builder`].
///
/// Obtained from [`Builder::http2`] or [`Http1Builder::http2`]; it mutably
/// borrows the builder it configures.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    inner: &'a mut Builder<E>,
}
882
#[cfg(feature = "http2")]
impl<E> Http2Builder<'_, E> {
    /// Switches to configuring the HTTP/1 options of the same [`Builder`].
    #[cfg(feature = "http1")]
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        Http1Builder { inner: self.inner }
    }

    /// Forwards `max` to `max_pending_accept_reset_streams` on the inner hyper
    /// HTTP/2 builder; see hyper's documentation for the exact semantics.
    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_pending_accept_reset_streams(max);
        self
    }

    /// Forwards `max` to `max_local_error_reset_streams` on the inner hyper
    /// HTTP/2 builder; see hyper's documentation for the exact semantics.
    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_local_error_reset_streams(max);
        self
    }

    /// Forwards `sz` to `initial_stream_window_size` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_stream_window_size(sz);
        self
    }

    /// Forwards `sz` to `initial_connection_window_size` on the inner hyper
    /// HTTP/2 builder; see hyper's documentation for the exact semantics.
    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_connection_window_size(sz);
        self
    }

    /// Forwards `enabled` to `adaptive_window` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.adaptive_window(enabled);
        self
    }

    /// Forwards `sz` to `max_frame_size` on the inner hyper HTTP/2 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_frame_size(sz);
        self
    }

    /// Forwards `max` to `max_concurrent_streams` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_concurrent_streams(max);
        self
    }

    /// Forwards `interval` to `keep_alive_interval` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http2.keep_alive_interval(interval);
        self
    }

    /// Forwards `timeout` to `keep_alive_timeout` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.inner.http2.keep_alive_timeout(timeout);
        self
    }

    /// Forwards `max` to `max_send_buf_size` on the inner hyper HTTP/2 builder;
    /// see hyper's documentation for the exact semantics.
    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http2.max_send_buf_size(max);
        self
    }

    /// Calls `enable_connect_protocol` on the inner hyper HTTP/2 builder; see
    /// hyper's documentation for the exact semantics.
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        self.inner.http2.enable_connect_protocol();
        self
    }

    /// Forwards `max` to `max_header_list_size` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.inner.http2.max_header_list_size(max);
        self
    }

    /// Sets the [`Timer`] on the inner hyper HTTP/2 builder.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http2.timer(timer);
        self
    }

    /// Forwards `enabled` to `auto_date_header` on the inner hyper HTTP/2
    /// builder; see hyper's documentation for the exact semantics.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.auto_date_header(enabled);
        self
    }

    /// Binds a connection together with a [`Service`] and drives it to
    /// completion, by delegating to [`Builder::serve_connection`].
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Binds a connection together with a [`Service`], with the ability to
    /// handle HTTP upgrades, by delegating to
    /// [`Builder::serve_connection_with_upgrades`]. The IO object must be `Send`.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
1078
#[cfg(test)]
mod tests {
    use crate::{
        rt::{TokioExecutor, TokioIo},
        server::conn::auto,
    };
    use http::{Request, Response};
    use http_body::Body;
    use http_body_util::{BodyExt, Empty, Full};
    use hyper::{body, body::Bytes, client, service::service_fn};
    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
    use tokio::{
        net::{TcpListener, TcpStream},
        pin,
    };

    // Response body served by the `hello` test service.
    const BODY: &[u8] = b"Hello, world!";

    // Compile-time check that both configuration styles type-check.
    #[test]
    fn configuration() {
        // One-liner: hop between the HTTP/1 and HTTP/2 handles in a single chain.
        auto::Builder::new(TokioExecutor::new())
            .http1()
            .keep_alive(true)
            .http2()
            .keep_alive_interval(None);
        // Using a variable: configure each protocol in separate statements.
        let mut builder = auto::Builder::new(TokioExecutor::new());

        builder.http1().keep_alive(true);
        builder.http2().keep_alive_interval(None);
    }

    // An unrestricted server answers an HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // An unrestricted server answers an HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h2(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // An HTTP/2-only server answers an HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h2(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // An HTTP/2-only server must not serve an HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only_fail_if_client_is_http1() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h1(addr).await;

        let _ = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .expect_err("should fail");
    }

    // An HTTP/1-only server answers an HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // An HTTP/1-only server must not serve an HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only_fail_if_client_is_http2() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h2(addr).await;

        let _ = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .expect_err("should fail");
    }

    // Graceful shutdown during version detection ends the connection promptly
    // with an `Interrupted` I/O error.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn graceful_shutdown() {
        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
            .await
            .unwrap();

        let listener_addr = listener.local_addr().unwrap();

        // Spawn the accept so it can complete while this task connects.
        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
        // Only connect a stream, do not send headers or anything.
        let _stream = TcpStream::connect(listener_addr).await.unwrap();

        let (stream, _) = listen_task.await.unwrap();
        let stream = TokioIo::new(stream);
        let builder = auto::Builder::new(TokioExecutor::new());
        let connection = builder.serve_connection(stream, service_fn(hello));

        pin!(connection);

        connection.as_mut().graceful_shutdown();

        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
            .await
            .expect("Connection should have finished in a timely manner after graceful shutdown.")
            .expect_err("Connection should have been interrupted.");

        let connection_error = connection_error
            .downcast_ref::<std::io::Error>()
            .expect("The error should have been `std::io::Error`.");
        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
    }

    // Opens a TCP connection to `addr`, performs an HTTP/1 client handshake and
    // spawns the connection task, returning the request sender.
    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
    where
        B: Body + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();

        tokio::spawn(connection);

        sender
    }

    // Opens a TCP connection to `addr`, performs an HTTP/2 client handshake and
    // spawns the connection task, returning the request sender.
    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
    where
        B: Body + Unpin + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
            .handshake(stream)
            .await
            .unwrap();

        tokio::spawn(connection);

        sender
    }

    // Starts a background server on an ephemeral local port and returns its
    // address. `h1_only` / `h2_only` restrict the builder; with neither set the
    // connection is served through the HTTP/2 handle with upgrades enabled.
    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(addr).await.unwrap();

        let local_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            loop {
                let (stream, _) = listener.accept().await.unwrap();
                let stream = TokioIo::new(stream);
                // One task per accepted connection.
                tokio::task::spawn(async move {
                    let mut builder = auto::Builder::new(TokioExecutor::new());
                    if h1_only {
                        builder = builder.http1_only();
                        builder.serve_connection(stream, service_fn(hello)).await
                    } else if h2_only {
                        builder = builder.http2_only();
                        builder.serve_connection(stream, service_fn(hello)).await
                    } else {
                        builder
                            .http2()
                            .max_header_list_size(4096)
                            .serve_connection_with_upgrades(stream, service_fn(hello))
                            .await
                    }
                    .unwrap();
                });
            }
        });

        local_addr
    }

    // Test service: answers every request with `BODY`.
    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
        Ok(Response::new(Full::new(Bytes::from(BODY))))
    }
}