1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5#[cfg(feature = "crossbeam-channel")]
6use crossbeam_channel::{Receiver, Sender};
7#[cfg(not(feature = "crossbeam-channel"))]
8use std::sync::mpsc::{Receiver, Sender};
9
10use crate::source::SeekError;
11use crate::stream::{OutputStreamHandle, PlayError};
12use crate::{queue, source::Done, Sample, Source};
13use cpal::FromSample;
14
15pub struct Sink {
20 queue_tx: Arc<queue::SourcesQueueInput<f32>>,
21 sleep_until_end: Mutex<Option<Receiver<()>>>,
22
23 controls: Arc<Controls>,
24 sound_count: Arc<AtomicUsize>,
25
26 detached: bool,
27}
28
29struct SeekOrder {
30 pos: Duration,
31 feedback: Sender<Result<(), SeekError>>,
32}
33
34impl SeekOrder {
35 fn new(pos: Duration) -> (Self, Receiver<Result<(), SeekError>>) {
36 #[cfg(not(feature = "crossbeam-channel"))]
37 let (tx, rx) = {
38 use std::sync::mpsc;
39 mpsc::channel()
40 };
41
42 #[cfg(feature = "crossbeam-channel")]
43 let (tx, rx) = {
44 use crossbeam_channel::bounded;
45 bounded(1)
46 };
47 (Self { pos, feedback: tx }, rx)
48 }
49
50 fn attempt<S>(self, maybe_seekable: &mut S)
51 where
52 S: Source,
53 S::Item: Sample + Send,
54 {
55 let res = maybe_seekable.try_seek(self.pos);
56 let _ignore_receiver_dropped = self.feedback.send(res);
57 }
58}
59
60struct Controls {
61 pause: AtomicBool,
62 volume: Mutex<f32>,
63 stopped: AtomicBool,
64 speed: Mutex<f32>,
65 to_clear: Mutex<u32>,
66 seek: Mutex<Option<SeekOrder>>,
67 position: Mutex<Duration>,
68}
69
70impl Sink {
71 #[inline]
73 pub fn try_new(stream: &OutputStreamHandle) -> Result<Sink, PlayError> {
74 let (sink, queue_rx) = Sink::new_idle();
75 stream.play_raw(queue_rx)?;
76 Ok(sink)
77 }
78
79 #[inline]
81 pub fn new_idle() -> (Sink, queue::SourcesQueueOutput<f32>) {
82 let (queue_tx, queue_rx) = queue::queue(true);
83
84 let sink = Sink {
85 queue_tx,
86 sleep_until_end: Mutex::new(None),
87 controls: Arc::new(Controls {
88 pause: AtomicBool::new(false),
89 volume: Mutex::new(1.0),
90 stopped: AtomicBool::new(false),
91 speed: Mutex::new(1.0),
92 to_clear: Mutex::new(0),
93 seek: Mutex::new(None),
94 position: Mutex::new(Duration::ZERO),
95 }),
96 sound_count: Arc::new(AtomicUsize::new(0)),
97 detached: false,
98 };
99 (sink, queue_rx)
100 }
101
102 #[inline]
104 pub fn append<S>(&self, source: S)
105 where
106 S: Source + Send + 'static,
107 f32: FromSample<S::Item>,
108 S::Item: Sample + Send,
109 {
110 if self.controls.stopped.load(Ordering::SeqCst) {
112 if self.sound_count.load(Ordering::SeqCst) > 0 {
113 self.sleep_until_end();
114 }
115 self.controls.stopped.store(false, Ordering::SeqCst);
116 }
117
118 let controls = self.controls.clone();
119
120 let start_played = AtomicBool::new(false);
121
122 let source = source
123 .speed(1.0)
124 .track_position()
126 .pausable(false)
127 .amplify(1.0)
128 .skippable()
129 .stoppable()
130 .periodic_access(Duration::from_millis(5), move |src| {
132 if controls.stopped.load(Ordering::SeqCst) {
133 src.stop();
134 *controls.position.lock().unwrap() = Duration::ZERO;
135 }
136 {
137 let mut to_clear = controls.to_clear.lock().unwrap();
138 if *to_clear > 0 {
139 src.inner_mut().skip();
140 *to_clear -= 1;
141 *controls.position.lock().unwrap() = Duration::ZERO;
142 } else {
143 *controls.position.lock().unwrap() = src.inner().inner().inner().inner().get_pos();
144 }
145 }
146 let amp = src.inner_mut().inner_mut();
147 amp.set_factor(*controls.volume.lock().unwrap());
148 amp.inner_mut()
149 .set_paused(controls.pause.load(Ordering::SeqCst));
150 amp.inner_mut()
151 .inner_mut()
152 .inner_mut()
153 .set_factor(*controls.speed.lock().unwrap());
154 if let Some(seek) = controls.seek.lock().unwrap().take() {
155 seek.attempt(amp)
156 }
157 start_played.store(true, Ordering::SeqCst);
158 })
159 .convert_samples();
160 self.sound_count.fetch_add(1, Ordering::Relaxed);
161 let source = Done::new(source, self.sound_count.clone());
162 *self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
163 }
164
165 #[inline]
170 pub fn volume(&self) -> f32 {
171 *self.controls.volume.lock().unwrap()
172 }
173
174 #[inline]
179 pub fn set_volume(&self, value: f32) {
180 *self.controls.volume.lock().unwrap() = value;
181 }
182
183 #[inline]
198 pub fn speed(&self) -> f32 {
199 *self.controls.speed.lock().unwrap()
200 }
201
202 #[inline]
215 pub fn set_speed(&self, value: f32) {
216 *self.controls.speed.lock().unwrap() = value;
217 }
218
219 #[inline]
223 pub fn play(&self) {
224 self.controls.pause.store(false, Ordering::SeqCst);
225 }
226
227 pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
250 let (order, feedback) = SeekOrder::new(pos);
251 *self.controls.seek.lock().unwrap() = Some(order);
252
253 if self.sound_count.load(Ordering::Acquire) == 0 {
254 return Ok(());
256 }
257
258 match feedback.recv() {
259 Ok(seek_res) => {
260 *self.controls.position.lock().unwrap() = pos;
261 seek_res
262 }
263 Err(_) => Ok(()),
267 }
268 }
269
270 pub fn pause(&self) {
276 self.controls.pause.store(true, Ordering::SeqCst);
277 }
278
279 pub fn is_paused(&self) -> bool {
284 self.controls.pause.load(Ordering::SeqCst)
285 }
286
287 pub fn clear(&self) {
291 let len = self.sound_count.load(Ordering::SeqCst) as u32;
292 *self.controls.to_clear.lock().unwrap() = len;
293 self.sleep_until_end();
294 self.pause();
295 }
296
297 pub fn skip_one(&self) {
303 let len = self.sound_count.load(Ordering::SeqCst) as u32;
304 let mut to_clear = self.controls.to_clear.lock().unwrap();
305 if len > *to_clear {
306 *to_clear += 1;
307 }
308 }
309
310 #[inline]
312 pub fn stop(&self) {
313 self.controls.stopped.store(true, Ordering::SeqCst);
314 }
315
316 #[inline]
318 pub fn detach(mut self) {
319 self.detached = true;
320 }
321
322 #[inline]
324 pub fn sleep_until_end(&self) {
325 if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
326 let _ = sleep_until_end.recv();
327 }
328 }
329
330 #[inline]
332 pub fn empty(&self) -> bool {
333 self.len() == 0
334 }
335
336 #[allow(clippy::len_without_is_empty)]
338 #[inline]
339 pub fn len(&self) -> usize {
340 self.sound_count.load(Ordering::Relaxed)
341 }
342
343 #[inline]
351 pub fn get_pos(&self) -> Duration {
352 *self.controls.position.lock().unwrap()
353 }
354}
355
356impl Drop for Sink {
357 #[inline]
358 fn drop(&mut self) {
359 self.queue_tx.set_keep_alive_if_empty(false);
360
361 if !self.detached {
362 self.controls.stopped.store(true, Ordering::Relaxed);
363 }
364 }
365}
366
#[cfg(test)]
mod tests {
    use crate::buffer::SamplesBuffer;
    use crate::{Sink, Source};
    use std::sync::atomic::Ordering;

