zbus/abstractions/
executor.rs

1#[cfg(not(feature = "tokio"))]
2use async_executor::Executor as AsyncExecutor;
3#[cfg(not(feature = "tokio"))]
4use async_task::Task as AsyncTask;
5#[cfg(not(feature = "tokio"))]
6use std::sync::Arc;
7#[cfg(feature = "tokio")]
8use std::{future::pending, marker::PhantomData};
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14#[cfg(feature = "tokio")]
15use tokio::task::JoinHandle;
16
/// A wrapper around the underlying runtime/executor.
///
/// This is used to run asynchronous tasks internally and allows integration with various runtimes.
/// See [`crate::Connection::executor`] for an example of integration with external runtimes.
///
/// **Note:** You can (and should) completely ignore this type when building with the `tokio`
/// feature enabled.
#[cfg(not(feature = "tokio"))]
#[derive(Debug, Clone)]
pub struct Executor<'a> {
    // Shared handle to the `async_executor::Executor`. Because it sits behind an `Arc`, cloning
    // the wrapper clones the pointer, so every clone refers to the same executor instance.
    executor: Arc<AsyncExecutor<'a>>,
}
/// A wrapper around the underlying runtime/executor.
///
/// With the `tokio` feature enabled this type carries no state of its own: spawning goes through
/// `tokio::task` directly. It exists so that the same API is available in both configurations.
///
/// **Note:** You can (and should) completely ignore this type when building with the `tokio`
/// feature enabled.
#[cfg(feature = "tokio")]
#[derive(Debug, Clone)]
pub struct Executor<'a> {
    // Zero-sized marker whose only job is to keep the `'a` lifetime parameter in use, so the
    // type has the same generic signature as the non-tokio variant.
    phantom: PhantomData<&'a ()>,
}
34
35impl<'a> Executor<'a> {
36    /// Spawns a task onto the executor.
37    #[doc(hidden)]
38    pub fn spawn<T: Send + 'static>(
39        &self,
40        future: impl Future<Output = T> + Send + 'static,
41        #[allow(unused)] name: &str,
42    ) -> Task<T> {
43        #[cfg(not(feature = "tokio"))]
44        {
45            Task(Some(self.executor.spawn(future)))
46        }
47
48        #[cfg(feature = "tokio")]
49        {
50            #[cfg(tokio_unstable)]
51            {
52                Task(Some(
53                    tokio::task::Builder::new()
54                        .name(name)
55                        .spawn(future)
56                        // SAFETY: Looking at the code, this call always returns an `Ok`.
57                        .unwrap(),
58                ))
59            }
60            #[cfg(not(tokio_unstable))]
61            {
62                Task(Some(tokio::task::spawn(future)))
63            }
64        }
65    }
66
67    /// Returns `true` if there are no unfinished tasks.
68    ///
69    /// With `tokio` feature enabled, this always returns `true`.
70    pub fn is_empty(&self) -> bool {
71        #[cfg(not(feature = "tokio"))]
72        {
73            self.executor.is_empty()
74        }
75
76        #[cfg(feature = "tokio")]
77        true
78    }
79
80    /// Runs a single task.
81    ///
82    /// With `tokio` feature enabled, its a noop and never returns.
83    pub async fn tick(&self) {
84        #[cfg(not(feature = "tokio"))]
85        {
86            self.executor.tick().await
87        }
88
89        #[cfg(feature = "tokio")]
90        {
91            pending().await
92        }
93    }
94
95    /// Create a new `Executor`.
96    pub(crate) fn new() -> Self {
97        #[cfg(not(feature = "tokio"))]
98        {
99            Self {
100                executor: Arc::new(AsyncExecutor::new()),
101            }
102        }
103
104        #[cfg(feature = "tokio")]
105        {
106            Self {
107                phantom: PhantomData,
108            }
109        }
110    }
111
112    /// Runs the executor until the given future completes.
113    ///
114    /// With `tokio` feature enabled, it just awaits on the `future`.
115    pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
116        #[cfg(not(feature = "tokio"))]
117        {
118            self.executor.run(future).await
119        }
120        #[cfg(feature = "tokio")]
121        {
122            future.await
123        }
124    }
125}
126
/// A wrapper around the task API of the underlying runtime/executor.
///
/// This follows the semantics of `async_task::Task` on drop:
///
/// * it will be cancelled, rather than detached. For detaching, use the `detach` method.
/// * errors from the task cancellation will be ignored. If you need to know about task errors,
///   convert the task to a `FallibleTask` using the `fallible` method.
// NOTE(review): no `fallible` method or `FallibleTask` type is visible in this part of the file;
// confirm they exist before relying on the last sentence above.
//
// The handle is kept in an `Option` so that `detach` can move it out of the wrapper.
#[cfg(not(feature = "tokio"))]
#[doc(hidden)]
#[derive(Debug)]
pub struct Task<T>(Option<AsyncTask<T>>);
/// A wrapper around the task API of the underlying runtime/executor.
///
/// With the `tokio` feature enabled this wraps a `tokio::task::JoinHandle`. Dropping the wrapper
/// aborts the task through that handle; use the `detach` method to give up the handle without
/// aborting.
// The handle is kept in an `Option` so that `detach` and `Drop` can move it out of the wrapper.
#[cfg(feature = "tokio")]
#[doc(hidden)]
#[derive(Debug)]
pub struct Task<T>(Option<JoinHandle<T>>);
142
143impl<T> Task<T> {
144    /// Detaches the task to let it keep running in the background.
145    #[allow(unused_mut)]
146    #[allow(unused)]
147    pub fn detach(mut self) {
148        #[cfg(not(feature = "tokio"))]
149        {
150            self.0.take().expect("async_task::Task is none").detach()
151        }
152
153        #[cfg(feature = "tokio")]
154        {
155            self.0.take().expect("tokio::task::JoinHandle is none");
156        }
157    }
158}
159
160impl<T> Task<T>
161where
162    T: Send + 'static,
163{
164    /// Launch the given blocking function in a task.
165    #[allow(unused)]
166    pub(crate) fn spawn_blocking<F>(f: F, #[allow(unused)] name: &str) -> Self
167    where
168        F: FnOnce() -> T + Send + 'static,
169    {
170        #[cfg(not(feature = "tokio"))]
171        {
172            Self(Some(blocking::unblock(f)))
173        }
174
175        #[cfg(feature = "tokio")]
176        {
177            #[cfg(tokio_unstable)]
178            {
179                Self(Some(
180                    tokio::task::Builder::new()
181                        .name(name)
182                        .spawn_blocking(f)
183                        // SAFETY: Looking at the code, this call always returns an `Ok`.
184                        .unwrap(),
185                ))
186            }
187            #[cfg(not(tokio_unstable))]
188            {
189                Self(Some(tokio::task::spawn_blocking(f)))
190            }
191        }
192    }
193}
194
195impl<T> Drop for Task<T> {
196    fn drop(&mut self) {
197        #[cfg(feature = "tokio")]
198        {
199            if let Some(join_handle) = self.0.take() {
200                join_handle.abort();
201            }
202        }
203    }
204}
205
206impl<T> Future for Task<T> {
207    type Output = T;
208
209    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210        #[cfg(not(feature = "tokio"))]
211        {
212            Pin::new(&mut self.get_mut().0.as_mut().expect("async_task::Task is none")).poll(cx)
213        }
214
215        #[cfg(feature = "tokio")]
216        {
217            Pin::new(
218                &mut self
219                    .get_mut()
220                    .0
221                    .as_mut()
222                    .expect("tokio::task::JoinHandle is none"),
223            )
224            .poll(cx)
225            .map(|r| r.expect("tokio::task::JoinHandle error"))
226        }
227    }
228}