rodio/
sink.rs

1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5#[cfg(feature = "crossbeam-channel")]
6use crossbeam_channel::{Receiver, Sender};
7#[cfg(not(feature = "crossbeam-channel"))]
8use std::sync::mpsc::{Receiver, Sender};
9
10use crate::source::SeekError;
11use crate::stream::{OutputStreamHandle, PlayError};
12use crate::{queue, source::Done, Sample, Source};
13use cpal::FromSample;
14
15/// Handle to a device that outputs sounds.
16///
17/// Dropping the `Sink` stops all sounds. You can use `detach` if you want the sounds to continue
18/// playing.
19pub struct Sink {
20    queue_tx: Arc<queue::SourcesQueueInput<f32>>,
21    sleep_until_end: Mutex<Option<Receiver<()>>>,
22
23    controls: Arc<Controls>,
24    sound_count: Arc<AtomicUsize>,
25
26    detached: bool,
27}
28
29struct SeekOrder {
30    pos: Duration,
31    feedback: Sender<Result<(), SeekError>>,
32}
33
34impl SeekOrder {
35    fn new(pos: Duration) -> (Self, Receiver<Result<(), SeekError>>) {
36        #[cfg(not(feature = "crossbeam-channel"))]
37        let (tx, rx) = {
38            use std::sync::mpsc;
39            mpsc::channel()
40        };
41
42        #[cfg(feature = "crossbeam-channel")]
43        let (tx, rx) = {
44            use crossbeam_channel::bounded;
45            bounded(1)
46        };
47        (Self { pos, feedback: tx }, rx)
48    }
49
50    fn attempt<S>(self, maybe_seekable: &mut S)
51    where
52        S: Source,
53        S::Item: Sample + Send,
54    {
55        let res = maybe_seekable.try_seek(self.pos);
56        let _ignore_receiver_dropped = self.feedback.send(res);
57    }
58}
59
60struct Controls {
61    pause: AtomicBool,
62    volume: Mutex<f32>,
63    stopped: AtomicBool,
64    speed: Mutex<f32>,
65    to_clear: Mutex<u32>,
66    seek: Mutex<Option<SeekOrder>>,
67    position: Mutex<Duration>,
68}
69
70impl Sink {
71    /// Builds a new `Sink`, beginning playback on a stream.
72    #[inline]
73    pub fn try_new(stream: &OutputStreamHandle) -> Result<Sink, PlayError> {
74        let (sink, queue_rx) = Sink::new_idle();
75        stream.play_raw(queue_rx)?;
76        Ok(sink)
77    }
78
79    /// Builds a new `Sink`.
80    #[inline]
81    pub fn new_idle() -> (Sink, queue::SourcesQueueOutput<f32>) {
82        let (queue_tx, queue_rx) = queue::queue(true);
83
84        let sink = Sink {
85            queue_tx,
86            sleep_until_end: Mutex::new(None),
87            controls: Arc::new(Controls {
88                pause: AtomicBool::new(false),
89                volume: Mutex::new(1.0),
90                stopped: AtomicBool::new(false),
91                speed: Mutex::new(1.0),
92                to_clear: Mutex::new(0),
93                seek: Mutex::new(None),
94                position: Mutex::new(Duration::ZERO),
95            }),
96            sound_count: Arc::new(AtomicUsize::new(0)),
97            detached: false,
98        };
99        (sink, queue_rx)
100    }
101
102    /// Appends a sound to the queue of sounds to play.
103    #[inline]
104    pub fn append<S>(&self, source: S)
105    where
106        S: Source + Send + 'static,
107        f32: FromSample<S::Item>,
108        S::Item: Sample + Send,
109    {
110        // Wait for queue to flush then resume stopped playback
111        if self.controls.stopped.load(Ordering::SeqCst) {
112            if self.sound_count.load(Ordering::SeqCst) > 0 {
113                self.sleep_until_end();
114            }
115            self.controls.stopped.store(false, Ordering::SeqCst);
116        }
117
118        let controls = self.controls.clone();
119
120        let start_played = AtomicBool::new(false);
121
122        let source = source
123            .speed(1.0)
124            // must be placed before pausable but after speed & delay
125            .track_position()
126            .pausable(false)
127            .amplify(1.0)
128            .skippable()
129            .stoppable()
130            // if you change the duration update the docs for try_seek!
131            .periodic_access(Duration::from_millis(5), move |src| {
132                if controls.stopped.load(Ordering::SeqCst) {
133                    src.stop();
134                    *controls.position.lock().unwrap() = Duration::ZERO;
135                }
136                {
137                    let mut to_clear = controls.to_clear.lock().unwrap();
138                    if *to_clear > 0 {
139                        src.inner_mut().skip();
140                        *to_clear -= 1;
141                        *controls.position.lock().unwrap() = Duration::ZERO;
142                    } else {
143                        *controls.position.lock().unwrap() = src.inner().inner().inner().inner().get_pos();
144                    }
145                }
146                let amp = src.inner_mut().inner_mut();
147                amp.set_factor(*controls.volume.lock().unwrap());
148                amp.inner_mut()
149                    .set_paused(controls.pause.load(Ordering::SeqCst));
150                amp.inner_mut()
151                    .inner_mut()
152                    .inner_mut()
153                    .set_factor(*controls.speed.lock().unwrap());
154                if let Some(seek) = controls.seek.lock().unwrap().take() {
155                    seek.attempt(amp)
156                }
157                start_played.store(true, Ordering::SeqCst);
158            })
159            .convert_samples();
160        self.sound_count.fetch_add(1, Ordering::Relaxed);
161        let source = Done::new(source, self.sound_count.clone());
162        *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
163    }
164
165    /// Gets the volume of the sound.
166    ///
167    /// The value `1.0` is the "normal" volume (unfiltered input). Any value other than 1.0 will
168    /// multiply each sample by this value.
169    #[inline]
170    pub fn volume(&self) -> f32 {
171        *self.controls.volume.lock().unwrap()
172    }
173
174    /// Changes the volume of the sound.
175    ///
176    /// The value `1.0` is the "normal" volume (unfiltered input). Any value other than `1.0` will
177    /// multiply each sample by this value.
178    #[inline]
179    pub fn set_volume(&self, value: f32) {
180        *self.controls.volume.lock().unwrap() = value;
181    }
182
183    /// Changes the play speed of the sound. Does not adjust the samples, only the playback speed.
184    ///
185    /// # Note:
186    /// 1. **Increasing the speed will increase the pitch by the same factor**
187    /// - If you set the speed to 0.5 this will halve the frequency of the sound
188    ///   lowering its pitch.
189    /// - If you set the speed to 2 the frequency will double raising the
190    ///   pitch of the sound.
191    /// 2. **Change in the speed affect the total duration inversely**
192    /// - If you set the speed to 0.5, the total duration will be twice as long.
193    /// - If you set the speed to 2 the total duration will be halve of what it
194    ///   was.
195    ///
196    /// See [`Speed`] for details
197    #[inline]
198    pub fn speed(&self) -> f32 {
199        *self.controls.speed.lock().unwrap()
200    }
201
202    /// Changes the speed of the sound.
203    ///
204    /// The value `1.0` is the "normal" speed (unfiltered input). Any value other than `1.0` will
205    /// change the play speed of the sound.
206    ///
207    /// #### Note:
208    /// 1. **Increasing the speed would also increase the pitch by the same factor**
209    /// - If you increased set the speed to 0.5, the frequency would be slower (0.5x the original frequency) .
210    /// - Also if you set the speed to 1.5 the frequency would be faster ( 1.5x the original frequency).
211    /// 2. **Change in the speed would affect your total duration inversely**
212    /// - if you set the speed by 0.5, your total duration would be (2x the original total duration) longer.
213    /// - Also if you set the speed to 2 the total duration would be (0.5 the original total_duration) shorter
214    #[inline]
215    pub fn set_speed(&self, value: f32) {
216        *self.controls.speed.lock().unwrap() = value;
217    }
218
219    /// Resumes playback of a paused sink.
220    ///
221    /// No effect if not paused.
222    #[inline]
223    pub fn play(&self) {
224        self.controls.pause.store(false, Ordering::SeqCst);
225    }
226
227    // There is no `can_seek()` method as it is impossible to use correctly. Between
228    // checking if a source supports seeking and actually seeking the sink can
229    // switch to a new source.
230
231    /// Attempts to seek to a given position in the current source.
232    ///
233    /// This blocks between 0 and ~5 milliseconds.
234    ///
235    /// As long as the duration of the source is known, seek is guaranteed to saturate
236    /// at the end of the source. For example given a source that reports a total duration
237    /// of 42 seconds calling `try_seek()` with 60 seconds as argument will seek to
238    /// 42 seconds.
239    ///
240    /// # Errors
241    /// This function will return [`SeekError::NotSupported`] if one of the underlying
242    /// sources does not support seeking.
243    ///
244    /// It will return an error if an implementation ran
245    /// into one during the seek.
246    ///
247    /// When seeking beyond the end of a source this
248    /// function might return an error if the duration of the source is not known.
249    pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
250        let (order, feedback) = SeekOrder::new(pos);
251        *self.controls.seek.lock().unwrap() = Some(order);
252
253        if self.sound_count.load(Ordering::Acquire) == 0 {
254            // No sound is playing, seek will not be performed
255            return Ok(());
256        }
257
258        match feedback.recv() {
259            Ok(seek_res) => {
260                *self.controls.position.lock().unwrap() = pos;
261                seek_res
262            }
263            // The feedback channel closed. Probably another SeekOrder was set
264            // invalidating this one and closing the feedback channel
265            // ... or the audio thread panicked.
266            Err(_) => Ok(()),
267        }
268    }
269
270    /// Pauses playback of this sink.
271    ///
272    /// No effect if already paused.
273    ///
274    /// A paused sink can be resumed with `play()`.
275    pub fn pause(&self) {
276        self.controls.pause.store(true, Ordering::SeqCst);
277    }
278
279    /// Gets if a sink is paused
280    ///
281    /// Sinks can be paused and resumed using `pause()` and `play()`. This returns `true` if the
282    /// sink is paused.
283    pub fn is_paused(&self) -> bool {
284        self.controls.pause.load(Ordering::SeqCst)
285    }
286
287    /// Removes all currently loaded `Source`s from the `Sink`, and pauses it.
288    ///
289    /// See `pause()` for information about pausing a `Sink`.
290    pub fn clear(&self) {
291        let len = self.sound_count.load(Ordering::SeqCst) as u32;
292        *self.controls.to_clear.lock().unwrap() = len;
293        self.sleep_until_end();
294        self.pause();
295    }
296
297    /// Skips to the next `Source` in the `Sink`
298    ///
299    /// If there are more `Source`s appended to the `Sink` at the time,
300    /// it will play the next one. Otherwise, the `Sink` will finish as if
301    /// it had finished playing a `Source` all the way through.
302    pub fn skip_one(&self) {
303        let len = self.sound_count.load(Ordering::SeqCst) as u32;
304        let mut to_clear = self.controls.to_clear.lock().unwrap();
305        if len > *to_clear {
306            *to_clear += 1;
307        }
308    }
309
310    /// Stops the sink by emptying the queue.
311    #[inline]
312    pub fn stop(&self) {
313        self.controls.stopped.store(true, Ordering::SeqCst);
314    }
315
316    /// Destroys the sink without stopping the sounds that are still playing.
317    #[inline]
318    pub fn detach(mut self) {
319        self.detached = true;
320    }
321
322    /// Sleeps the current thread until the sound ends.
323    #[inline]
324    pub fn sleep_until_end(&self) {
325        if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
326            let _ = sleep_until_end.recv();
327        }
328    }
329
330    /// Returns true if this sink has no more sounds to play.
331    #[inline]
332    pub fn empty(&self) -> bool {
333        self.len() == 0
334    }
335
336    /// Returns the number of sounds currently in the queue.
337    #[allow(clippy::len_without_is_empty)]
338    #[inline]
339    pub fn len(&self) -> usize {
340        self.sound_count.load(Ordering::Relaxed)
341    }
342
343    /// Returns the position of the sound that's being played.
344    ///
345    /// This takes into account any speedup or delay applied.
346    ///
347    /// Example: if you apply a speedup of *2* to an mp3 decoder source and
348    /// [`get_pos()`](Sink::get_pos) returns *5s* then the position in the mp3
349    /// recording is *10s* from its start.
350    #[inline]
351    pub fn get_pos(&self) -> Duration {
352        *self.controls.position.lock().unwrap()
353    }
354}
355
356impl Drop for Sink {
357    #[inline]
358    fn drop(&mut self) {
359        self.queue_tx.set_keep_alive_if_empty(false);
360
361        if !self.detached {
362            self.controls.stopped.store(true, Ordering::Relaxed);
363        }
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use crate::buffer::SamplesBuffer;
370    use crate::{Sink, Source};
371    use std::sync::atomic::Ordering;
372
373    #[test]
374    fn test_pause_and_stop() {
375        let (sink, mut queue_rx) = Sink::new_idle();
376
377        // assert_eq!(queue_rx.next(), Some(0.0));
378
379        let v = vec![10i16, -10, 20, -20, 30, -30];
380
381        // Low rate to ensure immediate control.
382        sink.append(SamplesBuffer::new(1, 1, v.clone()));
383        let mut src = SamplesBuffer::new(1, 1, v).convert_samples();
384
385        assert_eq!(queue_rx.next(), src.next());
386        assert_eq!(queue_rx.next(), src.next());
387
388        sink.pause();
389
390        assert_eq!(queue_rx.next(), Some(0.0));
391
392        sink.play();
393
394        assert_eq!(queue_rx.next(), src.next());
395        assert_eq!(queue_rx.next(), src.next());
396
397        sink.stop();
398
399        assert_eq!(queue_rx.next(), Some(0.0));
400
401        assert_eq!(sink.empty(), true);
402    }
403
404    #[test]
405    fn test_stop_and_start() {
406        let (sink, mut queue_rx) = Sink::new_idle();
407
408        let v = vec![10i16, -10, 20, -20, 30, -30];
409
410        sink.append(SamplesBuffer::new(1, 1, v.clone()));
411        let mut src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();
412
413        assert_eq!(queue_rx.next(), src.next());
414        assert_eq!(queue_rx.next(), src.next());
415
416        sink.stop();
417
418        assert!(sink.controls.stopped.load(Ordering::SeqCst));
419        assert_eq!(queue_rx.next(), Some(0.0));
420
421        src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();
422        sink.append(SamplesBuffer::new(1, 1, v));
423
424        assert!(!sink.controls.stopped.load(Ordering::SeqCst));
425        // Flush silence
426        let mut queue_rx = queue_rx.skip_while(|v| *v == 0.0);
427
428        assert_eq!(queue_rx.next(), src.next());
429        assert_eq!(queue_rx.next(), src.next());
430    }
431
432    #[test]
433    fn test_volume() {
434        let (sink, mut queue_rx) = Sink::new_idle();
435
436        let v = vec![10i16, -10, 20, -20, 30, -30];
437
438        // High rate to avoid immediate control.
439        sink.append(SamplesBuffer::new(2, 44100, v.clone()));
440        let src = SamplesBuffer::new(2, 44100, v.clone()).convert_samples();
441
442        let mut src = src.amplify(0.5);
443        sink.set_volume(0.5);
444
445        for _ in 0..v.len() {
446            assert_eq!(queue_rx.next(), src.next());
447        }
448    }
449}