Bug 1567457 - Update cubeb-pulse-rs to version 3a748a2. r=kinetik
☠☠ backed out by 5394db272ad1 ☠ ☠
authorPaul Adenot <paul@paul.cx>
Fri, 19 Jul 2019 13:36:44 +0000
changeset 547243 35503df3d3e8ddf5c6abdc295703178590b58691
parent 547242 02c56b8d3530ed03584810aecb28f34f4ef1496a
child 547244 ce5d76c8be1b2dca90f9e512c19d98b64c754fe0
push id2165
push userffxbld-merge
push dateMon, 14 Oct 2019 16:30:58 +0000
treeherdermozilla-release@0eae18af659f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskinetik
bugs1567457
milestone70.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1567457 - Update cubeb-pulse-rs to version 3a748a2. r=kinetik Reviewed upstream by :achronop, :kinetik, :chunmin in: https://github.com/djg/cubeb-pulse-rs/pull/41 https://github.com/djg/cubeb-pulse-rs/pull/42 Differential Revision: https://phabricator.services.mozilla.com/D38661
media/libcubeb/cubeb-pulse-rs/Cargo.toml
media/libcubeb/cubeb-pulse-rs/README_MOZILLA
media/libcubeb/cubeb-pulse-rs/pulse-ffi/src/ffi_funcs.rs
media/libcubeb/cubeb-pulse-rs/pulse-rs/src/context.rs
media/libcubeb/cubeb-pulse-rs/pulse-rs/src/lib.rs
media/libcubeb/cubeb-pulse-rs/pulse-rs/src/stream.rs
media/libcubeb/cubeb-pulse-rs/src/backend/stream.rs
media/libcubeb/cubeb-pulse-rs/src/lib.rs
--- a/media/libcubeb/cubeb-pulse-rs/Cargo.toml
+++ b/media/libcubeb/cubeb-pulse-rs/Cargo.toml
@@ -10,8 +10,9 @@ pulse-dlopen = ["pulse-ffi/dlopen"]
 [lib]
 crate-type = ["staticlib", "rlib"]
 
 [dependencies]
 cubeb-backend = "0.5"
 pulse-ffi = { path = "pulse-ffi" }
 pulse = { path = "pulse-rs" }
 semver = "^0.6"
+ringbuf = "0.1"
--- a/media/libcubeb/cubeb-pulse-rs/README_MOZILLA
+++ b/media/libcubeb/cubeb-pulse-rs/README_MOZILLA
@@ -1,8 +1,8 @@
 The source from this directory was copied from the cubeb-pulse-rs
 git repository using the update.sh script.  The only changes
 made were those applied by update.sh and the addition of
 Makefile.in build files for the Mozilla build system.
 
 The cubeb-pulse-rs git repository is: https://github.com/djg/cubeb-pulse-rs.git
 
