rodio/source/
buffered.rs
1use std::cmp;
2use std::mem;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use crate::{Sample, Source};
7
8use super::SeekError;
9
10#[inline]
12pub fn buffered<I>(input: I) -> Buffered<I>
13where
14 I: Source,
15 I::Item: Sample,
16{
17 let total_duration = input.total_duration();
18 let first_frame = extract(input);
19
20 Buffered {
21 current_frame: first_frame,
22 position_in_frame: 0,
23 total_duration,
24 }
25}
26
27pub struct Buffered<I>
29where
30 I: Source,
31 I::Item: Sample,
32{
33 current_frame: Arc<Frame<I>>,
35
36 position_in_frame: usize,
38
39 total_duration: Option<Duration>,
41}
42
43enum Frame<I>
44where
45 I: Source,
46 I::Item: Sample,
47{
48 Data(FrameData<I>),
51
52 End,
54
55 Input(Mutex<Option<I>>),
58}
59
60struct FrameData<I>
61where
62 I: Source,
63 I::Item: Sample,
64{
65 data: Vec<I::Item>,
66 channels: u16,
67 rate: u32,
68 next: Mutex<Arc<Frame<I>>>,
69}
70
71impl<I> Drop for FrameData<I>
72where
73 I: Source,
74 I::Item: Sample,
75{
76 fn drop(&mut self) {
77 while let Ok(arc_next) = self.next.get_mut() {
84 if let Some(next_ref) = Arc::get_mut(arc_next) {
85 let next = mem::replace(next_ref, Frame::End);
87 if let Frame::Data(next_data) = next {
88 *self = next_data;
91 } else {
92 break;
93 }
94 } else {
95 break;
96 }
97 }
98 }
99}
100
101fn extract<I>(mut input: I) -> Arc<Frame<I>>
103where
104 I: Source,
105 I::Item: Sample,
106{
107 let frame_len = input.current_frame_len();
108
109 if frame_len == Some(0) {
110 return Arc::new(Frame::End);
111 }
112
113 let channels = input.channels();
114 let rate = input.sample_rate();
115 let data: Vec<I::Item> = input
116 .by_ref()
117 .take(cmp::min(frame_len.unwrap_or(32768), 32768))
118 .collect();
119
120 if data.is_empty() {
121 return Arc::new(Frame::End);
122 }
123
124 Arc::new(Frame::Data(FrameData {
125 data,
126 channels,
127 rate,
128 next: Mutex::new(Arc::new(Frame::Input(Mutex::new(Some(input))))),
129 }))
130}
131
132impl<I> Buffered<I>
133where
134 I: Source,
135 I::Item: Sample,
136{
137 fn next_frame(&mut self) {
139 let next_frame = {
140 let mut next_frame_ptr = match &*self.current_frame {
141 Frame::Data(FrameData { next, .. }) => next.lock().unwrap(),
142 _ => unreachable!(),
143 };
144
145 let next_frame = match &**next_frame_ptr {
146 Frame::Data(_) => next_frame_ptr.clone(),
147 Frame::End => next_frame_ptr.clone(),
148 Frame::Input(input) => {
149 let input = input.lock().unwrap().take().unwrap();
150 extract(input)
151 }
152 };
153
154 *next_frame_ptr = next_frame.clone();
155 next_frame
156 };
157
158 self.current_frame = next_frame;
159 self.position_in_frame = 0;
160 }
161}
162
163impl<I> Iterator for Buffered<I>
164where
165 I: Source,
166 I::Item: Sample,
167{
168 type Item = I::Item;
169
170 #[inline]
171 fn next(&mut self) -> Option<I::Item> {
172 let current_sample;
173 let advance_frame;
174
175 match &*self.current_frame {
176 Frame::Data(FrameData { data, .. }) => {
177 current_sample = Some(data[self.position_in_frame]);
178 self.position_in_frame += 1;
179 advance_frame = self.position_in_frame >= data.len();
180 }
181
182 Frame::End => {
183 current_sample = None;
184 advance_frame = false;
185 }
186
187 Frame::Input(_) => unreachable!(),
188 };
189
190 if advance_frame {
191 self.next_frame();
192 }
193
194 current_sample
195 }
196
197 #[inline]
198 fn size_hint(&self) -> (usize, Option<usize>) {
199 (0, None)
201 }
202}
203
204impl<I> Source for Buffered<I>
209where
210 I: Source,
211 I::Item: Sample,
212{
213 #[inline]
214 fn current_frame_len(&self) -> Option<usize> {
215 match &*self.current_frame {
216 Frame::Data(FrameData { data, .. }) => Some(data.len() - self.position_in_frame),
217 Frame::End => Some(0),
218 Frame::Input(_) => unreachable!(),
219 }
220 }
221
222 #[inline]
223 fn channels(&self) -> u16 {
224 match *self.current_frame {
225 Frame::Data(FrameData { channels, .. }) => channels,
226 Frame::End => 1,
227 Frame::Input(_) => unreachable!(),
228 }
229 }
230
231 #[inline]
232 fn sample_rate(&self) -> u32 {
233 match *self.current_frame {
234 Frame::Data(FrameData { rate, .. }) => rate,
235 Frame::End => 44100,
236 Frame::Input(_) => unreachable!(),
237 }
238 }
239
240 #[inline]
241 fn total_duration(&self) -> Option<Duration> {
242 self.total_duration
243 }
244
245 #[inline]
248 fn try_seek(&mut self, _: Duration) -> Result<(), SeekError> {
249 Err(SeekError::NotSupported {
250 underlying_source: std::any::type_name::<Self>(),
251 })
252 }
253}
254
255impl<I> Clone for Buffered<I>
256where
257 I: Source,
258 I::Item: Sample,
259{
260 #[inline]
261 fn clone(&self) -> Buffered<I> {
262 Buffered {
263 current_frame: self.current_frame.clone(),
264 position_in_frame: self.position_in_frame,
265 total_duration: self.total_duration,
266 }
267 }
268}