rodio/
dynamic_mixer.rs

1//! Mixer that plays multiple sounds at the same time.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{SeekError, Source, UniformSourceIterator};
8use crate::Sample;
9
10/// Builds a new mixer.
11///
12/// You can choose the characteristics of the output thanks to this constructor. All the sounds
13/// added to the mixer will be converted to these values.
14///
15/// After creating a mixer, you can add new sounds with the controller.
16pub fn mixer<S>(
17    channels: u16,
18    sample_rate: u32,
19) -> (Arc<DynamicMixerController<S>>, DynamicMixer<S>)
20where
21    S: Sample + Send + 'static,
22{
23    let input = Arc::new(DynamicMixerController {
24        has_pending: AtomicBool::new(false),
25        pending_sources: Mutex::new(Vec::new()),
26        channels,
27        sample_rate,
28    });
29
30    let output = DynamicMixer {
31        current_sources: Vec::with_capacity(16),
32        input: input.clone(),
33        sample_count: 0,
34        still_pending: vec![],
35        still_current: vec![],
36    };
37
38    (input, output)
39}
40
41/// The input of the mixer.
42pub struct DynamicMixerController<S> {
43    has_pending: AtomicBool,
44    pending_sources: Mutex<Vec<Box<dyn Source<Item = S> + Send>>>,
45    channels: u16,
46    sample_rate: u32,
47}
48
49impl<S> DynamicMixerController<S>
50where
51    S: Sample + Send + 'static,
52{
53    /// Adds a new source to mix to the existing ones.
54    #[inline]
55    pub fn add<T>(&self, source: T)
56    where
57        T: Source<Item = S> + Send + 'static,
58    {
59        let uniform_source = UniformSourceIterator::new(source, self.channels, self.sample_rate);
60        self.pending_sources
61            .lock()
62            .unwrap()
63            .push(Box::new(uniform_source) as Box<_>);
64        self.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering?
65    }
66}
67
68/// The output of the mixer. Implements `Source`.
69pub struct DynamicMixer<S> {
70    // The current iterator that produces samples.
71    current_sources: Vec<Box<dyn Source<Item = S> + Send>>,
72
73    // The pending sounds.
74    input: Arc<DynamicMixerController<S>>,
75
76    // The number of samples produced so far.
77    sample_count: usize,
78
79    // A temporary vec used in start_pending_sources.
80    still_pending: Vec<Box<dyn Source<Item = S> + Send>>,
81
82    // A temporary vec used in sum_current_sources.
83    still_current: Vec<Box<dyn Source<Item = S> + Send>>,
84}
85
86impl<S> Source for DynamicMixer<S>
87where
88    S: Sample + Send + 'static,
89{
90    #[inline]
91    fn current_frame_len(&self) -> Option<usize> {
92        None
93    }
94
95    #[inline]
96    fn channels(&self) -> u16 {
97        self.input.channels
98    }
99
100    #[inline]
101    fn sample_rate(&self) -> u32 {
102        self.input.sample_rate
103    }
104
105    #[inline]
106    fn total_duration(&self) -> Option<Duration> {
107        None
108    }
109
110    #[inline]
111    fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
112        Err(SeekError::NotSupported {
113            underlying_source: std::any::type_name::<Self>(),
114        })
115
116        // uncomment when #510 is implemented (query position of playback)
117
118        // let mut org_positions = Vec::with_capacity(self.current_sources.len());
119        // let mut encounterd_err = None;
120        //
121        // for source in &mut self.current_sources {
122        //     let pos = /* source.playback_pos() */ todo!();
123        //     if let Err(e) = source.try_seek(pos) {
124        //         encounterd_err = Some(e);
125        //         break;
126        //     } else {
127        //         // store pos in case we need to roll back
128        //         org_positions.push(pos);
129        //     }
130        // }
131        //
132        // if let Some(e) = encounterd_err {
133        //     // rollback seeks that happend before err
134        //     for (pos, source) in org_positions
135        //         .into_iter()
136        //         .zip(self.current_sources.iter_mut())
137        //     {
138        //         source.try_seek(pos)?;
139        //     }
140        //     Err(e)
141        // } else {
142        //     Ok(())
143        // }
144    }
145}
146
147impl<S> Iterator for DynamicMixer<S>
148where
149    S: Sample + Send + 'static,
150{
151    type Item = S;
152
153    #[inline]
154    fn next(&mut self) -> Option<S> {
155        if self.input.has_pending.load(Ordering::SeqCst) {
156            self.start_pending_sources();
157        }
158
159        self.sample_count += 1;
160
161        let sum = self.sum_current_sources();
162
163        if self.current_sources.is_empty() {
164            None
165        } else {
166            Some(sum)
167        }
168    }
169
170    #[inline]
171    fn size_hint(&self) -> (usize, Option<usize>) {
172        (0, None)
173    }
174}
175
176impl<S> DynamicMixer<S>
177where
178    S: Sample + Send + 'static,
179{
180    // Samples from the #next() function are interlaced for each of the channels.
181    // We need to ensure we start playing sources so that their samples are
182    // in-step with the modulo of the samples produced so far. Otherwise, the
183    // sound will play on the wrong channels, e.g. left / right will be reversed.
184    fn start_pending_sources(&mut self) {
185        let mut pending = self.input.pending_sources.lock().unwrap(); // TODO: relax ordering?
186
187        for source in pending.drain(..) {
188            let in_step = self.sample_count % source.channels() as usize == 0;
189
190            if in_step {
191                self.current_sources.push(source);
192            } else {
193                self.still_pending.push(source);
194            }
195        }
196        std::mem::swap(&mut self.still_pending, &mut pending);
197
198        let has_pending = !pending.is_empty();
199        self.input.has_pending.store(has_pending, Ordering::SeqCst); // TODO: relax ordering?
200    }
201
202    fn sum_current_sources(&mut self) -> S {
203        let mut sum = S::zero_value();
204
205        for mut source in self.current_sources.drain(..) {
206            if let Some(value) = source.next() {
207                sum = sum.saturating_add(value);
208                self.still_current.push(source);
209            }
210        }
211        std::mem::swap(&mut self.still_current, &mut self.current_sources);
212
213        sum
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use crate::buffer::SamplesBuffer;
220    use crate::dynamic_mixer;
221    use crate::source::Source;
222
223    #[test]
224    fn basic() {
225        let (tx, mut rx) = dynamic_mixer::mixer(1, 48000);
226
227        tx.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
228        tx.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 5, 5]));
229
230        assert_eq!(rx.channels(), 1);
231        assert_eq!(rx.sample_rate(), 48000);
232        assert_eq!(rx.next(), Some(15));
233        assert_eq!(rx.next(), Some(-5));
234        assert_eq!(rx.next(), Some(15));
235        assert_eq!(rx.next(), Some(-5));
236        assert_eq!(rx.next(), None);
237    }
238
239    #[test]
240    fn channels_conv() {
241        let (tx, mut rx) = dynamic_mixer::mixer(2, 48000);
242
243        tx.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
244        tx.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 5, 5]));
245
246        assert_eq!(rx.channels(), 2);
247        assert_eq!(rx.sample_rate(), 48000);
248        assert_eq!(rx.next(), Some(15));
249        assert_eq!(rx.next(), Some(15));
250        assert_eq!(rx.next(), Some(-5));
251        assert_eq!(rx.next(), Some(-5));
252        assert_eq!(rx.next(), Some(15));
253        assert_eq!(rx.next(), Some(15));
254        assert_eq!(rx.next(), Some(-5));
255        assert_eq!(rx.next(), Some(-5));
256        assert_eq!(rx.next(), None);
257    }
258
259    #[test]
260    fn rate_conv() {
261        let (tx, mut rx) = dynamic_mixer::mixer(1, 96000);
262
263        tx.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
264        tx.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 5, 5]));
265
266        assert_eq!(rx.channels(), 1);
267        assert_eq!(rx.sample_rate(), 96000);
268        assert_eq!(rx.next(), Some(15));
269        assert_eq!(rx.next(), Some(5));
270        assert_eq!(rx.next(), Some(-5));
271        assert_eq!(rx.next(), Some(5));
272        assert_eq!(rx.next(), Some(15));
273        assert_eq!(rx.next(), Some(5));
274        assert_eq!(rx.next(), Some(-5));
275        assert_eq!(rx.next(), None);
276    }
277
278    #[test]
279    fn start_afterwards() {
280        let (tx, mut rx) = dynamic_mixer::mixer(1, 48000);
281
282        tx.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
283
284        assert_eq!(rx.next(), Some(10));
285        assert_eq!(rx.next(), Some(-10));
286
287        tx.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 6, 6, 7, 7, 7]));
288
289        assert_eq!(rx.next(), Some(15));
290        assert_eq!(rx.next(), Some(-5));
291
292        assert_eq!(rx.next(), Some(6));
293        assert_eq!(rx.next(), Some(6));
294
295        tx.add(SamplesBuffer::new(1, 48000, vec![2i16]));
296
297        assert_eq!(rx.next(), Some(9));
298        assert_eq!(rx.next(), Some(7));
299        assert_eq!(rx.next(), Some(7));
300
301        assert_eq!(rx.next(), None);
302    }
303}