cpal/host/alsa/
mod.rs

1extern crate alsa;
2extern crate libc;
3
4use self::alsa::poll::Descriptors;
5use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
6use crate::{
7    BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
8    DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo,
9    PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError,
10    SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
11    SupportedStreamConfigsError,
12};
13use std::cmp;
14use std::convert::TryInto;
15use std::sync::{Arc, Mutex};
16use std::thread::{self, JoinHandle};
17use std::time::Duration;
18use std::vec::IntoIter as VecIntoIter;
19
20pub use self::enumerate::{default_input_device, default_output_device, Devices};
21
22pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
23pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24
25mod enumerate;
26
/// The default linux, dragonfly, freebsd and netbsd host type.
///
/// `Host` is a unit struct and carries no state of its own. Its `HostTrait` implementation
/// obtains devices through `Devices::new()` and the module-level `default_input_device()` /
/// `default_output_device()` functions re-exported from the `enumerate` module.
#[derive(Debug)]
pub struct Host;
30
31impl Host {
32    pub fn new() -> Result<Self, crate::HostUnavailable> {
33        Ok(Host)
34    }
35}
36
/// `HostTrait` implementation for the ALSA `Host`.
///
/// Every method is a thin forwarder to the `enumerate` module items imported at the top of this
/// file.
impl HostTrait for Host {
    type Devices = Devices;
    type Device = Device;

    /// Reports whether this host can be used; always `true`.
    fn is_available() -> bool {
        // Assume ALSA is always available on linux/dragonfly/freebsd/netbsd.
        true
    }

    /// Returns a new device enumerator built by `Devices::new()`.
    fn devices(&self) -> Result<Self::Devices, DevicesError> {
        Devices::new()
    }

    /// Returns the result of the module-level `default_input_device()` function.
    fn default_input_device(&self) -> Option<Self::Device> {
        default_input_device()
    }

