rodio/
queue.rs

1//! Queue that plays sounds one after the other.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{Empty, SeekError, Source, Zero};
8use crate::Sample;
9
10#[cfg(feature = "crossbeam-channel")]
11use crossbeam_channel::{unbounded as channel, Receiver, Sender};
12#[cfg(not(feature = "crossbeam-channel"))]
13use std::sync::mpsc::{channel, Receiver, Sender};
14
15/// Builds a new queue. It consists of an input and an output.
16///
17/// The input can be used to add sounds to the end of the queue, while the output implements
18/// `Source` and plays the sounds.
19///
20/// The parameter indicates how the queue should behave if the queue becomes empty:
21///
22/// - If you pass `true`, then the queue is infinite and will play a silence instead until you add
23///   a new sound.
24/// - If you pass `false`, then the queue will report that it has finished playing.
25///
26pub fn queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>)
27where
28    S: Sample + Send + 'static,
29{
30    let input = Arc::new(SourcesQueueInput {
31        next_sounds: Mutex::new(Vec::new()),
32        keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
33    });
34
35    let output = SourcesQueueOutput {
36        current: Box::new(Empty::<S>::new()) as Box<_>,
37        signal_after_end: None,
38        input: input.clone(),
39    };
40
41    (input, output)
42}
43
44// TODO: consider reimplementing this with `from_factory`
45
46type Sound<S> = Box<dyn Source<Item = S> + Send>;
47type SignalDone = Option<Sender<()>>;
48
49/// The input of the queue.
50pub struct SourcesQueueInput<S> {
51    next_sounds: Mutex<Vec<(Sound<S>, SignalDone)>>,
52
53    // See constructor.
54    keep_alive_if_empty: AtomicBool,
55}
56
57impl<S> SourcesQueueInput<S>
58where
59    S: Sample + Send + 'static,
60{
61    /// Adds a new source to the end of the queue.
62    #[inline]
63    pub fn append<T>(&self, source: T)
64    where
65        T: Source<Item = S> + Send + 'static,
66    {
67        self.next_sounds
68            .lock()
69            .unwrap()
70            .push((Box::new(source) as Box<_>, None));
71    }
72
73    /// Adds a new source to the end of the queue.
74    ///
75    /// The `Receiver` will be signalled when the sound has finished playing.
76    ///
77    /// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead.
78    #[inline]
79    pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
80    where
81        T: Source<Item = S> + Send + 'static,
82    {
83        let (tx, rx) = channel();
84        self.next_sounds
85            .lock()
86            .unwrap()
87            .push((Box::new(source) as Box<_>, Some(tx)));
88        rx
89    }
90
91    /// Sets whether the queue stays alive if there's no more sound to play.
92    ///
93    /// See also the constructor.
94    pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
95        self.keep_alive_if_empty
96            .store(keep_alive_if_empty, Ordering::Release);
97    }
98
99    /// Removes all the sounds from the queue. Returns the number of sounds cleared.
100    pub fn clear(&self) -> usize {
101        let mut sounds = self.next_sounds.lock().unwrap();
102        let len = sounds.len();
103        sounds.clear();
104        len
105    }
106}
107/// The output of the queue. Implements `Source`.
108pub struct SourcesQueueOutput<S> {
109    // The current iterator that produces samples.
110    current: Box<dyn Source<Item = S> + Send>,
111
112    // Signal this sender before picking from `next`.
113    signal_after_end: Option<Sender<()>>,
114
115    // The next sounds.
116    input: Arc<SourcesQueueInput<S>>,
117}
118
119const THRESHOLD: usize = 512;
120impl<S> Source for SourcesQueueOutput<S>
121where
122    S: Sample + Send + 'static,
123{
124    #[inline]
125    fn current_frame_len(&self) -> Option<usize> {
126        // This function is non-trivial because the boundary between two sounds in the queue should
127        // be a frame boundary as well.
128        //
129        // The current sound is free to return `None` for `current_frame_len()`, in which case
130        // we *should* return the number of samples remaining the current sound.
131        // This can be estimated with `size_hint()`.
132        //
133        // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this
134        // situation we force a frame to have a maximum number of samples indicate by this
135        // constant.
136
137        // Try the current `current_frame_len`.
138        if let Some(val) = self.current.current_frame_len() {
139            if val != 0 {
140                return Some(val);
141            } else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
142                && self.input.next_sounds.lock().unwrap().is_empty()
143            {
144                // The next source will be a filler silence which will have the length of `THRESHOLD`
145                return Some(THRESHOLD);
146            }
147        }
148
149        // Try the size hint.
150        let (lower_bound, _) = self.current.size_hint();
151        // The iterator default implementation just returns 0.
152        // That's a problematic value, so skip it.
153        if lower_bound > 0 {
154            return Some(lower_bound);
155        }
156
157        // Otherwise we use the constant value.
158        Some(THRESHOLD)
159    }
160
161    #[inline]
162    fn channels(&self) -> u16 {
163        self.current.channels()
164    }
165
166    #[inline]
167    fn sample_rate(&self) -> u32 {
168        self.current.sample_rate()
169    }
170
171    #[inline]
172    fn total_duration(&self) -> Option<Duration> {
173        None
174    }
175
176    /// Only seeks within the current source.
177    // We can not go back to previous sources. We could implement seek such
178    // that it advances the queue if the position is beyond the current song.
179    //
180    // We would then however need to enable seeking backwards across sources too.
181    // That no longer seems in line with the queue behaviour.
182    //
183    // A final pain point is that we would need the total duration for the
184    // next few songs.
185    #[inline]
186    fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
187        self.current.try_seek(pos)
188    }
189}
190
191impl<S> Iterator for SourcesQueueOutput<S>
192where
193    S: Sample + Send + 'static,
194{
195    type Item = S;
196
197    #[inline]
198    fn next(&mut self) -> Option<S> {
199        loop {
200            // Basic situation that will happen most of the time.
201            if let Some(sample) = self.current.next() {
202                return Some(sample);
203            }
204
205            // Since `self.current` has finished, we need to pick the next sound.
206            // In order to avoid inlining this expensive operation, the code is in another function.
207            if self.go_next().is_err() {
208                return None;
209            }
210        }
211    }
212
213    #[inline]
214    fn size_hint(&self) -> (usize, Option<usize>) {
215        (self.current.size_hint().0, None)
216    }
217}
218
219impl<S> SourcesQueueOutput<S>
220where
221    S: Sample + Send + 'static,
222{
223    // Called when `current` is empty and we must jump to the next element.
224    // Returns `Ok` if the sound should continue playing, or an error if it should stop.
225    //
226    // This method is separate so that it is not inlined.
227    fn go_next(&mut self) -> Result<(), ()> {
228        if let Some(signal_after_end) = self.signal_after_end.take() {
229            let _ = signal_after_end.send(());
230        }
231
232        let (next, signal_after_end) = {
233            let mut next = self.input.next_sounds.lock().unwrap();
234
235            if next.len() == 0 {
236                let silence = Box::new(Zero::<S>::new_samples(1, 44100, THRESHOLD)) as Box<_>;
237                if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
238                    // Play a short silence in order to avoid spinlocking.
239                    (silence, None)
240                } else {
241                    return Err(());
242                }
243            } else {
244                next.remove(0)
245            }
246        };
247
248        self.current = next;
249        self.signal_after_end = signal_after_end;
250        Ok(())
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use crate::buffer::SamplesBuffer;
257    use crate::queue;
258    use crate::source::Source;
259
260    #[test]
261    #[ignore] // FIXME: samples rate and channel not updated immediately after transition
262    fn basic() {
263        let (tx, mut rx) = queue::queue(false);
264
265        tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
266        tx.append(SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5]));
267
268        assert_eq!(rx.channels(), 1);
269        assert_eq!(rx.sample_rate(), 48000);
270        assert_eq!(rx.next(), Some(10));
271        assert_eq!(rx.next(), Some(-10));
272        assert_eq!(rx.next(), Some(10));
273        assert_eq!(rx.next(), Some(-10));
274        assert_eq!(rx.channels(), 2);
275        assert_eq!(rx.sample_rate(), 96000);
276        assert_eq!(rx.next(), Some(5));
277        assert_eq!(rx.next(), Some(5));
278        assert_eq!(rx.next(), Some(5));
279        assert_eq!(rx.next(), Some(5));
280        assert_eq!(rx.next(), None);
281    }
282
283    #[test]
284    fn immediate_end() {
285        let (_, mut rx) = queue::queue::<i16>(false);
286        assert_eq!(rx.next(), None);
287    }
288
289    #[test]
290    fn keep_alive() {
291        let (tx, mut rx) = queue::queue(true);
292        tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
293
294        assert_eq!(rx.next(), Some(10));
295        assert_eq!(rx.next(), Some(-10));
296        assert_eq!(rx.next(), Some(10));
297        assert_eq!(rx.next(), Some(-10));
298
299        for _ in 0..100000 {
300            assert_eq!(rx.next(), Some(0));
301        }
302    }
303
304    #[test]
305    #[ignore] // TODO: not yet implemented
306    fn no_delay_when_added() {
307        let (tx, mut rx) = queue::queue(true);
308
309        for _ in 0..500 {
310            assert_eq!(rx.next(), Some(0));
311        }
312
313        tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
314        assert_eq!(rx.next(), Some(10));
315        assert_eq!(rx.next(), Some(-10));
316        assert_eq!(rx.next(), Some(10));
317        assert_eq!(rx.next(), Some(-10));
318    }
319}