flume/
lib.rs

1//! # Flume
2//!
3//! A blazingly fast multi-producer, multi-consumer channel.
4//!
5//! *"Do not communicate by sharing memory; instead, share memory by communicating."*
6//!
7//! ## Why Flume?
8//!
9//! - **Featureful**: Unbounded, bounded and rendezvous queues
10//! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel`
11//! - **Safe**: No `unsafe` code anywhere in the codebase!
12//! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone`
13//! - **Familiar**: Drop-in replacement for `std::sync::mpsc`
14//! - **Capable**: Additional features like MPMC support and send timeouts/deadlines
15//! - **Simple**: Few dependencies, minimal codebase, fast to compile
16//! - **Asynchronous**: `async` support, including mix 'n match with sync code
17//! - **Ergonomic**: Powerful `select`-like interface
18//!
19//! ## Example
20//!
21//! ```
22//! let (tx, rx) = flume::unbounded();
23//!
24//! tx.send(42).unwrap();
25//! assert_eq!(rx.recv().unwrap(), 42);
26//! ```
27
28#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
29#![deny(missing_docs)]
30
31#[cfg(feature = "select")]
32pub mod select;
33#[cfg(feature = "async")]
34pub mod r#async;
35
36mod signal;
37
38// Reexports
39#[cfg(feature = "select")]
40pub use select::Selector;
41
42use std::{
43    collections::VecDeque,
44    sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
45    time::{Duration, Instant},
46    marker::PhantomData,
47    thread,
48    fmt,
49};
50use std::fmt::Formatter;
51#[cfg(feature = "spin")]
52use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
53use crate::signal::{Signal, SyncSignal};
54
/// An error that may be emitted when attempting to send a value into a channel on a sender when
/// all receivers are dropped.
///
/// The message that could not be delivered is carried in the public tuple field, so the caller
/// can recover it either directly or through [`SendError::into_inner`].
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T { self.0 }
}

impl<T> fmt::Debug for SendError<T> {
    /// Formats the error without requiring `T: Debug`; the payload is elided as `..`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Write the text verbatim. Calling `.fmt(f)` on the literal inside this impl resolves
        // to `<str as Debug>::fmt`, which would wrap the output in double quotes.
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a human-readable description, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("sending on a closed channel")
    }
}

impl<T> std::error::Error for SendError<T> {}
78
/// An error that may be emitted when attempting to send a value into a channel on a sender when
/// the channel is full or all receivers are dropped.
///
/// Both variants carry the message that could not be sent so that it is not lost.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel the message is sent on has a finite capacity and was full when the send was attempted.
    Full(T),
    /// All channel receivers were dropped and so the message has nobody to receive it.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    /// Formats the variant name without requiring `T: Debug`; the payload is elided as `..`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            TrySendError::Full(..) => "Full(..)",
            TrySendError::Disconnected(..) => "Disconnected(..)",
        };
        // Write the text verbatim. Calling `.fmt(f)` on the literal inside this impl resolves
        // to `<str as Debug>::fmt`, which would wrap the output in double quotes.
        f.write_str(name)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    /// Writes a human-readable description, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TrySendError::Full(..) => "sending on a full channel",
            TrySendError::Disconnected(..) => "sending on a closed channel",
        };
        f.pad(description)
    }
}

impl<T> std::error::Error for TrySendError<T> {}
117
118impl<T> From<SendError<T>> for TrySendError<T> {
119    fn from(err: SendError<T>) -> Self {
120        match err {
121            SendError(item) => Self::Disconnected(item),
122        }
123    }
124}
125
/// An error that may be emitted when sending a value into a channel on a sender with a timeout when
/// the send operation times out or all receivers are dropped.
///
/// Both variants carry the message that could not be sent so that it is not lost.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
    /// A timeout occurred when attempting to send the message.
    Timeout(T),
    /// All channel receivers were dropped and so the message has nobody to receive it.
    Disconnected(T),
}

impl<T> SendTimeoutError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for SendTimeoutError<T> {
    /// Formats the variant name without requiring `T: Debug`; the payload is elided as `..`.
    ///
    /// The variant is shown (rather than a fixed type name) so that a timeout can be told apart
    /// from a disconnect, in line with how `TrySendError` is formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            SendTimeoutError::Timeout(..) => "Timeout(..)",
            SendTimeoutError::Disconnected(..) => "Disconnected(..)",
        };
        // Write the text verbatim. Calling `.fmt(f)` on the literal inside this impl resolves
        // to `<str as Debug>::fmt`, which would wrap the output in double quotes.
        f.write_str(name)
    }
}

impl<T> fmt::Display for SendTimeoutError<T> {
    /// Writes a human-readable description, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            SendTimeoutError::Timeout(..) => "timed out sending on a full channel",
            SendTimeoutError::Disconnected(..) => "sending on a closed channel",
        };
        f.pad(description)
    }
}