    /// Returns the result of the module-level `default_output_device()` function.
    fn default_output_device(&self) -> Option<Self::Device> {
        default_output_device()
    }
}
58
59impl DeviceTrait for Device {
60    type SupportedInputConfigs = SupportedInputConfigs;
61    type SupportedOutputConfigs = SupportedOutputConfigs;
62    type Stream = Stream;
63
64    fn name(&self) -> Result<String, DeviceNameError> {
65        Device::name(self)
66    }
67
68    fn supported_input_configs(
69        &self,
70    ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
71        Device::supported_input_configs(self)
72    }
73
74    fn supported_output_configs(
75        &self,
76    ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
77        Device::supported_output_configs(self)
78    }
79
80    fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
81        Device::default_input_config(self)
82    }
83
84    fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
85        Device::default_output_config(self)
86    }
87
88    fn build_input_stream_raw<D, E>(
89        &self,
90        conf: &StreamConfig,
91        sample_format: SampleFormat,
92        data_callback: D,
93        error_callback: E,
94        timeout: Option<Duration>,
95    ) -> Result<Self::Stream, BuildStreamError>
96    where
97        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
98        E: FnMut(StreamError) + Send + 'static,
99    {
100        let stream_inner =
101            self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
102        let stream = Stream::new_input(
103            Arc::new(stream_inner),
104            data_callback,
105            error_callback,
106            timeout,
107        );
108        Ok(stream)
109    }
110
111    fn build_output_stream_raw<D, E>(
112        &self,
113        conf: &StreamConfig,
114        sample_format: SampleFormat,
115        data_callback: D,
116        error_callback: E,
117        timeout: Option<Duration>,
118    ) -> Result<Self::Stream, BuildStreamError>
119    where
120        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
121        E: FnMut(StreamError) + Send + 'static,
122    {
123        let stream_inner =
124            self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
125        let stream = Stream::new_output(
126            Arc::new(stream_inner),
127            data_callback,
128            error_callback,
129            timeout,
130        );
131        Ok(stream)
132    }
133}
134
/// Sending half of the self-pipe created by `trigger()`.
///
/// Wraps the raw file descriptor that `trigger()` takes from `fds[1]`; the descriptor is closed
/// when the value is dropped.
struct TriggerSender(libc::c_int);
136
/// Receiving half of the self-pipe created by `trigger()`.
///
/// Wraps the raw file descriptor that `trigger()` takes from `fds[0]`; the descriptor is closed
/// when the value is dropped.
struct TriggerReceiver(libc::c_int);
138
139impl TriggerSender {
140    fn wakeup(&self) {
141        let buf = 1u64;
142        let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
143        assert_eq!(ret, 8);
144    }
145}
146
147impl TriggerReceiver {
148    fn clear_pipe(&self) {
149        let mut out = 0u64;
150        let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
151        assert_eq!(ret, 8);
152    }
153}
154
155fn trigger() -> (TriggerSender, TriggerReceiver) {
156    let mut fds = [0, 0];
157    match unsafe { libc::pipe(fds.as_mut_ptr()) } {
158        0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
159        _ => panic!("Could not create pipe"),
160    }
161}
162
/// Closes the wrapped file descriptor when the sender is dropped.
impl Drop for TriggerSender {
    fn drop(&mut self) {
        // The return value of `close` is not inspected.
        unsafe {
            libc::close(self.0);
        }
    }
}
170
/// Closes the wrapped file descriptor when the receiver is dropped.
impl Drop for TriggerReceiver {
    fn drop(&mut self) {
        // The return value of `close` is not inspected.
        unsafe {
            libc::close(self.0);
        }
    }
}
178
/// Lazily opened ALSA PCM handles for one device, one slot per direction.
///
/// A slot is `None` until the handle is opened by `DeviceHandles::try_open`, and becomes `None`
/// again when `DeviceHandles::take` moves the handle out.
#[derive(Default)]
struct DeviceHandles {
    // Handle opened with `alsa::Direction::Playback`, if any.
    playback: Option<alsa::PCM>,
    // Handle opened with `alsa::Direction::Capture`, if any.
    capture: Option<alsa::PCM>,
}
184
185impl DeviceHandles {
186    /// Create `DeviceHandles` for `name` and try to open a handle for both
187    /// directions. Returns `Ok` if either direction is opened successfully.
188    fn open(name: &str) -> Result<Self, alsa::Error> {
189        let mut handles = Self::default();
190        let playback_err = handles.try_open(name, alsa::Direction::Playback).err();
191        let capture_err = handles.try_open(name, alsa::Direction::Capture).err();
192        if let Some(err) = capture_err.and(playback_err) {
193            Err(err)
194        } else {
195            Ok(handles)
196        }
197    }
198
199    /// Get a mutable reference to the `Option` for a specific `stream_type`.
200    /// If the `Option` is `None`, the `alsa::PCM` will be opened and placed in
201    /// the `Option` before returning. If `handle_mut()` returns `Ok` the contained
202    /// `Option` is guaranteed to be `Some(..)`.
203    fn try_open(
204        &mut self,
205        name: &str,
206        stream_type: alsa::Direction,
207    ) -> Result<&mut Option<alsa::PCM>, alsa::Error> {
208        let handle = match stream_type {
209            alsa::Direction::Playback => &mut self.playback,
210            alsa::Direction::Capture => &mut self.capture,
211        };
212
213        if handle.is_none() {
214            *handle = Some(alsa::pcm::PCM::new(name, stream_type, true)?);
215        }
216
217        Ok(handle)
218    }
219
220    /// Get a mutable reference to the `alsa::PCM` handle for a specific `stream_type`.
221    /// If the handle is not yet opened, it will be opened and stored in `self`.
222    fn get_mut(
223        &mut self,
224        name: &str,
225        stream_type: alsa::Direction,
226    ) -> Result<&mut alsa::PCM, alsa::Error> {
227        Ok(self.try_open(name, stream_type)?.as_mut().unwrap())
228    }
229
230    /// Take ownership of the `alsa::PCM` handle for a specific `stream_type`.
231    /// If the handle is not yet opened, it will be opened and returned.
232    fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> {
233        Ok(self.try_open(name, stream_type)?.take().unwrap())
234    }
235}
236
/// An ALSA device, identified by its name.
///
/// Cloning a `Device` clones the name and shares the same `DeviceHandles` through the `Arc`.
#[derive(Clone)]
pub struct Device {
    // Name passed to ALSA whenever a PCM handle is opened for this device.
    name: String,
    // Lazily opened PCM handles, shared between clones and guarded by a mutex.
    handles: Arc<Mutex<DeviceHandles>>,
}
242
243impl Device {
244    fn build_stream_inner(
245        &self,
246        conf: &StreamConfig,
247        sample_format: SampleFormat,
248        stream_type: alsa::Direction,
249    ) -> Result<StreamInner, BuildStreamError> {
250        let handle_result = self
251            .handles
252            .lock()
253            .unwrap()
254            .take(&self.name, stream_type)
255            .map_err(|e| (e, e.errno()));
256
257        let handle = match handle_result {
258            Err((_, libc::EBUSY)) => return Err(BuildStreamError::DeviceNotAvailable),
259            Err((_, libc::EINVAL)) => return Err(BuildStreamError::InvalidArgument),
260            Err((e, _)) => return Err(e.into()),
261            Ok(handle) => handle,
262        };
263        let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
264        let period_len = set_sw_params_from_format(&handle, conf, stream_type)?;
265
266        handle.prepare()?;
267
268        let num_descriptors = handle.count();
269        if num_descriptors == 0 {
270            let description = "poll descriptor count for stream was 0".to_string();
271            let err = BackendSpecificError { description };
272            return Err(err.into());
273        }
274
275        // Check to see if we can retrieve valid timestamps from the device.
276        // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
277        let ts = handle.status()?.get_htstamp();
278        let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
279            (0, 0) => Some(std::time::Instant::now()),
280            _ => None,
281        };
282
283        if let alsa::Direction::Capture = stream_type {
284            handle.start()?;
285        }
286
287        let stream_inner = StreamInner {
288            channel: handle,
289            sample_format,
290            num_descriptors,
291            conf: conf.clone(),
292            period_len,
293            can_pause,
294            creation_instant,
295        };
296
297        Ok(stream_inner)
298    }
299
    /// Returns a copy of the stored `name` field. This never fails.
    #[inline]
    fn name(&self) -> Result<String, DeviceNameError> {
        Ok(self.name.clone())
    }