    /// Pausing yields silence, resuming continues where playback left off, and stopping
    /// empties the sink.
    #[test]
    fn test_pause_and_stop() {
        let (sink, mut queue_rx) = Sink::new_idle();

        let v = vec![10i16, -10, 20, -20, 30, -30];

        // NOTE(review): a 1 Hz sample rate presumably makes the 5 ms control callback run
        // on every sample, so control changes take effect immediately — confirm.
        sink.append(SamplesBuffer::new(1, 1, v.clone()));
        let mut src = SamplesBuffer::new(1, 1, v).convert_samples();

        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());

        sink.pause();

        // While paused the queue produces silence.
        assert_eq!(queue_rx.next(), Some(0.0));

        sink.play();

        // Playback resumes with the next unplayed samples.
        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());

        sink.stop();

        assert_eq!(queue_rx.next(), Some(0.0));

        assert!(sink.empty());
    }

    /// Appending after `stop` lowers the stop flag and plays the new source from its start.
    #[test]
    fn test_stop_and_start() {
        let (sink, mut queue_rx) = Sink::new_idle();

        let v = vec![10i16, -10, 20, -20, 30, -30];

        sink.append(SamplesBuffer::new(1, 1, v.clone()));
        let mut src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();

        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());

        sink.stop();

        assert!(sink.controls.stopped.load(Ordering::SeqCst));
        assert_eq!(queue_rx.next(), Some(0.0));

        src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();
        sink.append(SamplesBuffer::new(1, 1, v));

        assert!(!sink.controls.stopped.load(Ordering::SeqCst));
        // Ignore the silence emitted before the new source starts producing samples.
        let mut queue_rx = queue_rx.skip_while(|v| *v == 0.0);

        assert_eq!(queue_rx.next(), src.next());
        assert_eq!(queue_rx.next(), src.next());
    }

    /// The sink's volume is applied to every sample it outputs.
    #[test]
    fn test_volume() {
        let (sink, mut queue_rx) = Sink::new_idle();

        let v = vec![10i16, -10, 20, -20, 30, -30];

        sink.append(SamplesBuffer::new(2, 44100, v.clone()));
        let src = SamplesBuffer::new(2, 44100, v.clone()).convert_samples();

        // Reference signal: the same samples amplified by the factor set on the sink.
        let mut src = src.amplify(0.5);
        sink.set_volume(0.5);

        for _ in 0..v.len() {
            assert_eq!(queue_rx.next(), src.next());
        }
    }
}