impl<T> std::error::Error for SendTimeoutError<T> {}
161
162impl<T> From<SendError<T>> for SendTimeoutError<T> {
163    fn from(err: SendError<T>) -> Self {
164        match err {
165            SendError(item) => Self::Disconnected(item),
166        }
167    }
168}
169
// Internal error type covering every way a send can fail. The public send errors
// (`TrySendError`, `SendError`, `SendTimeoutError`) are derived from it in the `Sender` methods.
// Every variant carries the message that was not sent.
enum TrySendTimeoutError<T> {
    // The bounded channel was at capacity and the caller did not ask to block.
    Full(T),
    // The channel was flagged as disconnected.
    Disconnected(T),
    // The deadline passed while the message was still waiting in the sender's hook.
    Timeout(T),
}
175
/// The error returned by a blocking receive once every sender has been dropped and the channel
/// holds no more messages.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for RecvError {
    /// Writes a human-readable description, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Irrefutable today; stops compiling if a second variant is ever added.
        let RecvError::Disconnected = self;
        f.pad("receiving on a closed channel")
    }
}

impl std::error::Error for RecvError {}
193
/// The error returned by a non-blocking receive when no message could be fetched.
///
/// `Empty` means the channel currently holds no messages; `Disconnected` means it holds no
/// messages and, because all senders are dropped, never will again.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel was empty when the receive was attempted.
    Empty,
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    /// Writes a human-readable description, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Disconnected => "channel is empty and closed",
        };
        f.pad(description)
    }
}

impl std::error::Error for TryRecvError {}
215
216impl From<RecvError> for TryRecvError {
217    fn from(err: RecvError) -> Self {
218        match err {
219            RecvError::Disconnected => Self::Disconnected,
220        }
221    }
222}
223
/// The error returned by a receive with a timeout or deadline when no message could be fetched.
///
/// `Timeout` means the time limit passed first; `Disconnected` means all senders are dropped and
/// there are no values left in the channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// A timeout occurred when attempting to receive a message.
    Timeout,
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for RecvTimeoutError {
    /// Writes a human-readable description, honouring width/precision flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            RecvTimeoutError::Timeout => "timed out waiting on a channel",
            RecvTimeoutError::Disconnected => "channel is empty and closed",
        };
        f.pad(description)
    }
}

impl std::error::Error for RecvTimeoutError {}
245
246impl From<RecvError> for RecvTimeoutError {
247    fn from(err: RecvError) -> Self {
248        match err {
249            RecvError::Disconnected => Self::Disconnected,
250        }
251    }
252}
253
// Internal error type covering every way a receive can fail. The public receive errors
// (`TryRecvError`, `RecvError`, `RecvTimeoutError`) are derived from it in the `Receiver`
// methods.
enum TryRecvTimeoutError {
    // No message was available and the caller did not ask to block.
    Empty,
    // The deadline passed before a message arrived.
    Timeout,
    // The channel was flagged as disconnected and no message was left to take.
    Disconnected,
}
259
// TODO: Investigate some sort of invalidation flag for timeouts
//
// A `Hook` pairs an optional, lock-protected message slot (field 0) with a signal (field 1).
// Field 0 is `Some(lock)` for hooks created with `Hook::slot` and `None` for hooks created with
// `Hook::trigger`. `S` may be unsized so that hooks can be stored behind `dyn Signal`.
//
// With the `spin` feature the slot is guarded by a spinlock...
#[cfg(feature = "spin")]
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);

// ...and by a `std::sync::Mutex` otherwise.
#[cfg(not(feature = "spin"))]
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
266
#[cfg(feature = "spin")]
impl<T, S: ?Sized + Signal> Hook<T, S> {
    /// Builds a hook that owns a message slot, initialised with `msg`, alongside `signal`.
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
    where
        S: Sized,
    {
        let storage = Spinlock::new(msg);
        Arc::new(Self(Some(storage), signal))
    }

