1use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use crate::source::{SeekError, Source, UniformSourceIterator};
8use crate::Sample;
9
10pub fn mixer<S>(
17 channels: u16,
18 sample_rate: u32,
19) -> (Arc<DynamicMixerController<S>>, DynamicMixer<S>)
20where
21 S: Sample + Send + 'static,
22{
23 let input = Arc::new(DynamicMixerController {
24 has_pending: AtomicBool::new(false),
25 pending_sources: Mutex::new(Vec::new()),
26 channels,
27 sample_rate,
28 });
29
30 let output = DynamicMixer {
31 current_sources: Vec::with_capacity(16),
32 input: input.clone(),
33 sample_count: 0,
34 still_pending: vec![],
35 still_current: vec![],
36 };
37
38 (input, output)
39}
40
41pub struct DynamicMixerController<S> {
43 has_pending: AtomicBool,
44 pending_sources: Mutex<Vec<Box<dyn Source<Item = S> + Send>>>,
45 channels: u16,
46 sample_rate: u32,
47}
48
49impl<S> DynamicMixerController<S>
50where
51 S: Sample + Send + 'static,
52{
53 #[inline]
55 pub fn add<T>(&self, source: T)
56 where
57 T: Source<Item = S> + Send + 'static,
58 {
59 let uniform_source = UniformSourceIterator::new(source, self.channels, self.sample_rate);
60 self.pending_sources
61 .lock()
62 .unwrap()
63 .push(Box::new(uniform_source) as Box<_>);
64 self.has_pending.store(true, Ordering::SeqCst); }
66}
67
68pub struct DynamicMixer<S> {
70 current_sources: Vec<Box<dyn Source<Item = S> + Send>>,
72
73 input: Arc<DynamicMixerController<S>>,
75
76 sample_count: usize,
78
79 still_pending: Vec<Box<dyn Source<Item = S> + Send>>,
81
82 still_current: Vec<Box<dyn Source<Item = S> + Send>>,
84}
85
86impl<S> Source for DynamicMixer<S>
87where
88 S: Sample + Send + 'static,
89{
90 #[inline]
91 fn current_frame_len(&self) -> Option<usize> {
92 None
93 }
94
95 #[inline]
96 fn channels(&self) -> u16 {
97 self.input.channels
98 }
99
100 #[inline]
101 fn sample_rate(&self) -> u32 {
102 self.input.sample_rate
103 }
104
105 #[inline]
106 fn total_duration(&self) -> Option<Duration> {
107 None
108 }
109
110 #[inline]
111 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
112 Err(SeekError::NotSupported {
113 underlying_source: std::any::type_name::<Self>(),
114 })
115
116 }
145}
146
147impl<S> Iterator for DynamicMixer<S>
148where
149 S: Sample + Send + 'static,
150{
151 type Item = S;
152
153 #[inline]
154 fn next(&mut self) -> Option<S> {
155 if self.input.has_pending.load(Ordering::SeqCst) {
156 self.start_pending_sources();
157 }
158
159 self.sample_count += 1;
160
161 let sum = self.sum_current_sources();
162
163 if self.current_sources.is_empty() {
164 None
165 } else {
166 Some(sum)
167 }
168 }
169
170 #[inline]
171 fn size_hint(&self) -> (usize, Option<usize>) {
172 (0, None)
173 }
174}
175
176impl<S> DynamicMixer<S>
177where
178 S: Sample + Send + 'static,
179{
180 fn start_pending_sources(&mut self) {
185 let mut pending = self.input.pending_sources.lock().unwrap(); for source in pending.drain(..) {
188 let in_step = self.sample_count % source.channels() as usize == 0;
189
190 if in_step {
191 self.current_sources.push(source);
192 } else {
193 self.still_pending.push(source);
194 }
195 }
196 std::mem::swap(&mut self.still_pending, &mut pending);
197
198 let has_pending = !pending.is_empty();
199 self.input.has_pending.store(has_pending, Ordering::SeqCst); }
201
202 fn sum_current_sources(&mut self) -> S {
203 let mut sum = S::zero_value();
204
205 for mut source in self.current_sources.drain(..) {
206 if let Some(value) = source.next() {
207 sum = sum.saturating_add(value);
208 self.still_current.push(source);
209 }
210 }
211 std::mem::swap(&mut self.still_current, &mut self.current_sources);
212
213 sum
214 }
215}
216
#[cfg(test)]
mod tests {
    use crate::buffer::SamplesBuffer;
    use crate::dynamic_mixer;
    use crate::source::Source;

    /// Pulls up to `count` samples from `output`, leaving the iterator usable afterwards.
    fn pull<I>(output: &mut I, count: usize) -> Vec<i16>
    where
        I: Iterator<Item = i16>,
    {
        output.by_ref().take(count).collect()
    }

    /// Two mono sources at the mixer's own format are summed sample by sample.
    #[test]
    fn basic() {
        let (controller, mut output) = dynamic_mixer::mixer(1, 48000);

        controller.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
        controller.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 5, 5]));

        assert_eq!(output.channels(), 1);
        assert_eq!(output.sample_rate(), 48000);
        assert_eq!(pull(&mut output, 4), vec![15, -5, 15, -5]);
        assert_eq!(output.next(), None);
    }

    /// Mono sources fed to a stereo mixer come out duplicated on both channels.
    #[test]
    fn channels_conv() {
        let (controller, mut output) = dynamic_mixer::mixer(2, 48000);

        controller.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
        controller.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 5, 5]));

        assert_eq!(output.channels(), 2);
        assert_eq!(output.sample_rate(), 48000);
        assert_eq!(
            pull(&mut output, 8),
            vec![15, 15, -5, -5, 15, 15, -5, -5]
        );
        assert_eq!(output.next(), None);
    }

    /// 48 kHz sources fed to a 96 kHz mixer are resampled before being summed.
    #[test]
    fn rate_conv() {
        let (controller, mut output) = dynamic_mixer::mixer(1, 96000);

        controller.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
        controller.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 5, 5]));

        assert_eq!(output.channels(), 1);
        assert_eq!(output.sample_rate(), 96000);
        assert_eq!(pull(&mut output, 7), vec![15, 5, -5, 5, 15, 5, -5]);
        assert_eq!(output.next(), None);
    }

    /// Sources added while the mixer is already playing join the mix from that point on.
    #[test]
    fn start_afterwards() {
        let (controller, mut output) = dynamic_mixer::mixer(1, 48000);

        controller.add(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));

        assert_eq!(pull(&mut output, 2), vec![10, -10]);

        controller.add(SamplesBuffer::new(1, 48000, vec![5i16, 5, 6, 6, 7, 7, 7]));

        assert_eq!(pull(&mut output, 4), vec![15, -5, 6, 6]);

        controller.add(SamplesBuffer::new(1, 48000, vec![2i16]));

        assert_eq!(pull(&mut output, 3), vec![9, 7, 7]);

        assert_eq!(output.next(), None);
    }
}