304
305    fn supported_configs(
306        &self,
307        stream_t: alsa::Direction,
308    ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
309        let mut guard = self.handles.lock().unwrap();
310        let handle_result = guard
311            .get_mut(&self.name, stream_t)
312            .map_err(|e| (e, e.errno()));
313
314        let handle = match handle_result {
315            Err((_, libc::ENOENT)) | Err((_, libc::EBUSY)) => {
316                return Err(SupportedStreamConfigsError::DeviceNotAvailable)
317            }
318            Err((_, libc::EINVAL)) => return Err(SupportedStreamConfigsError::InvalidArgument),
319            Err((e, _)) => return Err(e.into()),
320            Ok(handle) => handle,
321        };
322
323        let hw_params = alsa::pcm::HwParams::any(handle)?;
324
325        // TODO: check endianness
326        const FORMATS: [(SampleFormat, alsa::pcm::Format); 8] = [
327            (SampleFormat::I8, alsa::pcm::Format::S8),
328            (SampleFormat::U8, alsa::pcm::Format::U8),
329            (SampleFormat::I16, alsa::pcm::Format::S16LE),
330            //SND_PCM_FORMAT_S16_BE,
331            (SampleFormat::U16, alsa::pcm::Format::U16LE),
332            //SND_PCM_FORMAT_U16_BE,
333            //SND_PCM_FORMAT_S24_LE,
334            //SND_PCM_FORMAT_S24_BE,
335            //SND_PCM_FORMAT_U24_LE,
336            //SND_PCM_FORMAT_U24_BE,
337            (SampleFormat::I32, alsa::pcm::Format::S32LE),
338            //SND_PCM_FORMAT_S32_BE,
339            (SampleFormat::U32, alsa::pcm::Format::U32LE),
340            //SND_PCM_FORMAT_U32_BE,
341            (SampleFormat::F32, alsa::pcm::Format::FloatLE),
342            //SND_PCM_FORMAT_FLOAT_BE,
343            (SampleFormat::F64, alsa::pcm::Format::Float64LE),
344            //SND_PCM_FORMAT_FLOAT64_BE,
345            //SND_PCM_FORMAT_IEC958_SUBFRAME_LE,
346            //SND_PCM_FORMAT_IEC958_SUBFRAME_BE,
347            //SND_PCM_FORMAT_MU_LAW,
348            //SND_PCM_FORMAT_A_LAW,
349            //SND_PCM_FORMAT_IMA_ADPCM,
350            //SND_PCM_FORMAT_MPEG,
351            //SND_PCM_FORMAT_GSM,
352            //SND_PCM_FORMAT_SPECIAL,
353            //SND_PCM_FORMAT_S24_3LE,
354            //SND_PCM_FORMAT_S24_3BE,
355            //SND_PCM_FORMAT_U24_3LE,
356            //SND_PCM_FORMAT_U24_3BE,
357            //SND_PCM_FORMAT_S20_3LE,
358            //SND_PCM_FORMAT_S20_3BE,
359            //SND_PCM_FORMAT_U20_3LE,
360            //SND_PCM_FORMAT_U20_3BE,
361            //SND_PCM_FORMAT_S18_3LE,
362            //SND_PCM_FORMAT_S18_3BE,
363            //SND_PCM_FORMAT_U18_3LE,
364            //SND_PCM_FORMAT_U18_3BE,
365        ];
366
367        let mut supported_formats = Vec::new();
368        for &(sample_format, alsa_format) in FORMATS.iter() {
369            if hw_params.test_format(alsa_format).is_ok() {
370                supported_formats.push(sample_format);
371            }
372        }
373
374        let min_rate = hw_params.get_rate_min()?;
375        let max_rate = hw_params.get_rate_max()?;
376
377        let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
378            vec![(min_rate, max_rate)]
379        } else {
380            const RATES: [libc::c_uint; 13] = [
381                5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400,
382                192000,
383            ];
384
385            let mut rates = Vec::new();
386            for &rate in RATES.iter() {
387                if hw_params.test_rate(rate).is_ok() {
388                    rates.push((rate, rate));
389                }
390            }
391
392            if rates.is_empty() {
393                vec![(min_rate, max_rate)]
394            } else {
395                rates
396            }
397        };
398
399        let min_channels = hw_params.get_channels_min()?;
400        let max_channels = hw_params.get_channels_max()?;
401
402        let max_channels = cmp::min(max_channels, 32); // TODO: limiting to 32 channels or too much stuff is returned
403        let supported_channels = (min_channels..max_channels + 1)
404            .filter_map(|num| {
405                if hw_params.test_channels(num).is_ok() {
406                    Some(num as ChannelCount)
407                } else {
408                    None
409                }
410            })
411            .collect::<Vec<_>>();
412
413        let min_buffer_size = hw_params.get_buffer_size_min()?;
414        let max_buffer_size = hw_params.get_buffer_size_max()?;
415
416        let buffer_size_range = SupportedBufferSize::Range {
417            min: min_buffer_size as u32,
418            max: max_buffer_size as u32,
419        };
420
421        let mut output = Vec::with_capacity(
422            supported_formats.len() * supported_channels.len() * sample_rates.len(),
423        );
424        for &sample_format in supported_formats.iter() {
425            for &channels in supported_channels.iter() {
426                for &(min_rate, max_rate) in sample_rates.iter() {
427                    output.push(SupportedStreamConfigRange {
428                        channels,
429                        min_sample_rate: SampleRate(min_rate as u32),
430                        max_sample_rate: SampleRate(max_rate as u32),
431                        buffer_size: buffer_size_range.clone(),
432                        sample_format,
433                    });
434                }
435            }
436        }
437
438        Ok(output.into_iter())
439    }
440
    /// Returns the supported configurations for the capture direction by calling
    /// `supported_configs(alsa::Direction::Capture)`.
    fn supported_input_configs(
        &self,
    ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
        self.supported_configs(alsa::Direction::Capture)
    }
