rodio/source/
buffered.rs

1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use crate::{Sample, Source};
7
8use super::SeekError;
9
10/// Internal function that builds a `Buffered` object.
11#[inline]
12pub fn buffered<I>(input: I) -> Buffered<I>
13where
14    I: Source,
15    I::Item: Sample,
16{
17    let total_duration = input.total_duration();
18    let first_frame = extract(input);
19
20    Buffered {
21        current_frame: first_frame,
22        position_in_frame: 0,
23        total_duration,
24    }
25}
26
27/// Iterator that at the same time extracts data from the iterator and stores it in a buffer.
28pub struct Buffered<I>
29where
30    I: Source,
31    I::Item: Sample,
32{
33    /// Immutable reference to the next frame of data. Cannot be `Frame::Input`.
34    current_frame: Arc<Frame<I>>,
35
36    /// The position in number of samples of this iterator inside `current_frame`.
37    position_in_frame: usize,
38
39    /// Obtained once at creation and never modified again.
40    total_duration: Option<Duration>,
41}
42
43enum Frame<I>
44where
45    I: Source,
46    I::Item: Sample,
47{
48    /// Data that has already been extracted from the iterator. Also contains a pointer to the
49    /// next frame.
50    Data(FrameData<I>),
51
52    /// No more data.
53    End,
54
55    /// Unextracted data. The `Option` should never be `None` and is only here for easier data
56    /// processing.
57    Input(Mutex<Option<I>>),
58}
59
60struct FrameData<I>
61where
62    I: Source,
63    I::Item: Sample,
64{
65    data: Vec<I::Item>,
66    channels: u16,
67    rate: u32,
68    next: Mutex<Arc<Frame<I>>>,
69}
70
71impl<I> Drop for FrameData<I>
72where
73    I: Source,
74    I::Item: Sample,
75{
76    fn drop(&mut self) {
77        // This is necessary to prevent stack overflows deallocating long chains of the mutually
78        // recursive `Frame` and `FrameData` types. This iteratively traverses as much of the
79        // chain as needs to be deallocated, and repeatedly "pops" the head off the list. This
80        // solves the problem, as when the time comes to actually deallocate the `FrameData`,
81        // the `next` field will contain a `Frame::End`, or an `Arc` with additional references,
82        // so the depth of recursive drops will be bounded.
83        while let Ok(arc_next) = self.next.get_mut() {
84            if let Some(next_ref) = Arc::get_mut(arc_next) {
85                // This allows us to own the next Frame.
86                let next = mem::replace(next_ref, Frame::End);
87                if let Frame::Data(next_data) = next {
88                    // Swap the current FrameData with the next one, allowing the current one
89                    // to go out of scope.
90                    *self = next_data;
91                } else {
92                    break;
93                }
94            } else {
95                break;
96            }
97        }
98    }
99}
100
101/// Builds a frame from the input iterator.
102fn extract<I>(mut input: I) -> Arc<Frame<I>>
103where
104    I: Source,
105    I::Item: Sample,
106{
107    let frame_len = input.current_frame_len();
108
109    if frame_len == Some(0) {
110        return Arc::new(Frame::End);
111    }
112
113    let channels = input.channels();
114    let rate = input.sample_rate();
115    let data: Vec<I::Item> = input
116        .by_ref()
117        .take(cmp::min(frame_len.unwrap_or(32768), 32768))
118        .collect();
119
120    if data.is_empty() {
121        return Arc::new(Frame::End);
122    }
123
124    Arc::new(Frame::Data(FrameData {
125        data,
126        channels,
127        rate,
128        next: Mutex::new(Arc::new(Frame::Input(Mutex::new(Some(input))))),
129    }))
130}
131
132impl<I> Buffered<I>
133where
134    I: Source,
135    I::Item: Sample,
136{
137    /// Advances to the next frame.
138    fn next_frame(&mut self) {
139        let next_frame = {
140            let mut next_frame_ptr = match &*self.current_frame {
141                Frame::Data(FrameData { next, .. }) => next.lock().unwrap(),
142                _ => unreachable!(),
143            };
144
145            let next_frame = match &**next_frame_ptr {
146                Frame::Data(_) => next_frame_ptr.clone(),
147                Frame::End => next_frame_ptr.clone(),
148                Frame::Input(input) => {
149                    let input = input.lock().unwrap().take().unwrap();
150                    extract(input)
151                }
152            };
153
154            *next_frame_ptr = next_frame.clone();
155            next_frame
156        };
157
158        self.current_frame = next_frame;
159        self.position_in_frame = 0;
160    }
161}
162
163impl<I> Iterator for Buffered<I>
164where
165    I: Source,
166    I::Item: Sample,
167{
168    type Item = I::Item;
169
170    #[inline]
171    fn next(&mut self) -> Option<I::Item> {
172        let current_sample;
173        let advance_frame;
174
175        match &*self.current_frame {
176            Frame::Data(FrameData { data, .. }) => {
177                current_sample = Some(data[self.position_in_frame]);
178                self.position_in_frame += 1;
179                advance_frame = self.position_in_frame >= data.len();
180            }
181
182            Frame::End => {
183                current_sample = None;
184                advance_frame = false;
185            }
186
187            Frame::Input(_) => unreachable!(),
188        };
189
190        if advance_frame {
191            self.next_frame();
192        }
193
194        current_sample
195    }
196
197    #[inline]
198    fn size_hint(&self) -> (usize, Option<usize>) {
199        // TODO:
200        (0, None)
201    }
202}
203
204// TODO: uncomment when `size_hint` is fixed
205/*impl<I> ExactSizeIterator for Amplify<I> where I: Source + ExactSizeIterator, I::Item: Sample {
206}*/
207
208impl<I> Source for Buffered<I>
209where
210    I: Source,
211    I::Item: Sample,
212{
213    #[inline]
214    fn current_frame_len(&self) -> Option<usize> {
215        match &*self.current_frame {
216            Frame::Data(FrameData { data, .. }) => Some(data.len() - self.position_in_frame),
217            Frame::End => Some(0),
218            Frame::Input(_) => unreachable!(),
219        }
220    }
221
222    #[inline]
223    fn channels(&self) -> u16 {
224        match *self.current_frame {
225            Frame::Data(FrameData { channels, .. }) => channels,
226            Frame::End => 1,
227            Frame::Input(_) => unreachable!(),
228        }
229    }
230
231    #[inline]
232    fn sample_rate(&self) -> u32 {
233        match *self.current_frame {
234            Frame::Data(FrameData { rate, .. }) => rate,
235            Frame::End => 44100,
236            Frame::Input(_) => unreachable!(),
237        }
238    }
239
240    #[inline]
241    fn total_duration(&self) -> Option<Duration> {
242        self.total_duration
243    }
244
245    /// Can not support seek, in the end state we lose the underlying source
246    /// which makes seeking back impossible.
247    #[inline]
248    fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
249        Err(SeekError::NotSupported {
250            underlying_source: std::any::type_name::<Self>(),
251        })
252    }
253}
254
255impl<I> Clone for Buffered<I>
256where
257    I: Source,
258    I::Item: Sample,
259{
260    #[inline]
261    fn clone(&self) -> Buffered<I> {
262        Buffered {
263            current_frame: self.current_frame.clone(),
264            position_in_frame: self.position_in_frame,
265            total_duration: self.total_duration,
266        }
267    }
268}