async_executor/lib.rs
1//! Async executors.
2//!
3//! This crate provides two reference executors that trade performance for
4//! functionality. They should be considered reference executors that are "good
5//! enough" for most use cases. For more specialized use cases, consider writing
6//! your own executor on top of [`async-task`].
7//!
8//! [`async-task`]: https://crates.io/crates/async-task
9//!
10//! # Examples
11//!
12//! ```
13//! use async_executor::Executor;
14//! use futures_lite::future;
15//!
16//! // Create a new executor.
17//! let ex = Executor::new();
18//!
19//! // Spawn a task.
20//! let task = ex.spawn(async {
21//! println!("Hello world");
22//! });
23//!
24//! // Run the executor until the task completes.
25//! future::block_on(ex.run(task));
26//! ```
27
28#![warn(
29 missing_docs,
30 missing_debug_implementations,
31 rust_2018_idioms,
32 clippy::undocumented_unsafe_blocks
33)]
34#![doc(
35 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
41
42use std::fmt;
43use std::marker::PhantomData;
44use std::panic::{RefUnwindSafe, UnwindSafe};
45use std::pin::Pin;
46use std::rc::Rc;
47use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
48use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
49use std::task::{Context, Poll, Waker};
50
51use async_task::{Builder, Runnable};
52use concurrent_queue::ConcurrentQueue;
53use futures_lite::{future, prelude::*};
54use pin_project_lite::pin_project;
55use slab::Slab;
56
57#[cfg(feature = "static")]
58mod static_executors;
59
60#[doc(no_inline)]
61pub use async_task::{FallibleTask, Task};
62#[cfg(feature = "static")]
63#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
64pub use static_executors::*;
65
66/// An async executor.
67///
68/// # Examples
69///
70/// A multi-threaded executor:
71///
72/// ```
73/// use async_channel::unbounded;
74/// use async_executor::Executor;
75/// use easy_parallel::Parallel;
76/// use futures_lite::future;
77///
78/// let ex = Executor::new();
79/// let (signal, shutdown) = unbounded::<()>();
80///
81/// Parallel::new()
82/// // Run four executor threads.
83/// .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
84/// // Run the main future on the current thread.
85/// .finish(|| future::block_on(async {
86/// println!("Hello world!");
87/// drop(signal);
88/// }));
89/// ```
90pub struct Executor<'a> {
91 /// The executor state.
92 state: AtomicPtr<State>,
93
94 /// Makes the `'a` lifetime invariant.
95 _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
96}
97
98// SAFETY: Executor stores no thread local state that can be accessed via other thread.
99unsafe impl Send for Executor<'_> {}
100// SAFETY: Executor internally synchronizes all of it's operations internally.
101unsafe impl Sync for Executor<'_> {}
102
103impl UnwindSafe for Executor<'_> {}
104impl RefUnwindSafe for Executor<'_> {}
105
106impl fmt::Debug for Executor<'_> {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 debug_executor(self, "Executor", f)
109 }
110}
111
112impl<'a> Executor<'a> {
113 /// Creates a new executor.
114 ///
115 /// # Examples
116 ///
117 /// ```
118 /// use async_executor::Executor;
119 ///
120 /// let ex = Executor::new();
121 /// ```
122 pub const fn new() -> Executor<'a> {
123 Executor {
124 state: AtomicPtr::new(std::ptr::null_mut()),
125 _marker: PhantomData,
126 }
127 }
128
129 /// Returns `true` if there are no unfinished tasks.
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use async_executor::Executor;
135 ///
136 /// let ex = Executor::new();
137 /// assert!(ex.is_empty());
138 ///
139 /// let task = ex.spawn(async {
140 /// println!("Hello world");
141 /// });
142 /// assert!(!ex.is_empty());
143 ///
144 /// assert!(ex.try_tick());
145 /// assert!(ex.is_empty());
146 /// ```
147 pub fn is_empty(&self) -> bool {
148 self.state().active().is_empty()
149 }
150
151 /// Spawns a task onto the executor.
152 ///
153 /// # Examples
154 ///
155 /// ```
156 /// use async_executor::Executor;
157 ///
158 /// let ex = Executor::new();
159 ///
160 /// let task = ex.spawn(async {
161 /// println!("Hello world");
162 /// });
163 /// ```
164 pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
165 let mut active = self.state().active();
166
167 // SAFETY: `T` and the future are `Send`.
168 unsafe { self.spawn_inner(future, &mut active) }
169 }
170
171 /// Spawns many tasks onto the executor.
172 ///
173 /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
174 /// spawns all of the tasks in one go. With large amounts of tasks this can improve
175 /// contention.
176 ///
177 /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
178 /// prevent runner thread starvation. It is assumed that the iterator provided does not
179 /// block; blocking iterators can lock up the internal mutex and therefore the entire
180 /// executor.
181 ///
182 /// ## Example
183 ///
184 /// ```
185 /// use async_executor::Executor;
186 /// use futures_lite::{stream, prelude::*};
187 /// use std::future::ready;
188 ///
189 /// # futures_lite::future::block_on(async {
190 /// let mut ex = Executor::new();
191 ///
192 /// let futures = [
193 /// ready(1),
194 /// ready(2),
195 /// ready(3)
196 /// ];
197 ///
198 /// // Spawn all of the futures onto the executor at once.
199 /// let mut tasks = vec![];
200 /// ex.spawn_many(futures, &mut tasks);
201 ///
202 /// // Await all of them.
203 /// let results = ex.run(async move {
204 /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
205 /// }).await;
206 /// assert_eq!(results, [1, 2, 3]);
207 /// # });
208 /// ```
209 ///
210 /// [`spawn`]: Executor::spawn
211 pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
212 &self,
213 futures: impl IntoIterator<Item = F>,
214 handles: &mut impl Extend<Task<F::Output>>,
215 ) {
216 let mut active = Some(self.state().active());
217
218 // Convert the futures into tasks.
219 let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
220 // SAFETY: `T` and the future are `Send`.
221 let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
222
223 // Yield the lock every once in a while to ease contention.
224 if i.wrapping_sub(1) % 500 == 0 {
225 drop(active.take());
226 active = Some(self.state().active());
227 }
228
229 task
230 });
231
232 // Push the tasks to the user's collection.
233 handles.extend(tasks);
234 }
235
236 /// Spawn a future while holding the inner lock.
237 ///
238 /// # Safety
239 ///
240 /// If this is an `Executor`, `F` and `T` must be `Send`.
241 unsafe fn spawn_inner<T: 'a>(
242 &self,
243 future: impl Future<Output = T> + 'a,
244 active: &mut Slab<Waker>,
245 ) -> Task<T> {
246 // Remove the task from the set of active tasks when the future finishes.
247 let entry = active.vacant_entry();
248 let index = entry.key();
249 let state = self.state_as_arc();
250 let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));
251
252 // Create the task and register it in the set of active tasks.
253 //
254 // SAFETY:
255 //
256 // If `future` is not `Send`, this must be a `LocalExecutor` as per this
257 // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
258 // `try_tick`, `tick` and `run` can only be called from the origin
259 // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
260 // from the origin thread, ensuring that `future` and the executor share
261 // the same origin thread. The `Runnable` can be scheduled from other
262 // threads, but because of the above `Runnable` can only be called or
263 // dropped on the origin thread.
264 //
265 // `future` is not `'static`, but we make sure that the `Runnable` does
266 // not outlive `'a`. When the executor is dropped, the `active` field is
267 // drained and all of the `Waker`s are woken. Then, the queue inside of
268 // the `Executor` is drained of all of its runnables. This ensures that
269 // runnables are dropped and this precondition is satisfied.
270 //
271 // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
272 // Therefore we do not need to worry about what is done with the
273 // `Waker`.
274 let (runnable, task) = Builder::new()
275 .propagate_panic(true)
276 .spawn_unchecked(|()| future, self.schedule());
277 entry.insert(runnable.waker());
278
279 runnable.schedule();
280 task
281 }
282
283 /// Attempts to run a task if at least one is scheduled.
284 ///
285 /// Running a scheduled task means simply polling its future once.
286 ///
287 /// # Examples
288 ///
289 /// ```
290 /// use async_executor::Executor;
291 ///
292 /// let ex = Executor::new();
293 /// assert!(!ex.try_tick()); // no tasks to run
294 ///
295 /// let task = ex.spawn(async {
296 /// println!("Hello world");
297 /// });
298 /// assert!(ex.try_tick()); // a task was found
299 /// ```
300 pub fn try_tick(&self) -> bool {
301 self.state().try_tick()
302 }
303
304 /// Runs a single task.
305 ///
306 /// Running a task means simply polling its future once.
307 ///
308 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// use async_executor::Executor;
314 /// use futures_lite::future;
315 ///
316 /// let ex = Executor::new();
317 ///
318 /// let task = ex.spawn(async {
319 /// println!("Hello world");
320 /// });
321 /// future::block_on(ex.tick()); // runs the task
322 /// ```
323 pub async fn tick(&self) {
324 self.state().tick().await;
325 }
326
327 /// Runs the executor until the given future completes.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// use async_executor::Executor;
333 /// use futures_lite::future;
334 ///
335 /// let ex = Executor::new();
336 ///
337 /// let task = ex.spawn(async { 1 + 2 });
338 /// let res = future::block_on(ex.run(async { task.await * 2 }));
339 ///
340 /// assert_eq!(res, 6);
341 /// ```
342 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
343 self.state().run(future).await
344 }
345
346 /// Returns a function that schedules a runnable task when it gets woken up.
347 fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
348 let state = self.state_as_arc();
349
350 // TODO: If possible, push into the current local queue and notify the ticker.
351 move |runnable| {
352 state.queue.push(runnable).unwrap();
353 state.notify();
354 }
355 }
356
357 /// Returns a pointer to the inner state.
358 #[inline]
359 fn state_ptr(&self) -> *const State {
360 #[cold]
361 fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
362 let state = Arc::new(State::new());
363 // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
364 let ptr = Arc::into_raw(state) as *mut State;
365 if let Err(actual) = atomic_ptr.compare_exchange(
366 std::ptr::null_mut(),
367 ptr,
368 Ordering::AcqRel,
369 Ordering::Acquire,
370 ) {
371 // SAFETY: This was just created from Arc::into_raw.
372 drop(unsafe { Arc::from_raw(ptr) });
373 actual
374 } else {
375 ptr
376 }
377 }
378
379 let mut ptr = self.state.load(Ordering::Acquire);
380 if ptr.is_null() {
381 ptr = alloc_state(&self.state);
382 }
383 ptr
384 }
385
386 /// Returns a reference to the inner state.
387 #[inline]
388 fn state(&self) -> &State {
389 // SAFETY: So long as an Executor lives, it's state pointer will always be valid
390 // when accessed through state_ptr.
391 unsafe { &*self.state_ptr() }
392 }
393
394 // Clones the inner state Arc
395 #[inline]
396 fn state_as_arc(&self) -> Arc<State> {
397 // SAFETY: So long as an Executor lives, it's state pointer will always be a valid
398 // Arc when accessed through state_ptr.
399 let arc = unsafe { Arc::from_raw(self.state_ptr()) };
400 let clone = arc.clone();
401 std::mem::forget(arc);
402 clone
403 }
404}
405
406impl Drop for Executor<'_> {
407 fn drop(&mut self) {
408 let ptr = *self.state.get_mut();
409 if ptr.is_null() {
410 return;
411 }
412
413 // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
414 // via Arc::into_raw in state_ptr.
415 let state = unsafe { Arc::from_raw(ptr) };
416
417 let mut active = state.active();
418 for w in active.drain() {
419 w.wake();
420 }
421 drop(active);
422
423 while state.queue.pop().is_ok() {}
424 }
425}
426
427impl<'a> Default for Executor<'a> {
428 fn default() -> Executor<'a> {
429 Executor::new()
430 }
431}
432
433/// A thread-local executor.
434///
435/// The executor can only be run on the thread that created it.
436///
437/// # Examples
438///
439/// ```
440/// use async_executor::LocalExecutor;
441/// use futures_lite::future;
442///
443/// let local_ex = LocalExecutor::new();
444///
445/// future::block_on(local_ex.run(async {
446/// println!("Hello world!");
447/// }));
448/// ```
449pub struct LocalExecutor<'a> {
450 /// The inner executor.
451 inner: Executor<'a>,
452
453 /// Makes the type `!Send` and `!Sync`.
454 _marker: PhantomData<Rc<()>>,
455}
456
457impl UnwindSafe for LocalExecutor<'_> {}
458impl RefUnwindSafe for LocalExecutor<'_> {}
459
460impl fmt::Debug for LocalExecutor<'_> {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 debug_executor(&self.inner, "LocalExecutor", f)
463 }
464}
465
466impl<'a> LocalExecutor<'a> {
467 /// Creates a single-threaded executor.
468 ///
469 /// # Examples
470 ///
471 /// ```
472 /// use async_executor::LocalExecutor;
473 ///
474 /// let local_ex = LocalExecutor::new();
475 /// ```
476 pub const fn new() -> LocalExecutor<'a> {
477 LocalExecutor {
478 inner: Executor::new(),
479 _marker: PhantomData,
480 }
481 }
482
483 /// Returns `true` if there are no unfinished tasks.
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// use async_executor::LocalExecutor;
489 ///
490 /// let local_ex = LocalExecutor::new();
491 /// assert!(local_ex.is_empty());
492 ///
493 /// let task = local_ex.spawn(async {
494 /// println!("Hello world");
495 /// });
496 /// assert!(!local_ex.is_empty());
497 ///
498 /// assert!(local_ex.try_tick());
499 /// assert!(local_ex.is_empty());
500 /// ```
501 pub fn is_empty(&self) -> bool {
502 self.inner().is_empty()
503 }
504
505 /// Spawns a task onto the executor.
506 ///
507 /// # Examples
508 ///
509 /// ```
510 /// use async_executor::LocalExecutor;
511 ///
512 /// let local_ex = LocalExecutor::new();
513 ///
514 /// let task = local_ex.spawn(async {
515 /// println!("Hello world");
516 /// });
517 /// ```
518 pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
519 let mut active = self.inner().state().active();
520
521 // SAFETY: This executor is not thread safe, so the future and its result
522 // cannot be sent to another thread.
523 unsafe { self.inner().spawn_inner(future, &mut active) }
524 }
525
526 /// Spawns many tasks onto the executor.
527 ///
528 /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
529 /// spawns all of the tasks in one go. With large amounts of tasks this can improve
530 /// contention.
531 ///
532 /// It is assumed that the iterator provided does not block; blocking iterators can lock up
533 /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
534 /// mutex is not released, as there are no other threads that can poll this executor.
535 ///
536 /// ## Example
537 ///
538 /// ```
539 /// use async_executor::LocalExecutor;
540 /// use futures_lite::{stream, prelude::*};
541 /// use std::future::ready;
542 ///
543 /// # futures_lite::future::block_on(async {
544 /// let mut ex = LocalExecutor::new();
545 ///
546 /// let futures = [
547 /// ready(1),
548 /// ready(2),
549 /// ready(3)
550 /// ];
551 ///
552 /// // Spawn all of the futures onto the executor at once.
553 /// let mut tasks = vec![];
554 /// ex.spawn_many(futures, &mut tasks);
555 ///
556 /// // Await all of them.
557 /// let results = ex.run(async move {
558 /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
559 /// }).await;
560 /// assert_eq!(results, [1, 2, 3]);
561 /// # });
562 /// ```
563 ///
564 /// [`spawn`]: LocalExecutor::spawn
565 /// [`Executor::spawn_many`]: Executor::spawn_many
566 pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
567 &self,
568 futures: impl IntoIterator<Item = F>,
569 handles: &mut impl Extend<Task<F::Output>>,
570 ) {
571 let mut active = self.inner().state().active();
572
573 // Convert all of the futures to tasks.
574 let tasks = futures.into_iter().map(|future| {
575 // SAFETY: This executor is not thread safe, so the future and its result
576 // cannot be sent to another thread.
577 unsafe { self.inner().spawn_inner(future, &mut active) }
578
579 // As only one thread can spawn or poll tasks at a time, there is no need
580 // to release lock contention here.
581 });
582
583 // Push them to the user's collection.
584 handles.extend(tasks);
585 }
586
587 /// Attempts to run a task if at least one is scheduled.
588 ///
589 /// Running a scheduled task means simply polling its future once.
590 ///
591 /// # Examples
592 ///
593 /// ```
594 /// use async_executor::LocalExecutor;
595 ///
596 /// let ex = LocalExecutor::new();
597 /// assert!(!ex.try_tick()); // no tasks to run
598 ///
599 /// let task = ex.spawn(async {
600 /// println!("Hello world");
601 /// });
602 /// assert!(ex.try_tick()); // a task was found
603 /// ```
604 pub fn try_tick(&self) -> bool {
605 self.inner().try_tick()
606 }
607
608 /// Runs a single task.
609 ///
610 /// Running a task means simply polling its future once.
611 ///
612 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
613 ///
614 /// # Examples
615 ///
616 /// ```
617 /// use async_executor::LocalExecutor;
618 /// use futures_lite::future;
619 ///
620 /// let ex = LocalExecutor::new();
621 ///
622 /// let task = ex.spawn(async {
623 /// println!("Hello world");
624 /// });
625 /// future::block_on(ex.tick()); // runs the task
626 /// ```
627 pub async fn tick(&self) {
628 self.inner().tick().await
629 }
630
631 /// Runs the executor until the given future completes.
632 ///
633 /// # Examples
634 ///
635 /// ```
636 /// use async_executor::LocalExecutor;
637 /// use futures_lite::future;
638 ///
639 /// let local_ex = LocalExecutor::new();
640 ///
641 /// let task = local_ex.spawn(async { 1 + 2 });
642 /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
643 ///
644 /// assert_eq!(res, 6);
645 /// ```
646 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
647 self.inner().run(future).await
648 }
649
650 /// Returns a reference to the inner executor.
651 fn inner(&self) -> &Executor<'a> {
652 &self.inner
653 }
654}
655
656impl<'a> Default for LocalExecutor<'a> {
657 fn default() -> LocalExecutor<'a> {
658 LocalExecutor::new()
659 }
660}
661
662/// The state of a executor.
663struct State {
664 /// The global queue.
665 queue: ConcurrentQueue<Runnable>,
666
667 /// Local queues created by runners.
668 local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
669
670 /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
671 notified: AtomicBool,
672
673 /// A list of sleeping tickers.
674 sleepers: Mutex<Sleepers>,
675
676 /// Currently active tasks.
677 active: Mutex<Slab<Waker>>,
678}
679
680impl State {
681 /// Creates state for a new executor.
682 const fn new() -> State {
683 State {
684 queue: ConcurrentQueue::unbounded(),
685 local_queues: RwLock::new(Vec::new()),
686 notified: AtomicBool::new(true),
687 sleepers: Mutex::new(Sleepers {
688 count: 0,
689 wakers: Vec::new(),
690 free_ids: Vec::new(),
691 }),
692 active: Mutex::new(Slab::new()),
693 }
694 }
695
696 /// Returns a reference to currently active tasks.
697 fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
698 self.active.lock().unwrap_or_else(|e| e.into_inner())
699 }
700
701 /// Notifies a sleeping ticker.
702 #[inline]
703 fn notify(&self) {
704 if self
705 .notified
706 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
707 .is_ok()
708 {
709 let waker = self.sleepers.lock().unwrap().notify();
710 if let Some(w) = waker {
711 w.wake();
712 }
713 }
714 }
715
716 pub(crate) fn try_tick(&self) -> bool {
717 match self.queue.pop() {
718 Err(_) => false,
719 Ok(runnable) => {
720 // Notify another ticker now to pick up where this ticker left off, just in case
721 // running the task takes a long time.
722 self.notify();
723
724 // Run the task.
725 runnable.run();
726 true
727 }
728 }
729 }
730
731 pub(crate) async fn tick(&self) {
732 let runnable = Ticker::new(self).runnable().await;
733 runnable.run();
734 }
735
736 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
737 let mut runner = Runner::new(self);
738 let mut rng = fastrand::Rng::new();
739
740 // A future that runs tasks forever.
741 let run_forever = async {
742 loop {
743 for _ in 0..200 {
744 let runnable = runner.runnable(&mut rng).await;
745 runnable.run();
746 }
747 future::yield_now().await;
748 }
749 };
750
751 // Run `future` and `run_forever` concurrently until `future` completes.
752 future.or(run_forever).await
753 }
754}
755
756/// A list of sleeping tickers.
757struct Sleepers {
758 /// Number of sleeping tickers (both notified and unnotified).
759 count: usize,
760
761 /// IDs and wakers of sleeping unnotified tickers.
762 ///
763 /// A sleeping ticker is notified when its waker is missing from this list.
764 wakers: Vec<(usize, Waker)>,
765
766 /// Reclaimed IDs.
767 free_ids: Vec<usize>,
768}
769
770impl Sleepers {
771 /// Inserts a new sleeping ticker.
772 fn insert(&mut self, waker: &Waker) -> usize {
773 let id = match self.free_ids.pop() {
774 Some(id) => id,
775 None => self.count + 1,
776 };
777 self.count += 1;
778 self.wakers.push((id, waker.clone()));
779 id
780 }
781
782 /// Re-inserts a sleeping ticker's waker if it was notified.
783 ///
784 /// Returns `true` if the ticker was notified.
785 fn update(&mut self, id: usize, waker: &Waker) -> bool {
786 for item in &mut self.wakers {
787 if item.0 == id {
788 item.1.clone_from(waker);
789 return false;
790 }
791 }
792
793 self.wakers.push((id, waker.clone()));
794 true
795 }
796
797 /// Removes a previously inserted sleeping ticker.
798 ///
799 /// Returns `true` if the ticker was notified.
800 fn remove(&mut self, id: usize) -> bool {
801 self.count -= 1;
802 self.free_ids.push(id);
803
804 for i in (0..self.wakers.len()).rev() {
805 if self.wakers[i].0 == id {
806 self.wakers.remove(i);
807 return false;
808 }
809 }
810 true
811 }
812
813 /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
814 fn is_notified(&self) -> bool {
815 self.count == 0 || self.count > self.wakers.len()
816 }
817
818 /// Returns notification waker for a sleeping ticker.
819 ///
820 /// If a ticker was notified already or there are no tickers, `None` will be returned.
821 fn notify(&mut self) -> Option<Waker> {
822 if self.wakers.len() == self.count {
823 self.wakers.pop().map(|item| item.1)
824 } else {
825 None
826 }
827 }
828}
829
830/// Runs task one by one.
831struct Ticker<'a> {
832 /// The executor state.
833 state: &'a State,
834
835 /// Set to a non-zero sleeper ID when in sleeping state.
836 ///
837 /// States a ticker can be in:
838 /// 1) Woken.
839 /// 2a) Sleeping and unnotified.
840 /// 2b) Sleeping and notified.
841 sleeping: usize,
842}
843
844impl Ticker<'_> {
845 /// Creates a ticker.
846 fn new(state: &State) -> Ticker<'_> {
847 Ticker { state, sleeping: 0 }
848 }
849
850 /// Moves the ticker into sleeping and unnotified state.
851 ///
852 /// Returns `false` if the ticker was already sleeping and unnotified.
853 fn sleep(&mut self, waker: &Waker) -> bool {
854 let mut sleepers = self.state.sleepers.lock().unwrap();
855
856 match self.sleeping {
857 // Move to sleeping state.
858 0 => {
859 self.sleeping = sleepers.insert(waker);
860 }
861
862 // Already sleeping, check if notified.
863 id => {
864 if !sleepers.update(id, waker) {
865 return false;
866 }
867 }
868 }
869
870 self.state
871 .notified
872 .store(sleepers.is_notified(), Ordering::Release);
873
874 true
875 }
876
877 /// Moves the ticker into woken state.
878 fn wake(&mut self) {
879 if self.sleeping != 0 {
880 let mut sleepers = self.state.sleepers.lock().unwrap();
881 sleepers.remove(self.sleeping);
882
883 self.state
884 .notified
885 .store(sleepers.is_notified(), Ordering::Release);
886 }
887 self.sleeping = 0;
888 }
889
890 /// Waits for the next runnable task to run.
891 async fn runnable(&mut self) -> Runnable {
892 self.runnable_with(|| self.state.queue.pop().ok()).await
893 }
894
895 /// Waits for the next runnable task to run, given a function that searches for a task.
896 async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
897 future::poll_fn(|cx| {
898 loop {
899 match search() {
900 None => {
901 // Move to sleeping and unnotified state.
902 if !self.sleep(cx.waker()) {
903 // If already sleeping and unnotified, return.
904 return Poll::Pending;
905 }
906 }
907 Some(r) => {
908 // Wake up.
909 self.wake();
910
911 // Notify another ticker now to pick up where this ticker left off, just in
912 // case running the task takes a long time.
913 self.state.notify();
914
915 return Poll::Ready(r);
916 }
917 }
918 }
919 })
920 .await
921 }
922}
923
924impl Drop for Ticker<'_> {
925 fn drop(&mut self) {
926 // If this ticker is in sleeping state, it must be removed from the sleepers list.
927 if self.sleeping != 0 {
928 let mut sleepers = self.state.sleepers.lock().unwrap();
929 let notified = sleepers.remove(self.sleeping);
930
931 self.state
932 .notified
933 .store(sleepers.is_notified(), Ordering::Release);
934
935 // If this ticker was notified, then notify another ticker.
936 if notified {
937 drop(sleepers);
938 self.state.notify();
939 }
940 }
941 }
942}
943
944/// A worker in a work-stealing executor.
945///
946/// This is just a ticker that also has an associated local queue for improved cache locality.
947struct Runner<'a> {
948 /// The executor state.
949 state: &'a State,
950
951 /// Inner ticker.
952 ticker: Ticker<'a>,
953
954 /// The local queue.
955 local: Arc<ConcurrentQueue<Runnable>>,
956
957 /// Bumped every time a runnable task is found.
958 ticks: usize,
959}
960
961impl Runner<'_> {
962 /// Creates a runner and registers it in the executor state.
963 fn new(state: &State) -> Runner<'_> {
964 let runner = Runner {
965 state,
966 ticker: Ticker::new(state),
967 local: Arc::new(ConcurrentQueue::bounded(512)),
968 ticks: 0,
969 };
970 state
971 .local_queues
972 .write()
973 .unwrap()
974 .push(runner.local.clone());
975 runner
976 }
977
978 /// Waits for the next runnable task to run.
979 async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
980 let runnable = self
981 .ticker
982 .runnable_with(|| {
983 // Try the local queue.
984 if let Ok(r) = self.local.pop() {
985 return Some(r);
986 }
987
988 // Try stealing from the global queue.
989 if let Ok(r) = self.state.queue.pop() {
990 steal(&self.state.queue, &self.local);
991 return Some(r);
992 }
993
994 // Try stealing from other runners.
995 let local_queues = self.state.local_queues.read().unwrap();
996
997 // Pick a random starting point in the iterator list and rotate the list.
998 let n = local_queues.len();
999 let start = rng.usize(..n);
1000 let iter = local_queues
1001 .iter()
1002 .chain(local_queues.iter())
1003 .skip(start)
1004 .take(n);
1005
1006 // Remove this runner's local queue.
1007 let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1008
1009 // Try stealing from each local queue in the list.
1010 for local in iter {
1011 steal(local, &self.local);
1012 if let Ok(r) = self.local.pop() {
1013 return Some(r);
1014 }
1015 }
1016
1017 None
1018 })
1019 .await;
1020
1021 // Bump the tick counter.
1022 self.ticks = self.ticks.wrapping_add(1);
1023
1024 if self.ticks % 64 == 0 {
1025 // Steal tasks from the global queue to ensure fair task scheduling.
1026 steal(&self.state.queue, &self.local);
1027 }
1028
1029 runnable
1030 }
1031}
1032
1033impl Drop for Runner<'_> {
1034 fn drop(&mut self) {
1035 // Remove the local queue.
1036 self.state
1037 .local_queues
1038 .write()
1039 .unwrap()
1040 .retain(|local| !Arc::ptr_eq(local, &self.local));
1041
1042 // Re-schedule remaining tasks in the local queue.
1043 while let Ok(r) = self.local.pop() {
1044 r.schedule();
1045 }
1046 }
1047}
1048
1049/// Steals some items from one queue into another.
1050fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
1051 // Half of `src`'s length rounded up.
1052 let mut count = (src.len() + 1) / 2;
1053
1054 if count > 0 {
1055 // Don't steal more than fits into the queue.
1056 if let Some(cap) = dest.capacity() {
1057 count = count.min(cap - dest.len());
1058 }
1059
1060 // Steal tasks.
1061 for _ in 0..count {
1062 if let Ok(t) = src.pop() {
1063 assert!(dest.push(t).is_ok());
1064 } else {
1065 break;
1066 }
1067 }
1068 }
1069}
1070
1071/// Debug implementation for `Executor` and `LocalExecutor`.
1072fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1073 // Get a reference to the state.
1074 let ptr = executor.state.load(Ordering::Acquire);
1075 if ptr.is_null() {
1076 // The executor has not been initialized.
1077 struct Uninitialized;
1078
1079 impl fmt::Debug for Uninitialized {
1080 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1081 f.write_str("<uninitialized>")
1082 }
1083 }
1084
1085 return f.debug_tuple(name).field(&Uninitialized).finish();
1086 }
1087
1088 // SAFETY: If the state pointer is not null, it must have been
1089 // allocated properly by Arc::new and converted via Arc::into_raw
1090 // in state_ptr.
1091 let state = unsafe { &*ptr };
1092
1093 debug_state(state, name, f)
1094}
1095
1096/// Debug implementation for `Executor` and `LocalExecutor`.
1097fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1098 /// Debug wrapper for the number of active tasks.
1099 struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
1100
1101 impl fmt::Debug for ActiveTasks<'_> {
1102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1103 match self.0.try_lock() {
1104 Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
1105 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1106 Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
1107 }
1108 }
1109 }
1110
1111 /// Debug wrapper for the local runners.
1112 struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
1113
1114 impl fmt::Debug for LocalRunners<'_> {
1115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1116 match self.0.try_read() {
1117 Ok(lock) => f
1118 .debug_list()
1119 .entries(lock.iter().map(|queue| queue.len()))
1120 .finish(),
1121 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1122 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1123 }
1124 }
1125 }
1126
1127 /// Debug wrapper for the sleepers.
1128 struct SleepCount<'a>(&'a Mutex<Sleepers>);
1129
1130 impl fmt::Debug for SleepCount<'_> {
1131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1132 match self.0.try_lock() {
1133 Ok(lock) => fmt::Debug::fmt(&lock.count, f),
1134 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1135 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1136 }
1137 }
1138 }
1139
1140 f.debug_struct(name)
1141 .field("active", &ActiveTasks(&state.active))
1142 .field("global_tasks", &state.queue.len())
1143 .field("local_runners", &LocalRunners(&state.local_queues))
1144 .field("sleepers", &SleepCount(&state.sleepers))
1145 .finish()
1146}
1147
1148/// Runs a closure when dropped.
1149struct CallOnDrop<F: FnMut()>(F);
1150
1151impl<F: FnMut()> Drop for CallOnDrop<F> {
1152 fn drop(&mut self) {
1153 (self.0)();
1154 }
1155}
1156
1157pin_project! {
1158 /// A wrapper around a future, running a closure when dropped.
1159 struct AsyncCallOnDrop<Fut, Cleanup: FnMut()> {
1160 #[pin]
1161 future: Fut,
1162 cleanup: CallOnDrop<Cleanup>,
1163 }
1164}
1165
1166impl<Fut, Cleanup: FnMut()> AsyncCallOnDrop<Fut, Cleanup> {
1167 fn new(future: Fut, cleanup: Cleanup) -> Self {
1168 Self {
1169 future,
1170 cleanup: CallOnDrop(cleanup),
1171 }
1172 }
1173}
1174
1175impl<Fut: Future, Cleanup: FnMut()> Future for AsyncCallOnDrop<Fut, Cleanup> {
1176 type Output = Fut::Output;
1177
1178 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1179 self.project().future.poll(cx)
1180 }
1181}
1182
1183fn _ensure_send_and_sync() {
1184 use futures_lite::future::pending;
1185
1186 fn is_send<T: Send>(_: T) {}
1187 fn is_sync<T: Sync>(_: T) {}
1188 fn is_static<T: 'static>(_: T) {}
1189
1190 is_send::<Executor<'_>>(Executor::new());
1191 is_sync::<Executor<'_>>(Executor::new());
1192
1193 let ex = Executor::new();
1194 is_send(ex.run(pending::<()>()));
1195 is_sync(ex.run(pending::<()>()));
1196 is_send(ex.tick());
1197 is_sync(ex.tick());
1198 is_send(ex.schedule());
1199 is_sync(ex.schedule());
1200 is_static(ex.schedule());
1201
1202 /// ```compile_fail
1203 /// use async_executor::LocalExecutor;
1204 /// use futures_lite::future::pending;
1205 ///
1206 /// fn is_send<T: Send>(_: T) {}
1207 /// fn is_sync<T: Sync>(_: T) {}
1208 ///
1209 /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
1210 /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
1211 ///
1212 /// let ex = LocalExecutor::new();
1213 /// is_send(ex.run(pending::<()>()));
1214 /// is_sync(ex.run(pending::<()>()));
1215 /// is_send(ex.tick());
1216 /// is_sync(ex.tick());
1217 /// ```
1218 fn _negative_test() {}
1219}