446
    /// Returns the supported configurations for the playback direction by calling
    /// `supported_configs(alsa::Direction::Playback)`.
    fn supported_output_configs(
        &self,
    ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
        self.supported_configs(alsa::Direction::Playback)
    }
452
453    // ALSA does not offer default stream formats, so instead we compare all supported formats by
454    // the `SupportedStreamConfigRange::cmp_default_heuristics` order and select the greatest.
455    fn default_config(
456        &self,
457        stream_t: alsa::Direction,
458    ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
459        let mut formats: Vec<_> = {
460            match self.supported_configs(stream_t) {
461                Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
462                    return Err(DefaultStreamConfigError::DeviceNotAvailable);
463                }
464                Err(SupportedStreamConfigsError::InvalidArgument) => {
465                    // this happens sometimes when querying for input and output capabilities, but
466                    // the device supports only one
467                    return Err(DefaultStreamConfigError::StreamTypeNotSupported);
468                }
469                Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
470                    return Err(err.into());
471                }
472                Ok(fmts) => fmts.collect(),
473            }
474        };
475
476        formats.sort_by(|a, b| a.cmp_default_heuristics(b));
477
478        match formats.into_iter().last() {
479            Some(f) => {
480                let min_r = f.min_sample_rate;
481                let max_r = f.max_sample_rate;
482                let mut format = f.with_max_sample_rate();
483                const HZ_44100: SampleRate = SampleRate(44_100);
484                if min_r <= HZ_44100 && HZ_44100 <= max_r {
485                    format.sample_rate = HZ_44100;
486                }
487                Ok(format)
488            }
489            None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
490        }
491    }
492
    /// Returns the default capture configuration by calling
    /// `default_config(alsa::Direction::Capture)`.
    fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
        self.default_config(alsa::Direction::Capture)
    }
496
    /// Returns the default playback configuration by calling
    /// `default_config(alsa::Direction::Playback)`.
    fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
        self.default_config(alsa::Direction::Playback)
    }
500}
501
// State of one configured ALSA stream: the PCM handle plus the parameters it was set up with.
// Built by `Device::build_stream_inner` and read by the stream worker functions.
struct StreamInner {
    // The ALSA channel (PCM handle) used for all reads, writes and status queries.
    channel: alsa::pcm::PCM,

    // When converting between file descriptors and `snd_pcm_t`, this is the number of
    // file descriptors that this `snd_pcm_t` uses. It is the poll descriptor count read from
    // the handle when the stream is built, and is never zero for a built stream.
    num_descriptors: usize,

    // Format of the samples.
    sample_format: SampleFormat,

    // The configuration used to open this stream.
    conf: StreamConfig,

    // Minimum number of samples to put in the buffer. The polling code does not hand data to
    // the callbacks until at least this many samples are available.
    period_len: usize,

    #[allow(dead_code)]
    // Whether or not the hardware supports pausing the stream.
    // TODO: We need an API to expose this. See #197, #284.
    can_pause: bool,

    // In the case that the device does not return valid timestamps via `get_htstamp`, this field
    // will be `Some` and will contain an `Instant` representing the moment the stream was created.
    //
    // If this field is `Some`, then the stream will use the duration since this instant as a
    // source for timestamps.
    //
    // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
    // `get_htstamp` is used.
    creation_instant: Option<std::time::Instant>,
}
534
// SAFETY (assumed, not verified in this file): the ALSA library is built with its thread safe
// option, so sharing the contained `alsa::pcm::PCM` between threads by reference is taken to be
// sound. `StreamInner` is shared via `Arc` between the `Stream` and its worker thread.
unsafe impl Sync for StreamInner {}
537
// Direction of a stream as inferred from the poll events ALSA reports: `Input` when the
// returned events equal `alsa::poll::Flags::IN`, `Output` when they equal `Flags::OUT`.
#[derive(Debug, Eq, PartialEq)]
enum StreamType {
    Input,
    Output,
}
543
/// A running ALSA stream: the worker thread that invokes the user callbacks, the shared stream
/// state, and the pipe end used to wake the worker.
pub struct Stream {
    /// The high-priority audio processing thread calling callbacks.
    /// Option used for moving out in destructor.
    thread: Option<JoinHandle<()>>,

    /// Handle to the underlying stream for playback controls.
    /// The worker thread holds another clone of this `Arc`.
    inner: Arc<StreamInner>,

