1use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{Empty, SeekError, Source, Zero};
8use crate::Sample;
9
10#[cfg(feature = "crossbeam-channel")]
11use crossbeam_channel::{unbounded as channel, Receiver, Sender};
12#[cfg(not(feature = "crossbeam-channel"))]
13use std::sync::mpsc::{channel, Receiver, Sender};
14
15pub fn queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>)
27where
28 S: Sample + Send + 'static,
29{
30 let input = Arc::new(SourcesQueueInput {
31 next_sounds: Mutex::new(Vec::new()),
32 keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
33 });
34
35 let output = SourcesQueueOutput {
36 current: Box::new(Empty::<S>::new()) as Box<_>,
37 signal_after_end: None,
38 input: input.clone(),
39 };
40
41 (input, output)
42}
43
44type Sound<S> = Box<dyn Source<Item = S> + Send>;
47type SignalDone = Option<Sender<()>>;
48
49pub struct SourcesQueueInput<S> {
51 next_sounds: Mutex<Vec<(Sound<S>, SignalDone)>>,
52
53 keep_alive_if_empty: AtomicBool,
55}
56
57impl<S> SourcesQueueInput<S>
58where
59 S: Sample + Send + 'static,
60{
61 #[inline]
63 pub fn append<T>(&self, source: T)
64 where
65 T: Source<Item = S> + Send + 'static,
66 {
67 self.next_sounds
68 .lock()
69 .unwrap()
70 .push((Box::new(source) as Box<_>, None));
71 }
72
73 #[inline]
79 pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
80 where
81 T: Source<Item = S> + Send + 'static,
82 {
83 let (tx, rx) = channel();
84 self.next_sounds
85 .lock()
86 .unwrap()
87 .push((Box::new(source) as Box<_>, Some(tx)));
88 rx
89 }
90
91 pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
95 self.keep_alive_if_empty
96 .store(keep_alive_if_empty, Ordering::Release);
97 }
98
99 pub fn clear(&self) -> usize {
101 let mut sounds = self.next_sounds.lock().unwrap();
102 let len = sounds.len();
103 sounds.clear();
104 len
105 }
106}
107pub struct SourcesQueueOutput<S> {
109 current: Box<dyn Source<Item = S> + Send>,
111
112 signal_after_end: Option<Sender<()>>,
114
115 input: Arc<SourcesQueueInput<S>>,
117}
118
119const THRESHOLD: usize = 512;
120impl<S> Source for SourcesQueueOutput<S>
121where
122 S: Sample + Send + 'static,
123{
124 #[inline]
125 fn current_frame_len(&self) -> Option<usize> {
126 if let Some(val) = self.current.current_frame_len() {
139 if val != 0 {
140 return Some(val);
141 } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
142 && self.input.next_sounds.lock().unwrap().is_empty()
143 {
144 return Some(THRESHOLD);
146 }
147 }
148
149 let (lower_bound, _) = self.current.size_hint();
151 if lower_bound > 0 {
154 return Some(lower_bound);
155 }
156
157 Some(THRESHOLD)
159 }
160
161 #[inline]
162 fn channels(&self) -> u16 {
163 self.current.channels()
164 }
165
166 #[inline]
167 fn sample_rate(&self) -> u32 {
168 self.current.sample_rate()
169 }
170
171 #[inline]
172 fn total_duration(&self) -> Option<Duration> {
173 None
174 }
175
176 #[inline]
186 fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
187 self.current.try_seek(pos)
188 }
189}
190
191impl<S> Iterator for SourcesQueueOutput<S>
192where
193 S: Sample + Send + 'static,
194{
195 type Item = S;
196
197 #[inline]
198 fn next(&mut self) -> Option<S> {
199 loop {
200 if let Some(sample) = self.current.next() {
202 return Some(sample);
203 }
204
205 if self.go_next().is_err() {
208 return None;
209 }
210 }
211 }
212
213 #[inline]
214 fn size_hint(&self) -> (usize, Option<usize>) {
215 (self.current.size_hint().0, None)
216 }
217}
218
219impl<S> SourcesQueueOutput<S>
220where
221 S: Sample + Send + 'static,
222{
223 fn go_next(&mut self) -> Result<(), ()> {
228 if let Some(signal_after_end) = self.signal_after_end.take() {
229 let _ = signal_after_end.send(());
230 }
231
232 let (next, signal_after_end) = {
233 let mut next = self.input.next_sounds.lock().unwrap();
234
235 if next.len() == 0 {
236 let silence = Box::new(Zero::<S>::new_samples(1, 44100, THRESHOLD)) as Box<_>;
237 if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
238 (silence, None)
240 } else {
241 return Err(());
242 }
243 } else {
244 next.remove(0)
245 }
246 };
247
248 self.current = next;
249 self.signal_after_end = signal_after_end;
250 Ok(())
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use crate::buffer::SamplesBuffer;
257 use crate::queue;
258 use crate::source::Source;
259
260 #[test]
261 #[ignore] fn basic() {
263 let (tx, mut rx) = queue::queue(false);
264
265 tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
266 tx.append(SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5]));
267
268 assert_eq!(rx.channels(), 1);
269 assert_eq!(rx.sample_rate(), 48000);
270 assert_eq!(rx.next(), Some(10));
271 assert_eq!(rx.next(), Some(-10));
272 assert_eq!(rx.next(), Some(10));
273 assert_eq!(rx.next(), Some(-10));
274 assert_eq!(rx.channels(), 2);
275 assert_eq!(rx.sample_rate(), 96000);
276 assert_eq!(rx.next(), Some(5));
277 assert_eq!(rx.next(), Some(5));
278 assert_eq!(rx.next(), Some(5));
279 assert_eq!(rx.next(), Some(5));
280 assert_eq!(rx.next(), None);
281 }
282
283 #[test]
284 fn immediate_end() {
285 let (_, mut rx) = queue::queue::<i16>(false);
286 assert_eq!(rx.next(), None);
287 }
288
289 #[test]
290 fn keep_alive() {
291 let (tx, mut rx) = queue::queue(true);
292 tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
293
294 assert_eq!(rx.next(), Some(10));
295 assert_eq!(rx.next(), Some(-10));
296 assert_eq!(rx.next(), Some(10));
297 assert_eq!(rx.next(), Some(-10));
298
299 for _ in 0..100000 {
300 assert_eq!(rx.next(), Some(0));
301 }
302 }
303
304 #[test]
305 #[ignore] fn no_delay_when_added() {
307 let (tx, mut rx) = queue::queue(true);
308
309 for _ in 0..500 {
310 assert_eq!(rx.next(), Some(0));
311 }
312
313 tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
314 assert_eq!(rx.next(), Some(10));
315 assert_eq!(rx.next(), Some(-10));
316 assert_eq!(rx.next(), Some(10));
317 assert_eq!(rx.next(), Some(-10));
318 }
319}