1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;
// FIXME: docs, tests
pin_project! {
/// Stream for the [`take_until`](super::StreamExt::take_until) method.
#[must_use = "streams do nothing unless polled"]
pub struct TakeUntil<St: Stream, Fut: Future> {
#[pin]
stream: St,
// Contains the inner Future on start and None once the inner Future is resolved
// or taken out by the user.
#[pin]
fut: Option<Fut>,
// Contains fut's return value once fut is resolved
fut_result: Option<Fut::Output>,
// Whether the future was taken out by the user.
free: bool,
}
}
impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
Fut: Future + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish()
}
}
impl<St, Fut> TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
pub(super) fn new(stream: St, fut: Fut) -> Self {
Self { stream, fut: Some(fut), fut_result: None, free: false }
}
delegate_access_inner!(stream, St, ());
/// Extract the stopping future out of the combinator.
/// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
/// Taking out the future means the combinator will be yielding
/// elements from the wrapped stream without ever stopping it.
pub fn take_future(&mut self) -> Option<Fut> {
if self.fut.is_some() {
self.free = true;
}
self.fut.take()
}
/// Once the stopping future is resolved, this method can be used
/// to extract the value returned by the stopping future.
///
/// This may be used to retrieve arbitrary data from the stopping
/// future, for example a reason why the stream was stopped.
///
/// This method will return `None` if the future isn't resolved yet,
/// or if the result was already taken out.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, StreamExt};
/// use futures::task::Poll;
///
/// let stream = stream::iter(1..=10);
///
/// let mut i = 0;
/// let stop_fut = future::poll_fn(|_cx| {
/// i += 1;
/// if i <= 5 {
/// Poll::Pending
/// } else {
/// Poll::Ready("reason")
/// }
/// });
///
/// let mut stream = stream.take_until(stop_fut);
/// let _ = stream.by_ref().collect::<Vec<_>>().await;
///
/// let result = stream.take_result().unwrap();
/// assert_eq!(result, "reason");
/// # });
/// ```
pub fn take_result(&mut self) -> Option<Fut::Output> {
self.fut_result.take()
}
/// Whether the stream was stopped yet by the stopping future
/// being resolved.
pub fn is_stopped(&self) -> bool {
!self.free && self.fut.is_none()
}
}
impl<St, Fut> Stream for TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
type Item = St::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
let mut this = self.project();
if let Some(f) = this.fut.as_mut().as_pin_mut() {
if let Poll::Ready(result) = f.poll(cx) {
this.fut.set(None);
*this.fut_result = Some(result);
}
}
if !*this.free && this.fut.is_none() {
// Future resolved, inner stream stopped
Poll::Ready(None)
} else {
// Future either not resolved yet or taken out by the user
let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
this.fut.set(None);
}
Poll::Ready(item)
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
if self.is_stopped() {
return (0, Some(0));
}
self.stream.size_hint()
}
}
impl<St, Fut> FusedStream for TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
fn is_terminated(&self) -> bool {
self.is_stopped()
}
}
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
where
S: Stream + Sink<Item>,
Fut: Future,
{
type Error = S::Error;
delegate_sink!(stream, Item);
}