1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
29#![deny(missing_docs)]
30
31#[cfg(feature = "select")]
32pub mod select;
33#[cfg(feature = "async")]
34pub mod r#async;
35
36mod signal;
37
38#[cfg(feature = "select")]
40pub use select::Selector;
41
42use std::{
43 collections::VecDeque,
44 sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak},
45 time::{Duration, Instant},
46 marker::PhantomData,
47 thread,
48 fmt,
49};
50use std::fmt::Formatter;
51#[cfg(feature = "spin")]
52use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
53use crate::signal::{Signal, SyncSignal};
54
/// An error returned when a message could not be sent because the channel is disconnected.
///
/// The message that failed to send is carried in the public field so the caller can recover it.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error, returning the message that failed to send.
    pub fn into_inner(self) -> T {
        self.0
    }
}

impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Write the text directly. Calling `.fmt(f)` on the literal inside this impl resolves to
        // `<str as Debug>::fmt`, which would wrap the output in quotes and escape it.
        // The payload is elided on purpose so that `T: Debug` is not required.
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` honours width/alignment flags exactly like `<str as Display>::fmt`.
        f.pad("sending on a closed channel")
    }
}

impl<T> std::error::Error for SendError<T> {}
78
/// An error returned when a non-blocking send could not complete, either because the channel is
/// full or because it is disconnected.
///
/// Both variants hand the unsent message back to the caller.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel is bounded and had no free capacity when the send was attempted.
    Full(T),
    /// The channel is disconnected, so the message cannot be delivered.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error, returning the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The payload is elided so that `T: Debug` is not required. The label is written with
        // `write_str` because `.fmt(f)` on a literal here would use `<str as Debug>::fmt` and
        // surround the output with quotes.
        let label = match self {
            Self::Full(..) => "Full(..)",
            Self::Disconnected(..) => "Disconnected(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full(..) => "sending on a full channel",
            Self::Disconnected(..) => "sending on a closed channel",
        };
        // `pad` honours width/alignment flags exactly like `<str as Display>::fmt`.
        f.pad(text)
    }
}

impl<T> std::error::Error for TrySendError<T> {}
117
118impl<T> From<SendError<T>> for TrySendError<T> {
119 fn from(err: SendError<T>) -> Self {
120 match err {
121 SendError(item) => Self::Disconnected(item),
122 }
123 }
124}
125
/// An error returned when a send with a timeout or deadline could not complete, either because
/// the time limit was reached or because the channel is disconnected.
///
/// Both variants hand the unsent message back to the caller.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SendTimeoutError<T> {
    /// The time limit was reached while the channel was still full.
    Timeout(T),
    /// The channel is disconnected, so the message cannot be delivered.
    Disconnected(T),
}

impl<T> SendTimeoutError<T> {
    /// Consumes the error, returning the message that failed to send.
    pub fn into_inner(self) -> T {
        match self {
            Self::Timeout(msg) | Self::Disconnected(msg) => msg,
        }
    }
}

impl<T> fmt::Debug for SendTimeoutError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Name the variant (as `TrySendError` does) so the two failure modes can be told apart
        // in logs. The payload is elided so that `T: Debug` is not required, and `write_str`
        // is used so the label is not wrapped in quotes.
        let label = match self {
            Self::Timeout(..) => "Timeout(..)",
            Self::Disconnected(..) => "Disconnected(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for SendTimeoutError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Timeout(..) => "timed out sending on a full channel",
            Self::Disconnected(..) => "sending on a closed channel",
        };
        // `pad` honours width/alignment flags exactly like `<str as Display>::fmt`.
        f.pad(text)
    }
}

impl<T> std::error::Error for SendTimeoutError<T> {}
161
162impl<T> From<SendError<T>> for SendTimeoutError<T> {
163 fn from(err: SendError<T>) -> Self {
164 match err {
165 SendError(item) => Self::Disconnected(item),
166 }
167 }
168}
169
/// Internal union of every way a send can fail.
///
/// The shared send path reports failures with this type; the public `Sender` methods translate
/// it into the narrower public error type that matches the operation that was requested.
enum TrySendTimeoutError<T> {
    /// A non-blocking send found the bounded channel without free capacity.
    Full(T),
    /// The channel is disconnected.
    Disconnected(T),
    /// A send with a deadline ran out of time.
    Timeout(T),
}
175
/// An error returned when a blocking receive cannot produce a message.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
    /// The channel is disconnected and no message was left to receive.
    Disconnected,
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Disconnected => "receiving on a closed channel",
        };
        // `pad` is what `<str as Display>::fmt` does, so width/alignment flags are honoured.
        f.pad(text)
    }
}

impl std::error::Error for RecvError {}
193
/// An error returned when a non-blocking receive cannot produce a message.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel held no message when the receive was attempted.
    Empty,
    /// The channel is disconnected and no message was left to receive.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Empty => "receiving on an empty channel",
            Self::Disconnected => "channel is empty and closed",
        };
        // `pad` is what `<str as Display>::fmt` does, so width/alignment flags are honoured.
        f.pad(text)
    }
}

impl std::error::Error for TryRecvError {}
215
216impl From<RecvError> for TryRecvError {
217 fn from(err: RecvError) -> Self {
218 match err {
219 RecvError::Disconnected => Self::Disconnected,
220 }
221 }
222}
223
/// An error returned when a receive with a timeout or deadline cannot produce a message.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// The time limit was reached before a message arrived.
    Timeout,
    /// The channel is disconnected and no message was left to receive.
    Disconnected,
}

impl fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Timeout => "timed out waiting on a channel",
            Self::Disconnected => "channel is empty and closed",
        };
        // `pad` is what `<str as Display>::fmt` does, so width/alignment flags are honoured.
        f.pad(text)
    }
}

impl std::error::Error for RecvTimeoutError {}
245
246impl From<RecvError> for RecvTimeoutError {
247 fn from(err: RecvError) -> Self {
248 match err {
249 RecvError::Disconnected => Self::Disconnected,
250 }
251 }
252}
253
/// Internal union of every way a receive can fail.
///
/// The shared receive path reports failures with this type; the public `Receiver` methods
/// translate it into the narrower public error type that matches the requested operation.
enum TryRecvTimeoutError {
    /// A non-blocking receive found no message.
    Empty,
    /// A receive with a deadline ran out of time.
    Timeout,
    /// The channel is disconnected and holds no message.
    Disconnected,
}
259
/// A waiting party registered with the channel (spinlock-backed variant).
///
/// Field `0` is an optional, lock-protected message slot: it is `Some` for hooks built with
/// `slot` and `None` for hooks built with `trigger`. Field `1` is the signal used to wake the
/// waiter; it is last so that it may be unsized (`dyn Signal`).
#[cfg(feature = "spin")]
struct Hook<T, S>(Option<Spinlock<Option<T>>>, S)
where
    S: ?Sized;
263
/// A waiting party registered with the channel (`std::sync::Mutex`-backed variant).
///
/// Field `0` is an optional, lock-protected message slot: it is `Some` for hooks built with
/// `slot` and `None` for hooks built with `trigger`. Field `1` is the signal used to wake the
/// waiter; it is last so that it may be unsized (`dyn Signal`).
#[cfg(not(feature = "spin"))]
struct Hook<T, S>(Option<Mutex<Option<T>>>, S)
where
    S: ?Sized;
266
#[cfg(feature = "spin")]
impl<T, S: ?Sized + Signal> Hook<T, S> {
    /// Builds a hook that owns a message slot, initially holding `msg`, paired with `signal`.
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
    where
        S: Sized,
    {
        let slot = Spinlock::new(msg);
        Arc::new(Self(Some(slot), signal))
    }

    /// Locks the message slot, or returns `None` when this hook was created without one.
    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
        match self.0.as_ref() {
            Some(slot) => Some(slot.lock()),
            None => None,
        }
    }
}
280
281#[cfg(not(feature = "spin"))]
282impl<T, S: ?Sized + Signal> Hook<T, S> {
283 pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
284 where
285 S: Sized,
286 {
287 Arc::new(Self(Some(Mutex::new(msg)), signal))
288 }
289
290 fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
291 self.0.as_ref().map(|s| s.lock().unwrap())
292 }
293}
294
295impl<T, S: ?Sized + Signal> Hook<T, S> {
296 pub fn fire_recv(&self) -> (T, &S) {
297 let msg = self.lock().unwrap().take().unwrap();
298 (msg, self.signal())
299 }
300
301 pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
302 let ret = match self.lock() {
303 Some(mut lock) => {
304 *lock = Some(msg);
305 None
306 }
307 None => Some(msg),
308 };
309 (ret, self.signal())
310 }
311
312 pub fn is_empty(&self) -> bool {
313 self.lock().map(|s| s.is_none()).unwrap_or(true)
314 }
315
316 pub fn try_take(&self) -> Option<T> {
317 self.lock().unwrap().take()
318 }
319
320 pub fn trigger(signal: S) -> Arc<Self>
321 where
322 S: Sized,
323 {
324 Arc::new(Self(None, signal))
325 }
326
327 pub fn signal(&self) -> &S {
328 &self.1
329 }
330
331 pub fn fire_nothing(&self) -> bool {
332 self.signal().fire()
333 }
334}
335
336impl<T> Hook<T, SyncSignal> {
337 pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
338 loop {
339 let disconnected = abort.load(Ordering::SeqCst); let msg = self.lock().unwrap().take();
341 if let Some(msg) = msg {
342 break Some(msg);
343 } else if disconnected {
344 break None;
345 } else {
346 self.signal().wait()
347 }
348 }
349 }
350
351 pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
353 loop {
354 let disconnected = abort.load(Ordering::SeqCst); let msg = self.lock().unwrap().take();
356 if let Some(msg) = msg {
357 break Ok(msg);
358 } else if disconnected {
359 break Err(false);
360 } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
361 self.signal().wait_timeout(dur);
362 } else {
363 break Err(true);
364 }
365 }
366 }
367
368 pub fn wait_send(&self, abort: &AtomicBool) {
369 loop {
370 let disconnected = abort.load(Ordering::SeqCst); if disconnected || self.lock().unwrap().is_none() {
372 break;
373 }
374
375 self.signal().wait();
376 }
377 }
378
379 pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
381 loop {
382 let disconnected = abort.load(Ordering::SeqCst); if self.lock().unwrap().is_none() {
384 break Ok(());
385 } else if disconnected {
386 break Err(false);
387 } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
388 self.signal().wait_timeout(dur);
389 } else {
390 break Err(true);
391 }
392 }
393 }
394}
395
/// Acquires `lock`, backing off between attempts instead of spinning hot.
///
/// On `unix` and `windows` targets the lock is tried in rounds of ten attempts with a
/// `thread::yield_now` after each failure; between rounds the thread sleeps for `2^k`
/// nanoseconds, where `k` starts at 4, grows by one per round and is clamped to 20 (about one
/// millisecond). On every other target the spinlock's own blocking `lock` is used, so that
/// `thread::sleep` is never called there.
#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
    #[cfg(any(target_family = "unix", target_family = "windows"))]
    {
        let mut exponent = 4;
        loop {
            for _ in 0..10 {
                match lock.try_lock() {
                    Some(guard) => return guard,
                    None => thread::yield_now(),
                }
            }
            // The exponent is clamped here, so the sleep never exceeds 2^20 ns.
            thread::sleep(Duration::from_nanos(1 << exponent.min(20)));
            exponent += 1;
        }
    }
    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
    lock.lock()
}
420
/// Acquires `lock`, blocking the current thread until it is available.
///
/// # Panics
/// Panics if the mutex is poisoned.
#[cfg(not(feature = "spin"))]
#[inline]
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
    let acquired = lock.lock();
    acquired.unwrap()
}
426
427#[cfg(not(feature = "spin"))]
428use std::sync::{Mutex, MutexGuard};
429
/// Lock type guarding the channel state when the `spin` feature is enabled: a spinlock.
#[cfg(feature = "spin")]
type ChanLock<T> = Spinlock<T>;
/// Lock type guarding the channel state when the `spin` feature is disabled: the standard
/// library mutex.
#[cfg(not(feature = "spin"))]
type ChanLock<T> = Mutex<T>;
434
435
436type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
437struct Chan<T> {
438 sending: Option<(usize, SignalVec<T>)>,
439 queue: VecDeque<T>,
440 waiting: SignalVec<T>,
441}
442
443impl<T> Chan<T> {
444 fn pull_pending(&mut self, pull_extra: bool) {
445 if let Some((cap, sending)) = &mut self.sending {
446 let effective_cap = *cap + pull_extra as usize;
447
448 while self.queue.len() < effective_cap {
449 if let Some(s) = sending.pop_front() {
450 let (msg, signal) = s.fire_recv();
451 signal.fire();
452 self.queue.push_back(msg);
453 } else {
454 break;
455 }
456 }
457 }
458 }
459
460 fn try_wake_receiver_if_pending(&mut self) {
461 if !self.queue.is_empty() {
462 while Some(false) == self.waiting.pop_front().map(|s| s.fire_nothing()) {}
463 }
464 }
465}
466
467struct Shared<T> {
468 chan: ChanLock<Chan<T>>,
469 disconnected: AtomicBool,
470 sender_count: AtomicUsize,
471 receiver_count: AtomicUsize,
472}
473
474impl<T> Shared<T> {
475 fn new(cap: Option<usize>) -> Self {
476 Self {
477 chan: ChanLock::new(Chan {
478 sending: cap.map(|cap| (cap, VecDeque::new())),
479 queue: VecDeque::new(),
480 waiting: VecDeque::new(),
481 }),
482 disconnected: AtomicBool::new(false),
483 sender_count: AtomicUsize::new(1),
484 receiver_count: AtomicUsize::new(1),
485 }
486 }
487
488 fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
489 &self,
490 msg: T,
491 should_block: bool,
492 make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
493 do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
494 ) -> R {
495 let mut chan = wait_lock(&self.chan);
496
497 if self.is_disconnected() {
498 Err(TrySendTimeoutError::Disconnected(msg)).into()
499 } else if !chan.waiting.is_empty() {
500 let mut msg = Some(msg);
501
502 loop {
503 let slot = chan.waiting.pop_front();
504 match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
505 None if msg.is_none() => break,
507 None => {
509 chan.queue.push_back(msg.unwrap());
510 break;
511 }
512 Some((Some(m), signal)) => {
513 if signal.fire() {
514 msg.replace(m);
517 continue;
518 } else {
519 chan.queue.push_back(m);
522 drop(chan);
523 break;
524 }
525 },
526 Some((None, signal)) => {
527 drop(chan);
528 signal.fire();
529 break; },
531 }
532 }
533
534 Ok(()).into()
535 } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
536 chan.queue.push_back(msg);
537 Ok(()).into()
538 } else if should_block { let hook = make_signal(msg);
540 chan.sending.as_mut().unwrap().1.push_back(hook.clone());
541 drop(chan);
542
543 do_block(hook)
544 } else {
545 Err(TrySendTimeoutError::Full(msg)).into()
546 }
547 }
548
549 fn send_sync(
550 &self,
551 msg: T,
552 block: Option<Option<Instant>>,
553 ) -> Result<(), TrySendTimeoutError<T>> {
554 self.send(
555 msg,
557 block.is_some(),
559 |msg| Hook::slot(Some(msg), SyncSignal::default()),
561 |hook| if let Some(deadline) = block.unwrap() {
563 hook.wait_deadline_send(&self.disconnected, deadline)
564 .or_else(|timed_out| {
565 if timed_out { let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
567 wait_lock(&self.chan).sending
568 .as_mut()
569 .unwrap().1
570 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
571 }
572 hook.try_take().map(|msg| if self.is_disconnected() {
573 Err(TrySendTimeoutError::Disconnected(msg))
574 } else {
575 Err(TrySendTimeoutError::Timeout(msg))
576 })
577 .unwrap_or(Ok(()))
578 })
579 } else {
580 hook.wait_send(&self.disconnected);
581
582 match hook.try_take() {
583 Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
584 None => Ok(()),
585 }
586 },
587 )
588 }
589
590 fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
591 &self,
592 should_block: bool,
593 make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
594 do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
595 ) -> R {
596 let mut chan = wait_lock(&self.chan);
597 chan.pull_pending(true);
598
599 if let Some(msg) = chan.queue.pop_front() {
600 drop(chan);
601 Ok(msg).into()
602 } else if self.is_disconnected() {
603 drop(chan);
604 Err(TryRecvTimeoutError::Disconnected).into()
605 } else if should_block {
606 let hook = make_signal();
607 chan.waiting.push_back(hook.clone());
608 drop(chan);
609
610 do_block(hook)
611 } else {
612 drop(chan);
613 Err(TryRecvTimeoutError::Empty).into()
614 }
615 }
616
617 fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
618 self.recv(
619 block.is_some(),
621 || Hook::slot(None, SyncSignal::default()),
623 |hook| if let Some(deadline) = block.unwrap() {
625 hook.wait_deadline_recv(&self.disconnected, deadline)
626 .or_else(|timed_out| {
627 if timed_out { let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
629 wait_lock(&self.chan).waiting
630 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
631 }
632 match hook.try_take() {
633 Some(msg) => Ok(msg),
634 None => {
635 let disconnected = self.is_disconnected(); if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
637 Ok(msg)
638 } else if disconnected {
639 Err(TryRecvTimeoutError::Disconnected)
640 } else {
641 Err(TryRecvTimeoutError::Timeout)
642 }
643 },
644 }
645 })
646 } else {
647 hook.wait_recv(&self.disconnected)
648 .or_else(|| wait_lock(&self.chan).queue.pop_front())
649 .ok_or(TryRecvTimeoutError::Disconnected)
650 },
651 )
652 }
653
654 fn disconnect_all(&self) {
657 self.disconnected.store(true, Ordering::Relaxed);
658
659 let mut chan = wait_lock(&self.chan);
660 chan.pull_pending(false);
661 if let Some((_, sending)) = chan.sending.as_ref() {
662 sending.iter().for_each(|hook| {
663 hook.signal().fire();
664 })
665 }
666 chan.waiting.iter().for_each(|hook| {
667 hook.signal().fire();
668 });
669 }
670
671 fn is_disconnected(&self) -> bool {
672 self.disconnected.load(Ordering::SeqCst)
673 }
674
675 fn is_empty(&self) -> bool {
676 self.len() == 0
677 }
678
679 fn is_full(&self) -> bool {
680 self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
681 }
682
683 fn len(&self) -> usize {
684 let mut chan = wait_lock(&self.chan);
685 chan.pull_pending(false);
686 chan.queue.len()
687 }
688
689 fn capacity(&self) -> Option<usize> {
690 wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
691 }
692
693 fn sender_count(&self) -> usize {
694 self.sender_count.load(Ordering::Relaxed)
695 }
696
697 fn receiver_count(&self) -> usize {
698 self.receiver_count.load(Ordering::Relaxed)
699 }
700}
701
702pub struct Sender<T> {
704 shared: Arc<Shared<T>>,
705}
706
707impl<T> Sender<T> {
708 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
712 self.shared.send_sync(msg, None).map_err(|err| match err {
713 TrySendTimeoutError::Full(msg) => TrySendError::Full(msg),
714 TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
715 _ => unreachable!(),
716 })
717 }
718
719 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
724 self.shared.send_sync(msg, Some(None)).map_err(|err| match err {
725 TrySendTimeoutError::Disconnected(msg) => SendError(msg),
726 _ => unreachable!(),
727 })
728 }
729
730 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
735 self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err {
736 TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg),
737 TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg),
738 _ => unreachable!(),
739 })
740 }
741
742 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
747 self.send_deadline(msg, Instant::now().checked_add(dur).unwrap())
748 }
749
750 pub fn is_disconnected(&self) -> bool {
752 self.shared.is_disconnected()
753 }
754
755 pub fn is_empty(&self) -> bool {
758 self.shared.is_empty()
759 }
760
761 pub fn is_full(&self) -> bool {
764 self.shared.is_full()
765 }
766
767 pub fn len(&self) -> usize {
769 self.shared.len()
770 }
771
772 pub fn capacity(&self) -> Option<usize> {
774 self.shared.capacity()
775 }
776
777 pub fn sender_count(&self) -> usize {
779 self.shared.sender_count()
780 }
781
782 pub fn receiver_count(&self) -> usize {
788 self.shared.receiver_count()
789 }
790
791 pub fn downgrade(&self) -> WeakSender<T> {
796 WeakSender {
797 shared: Arc::downgrade(&self.shared),
798 }
799 }
800
801 pub fn same_channel(&self, other: &Sender<T>) -> bool {
803 Arc::ptr_eq(&self.shared, &other.shared)
804 }
805}
806
807impl<T> Clone for Sender<T> {
808 fn clone(&self) -> Self {
811 self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
812 Self { shared: self.shared.clone() }
813 }
814}
815
816impl<T> fmt::Debug for Sender<T> {
817 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
818 f.debug_struct("Sender").finish()
819 }
820}
821
822impl<T> Drop for Sender<T> {
823 fn drop(&mut self) {
824 if self.shared.sender_count.fetch_sub(1, Ordering::Relaxed) == 1 {
826 self.shared.disconnect_all();
827 }
828 }
829}
830
831pub struct WeakSender<T> {
840 shared: Weak<Shared<T>>,
841}
842
843impl<T> WeakSender<T> {
844 pub fn upgrade(&self) -> Option<Sender<T>> {
849 self.shared
850 .upgrade()
851 .filter(|shared| {
853 shared
854 .sender_count
855 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
856 if count == 0 {
857 None
859 } else {
860 Some(count + 1)
862 }
863 })
864 .is_ok()
865 })
866 .map(|shared| Sender { shared })
867 }
868}
869
870impl<T> fmt::Debug for WeakSender<T> {
871 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
872 f.debug_struct("WeakSender").finish()
873 }
874}
875
876impl<T> Clone for WeakSender<T> {
877 fn clone(&self) -> Self {
879 Self { shared: self.shared.clone() }
880 }
881}
882
883pub struct Receiver<T> {
889 shared: Arc<Shared<T>>,
890}
891
892impl<T> Receiver<T> {
893 pub fn try_recv(&self) -> Result<T, TryRecvError> {
896 self.shared.recv_sync(None).map_err(|err| match err {
897 TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected,
898 TryRecvTimeoutError::Empty => TryRecvError::Empty,
899 _ => unreachable!(),
900 })
901 }
902
903 pub fn recv(&self) -> Result<T, RecvError> {
906 self.shared.recv_sync(Some(None)).map_err(|err| match err {
907 TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
908 _ => unreachable!(),
909 })
910 }
911
912 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
915 self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err {
916 TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
917 TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
918 _ => unreachable!(),
919 })
920 }
921
922 pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
925 self.recv_deadline(Instant::now().checked_add(dur).unwrap())
926 }
927
928 pub fn iter(&self) -> Iter<T> {
933 Iter { receiver: &self }
934 }
935
936 pub fn try_iter(&self) -> TryIter<T> {
939 TryIter { receiver: &self }
940 }
941
942 pub fn drain(&self) -> Drain<T> {
946 let mut chan = wait_lock(&self.shared.chan);
947 chan.pull_pending(false);
948 let queue = std::mem::take(&mut chan.queue);
949
950 Drain { queue, _phantom: PhantomData }
951 }
952
953 pub fn is_disconnected(&self) -> bool {
955 self.shared.is_disconnected()
956 }
957
958 pub fn is_empty(&self) -> bool {
961 self.shared.is_empty()
962 }
963
964 pub fn is_full(&self) -> bool {
967 self.shared.is_full()
968 }
969
970 pub fn len(&self) -> usize {
972 self.shared.len()
973 }
974
975 pub fn capacity(&self) -> Option<usize> {
977 self.shared.capacity()
978 }
979
980 pub fn sender_count(&self) -> usize {
982 self.shared.sender_count()
983 }
984
985 pub fn receiver_count(&self) -> usize {
987 self.shared.receiver_count()
988 }
989
990 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
992 Arc::ptr_eq(&self.shared, &other.shared)
993 }
994}
995
996impl<T> Clone for Receiver<T> {
997 fn clone(&self) -> Self {
1005 self.shared.receiver_count.fetch_add(1, Ordering::Relaxed);
1006 Self { shared: self.shared.clone() }
1007 }
1008}
1009
1010impl<T> fmt::Debug for Receiver<T> {
1011 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1012 f.debug_struct("Receiver").finish()
1013 }
1014}
1015
1016impl<T> Drop for Receiver<T> {
1017 fn drop(&mut self) {
1018 if self.shared.receiver_count.fetch_sub(1, Ordering::Relaxed) == 1 {
1021 self.shared.disconnect_all();
1022 }
1023 }
1024}
1025
1026impl<'a, T> IntoIterator for &'a Receiver<T> {
1028 type Item = T;
1029 type IntoIter = Iter<'a, T>;
1030
1031 fn into_iter(self) -> Self::IntoIter {
1032 Iter { receiver: self }
1033 }
1034}
1035
1036impl<T> IntoIterator for Receiver<T> {
1037 type Item = T;
1038 type IntoIter = IntoIter<T>;
1039
1040 fn into_iter(self) -> Self::IntoIter {
1042 IntoIter { receiver: self }
1043 }
1044}
1045
1046pub struct Iter<'a, T> {
1048 receiver: &'a Receiver<T>,
1049}
1050
1051impl<'a, T> fmt::Debug for Iter<'a, T> {
1052 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1053 f.debug_struct("Iter").field("receiver", &self.receiver).finish()
1054 }
1055}
1056
1057impl<'a, T> Iterator for Iter<'a, T> {
1058 type Item = T;
1059
1060 fn next(&mut self) -> Option<Self::Item> {
1061 self.receiver.recv().ok()
1062 }
1063}
1064
1065pub struct TryIter<'a, T> {
1067 receiver: &'a Receiver<T>,
1068}
1069
1070impl<'a, T> fmt::Debug for TryIter<'a, T> {
1071 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1072 f.debug_struct("TryIter").field("receiver", &self.receiver).finish()
1073 }
1074}
1075
1076impl<'a, T> Iterator for TryIter<'a, T> {
1077 type Item = T;
1078
1079 fn next(&mut self) -> Option<Self::Item> {
1080 self.receiver.try_recv().ok()
1081 }
1082}
1083
/// A fixed-size iterator over messages that were removed from a channel in one step.
///
/// The messages are owned by the iterator, so iterating never blocks.
#[derive(Debug)]
pub struct Drain<'a, T> {
    /// The messages that were taken out of the channel, oldest first.
    queue: VecDeque<T>,
    /// Carries the lifetime parameter `'a`; no data is stored for it.
    _phantom: PhantomData<&'a ()>,
}

impl<'a, T> Iterator for Drain<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.queue.pop_front()
    }

    /// Reports the exact number of remaining messages, as `ExactSizeIterator` requires.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.queue.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    fn len(&self) -> usize {
        self.queue.len()
    }
}
1107
1108pub struct IntoIter<T> {
1110 receiver: Receiver<T>,
1111}
1112
1113impl<T> fmt::Debug for IntoIter<T> {
1114 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1115 f.debug_struct("IntoIter").field("receiver", &self.receiver).finish()
1116 }
1117}
1118
1119impl<T> Iterator for IntoIter<T> {
1120 type Item = T;
1121
1122 fn next(&mut self) -> Option<Self::Item> {
1123 self.receiver.recv().ok()
1124 }
1125}
1126
1127pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1142 let shared = Arc::new(Shared::new(None));
1143 (
1144 Sender { shared: shared.clone() },
1145 Receiver { shared },
1146 )
1147}
1148
1149pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
1177 let shared = Arc::new(Shared::new(Some(cap)));
1178 (
1179 Sender { shared: shared.clone() },
1180 Receiver { shared },
1181 )
1182}