async_executor/lib.rs

//! Async executors.
//!
//! This crate provides two reference executors that trade performance for
//! functionality. They should be considered "good enough" for most use cases.
//! For more specialized use cases, consider writing your own executor on top
//! of [`async-task`].
//!
//! [`async-task`]: https://crates.io/crates/async-task
//!
//! # Examples
//!
//! ```
//! use async_executor::Executor;
//! use futures_lite::future;
//!
//! // Create a new executor.
//! let ex = Executor::new();
//!
//! // Spawn a task.
//! let task = ex.spawn(async {
//!     println!("Hello world");
//! });
//!
//! // Run the executor until the task completes.
//! future::block_on(ex.run(task));
//! ```
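//!
//! For futures that are not `Send`, there is also a thread-local
//! [`LocalExecutor`] with the same interface; a minimal sketch:
//!
//! ```
//! use async_executor::LocalExecutor;
//! use futures_lite::future;
//!
//! let local_ex = LocalExecutor::new();
//!
//! let task = local_ex.spawn(async {
//!     println!("Hello world");
//! });
//!
//! future::block_on(local_ex.run(task));
//! ```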

#![warn(
    missing_docs,
    missing_debug_implementations,
    rust_2018_idioms,
    clippy::undocumented_unsafe_blocks
)]
#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use std::fmt;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
use std::task::{Context, Poll, Waker};

use async_task::{Builder, Runnable};
use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, prelude::*};
use pin_project_lite::pin_project;
use slab::Slab;

#[cfg(feature = "static")]
mod static_executors;

#[doc(no_inline)]
pub use async_task::{FallibleTask, Task};
#[cfg(feature = "static")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
pub use static_executors::*;

/// An async executor.
///
/// # Examples
///
/// A multi-threaded executor:
///
/// ```
/// use async_channel::unbounded;
/// use async_executor::Executor;
/// use easy_parallel::Parallel;
/// use futures_lite::future;
///
/// let ex = Executor::new();
/// let (signal, shutdown) = unbounded::<()>();
///
/// Parallel::new()
///     // Run four executor threads.
///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
///     // Run the main future on the current thread.
///     .finish(|| future::block_on(async {
///         println!("Hello world!");
///         drop(signal);
///     }));
/// ```
pub struct Executor<'a> {
    /// The executor state.
    state: AtomicPtr<State>,

    /// Makes the `'a` lifetime invariant.
    _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
}

// SAFETY: Executor stores no thread-local state that can be accessed by other threads.
unsafe impl Send for Executor<'_> {}
// SAFETY: Executor synchronizes all of its operations internally.
unsafe impl Sync for Executor<'_> {}

impl UnwindSafe for Executor<'_> {}
impl RefUnwindSafe for Executor<'_> {}

impl fmt::Debug for Executor<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(self, "Executor", f)
    }
}

impl<'a> Executor<'a> {
    /// Creates a new executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// ```
    pub const fn new() -> Executor<'a> {
        Executor {
            state: AtomicPtr::new(std::ptr::null_mut()),
            _marker: PhantomData,
        }
    }

    /// Returns `true` if there are no unfinished tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// assert!(ex.is_empty());
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(!ex.is_empty());
    ///
    /// assert!(ex.try_tick());
    /// assert!(ex.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.state().active().is_empty()
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
        let mut active = self.state().active();

        // SAFETY: `T` and the future are `Send`.
        unsafe { self.spawn_inner(future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce lock
    /// contention.
    ///
    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
    /// prevent runner thread starvation. It is assumed that the iterator provided does not
    /// block; blocking iterators can lock up the internal mutex and therefore the entire
    /// executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::{stream, prelude::*};
    /// use std::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = Executor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: Executor::spawn
    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let mut active = Some(self.state().active());

        // Convert the futures into tasks.
        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
            // SAFETY: `T` and the future are `Send`.
            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };

            // Yield the lock every once in a while to ease contention.
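            // (`i.wrapping_sub(1) % 500 == 0` holds at i = 1, 501, 1001, ...,
            // so the lock is briefly released roughly every 500 spawned tasks.)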
            if i.wrapping_sub(1) % 500 == 0 {
                drop(active.take());
                active = Some(self.state().active());
            }

            task
        });

        // Push the tasks to the user's collection.
        handles.extend(tasks);
    }

    /// Spawn a future while holding the inner lock.
    ///
    /// # Safety
    ///
    /// If this is an `Executor`, `F` and `T` must be `Send`.
    unsafe fn spawn_inner<T: 'a>(
        &self,
        future: impl Future<Output = T> + 'a,
        active: &mut Slab<Waker>,
    ) -> Task<T> {
        // Remove the task from the set of active tasks when the future finishes.
        let entry = active.vacant_entry();
        let index = entry.key();
        let state = self.state_as_arc();
        let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));

        // Create the task and register it in the set of active tasks.
        //
        // SAFETY:
        //
        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
        // `try_tick`, `tick` and `run` can only be called from the origin
        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
        // from the origin thread, ensuring that `future` and the executor share
        // the same origin thread. The `Runnable` can be scheduled from other
        // threads, but because of the above `Runnable` can only be called or
        // dropped on the origin thread.
        //
        // `future` is not `'static`, but we make sure that the `Runnable` does
        // not outlive `'a`. When the executor is dropped, the `active` field is
        // drained and all of the `Waker`s are woken. Then, the queue inside of
        // the `Executor` is drained of all of its runnables. This ensures that
        // runnables are dropped and this precondition is satisfied.
        //
        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
        // Therefore we do not need to worry about what is done with the
        // `Waker`.
        let (runnable, task) = Builder::new()
            .propagate_panic(true)
            .spawn_unchecked(|()| future, self.schedule());
        entry.insert(runnable.waker());

        runnable.schedule();
        task
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    ///
    /// let ex = Executor::new();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.state().try_tick()
    }

    /// Runs a single task.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// future::block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.state().tick().await;
    }

    /// Runs the executor until the given future completes.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::Executor;
    /// use futures_lite::future;
    ///
    /// let ex = Executor::new();
    ///
    /// let task = ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.state().run(future).await
    }

    /// Returns a function that schedules a runnable task when it gets woken up.
    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
        let state = self.state_as_arc();

        // TODO: If possible, push into the current local queue and notify the ticker.
        move |runnable| {
            state.queue.push(runnable).unwrap();
            state.notify();
        }
    }

    /// Returns a pointer to the inner state.
    #[inline]
    fn state_ptr(&self) -> *const State {
        #[cold]
        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
            let state = Arc::new(State::new());
            // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
            let ptr = Arc::into_raw(state) as *mut State;
            if let Err(actual) = atomic_ptr.compare_exchange(
                std::ptr::null_mut(),
                ptr,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                // SAFETY: This was just created from Arc::into_raw.
                drop(unsafe { Arc::from_raw(ptr) });
                actual
            } else {
                ptr
            }
        }

        let mut ptr = self.state.load(Ordering::Acquire);
        if ptr.is_null() {
            ptr = alloc_state(&self.state);
        }
        ptr
    }

    /// Returns a reference to the inner state.
    #[inline]
    fn state(&self) -> &State {
        // SAFETY: So long as an Executor lives, its state pointer will always be valid
        // when accessed through state_ptr.
        unsafe { &*self.state_ptr() }
    }

    /// Clones the inner state `Arc`.
    #[inline]
    fn state_as_arc(&self) -> Arc<State> {
        // SAFETY: So long as an Executor lives, its state pointer will always be a valid
        // Arc when accessed through state_ptr.
        let arc = unsafe { Arc::from_raw(self.state_ptr()) };
        let clone = arc.clone();
        std::mem::forget(arc);
        clone
    }
}

impl Drop for Executor<'_> {
    fn drop(&mut self) {
        let ptr = *self.state.get_mut();
        if ptr.is_null() {
            return;
        }

        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
        // via Arc::into_raw in state_ptr.
        let state = unsafe { Arc::from_raw(ptr) };

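        // Wake all active tasks so their runnables get scheduled, then drain
        // the queue so that every remaining runnable (and the future inside
        // it) is dropped; see the safety comment in `spawn_inner`.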
        let mut active = state.active();
        for w in active.drain() {
            w.wake();
        }
        drop(active);

        while state.queue.pop().is_ok() {}
    }
}

impl<'a> Default for Executor<'a> {
    fn default() -> Executor<'a> {
        Executor::new()
    }
}

/// A thread-local executor.
///
/// The executor can only be run on the thread that created it.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let local_ex = LocalExecutor::new();
///
/// future::block_on(local_ex.run(async {
///     println!("Hello world!");
/// }));
/// ```
pub struct LocalExecutor<'a> {
    /// The inner executor.
    inner: Executor<'a>,

    /// Makes the type `!Send` and `!Sync`.
    _marker: PhantomData<Rc<()>>,
}

impl UnwindSafe for LocalExecutor<'_> {}
impl RefUnwindSafe for LocalExecutor<'_> {}

impl fmt::Debug for LocalExecutor<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_executor(&self.inner, "LocalExecutor", f)
    }
}

impl<'a> LocalExecutor<'a> {
    /// Creates a single-threaded executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    /// ```
    pub const fn new() -> LocalExecutor<'a> {
        LocalExecutor {
            inner: Executor::new(),
            _marker: PhantomData,
        }
    }

    /// Returns `true` if there are no unfinished tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    /// assert!(local_ex.is_empty());
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(!local_ex.is_empty());
    ///
    /// assert!(local_ex.try_tick());
    /// assert!(local_ex.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner().is_empty()
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let local_ex = LocalExecutor::new();
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
        let mut active = self.inner().state().active();

        // SAFETY: This executor is not thread safe, so the future and its result
        //         cannot be sent to another thread.
        unsafe { self.inner().spawn_inner(future, &mut active) }
    }

    /// Spawns many tasks onto the executor.
    ///
    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
    /// spawns all of the tasks in one go. With large numbers of tasks this can reduce lock
    /// contention.
    ///
    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn_many`], the
    /// mutex is not released, as there are no other threads that can poll this executor.
    ///
    /// ## Example
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::{stream, prelude::*};
    /// use std::future::ready;
    ///
    /// # futures_lite::future::block_on(async {
    /// let mut ex = LocalExecutor::new();
    ///
    /// let futures = [
    ///     ready(1),
    ///     ready(2),
    ///     ready(3)
    /// ];
    ///
    /// // Spawn all of the futures onto the executor at once.
    /// let mut tasks = vec![];
    /// ex.spawn_many(futures, &mut tasks);
    ///
    /// // Await all of them.
    /// let results = ex.run(async move {
    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
    /// }).await;
    /// assert_eq!(results, [1, 2, 3]);
    /// # });
    /// ```
    ///
    /// [`spawn`]: LocalExecutor::spawn
    /// [`Executor::spawn_many`]: Executor::spawn_many
    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
        &self,
        futures: impl IntoIterator<Item = F>,
        handles: &mut impl Extend<Task<F::Output>>,
    ) {
        let mut active = self.inner().state().active();

        // Convert all of the futures to tasks.
        let tasks = futures.into_iter().map(|future| {
            // SAFETY: This executor is not thread safe, so the future and its result
            //         cannot be sent to another thread.
            unsafe { self.inner().spawn_inner(future, &mut active) }

            // As only one thread can spawn or poll tasks at a time, there is no need
            // to periodically release the lock to ease contention here.
        });

        // Push them to the user's collection.
        handles.extend(tasks);
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    ///
    /// let ex = LocalExecutor::new();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.inner().try_tick()
    }

    /// Runs a single task.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future;
    ///
    /// let ex = LocalExecutor::new();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// future::block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.inner().tick().await
    }

    /// Runs the executor until the given future completes.
    ///
    /// # Examples
    ///
    /// ```
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future;
    ///
    /// let local_ex = LocalExecutor::new();
    ///
    /// let task = local_ex.spawn(async { 1 + 2 });
    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        self.inner().run(future).await
    }

    /// Returns a reference to the inner executor.
    fn inner(&self) -> &Executor<'a> {
        &self.inner
    }
}

impl<'a> Default for LocalExecutor<'a> {
    fn default() -> LocalExecutor<'a> {
        LocalExecutor::new()
    }
}

/// The state of an executor.
struct State {
    /// The global queue.
    queue: ConcurrentQueue<Runnable>,

    /// Local queues created by runners.
    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,

    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
    notified: AtomicBool,

    /// A list of sleeping tickers.
    sleepers: Mutex<Sleepers>,

    /// Currently active tasks.
    active: Mutex<Slab<Waker>>,
}

impl State {
    /// Creates state for a new executor.
    const fn new() -> State {
        State {
            queue: ConcurrentQueue::unbounded(),
            local_queues: RwLock::new(Vec::new()),
            notified: AtomicBool::new(true),
            sleepers: Mutex::new(Sleepers {
                count: 0,
                wakers: Vec::new(),
                free_ids: Vec::new(),
            }),
            active: Mutex::new(Slab::new()),
        }
    }

    /// Returns a reference to currently active tasks.
    fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
        self.active.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Notifies a sleeping ticker.
    #[inline]
    fn notify(&self) {
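        // Fast path: skip locking `sleepers` when a ticker was already
        // notified or no tickers are sleeping (`notified` is `true` in both
        // cases).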
        if self
            .notified
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            let waker = self.sleepers.lock().unwrap().notify();
            if let Some(w) = waker {
                w.wake();
            }
        }
    }

    pub(crate) fn try_tick(&self) -> bool {
        match self.queue.pop() {
            Err(_) => false,
            Ok(runnable) => {
                // Notify another ticker now to pick up where this ticker left off, just in case
                // running the task takes a long time.
                self.notify();

                // Run the task.
                runnable.run();
                true
            }
        }
    }

    pub(crate) async fn tick(&self) {
        let runnable = Ticker::new(self).runnable().await;
        runnable.run();
    }

    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
        let mut runner = Runner::new(self);
        let mut rng = fastrand::Rng::new();

        // A future that runs tasks forever.
        let run_forever = async {
            loop {
                for _ in 0..200 {
                    let runnable = runner.runnable(&mut rng).await;
                    runnable.run();
                }
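                // Yield so that `future`, raced against this loop via `or`
                // below, gets polled at least once every 200 tasks.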
                future::yield_now().await;
            }
        };

        // Run `future` and `run_forever` concurrently until `future` completes.
        future.or(run_forever).await
    }
}

/// A list of sleeping tickers.
struct Sleepers {
    /// Number of sleeping tickers (both notified and unnotified).
    count: usize,

    /// IDs and wakers of sleeping unnotified tickers.
    ///
    /// A sleeping ticker is notified when its waker is missing from this list.
    wakers: Vec<(usize, Waker)>,

    /// Reclaimed IDs.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Inserts a new sleeping ticker.
    fn insert(&mut self, waker: &Waker) -> usize {
        let id = match self.free_ids.pop() {
            Some(id) => id,
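            // Fresh IDs start at 1: `Ticker::sleeping == 0` means "not sleeping".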
            None => self.count + 1,
        };
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Re-inserts a sleeping ticker's waker if it was notified.
    ///
    /// Returns `true` if the ticker was notified.
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        for item in &mut self.wakers {
            if item.0 == id {
                item.1.clone_from(waker);
                return false;
            }
        }

        self.wakers.push((id, waker.clone()));
        true
    }

    /// Removes a previously inserted sleeping ticker.
    ///
    /// Returns `true` if the ticker was notified.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        for i in (0..self.wakers.len()).rev() {
            if self.wakers[i].0 == id {
                self.wakers.remove(i);
                return false;
            }
        }
        true
    }

    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
    fn is_notified(&self) -> bool {
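        // A sleeping ticker whose waker is missing from `wakers` has been
        // notified but has not woken up yet.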
        self.count == 0 || self.count > self.wakers.len()
    }

    /// Returns the notification waker for a sleeping ticker.
    ///
    /// If a ticker was notified already or there are no tickers, `None` will be returned.
    fn notify(&mut self) -> Option<Waker> {
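        // Every sleeper still has its waker in the list, so none has been
        // notified yet; pop one waker to notify that ticker.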
        if self.wakers.len() == self.count {
            self.wakers.pop().map(|item| item.1)
        } else {
            None
        }
    }
}

/// Runs tasks one by one.
struct Ticker<'a> {
    /// The executor state.
    state: &'a State,

    /// Set to a non-zero sleeper ID when in sleeping state.
    ///
    /// States a ticker can be in:
    /// 1) Woken.
    /// 2a) Sleeping and unnotified.
    /// 2b) Sleeping and notified.
    sleeping: usize,
}

impl Ticker<'_> {
    /// Creates a ticker.
    fn new(state: &State) -> Ticker<'_> {
        Ticker { state, sleeping: 0 }
    }

    /// Moves the ticker into sleeping and unnotified state.
    ///
    /// Returns `false` if the ticker was already sleeping and unnotified.
    fn sleep(&mut self, waker: &Waker) -> bool {
        let mut sleepers = self.state.sleepers.lock().unwrap();

        match self.sleeping {
            // Move to sleeping state.
            0 => {
                self.sleeping = sleepers.insert(waker);
            }

            // Already sleeping, check if notified.
            id => {
                if !sleepers.update(id, waker) {
                    return false;
                }
            }
        }

        self.state
            .notified
            .store(sleepers.is_notified(), Ordering::Release);

        true
    }

    /// Moves the ticker into woken state.
    fn wake(&mut self) {
        if self.sleeping != 0 {
            let mut sleepers = self.state.sleepers.lock().unwrap();
            sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);
        }
        self.sleeping = 0;
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&mut self) -> Runnable {
        self.runnable_with(|| self.state.queue.pop().ok()).await
    }

    /// Waits for the next runnable task to run, given a function that searches for a task.
    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
        future::poll_fn(|cx| {
            loop {
                match search() {
                    None => {
                        // Move to sleeping and unnotified state.
                        if !self.sleep(cx.waker()) {
                            // If already sleeping and unnotified, return.
                            return Poll::Pending;
                        }
                    }
                    Some(r) => {
                        // Wake up.
                        self.wake();

                        // Notify another ticker now to pick up where this ticker left off, just in
                        // case running the task takes a long time.
                        self.state.notify();

                        return Poll::Ready(r);
                    }
                }
            }
        })
        .await
    }
}

impl Drop for Ticker<'_> {
    fn drop(&mut self) {
        // If this ticker is in sleeping state, it must be removed from the sleepers list.
        if self.sleeping != 0 {
            let mut sleepers = self.state.sleepers.lock().unwrap();
            let notified = sleepers.remove(self.sleeping);

            self.state
                .notified
                .store(sleepers.is_notified(), Ordering::Release);

            // If this ticker was notified, then notify another ticker.
            if notified {
                drop(sleepers);
                self.state.notify();
            }
        }
    }
}

/// A worker in a work-stealing executor.
///
/// This is just a ticker that also has an associated local queue for improved cache locality.
struct Runner<'a> {
    /// The executor state.
    state: &'a State,

    /// Inner ticker.
    ticker: Ticker<'a>,

    /// The local queue.
    local: Arc<ConcurrentQueue<Runnable>>,

    /// Bumped every time a runnable task is found.
    ticks: usize,
}

impl Runner<'_> {
    /// Creates a runner and registers it in the executor state.
    fn new(state: &State) -> Runner<'_> {
        let runner = Runner {
            state,
            ticker: Ticker::new(state),
            local: Arc::new(ConcurrentQueue::bounded(512)),
            ticks: 0,
        };
        state
            .local_queues
            .write()
            .unwrap()
            .push(runner.local.clone());
        runner
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
        let runnable = self
            .ticker
            .runnable_with(|| {
                // Try the local queue.
                if let Ok(r) = self.local.pop() {
                    return Some(r);
                }

                // Try stealing from the global queue.
                if let Ok(r) = self.state.queue.pop() {
                    steal(&self.state.queue, &self.local);
                    return Some(r);
                }

                // Try stealing from other runners.
                let local_queues = self.state.local_queues.read().unwrap();

                // Pick a random starting point in the iterator list and rotate the list.
                let n = local_queues.len();
                let start = rng.usize(..n);
                let iter = local_queues
                    .iter()
                    .chain(local_queues.iter())
                    .skip(start)
                    .take(n);

                // Remove this runner's local queue.
                let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

                // Try stealing from each local queue in the list.
                for local in iter {
                    steal(local, &self.local);
                    if let Ok(r) = self.local.pop() {
                        return Some(r);
                    }
                }

                None
            })
            .await;

        // Bump the tick counter.
        self.ticks = self.ticks.wrapping_add(1);

        if self.ticks % 64 == 0 {
            // Steal tasks from the global queue to ensure fair task scheduling.
            steal(&self.state.queue, &self.local);
        }

        runnable
    }
}

impl Drop for Runner<'_> {
    fn drop(&mut self) {
        // Remove the local queue.
        self.state
            .local_queues
            .write()
            .unwrap()
            .retain(|local| !Arc::ptr_eq(local, &self.local));

        // Re-schedule remaining tasks in the local queue.
        while let Ok(r) = self.local.pop() {
            r.schedule();
        }
    }
}

/// Steals some items from one queue into another.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
    // Half of `src`'s length rounded up.
    let mut count = (src.len() + 1) / 2;
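    // (e.g. 7 items in `src` gives `count == 4`)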

    if count > 0 {
        // Don't steal more than fits into the queue.
        if let Some(cap) = dest.capacity() {
            count = count.min(cap - dest.len());
        }

        // Steal tasks.
        for _ in 0..count {
            if let Ok(t) = src.pop() {
                assert!(dest.push(t).is_ok());
            } else {
                break;
            }
        }
    }
}

/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    // Get a reference to the state.
    let ptr = executor.state.load(Ordering::Acquire);
    if ptr.is_null() {
        // The executor has not been initialized.
        struct Uninitialized;

        impl fmt::Debug for Uninitialized {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<uninitialized>")
            }
        }

        return f.debug_tuple(name).field(&Uninitialized).finish();
    }

    // SAFETY: If the state pointer is not null, it must have been
    // allocated properly by Arc::new and converted via Arc::into_raw
    // in state_ptr.
    let state = unsafe { &*ptr };

    debug_state(state, name, f)
}

/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    /// Debug wrapper for the number of active tasks.
    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);

    impl fmt::Debug for ActiveTasks<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_lock() {
                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
            }
        }
    }

    /// Debug wrapper for the local runners.
    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);

    impl fmt::Debug for LocalRunners<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_read() {
                Ok(lock) => f
                    .debug_list()
                    .entries(lock.iter().map(|queue| queue.len()))
                    .finish(),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
            }
        }
    }

    /// Debug wrapper for the sleepers.
    struct SleepCount<'a>(&'a Mutex<Sleepers>);

    impl fmt::Debug for SleepCount<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self.0.try_lock() {
                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
            }
        }
    }

    f.debug_struct(name)
        .field("active", &ActiveTasks(&state.active))
        .field("global_tasks", &state.queue.len())
        .field("local_runners", &LocalRunners(&state.local_queues))
        .field("sleepers", &SleepCount(&state.sleepers))
        .finish()
}

/// Runs a closure when dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}

pin_project! {
    /// A wrapper around a future, running a closure when dropped.
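    ///
    /// The cleanup closure runs whether the wrapped future completes or is
    /// dropped early (e.g. when the task is cancelled or the executor is
    /// dropped).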
    struct AsyncCallOnDrop<Fut, Cleanup: FnMut()> {
        #[pin]
        future: Fut,
        cleanup: CallOnDrop<Cleanup>,
    }
}

impl<Fut, Cleanup: FnMut()> AsyncCallOnDrop<Fut, Cleanup> {
    fn new(future: Fut, cleanup: Cleanup) -> Self {
        Self {
            future,
            cleanup: CallOnDrop(cleanup),
        }
    }
}

impl<Fut: Future, Cleanup: FnMut()> Future for AsyncCallOnDrop<Fut, Cleanup> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().future.poll(cx)
    }
}

fn _ensure_send_and_sync() {
    use futures_lite::future::pending;

    fn is_send<T: Send>(_: T) {}
    fn is_sync<T: Sync>(_: T) {}
    fn is_static<T: 'static>(_: T) {}

    is_send::<Executor<'_>>(Executor::new());
    is_sync::<Executor<'_>>(Executor::new());

    let ex = Executor::new();
    is_send(ex.run(pending::<()>()));
    is_sync(ex.run(pending::<()>()));
    is_send(ex.tick());
    is_sync(ex.tick());
    is_send(ex.schedule());
    is_sync(ex.schedule());
    is_static(ex.schedule());

    /// ```compile_fail
    /// use async_executor::LocalExecutor;
    /// use futures_lite::future::pending;
    ///
    /// fn is_send<T: Send>(_: T) {}
    /// fn is_sync<T: Sync>(_: T) {}
    ///
    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
    ///
    /// let ex = LocalExecutor::new();
    /// is_send(ex.run(pending::<()>()));
    /// is_sync(ex.run(pending::<()>()));
    /// is_send(ex.tick());
    /// is_sync(ex.tick());
    /// ```
    fn _negative_test() {}
}