    /// Locks the message slot, or returns `None` when this hook was created without one.
    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
        match &self.0 {
            Some(storage) => Some(storage.lock()),
            None => None,
        }
    }
}
280
281#[cfg(not(feature = "spin"))]
282impl<T, S: ?Sized + Signal> Hook<T, S> {
283    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
284    where
285        S: Sized,
286    {
287        Arc::new(Self(Some(Mutex::new(msg)), signal))
288    }
289
290    fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
291        self.0.as_ref().map(|s| s.lock().unwrap())
292    }
293}
294
295impl<T, S: ?Sized + Signal> Hook<T, S> {
296    pub fn fire_recv(&self) -> (T, &S) {
297        let msg = self.lock().unwrap().take().unwrap();
298        (msg, self.signal())
299    }
300
301    pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
302        let ret = match self.lock() {
303            Some(mut lock) => {
304                *lock = Some(msg);
305                None
306            }
307            None => Some(msg),
308        };
309        (ret, self.signal())
310    }
311
312    pub fn is_empty(&self) -> bool {
313        self.lock().map(|s| s.is_none()).unwrap_or(true)
314    }
315
316    pub fn try_take(&self) -> Option<T> {
317        self.lock().unwrap().take()
318    }
319
320    pub fn trigger(signal: S) -> Arc<Self>
321    where
322        S: Sized,
323    {
324        Arc::new(Self(None, signal))
325    }
326
327    pub fn signal(&self) -> &S {
328        &self.1
329    }
330
331    pub fn fire_nothing(&self) -> bool {
332        self.signal().fire()
333    }
334}
335
// Waiting helpers for hooks backed by a `SyncSignal`. All of them panic if the hook was
// created without a message slot.
impl<T> Hook<T, SyncSignal> {
    /// Waits until a message appears in the slot or `abort` is observed as set.
    ///
    /// Returns `Some(msg)` when a message was taken, and `None` when `abort` was set and the
    /// slot was empty.
    pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            let msg = self.lock().unwrap().take();
            if let Some(msg) = msg {
                break Some(msg);
            } else if disconnected {
                break None;
            } else {
                // Nothing yet: wait on the signal, then re-check from the top.
                self.signal().wait()
            }
        }
    }

    /// Like `wait_recv`, but gives up once `deadline` has passed.
    ///
    /// Returns `Ok(msg)` when a message was taken, `Err(false)` when `abort` was set and the
    /// slot was empty, and `Err(true)` on timeout.
    // Err(true) if timeout
    pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            let msg = self.lock().unwrap().take();
            if let Some(msg) = msg {
                break Ok(msg);
            } else if disconnected {
                break Err(false);
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
                // Time remains: wait for at most that long, then re-check from the top.
                self.signal().wait_timeout(dur);
            } else {
                break Err(true);
            }
        }
    }

    /// Waits until the slot has been emptied or `abort` is observed as set.
    ///
    /// Nothing is returned; callers inspect the slot afterwards to tell the two cases apart.
    pub fn wait_send(&self, abort: &AtomicBool) {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            if disconnected || self.lock().unwrap().is_none() {
                break;
            }

            self.signal().wait();
        }
    }

    /// Like `wait_send`, but gives up once `deadline` has passed.
    ///
    /// Returns `Ok(())` when the slot is empty, `Err(false)` when `abort` was set while the
    /// slot still held the message, and `Err(true)` on timeout. The empty-slot check comes
    /// first, so an emptied slot wins over a concurrent abort.
    // Err(true) if timeout
    pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            if self.lock().unwrap().is_none() {
                break Ok(());
            } else if disconnected {
                break Err(false);
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
                // Time remains: wait for at most that long, then re-check from the top.
                self.signal().wait_timeout(dur);
            } else {
                break Err(true);
            }
        }
    }
}
395
/// Acquires the spinlock, backing off between attempts on targets where sleeping is known to
/// work and falling back to a plain `lock()` elsewhere.
#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
    // `thread::sleep` is not available everywhere (for instance on `wasm32-unknown-unknown`
    // when running on a web browser's main thread), so the back-off path is restricted to
    // targets where it is known to work.
    #[cfg(any(target_family = "unix", target_family = "windows"))]
    {
        let mut backoff_exp = 4;
        loop {
            // A short burst of attempts, yielding to the scheduler between each one.
            let mut attempt = 0;
            while attempt < 10 {
                match lock.try_lock() {
                    Some(guard) => return guard,
                    None => thread::yield_now(),
                }
                attempt += 1;
            }
            // Sleep for at most ~1 ms
            thread::sleep(Duration::from_nanos(1 << backoff_exp.min(20)));
            backoff_exp += 1;
        }
    }
    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
    lock.lock()
}
420
/// Acquires the mutex, blocking until it is available. Panics if the mutex is poisoned.
#[cfg(not(feature = "spin"))]
#[inline]
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
    let acquired = Mutex::lock(lock);
    acquired.unwrap()
}
426
427#[cfg(not(feature = "spin"))]
428use std::sync::{Mutex, MutexGuard};
429
// Lock type guarding the channel state: a spinlock when the `spin` feature is enabled...
#[cfg(feature = "spin")]
type ChanLock<T> = Spinlock<T>;
// ...and a `std::sync::Mutex` otherwise.
#[cfg(not(feature = "spin"))]
type ChanLock<T> = Mutex<T>;
434
435
// A FIFO of hooks, type-erased over the concrete signal so that hooks with different signal
// types can share one queue.
type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
// The channel state that lives behind the channel lock.
struct Chan<T> {
    // `Some((capacity, hooks of senders waiting for space))` for a bounded channel,
    // `None` for an unbounded one.
    sending: Option<(usize, SignalVec<T>)>,
    // Messages that have been sent but not yet received.
    queue: VecDeque<T>,
    // Hooks of receivers waiting for a message.
    waiting: SignalVec<T>,
}
442
443impl<T> Chan<T> {
444    fn pull_pending(&mut self, pull_extra: bool) {
445        if let Some((cap, sending)) = &mut self.sending {
446            let effective_cap = *cap + pull_extra as usize;
447
448            while self.queue.len() < effective_cap {
449                if let Some(s) = sending.pop_front() {
450                    let (msg, signal) = s.fire_recv();
451                    signal.fire();
452                    self.queue.push_back(msg);
453                } else {
454                    break;
455                }
456            }
457        }
458    }
459
460    fn try_wake_receiver_if_pending(&mut self) {
461        if !self.queue.is_empty() {
462            while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
463        }
464    }
465}
466
// State shared between all handles of one channel.
struct Shared<T> {
    // The queue and the hooks of waiting senders/receivers, behind the channel lock.
    chan: ChanLock<Chan<T>>,
    // Set once by `disconnect_all`; never cleared in the code visible here.
    disconnected: AtomicBool,
    // Number of live `Sender` handles (starts at 1).
    sender_count: AtomicUsize,
    // Number of live receiver handles (starts at 1).
    receiver_count: AtomicUsize,
}
473
impl<T> Shared<T> {
    /// Creates the shared state for a new channel.
    ///
    /// `cap` is `Some(n)` for a bounded channel of capacity `n` and `None` for an unbounded
    /// one. The channel starts connected, with both handle counts at one.
    fn new(cap: Option<usize>) -> Self {
        Self {
            chan: ChanLock::new(Chan {
                sending: cap.map(|cap| (cap, VecDeque::new())),
                queue: VecDeque::new(),
                waiting: VecDeque::new(),
            }),
            disconnected: AtomicBool::new(false),
            sender_count: AtomicUsize::new(1),
            receiver_count: AtomicUsize::new(1),
        }
    }

