Bug 1501148 - Refactor AudioIPC to make way for multiple OS backends. r=chunmin
authorMatthew Gregan <kinetik@flim.org>
Tue, 23 Oct 2018 16:46:52 +1300
changeset 490751 74f6186ded6d62b5cfd170cc099acf9d11419382
parent 490750 69e4a961b4e4b21548dba0c2f3a80e216258fb50
child 490752 f7525008369c91098d956634894662ffa185de22
child 490850 6e96c7ec0d1187c1b488dd4ba645df9cfd68ec16
push id247
push userfmarier@mozilla.com
push dateSat, 27 Oct 2018 01:06:44 +0000
reviewerschunmin
bugs1501148
milestone65.0a1
Bug 1501148 - Refactor AudioIPC to make way for multiple OS backends. r=chunmin
media/audioipc/README_MOZILLA
media/audioipc/audioipc/src/async.rs
media/audioipc/audioipc/src/cmsg.rs
media/audioipc/audioipc/src/core.rs
media/audioipc/audioipc/src/errors.rs
media/audioipc/audioipc/src/fd_passing.rs
media/audioipc/audioipc/src/frame.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/msg.rs
media/audioipc/audioipc/src/rpc/client/mod.rs
media/audioipc/audioipc/src/rpc/client/proxy.rs
media/audioipc/audioipc/src/rpc/mod.rs
media/audioipc/audioipc/src/rpc/server.rs
media/audioipc/audioipc/src/shm.rs
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/send_recv.rs
media/audioipc/client/src/stream.rs
media/audioipc/server/src/lib.rs
media/audioipc/server/src/server.rs
--- a/media/audioipc/README_MOZILLA
+++ b/media/audioipc/README_MOZILLA
@@ -1,8 +1,8 @@
 The source from this directory was copied from the audioipc-2
 git repository using the update.sh script.  The only changes
 made were those applied by update.sh and the addition of
 Makefile.in build files for the Mozilla build system.
 
 The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
 
-The git commit ID used was 709eeb98ce93d949f05f7ecd8a7f15162b44dfad (2018-10-23 16:40:20 +1300)
+The git commit ID used was 572d6a6a16501cde726dcc09604a0cbc895d93e3 (2018-10-23 16:43:12 +1300)
--- a/media/audioipc/audioipc/src/async.rs
+++ b/media/audioipc/audioipc/src/async.rs
@@ -1,19 +1,19 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 //! Various async helpers modelled after futures-rs and tokio-io.
 
-use {RecvMsg, SendMsg};
 use bytes::{Buf, BufMut};
 use futures::{Async, Poll};
 use iovec::IoVec;