    /// Used to signal to stop processing.
    /// The matching `TriggerReceiver` is owned by the worker thread.
    trigger: TriggerSender,
}
555
// Scratch state owned by a stream worker loop and reused on every iteration.
struct StreamWorkerContext {
    // Poll descriptor list, rebuilt before each poll: the trigger pipe first, then ALSA's fds.
    descriptors: Vec<libc::pollfd>,
    // Byte buffer handed to the data callbacks; resized to the number of available samples.
    buffer: Vec<u8>,
    // Timeout passed to `alsa::poll::poll`, in milliseconds; `-1` when no timeout was given.
    poll_timeout: i32,
}
561
562impl StreamWorkerContext {
563    fn new(poll_timeout: &Option<Duration>) -> Self {
564        let poll_timeout: i32 = if let Some(d) = poll_timeout {
565            d.as_millis().try_into().unwrap()
566        } else {
567            -1
568        };
569
570        Self {
571            descriptors: Vec::new(),
572            buffer: Vec::new(),
573            poll_timeout,
574        }
575    }
576}
577
578fn input_stream_worker(
579    rx: TriggerReceiver,
580    stream: &StreamInner,
581    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
582    error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
583    timeout: Option<Duration>,
584) {
585    let mut ctxt = StreamWorkerContext::new(&timeout);
586    loop {
587        let flow =
588            poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
589                error_callback(err.into());
590                PollDescriptorsFlow::Continue
591            });
592
593        match flow {
594            PollDescriptorsFlow::Continue => {
595                continue;
596            }
597            PollDescriptorsFlow::XRun => {
598                if let Err(err) = stream.channel.prepare() {
599                    error_callback(err.into());
600                }
601                continue;
602            }
603            PollDescriptorsFlow::Return => return,
604            PollDescriptorsFlow::Ready {
605                status,
606                avail_frames: _,
607                delay_frames,
608                stream_type,
609            } => {
610                assert_eq!(
611                    stream_type,
612                    StreamType::Input,
613                    "expected input stream, but polling descriptors indicated output",
614                );
615                if let Err(err) = process_input(
616                    stream,
617                    &mut ctxt.buffer,
618                    status,
619                    delay_frames,
620                    data_callback,
621                ) {
622                    error_callback(err.into());
623                }
624            }
625        }
626    }
627}
628
629fn output_stream_worker(
630    rx: TriggerReceiver,
631    stream: &StreamInner,
632    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
633    error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
634    timeout: Option<Duration>,
635) {
636    let mut ctxt = StreamWorkerContext::new(&timeout);
637    loop {
638        let flow =
639            poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt).unwrap_or_else(|err| {
640                error_callback(err.into());
641                PollDescriptorsFlow::Continue
642            });
643
644        match flow {
645            PollDescriptorsFlow::Continue => continue,
646            PollDescriptorsFlow::XRun => {
647                if let Err(err) = stream.channel.prepare() {
648                    error_callback(err.into());
649                }
650                continue;
651            }
652            PollDescriptorsFlow::Return => return,
653            PollDescriptorsFlow::Ready {
654                status,
655                avail_frames,
656                delay_frames,
657                stream_type,
658            } => {
659                assert_eq!(
660                    stream_type,
661                    StreamType::Output,
662                    "expected output stream, but polling descriptors indicated input",
663                );
664                if let Err(err) = process_output(
665                    stream,
666                    &mut ctxt.buffer,
667                    status,
668                    avail_frames,
669                    delay_frames,
670                    data_callback,
671                    error_callback,
672                ) {
673                    error_callback(err.into());
674                }
675            }
676        }
677    }
678}
679
// Outcome of one `poll_descriptors_and_prepare_buffer` call, telling the worker loop what to do.
enum PollDescriptorsFlow {
    // Nothing to process this round; poll again.
    Continue,
    // The trigger pipe fired; the worker should exit.
    Return,
    // Data can be transferred. The worker context's buffer has already been resized to hold
    // `avail_frames` frames.
    Ready {
        // Direction derived from the poll events.
        stream_type: StreamType,
        // PCM status snapshot taken after polling; used for timestamps.
        status: alsa::pcm::Status,
        // Frames ALSA reported as available.
        avail_frames: usize,
        // Delay reported by `status`, in frames; negative values are clamped to 0.
        delay_frames: usize,
    },
    // Querying the available frames failed with `EPIPE`; the workers respond by calling
    // `prepare()` on the channel.
    XRun,
}
691
// This block is shared between both input and output stream worker functions.
//
// Polls the trigger pipe together with the stream's ALSA descriptors and classifies the result
// as a `PollDescriptorsFlow`. When `Ready` is returned, `ctxt.buffer` has been resized to
// `sample_size * avail_frames * channels` bytes.
//
// Errors: any ALSA error from filling/polling/querying the descriptors (other than `EPIPE` from
// `avail()`, which maps to `XRun`), a poll that returns 0 ready descriptors, and `POLLERR` are
// all returned as `BackendSpecificError`.
fn poll_descriptors_and_prepare_buffer(
    rx: &TriggerReceiver,
    stream: &StreamInner,
    ctxt: &mut StreamWorkerContext,
) -> Result<PollDescriptorsFlow, BackendSpecificError> {
    let StreamWorkerContext {
        ref mut descriptors,
        ref mut buffer,
        ref poll_timeout,
    } = *ctxt;

    // The descriptor list is rebuilt from scratch on every call.
    descriptors.clear();

    // Add the self-pipe for signaling termination. It is always at index 0.
    descriptors.push(libc::pollfd {
        fd: rx.0,
        events: libc::POLLIN,
        revents: 0,
    });

    // Add ALSA polling fds after the self-pipe: reserve zeroed slots, then let ALSA fill them.
    let len = descriptors.len();
    descriptors.resize(
        stream.num_descriptors + len,
        libc::pollfd {
            fd: 0,
            events: 0,
            revents: 0,
        },
    );
    let filled = stream.channel.fill(&mut descriptors[len..])?;
    debug_assert_eq!(filled, stream.num_descriptors);

    // Wait until a descriptor is ready or `poll_timeout` milliseconds elapse. The timeout is
    // `-1` when the stream was built without one (see `StreamWorkerContext::new`).
    let res = alsa::poll::poll(descriptors, *poll_timeout)?;
    if res == 0 {
        // NOTE(review): with a finite timeout, 0 ready descriptors presumably just means the
        // timeout expired rather than a spurious wakeup — confirm against `alsa::poll::poll`.
        let description = String::from("`alsa::poll()` spuriously returned");
        return Err(BackendSpecificError { description });
    }

    if descriptors[0].revents != 0 {
        // The stream has been requested to be destroyed.
        rx.clear_pipe();
        return Ok(PollDescriptorsFlow::Return);
    }

    // Let ALSA translate the raw events on its own descriptors (everything after the pipe).
    let revents = stream.channel.revents(&descriptors[1..])?;
    if revents.contains(alsa::poll::Flags::ERR) {
        let description = String::from("`alsa::poll()` returned POLLERR");
        return Err(BackendSpecificError { description });
    }
    // Only an event set that is exactly `OUT` or exactly `IN` counts as ready.
    let stream_type = match revents {
        alsa::poll::Flags::OUT => StreamType::Output,
        alsa::poll::Flags::IN => StreamType::Input,
        _ => {
            // Nothing to process, poll again
            return Ok(PollDescriptorsFlow::Continue);
        }
    };

    let status = stream.channel.status()?;
    // `EPIPE` from `avail()` is reported as an xrun; any other error is propagated.
    let avail_frames = match stream.channel.avail() {
        Err(err) if err.errno() == libc::EPIPE => return Ok(PollDescriptorsFlow::XRun),
        res => res,
    }? as usize;
    let delay_frames = match status.get_delay() {
        // Buffer underrun. TODO: Notify the user.
        d if d < 0 => 0,
        d => d as usize,
    };
    let available_samples = avail_frames * stream.conf.channels as usize;

    // Only go on if there is at least `stream.period_len` samples.
    if available_samples < stream.period_len {
        return Ok(PollDescriptorsFlow::Continue);
    }

    // Prepare the data buffer: one `sample_size()`-byte slot per available sample.
    let buffer_size = stream.sample_format.sample_size() * available_samples;
    buffer.resize(buffer_size, 0u8);

    Ok(PollDescriptorsFlow::Ready {
        stream_type,
        status,
        avail_frames,
        delay_frames,
    })
}
781
782// Read input data from ALSA and deliver it to the user.
783fn process_input(
784    stream: &StreamInner,
785    buffer: &mut [u8],
786    status: alsa::pcm::Status,
787    delay_frames: usize,
788    data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
789) -> Result<(), BackendSpecificError> {
790    stream.channel.io_bytes().readi(buffer)?;
791    let sample_format = stream.sample_format;
792    let data = buffer.as_mut_ptr() as *mut ();
793    let len = buffer.len() / sample_format.sample_size();
794    let data = unsafe { Data::from_parts(data, len, sample_format) };
795    let callback = stream_timestamp(&status, stream.creation_instant)?;
796    let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
797    let capture = callback
798        .sub(delay_duration)
799        .expect("`capture` is earlier than representation supported by `StreamInstant`");
800    let timestamp = crate::InputStreamTimestamp { callback, capture };
801    let info = crate::InputCallbackInfo { timestamp };
802    data_callback(&data, &info);
803
804    Ok(())
805}
806
807// Request data from the user's function and write it via ALSA.
808//
809// Returns `true`
810fn process_output(
811    stream: &StreamInner,
812    buffer: &mut [u8],
813    status: alsa::pcm::Status,
814    available_frames: usize,
815    delay_frames: usize,
816    data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
817    error_callback: &mut dyn FnMut(StreamError),
818) -> Result<(), BackendSpecificError> {
819    {
820        // We're now sure that we're ready to write data.
821        let sample_format = stream.sample_format;
822        let data = buffer.as_mut_ptr() as *mut ();
823        let len = buffer.len() / sample_format.sample_size();
824        let mut data = unsafe { Data::from_parts(data, len, sample_format) };
825        let callback = stream_timestamp(&status, stream.creation_instant)?;
826        let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
827        let playback = callback
828            .add(delay_duration)
829            .expect("`playback` occurs beyond representation supported by `StreamInstant`");
830        let timestamp = crate::OutputStreamTimestamp { callback, playback };
831        let info = crate::OutputCallbackInfo { timestamp };
832        data_callback(&mut data, &info);
833    }
834    loop {
835        match stream.channel.io_bytes().writei(buffer) {
836            Err(err) if err.errno() == libc::EPIPE => {
837                // buffer underrun
838                // TODO: Notify the user of this.
839                let _ = stream.channel.try_recover(err, false);
840            }
841            Err(err) => {
842                error_callback(err.into());
843                continue;
844            }
845            Ok(result) if result != available_frames => {
846                let description = format!(
847                    "unexpected number of frames written: expected {}, \
848                     result {} (this should never happen)",
849                    available_frames, result,
850                );
851                error_callback(BackendSpecificError { description }.into());
852                continue;
853            }
854            _ => {
855                break;
856            }
857        }
858    }
859    Ok(())
860}
861
862// Use the elapsed duration since the start of the stream.
863//
864// This ensures positive values that are compatible with our `StreamInstant` representation.
865fn stream_timestamp(
866    status: &alsa::pcm::Status,
867    creation_instant: Option<std::time::Instant>,
868) -> Result<crate::StreamInstant, BackendSpecificError> {
869    match creation_instant {
870        None => {
871            let trigger_ts = status.get_trigger_htstamp();
872            let ts = status.get_htstamp();
873            let nanos = timespec_diff_nanos(ts, trigger_ts);
874            if nanos < 0 {
875                panic!(
876                    "get_htstamp `{}.{}` was earlier than get_trigger_htstamp `{}.{}`",
877                    ts.tv_sec, ts.tv_nsec, trigger_ts.tv_sec, trigger_ts.tv_nsec
878                );
879            }
880            Ok(crate::StreamInstant::from_nanos(nanos))
881        }
882        Some(creation) => {
883            let now = std::time::Instant::now();
884            let duration = now.duration_since(creation);
885            let instant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128)
886                .expect("stream duration has exceeded `StreamInstant` representation");
887            Ok(instant)
888        }
889    }
890}
891
892// Adapted from `timestamp2ns` here:
893// https://fossies.org/linux/alsa-lib/test/audio_time.c
894fn timespec_to_nanos(ts: libc::timespec) -> i64 {
895    ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64
896}
897
898// Adapted from `timediff` here:
899// https://fossies.org/linux/alsa-lib/test/audio_time.c
900fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
901    timespec_to_nanos(a) - timespec_to_nanos(b)
902}
903
904// Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
905fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
906    let secsf = frames as f64 / rate.0 as f64;
907    let secs = secsf as u64;
908    let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
909    std::time::Duration::new(secs, nanos)
910}
911
912impl Stream {
913    fn new_input<D, E>(
914        inner: Arc<StreamInner>,
915        mut data_callback: D,
916        mut error_callback: E,
917        timeout: Option<Duration>,
918    ) -> Stream
919    where
920        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
921        E: FnMut(StreamError) + Send + 'static,
922    {
923        let (tx, rx) = trigger();
924        // Clone the handle for passing into worker thread.
925        let stream = inner.clone();
926        let thread = thread::Builder::new()
927            .name("cpal_alsa_in".to_owned())
928            .spawn(move || {
929                input_stream_worker(
930                    rx,
931                    &stream,
932                    &mut data_callback,
933                    &mut error_callback,
934                    timeout,
935                );
936            })
937            .unwrap();
938        Stream {
939            thread: Some(thread),
940            inner,
941            trigger: tx,
942        }
943    }
944
945    fn new_output<D, E>(
946        inner: Arc<StreamInner>,
947        mut data_callback: D,
948        mut error_callback: E,
949        timeout: Option<Duration>,
950    ) -> Stream
951    where
952        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
953        E: FnMut(StreamError) + Send + 'static,
954    {
955        let (tx, rx) = trigger();
956        // Clone the handle for passing into worker thread.
957        let stream = inner.clone();
958        let thread = thread::Builder::new()
959            .name("cpal_alsa_out".to_owned())
960            .spawn(move || {
961                output_stream_worker(
962                    rx,
963                    &stream,
964                    &mut data_callback,
965                    &mut error_callback,
966                    timeout,
967                );
968            })
969            .unwrap();
970        Stream {
971            thread: Some(thread),
972            inner,
973            trigger: tx,
974        }
975    }
976}
977
978impl Drop for Stream {
979    fn drop(&mut self) {
980        self.trigger.wakeup();
981        self.thread.take().unwrap().join().unwrap();
982    }
983}
984
985impl StreamTrait for Stream {
986    fn play(&self) -> Result<(), PlayStreamError> {
987        self.inner.channel.pause(false).ok();
988        Ok(())
989    }
990    fn pause(&self) -> Result<(), PauseStreamError> {
991        self.inner.channel.pause(true).ok();
992        Ok(())
993    }
994}
995
996fn set_hw_params_from_format(
997    pcm_handle: &alsa::pcm::PCM,
998    config: &StreamConfig,
999    sample_format: SampleFormat,
1000) -> Result<bool, BackendSpecificError> {
1001    let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
1002    hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;
1003
1004    let sample_format = if cfg!(target_endian = "big") {
1005        match sample_format {
1006            SampleFormat::I8 => alsa::pcm::Format::S8,
1007            SampleFormat::I16 => alsa::pcm::Format::S16BE,
1008            // SampleFormat::I24 => alsa::pcm::Format::S24BE,
1009            SampleFormat::I32 => alsa::pcm::Format::S32BE,
1010            // SampleFormat::I48 => alsa::pcm::Format::S48BE,
1011            // SampleFormat::I64 => alsa::pcm::Format::S64BE,
1012            SampleFormat::U8 => alsa::pcm::Format::U8,
1013            SampleFormat::U16 => alsa::pcm::Format::U16BE,
1014            // SampleFormat::U24 => alsa::pcm::Format::U24BE,
1015            SampleFormat::U32 => alsa::pcm::Format::U32BE,
1016            // SampleFormat::U48 => alsa::pcm::Format::U48BE,
1017            // SampleFormat::U64 => alsa::pcm::Format::U64BE,
1018            SampleFormat::F32 => alsa::pcm::Format::FloatBE,
1019            SampleFormat::F64 => alsa::pcm::Format::Float64BE,
1020            sample_format => {
1021                return Err(BackendSpecificError {
1022                    description: format!(
1023                        "Sample format '{}' is not supported by this backend",
1024                        sample_format
1025                    ),
1026                })
1027            }
1028        }
1029    } else {
1030        match sample_format {
1031            SampleFormat::I8 => alsa::pcm::Format::S8,
1032            SampleFormat::I16 => alsa::pcm::Format::S16LE,
1033            // SampleFormat::I24 => alsa::pcm::Format::S24LE,
1034            SampleFormat::I32 => alsa::pcm::Format::S32LE,
1035            // SampleFormat::I48 => alsa::pcm::Format::S48LE,
1036            // SampleFormat::I64 => alsa::pcm::Format::S64LE,
1037            SampleFormat::U8 => alsa::pcm::Format::U8,
1038            SampleFormat::U16 => alsa::pcm::Format::U16LE,
1039            // SampleFormat::U24 => alsa::pcm::Format::U24LE,
1040            SampleFormat::U32 => alsa::pcm::Format::U32LE,
1041            // SampleFormat::U48 => alsa::pcm::Format::U48LE,
1042            // SampleFormat::U64 => alsa::pcm::Format::U64LE,
1043            SampleFormat::F32 => alsa::pcm::Format::FloatLE,
1044            SampleFormat::F64 => alsa::pcm::Format::Float64LE,
1045            sample_format => {
1046                return Err(BackendSpecificError {
1047                    description: format!(
1048                        "Sample format '{}' is not supported by this backend",
1049                        sample_format
1050                    ),
1051                })
1052            }
1053        }
1054    };
1055
1056    hw_params.set_format(sample_format)?;
1057    hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?;
1058    hw_params.set_channels(config.channels as u32)?;
1059
1060    match config.buffer_size {
1061        BufferSize::Fixed(v) => {
1062            hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
1063            hw_params.set_buffer_size(v as alsa::pcm::Frames)?;
1064        }
1065        BufferSize::Default => {
1066            // These values together represent a moderate latency and wakeup interval.
1067            // Without them, we are at the mercy of the device
1068            hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?;
1069            hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?;
1070        }
1071    }
1072
1073    pcm_handle.hw_params(&hw_params)?;
1074
1075    Ok(hw_params.can_pause())
1076}
1077
1078fn set_sw_params_from_format(
1079    pcm_handle: &alsa::pcm::PCM,
1080    config: &StreamConfig,
1081    stream_type: alsa::Direction,
1082) -> Result<usize, BackendSpecificError> {
1083    let sw_params = pcm_handle.sw_params_current()?;
1084
1085    let period_len = {
1086        let (buffer, period) = pcm_handle.get_params()?;
1087        if buffer == 0 {
1088            return Err(BackendSpecificError {
1089                description: "initialization resulted in a null buffer".to_string(),
1090            });
1091        }
1092        sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1093
1094        let start_threshold = match stream_type {
1095            alsa::Direction::Playback => buffer - period,
1096
1097            // For capture streams, the start threshold is irrelevant and ignored,
1098            // because build_stream_inner() starts the stream before process_input()
1099            // reads from it. Set it anyway I guess, since it's better than leaving
1100            // it at an unspecified default value.
1101            alsa::Direction::Capture => 1,
1102        };
1103        sw_params.set_start_threshold(start_threshold.try_into().unwrap())?;
1104
1105        period as usize * config.channels as usize
1106    };
1107
1108    sw_params.set_tstamp_mode(true)?;
1109    sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1110
1111    // tstamp_type param cannot be changed after the device is opened.
1112    // The default tstamp_type value on most Linux systems is "monotonic",
1113    // let's try to use it if setting the tstamp_type fails.
1114    if pcm_handle.sw_params(&sw_params).is_err() {
1115        sw_params.set_tstamp_type(alsa::pcm::TstampType::Monotonic)?;
1116        pcm_handle.sw_params(&sw_params)?;
1117    }
1118
1119    Ok(period_len)
1120}
1121
1122impl From<alsa::Error> for BackendSpecificError {
1123    fn from(err: alsa::Error) -> Self {
1124        BackendSpecificError {
1125            description: err.to_string(),
1126        }
1127    }
1128}
1129
1130impl From<alsa::Error> for BuildStreamError {
1131    fn from(err: alsa::Error) -> Self {
1132        let err: BackendSpecificError = err.into();
1133        err.into()
1134    }
1135}
1136
1137impl From<alsa::Error> for SupportedStreamConfigsError {
1138    fn from(err: alsa::Error) -> Self {
1139        let err: BackendSpecificError = err.into();
1140        err.into()
1141    }
1142}
1143
1144impl From<alsa::Error> for PlayStreamError {
1145    fn from(err: alsa::Error) -> Self {
1146        let err: BackendSpecificError = err.into();
1147        err.into()
1148    }
1149}
1150
1151impl From<alsa::Error> for PauseStreamError {
1152    fn from(err: alsa::Error) -> Self {
1153        let err: BackendSpecificError = err.into();
1154        err.into()
1155    }
1156}
1157
1158impl From<alsa::Error> for StreamError {
1159    fn from(err: alsa::Error) -> Self {
1160        let err: BackendSpecificError = err.into();
1161        err.into()
1162    }
1163}