    /// Core send routine, generic over the signal type used when the sender has to wait.
    ///
    /// * `msg` - the message to send.
    /// * `should_block` - whether to register a hook and wait when a bounded channel is full;
    ///   when `false`, a full channel yields `TrySendTimeoutError::Full`.
    /// * `make_signal` - builds the hook that holds `msg` while the sender waits.
    /// * `do_block` - receives that hook (already queued, channel lock released) and produces
    ///   the final result.
    ///
    /// Outcomes, checked in order under the channel lock: disconnected -> error; a receiver is
    /// waiting -> hand the message over (or queue it) and wake it; room in the queue -> push;
    /// otherwise block or report `Full`.
    fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
        &self,
        msg: T,
        should_block: bool,
        make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
    ) -> R {
        let mut chan = wait_lock(&self.chan);

        if self.is_disconnected() {
            Err(TrySendTimeoutError::Disconnected(msg)).into()
        } else if !chan.waiting.is_empty() {
            // `msg` is wrapped in an `Option` so it can be moved into a hook and, if the hook
            // hands it back, restored for the next iteration.
            let mut msg = Some(msg);

            loop {
                let slot = chan.waiting.pop_front();
                match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
                    // No more waiting receivers and msg in queue, so break out of the loop
                    None if msg.is_none() => break,
                    // No more waiting receivers, so add msg to queue and break out of the loop
                    None => {
                        chan.queue.push_back(msg.unwrap());
                        break;
                    }
                    // The hook had no slot, so `fire_send` handed the message back.
                    Some((Some(m), signal)) => {
                        if signal.fire() {
                            // Was async and a stream, so didn't acquire the message. Wake another
                            // receiver, and do not yet push the message.
                            msg.replace(m);
                            continue;
                        } else {
                            // Was async and not a stream, so it did acquire the message. Push the
                            // message to the queue for it to be received.
                            chan.queue.push_back(m);
                            drop(chan);
                            break;
                        }
                    },
                    // The hook had a slot and now holds the message.
                    Some((None, signal)) => {
                        // Release the channel lock before waking the receiver.
                        drop(chan);
                        signal.fire();
                        break; // Was sync, so it has acquired the message
                    },
                }
            }

            Ok(()).into()
        } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
            // Unbounded channel, or bounded with room left: enqueue directly.
            chan.queue.push_back(msg);
            Ok(()).into()
        } else if should_block { // Only bounded from here on
            let hook = make_signal(msg);
            chan.sending.as_mut().unwrap().1.push_back(hook.clone());
            // Release the channel lock before waiting.
            drop(chan);

            do_block(hook)
        } else {
            Err(TrySendTimeoutError::Full(msg)).into()
        }
    }

    /// Synchronous send built on `send` with a `SyncSignal` hook.
    ///
    /// `block` selects the mode: `None` never waits, `Some(None)` waits without a time limit,
    /// and `Some(Some(deadline))` waits until `deadline`.
    fn send_sync(
        &self,
        msg: T,
        block: Option<Option<Instant>>,
    ) -> Result<(), TrySendTimeoutError<T>> {
        self.send(
            // msg
            msg,
            // should_block
            block.is_some(),
            // make_signal
            |msg| Hook::slot(Some(msg), SyncSignal::default()),
            // do_block
            // (only invoked when `should_block` is true, so `block.unwrap()` cannot panic)
            |hook| if let Some(deadline) = block.unwrap() {
                hook.wait_deadline_send(&self.disconnected, deadline)
                    .or_else(|timed_out| {
                        if timed_out { // Remove our signal
                            let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
                            wait_lock(&self.chan).sending
                                .as_mut()
                                .unwrap().1
                                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
                        }
                        // If the message is still in our slot the send failed; if the slot is
                        // empty, the message was taken in the meantime and the send succeeded.
                        hook.try_take().map(|msg| if self.is_disconnected() {
                            Err(TrySendTimeoutError::Disconnected(msg))
                        } else {
                            Err(TrySendTimeoutError::Timeout(msg))
                        })
                        .unwrap_or(Ok(()))
                    })
            } else {
                hook.wait_send(&self.disconnected);

                // A message still sitting in the slot means the wait ended due to disconnect.
                match hook.try_take() {
                    Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
                    None => Ok(()),
                }
            },
        )
    }

    /// Core receive routine, generic over the signal type used when the receiver has to wait.
    ///
    /// * `should_block` - whether to register a hook and wait when no message is available;
    ///   when `false`, an empty channel yields `TryRecvTimeoutError::Empty`.
    /// * `make_signal` - builds the hook to park on.
    /// * `do_block` - receives that hook (already queued, channel lock released) and produces
    ///   the final result.
    ///
    /// A queued message takes priority over the disconnected flag, so messages sent before a
    /// disconnect can still be received.
    fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
        &self,
        should_block: bool,
        make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
    ) -> R {
        let mut chan = wait_lock(&self.chan);
        // Pull one message beyond capacity, since one is about to be popped.
        chan.pull_pending(true);

        if let Some(msg) = chan.queue.pop_front() {
            drop(chan);
            Ok(msg).into()
        } else if self.is_disconnected() {
            drop(chan);
            Err(TryRecvTimeoutError::Disconnected).into()
        } else if should_block {
            let hook = make_signal();
            chan.waiting.push_back(hook.clone());
            // Release the channel lock before waiting.
            drop(chan);

            do_block(hook)
        } else {
            drop(chan);
            Err(TryRecvTimeoutError::Empty).into()
        }
    }

    /// Synchronous receive built on `recv` with a `SyncSignal` hook.
    ///
    /// `block` selects the mode: `None` never waits, `Some(None)` waits without a time limit,
    /// and `Some(Some(deadline))` waits until `deadline`.
    fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
        self.recv(
            // should_block
            block.is_some(),
            // make_signal
            || Hook::slot(None, SyncSignal::default()),
            // do_block
            // (only invoked when `should_block` is true, so `block.unwrap()` cannot panic)
            |hook| if let Some(deadline) = block.unwrap() {
                hook.wait_deadline_recv(&self.disconnected, deadline)
                    .or_else(|timed_out| {
                        if timed_out { // Remove our signal
                            let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
                            wait_lock(&self.chan).waiting
                                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
                        }
                        // A message may have landed in our slot, or in the queue, after the
                        // wait gave up; check both before reporting an error.
                        match hook.try_take() {
                            Some(msg) => Ok(msg),
                            None => {
                                let disconnected = self.is_disconnected(); // Check disconnect *before* msg
                                if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
                                    Ok(msg)
                                } else if disconnected {
                                    Err(TryRecvTimeoutError::Disconnected)
                                } else {
                                    Err(TryRecvTimeoutError::Timeout)
                                }
                            },
                        }
                    })
            } else {
                // The wait returned `None` because of a disconnect; drain the queue before
                // giving up.
                hook.wait_recv(&self.disconnected)
                    .or_else(|| wait_lock(&self.chan).queue.pop_front())
                    .ok_or(TryRecvTimeoutError::Disconnected)
            },
        )
    }

    /// Disconnect anything listening on this channel (this will not prevent receivers receiving
    /// msgs that have already been sent)
    fn disconnect_all(&self) {
        // The flag is set before the channel lock is taken and before any signal is fired.
        self.disconnected.store(true, Ordering::Relaxed);

        let mut chan = wait_lock(&self.chan);
        // Move as many pending sender messages into the queue as capacity allows.
        chan.pull_pending(false);
        // Fire every remaining hook so waiting senders and receivers can re-check the flag.
        if let Some((_, sending)) = chan.sending.as_ref() {
            sending.iter().for_each(|hook| {
                hook.signal().fire();
            })
        }
        chan.waiting.iter().for_each(|hook| {
            hook.signal().fire();
        });
    }

    /// Returns whether the disconnected flag has been set.
    fn is_disconnected(&self) -> bool {
        self.disconnected.load(Ordering::SeqCst)
    }

    /// Returns whether the queue holds no messages (see `len`).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns whether a bounded channel's queue is at capacity; always `false` for unbounded
    /// channels. Capacity and length are read under two separate lock acquisitions.
    fn is_full(&self) -> bool {
        self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
    }

    /// Returns the number of queued messages, after first pulling messages from waiting
    /// senders into the queue up to capacity.
    fn len(&self) -> usize {
        let mut chan = wait_lock(&self.chan);
        chan.pull_pending(false);
        chan.queue.len()
    }

    /// Returns the capacity of a bounded channel, or `None` for an unbounded one.
    fn capacity(&self) -> Option<usize> {
        wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
    }

    /// Returns the current value of the sender counter.
    fn sender_count(&self) -> usize {
        self.sender_count.load(Ordering::Relaxed)
    }

    /// Returns the current value of the receiver counter.
    fn receiver_count(&self) -> usize {
        self.receiver_count.load(Ordering::Relaxed)
    }
}
701
/// A transmitting end of a channel.
pub struct Sender<T> {
    // Channel state shared with every other handle of this channel.
    shared: Arc<Shared<T>>,
}
706
707impl<T> Sender<T> {
708    /// Attempt to send a value into the channel. If the channel is bounded and full, or all
709    /// receivers have been dropped, an error is returned. If the channel associated with this
710    /// sender is unbounded, this method has the same behaviour as [`Sender::send`].
711    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
712        self.shared.send_sync(msg, None).map_err(|err| match err {
713            TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
714            TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
715            _ => unreachable!(),
716        })
717    }
718
719    /// Send a value into the channel, returning an error if all receivers have been dropped.
720    /// If the channel is bounded and is full, this method will block until space is available
721    /// or all receivers have been dropped. If the channel is unbounded, this method will not
722    /// block.
723    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
724        self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
725            TrySendTimeoutError::Disconnected(msg) => SendError(msg),
726            _ => unreachable!(),
727        })
728    }
729
730    /// Send a value into the channel, returning an error if all receivers have been dropped
731    /// or the deadline has passed. If the channel is bounded and is full, this method will
732    /// block until space is available, the deadline is reached, or all receivers have been
733    /// dropped.
734    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
735        self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
736            TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
737            TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
738            _ => unreachable!(),
739        })
740    }
741
742    /// Send a value into the channel, returning an error if all receivers have been dropped
743    /// or the timeout has expired. If the channel is bounded and is full, this method will
744    /// block until space is available, the timeout has expired, or all receivers have been
745    /// dropped.
746    pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
747        self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
748    }
749
750    /// Returns true if all receivers for this channel have been dropped.
751    pub fn is_disconnected(&self) -> bool {
752        self.shared.is_disconnected()
753    }
754
755    /// Returns true if the channel is empty.
756    /// Note: Zero-capacity channels are always empty.
757    pub fn is_empty(&self) -> bool {
758        self.shared.is_empty()
759    }
760
761    /// Returns true if the channel is full.
762    /// Note: Zero-capacity channels are always full.
763    pub fn is_full(&self) -> bool {
764        self.shared.is_full()
765    }
766
767    /// Returns the number of messages in the channel
768    pub fn len(&self) -> usize {
769        self.shared.len()
770    }
771
772    /// If the channel is bounded, returns its capacity.
773    pub fn capacity(&self) -> Option<usize> {
774        self.shared.capacity()
775    }
776
777    /// Get the number of senders that currently exist, including this one.
778    pub fn sender_count(&self) -> usize {
779        self.shared.sender_count()
780    }
781
782    /// Get the number of receivers that currently exist.
783    ///
784    /// Note that this method makes no guarantees that a subsequent send will succeed; it's
785    /// possible that between `receiver_count()` being called and a `send()`, all open receivers
786    /// could drop.
787    pub fn receiver_count(&self) -> usize {
788        self.shared.receiver_count()
789    }
790
791    /// Creates a [`WeakSender`] that does not keep the channel open.
792    ///
793    /// The channel is closed once all `Sender`s are dropped, even if there
794    /// are still active `WeakSender`s.
795    pub fn downgrade(&self) -> WeakSender<T> {
796        WeakSender {
797            shared: Arc::downgrade(&self.shared),
798        }
799    }
800
801    /// Returns whether the senders are belong to the same channel.
802    pub fn same_channel(&self, other: &Sender<T>) -> bool {
803        Arc::ptr_eq(&self.shared, &other.shared)
804    }
805}
806
807impl<T> Clone for Sender<T> {
808    /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel
809    /// contents will only be cleaned up when all senders and the receiver have been dropped.
810    fn clone(&self) -> Self {
811        self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
812        Self { shared: self.shared.clone() }
813    }
814}
815
816impl<T> fmt::Debug for Sender<T> {
817    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
818        f.debug_struct("Sender").finish()
819    }
820}
821
822impl<T> Drop for Sender<T> {
823    fn drop(&mut self) {
824        // Notify receivers that all senders have been dropped if the number of senders drops to 0.
825        if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
826            self.shared.disconnect_all();
827        }
828    }
829}
830
/// A sender that does not prevent the channel from being closed.
///
/// Weak senders do not count towards the number of active senders on the channel. As soon as
/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
/// `WeakSender`.
///
/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
/// method.
///
/// [`upgrade`]: WeakSender::upgrade
pub struct WeakSender<T> {
    // Non-owning handle to the channel state.
    shared: Weak<Shared<T>>,
}
842
843impl<T> WeakSender<T> {
844    /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages.
845    ///
846    /// Returns `None` if the channel was closed already. Note that a `Some` return value
847    /// does not guarantee that the channel is still open.
848    pub fn upgrade(&self) -> Option<Sender<T>> {
849        self.shared
850            .upgrade()
851            // check that there are still live senders
852            .filter(|shared| {
853                shared
854                    .sender_count
855                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
856                        if count == 0 {
857                            // all senders are closed already -> don't increase the sender count
858                            None
859                        } else {
860                            // there is still at least one active sender
861                            Some(count + 1)
862                        }
863                    })
864                    .is_ok()
865            })
866            .map(|shared| Sender { shared })
867    }
868}
869
870impl<T> fmt::Debug for WeakSender<T> {
871    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
872        f.debug_struct("WeakSender").finish()
873    }
874}
875
876impl<T> Clone for WeakSender<T> {
877    /// Clones this [`WeakSender`].
878    fn clone(&self) -> Self {
879        Self { shared: self.shared.clone() }
880    }
881}
882
/// The receiving end of a channel.
///
/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
/// Each message will only be received by a single receiver. This is useful for
/// implementing work stealing for concurrent programs.
pub struct Receiver<T> {
    // Channel state shared with every other handle of this channel.
    shared: Arc<Shared<T>>,
}
891
892impl<T> Receiver<T> {
893    /// Attempt to fetch an incoming value from the channel associated with this receiver,
894    /// returning an error if the channel is empty or if all senders have been dropped.
895    pub fn try_recv(&self) -> Result<T, TryRecvError> {
896        self.shared.recv_sync(None).map_err(|err| match err {
897            TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
898            TryRecvTimeoutError::Empty => TryRecvError::Empty,
899            _ => unreachable!(),
900        })
901    }
902
903    /// Wait for an incoming value from the channel associated with this receiver, returning an
904    /// error if all senders have been dropped.
905    pub fn recv(&self) -> Result<T, RecvError> {
906        self.shared.recv_sync(Some(None)).map_err(|err| match err {
907            TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
908            _ => unreachable!(),
909        })
910    }
911
912    /// Wait for an incoming value from the channel associated with this receiver, returning an
913    /// error if all senders have been dropped or the deadline has passed.
914    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
915        self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
916            TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
917            TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
918            _ => unreachable!(),
919        })
920    }
921
922    /// Wait for an incoming value from the channel associated with this receiver, returning an
923    /// error if all senders have been dropped or the timeout has expired.
924    pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
925        self.recv_deadline(Instant::now().checked_add(dur).unwrap())
926    }
927
928    /// Create a blocking iterator over the values received on the channel that finishes iteration
929    /// when all senders have been dropped.
930    ///
931    /// You can also create a self-owned iterator with [`Receiver::into_iter`].
932    pub fn iter(&self) -> Iter<T> {
933        Iter { receiver: &self }
934    }
935
936    /// A non-blocking iterator over the values received on the channel that finishes iteration
937    /// when all senders have been dropped or the channel is empty.
938    pub fn try_iter(&self) -> TryIter<T> {
939        TryIter { receiver: &self }
940    }
941
942    /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
943    /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
944    /// the function has been called.
945    pub fn drain(&self) -> Drain<T> {
946        let mut chan = wait_lock(&self.shared.chan);
947        chan.pull_pending(false);
948        let queue = std::mem::take(&mut chan.queue);
949
950        Drain { queue, _phantom: PhantomData }
951    }
952
953    /// Returns true if all senders for this channel have been dropped.
954    pub fn is_disconnected(&self) -> bool {
955        self.shared.is_disconnected()
956    }
957
958    /// Returns true if the channel is empty.
959    /// Note: Zero-capacity channels are always empty.
960    pub fn is_empty(&self) -> bool {
961        self.shared.is_empty()
962    }
963
964    /// Returns true if the channel is full.
965    /// Note: Zero-capacity channels are always full.
966    pub fn is_full(&self) -> bool {
967        self.shared.is_full()
968    }
969
970    /// Returns the number of messages in the channel.
971    pub fn len(&self) -> usize {
972        self.shared.len()
973    }
974
975    /// If the channel is bounded, returns its capacity.
976    pub fn capacity(&self) -> Option<usize> {
977        self.shared.capacity()
978    }
979
980    /// Get the number of senders that currently exist.
981    pub fn sender_count(&self) -> usize {
982        self.shared.sender_count()
983    }
984
985    /// Get the number of receivers that currently exist, including this one.
986    pub fn receiver_count(&self) -> usize {
987        self.shared.receiver_count()
988    }
989
990    /// Returns whether the receivers are belong to the same channel.
991    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
992        Arc::ptr_eq(&self.shared, &other.shared)
993    }
994}
995
996impl<T> Clone for Receiver<T> {
997    /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining
998    /// channel contents will only be cleaned up when all senders and the receiver have been
999    /// dropped.
1000    ///
1001    /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
1002    /// Each message will only be received by a single receiver. This is useful for
1003    /// implementing work stealing for concurrent programs.
1004    fn clone(&self) -> Self {
1005        self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
1006        Self { shared: self.shared.clone() }
1007    }
1008}
1009
1010impl<T> fmt::Debug for Receiver<T> {
1011    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1012        f.debug_struct("Receiver").finish()
1013    }
1014}
1015
1016impl<T> Drop for Receiver<T> {
1017    fn drop(&mut self) {
1018        // Notify senders that all receivers have been dropped if the number of receivers drops
1019        // to 0.
1020        if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
1021            self.shared.disconnect_all();
1022        }
1023    }
1024}
1025
1026/// This exists as a shorthand for [`Receiver::iter`].
1027impl<'a, T> IntoIterator for &'a Receiver<T> {
1028    type Item = T;
1029    type IntoIter = Iter<'a, T>;
1030
1031    fn into_iter(self) -> Self::IntoIter {
1032        Iter { receiver: self }
1033    }
1034}
1035
1036impl<T> IntoIterator for Receiver<T> {
1037    type Item = T;
1038    type IntoIter = IntoIter<T>;
1039
1040    /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
1041    fn into_iter(self) -> Self::IntoIter {
1042        IntoIter { receiver: self }
1043    }
1044}
1045
/// An iterator over the msgs received from a channel.
///
/// Each call to `next` performs a [`Receiver::recv`] on the borrowed receiver.
pub struct Iter<'a, T> {
    /// The receiver that values are pulled from.
    receiver: &'a Receiver<T>,
}
1050
1051impl<'a, T> fmt::Debug for Iter<'a, T> {
1052    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1053        f.debug_struct("Iter").field("receiver", &self.receiver).finish()
1054    }
1055}
1056
1057impl<'a, T> Iterator for Iter<'a, T> {
1058    type Item = T;
1059
1060    fn next(&mut self) -> Option<Self::Item> {
1061        self.receiver.recv().ok()
1062    }
1063}
1064
/// A non-blocking iterator over the msgs received from a channel.
///
/// Each call to `next` performs a [`Receiver::try_recv`] on the borrowed receiver.
pub struct TryIter<'a, T> {
    /// The receiver that values are pulled from.
    receiver: &'a Receiver<T>,
}
1069
1070impl<'a, T> fmt::Debug for TryIter<'a, T> {
1071    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1072        f.debug_struct("TryIter").field("receiver", &self.receiver).finish()
1073    }
1074}
1075
1076impl<'a, T> Iterator for TryIter<'a, T> {
1077    type Item = T;
1078
1079    fn next(&mut self) -> Option<Self::Item> {
1080        self.receiver.try_recv().ok()
1081    }
1082}
1083
/// A fixed-size iterator over the msgs drained from a channel.
///
/// The messages are held by the iterator itself, so its length is known exactly up front.
#[derive(Debug)]
pub struct Drain<'a, T> {
    /// The messages still to be yielded, in order.
    queue: VecDeque<T>,
    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
    /// implementation may change and we don't want to unintentionally constrain it. Removing this
    /// lifetime later is a possibility.
    _phantom: PhantomData<&'a ()>,
}