+use msg::{RecvMsg, SendMsg};
 use std::io;
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_uds::UnixStream;
 
 pub trait AsyncRecvMsg: AsyncRead {
     /// Pull some bytes from this source into the specified `Buf`, returning
     /// how many bytes were read.
     ///
@@ -134,17 +134,17 @@ impl AsyncSendMsg for UnixStream {
         }
         let r = {
             // The `IoVec` type can't have a zero-length size, so create a dummy
             // version from a 1-length slice which we'll overwrite with the
             // `bytes_vec` method.
             static DUMMY: &[u8] = &[0];
             let nom = <&IoVec>::from(DUMMY);
             let mut bufs = [
-                nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom
+                nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom,
             ];
             let n = buf.bytes_vec(&mut bufs);
             self.send_msg(&bufs[..n], cmsg.bytes())
         };
         match r {
             Ok(n) => {
                 buf.advance(n);
                 Ok(Async::Ready(n))
--- a/media/audioipc/audioipc/src/cmsg.rs
+++ b/media/audioipc/audioipc/src/cmsg.rs
@@ -1,17 +1,17 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use bytes::{BufMut, Bytes, BytesMut};
 use libc::{self, cmsghdr};
+use std::os::unix::io::RawFd;
 use std::{convert, mem, ops, slice};
-use std::os::unix::io::RawFd;
 
 #[derive(Clone, Debug)]
 pub struct Fds {
     fds: Bytes,
 }
 
 impl convert::AsRef<[RawFd]> for Fds {
     fn as_ref(&self) -> &[RawFd] {
--- a/media/audioipc/audioipc/src/core.rs
+++ b/media/audioipc/audioipc/src/core.rs
@@ -1,14 +1,19 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details.
+
 // Ease accessing reactor::Core handles.
 
+use futures::sync::oneshot;
 use futures::{Future, IntoFuture};
-use futures::sync::oneshot;
+use std::sync::mpsc;
 use std::{fmt, io, thread};
-use std::sync::mpsc;
 use tokio_core::reactor::{Core, Handle, Remote};
 
 scoped_thread_local! {
     static HANDLE: Handle
 }
 
 pub fn handle() -> Handle {
     HANDLE.with(|handle| handle.clone())
--- a/media/audioipc/audioipc/src/errors.rs
+++ b/media/audioipc/audioipc/src/errors.rs
@@ -1,8 +1,13 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details.
+
 use bincode;
 use cubeb;
 use std;
 
 error_chain! {
     // Maybe replace with chain_err to improve the error info.
     foreign_links {
         Bincode(bincode::Error);
--- a/media/audioipc/audioipc/src/fd_passing.rs
+++ b/media/audioipc/audioipc/src/fd_passing.rs
@@ -5,19 +5,19 @@
 
 use async::{AsyncRecvMsg, AsyncSendMsg};
 use bytes::{Bytes, BytesMut, IntoBuf};
 use cmsg;
 use codec::Codec;
 use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
 use libc;
 use messages::AssocRawFd;
-use std::{fmt, io, mem};
 use std::collections::VecDeque;
 use std::os::unix::io::RawFd;
+use std::{fmt, io, mem};
 
 const INITIAL_CAPACITY: usize = 1024;
 const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
 const FDS_CAPACITY: usize = 16;
 
 struct IncomingFds {
     cmsg: BytesMut,
     recv_fds: Option<cmsg::ControlMsgIter>,
@@ -29,17 +29,18 @@ impl IncomingFds {
         IncomingFds {
             cmsg: BytesMut::with_capacity(capacity),
             recv_fds: None,
         }
     }
 
     pub fn take_fds(&mut self) -> Option<[RawFd; 3]> {
         loop {
-            let fds = self.recv_fds
+            let fds = self
+                .recv_fds
                 .as_mut()
                 .and_then(|recv_fds| recv_fds.next())
                 .and_then(|fds| Some(clone_into_array(&fds)));
 
             if fds.is_some() {
                 return fds;
             }
 
@@ -288,20 +289,16 @@ pub fn framed_with_fds<A, C>(io: A, code
         frames: VecDeque::new(),
         write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         outgoing_fds: BytesMut::with_capacity(
             FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>()),
         ),
     }
 }
 
-fn write_zero() -> io::Error {
-    io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
-}
-
 fn clone_into_array<A, T>(slice: &[T]) -> A
 where
     A: Sized + Default + AsMut<[T]>,
     T: Clone,
 {
     let mut a = Default::default();
     <A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);
     a
--- a/media/audioipc/audioipc/src/frame.rs
+++ b/media/audioipc/audioipc/src/frame.rs
@@ -142,20 +142,16 @@ where
     }
 
     fn close(&mut self) -> Poll<(), Self::SinkError> {
         try_ready!(self.poll_complete());
         self.io.shutdown()
     }
 }
 
-fn write_zero() -> io::Error {
-    io::Error::new(io::ErrorKind::WriteZero, "failed to write frame to io")
-}
-
 pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
     Framed {
         io: io,
         codec: codec,
         read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         frame: None,
         is_readable: false,
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -1,13 +1,13 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
-#![allow(dead_code)] // TODO: Remove.
+
 #![recursion_limit = "1024"]
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 #[macro_use]
@@ -24,116 +24,37 @@ extern crate memmap;
 #[macro_use]
 extern crate scoped_tls;
 extern crate serde;
 extern crate tokio_core;
 #[macro_use]
 extern crate tokio_io;
 extern crate tokio_uds;
 
-pub mod async;
-pub mod cmsg;
+mod async;
+mod cmsg;
 pub mod codec;
+pub mod core;
 pub mod errors;
 pub mod fd_passing;
 pub mod frame;
-pub mod rpc;
-pub mod core;
 pub mod messages;
 mod msg;
+pub mod rpc;
 pub mod shm;
 
-use iovec::IoVec;
-#[cfg(target_os = "linux")]
-use libc::MSG_CMSG_CLOEXEC;
 pub use messages::{ClientMessage, ServerMessage};
 use std::env::temp_dir;
-use std::io;
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use std::path::PathBuf;
-#[cfg(not(target_os = "linux"))]
-const MSG_CMSG_CLOEXEC: libc::c_int = 0;
 
 // This must match the definition of
 // ipc::FileDescriptor::PlatformHandleType in Gecko.
 #[cfg(target_os = "windows")]
 pub type PlatformHandleType = *mut std::os::raw::c_void;
 #[cfg(not(target_os = "windows"))]
 pub type PlatformHandleType = libc::c_int;
 
-// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
-// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
-// UnixStream type.
-pub trait RecvMsg {
-    fn recv_msg(
-        &mut self,
-        iov: &mut [&mut IoVec],
-        cmsg: &mut [u8],
-    ) -> io::Result<(usize, usize, i32)>;
-}
-
-pub trait SendMsg {
-    fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
-}
-
-impl<T: AsRawFd> RecvMsg for T {
-    fn recv_msg(
-        &mut self,
-        iov: &mut [&mut IoVec],
-        cmsg: &mut [u8],
-    ) -> io::Result<(usize, usize, i32)> {
-        msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
-    }
-}
-
-impl<T: AsRawFd> SendMsg for T {
-    fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
-        msg::send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-#[derive(Debug)]
-pub struct AutoCloseFd(RawFd);
-
-impl Drop for AutoCloseFd {
-    fn drop(&mut self) {
-        unsafe {
-            libc::close(self.0);
-        }
-    }
-}
-
-impl FromRawFd for AutoCloseFd {
-    unsafe fn from_raw_fd(fd: RawFd) -> Self {
-        AutoCloseFd(fd)
-    }
-}
-
-impl IntoRawFd for AutoCloseFd {
-    fn into_raw_fd(self) -> RawFd {
-        let fd = self.0;
-        ::std::mem::forget(self);
-        fd
-    }
-}
-
-impl AsRawFd for AutoCloseFd {
-    fn as_raw_fd(&self) -> RawFd {
-        self.0
-    }
-}
-
-impl<'a> AsRawFd for &'a AutoCloseFd {
-    fn as_raw_fd(&self) -> RawFd {
-        self.0
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
 pub fn get_shm_path(dir: &str) -> PathBuf {
     let pid = unsafe { libc::getpid() };
     let mut temp = temp_dir();
     temp.push(&format!("cubeb-shm-{}-{}", pid, dir));
     temp
 }
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -1,13 +1,53 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details.
+
+use iovec::unix as iovec;
 use iovec::IoVec;
-use iovec::unix as iovec;
 use libc;
+use std::os::unix::io::{AsRawFd, RawFd};
 use std::{cmp, io, mem, ptr};
-use std::os::unix::io::RawFd;
+
+// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
+// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
+// UnixStream type.
+pub trait RecvMsg {
+    fn recv_msg(
+        &mut self,
+        iov: &mut [&mut IoVec],
+        cmsg: &mut [u8],
+    ) -> io::Result<(usize, usize, i32)>;
+}
+
+pub trait SendMsg {
+    fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize>;
+}
+
+impl<T: AsRawFd> RecvMsg for T {
+    fn recv_msg(
+        &mut self,
+        iov: &mut [&mut IoVec],
+        cmsg: &mut [u8],
+    ) -> io::Result<(usize, usize, i32)> {
+        #[cfg(target_os = "linux")]
+        let flags = libc::MSG_CMSG_CLOEXEC;
+        #[cfg(not(target_os = "linux"))]
+        let flags = 0;
+        recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, flags)
+    }
+}
+
+impl<T: AsRawFd> SendMsg for T {
+    fn send_msg(&mut self, iov: &[&IoVec], cmsg: &[u8]) -> io::Result<usize> {
+        send_msg_with_flags(self.as_raw_fd(), iov, cmsg, 0)
+    }
+}
 
 fn cvt(r: libc::ssize_t) -> io::Result<usize> {
     if r == -1 {
         Err(io::Error::last_os_error())
     } else {
         Ok(r as usize)
     }
 }
--- a/media/audioipc/audioipc/src/rpc/client/mod.rs
+++ b/media/audioipc/audioipc/src/rpc/client/mod.rs
@@ -34,20 +34,20 @@
 // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
 // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
+use futures::sync::oneshot;
 use futures::{Async, Future, Poll, Sink, Stream};
-use futures::sync::oneshot;
+use rpc::driver::Driver;
 use rpc::Handler;
-use rpc::driver::Driver;
 use std::collections::VecDeque;
 use std::io;
 use tokio_core::reactor::Handle;
 
 mod proxy;
 
 pub use self::proxy::{ClientProxy, Response};
 
--- a/media/audioipc/audioipc/src/rpc/client/proxy.rs
+++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs
@@ -38,18 +38,18 @@
 // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
 // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
+use futures::sync::{mpsc, oneshot};
 use futures::{Async, Future, Poll};
-use futures::sync::{mpsc, oneshot};
 use std::fmt;
 use std::io;
 
 /// Message used to dispatch requests to the task managing the
 /// client connection.
 pub type Request<R, Q> = (R, oneshot::Sender<Q>);
 
 /// Receive requests submitted to the client
--- a/media/audioipc/audioipc/src/rpc/mod.rs
+++ b/media/audioipc/audioipc/src/rpc/mod.rs
@@ -1,18 +1,18 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use futures::{Poll, Sink, Stream};
 use std::io;
 
+mod client;
 mod driver;
-mod client;
 mod server;
 
 pub use self::client::{bind_client, Client, ClientProxy, Response};
 pub use self::server::{bind_server, Server};
 
 pub trait Handler {
     /// Message type read from transport
     type In;
--- a/media/audioipc/audioipc/src/rpc/server.rs
+++ b/media/audioipc/audioipc/src/rpc/server.rs
@@ -35,18 +35,18 @@
 // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
 // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
 use futures::{Async, Future, Poll, Sink, Stream};
+use rpc::driver::Driver;
 use rpc::Handler;
-use rpc::driver::Driver;
 use std::collections::VecDeque;
 use std::io;
 use tokio_core::reactor::Handle;
 
 /// Bind an async I/O object `io` to the `server`.
 pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle)
 where
     S: Server,
--- a/media/audioipc/audioipc/src/shm.rs
+++ b/media/audioipc/audioipc/src/shm.rs
@@ -1,8 +1,13 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details.
+
 use errors::*;
 use memmap::{Mmap, MmapViewSync, Protection};
 use std::fs::{remove_file, File, OpenOptions};
 use std::path::Path;
 use std::sync::atomic;
 
 pub struct SharedMemReader {
     mmap: Mmap,
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -1,34 +1,36 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
-use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
 use assert_not_in_callback;
-use audioipc::{messages, ClientMessage, ServerMessage};
-use audioipc::{core, rpc};
 use audioipc::codec::LengthDelimitedCodec;
 use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
-use cubeb_backend::{ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error,
-                    Ops, Result, Stream, StreamParams, StreamParamsRef};
+use audioipc::{core, rpc};
+use audioipc::{messages, ClientMessage, ServerMessage};
+use cubeb_backend::{
+    ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result,
+    Stream, StreamParams, StreamParamsRef,
+};
 use futures::Future;
 use futures_cpupool::{self, CpuPool};
 use libc;
-use std::{fmt, io, mem, ptr};
 use std::ffi::{CStr, CString};
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
 use std::os::unix::net;
 use std::sync::mpsc;
 use std::thread;
+use std::{fmt, io, mem, ptr};
 use stream;
 use tokio_core::reactor::{Handle, Remote};
 use tokio_uds::UnixStream;
+use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
 
 struct CubebClient;
 
 impl rpc::Client for CubebClient {
     type Request = ServerMessage;
     type Response = ClientMessage;
     type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
 }
@@ -95,19 +97,17 @@ impl ContextOps for ClientContext {
             let _ = tx_rpc.send(rpc);
             Some(())
         }
 
         assert_not_in_callback();
 
         let (tx_rpc, rx_rpc) = mpsc::channel();
 
-        let params = CPUPOOL_INIT_PARAMS.with(|p| {
-            p.replace(None).unwrap()
-        });
+        let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap());
 
         let thread_create_callback = params.thread_create_callback;
 
         let register_thread = move || {
             if let Some(func) = thread_create_callback {
                 let thr = thread::current();
                 let name = CString::new(thr.name().unwrap()).unwrap();
                 func(name.as_ptr());
@@ -129,21 +129,21 @@ impl ContextOps for ClientContext {
                         "Failed to open stream and create rpc.",
                     )
                 })
         }));
 
         let rpc = t!(rx_rpc.recv());
 
         let cpupool = futures_cpupool::Builder::new()
-                .name_prefix("AudioIPC")
-                .after_start(register_thread)
-                .pool_size(params.pool_size)
-                .stack_size(params.stack_size)
-                .create();
+            .name_prefix("AudioIPC")
+            .after_start(register_thread)
+            .pool_size(params.pool_size)
+            .stack_size(params.stack_size)
+            .create();
 
         let ctx = Box::new(ClientContext {
             _ops: &CLIENT_OPS as *const _,
             rpc: rpc,
             core: core,
             cpu_pool: cpupool,
         });
         Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -27,34 +27,37 @@ use std::os::raw::{c_char, c_int};
 use std::os::unix::io::RawFd;
 use stream::ClientStream;
 
 type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;
 
 thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
 thread_local!(static CPUPOOL_INIT_PARAMS: InitParamsTls = std::cell::RefCell::new(None));
 
+// This must match the definition of AudioIpcInitParams in
+// dom/media/CubebUtils.cpp in Gecko.
 #[repr(C)]
 #[derive(Clone, Copy, Debug)]
 pub struct AudioIpcInitParams {
+    // Fields only need to be public for ipctest.
     pub server_connection: PlatformHandleType,
     pub pool_size: usize,
     pub stack_size: usize,
     pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
 }
 
 #[derive(Clone, Copy, Debug)]
 struct CpuPoolInitParams {
-    pub pool_size: usize,
-    pub stack_size: usize,
-    pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
+    pool_size: usize,
+    stack_size: usize,
+    thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
 }
 
 impl CpuPoolInitParams {
-    pub fn init_with(params: &AudioIpcInitParams) -> Self {
+    fn init_with(params: &AudioIpcInitParams) -> Self {
         CpuPoolInitParams {
             pool_size: params.pool_size,
             stack_size: params.stack_size,
             thread_create_callback: params.thread_create_callback,
         }
     }
 }
 
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -1,8 +1,13 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details.
+
 use cubeb_backend::Error;
 use std::os::raw::c_int;
 
 #[doc(hidden)]
 pub fn _err<E>(e: E) -> Error
 where
     E: Into<Option<c_int>>,
 {
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -1,31 +1,31 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
-use {assert_not_in_callback, set_in_callback};
-use ClientContext;
 use audioipc::codec::LengthDelimitedCodec;
 use audioipc::frame::{framed, Framed};
 use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
 use audioipc::rpc;
 use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
 use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
 use futures::Future;
 use futures_cpupool::{CpuFuture, CpuPool};
 use std::ffi::CString;
 use std::fs::File;
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
 use std::os::unix::net;
 use std::ptr;
 use std::sync::mpsc;
 use tokio_uds::UnixStream;
+use ClientContext;
+use {assert_not_in_callback, set_in_callback};
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
 pub struct Device(ffi::cubeb_device);
 
 impl Drop for Device {
     fn drop(&mut self) {
@@ -90,24 +90,24 @@ impl rpc::Server for CallbackServer {
 
                 self.cpu_pool.spawn_fn(move || {
                     // TODO: This is proof-of-concept. Make it better.
                     let input_ptr: *const u8 = match input_shm {
                         Some(shm) => shm
                             .get_slice(nframes as usize * frame_size)
                             .unwrap()
                             .as_ptr(),
-                        None => ptr::null()
+                        None => ptr::null(),
                     };
                     let output_ptr: *mut u8 = match output_shm {
                         Some(ref mut shm) => shm
                             .get_mut_slice(nframes as usize * frame_size)
                             .unwrap()
                             .as_mut_ptr(),
-                        None => ptr::null_mut()
+                        None => ptr::null_mut(),
                     };
 
                     set_in_callback(true);
                     let nframes = unsafe {
                         cb(
                             ptr::null_mut(),
                             user_ptr as *mut c_void,
                             input_ptr as *const _,
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -1,452 +1,59 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 extern crate audioipc;
 extern crate bytes;
 extern crate cubeb_core as cubeb;
 extern crate futures;
 extern crate lazycell;
 extern crate libc;
 extern crate slab;
 extern crate tokio_core;
 extern crate tokio_uds;
 
-use audioipc::codec::LengthDelimitedCodec;
 use audioipc::core;
-use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
-use audioipc::frame::{framed, Framed};
-use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo,
-                         ServerMessage, StreamCreate, StreamInitParams, StreamParams};
+use audioipc::fd_passing::framed_with_fds;
 use audioipc::rpc;
-use audioipc::shm::{SharedMemReader, SharedMemWriter};
 use audioipc::PlatformHandleType;
-use cubeb::ffi;
-use futures::future::{self, FutureResult};
 use futures::sync::oneshot;
 use futures::Future;
-use std::cell::RefCell;
-use std::convert::From;
 use std::error::Error;
-use std::ffi::{CStr, CString};
-use std::mem::{size_of, ManuallyDrop};
-use std::os::raw::{c_long, c_void};
+use std::os::raw::c_void;
+use std::os::unix::io::IntoRawFd;
 use std::os::unix::net;
-use std::os::unix::prelude::*;
-use std::{panic, ptr, slice};
-use tokio_core::reactor::Remote;
+use std::ptr;
 use tokio_uds::UnixStream;
 
+mod server;
+
 pub mod errors {
     error_chain! {
         links {
             AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
         }
         foreign_links {
             Cubeb(::cubeb::Error);
             Io(::std::io::Error);
             Canceled(::futures::sync::oneshot::Canceled);
         }
     }
 }
 
 use errors::*;
 
-type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
-thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
-
-fn with_local_context<T, F>(f: F) -> T
-where
-    F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
-{
-    CONTEXT_KEY.with(|k| {
-        let mut context = k.borrow_mut();
-        if context.is_none() {
-            let name = CString::new("AudioIPC Server").unwrap();
-            *context = Some(cubeb::Context::init(Some(name.as_c_str()), None));
-        }
-        f(context.as_ref().unwrap())
-    })
-}
-
-// TODO: Remove and let caller allocate based on cubeb backend requirements.
-const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
-
-// The size in which the stream slab is grown.
-const STREAM_CONN_CHUNK_SIZE: usize = 64;
-
-struct CallbackClient;
-
-impl rpc::Client for CallbackClient {
-    type Request = CallbackReq;
-    type Response = CallbackResp;
-    type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
-}
-
-struct ServerStreamCallbacks {
-    /// Size of input frame in bytes
-    input_frame_size: u16,
-    /// Size of output frame in bytes
-    output_frame_size: u16,
-    /// Shared memory buffer for sending input data to client
-    input_shm: SharedMemWriter,
-    /// Shared memory buffer for receiving output data from client
-    output_shm: SharedMemReader,
-    /// RPC interface to callback server running in client
-    rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
-}
-
-impl ServerStreamCallbacks {
-    fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
-        trace!("Stream data callback: {} {}", input.len(), output.len());
-
-        // len is of input and output is frame len. Turn these into the real lengths.
-        let real_input = unsafe {
-            let nbytes = input.len() * self.input_frame_size as usize;
-            slice::from_raw_parts(input.as_ptr(), nbytes)
-        };
-
-        self.input_shm.write(real_input).unwrap();
-
-        let r = self.rpc
-            .call(CallbackReq::Data(
-                output.len() as isize,
-                self.output_frame_size as usize,
-            ))
-            .wait();
-
-        match r {
-            Ok(CallbackResp::Data(frames)) => {
-                if frames >= 0 {
-                    let nbytes = frames as usize * self.output_frame_size as usize;
-                    let real_output = unsafe {
-                        trace!("Resize output to {}", nbytes);
-                        slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
-                    };
-                    self.output_shm.read(&mut real_output[..nbytes]).unwrap();
-                }
-                frames
-            }
-            _ => {
-                debug!("Unexpected message {:?} during data_callback", r);
-                -1
-            }
-        }
-    }
-
-    fn state_callback(&mut self, state: cubeb::State) {
-        trace!("Stream state callback: {:?}", state);
-        let r = self.rpc.call(CallbackReq::State(state.into())).wait();
-        match r {
-            Ok(CallbackResp::State) => {}
-            _ => {
-                debug!("Unexpected message {:?} during callback", r);
-            }
-        }
-    }
-}
-
-struct ServerStream {
-    stream: ManuallyDrop<cubeb::Stream>,
-    cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
-}
-
-impl Drop for ServerStream {
-    fn drop(&mut self) {
-        unsafe {
-            ManuallyDrop::drop(&mut self.stream);
-            ManuallyDrop::drop(&mut self.cbs);
-        }
-    }
-}
-
-type StreamSlab = slab::Slab<ServerStream, usize>;
-
-pub struct CubebServer {
-    cb_remote: Remote,
-    streams: StreamSlab,
-}
-
-impl rpc::Server for CubebServer {
-    type Request = ServerMessage;
-    type Response = ClientMessage;
-    type Future = FutureResult<Self::Response, ()>;
-    type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
-
-    fn process(&mut self, req: Self::Request) -> Self::Future {
-        let resp = with_local_context(|context| match *context {
-            Err(_) => error(cubeb::Error::error()),
-            Ok(ref context) => self.process_msg(context, &req),
-        });
-        future::ok(resp)
-    }
-}
-
-impl CubebServer {
-    pub fn new(cb_remote: Remote) -> Self {
-        CubebServer {
-            cb_remote: cb_remote,
-            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
-        }
-    }
-
-    // Process a request coming from the client.
-    fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
-        let resp: ClientMessage = match *msg {
-            ServerMessage::ClientConnect => panic!("already connected"),
-
-            ServerMessage::ClientDisconnect => {
-                // TODO:
-                //self.connection.client_disconnect();
-                ClientMessage::ClientDisconnected
-            }
-
-            ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
-
-            ServerMessage::ContextGetMaxChannelCount => context
-                .max_channel_count()
-                .map(ClientMessage::ContextMaxChannelCount)
-                .unwrap_or_else(error),
-
-            ServerMessage::ContextGetMinLatency(ref params) => {
-                let format = cubeb::SampleFormat::from(params.format);
-                let layout = cubeb::ChannelLayout::from(params.layout);
-
-                let params = cubeb::StreamParamsBuilder::new()
-                    .format(format)
-                    .rate(u32::from(params.rate))
-                    .channels(u32::from(params.channels))
-                    .layout(layout)
-                    .take();
-
-                context
-                    .min_latency(&params)
-                    .map(ClientMessage::ContextMinLatency)
-                    .unwrap_or_else(error)
-            }
-
-            ServerMessage::ContextGetPreferredSampleRate => context
-                .preferred_sample_rate()
-                .map(ClientMessage::ContextPreferredSampleRate)
-                .unwrap_or_else(error),
-
-            ServerMessage::ContextGetDeviceEnumeration(device_type) => context
-                .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
-                .map(|devices| {
-                    let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
-                    ClientMessage::ContextEnumeratedDevices(v)
-                })
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
-                .unwrap_or_else(|_| error(cubeb::Error::error())),
-
-            ServerMessage::StreamDestroy(stm_tok) => {
-                self.streams.remove(stm_tok);
-                ClientMessage::StreamDestroyed
-            }
-
-            ServerMessage::StreamStart(stm_tok) => self.streams[stm_tok]
-                .stream
-                .start()
-                .map(|_| ClientMessage::StreamStarted)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamStop(stm_tok) => self.streams[stm_tok]
-                .stream
-                .stop()
-                .map(|_| ClientMessage::StreamStopped)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamResetDefaultDevice(stm_tok) => self.streams[stm_tok]
-                .stream
-                .reset_default_device()
-                .map(|_| ClientMessage::StreamDefaultDeviceReset)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
-                .stream
-                .position()
-                .map(ClientMessage::StreamPosition)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
-                .stream
-                .latency()
-                .map(ClientMessage::StreamLatency)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
-                .stream
-                .set_volume(volume)
-                .map(|_| ClientMessage::StreamVolumeSet)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
-                .stream
-                .set_panning(panning)
-                .map(|_| ClientMessage::StreamPanningSet)
-                .unwrap_or_else(error),
-
-            ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
-                .stream
-                .current_device()
-                .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
-                .unwrap_or_else(error),
-        };
-
-        trace!("process_msg: req={:?}, resp={:?}", msg, resp);
-
-        resp
-    }
-
-    // Stream init is special, so it's been separated from process_msg.
-    fn process_stream_init(
-        &mut self,
-        context: &cubeb::Context,
-        params: &StreamInitParams,
-    ) -> Result<ClientMessage> {
-        fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
-            params
-                .map(|p| {
-                    let format = p.format.into();
-                    let sample_size = match format {
-                        cubeb::SampleFormat::S16LE
-                        | cubeb::SampleFormat::S16BE
-                        | cubeb::SampleFormat::S16NE => 2,
-                        cubeb::SampleFormat::Float32LE
-                        | cubeb::SampleFormat::Float32BE
-                        | cubeb::SampleFormat::Float32NE => 4,
-                    };
-                    let channel_count = p.channels as u16;
-                    sample_size * channel_count
-                })
-                .unwrap_or(0u16)
-        }
-
-        // Create the callback handling struct which is attached the cubeb stream.
-        let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
-        let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
-
-        let (stm1, stm2) = net::UnixStream::pair()?;
-        debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
-        let (input_shm, input_file) =
-            SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
-        let (output_shm, output_file) =
-            SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
-
-        // This code is currently running on the Client/Server RPC
-        // handling thread.  We need to move the registration of the
-        // bind_client to the callback RPC handling thread.  This is
-        // done by spawning a future on cb_remote.
-
-        let id = core::handle().id();
-
-        let (tx, rx) = oneshot::channel();
-        self.cb_remote.spawn(move |handle| {
-            // Ensure we're running on a loop different to the one
-            // invoking spawn_fn.
-            assert_ne!(id, handle.id());
-            let stream = UnixStream::from_stream(stm2, handle).unwrap();
-            let transport = framed(stream, Default::default());
-            let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
-            drop(tx.send(rpc));
-            Ok(())
-        });
-
-        let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
-            Ok(rpc) => rpc,
-            Err(_) => bail!("Failed to create callback rpc."),
-        };
-
-        let cbs = Box::new(ServerStreamCallbacks {
-            input_frame_size,
-            output_frame_size,
-            input_shm,
-            output_shm,
-            rpc,
-        });
-
-        // Create cubeb stream from params
-        let stream_name = params
-            .stream_name
-            .as_ref()
-            .and_then(|name| CStr::from_bytes_with_nul(name).ok());
-
-        let input_device = params.input_device as *const _;
-        let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
-            cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
-        });
-
-        let output_device = params.output_device as *const _;
-        let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
-            cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
-        });
-
-        let latency = params.latency_frames;
-        assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
-        let user_ptr = cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
-
-        unsafe {
-            context
-                .stream_init(
-                    stream_name,
-                    input_device,
-                    input_stream_params,
-                    output_device,
-                    output_stream_params,
-                    latency,
-                    Some(data_cb_c),
-                    Some(state_cb_c),
-                    user_ptr,
-                )
-                .and_then(|stream| {
-                    if !self.streams.has_available() {
-                        trace!(
-                            "server connection ran out of stream slots. reserving {} more.",
-                            STREAM_CONN_CHUNK_SIZE
-                        );
-                        self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
-                    }
-
-                    let stm_tok = match self.streams.vacant_entry() {
-                        Some(entry) => {
-                            debug!("Registering stream {:?}", entry.index(),);
-
-                            entry
-                                .insert(ServerStream {
-                                    stream: ManuallyDrop::new(stream),
-                                    cbs: ManuallyDrop::new(cbs),
-                                })
-                                .index()
-                        }
-                        None => {
-                            // TODO: Turn into error
-                            panic!("Failed to insert stream into slab. No entries")
-                        }
-                    };
-
-                    Ok(ClientMessage::StreamCreated(StreamCreate {
-                        token: stm_tok,
-                        fds: [
-                            stm1.into_raw_fd(),
-                            input_file.into_raw_fd(),
-                            output_file.into_raw_fd(),
-                        ],
-                    }))
-                })
-                .map_err(|e| e.into())
-        }
-    }
-}
-
 struct ServerWrapper {
     core_thread: core::CoreThread,
     callback_thread: core::CoreThread,
 }
 
 fn run() -> Result<ServerWrapper> {
     trace!("Starting up cubeb audio server event loop thread...");
 
@@ -501,70 +108,26 @@ pub extern "C" fn audioipc_server_new_cl
         .and_then(|(sock1, sock2)| {
             // Spawn closure to run on same thread as reactor::Core
             // via remote handle.
             wrapper.core_thread.remote().spawn(|handle| {
                 trace!("Incoming connection");
                 UnixStream::from_stream(sock2, handle)
                     .and_then(|sock| {
                         let transport = framed_with_fds(sock, Default::default());
-                        rpc::bind_server(transport, CubebServer::new(cb_remote), handle);
+                        rpc::bind_server(transport, server::CubebServer::new(cb_remote), handle);
                         Ok(())
-                    })
-                    .map_err(|_| ())
+                    }).map_err(|_| ())
                     // Notify waiting thread that sock2 has been registered.
                     .and_then(|_| wait_tx.send(()))
             });
             // Wait for notification that sock2 has been registered
             // with reactor::Core.
             let _ = wait_rx.wait();
             Ok(sock1.into_raw_fd())
-        })
-        .unwrap_or(-1)
+        }).unwrap_or(-1)
 }
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
     let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
     drop(wrapper);
 }
-
-fn error(error: cubeb::Error) -> ClientMessage {
-    ClientMessage::Error(error.raw_code())
-}
-
-// C callable callbacks
-unsafe extern "C" fn data_cb_c(
-    _: *mut ffi::cubeb_stream,
-    user_ptr: *mut c_void,
-    input_buffer: *const c_void,
-    output_buffer: *mut c_void,
-    nframes: c_long,
-) -> c_long {
-    let ok = panic::catch_unwind(|| {
-        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
-        let input = if input_buffer.is_null() {
-            &[]
-        } else {
-            slice::from_raw_parts(input_buffer as *const u8, nframes as usize)
-        };
-        let output: &mut [u8] = if output_buffer.is_null() {
-            &mut []
-        } else {
-            slice::from_raw_parts_mut(output_buffer as *mut u8, nframes as usize)
-        };
-        cbs.data_callback(input, output) as c_long
-    });
-    ok.unwrap_or(0)
-}
-
-unsafe extern "C" fn state_cb_c(
-    _: *mut ffi::cubeb_stream,
-    user_ptr: *mut c_void,
-    state: ffi::cubeb_state,
-) {
-    let ok = panic::catch_unwind(|| {
-        let state = cubeb::State::from(state);
-        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
-        cbs.state_callback(state);
-    });
-    ok.expect("State callback panicked");
-}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/server/src/server.rs
@@ -0,0 +1,460 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use audioipc;
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::core;
+use audioipc::fd_passing::FramedWithFds;
+use audioipc::frame::{framed, Framed};
+use audioipc::messages::{
+    CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, ServerMessage, StreamCreate,
+    StreamInitParams, StreamParams,
+};
+use audioipc::rpc;
+use audioipc::shm::{SharedMemReader, SharedMemWriter};
+use cubeb;
+use cubeb::ffi;
+use futures::future::{self, FutureResult};
+use futures::sync::oneshot;
+use futures::Future;
+use slab;
+use std::cell::RefCell;
+use std::convert::From;
+use std::ffi::{CStr, CString};
+use std::mem::{size_of, ManuallyDrop};
+use std::os::raw::{c_long, c_void};
+use std::os::unix::io::IntoRawFd;
+use std::os::unix::net;
+use std::{panic, slice};
+use tokio_core::reactor::Remote;
+use tokio_uds::UnixStream;
+
+use errors::*;
+
+fn error(error: cubeb::Error) -> ClientMessage {
+    ClientMessage::Error(error.raw_code())
+}
+
+type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
+thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
+
+fn with_local_context<T, F>(f: F) -> T
+where
+    F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
+{
+    CONTEXT_KEY.with(|k| {
+        let mut context = k.borrow_mut();
+        if context.is_none() {
+            let name = CString::new("AudioIPC Server").unwrap();
+            *context = Some(cubeb::Context::init(Some(name.as_c_str()), None));
+        }
+        f(context.as_ref().unwrap())
+    })
+}
+
+// TODO: Remove and let caller allocate based on cubeb backend requirements.
+const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
+
+// The size in which the stream slab is grown.
+const STREAM_CONN_CHUNK_SIZE: usize = 64;
+
+struct CallbackClient;
+
+impl rpc::Client for CallbackClient {
+    type Request = CallbackReq;
+    type Response = CallbackResp;
+    type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+}
+
+struct ServerStreamCallbacks {
+    /// Size of input frame in bytes
+    input_frame_size: u16,
+    /// Size of output frame in bytes
+    output_frame_size: u16,
+    /// Shared memory buffer for sending input data to client
+    input_shm: SharedMemWriter,
+    /// Shared memory buffer for receiving output data from client
+    output_shm: SharedMemReader,
+    /// RPC interface to callback server running in client
+    rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
+}
+
+impl ServerStreamCallbacks {
+    fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
+        trace!("Stream data callback: {} {}", input.len(), output.len());
+
+        // len is of input and output is frame len. Turn these into the real lengths.
+        let real_input = unsafe {
+            let nbytes = input.len() * self.input_frame_size as usize;
+            slice::from_raw_parts(input.as_ptr(), nbytes)
+        };
+
+        self.input_shm.write(real_input).unwrap();
+
+        let r = self
+            .rpc
+            .call(CallbackReq::Data(
+                output.len() as isize,
+                self.output_frame_size as usize,
+            )).wait();
+
+        match r {
+            Ok(CallbackResp::Data(frames)) => {
+                if frames >= 0 {
+                    let nbytes = frames as usize * self.output_frame_size as usize;
+                    let real_output = unsafe {
+                        trace!("Resize output to {}", nbytes);
+                        slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
+                    };
+                    self.output_shm.read(&mut real_output[..nbytes]).unwrap();
+                }
+                frames
+            }
+            _ => {
+                debug!("Unexpected message {:?} during data_callback", r);
+                -1
+            }
+        }
+    }
+
+    fn state_callback(&mut self, state: cubeb::State) {
+        trace!("Stream state callback: {:?}", state);
+        let r = self.rpc.call(CallbackReq::State(state.into())).wait();
+        match r {
+            Ok(CallbackResp::State) => {}
+            _ => {
+                debug!("Unexpected message {:?} during callback", r);
+            }
+        }
+    }
+}
+
+struct ServerStream {
+    stream: ManuallyDrop<cubeb::Stream>,
+    cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
+}
+
+impl Drop for ServerStream {
+    fn drop(&mut self) {
+        unsafe {
+            ManuallyDrop::drop(&mut self.stream);
+            ManuallyDrop::drop(&mut self.cbs);
+        }
+    }
+}
+
+type StreamSlab = slab::Slab<ServerStream, usize>;
+
+pub struct CubebServer {
+    cb_remote: Remote,
+    streams: StreamSlab,
+}
+
+impl rpc::Server for CubebServer {
+    type Request = ServerMessage;
+    type Response = ClientMessage;
+    type Future = FutureResult<Self::Response, ()>;
+    type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+
+    fn process(&mut self, req: Self::Request) -> Self::Future {
+        let resp = with_local_context(|context| match *context {
+            Err(_) => error(cubeb::Error::error()),
+            Ok(ref context) => self.process_msg(context, &req),
+        });
+        future::ok(resp)
+    }
+}
+
+impl CubebServer {
+    pub fn new(cb_remote: Remote) -> Self {
+        CubebServer {
+            cb_remote: cb_remote,
+            streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
+        }
+    }
+
+    // Process a request coming from the client.
+    fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
+        let resp: ClientMessage = match *msg {
+            ServerMessage::ClientConnect => panic!("already connected"),
+
+            ServerMessage::ClientDisconnect => {
+                // TODO:
+                //self.connection.client_disconnect();
+                ClientMessage::ClientDisconnected
+            }
+
+            ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
+
+            ServerMessage::ContextGetMaxChannelCount => context
+                .max_channel_count()
+                .map(ClientMessage::ContextMaxChannelCount)
+                .unwrap_or_else(error),
+
+            ServerMessage::ContextGetMinLatency(ref params) => {
+                let format = cubeb::SampleFormat::from(params.format);
+                let layout = cubeb::ChannelLayout::from(params.layout);
+
+                let params = cubeb::StreamParamsBuilder::new()
+                    .format(format)
+                    .rate(u32::from(params.rate))
+                    .channels(u32::from(params.channels))
+                    .layout(layout)
+                    .take();
+
+                context
+                    .min_latency(&params)
+                    .map(ClientMessage::ContextMinLatency)
+                    .unwrap_or_else(error)
+            }
+
+            ServerMessage::ContextGetPreferredSampleRate => context
+                .preferred_sample_rate()
+                .map(ClientMessage::ContextPreferredSampleRate)
+                .unwrap_or_else(error),
+
+            ServerMessage::ContextGetDeviceEnumeration(device_type) => context
+                .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
+                .map(|devices| {
+                    let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
+                    ClientMessage::ContextEnumeratedDevices(v)
+                }).unwrap_or_else(error),
+
+            ServerMessage::StreamInit(ref params) => self
+                .process_stream_init(context, params)
+                .unwrap_or_else(|_| error(cubeb::Error::error())),
+
+            ServerMessage::StreamDestroy(stm_tok) => {
+                self.streams.remove(stm_tok);
+                ClientMessage::StreamDestroyed
+            }
+
+            ServerMessage::StreamStart(stm_tok) => self.streams[stm_tok]
+                .stream
+                .start()
+                .map(|_| ClientMessage::StreamStarted)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamStop(stm_tok) => self.streams[stm_tok]
+                .stream
+                .stop()
+                .map(|_| ClientMessage::StreamStopped)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamResetDefaultDevice(stm_tok) => self.streams[stm_tok]
+                .stream
+                .reset_default_device()
+                .map(|_| ClientMessage::StreamDefaultDeviceReset)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
+                .stream
+                .position()
+                .map(ClientMessage::StreamPosition)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
+                .stream
+                .latency()
+                .map(ClientMessage::StreamLatency)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
+                .stream
+                .set_volume(volume)
+                .map(|_| ClientMessage::StreamVolumeSet)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
+                .stream
+                .set_panning(panning)
+                .map(|_| ClientMessage::StreamPanningSet)
+                .unwrap_or_else(error),
+
+            ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
+                .stream
+                .current_device()
+                .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
+                .unwrap_or_else(error),
+        };
+
+        trace!("process_msg: req={:?}, resp={:?}", msg, resp);
+
+        resp
+    }
+
+    // Stream init is special, so it's been separated from process_msg.
+    fn process_stream_init(
+        &mut self,
+        context: &cubeb::Context,
+        params: &StreamInitParams,
+    ) -> Result<ClientMessage> {
+        fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
+            params
+                .map(|p| {
+                    let format = p.format.into();
+                    let sample_size = match format {
+                        cubeb::SampleFormat::S16LE
+                        | cubeb::SampleFormat::S16BE
+                        | cubeb::SampleFormat::S16NE => 2,
+                        cubeb::SampleFormat::Float32LE
+                        | cubeb::SampleFormat::Float32BE
+                        | cubeb::SampleFormat::Float32NE => 4,
+                    };
+                    let channel_count = p.channels as u16;
+                    sample_size * channel_count
+                }).unwrap_or(0u16)
+        }
+
+        // Create the callback handling struct which is attached the cubeb stream.
+        let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
+        let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
+
+        let (stm1, stm2) = net::UnixStream::pair()?;
+        debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
+        let (input_shm, input_file) =
+            SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+        let (output_shm, output_file) =
+            SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+
+        // This code is currently running on the Client/Server RPC
+        // handling thread.  We need to move the registration of the
+        // bind_client to the callback RPC handling thread.  This is
+        // done by spawning a future on cb_remote.
+
+        let id = core::handle().id();
+
+        let (tx, rx) = oneshot::channel();
+        self.cb_remote.spawn(move |handle| {
+            // Ensure we're running on a loop different to the one
+            // invoking spawn_fn.
+            assert_ne!(id, handle.id());
+            let stream = UnixStream::from_stream(stm2, handle).unwrap();
+            let transport = framed(stream, Default::default());
+            let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
+            drop(tx.send(rpc));
+            Ok(())
+        });
+
+        let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
+            Ok(rpc) => rpc,
+            Err(_) => bail!("Failed to create callback rpc."),
+        };
+
+        let cbs = Box::new(ServerStreamCallbacks {
+            input_frame_size,
+            output_frame_size,
+            input_shm,
+            output_shm,
+            rpc,
+        });
+
+        // Create cubeb stream from params
+        let stream_name = params
+            .stream_name
+            .as_ref()
+            .and_then(|name| CStr::from_bytes_with_nul(name).ok());
+
+        let input_device = params.input_device as *const _;
+        let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
+            cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
+        });
+
+        let output_device = params.output_device as *const _;
+        let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
+            cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
+        });
+
+        let latency = params.latency_frames;
+        assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
+        let user_ptr = cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
+
+        unsafe {
+            context
+                .stream_init(
+                    stream_name,
+                    input_device,
+                    input_stream_params,
+                    output_device,
+                    output_stream_params,
+                    latency,
+                    Some(data_cb_c),
+                    Some(state_cb_c),
+                    user_ptr,
+                ).and_then(|stream| {
+                    if !self.streams.has_available() {
+                        trace!(
+                            "server connection ran out of stream slots. reserving {} more.",
+                            STREAM_CONN_CHUNK_SIZE
+                        );
+                        self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
+                    }
+
+                    let stm_tok = match self.streams.vacant_entry() {
+                        Some(entry) => {
+                            debug!("Registering stream {:?}", entry.index(),);
+
+                            entry
+                                .insert(ServerStream {
+                                    stream: ManuallyDrop::new(stream),
+                                    cbs: ManuallyDrop::new(cbs),
+                                }).index()
+                        }
+                        None => {
+                            // TODO: Turn into error
+                            panic!("Failed to insert stream into slab. No entries")
+                        }
+                    };
+
+                    Ok(ClientMessage::StreamCreated(StreamCreate {
+                        token: stm_tok,
+                        fds: [
+                            stm1.into_raw_fd(),
+                            input_file.into_raw_fd(),
+                            output_file.into_raw_fd(),
+                        ],
+                    }))
+                }).map_err(|e| e.into())
+        }
+    }
+}
+
+// C callable callbacks
+unsafe extern "C" fn data_cb_c(
+    _: *mut ffi::cubeb_stream,
+    user_ptr: *mut c_void,
+    input_buffer: *const c_void,
+    output_buffer: *mut c_void,
+    nframes: c_long,
+) -> c_long {
+    let ok = panic::catch_unwind(|| {
+        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
+        let input = if input_buffer.is_null() {
+            &[]
+        } else {
+            slice::from_raw_parts(input_buffer as *const u8, nframes as usize)
+        };
+        let output: &mut [u8] = if output_buffer.is_null() {
+            &mut []
+        } else {
+            slice::from_raw_parts_mut(output_buffer as *mut u8, nframes as usize)
+        };
+        cbs.data_callback(input, output) as c_long
+    });
+    ok.unwrap_or(0)
+}
+
+unsafe extern "C" fn state_cb_c(
+    _: *mut ffi::cubeb_stream,
+    user_ptr: *mut c_void,
+    state: ffi::cubeb_state,
+) {
+    let ok = panic::catch_unwind(|| {
+        let state = cubeb::State::from(state);
+        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
+        cbs.state_callback(state);
+    });
+    ok.expect("State callback panicked");
+}