1use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll, Waker},
7 any::Any,
8 ops::Deref,
9};
10use std::fmt::{Debug, Formatter};
11use crate::*;
12use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
13use futures_sink::Sink;
14use spin1::Mutex as Spinlock;
15
16struct AsyncSignal {
17 waker: Spinlock<Waker>,
18 woken: AtomicBool,
19 stream: bool,
20}
21
22impl AsyncSignal {
23 fn new(cx: &Context, stream: bool) -> Self {
24 AsyncSignal {
25 waker: Spinlock::new(cx.waker().clone()),
26 woken: AtomicBool::new(false),
27 stream,
28 }
29 }
30}
31
32impl Signal for AsyncSignal {
33 fn fire(&self) -> bool {
34 self.woken.store(true, Ordering::SeqCst);
35 self.waker.lock().wake_by_ref();
36 self.stream
37 }
38
39 fn as_any(&self) -> &(dyn Any + 'static) { self }
40 fn as_ptr(&self) -> *const () { self as *const _ as *const () }
41}
42
43impl<T> Hook<T, AsyncSignal> {
44 fn update_waker(&self, cx_waker: &Waker) -> bool {
47 let mut waker = self.1.waker.lock();
48 let woken = self.1.woken.load(Ordering::SeqCst);
49 if !waker.will_wake(cx_waker) {
50 *waker = cx_waker.clone();
51
52 if woken {
55 cx_waker.wake_by_ref();
56 }
57 }
58 woken
59 }
60}
61
62#[derive(Clone)]
63enum OwnedOrRef<'a, T> {
64 Owned(T),
65 Ref(&'a T),
66}
67
68impl<'a, T> Deref for OwnedOrRef<'a, T> {
69 type Target = T;
70
71 fn deref(&self) -> &T {
72 match self {
73 OwnedOrRef::Owned(arc) => &arc,
74 OwnedOrRef::Ref(r) => r,
75 }
76 }
77}
78
79impl<T> Sender<T> {
80 pub fn send_async(&self, item: T) -> SendFut<T> {
87 SendFut {
88 sender: OwnedOrRef::Ref(&self),
89 hook: Some(SendState::NotYetSent(item)),
90 }
91 }
92
93 pub fn into_send_async<'a>(self, item: T) -> SendFut<'a, T> {
100 SendFut {
101 sender: OwnedOrRef::Owned(self),
102 hook: Some(SendState::NotYetSent(item)),
103 }
104 }
105
106 pub fn sink(&self) -> SendSink<'_, T> {
112 SendSink(SendFut {
113 sender: OwnedOrRef::Ref(&self),
114 hook: None,
115 })
116 }
117
118 pub fn into_sink<'a>(self) -> SendSink<'a, T> {
123 SendSink(SendFut {
124 sender: OwnedOrRef::Owned(self),
125 hook: None,
126 })
127 }
128}
129
130enum SendState<T> {
131 NotYetSent(T),
132 QueuedItem(Arc<Hook<T, AsyncSignal>>),
133}
134
135#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
139pub struct SendFut<'a, T> {
140 sender: OwnedOrRef<'a, Sender<T>>,
141 hook: Option<SendState<T>>,
143}
144
145impl<'a, T> Debug for SendFut<'a, T> {
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 f.debug_struct("SendFut").finish()
148 }
149}
150
151impl<T> std::marker::Unpin for SendFut<'_, T> {}
152
153impl<'a, T> SendFut<'a, T> {
154 fn reset_hook(&mut self) {
157 if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
158 let hook: Arc<Hook<T, dyn Signal>> = hook;
159 wait_lock(&self.sender.shared.chan).sending
160 .as_mut()
161 .unwrap().1
162 .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
163 }
164 }
165
166 pub fn is_disconnected(&self) -> bool {
168 self.sender.is_disconnected()
169 }
170
171 pub fn is_empty(&self) -> bool {
173 self.sender.is_empty()
174 }
175
176 pub fn is_full(&self) -> bool {
178 self.sender.is_full()
179 }
180
181 pub fn len(&self) -> usize {
183 self.sender.len()
184 }
185
186 pub fn capacity(&self) -> Option<usize> {
188 self.sender.capacity()
189 }
190}
191
192impl<'a, T> Drop for SendFut<'a, T> {
193 fn drop(&mut self) {
194 self.reset_hook()
195 }
196}
197
198
199impl<'a, T> Future for SendFut<'a, T> {
200 type Output = Result<(), SendError<T>>;
201
202 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203 if let Some(SendState::QueuedItem(hook)) = self.hook.as_ref() {
204 if hook.is_empty() {
205 Poll::Ready(Ok(()))
206 } else if self.sender.shared.is_disconnected() {
207 let item = hook.try_take();
208 self.hook = None;
209 match item {
210 Some(item) => Poll::Ready(Err(SendError(item))),
211 None => Poll::Ready(Ok(())),
212 }
213 } else {
214 hook.update_waker(cx.waker());
215 Poll::Pending
216 }
217 } else if let Some(SendState::NotYetSent(item)) = self.hook.take() {
218 let this = self.get_mut();
219 let (shared, this_hook) = (&this.sender.shared, &mut this.hook);
220
221 shared.send(
222 item,
224 true,
226 |msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
228 |hook| {
230 *this_hook = Some(SendState::QueuedItem(hook));
231 Poll::Pending
232 },
233 )
234 .map(|r| r.map_err(|err| match err {
235 TrySendTimeoutError::Disconnected(msg) => SendError(msg),
236 _ => unreachable!(),
237 }))
238 } else { Poll::Ready(Ok(()))
240 }
241 }
242}
243
244impl<'a, T> FusedFuture for SendFut<'a, T> {
245 fn is_terminated(&self) -> bool {
246 self.sender.shared.is_disconnected()
247 }
248}
249
250pub struct SendSink<'a, T>(SendFut<'a, T>);
254
255impl<'a, T> SendSink<'a, T> {
256 pub fn sender(&self) -> &Sender<T> {
258 &self.0.sender
259 }
260
261 pub fn is_disconnected(&self) -> bool {
263 self.0.is_disconnected()
264 }
265
266 pub fn is_empty(&self) -> bool {
268 self.0.is_empty()
269 }
270
271 pub fn is_full(&self) -> bool {
273 self.0.is_full()
274 }
275
276 pub fn len(&self) -> usize {
278 self.0.len()
279 }
280
281 pub fn capacity(&self) -> Option<usize> {
283 self.0.capacity()
284 }
285
286 pub fn same_channel(&self, other: &Self) -> bool {
288 self.sender().same_channel(other.sender())
289 }
290}
291
292impl<'a, T> Debug for SendSink<'a, T> {
293 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
294 f.debug_struct("SendSink").finish()
295 }
296}
297
298impl<'a, T> Sink<T> for SendSink<'a, T> {
299 type Error = SendError<T>;
300
301 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
302 Pin::new(&mut self.0).poll(cx)
303 }
304
305 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
306 self.0.reset_hook();
307 self.0.hook = Some(SendState::NotYetSent(item));
308
309 Ok(())
310 }
311
312 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
313 Pin::new(&mut self.0).poll(cx) }
315
316 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
317 Pin::new(&mut self.0).poll(cx) }
319}
320
321impl<'a, T> Clone for SendSink<'a, T> {
322 fn clone(&self) -> SendSink<'a, T> {
323 SendSink(SendFut {
324 sender: self.0.sender.clone(),
325 hook: None,
326 })
327 }
328}
329
330impl<T> Receiver<T> {
331 pub fn recv_async(&self) -> RecvFut<'_, T> {
334 RecvFut::new(OwnedOrRef::Ref(self))
335 }
336
337 pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
341 RecvFut::new(OwnedOrRef::Owned(self))
342 }
343
344 pub fn stream(&self) -> RecvStream<'_, T> {
347 RecvStream(RecvFut::new(OwnedOrRef::Ref(self)))
348 }
349
350 pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
352 RecvStream(RecvFut::new(OwnedOrRef::Owned(self)))
353 }
354}
355
356#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
360pub struct RecvFut<'a, T> {
361 receiver: OwnedOrRef<'a, Receiver<T>>,
362 hook: Option<Arc<Hook<T, AsyncSignal>>>,
363}
364
365impl<'a, T> RecvFut<'a, T> {
366 fn new(receiver: OwnedOrRef<'a, Receiver<T>>) -> Self {
367 Self {
368 receiver,
369 hook: None,
370 }
371 }
372
373 fn reset_hook(&mut self) {
377 if let Some(hook) = self.hook.take() {
378 let hook: Arc<Hook<T, dyn Signal>> = hook;
379 let mut chan = wait_lock(&self.receiver.shared.chan);
380 chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
382 if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) {
383 chan.try_wake_receiver_if_pending();
386 }
387 }
388 }
389
390 fn poll_inner(
391 self: Pin<&mut Self>,
392 cx: &mut Context,
393 stream: bool,
394 ) -> Poll<Result<T, RecvError>> {
395 if self.hook.is_some() {
396 match self.receiver.shared.recv_sync(None) {
397 Ok(msg) => return Poll::Ready(Ok(msg)),
398 Err(TryRecvTimeoutError::Disconnected) => {
399 return Poll::Ready(Err(RecvError::Disconnected))
400 }
401 _ => (),
402 }
403
404 let hook = self.hook.as_ref().map(Arc::clone).unwrap();
405 if hook.update_waker(cx.waker()) {
406 wait_lock(&self.receiver.shared.chan)
409 .waiting
410 .push_back(hook);
411 }
412 if self.receiver.shared.is_disconnected() {
415 Poll::Ready(
418 self.receiver
419 .shared
420 .recv_sync(None)
421 .map(Ok)
422 .unwrap_or(Err(RecvError::Disconnected)),
423 )
424 } else {
425 Poll::Pending
426 }
427 } else {
428 let mut_self = self.get_mut();
429 let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);
430
431 shared.recv(
432 true,
434 || Hook::trigger(AsyncSignal::new(cx, stream)),
436 |hook| {
438 *this_hook = Some(hook);
439 Poll::Pending
440 },
441 )
442 .map(|r| r.map_err(|err| match err {
443 TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
444 _ => unreachable!(),
445 }))
446 }
447 }
448
449 pub fn is_disconnected(&self) -> bool {
451 self.receiver.is_disconnected()
452 }
453
454 pub fn is_empty(&self) -> bool {
456 self.receiver.is_empty()
457 }
458
459 pub fn is_full(&self) -> bool {
461 self.receiver.is_full()
462 }
463
464 pub fn len(&self) -> usize {
466 self.receiver.len()
467 }
468
469 pub fn capacity(&self) -> Option<usize> {
471 self.receiver.capacity()
472 }
473}
474
475impl<'a, T> Debug for RecvFut<'a, T> {
476 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
477 f.debug_struct("RecvFut").finish()
478 }
479}
480
481impl<'a, T> Drop for RecvFut<'a, T> {
482 fn drop(&mut self) {
483 self.reset_hook();
484 }
485}
486
487impl<'a, T> Future for RecvFut<'a, T> {
488 type Output = Result<T, RecvError>;
489
490 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
491 self.poll_inner(cx, false) }
493}
494
495impl<'a, T> FusedFuture for RecvFut<'a, T> {
496 fn is_terminated(&self) -> bool {
497 self.receiver.shared.is_disconnected() && self.receiver.shared.is_empty()
498 }
499}
500
501pub struct RecvStream<'a, T>(RecvFut<'a, T>);
505
506impl<'a, T> RecvStream<'a, T> {
507 pub fn is_disconnected(&self) -> bool {
509 self.0.is_disconnected()
510 }
511
512 pub fn is_empty(&self) -> bool {
514 self.0.is_empty()
515 }
516
517 pub fn is_full(&self) -> bool {
519 self.0.is_full()
520 }
521
522 pub fn len(&self) -> usize {
524 self.0.len()
525 }
526
527 pub fn capacity(&self) -> Option<usize> {
529 self.0.capacity()
530 }
531
532 pub fn same_channel(&self, other: &Self) -> bool {
534 self.0.receiver.same_channel(&*other.0.receiver)
535 }
536}
537
538impl<'a, T> Debug for RecvStream<'a, T> {
539 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
540 f.debug_struct("RecvStream").finish()
541 }
542}
543
544impl<'a, T> Clone for RecvStream<'a, T> {
545 fn clone(&self) -> RecvStream<'a, T> {
546 RecvStream(RecvFut::new(self.0.receiver.clone()))
547 }
548}
549
550impl<'a, T> Stream for RecvStream<'a, T> {
551 type Item = T;
552
553 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
554 match Pin::new(&mut self.0).poll_inner(cx, true) { Poll::Pending => Poll::Pending,
556 Poll::Ready(item) => {
557 self.0.reset_hook();
558 Poll::Ready(item.ok())
559 }
560 }
561 }
562}
563
564impl<'a, T> FusedStream for RecvStream<'a, T> {
565 fn is_terminated(&self) -> bool {
566 self.0.is_terminated()
567 }
568}