-The git commit ID used was 17c1629c323ff24d656ff9449bf50d6758aafc1a (2019-01-24 07:50:09 +1300)
+The git commit ID used was 3a748a2df25658f1c8c5a475b9dd2ae4561b174b (2019-07-19 21:44:07 +1200)
--- a/media/libcubeb/cubeb-pulse-rs/pulse-ffi/src/ffi_funcs.rs
+++ b/media/libcubeb/cubeb-pulse-rs/pulse-ffi/src/ffi_funcs.rs
@@ -79,16 +79,17 @@ mod static_fns {
         pub fn pa_context_ref(c: *mut pa_context) -> *mut pa_context;
         pub fn pa_context_unref(c: *mut pa_context);
         pub fn pa_cvolume_set(a: *mut pa_cvolume, channels: c_uint, v: pa_volume_t) -> *mut pa_cvolume;
         pub fn pa_cvolume_set_balance(v: *mut pa_cvolume,
                                       map: *const pa_channel_map,
                                       new_balance: c_float)
                                       -> *mut pa_cvolume;
         pub fn pa_frame_size(spec: *const pa_sample_spec) -> usize;
+        pub fn pa_sample_size(spec: *const pa_sample_spec) -> usize;
         pub fn pa_mainloop_api_once(m: *mut pa_mainloop_api,
                                     callback: pa_mainloop_api_once_cb_t,
                                     userdata: *mut c_void);
         pub fn pa_strerror(error: pa_error_code_t) -> *const c_char;
         pub fn pa_operation_ref(o: *mut pa_operation) -> *mut pa_operation;
         pub fn pa_operation_unref(o: *mut pa_operation);
         pub fn pa_operation_cancel(o: *mut pa_operation);
         pub fn pa_operation_get_state(o: *const pa_operation) -> pa_operation_state_t;
@@ -360,16 +361,23 @@ mod dynamic_fns {
             };
             PA_FRAME_SIZE = {
                 let fp = dlsym(h, cstr!("pa_frame_size"));
                 if fp.is_null() {
                     return None;
                 }
                 fp
             };
+            PA_SAMPLE_SIZE = {
+                let fp = dlsym(h, cstr!("pa_sample_size"));
+                if fp.is_null() {
+                    return None;
+                }
+                fp
+            };
             PA_MAINLOOP_API_ONCE = {
                 let fp = dlsym(h, cstr!("pa_mainloop_api_once"));
                 if fp.is_null() {
                     return None;
                 }
                 fp
             };
             PA_STRERROR = {
@@ -994,16 +1002,22 @@ mod dynamic_fns {
     }
 
     static mut PA_FRAME_SIZE: *mut ::libc::c_void = 0 as *mut _;
     #[inline]
     pub unsafe fn pa_frame_size(spec: *const pa_sample_spec) -> usize {
         (::std::mem::transmute::<_, extern "C" fn(*const pa_sample_spec) -> usize>(PA_FRAME_SIZE))(spec)
     }
 
+    static mut PA_SAMPLE_SIZE: *mut ::libc::c_void = 0 as *mut _;
+    #[inline]
+    pub unsafe fn pa_sample_size(spec: *const pa_sample_spec) -> usize {
+        (::std::mem::transmute::<_, extern "C" fn(*const pa_sample_spec) -> usize>(PA_SAMPLE_SIZE))(spec)
+    }
+
     static mut PA_MAINLOOP_API_ONCE: *mut ::libc::c_void = 0 as *mut _;
     #[inline]
     pub unsafe fn pa_mainloop_api_once(m: *mut pa_mainloop_api,
                                        callback: pa_mainloop_api_once_cb_t,
                                        userdata: *mut c_void) {
         (::std::mem::transmute::<_,
                                  extern "C" fn(*mut pa_mainloop_api,
                                                pa_mainloop_api_once_cb_t,
--- a/media/libcubeb/cubeb-pulse-rs/pulse-rs/src/context.rs
+++ b/media/libcubeb/cubeb-pulse-rs/pulse-rs/src/context.rs
@@ -90,17 +90,17 @@ impl Context {
         unsafe {
             ffi::pa_context_set_state_callback(self.raw_mut(), None, ptr::null_mut());
         }
     }
 
     pub fn set_state_callback<CB>(&self, _: CB, userdata: *mut c_void)
         where CB: Fn(&Context, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context, userdata: *mut c_void)
             where F: Fn(&Context, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let ctx = context::from_raw_ptr(c);
             let result = uninitialized::<F>()(&ctx, userdata);
@@ -141,17 +141,17 @@ impl Context {
             ffi::pa_context_disconnect(self.raw_mut());
         }
     }
 
 
     pub fn drain<CB>(&self, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Context, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context, userdata: *mut c_void)
             where F: Fn(&Context, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let ctx = context::from_raw_ptr(c);
             let result = uninitialized::<F>()(&ctx, userdata);
@@ -162,17 +162,17 @@ impl Context {
 
         op_or_err!(self,
                    ffi::pa_context_drain(self.raw_mut(), Some(wrapped::<CB>), userdata))
     }
 
     pub fn rttime_new<CB>(&self, usec: USec, _: CB, userdata: *mut c_void) -> *mut ffi::pa_time_event
         where CB: Fn(&MainloopApi, *mut ffi::pa_time_event, &TimeVal, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(a: *mut ffi::pa_mainloop_api,
                                         e: *mut ffi::pa_time_event,
                                         tv: *const TimeVal,
                                         userdata: *mut c_void)
             where F: Fn(&MainloopApi, *mut ffi::pa_time_event, &TimeVal, *mut c_void)
         {
@@ -186,17 +186,17 @@ impl Context {
         }
 
         unsafe { ffi::pa_context_rttime_new(self.raw_mut(), usec, Some(wrapped::<CB>), userdata) }
     }
 
     pub fn get_server_info<CB>(&self, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Context, Option<&ServerInfo>, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context, i: *const ffi::pa_server_info, userdata: *mut c_void)
             where F: Fn(&Context, Option<&ServerInfo>, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let info = if i.is_null() {
                 None
@@ -214,17 +214,17 @@ impl Context {
                    ffi::pa_context_get_server_info(self.raw_mut(), Some(wrapped::<CB>), userdata))
     }
 
     pub fn get_sink_info_by_name<'str, CS, CB>(&self, name: CS, _: CB, userdata: *mut c_void) -> Result<Operation>
     where
         CB: Fn(&Context, *const SinkInfo, i32, *mut c_void),
         CS: Into<Option<&'str CStr>>,
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context,
                                         info: *const ffi::pa_sink_info,
                                         eol: c_int,
                                         userdata: *mut c_void)
             where F: Fn(&Context, *const SinkInfo, i32, *mut c_void)
         {
@@ -241,17 +241,17 @@ impl Context {
                                                         name.into().unwrap_cstr(),
                                                         Some(wrapped::<CB>),
                                                         userdata))
     }
 
     pub fn get_sink_info_list<CB>(&self, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Context, *const SinkInfo, i32, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context,
                                         info: *const ffi::pa_sink_info,
                                         eol: c_int,
                                         userdata: *mut c_void)
             where F: Fn(&Context, *const SinkInfo, i32, *mut c_void)
         {
@@ -265,17 +265,17 @@ impl Context {
 
         op_or_err!(self,
                    ffi::pa_context_get_sink_info_list(self.raw_mut(), Some(wrapped::<CB>), userdata))
     }
 
     pub fn get_sink_input_info<CB>(&self, idx: u32, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Context, *const SinkInputInfo, i32, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context,
                                         info: *const ffi::pa_sink_input_info,
                                         eol: c_int,
                                         userdata: *mut c_void)
             where F: Fn(&Context, *const SinkInputInfo, i32, *mut c_void)
         {
@@ -289,17 +289,17 @@ impl Context {
 
         op_or_err!(self,
                    ffi::pa_context_get_sink_input_info(self.raw_mut(), idx, Some(wrapped::<CB>), userdata))
     }
 
     pub fn get_source_info_list<CB>(&self, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Context, *const SourceInfo, i32, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context,
                                         info: *const ffi::pa_source_info,
                                         eol: c_int,
                                         userdata: *mut c_void)
             where F: Fn(&Context, *const SourceInfo, i32, *mut c_void)
         {
@@ -318,17 +318,17 @@ impl Context {
     pub fn set_sink_input_volume<CB>(&self,
                                      idx: u32,
                                      volume: &CVolume,
                                      _: CB,
                                      userdata: *mut c_void)
                                      -> Result<Operation>
         where CB: Fn(&Context, i32, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context, success: c_int, userdata: *mut c_void)
             where F: Fn(&Context, i32, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let ctx = context::from_raw_ptr(c);
             let result = uninitialized::<F>()(&ctx, success, userdata);
@@ -339,17 +339,17 @@ impl Context {
 
         op_or_err!(self,
                    ffi::pa_context_set_sink_input_volume(self.raw_mut(), idx, volume, Some(wrapped::<CB>), userdata))
     }
 
     pub fn subscribe<CB>(&self, m: SubscriptionMask, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Context, i32, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context, success: c_int, userdata: *mut c_void)
             where F: Fn(&Context, i32, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let ctx = context::from_raw_ptr(c);
             let result = uninitialized::<F>()(&ctx, success, userdata);
@@ -366,17 +366,17 @@ impl Context {
         unsafe {
             ffi::pa_context_set_subscribe_callback(self.raw_mut(), None, ptr::null_mut());
         }
     }
 
     pub fn set_subscribe_callback<CB>(&self, _: CB, userdata: *mut c_void)
         where CB: Fn(&Context, SubscriptionEvent, u32, *mut c_void)
     {
-        debug_assert_eq!(::std::mem::size_of::<CB>(), 0);
+        assert_eq!(::std::mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(c: *mut ffi::pa_context,
                                         t: ffi::pa_subscription_event_type_t,
                                         idx: u32,
                                         userdata: *mut c_void)
             where F: Fn(&Context, SubscriptionEvent, u32, *mut c_void)
         {
--- a/media/libcubeb/cubeb-pulse-rs/pulse-rs/src/lib.rs
+++ b/media/libcubeb/cubeb-pulse-rs/pulse-rs/src/lib.rs
@@ -634,22 +634,26 @@ impl ProplistExt for SinkInfo {
 impl ProplistExt for SourceInfo {
     fn proplist(&self) -> Proplist {
         unsafe { proplist::from_raw_ptr(self.proplist) }
     }
 }
 
 pub trait SampleSpecExt {
     fn frame_size(&self) -> usize;
+    fn sample_size(&self) -> usize;
 }
 
 impl SampleSpecExt for SampleSpec {
     fn frame_size(&self) -> usize {
         unsafe { ffi::pa_frame_size(self) }
     }
+    fn sample_size(&self) -> usize {
+        unsafe { ffi::pa_sample_size(self) }
+    }
 }
 
 pub trait USecExt {
     fn to_bytes(self, spec: &SampleSpec) -> usize;
 }
 
 impl USecExt for USec {
     fn to_bytes(self, spec: &SampleSpec) -> usize {
--- a/media/libcubeb/cubeb-pulse-rs/pulse-rs/src/stream.rs
+++ b/media/libcubeb/cubeb-pulse-rs/pulse-rs/src/stream.rs
@@ -179,17 +179,17 @@ impl Stream {
             return Err(ErrorCode::from_error_code(err));
         }
         Ok(r)
     }
 
     pub fn update_timing_info<CB>(&self, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Stream, i32, *mut c_void)
     {
-        debug_assert_eq!(mem::size_of::<CB>(), 0);
+        assert_eq!(mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(s: *mut ffi::pa_stream, success: c_int, userdata: *mut c_void)
             where F: Fn(&Stream, i32, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let mut stm = stream::from_raw_ptr(s);
             let result = uninitialized::<F>()(&mut stm, success, userdata);
@@ -214,17 +214,17 @@ impl Stream {
         unsafe {
             ffi::pa_stream_set_state_callback(self.raw_mut(), None, ptr::null_mut());
         }
     }
 
     pub fn set_state_callback<CB>(&self, _: CB, userdata: *mut c_void)
         where CB: Fn(&Stream, *mut c_void)
     {
-        debug_assert_eq!(mem::size_of::<CB>(), 0);
+        assert_eq!(mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(s: *mut ffi::pa_stream, userdata: *mut c_void)
             where F: Fn(&Stream, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let mut stm = stream::from_raw_ptr(s);
             let result = uninitialized::<F>()(&mut stm, userdata);
@@ -242,17 +242,17 @@ impl Stream {
         unsafe {
             ffi::pa_stream_set_write_callback(self.raw_mut(), None, ptr::null_mut());
         }
     }
 
     pub fn set_write_callback<CB>(&self, _: CB, userdata: *mut c_void)
         where CB: Fn(&Stream, usize, *mut c_void)
     {
-        debug_assert_eq!(mem::size_of::<CB>(), 0);
+        assert_eq!(mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(s: *mut ffi::pa_stream, nbytes: usize, userdata: *mut c_void)
             where F: Fn(&Stream, usize, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let mut stm = stream::from_raw_ptr(s);
             let result = uninitialized::<F>()(&mut stm, nbytes, userdata);
@@ -270,17 +270,17 @@ impl Stream {
         unsafe {
             ffi::pa_stream_set_read_callback(self.raw_mut(), None, ptr::null_mut());
         }
     }
 
     pub fn set_read_callback<CB>(&self, _: CB, userdata: *mut c_void)
         where CB: Fn(&Stream, usize, *mut c_void)
     {
-        debug_assert_eq!(mem::size_of::<CB>(), 0);
+        assert_eq!(mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(s: *mut ffi::pa_stream, nbytes: usize, userdata: *mut c_void)
             where F: Fn(&Stream, usize, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let mut stm = stream::from_raw_ptr(s);
             let result = uninitialized::<F>()(&mut stm, nbytes, userdata);
@@ -292,17 +292,17 @@ impl Stream {
         unsafe {
             ffi::pa_stream_set_read_callback(self.raw_mut(), Some(wrapped::<CB>), userdata);
         }
     }
 
     pub fn cork<CB>(&self, b: i32, _: CB, userdata: *mut c_void) -> Result<Operation>
         where CB: Fn(&Stream, i32, *mut c_void)
     {
-        debug_assert_eq!(mem::size_of::<CB>(), 0);
+        assert_eq!(mem::size_of::<CB>(), 0);
 
         // See: A note about `wrapped` functions
         unsafe extern "C" fn wrapped<F>(s: *mut ffi::pa_stream, success: c_int, userdata: *mut c_void)
             where F: Fn(&Stream, i32, *mut c_void)
         {
             use std::mem::{forget, uninitialized};
             let mut stm = stream::from_raw_ptr(s);
             let result = uninitialized::<F>()(&mut stm, success, userdata);
--- a/media/libcubeb/cubeb-pulse-rs/src/backend/stream.rs
+++ b/media/libcubeb/cubeb-pulse-rs/src/backend/stream.rs
@@ -7,18 +7,30 @@ use backend::*;
 use backend::cork_state::CorkState;
 use cubeb_backend::{ffi, log_enabled, ChannelLayout, DeviceId, DeviceRef, Error, Result,
                     SampleFormat, StreamOps, StreamParamsRef, StreamPrefs};
 use pulse::{self, CVolumeExt, ChannelMapExt, SampleSpecExt, StreamLatency, USecExt};
 use pulse_ffi::*;
 use std::{mem, ptr};
 use std::ffi::{CStr, CString};
 use std::os::raw::{c_long, c_void};
+use std::slice;
+use ringbuf::RingBuffer;
+
+use self::RingBufferConsumer::*;
+use self::RingBufferProducer::*;
+use self::LinearInputBuffer::*;
 
 const PULSE_NO_GAIN: f32 = -1.0;
+// When running duplex callbacks, the input data is fed to a ring buffer, and then later copied to
+// a linear piece of memory is used to hold the input samples, so that they are passed to the audio
+// callback that delivers it to the callees. Their size depends on the buffer size requested
+// initially.  This is to be tuned when changing tlength and fragsize, but this value works for
+// now.
+const INPUT_BUFFER_CAPACITY: usize = 4096;
 
 /// Iterator interface to `ChannelLayout`.
 ///
 /// Iterates each channel in the set represented by `ChannelLayout`.
 struct ChannelLayoutIter {
     /// The layout set being iterated
     layout: ChannelLayout,
     /// The next flag to test
@@ -107,31 +119,167 @@ impl Drop for Device {
             }
             if !self.0.output_name.is_null() {
                 let _ = CString::from_raw(self.0.output_name as *mut _);
             }
         }
     }
 }
 
+
+enum RingBufferConsumer {
+    IntegerRingBufferConsumer(ringbuf::Consumer<i16>),
+    FloatRingBufferConsumer(ringbuf::Consumer<f32>)
+}
+
+enum RingBufferProducer {
+    IntegerRingBufferProducer(ringbuf::Producer<i16>),
+    FloatRingBufferProducer(ringbuf::Producer<f32>)
+}
+
+enum LinearInputBuffer {
+    IntegerLinearInputBuffer(Vec<i16>),
+    FloatLinearInputBuffer(Vec<f32>)
+}
+
+struct BufferManager {
+    consumer: RingBufferConsumer,
+    producer: RingBufferProducer,
+    linear_input_buffer: LinearInputBuffer
+}
+
+impl BufferManager {
+    // When opening a duplex stream, the sample-spec are guaranteed to match. It's ok to have
+    // either the input or output sample-spec here.
+    fn new(sample_spec: &pulse::SampleSpec) -> BufferManager {
+        if sample_spec.format == PA_SAMPLE_S16BE ||
+           sample_spec.format == PA_SAMPLE_S16LE  {
+                let ring = RingBuffer::<i16>::new(INPUT_BUFFER_CAPACITY);
+                let (prod, cons) = ring.split();
+                return BufferManager {
+                    producer: IntegerRingBufferProducer(prod),
+                    consumer: IntegerRingBufferConsumer(cons),
+                    linear_input_buffer: IntegerLinearInputBuffer(Vec::<i16>::with_capacity(INPUT_BUFFER_CAPACITY))
+                };
+            } else {
+                let ring = RingBuffer::<f32>::new(INPUT_BUFFER_CAPACITY);
+                let (prod, cons) = ring.split();
+                return BufferManager {
+                    producer: FloatRingBufferProducer(prod),
+                    consumer: FloatRingBufferConsumer(cons),
+                    linear_input_buffer: FloatLinearInputBuffer(Vec::<f32>::with_capacity(INPUT_BUFFER_CAPACITY))
+                };
+            }
+    }
+
+    fn push_input_data(&mut self, input_data: *const c_void, read_samples: usize) {
+        match &mut self.producer {
+            RingBufferProducer::FloatRingBufferProducer(p) => {
+                let input_data = unsafe { slice::from_raw_parts::<f32>(input_data as *const f32, read_samples) };
+                match p.push_slice(input_data) {
+                    Ok(_) => { }
+                    Err(_) => {
+                        // do nothing: the data are ignored. This happens when underruning the
+                        // output callback.
+                    }
+                }
+            }
+            RingBufferProducer::IntegerRingBufferProducer(p) => {
+                let input_data = unsafe { slice::from_raw_parts::<i16>(input_data as *const i16, read_samples) };
+                match p.push_slice(input_data) {
+                    Ok(_) => { }
+                    Err(_) => {
+                        // do nothing: the data are ignored. This happens when underruning the
+                        // output callback.
+                    }
+                }
+            }
+        }
+    }
+
+    fn pull_input_data(&mut self, input_data: *mut c_void, needed_samples: usize) {
+        match &mut self.consumer {
+            IntegerRingBufferConsumer(p) => {
+                let mut input: &mut[i16] = unsafe { slice::from_raw_parts_mut::<i16>(input_data as *mut i16, needed_samples) };
+                match p.pop_slice(&mut input) {
+                    Ok(read) => {
+                        if read < needed_samples {
+                            for i in 0..(needed_samples - read) {
+                                input[read + i] = 0;
+                            }
+                        }
+                    }
+                    Err(_) => {
+                        // Buffer empty
+                        for i in input.iter_mut() {
+                            *i = 0;
+                        }
+                    }
+                }
+            }
+            FloatRingBufferConsumer(p) => {
+                let mut input: &mut[f32] = unsafe { slice::from_raw_parts_mut::<f32>(input_data as *mut f32, needed_samples) };
+                match p.pop_slice(&mut input) {
+                    Ok(read) => {
+                        if read < needed_samples {
+                            for i in 0..(needed_samples - read) {
+                                input[read + i] = 0.;
+                            }
+                        }
+                    }
+                    Err(_) => {
+                        // Buffer empty
+                        for i in input.iter_mut() {
+                            *i = 0.;
+                        }
+                    }
+                }
+            }
+        }
+    }
+    fn get_linear_input_data(&mut self, nsamples: usize) -> *const c_void {
+        let p: *mut c_void;
+        match &mut self.linear_input_buffer {
+            LinearInputBuffer::IntegerLinearInputBuffer(b) => {
+                b.resize(nsamples, 0);
+                p = b.as_mut_ptr() as *mut c_void;
+            }
+            LinearInputBuffer::FloatLinearInputBuffer(b) => {
+                b.resize(nsamples, 0.);
+                p = b.as_mut_ptr() as *mut c_void;
+            }
+        }
+        self.pull_input_data(p, nsamples);
+
+        return p;
+    }
+}
+
+impl std::fmt::Debug for BufferManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "")
+    }
+}
+
 #[repr(C)]
 #[derive(Debug)]
 pub struct PulseStream<'ctx> {
     context: &'ctx PulseContext,
     user_ptr: *mut c_void,
     output_stream: Option<pulse::Stream>,
     input_stream: Option<pulse::Stream>,
     data_callback: ffi::cubeb_data_callback,
     state_callback: ffi::cubeb_state_callback,
     drain_timer: *mut pa_time_event,
     output_sample_spec: pulse::SampleSpec,
     input_sample_spec: pulse::SampleSpec,
     shutdown: bool,
     volume: f32,
     state: ffi::cubeb_state,
+    input_buffer_manager: Option<BufferManager>
 }
 
 impl<'ctx> PulseStream<'ctx> {
     #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
     pub fn new(
         context: &'ctx PulseContext,
         stream_name: Option<&CStr>,
         input_device: DeviceId,
@@ -172,23 +320,21 @@ impl<'ctx> PulseStream<'ctx> {
 
             let mut read_data: *const c_void = ptr::null();
             let mut read_size: usize = 0;
             while read_from_input(s, &mut read_data, &mut read_size) > 0 {
                 /* read_data can be NULL in case of a hole. */
                 if !read_data.is_null() {
                     let in_frame_size = stm.input_sample_spec.frame_size();
                     let read_frames = read_size / in_frame_size;
+                    let read_samples = read_size / stm.input_sample_spec.sample_size();
 
                     if stm.output_stream.is_some() {
-                        // input/capture + output/playback operation
-                        let out_frame_size = stm.output_sample_spec.frame_size();
-                        let write_size = read_frames * out_frame_size;
-                        // Offer full duplex data for writing
-                        stm.trigger_user_callback(read_data, write_size);
+                        // duplex stream: push the input data to the ring buffer.
+                        stm.input_buffer_manager.as_mut().unwrap().push_input_data(read_data, read_samples);
                     } else {
                         // input/capture only operation. Call callback directly
                         let got = unsafe {
                             stm.data_callback.unwrap()(
                                 stm as *mut _ as *mut _,
                                 stm.user_ptr,
                                 read_data,
                                 ptr::null_mut(),
@@ -216,17 +362,21 @@ impl<'ctx> PulseStream<'ctx> {
 
         fn write_data(_: &pulse::Stream, nbytes: usize, u: *mut c_void) {
             cubeb_logv!("Output callback to be written buffer size {}", nbytes);
             let stm = unsafe { &mut *(u as *mut PulseStream) };
             if stm.shutdown || stm.state != ffi::CUBEB_STATE_STARTED {
                 return;
             }
 
-            if stm.input_stream.is_none() {
+            if stm.input_stream.is_some() {
+                let nsamples = nbytes / stm.output_sample_spec.sample_size();
+                let p = stm.input_buffer_manager.as_mut().unwrap().get_linear_input_data(nsamples);
+                stm.trigger_user_callback(p, nbytes);
+            } else {
                 // Output/playback only operation.
                 // Write directly to output
                 debug_assert!(stm.output_stream.is_some());
                 stm.trigger_user_callback(ptr::null(), nbytes);
             }
         }
 
         let mut stm = Box::new(PulseStream {
@@ -237,16 +387,17 @@ impl<'ctx> PulseStream<'ctx> {
             state_callback: state_callback,
             user_ptr: user_ptr,
             drain_timer: ptr::null_mut(),
             output_sample_spec: pulse::SampleSpec::default(),
             input_sample_spec: pulse::SampleSpec::default(),
             shutdown: false,
             volume: PULSE_NO_GAIN,
             state: ffi::CUBEB_STATE_ERROR,
+            input_buffer_manager: None
         });
 
         if let Some(ref context) = stm.context.context {
             stm.context.mainloop.lock();
 
             // Setup output stream
             if let Some(stream_params) = output_stream_params {
                 match PulseStream::stream_init(context, stream_params, stream_name) {
@@ -305,16 +456,21 @@ impl<'ctx> PulseStream<'ctx> {
                     Err(e) => {
                         stm.context.mainloop.unlock();
                         stm.destroy();
                         return Err(e);
                     }
                 }
             }
 
+            // Duplex, set up the ringbuffer
+            if input_stream_params.is_some() && output_stream_params.is_some() {
+                stm.input_buffer_manager = Some(BufferManager::new(&stm.input_sample_spec))
+            }
+
             let r = if stm.wait_until_ready() {
                 /* force a timing update now, otherwise timing info does not become valid
                 until some point after initialization has completed. */
                 stm.update_timing_info()
             } else {
                 false
             };
 
@@ -401,20 +557,20 @@ impl<'ctx> StreamOps for PulseStream<'ct
                     .map_or(0, |s| s.writable_size().unwrap_or(0));
                 stm.trigger_user_callback(ptr::null_mut(), size);
             }
         }
 
         self.shutdown = false;
         self.cork(CorkState::uncork() | CorkState::notify());
 
-        if self.output_stream.is_some() && self.input_stream.is_none() {
-            /* On output only case need to manually call user cb once in order to make
-             * things roll. This is done via a defer event in order to execute it
-             * from PA server thread. */
+        if self.output_stream.is_some() {
+            /* When doing output-only or duplex, we need to manually call user cb once in order to
+             * make things roll. This is done via a defer event in order to execute it from PA
+             * server thread. */
             self.context.mainloop.lock();
             self.context
                 .mainloop
                 .get_api()
                 .once(output_preroll, self as *const _ as *mut _);
             self.context.mainloop.unlock();
         }
 
@@ -792,16 +948,17 @@ impl<'ctx> PulseStream<'ctx> {
             if !wait_until_io_stream_ready(stm, &self.context.mainloop) {
                 return false;
             }
         }
 
         true
     }
 
+
     #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
     fn trigger_user_callback(&mut self, input_data: *const c_void, nbytes: usize) {
         fn drained_cb(
             a: &pulse::MainloopApi,
             e: *mut pa_time_event,
             _tv: &pulse::TimeVal,
             u: *mut c_void,
         ) {
@@ -936,16 +1093,17 @@ fn context_success(_: &pulse::Context, s
     let ctx = unsafe { &*(u as *mut PulseContext) };
     if success != 1 {
         cubeb_log!("context_success ignored failure: {}", success);
     }
     ctx.mainloop.signal();
 }
 
 fn set_buffering_attribute(latency_frames: u32, sample_spec: &pa_sample_spec) -> pa_buffer_attr {
+    // When changing this, change the constant INPUT_BUFFER_CAPACITY to reflect the new sizes.
     let tlength = latency_frames * sample_spec.frame_size() as u32;
     let minreq = tlength / 4;
     let battr = pa_buffer_attr {
         maxlength: u32::max_value(),
         prebuf: u32::max_value(),
         tlength: tlength,
         minreq: minreq,
         fragsize: minreq,
--- a/media/libcubeb/cubeb-pulse-rs/src/lib.rs
+++ b/media/libcubeb/cubeb-pulse-rs/src/lib.rs
@@ -5,13 +5,14 @@
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details.
 
 #[macro_use]
 extern crate cubeb_backend;
 extern crate pulse;
 extern crate pulse_ffi;
 extern crate semver;
+extern crate ringbuf;
 
 mod capi;
 mod backend;
 
 pub use capi::pulse_rust_init;