impl<'a, T> Iterator for Drain<'a, T> {
    type Item = T;

    /// Yields the next drained message, or `None` once all of them have been taken.
    fn next(&mut self) -> Option<Self::Item> {
        self.queue.pop_front()
    }

    /// Reports the exact number of remaining messages.
    ///
    /// `ExactSizeIterator` requires `size_hint` to return the exact size; the default
    /// `(0, None)` would break that contract and prevent consumers such as `collect` from
    /// pre-allocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.queue.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    /// Returns the number of messages that have not been yielded yet.
    fn len(&self) -> usize {
        self.queue.len()
    }
}
1107
/// An owned iterator over the msgs received from a channel.
///
/// Each call to `next` performs a [`Receiver::recv`] on the owned receiver.
pub struct IntoIter<T> {
    /// The receiver that values are pulled from; owned by the iterator.
    receiver: Receiver<T>,
}
1112
1113impl<T> fmt::Debug for IntoIter<T> {
1114    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1115        f.debug_struct("IntoIter").field("receiver", &self.receiver).finish()
1116    }
1117}
1118
1119impl<T> Iterator for IntoIter<T> {
1120    type Item = T;
1121
1122    fn next(&mut self) -> Option<Self::Item> {
1123        self.receiver.recv().ok()
1124    }
1125}
1126
1127/// Create a channel with no maximum capacity.
1128///
1129/// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in
1130/// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1131/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1132/// may be cloned.
1133///
1134/// # Examples
1135/// ```
1136/// let (tx, rx) = flume::unbounded();
1137///
1138/// tx.send(42).unwrap();
1139/// assert_eq!(rx.recv().unwrap(), 42);
1140/// ```
1141pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1142    let shared = Arc::new(Shared::new(None));
1143    (
1144        Sender { shared: shared.clone() },
1145        Receiver { shared },
1146    )
1147}
1148
1149/// Create a channel with a maximum capacity.
1150///
1151/// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one
1152/// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and
1153/// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`]
1154/// may be cloned.
1155///
1156/// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to
1157/// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour
1158/// is not desired, [`Sender::try_send`] may be used.
1159///
1160/// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero
1161/// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a
1162/// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers
1163/// perform a handshake and transfer ownership of a value.
1164///
1165/// # Examples
1166/// ```
1167/// let (tx, rx) = flume::bounded(32);
1168///
1169/// for i in 1..33 {
1170///     tx.send(i).unwrap();
1171/// }
1172/// assert!(tx.try_send(33).is_err());
1173///
1174/// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum());
1175/// ```
1176pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1177    let shared = Arc::new(Shared::new(Some(cap)));
1178    (
1179        Sender { shared: shared.clone() },
1180        Receiver { shared },
1181    )
1182}