zbus/abstractions/
executor.rs
1#[cfg(not(feature = "tokio"))]
2use async_executor::Executor as AsyncExecutor;
3#[cfg(not(feature = "tokio"))]
4use async_task::Task as AsyncTask;
5#[cfg(not(feature = "tokio"))]
6use std::sync::Arc;
7#[cfg(feature = "tokio")]
8use std::{future::pending, marker::PhantomData};
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14#[cfg(feature = "tokio")]
15use tokio::task::JoinHandle;
16
17#[cfg(not(feature = "tokio"))]
25#[derive(Debug, Clone)]
26pub struct Executor<'a> {
27 executor: Arc<AsyncExecutor<'a>>,
28}
/// Executor handle used when the crate is built with the `tokio` feature.
///
/// This variant stores no executor of its own. Its single field is a marker that keeps the
/// `'a` lifetime parameter in use, so the type is spelled `Executor<'a>` in both feature
/// configurations.
#[cfg(feature = "tokio")]
#[derive(Clone, Debug)]
pub struct Executor<'a> {
    /// Zero-sized marker tying the struct to `'a`.
    phantom: PhantomData<&'a ()>,
}
34
35impl<'a> Executor<'a> {
36 #[doc(hidden)]
38 pub fn spawn<T: Send + 'static>(
39 &self,
40 future: impl Future<Output = T> + Send + 'static,
41 #[allow(unused)] name: &str,
42 ) -> Task<T> {
43 #[cfg(not(feature = "tokio"))]
44 {
45 Task(Some(self.executor.spawn(future)))
46 }
47
48 #[cfg(feature = "tokio")]
49 {
50 #[cfg(tokio_unstable)]
51 {
52 Task(Some(
53 tokio::task::Builder::new()
54 .name(name)
55 .spawn(future)
56 .unwrap(),
58 ))
59 }
60 #[cfg(not(tokio_unstable))]
61 {
62 Task(Some(tokio::task::spawn(future)))
63 }
64 }
65 }
66
67 pub fn is_empty(&self) -> bool {
71 #[cfg(not(feature = "tokio"))]
72 {
73 self.executor.is_empty()
74 }
75
76 #[cfg(feature = "tokio")]
77 true
78 }
79
80 pub async fn tick(&self) {
84 #[cfg(not(feature = "tokio"))]
85 {
86 self.executor.tick().await
87 }
88
89 #[cfg(feature = "tokio")]
90 {
91 pending().await
92 }
93 }
94
95 pub(crate) fn new() -> Self {
97 #[cfg(not(feature = "tokio"))]
98 {
99 Self {
100 executor: Arc::new(AsyncExecutor::new()),
101 }
102 }
103
104 #[cfg(feature = "tokio")]
105 {
106 Self {
107 phantom: PhantomData,
108 }
109 }
110 }
111
112 pub(crate) async fn run<T>(&self, future: impl Future<Output = T>) -> T {
116 #[cfg(not(feature = "tokio"))]
117 {
118 self.executor.run(future).await
119 }
120 #[cfg(feature = "tokio")]
121 {
122 future.await
123 }
124 }
125}
126
127#[cfg(not(feature = "tokio"))]
135#[doc(hidden)]
136#[derive(Debug)]
137pub struct Task<T>(Option<AsyncTask<T>>);
/// Handle to a spawned task, for builds with the `tokio` feature.
///
/// A thin wrapper around a `tokio::task::JoinHandle`. The inner handle lives in an `Option`
/// so it can be moved out of the wrapper: `detach` takes it out, and the `Drop`
/// implementation aborts the task only if the handle is still present.
#[cfg(feature = "tokio")]
#[derive(Debug)]
#[doc(hidden)]
pub struct Task<T>(Option<JoinHandle<T>>);
142
143impl<T> Task<T> {
144 #[allow(unused_mut)]
146 #[allow(unused)]
147 pub fn detach(mut self) {
148 #[cfg(not(feature = "tokio"))]
149 {
150 self.0.take().expect("async_task::Task is none").detach()
151 }
152
153 #[cfg(feature = "tokio")]
154 {
155 self.0.take().expect("tokio::task::JoinHandle is none");
156 }
157 }
158}
159
160impl<T> Task<T>
161where
162 T: Send + 'static,
163{
164 #[allow(unused)]
166 pub(crate) fn spawn_blocking<F>(f: F, #[allow(unused)] name: &str) -> Self
167 where
168 F: FnOnce() -> T + Send + 'static,
169 {
170 #[cfg(not(feature = "tokio"))]
171 {
172 Self(Some(blocking::unblock(f)))
173 }
174
175 #[cfg(feature = "tokio")]
176 {
177 #[cfg(tokio_unstable)]
178 {
179 Self(Some(
180 tokio::task::Builder::new()
181 .name(name)
182 .spawn_blocking(f)
183 .unwrap(),
185 ))
186 }
187 #[cfg(not(tokio_unstable))]
188 {
189 Self(Some(tokio::task::spawn_blocking(f)))
190 }
191 }
192 }
193}
194
195impl<T> Drop for Task<T> {
196 fn drop(&mut self) {
197 #[cfg(feature = "tokio")]
198 {
199 if let Some(join_handle) = self.0.take() {
200 join_handle.abort();
201 }
202 }
203 }
204}
205
206impl<T> Future for Task<T> {
207 type Output = T;
208
209 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
210 #[cfg(not(feature = "tokio"))]
211 {
212 Pin::new(&mut self.get_mut().0.as_mut().expect("async_task::Task is none")).poll(cx)
213 }
214
215 #[cfg(feature = "tokio")]
216 {
217 Pin::new(
218 &mut self
219 .get_mut()
220 .0
221 .as_mut()
222 .expect("tokio::task::JoinHandle is none"),
223 )
224 .poll(cx)
225 .map(|r| r.expect("tokio::task::JoinHandle error"))
226 }
227 }
228}