Bug 1403048 - Update media/audioipc to b5559d28. r=kamidphish
authorMatthew Gregan <kinetik@flim.org>
Tue, 26 Sep 2017 15:49:26 +1300
changeset 433873 676427998df38ea39286b546b8b8ac372c9dee13
parent 433872 17b17a40b14a376b5509b759b2de11d7d3456846
child 433874 a51ac764452ac1939023f3d321febb9a67a362e4
push id8114
push userjlorenzo@mozilla.com
push dateThu, 02 Nov 2017 16:33:21 +0000
treeherdermozilla-beta@73e0d89a540f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerskamidphish
bugs1403048
milestone58.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1403048 - Update media/audioipc to b5559d28. r=kamidphish
media/audioipc/audioipc/Cargo.toml
media/audioipc/audioipc/src/async.rs
media/audioipc/audioipc/src/codec.rs
media/audioipc/audioipc/src/connection.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/messages.rs
media/audioipc/audioipc/src/msg.rs
media/audioipc/client/Cargo.toml
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/send_recv.rs
media/audioipc/client/src/stream.rs
media/audioipc/server/Cargo.toml
media/audioipc/server/src/channel.rs
media/audioipc/server/src/lib.rs
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -3,18 +3,19 @@ name = "audioipc"
 version = "0.1.0"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote Cubeb IPC"
 
 [dependencies]
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+bincode = "0.8"
+bytes = "0.4"
 error-chain = "0.10.0"
+libc = "0.2"
 log = "^0.3.6"
+memmap = "0.5.2"
+mio = "0.6.7"
+mio-uds = "0.6.4"
 serde = "1.*.*"
 serde_derive = "1.*.*"
-bincode = "0.8"
-libc = "0.2"
-mio = "0.6.7"
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
-byteorder = "1"
-memmap = "0.5.2"
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/async.rs
@@ -0,0 +1,153 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+//! Various async helpers modelled after futures-rs and tokio-io.
+
+use {RecvFd, SendFd};
+use bytes::{Buf, BufMut};
+use mio_uds;
+use std::io as std_io;
+use std::os::unix::io::RawFd;
+use std::os::unix::net;
+
+/// A convenience macro for working with `io::Result<T>` from the
+/// `std::io::Read` and `std::io::Write` traits.
+///
+/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
+/// the input type is of the `Err` variant, then `Async::NotReady` is returned if
+/// it indicates `WouldBlock` or otherwise `Err` is returned.
+#[macro_export]
+macro_rules! try_nb {
+    ($e:expr) => (match $e {
+        Ok(t) => t,
+        Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+            return Ok(Async::NotReady)
+        }
+        Err(e) => return Err(e.into()),
+    })
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+// Async support - Handle EWOULDBLOCK/EAGAIN from non-blocking I/O operations.
+
+/// Return type for async methods, indicates whether the operation was
+/// ready or not.
+///
+/// * `Ok(Async::Ready(t))` means that the operation has completed successfully.
+/// * `Ok(Async::NotReady)` means that the underlying system is not ready to handle operation.
+/// * `Err(e)` means that the operation has completed with the given error `e`.
+pub type AsyncResult<T, E> = Result<Async<T>, E>;
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum Async<T> {
+    /// Represents that a value is immediately ready.
+    Ready(T),
+    /// Represents that a value is not ready yet, but may be so later.
+    NotReady
+}
+
+impl<T> Async<T> {
+    pub fn is_ready(&self) -> bool {
+        match *self {
+            Async::Ready(_) => true,
+            Async::NotReady => false,
+        }
+    }
+
+    pub fn is_not_ready(&self) -> bool {
+        !self.is_ready()
+    }
+}
+
+/// Return type for an async attempt to send a value.
+///
+/// * `Ok(AsyncSend::Ready)` means that the operation has completed successfully.
+/// * `Ok(AsyncSend::NotReady(t))` means that the underlying system is not ready to handle
+///    send. returns the value that tried to be sent in `t`.
+/// * `Err(e)` means that operation has completed with the given error `e`.
+pub type AsyncSendResult<T, E> = Result<AsyncSend<T>, E>;
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum AsyncSend<T> {
+    Ready,
+    NotReady(T)
+}
+
+pub trait AsyncRecvFd: RecvFd {
+    unsafe fn prepare_uninitialized_buffer(&self, bytes: &mut [u8]) -> bool {
+        for byte in bytes.iter_mut() {
+            *byte = 0;
+        }
+
+        true
+    }
+
+    /// Pull some bytes from this source into the specified `Buf`, returning
+    /// how many bytes were read.
+    ///
+    /// The `buf` provided will have bytes read into it and the internal cursor
+    /// will be advanced if any bytes were read. Note that this method typically
+    /// will not reallocate the buffer provided.
+    fn recv_buf_fd<B>(&mut self, buf: &mut B) -> AsyncResult<(usize, Option<RawFd>), std_io::Error>
+    where
+        Self: Sized,
+        B: BufMut,
+    {
+        if !buf.has_remaining_mut() {
+            return Ok(Async::Ready((0, None)));
+        }
+
+        unsafe {
+            let (n, fd) = {
+                let bytes = buf.bytes_mut();
+                self.prepare_uninitialized_buffer(bytes);
+                try_nb!(self.recv_fd(bytes))
+            };
+
+            buf.advance_mut(n);
+            Ok(Async::Ready((n, fd)))
+        }
+    }
+}
+
+impl AsyncRecvFd for net::UnixStream {}
+impl AsyncRecvFd for mio_uds::UnixStream {}
+
+/// A trait for writable objects which operated in an async fashion.
+///
+/// This trait inherits from `std::io::Write` and indicates that an I/O object is
+/// **nonblocking**, meaning that it will return an error instead of blocking
+/// when bytes cannot currently be written, but hasn't closed. Specifically
+/// this means that the `write` function for types that implement this trait
+/// can have a few return values:
+///
+/// * `Ok(n)` means that `n` bytes of data was immediately written .
+/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
+///   written from the buffer provided. The I/O object is not currently
+///   writable but may become writable in the future.
+/// * `Err(e)` for other errors are standard I/O errors coming from the
+///   underlying object.
+pub trait AsyncSendFd: SendFd {
+    /// Write a `Buf` into this value, returning how many bytes were written.
+    ///
+    /// Note that this method will advance the `buf` provided automatically by
+    /// the number of bytes written.
+    fn send_buf_fd<B>(&mut self, buf: &mut B, fd: Option<RawFd>) -> AsyncResult<usize, std_io::Error>
+    where
+        Self: Sized,
+        B: Buf,
+    {
+        if !buf.has_remaining() {
+            return Ok(Async::Ready(0));
+        }
+
+        let n = try_nb!(self.send_fd(buf.bytes(), fd));
+        buf.advance(n);
+        Ok(Async::Ready(n))
+    }
+}
+
+impl AsyncSendFd for net::UnixStream {}
+impl AsyncSendFd for mio_uds::UnixStream {}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/codec.rs
@@ -0,0 +1,141 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
+
+use bincode::{self, Bounded, deserialize, serialize_into, serialized_size};
+use bytes::{Buf, BufMut, BytesMut, LittleEndian};
+use serde::de::DeserializeOwned;
+use serde::ser::Serialize;
+use std::io as std_io;
+use std::io::Cursor;
+use std::mem;
+
+////////////////////////////////////////////////////////////////////////////////
+// Split buffer into size delimited frames - This appears more complicated than
+// might be necessary due to handling the possibility of messages being split
+// across reads.
+
+#[derive(Debug)]
+enum FrameState {
+    Head,
+    Data(usize)
+}
+
+#[derive(Debug)]
+pub struct Decoder {
+    state: FrameState
+}
+
+impl Decoder {
+    pub fn new() -> Self {
+        Decoder {
+            state: FrameState::Head
+        }
+    }
+
+    fn decode_head(&mut self, src: &mut BytesMut) -> std_io::Result<Option<usize>> {
+        let head_size = mem::size_of::<u16>();
+        if src.len() < head_size {
+            // Not enough data
+            return Ok(None);
+        }
+
+        let n = {
+            let mut src = Cursor::new(&mut *src);
+
+            // match endianess
+            let n = src.get_uint::<LittleEndian>(head_size);
+
+            if n > u64::from(u16::max_value()) {
+                return Err(std_io::Error::new(
+                    std_io::ErrorKind::InvalidData,
+                    "frame size too big"
+                ));
+            }
+
+            // The check above ensures there is no overflow
+            n as usize
+        };
+
+        // Consume the length field
+        let _ = src.split_to(head_size);
+
+        Ok(Some(n))
+    }
+
+    fn decode_data(&self, n: usize, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+        // At this point, the buffer has already had the required capacity
+        // reserved. All there is to do is read.
+        if src.len() < n {
+            return Ok(None);
+        }
+
+        Ok(Some(src.split_to(n)))
+    }
+
+    pub fn split_frame(&mut self, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
+        let n = match self.state {
+            FrameState::Head => {
+                match try!(self.decode_head(src)) {
+                    Some(n) => {
+                        self.state = FrameState::Data(n);
+
+                        // Ensure that the buffer has enough space to read the
+                        // incoming payload
+                        src.reserve(n);
+
+                        n
+                    },
+                    None => return Ok(None),
+                }
+            },
+            FrameState::Data(n) => n,
+        };
+
+        match try!(self.decode_data(n, src)) {
+            Some(data) => {
+                // Update the decode state
+                self.state = FrameState::Head;
+
+                // Make sure the buffer has enough space to read the next head
+                src.reserve(mem::size_of::<u16>());
+
+                Ok(Some(data))
+            },
+            None => Ok(None),
+        }
+    }
+
+    pub fn decode<ITEM: DeserializeOwned>(&mut self, src: &mut BytesMut) -> Result<Option<ITEM>, bincode::Error> {
+        match try!(self.split_frame(src)) {
+            Some(buf) => deserialize::<ITEM>(buf.as_ref()).and_then(|t| Ok(Some(t))),
+            None => Ok(None),
+        }
+    }
+}
+
+impl Default for Decoder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub fn encode<ITEM: Serialize>(dst: &mut BytesMut, item: &ITEM) -> Result<(), bincode::Error> {
+    let head_len = mem::size_of::<u16>() as u64;
+    let item_len = serialized_size(item);
+
+    if head_len + item_len > u64::from(u16::max_value()) {
+        return Err(Box::new(bincode::ErrorKind::IoError(std_io::Error::new(
+            std_io::ErrorKind::InvalidInput,
+            "frame too big"
+        ))));
+    }
+
+    let n = (head_len + item_len) as usize;
+    dst.reserve(n);
+    dst.put_u16::<LittleEndian>(item_len as u16);
+    serialize_into(&mut dst.writer(), item, Bounded(item_len))
+}
--- a/media/audioipc/audioipc/src/connection.rs
+++ b/media/audioipc/audioipc/src/connection.rs
@@ -1,48 +1,50 @@
-use bincode::{self, deserialize, serialize};
+use {AutoCloseFd, RecvFd, SendFd};
+use async::{Async, AsyncRecvFd};
+use bytes::{BufMut, BytesMut};
+use codec::{Decoder, encode};
 use errors::*;
-use msg;
 use mio::{Poll, PollOpt, Ready, Token};
 use mio::event::Evented;
 use mio::unix::EventedFd;
 use serde::de::DeserializeOwned;
 use serde::ser::Serialize;
+use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::io::{self, Read};
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::os::unix::net;
 use std::os::unix::prelude::*;
-use libc;
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-
-pub trait RecvFd {
-    fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
-}
-
-pub trait SendFd {
-    fn send_fd<FD: IntoRawFd>(&mut self, bytes: &[u8], fd: Option<FD>) -> io::Result<(usize)>;
-}
 
 // Because of the trait implementation rules in Rust, this needs to be
 // a wrapper class to allow implementation of a trait from another
 // crate on a struct from yet another crate.
 //
 // This class is effectively net::UnixStream.
 
 #[derive(Debug)]
 pub struct Connection {
-    stream: net::UnixStream
+    stream: net::UnixStream,
+    recv_buffer: BytesMut,
+    recv_fd: VecDeque<AutoCloseFd>,
+    send_buffer: BytesMut,
+    decoder: Decoder
 }
 
 impl Connection {
     pub fn new(stream: net::UnixStream) -> Connection {
         info!("Create new connection");
+        stream.set_nonblocking(false).unwrap();
         Connection {
-            stream: stream
+            stream: stream,
+            recv_buffer: BytesMut::with_capacity(1024),
+            recv_fd: VecDeque::new(),
+            send_buffer: BytesMut::with_capacity(1024),
+            decoder: Decoder::new()
         }
     }
 
     /// Creates an unnamed pair of connected sockets.
     ///
     /// Returns two `Connection`s which are connected to each other.
     ///
     /// # Examples
@@ -55,77 +57,102 @@ impl Connection {
     ///     Err(e) => {
     ///         println!("Couldn't create a pair of connections: {:?}", e);
     ///         return
     ///     }
     /// };
     /// ```
     pub fn pair() -> io::Result<(Connection, Connection)> {
         let (s1, s2) = net::UnixStream::pair()?;
-        Ok((
-            Connection {
-                stream: s1
-            },
-            Connection {
-                stream: s2
-            }
-        ))
+        Ok((Connection::new(s1), Connection::new(s2)))
+    }
+
+    pub fn take_fd(&mut self) -> Option<RawFd> {
+        self.recv_fd.pop_front().map(|fd| fd.into_raw_fd())
     }
 
     pub fn receive<RT>(&mut self) -> Result<RT>
     where
         RT: DeserializeOwned + Debug,
     {
-        match self.receive_with_fd() {
-            Ok((r, None)) => Ok(r),
-            Ok((_, Some(_))) => panic!("unexpected fd received"),
-            Err(e) => Err(e),
-        }
+        self.receive_with_fd()
     }
 
-    pub fn receive_with_fd<RT>(&mut self) -> Result<(RT, Option<RawFd>)>
+    pub fn receive_with_fd<RT>(&mut self) -> Result<RT>
     where
         RT: DeserializeOwned + Debug,
     {
-        // TODO: Check deserialize_from and serialize_into.
-        let mut encoded = vec![0; 32 * 1024]; // TODO: Get max size from bincode, or at least assert.
-        // TODO: Read until block, EOF, or error.
-        // TODO: Switch back to recv_fd.
-        match self.stream.recv_fd(&mut encoded) {
-            Ok((0, _)) => Err(ErrorKind::Disconnected.into()),
-            // TODO: Handle partial read?
-            Ok((n, fd)) => {
-                let r = deserialize(&encoded[..n]);
-                debug!("receive {:?}", r);
+        trace!("received_with_fd...");
+        loop {
+            trace!("   recv_buffer = {:?}", self.recv_buffer);
+            if !self.recv_buffer.is_empty() {
+                let r = self.decoder.decode(&mut self.recv_buffer);
+                trace!("receive {:?}", r);
                 match r {
-                    Ok(r) => Ok((r, fd)),
-                    Err(e) => Err(e).chain_err(|| "Failed to deserialize message"),
+                    Ok(Some(r)) => return Ok(r),
+                    Ok(None) => {
+                        /* Buffer doesn't contain enough data for a complete
+                         * message, so need to enter recv_buf_fd to get more. */
+                    },
+                    Err(e) => return Err(e).chain_err(|| "Failed to deserialize message"),
                 }
-            },
-            // TODO: Handle dropped message.
-            // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
-            _ => bail!("socket write"),
+            }
+
+            // Otherwise, try to read more data and try again. Make sure we've
+            // got room for at least one byte to read to ensure that we don't
+            // get a spurious 0 that looks like EOF
+
+            // The decoder.decode should have reserved an amount for
+            // the next bit it needs to read.  Check that we reserved
+            // enough space for, at least the 2 byte size prefix.
+            assert!(self.recv_buffer.remaining_mut() > 2);
+
+            // TODO: Read until block, EOF, or error.
+            // TODO: Switch back to recv_fd.
+            match self.stream.recv_buf_fd(&mut self.recv_buffer) {
+                Ok(Async::Ready((0, _))) => return Err(ErrorKind::Disconnected.into()),
+                // TODO: Handle partial read?
+                Ok(Async::Ready((_, fd))) => {
+                    trace!(
+                        "   recv_buf_fd: recv_buffer: {:?}, recv_fd: {:?}, fd: {:?}",
+                        self.recv_buffer,
+                        self.recv_fd,
+                        fd
+                    );
+                    if let Some(fd) = fd {
+                        self.recv_fd.push_back(
+                            unsafe { AutoCloseFd::from_raw_fd(fd) }
+                        );
+                    }
+                },
+                Ok(Async::NotReady) => bail!("Socket should be blocking."),
+                // TODO: Handle dropped message.
+                // Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
+                _ => bail!("socket write"),
+            }
         }
     }
 
     pub fn send<ST>(&mut self, msg: ST) -> Result<usize>
     where
         ST: Serialize + Debug,
     {
         self.send_with_fd::<ST, Connection>(msg, None)
     }
 
     pub fn send_with_fd<ST, FD>(&mut self, msg: ST, fd_to_send: Option<FD>) -> Result<usize>
     where
         ST: Serialize + Debug,
         FD: IntoRawFd + Debug,
     {
-        let encoded: Vec<u8> = serialize(&msg, bincode::Infinite)?;
-        info!("send_with_fd {:?}, {:?}", msg, fd_to_send);
-        self.stream.send_fd(&encoded, fd_to_send).chain_err(
+        trace!("send_with_fd {:?}, {:?}", msg, fd_to_send);
+        try!(encode(&mut self.send_buffer, &msg));
+        let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
+        let send = self.send_buffer.take().freeze();
+        self.stream.send_fd(send.as_ref(), fd_to_send).chain_err(
             || "Failed to send message with fd"
         )
     }
 }
 
 impl Evented for Connection {
     fn register(&self, poll: &Poll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
         EventedFd(&self.stream.as_raw_fd()).register(poll, token, events, opts)
@@ -148,52 +175,31 @@ impl Read for Connection {
 
 // TODO: Is this required?
 impl<'a> Read for &'a Connection {
     fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
         (&self.stream).read(bytes)
     }
 }
 
-impl RecvFd for net::UnixStream {
-    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
-        let length = self.read_u32::<LittleEndian>()?;
-
-        msg::recvmsg(self.as_raw_fd(), &mut buf_to_recv[..length as usize])
-    }
-}
-
 impl RecvFd for Connection {
     fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
         self.stream.recv_fd(buf_to_recv)
     }
 }
 
 impl FromRawFd for Connection {
     unsafe fn from_raw_fd(fd: RawFd) -> Connection {
-        Connection {
-            stream: net::UnixStream::from_raw_fd(fd)
-        }
+        Connection::new(net::UnixStream::from_raw_fd(fd))
     }
 }
 
 impl IntoRawFd for Connection {
     fn into_raw_fd(self) -> RawFd {
         self.stream.into_raw_fd()
     }
 }
 
-impl SendFd for net::UnixStream {
-    fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
-        self.write_u32::<LittleEndian>(buf_to_send.len() as u32)?;
-
-        let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
-        let r = msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send);
-        fd_to_send.map(|fd| unsafe { libc::close(fd) });
-        r
-    }
-}
-
 impl SendFd for Connection {
-    fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
+    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
         self.stream.send_fd(buf_to_send, fd_to_send)
     }
 }
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -8,39 +8,119 @@
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 #[macro_use]
 extern crate serde_derive;
-extern crate serde;
+
 extern crate bincode;
-
-extern crate mio;
-
+extern crate bytes;
 extern crate cubeb_core;
+extern crate libc;
+extern crate memmap;
+extern crate mio;
+extern crate mio_uds;
+extern crate serde;
 
-extern crate libc;
-extern crate byteorder;
-
-extern crate memmap;
-
+pub mod async;
+pub mod codec;
 mod connection;
 pub mod errors;
 pub mod messages;
 mod msg;
 pub mod shm;
 
 pub use connection::*;
 pub use messages::{ClientMessage, ServerMessage};
+
 use std::env::temp_dir;
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::net;
 use std::path::PathBuf;
 
+// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
+// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
+// UnixStream type.
+pub trait RecvFd {
+    fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
+}
+
+pub trait SendFd {
+    fn send_fd(&mut self, bytes: &[u8], fd: Option<RawFd>) -> io::Result<(usize)>;
+}
+
+impl RecvFd for net::UnixStream {
+    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+        msg::recvmsg(self.as_raw_fd(), buf_to_recv)
+    }
+}
+
+impl RecvFd for mio_uds::UnixStream {
+    fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+        msg::recvmsg(self.as_raw_fd(), buf_to_recv)
+    }
+}
+
+impl SendFd for net::UnixStream {
+    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
+        msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
+    }
+}
+
+impl SendFd for mio_uds::UnixStream {
+    fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
+        msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug)]
+pub struct AutoCloseFd(RawFd);
+
+impl Drop for AutoCloseFd {
+    fn drop(&mut self) {
+        unsafe {
+            libc::close(self.0);
+        }
+    }
+}
+
+impl FromRawFd for AutoCloseFd {
+    unsafe fn from_raw_fd(fd: RawFd) -> Self {
+        AutoCloseFd(fd)
+    }
+}
+
+impl IntoRawFd for AutoCloseFd {
+    fn into_raw_fd(self) -> RawFd {
+        let fd = self.0;
+        ::std::mem::forget(self);
+        fd
+    }
+}
+
+impl AsRawFd for AutoCloseFd {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+impl<'a> AsRawFd for &'a AutoCloseFd {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 fn get_temp_path(name: &str) -> PathBuf {
     let mut path = temp_dir();
     path.push(name);
     path
 }
 
 pub fn get_uds_path() -> PathBuf {
     get_temp_path("cubeb-sock")
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -121,33 +121,33 @@ pub struct StreamParams {
     pub format: u32,
     pub rate: u16,
     pub channels: u8,
     pub layout: i32
 }
 
 impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
     fn from(params: &'a ffi::cubeb_stream_params) -> Self {
-        assert!(params.channels <= u8::max_value() as u32);
+        assert!(params.channels <= u32::from(u8::max_value()));
 
         StreamParams {
             format: params.format,
             rate: params.rate as u16,
             channels: params.channels as u8,
             layout: params.layout
         }
     }
 }
 
 impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
     fn from(params: &StreamParams) -> Self {
         ffi::cubeb_stream_params {
             format: params.format,
-            rate: params.rate as u32,
-            channels: params.channels as u32,
+            rate: u32::from(params.rate),
+            channels: u32::from(params.channels),
             layout: params.layout
         }
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StreamInitParams {
     pub stream_name: Option<Vec<u8>>,
--- a/media/audioipc/audioipc/src/msg.rs
+++ b/media/audioipc/audioipc/src/msg.rs
@@ -1,105 +1,161 @@
 use libc;
 use std::io;
 use std::mem;
+use std::os::unix::io::RawFd;
 use std::ptr;
-use std::os::unix::io::RawFd;
-use std;
 
 // Note: The following fields must be laid out together, the OS expects them
 // to be part of a single allocation.
 #[repr(C)]
 struct CmsgSpace {
     cmsghdr: libc::cmsghdr,
+    #[cfg(not(target_os = "macos"))]
+    __padding: [usize; 0],
     data: libc::c_int,
 }
 
-unsafe fn sendmsg_retry(fd: libc::c_int, msg: *const libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
-    loop {
-        let r = libc::sendmsg(fd, msg, flags);
-        if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
-            std::thread::yield_now();
-            continue;
-        }
-        return r;
-    }
+#[cfg(not(target_os = "macos"))]
+fn cmsg_align(len: usize) -> usize {
+    let align_bytes = mem::size_of::<usize>() - 1;
+    (len + align_bytes) & !align_bytes
+}
+
+#[cfg(target_os = "macos")]
+fn cmsg_align(len: usize) -> usize {
+    len
+}
+
+fn cmsg_space() -> usize {
+    mem::size_of::<CmsgSpace>()
+}
+
+fn cmsg_len() -> usize {
+    cmsg_align(mem::size_of::<libc::cmsghdr>()) + mem::size_of::<libc::c_int>()
 }
 
 pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
     let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
     let mut iovec: libc::iovec = unsafe { mem::zeroed() };
     let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
 
     msghdr.msg_iov = &mut iovec as *mut _;
     msghdr.msg_iovlen = 1;
     if fd_to_send.is_some() {
         msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
-        msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+        msghdr.msg_controllen = cmsg_space() as _;
     }
 
     iovec.iov_base = if to_send.is_empty() {
         // Empty Vecs have a non-null pointer.
         ptr::null_mut()
     } else {
         to_send.as_ptr() as *const _ as *mut _
     };
     iovec.iov_len = to_send.len();
 
-    cmsg.cmsghdr.cmsg_len = msghdr.msg_controllen;
+    cmsg.cmsghdr.cmsg_len = cmsg_len() as _;
     cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
     cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
 
     cmsg.data = fd_to_send.unwrap_or(-1);
 
-    let result = unsafe { sendmsg_retry(fd, &msghdr, 0) };
+    let result = unsafe { libc::sendmsg(fd, &msghdr, 0) };
     if result >= 0 {
         Ok(result as usize)
     } else {
         Err(io::Error::last_os_error())
     }
 }
 
-unsafe fn recvmsg_retry(fd: libc::c_int, msg: *mut libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
-    loop {
-        let r = libc::recvmsg(fd, msg, flags);
-        if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
-            std::thread::yield_now();
-            continue;
-        }
-        return r;
-    }
-}
-
 pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
     let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
     let mut iovec: libc::iovec = unsafe { mem::zeroed() };
     let mut cmsg: CmsgSpace = unsafe { mem::zeroed() };
 
     msghdr.msg_iov = &mut iovec as *mut _;
     msghdr.msg_iovlen = 1;
     msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
-    msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _;
+    msghdr.msg_controllen = cmsg_space() as _;
 
     iovec.iov_base = if to_recv.is_empty() {
         // Empty Vecs have a non-null pointer.
         ptr::null_mut()
     } else {
         to_recv.as_ptr() as *const _ as *mut _
     };
     iovec.iov_len = to_recv.len();
 
-    let result = unsafe { recvmsg_retry(fd, &mut msghdr, 0) };
+    let result = unsafe { libc::recvmsg(fd, &mut msghdr, 0) };
     if result >= 0 {
-        let fd = if msghdr.msg_controllen == mem::size_of::<CmsgSpace>() as _ &&
-            cmsg.cmsghdr.cmsg_len == mem::size_of::<CmsgSpace>() as _ &&
+        let fd = if msghdr.msg_controllen == cmsg_space() as _ &&
+            cmsg.cmsghdr.cmsg_len == cmsg_len() as _ &&
             cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
             cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
                 Some(cmsg.data)
             } else {
                 None
             };
 
         Ok((result as usize, fd))
     } else {
         Err(io::Error::last_os_error())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use libc;
+    use std::mem;
+    use std::os::unix::net::UnixStream;
+    use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+    use std::io::{Read, Write};
+    use super::{cmsg_len, cmsg_space, sendmsg, recvmsg};
+
+    #[test]
+    fn portable_sizes() {
+        if cfg!(all(target_os = "linux", target_pointer_width = "64")) {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 16);
+            assert_eq!(cmsg_len(), 20);
+            assert_eq!(cmsg_space(), 24);
+        } else if cfg!(all(target_os = "linux", target_pointer_width = "32")) {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 16);
+            assert_eq!(cmsg_space(), 16);
+        } else if cfg!(target_os = "macos") {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 16);
+            assert_eq!(cmsg_space(), 16);
+        } else if cfg!(target_pointer_width = "64") {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 20);
+            assert_eq!(cmsg_space(), 24);
+        } else {
+            assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
+            assert_eq!(cmsg_len(), 16);
+            assert_eq!(cmsg_space(), 16);
+        }
+    }
+
+    #[test]
+    fn fd_passing() {
+        let (tx, rx) = UnixStream::pair().unwrap();
+
+        let (send_tx, mut send_rx) = UnixStream::pair().unwrap();
+
+        let fd = send_tx.into_raw_fd();
+        assert_eq!(sendmsg(tx.as_raw_fd(), b"a", Some(fd)).unwrap(), 1);
+        unsafe { libc::close(fd) };
+
+        let mut buf = [0u8];
+        let (got, fd) = recvmsg(rx.as_raw_fd(), &mut buf).unwrap();
+        assert_eq!(got, 1);
+        assert_eq!(&buf, b"a");
+
+        let mut send_tx = unsafe { UnixStream::from_raw_fd(fd.unwrap()) };
+        assert_eq!(send_tx.write(b"b").unwrap(), 1);
+
+        let mut buf = [0u8];
+        assert_eq!(send_rx.read(&mut buf).unwrap(), 1);
+        assert_eq!(&buf, b"b");
+    }
+}
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -1,11 +1,11 @@
 [package]
 name = "audioipc-client"
 version = "0.1.0"
 authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
 description = "Cubeb Backend for talking to remote cubeb server."
 
 [dependencies]
 audioipc = { path="../audioipc" }
+cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
 cubeb-core = { path="../../cubeb-rs/cubeb-core" }
-cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
 log = "^0.3.6"
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -1,17 +1,18 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use ClientStream;
+use assert_not_in_callback;
 use audioipc::{self, ClientMessage, Connection, ServerMessage, messages};
 use cubeb_backend::{Context, Ops};
-use cubeb_core::{DeviceId, DeviceType, Error, Result, StreamParams, ffi};
+use cubeb_core::{DeviceId, DeviceType, Error, ErrorCode, Result, StreamParams, ffi};
 use cubeb_core::binding::Binding;
 use std::ffi::{CStr, CString};
 use std::mem;
 use std::os::raw::c_void;
 use std::os::unix::net::UnixStream;
 use std::sync::{Mutex, MutexGuard};
 use stream;
 
@@ -28,79 +29,96 @@ macro_rules! t(
             Err(_) => return Err(Error::default())
         }
     ));
 
 pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
 
 impl ClientContext {
     #[doc(hidden)]
-    pub fn conn(&self) -> MutexGuard<Connection> {
+    pub fn connection(&self) -> MutexGuard<Connection> {
         self.connection.lock().unwrap()
     }
 }
 
 impl Context for ClientContext {
     fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
+        assert_not_in_callback();
         // TODO: encapsulate connect, etc inside audioipc.
         let stream = t!(UnixStream::connect(audioipc::get_uds_path()));
         let ctx = Box::new(ClientContext {
             _ops: &CLIENT_OPS as *const _,
             connection: Mutex::new(Connection::new(stream))
         });
         Ok(Box::into_raw(ctx) as *mut _)
     }
 
     fn backend_id(&self) -> &'static CStr {
+        // HACK: This is called reentrantly from Gecko's AudioStream::DataCallback.
+        //assert_not_in_callback();
         unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
     }
 
     fn max_channel_count(&self) -> Result<u32> {
-        send_recv!(self.conn(), ContextGetMaxChannelCount => ContextMaxChannelCount())
+        // HACK: This needs to be reentrant as MSG calls it from within data_callback.
+        //assert_not_in_callback();
+        //let mut conn = self.connection();
+        //send_recv!(conn, ContextGetMaxChannelCount => ContextMaxChannelCount())
+        warn!("Context::max_channel_count lying about result until reentrancy issues resolved.");
+        Ok(2)
     }
 
     fn min_latency(&self, params: &StreamParams) -> Result<u32> {
+        assert_not_in_callback();
         let params = messages::StreamParams::from(unsafe { &*params.raw() });
-        send_recv!(self.conn(), ContextGetMinLatency(params) => ContextMinLatency())
+        let mut conn = self.connection();
+        send_recv!(conn, ContextGetMinLatency(params) => ContextMinLatency())
     }
 
     fn preferred_sample_rate(&self) -> Result<u32> {
-        send_recv!(self.conn(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+        assert_not_in_callback();
+        let mut conn = self.connection();
+        send_recv!(conn, ContextGetPreferredSampleRate => ContextPreferredSampleRate())
     }
 
     fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
-        send_recv!(self.conn(), ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
+        assert_not_in_callback();
+        let mut conn = self.connection();
+        send_recv!(conn, ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
     }
 
     fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
+        assert_not_in_callback();
+        let mut conn = self.connection();
         let v: Vec<ffi::cubeb_device_info> =
-            match send_recv!(self.conn(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
+            match send_recv!(conn, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
                 Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
                 Err(e) => return Err(e),
             };
         let vs = v.into_boxed_slice();
         let coll = ffi::cubeb_device_collection {
             count: vs.len(),
             device: vs.as_ptr()
         };
         // Giving away the memory owned by vs.  Don't free it!
         // Reclaimed in `device_collection_destroy`.
         mem::forget(vs);
         Ok(coll)
     }
 
     fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
+        assert_not_in_callback();
         unsafe {
             let coll = &*collection;
             let mut devices = Vec::from_raw_parts(
                 coll.device as *mut ffi::cubeb_device_info,
                 coll.count,
                 coll.count
             );
-            for dev in devices.iter_mut() {
+            for dev in &mut devices {
                 if !dev.device_id.is_null() {
                     let _ = CString::from_raw(dev.device_id as *mut _);
                 }
                 if !dev.group_id.is_null() {
                     let _ = CString::from_raw(dev.group_id as *mut _);
                 }
                 if !dev.vendor_name.is_null() {
                     let _ = CString::from_raw(dev.vendor_name as *mut _);
@@ -120,16 +138,17 @@ impl Context for ClientContext {
         output_device: DeviceId,
         output_stream_params: Option<&ffi::cubeb_stream_params>,
         latency_frame: u32,
         // These params aren't sent to the server
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
         user_ptr: *mut c_void,
     ) -> Result<*mut ffi::cubeb_stream> {
+        assert_not_in_callback();
 
         fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
             match p {
                 Some(raw) => Some(messages::StreamParams::from(raw)),
                 None => None,
             }
         }
 
@@ -144,27 +163,38 @@ impl Context for ClientContext {
         let init_params = messages::StreamInitParams {
             stream_name: stream_name,
             input_device: input_device.raw() as _,
             input_stream_params: input_stream_params,
             output_device: output_device.raw() as _,
             output_stream_params: output_stream_params,
             latency_frames: latency_frame
         };
-        stream::init(&self, init_params, data_callback, state_callback, user_ptr)
+        stream::init(self, init_params, data_callback, state_callback, user_ptr)
     }
 
     fn register_device_collection_changed(
         &self,
         _dev_type: DeviceType,
         _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
         _user_ptr: *mut c_void,
     ) -> Result<()> {
+        assert_not_in_callback();
         Ok(())
     }
 }
 
 impl Drop for ClientContext {
     fn drop(&mut self) {
+        let mut conn = self.connection();
         info!("ClientContext drop...");
-        let _: Result<()> = send_recv!(self.conn(), ClientDisconnect => ClientDisconnected);
+        let r = conn.send(ServerMessage::ClientDisconnect);
+        if r.is_err() {
+            debug!("ClientContext::Drop send error={:?}", r);
+        } else {
+            let r = conn.receive();
+            if let Ok(ClientMessage::ClientDisconnected) = r {
+            } else {
+                debug!("ClientContext::Drop receive error={:?}", r);
+            }
+        }
     }
 }
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -16,13 +16,28 @@ mod context;
 mod stream;
 
 use context::ClientContext;
 use cubeb_backend::capi;
 use cubeb_core::ffi;
 use std::os::raw::{c_char, c_int};
 use stream::ClientStream;
 
+thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
+
+fn set_in_callback(in_callback: bool) {
+    IN_CALLBACK.with(|b| {
+        assert_eq!(*b.borrow(), !in_callback);
+        *b.borrow_mut() = in_callback;
+    });
+}
+
+fn assert_not_in_callback() {
+    IN_CALLBACK.with(|b| {
+        assert_eq!(*b.borrow(), false);
+    });
+}
+
 #[no_mangle]
 /// Entry point from C code.
 pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char) -> c_int {
     capi::capi_init::<ClientContext>(c, context_name)
 }
--- a/media/audioipc/client/src/send_recv.rs
+++ b/media/audioipc/client/src/send_recv.rs
@@ -12,31 +12,41 @@ macro_rules! send_recv {
         send_recv!(__send $conn, $smsg, $($a),*);
         send_recv!(__recv $conn, $rmsg)
     }};
     ($conn:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
         send_recv!(__send $conn, $smsg, $($a),*);
         send_recv!(__recv $conn, $rmsg __result)
     }};
     //
-    (__send $conn:expr, $smsg:ident) => (
-        $conn.send(ServerMessage::$smsg)
-            .unwrap();
-    );
-    (__send $conn:expr, $smsg:ident, $($a:expr),*) => (
-        $conn.send(ServerMessage::$smsg($($a),*))
-            .unwrap();
-    );
-    (__recv $conn:expr, $rmsg:ident) => (
-        if let ClientMessage::$rmsg = $conn.receive().unwrap() {
+    (__send $conn:expr, $smsg:ident) => ({
+        let r = $conn.send(ServerMessage::$smsg);
+        if r.is_err() {
+            debug!("send error - got={:?}", r);
+            return Err(ErrorCode::Error.into());
+        }
+    });
+    (__send $conn:expr, $smsg:ident, $($a:expr),*) => ({
+        let r = $conn.send(ServerMessage::$smsg($($a),*));
+        if r.is_err() {
+            debug!("send error - got={:?}", r);
+            return Err(ErrorCode::Error.into());
+        }
+    });
+    (__recv $conn:expr, $rmsg:ident) => ({
+        let r = $conn.receive().unwrap();
+        if let ClientMessage::$rmsg = r {
             Ok(())
         } else {
-            panic!("wrong message received");
+            debug!("receive error - got={:?}", r);
+            Err(ErrorCode::Error.into())
         }
-    );
-    (__recv $conn:expr, $rmsg:ident __result) => (
-        if let ClientMessage::$rmsg(v) = $conn.receive().unwrap() {
+    });
+    (__recv $conn:expr, $rmsg:ident __result) => ({
+        let r = $conn.receive().unwrap();
+        if let ClientMessage::$rmsg(v) = r {
             Ok(v)
         } else {
-            panic!("wrong message received");
+            debug!("receive error - got={:?}", r);
+            Err(ErrorCode::Error.into())
         }
-    )
+    })
 }
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -1,214 +1,277 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use ClientContext;
+use {set_in_callback, assert_not_in_callback};
 use audioipc::{ClientMessage, Connection, ServerMessage, messages};
-use audioipc::shm::{SharedMemSlice, SharedMemMutSlice};
+use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
 use cubeb_backend::Stream;
 use cubeb_core::{ErrorCode, Result, ffi};
 use std::ffi::CString;
+use std::fs::File;
 use std::os::raw::c_void;
 use std::os::unix::io::FromRawFd;
-use std::fs::File;
 use std::ptr;
 use std::thread;
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
 pub struct ClientStream<'ctx> {
     // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
     context: &'ctx ClientContext,
     token: usize,
     join_handle: Option<thread::JoinHandle<()>>
 }
 
 fn stream_thread(
     mut conn: Connection,
-    input_shm: SharedMemSlice,
+    input_shm: &SharedMemSlice,
     mut output_shm: SharedMemMutSlice,
     data_cb: ffi::cubeb_data_callback,
     state_cb: ffi::cubeb_state_callback,
     user_ptr: usize,
 ) {
     loop {
         let r = match conn.receive::<ClientMessage>() {
             Ok(r) => r,
             Err(e) => {
                 debug!("stream_thread: Failed to receive message: {:?}", e);
-                continue;
+                return;
             },
         };
 
         match r {
             ClientMessage::StreamDestroyed => {
                 info!("stream_thread: Shutdown callback thread.");
                 return;
             },
             ClientMessage::StreamDataCallback(nframes, frame_size) => {
-                info!(
+                trace!(
                     "stream_thread: Data Callback: nframes={} frame_size={}",
                     nframes,
                     frame_size
                 );
                 // TODO: This is proof-of-concept. Make it better.
-                let input_ptr: *const u8 = input_shm.get_slice(nframes as usize * frame_size).unwrap().as_ptr();
-                let output_ptr: *mut u8 = output_shm.get_mut_slice(nframes as usize * frame_size).unwrap().as_mut_ptr();
+                let input_ptr: *const u8 = input_shm
+                    .get_slice(nframes as usize * frame_size)
+                    .unwrap()
+                    .as_ptr();
+                let output_ptr: *mut u8 = output_shm
+                    .get_mut_slice(nframes as usize * frame_size)
+                    .unwrap()
+                    .as_mut_ptr();
+                set_in_callback(true);
                 let nframes = data_cb(
                     ptr::null_mut(),
                     user_ptr as *mut c_void,
                     input_ptr as *const _,
                     output_ptr as *mut _,
                     nframes as _
                 );
-                conn.send(ServerMessage::StreamDataCallback(nframes as isize)).unwrap();
+                set_in_callback(false);
+                let r = conn.send(ServerMessage::StreamDataCallback(nframes as isize));
+                if r.is_err() {
+                    debug!("stream_thread: Failed to send StreamDataCallback: {:?}", r);
+                    return;
+                }
             },
             ClientMessage::StreamStateCallback(state) => {
                 info!("stream_thread: State Callback: {:?}", state);
+                set_in_callback(true);
                 state_cb(ptr::null_mut(), user_ptr as *mut _, state);
+                set_in_callback(false);
             },
             m => {
                 info!("Unexpected ClientMessage: {:?}", m);
             },
         }
     }
 }
 
 impl<'ctx> ClientStream<'ctx> {
     fn init(
         ctx: &'ctx ClientContext,
         init_params: messages::StreamInitParams,
         data_callback: ffi::cubeb_data_callback,
         state_callback: ffi::cubeb_state_callback,
         user_ptr: *mut c_void,
     ) -> Result<*mut ffi::cubeb_stream> {
+        assert_not_in_callback();
+        let mut conn = ctx.connection();
 
-        ctx.conn()
-            .send(ServerMessage::StreamInit(init_params))
-            .unwrap();
+        let r = conn.send(ServerMessage::StreamInit(init_params));
+        if r.is_err() {
+            debug!("ClientStream::init: Failed to send StreamInit: {:?}", r);
+            return Err(ErrorCode::Error.into());
+        }
 
-        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+        let r = match conn.receive_with_fd::<ClientMessage>() {
             Ok(r) => r,
             Err(_) => return Err(ErrorCode::Error.into()),
         };
 
-        let (token, conn) = match r {
-            (ClientMessage::StreamCreated(tok), Some(fd)) => (tok, unsafe {
-                Connection::from_raw_fd(fd)
-            }),
-            (ClientMessage::StreamCreated(_), None) => {
-                debug!("Missing fd!");
-                return Err(ErrorCode::Error.into());
+        let (token, conn2) = match r {
+            ClientMessage::StreamCreated(tok) => {
+                let fd = conn.take_fd();
+                if fd.is_none() {
+                    debug!("Missing fd!");
+                    return Err(ErrorCode::Error.into());
+                }
+                (tok, unsafe { Connection::from_raw_fd(fd.unwrap()) })
             },
-            (m, _) => {
+            m => {
                 debug!("Unexpected message: {:?}", m);
                 return Err(ErrorCode::Error.into());
             },
         };
 
         // TODO: It'd be nicer to receive these two fds as part of
         // StreamCreated, but that requires changing sendmsg/recvmsg to
         // support multiple fds.
-        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+        let r = match conn.receive_with_fd::<ClientMessage>() {
             Ok(r) => r,
             Err(_) => return Err(ErrorCode::Error.into()),
         };
 
         let input_file = match r {
-            (ClientMessage::StreamCreatedInputShm, Some(fd)) => unsafe {
-                File::from_raw_fd(fd)
+            ClientMessage::StreamCreatedInputShm => {
+                let fd = conn.take_fd();
+                if fd.is_none() {
+                    debug!("Missing fd!");
+                    return Err(ErrorCode::Error.into());
+                }
+                unsafe { File::from_raw_fd(fd.unwrap()) }
             },
-            (m, _) => {
+            m => {
                 debug!("Unexpected message: {:?}", m);
                 return Err(ErrorCode::Error.into());
             },
         };
 
-        let input_shm = SharedMemSlice::from(input_file,
-                                             SHM_AREA_SIZE).unwrap();
+        let input_shm = SharedMemSlice::from(input_file, SHM_AREA_SIZE).unwrap();
 
-        let r = match ctx.conn().receive_with_fd::<ClientMessage>() {
+        let r = match conn.receive_with_fd::<ClientMessage>() {
             Ok(r) => r,
             Err(_) => return Err(ErrorCode::Error.into()),
         };
 
         let output_file = match r {
-            (ClientMessage::StreamCreatedOutputShm, Some(fd)) => unsafe {
-                File::from_raw_fd(fd)
+            ClientMessage::StreamCreatedOutputShm => {
+                let fd = conn.take_fd();
+                if fd.is_none() {
+                    debug!("Missing fd!");
+                    return Err(ErrorCode::Error.into());
+                }
+                unsafe { File::from_raw_fd(fd.unwrap()) }
             },
-            (m, _) => {
+            m => {
                 debug!("Unexpected message: {:?}", m);
                 return Err(ErrorCode::Error.into());
             },
         };
 
-        let output_shm = SharedMemMutSlice::from(output_file,
-                                                 SHM_AREA_SIZE).unwrap();
+        let output_shm = SharedMemMutSlice::from(output_file, SHM_AREA_SIZE).unwrap();
 
         let user_data = user_ptr as usize;
         let join_handle = thread::spawn(move || {
-            stream_thread(conn, input_shm, output_shm, data_callback, state_callback, user_data)
+            stream_thread(
+                conn2,
+                &input_shm,
+                output_shm,
+                data_callback,
+                state_callback,
+                user_data
+            )
         });
 
         Ok(Box::into_raw(Box::new(ClientStream {
             context: ctx,
             token: token,
             join_handle: Some(join_handle)
         })) as _)
     }
 }
 
 impl<'ctx> Drop for ClientStream<'ctx> {
     fn drop(&mut self) {
-        let _: Result<()> = send_recv!(self.context.conn(), StreamDestroy(self.token) => StreamDestroyed);
+        let mut conn = self.context.connection();
+        let r = conn.send(ServerMessage::StreamDestroy(self.token));
+        if r.is_err() {
+            debug!("ClientStream::Drop send error={:?}", r);
+        } else {
+            let r = conn.receive();
+            if let Ok(ClientMessage::StreamDestroyed) = r {
+            } else {
+                debug!("ClientStream::Drop receive error={:?}", r);
+            }
+        }
+        // XXX: This is guaranteed to wait forever if the send failed.
         self.join_handle.take().unwrap().join().unwrap();
     }
 }
 
 impl<'ctx> Stream for ClientStream<'ctx> {
     fn start(&self) -> Result<()> {
-        send_recv!(self.context.conn(), StreamStart(self.token) => StreamStarted)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamStart(self.token) => StreamStarted)
     }
 
     fn stop(&self) -> Result<()> {
-        send_recv!(self.context.conn(), StreamStop(self.token) => StreamStopped)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamStop(self.token) => StreamStopped)
     }
 
     fn reset_default_device(&self) -> Result<()> {
-        send_recv!(self.context.conn(), StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
     }
 
     fn position(&self) -> Result<u64> {
-        send_recv!(self.context.conn(), StreamGetPosition(self.token) => StreamPosition())
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamGetPosition(self.token) => StreamPosition())
     }
 
     fn latency(&self) -> Result<u32> {
-        send_recv!(self.context.conn(), StreamGetLatency(self.token) => StreamLatency())
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamGetLatency(self.token) => StreamLatency())
     }
 
     fn set_volume(&self, volume: f32) -> Result<()> {
-        send_recv!(self.context.conn(), StreamSetVolume(self.token, volume) => StreamVolumeSet)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamSetVolume(self.token, volume) => StreamVolumeSet)
     }
 
     fn set_panning(&self, panning: f32) -> Result<()> {
-        send_recv!(self.context.conn(), StreamSetPanning(self.token, panning) => StreamPanningSet)
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        send_recv!(conn, StreamSetPanning(self.token, panning) => StreamPanningSet)
     }
 
     fn current_device(&self) -> Result<*const ffi::cubeb_device> {
-        match send_recv!(self.context.conn(), StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+        assert_not_in_callback();
+        let mut conn = self.context.connection();
+        match send_recv!(conn, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
             Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
             Err(e) => Err(e),
         }
     }
 
     fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
+        assert_not_in_callback();
         // It's all unsafe...
         if !device.is_null() {
             unsafe {
                 if !(*device).output_name.is_null() {
                     let _ = CString::from_raw((*device).output_name as *mut _);
                 }
                 if !(*device).input_name.is_null() {
                     let _ = CString::from_raw((*device).input_name as *mut _);
@@ -219,16 +282,17 @@ impl<'ctx> Stream for ClientStream<'ctx>
         Ok(())
     }
 
     // TODO: How do we call this back? On what thread?
     fn register_device_changed_callback(
         &self,
         _device_changed_callback: ffi::cubeb_device_changed_callback,
     ) -> Result<()> {
+        assert_not_in_callback();
         Ok(())
     }
 }
 
 pub fn init(
     ctx: &ClientContext,
     init_params: messages::StreamInitParams,
     data_callback: ffi::cubeb_data_callback,
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -1,17 +1,18 @@
 [package]
 name = "audioipc-server"
 version = "0.1.0"
 authors = ["Dan Glastonbury <dan.glastonbury@gmail.com>"]
 description = "Remote cubeb server"
 
 [dependencies]
 audioipc = { path = "../audioipc" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
 cubeb = { path = "../../cubeb-rs/cubeb-api" }
-cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+bytes = "0.4"
 error-chain = "0.10.0"
 lazycell = "^0.4"
 log = "^0.3.6"
 mio = "0.6.7"
 mio-uds = "0.6.4"
 slab = "0.3.0"
 
--- a/media/audioipc/server/src/channel.rs
+++ b/media/audioipc/server/src/channel.rs
@@ -109,20 +109,20 @@ impl Evented for ReceiverCtl {
         let (registration, set_readiness) = Registration::new2();
         try!(registration.register(poll, token, interest, opts));
 
         if self.inner.pending.load(Ordering::Relaxed) > 0 {
             // TODO: Don't drop readiness
             let _ = set_readiness.set_readiness(Ready::readable());
         }
 
-        self.registration.fill(registration).ok().expect(
+        self.registration.fill(registration).expect(
             "unexpected state encountered"
         );
-        self.inner.set_readiness.fill(set_readiness).ok().expect(
+        self.inner.set_readiness.fill(set_readiness).expect(
             "unexpected state encountered"
         );
 
         Ok(())
     }
 
     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
         match self.registration.borrow() {
@@ -131,17 +131,17 @@ impl Evented for ReceiverCtl {
                 io::ErrorKind::Other,
                 "receiver not registered"
             )),
         }
     }
 
     fn deregister(&self, poll: &Poll) -> io::Result<()> {
         match self.registration.borrow() {
-            Some(registration) => <Registration as Evented>::deregister(&registration, poll),
+            Some(registration) => <Registration as Evented>::deregister(registration, poll),
             None => Err(io::Error::new(
                 io::ErrorKind::Other,
                 "receiver not registered"
             )),
         }
     }
 }
 
@@ -187,29 +187,29 @@ impl<T> From<io::Error> for TrySendError
 /*
  *
  * ===== Implement Error, Debug and Display for Errors =====
  *
  */
 
 impl<T: Any> error::Error for SendError<T> {
     fn description(&self) -> &str {
-        match self {
-            &SendError::Io(ref io_err) => io_err.description(),
-            &SendError::Disconnected(..) => "Disconnected",
+        match *self {
+            SendError::Io(ref io_err) => io_err.description(),
+            SendError::Disconnected(..) => "Disconnected",
         }
     }
 }
 
 impl<T: Any> error::Error for TrySendError<T> {
     fn description(&self) -> &str {
-        match self {
-            &TrySendError::Io(ref io_err) => io_err.description(),
-            &TrySendError::Full(..) => "Full",
-            &TrySendError::Disconnected(..) => "Disconnected",
+        match *self {
+            TrySendError::Io(ref io_err) => io_err.description(),
+            TrySendError::Full(..) => "Full",
+            TrySendError::Disconnected(..) => "Disconnected",
         }
     }
 }
 
 impl<T> fmt::Debug for SendError<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         format_send_error(self, f)
     }
@@ -230,22 +230,22 @@ impl<T> fmt::Debug for TrySendError<T> {
 impl<T> fmt::Display for TrySendError<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         format_try_send_error(self, f)
     }
 }
 
 #[inline]
 fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
-    match e {
-        &SendError::Io(ref io_err) => write!(f, "{}", io_err),
-        &SendError::Disconnected(..) => write!(f, "Disconnected"),
+    match *e {
+        SendError::Io(ref io_err) => write!(f, "{}", io_err),
+        SendError::Disconnected(..) => write!(f, "Disconnected"),
     }
 }
 
 #[inline]
 fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
-    match e {
-        &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
-        &TrySendError::Full(..) => write!(f, "Full"),
-        &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
+    match *e {
+        TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
+        TrySendError::Full(..) => write!(f, "Full"),
+        TrySendError::Disconnected(..) => write!(f, "Disconnected"),
     }
 }
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -1,30 +1,38 @@
 #[macro_use]
 extern crate error_chain;
 
 #[macro_use]
 extern crate log;
 
 extern crate audioipc;
+extern crate bytes;
 extern crate cubeb;
 extern crate cubeb_core;
 extern crate lazycell;
 extern crate mio;
 extern crate mio_uds;
 extern crate slab;
 
-use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamParams};
+use audioipc::AutoCloseFd;
+use audioipc::async::{Async, AsyncRecvFd, AsyncSendFd};
+use audioipc::codec::{Decoder, encode};
+use audioipc::messages::{ClientMessage, DeviceInfo, ServerMessage, StreamInitParams, StreamParams};
 use audioipc::shm::{SharedMemReader, SharedMemWriter};
+use bytes::{Bytes, BytesMut};
 use cubeb_core::binding::Binding;
 use cubeb_core::ffi;
-use mio::Token;
-use mio_uds::UnixListener;
+use mio::{Ready, Token};
+use mio_uds::{UnixListener, UnixStream};
 use std::{slice, thread};
+use std::collections::VecDeque;
+use std::collections::HashSet;
 use std::convert::From;
+use std::io::Cursor;
 use std::os::raw::c_void;
 use std::os::unix::prelude::*;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 
 mod channel;
 
 pub mod errors {
@@ -47,44 +55,46 @@ const SHM_AREA_SIZE: usize = 2 * 1024 * 
 // TODO: this should forward to the client.
 struct Callback {
     /// Size of input frame in bytes
     input_frame_size: u16,
     /// Size of output frame in bytes
     output_frame_size: u16,
     connection: audioipc::Connection,
     input_shm: SharedMemWriter,
-    output_shm: SharedMemReader,
+    output_shm: SharedMemReader
 }
 
 impl cubeb::StreamCallback for Callback {
     type Frame = u8;
 
     fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
-        info!("Stream data callback: {} {}", input.len(), output.len());
+        trace!("Stream data callback: {} {}", input.len(), output.len());
 
         // len is of input and output is frame len. Turn these into the real lengths.
         let real_input = unsafe {
             let size_bytes = input.len() * self.input_frame_size as usize;
             slice::from_raw_parts(input.as_ptr(), size_bytes)
         };
         let real_output = unsafe {
             let size_bytes = output.len() * self.output_frame_size as usize;
-            info!("Resize output to {}", size_bytes);
+            trace!("Resize output to {}", size_bytes);
             slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
         };
 
-        self.input_shm.write(&real_input).unwrap();
+        self.input_shm.write(real_input).unwrap();
 
-        self.connection
-            .send(ClientMessage::StreamDataCallback(
-                output.len() as isize,
-                self.output_frame_size as usize
-            ))
-            .unwrap();
+        let r = self.connection.send(ClientMessage::StreamDataCallback(
+            output.len() as isize,
+            self.output_frame_size as usize
+        ));
+        if r.is_err() {
+            debug!("data_callback: Failed to send to client - got={:?}", r);
+            return -1;
+        }
 
         let r = self.connection.receive();
         match r {
             Ok(ServerMessage::StreamDataCallback(cb_result)) => {
                 if cb_result >= 0 {
                     let len = cb_result as usize * self.output_frame_size as usize;
                     self.output_shm.read(&mut real_output[..len]).unwrap();
                     cb_result
@@ -96,372 +106,486 @@ impl cubeb::StreamCallback for Callback 
                 debug!("Unexpected message {:?} during callback", r);
                 -1
             },
         }
     }
 
     fn state_callback(&mut self, state: cubeb::State) {
         info!("Stream state callback: {:?}", state);
+        // TODO: Share this conversion with the same code in cubeb-rs?
+        let state = match state {
+            cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
+            cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
+            cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
+            cubeb::State::Error => ffi::CUBEB_STATE_ERROR,
+        };
+        let r = self.connection.send(
+            ClientMessage::StreamStateCallback(state)
+        );
+        if r.is_err() {
+            debug!("state_callback: Failed to send to client - got={:?}", r);
+        }
     }
 }
 
 impl Drop for Callback {
     fn drop(&mut self) {
-        self.connection
-            .send(ClientMessage::StreamDestroyed)
-            .unwrap();
+        let r = self.connection.send(ClientMessage::StreamDestroyed);
+        if r.is_err() {
+            debug!("Callback::drop failed to send StreamDestroyed = {:?}", r);
+        }
     }
 }
 
 type Slab<T> = slab::Slab<T, Token>;
 type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
 
 // TODO: Server token must be outside range used by server.connections slab.
 // usize::MAX is already used internally in mio.
 const QUIT: Token = Token(std::usize::MAX - 2);
 const SERVER: Token = Token(std::usize::MAX - 1);
 
 struct ServerConn {
-    connection: audioipc::Connection,
+    //connection: audioipc::Connection,
+    io: UnixStream,
     token: Option<Token>,
-    streams: StreamSlab
+    streams: StreamSlab,
+    decoder: Decoder,
+    recv_buffer: BytesMut,
+    send_buffer: BytesMut,
+    pending_send: VecDeque<(Bytes, Option<AutoCloseFd>)>,
+    device_ids: HashSet<usize>
 }
 
 impl ServerConn {
-    fn new<FD>(fd: FD) -> ServerConn
-    where
-        FD: IntoRawFd,
-    {
-        ServerConn {
-            connection: unsafe { audioipc::Connection::from_raw_fd(fd.into_raw_fd()) },
+    fn new(io: UnixStream) -> ServerConn {
+        let mut sc = ServerConn {
+            io: io,
             token: None,
             // TODO: Handle increasing slab size. Pick a good default size.
-            streams: StreamSlab::with_capacity(64)
+            streams: StreamSlab::with_capacity(64),
+            decoder: Decoder::new(),
+            recv_buffer: BytesMut::with_capacity(4096),
+            send_buffer: BytesMut::with_capacity(4096),
+            pending_send: VecDeque::new(),
+            device_ids: HashSet::new()
+        };
+        sc.device_ids.insert(0); // nullptr is always a valid (default) device id.
+        sc
+    }
+
+    fn process_read(&mut self, context: &Result<cubeb::Context>) -> Result<Ready> {
+        // According to *something*, processing non-blocking stream
+        // should attempt to read until EWOULDBLOCK is returned.
+        while let Async::Ready((n, fd)) = try!(self.io.recv_buf_fd(&mut self.recv_buffer)) {
+            trace!("Received {} bytes and fd {:?}", n, fd);
+
+            // Reading 0 signifies EOF
+            if n == 0 {
+                return Err(
+                    ::errors::ErrorKind::AudioIPC(::audioipc::errors::ErrorKind::Disconnected).into()
+                );
+            }
+
+            if let Some(fd) = fd {
+                trace!("Unexpectedly received an fd from client.");
+                let _ = unsafe { AutoCloseFd::from_raw_fd(fd) };
+            }
+
+            // Process all the complete messages contained in
+            // send.recv_buffer.  It's possible that a read might not
+            // return a complete message, so self.decoder.decode
+            // returns Ok(None).
+            loop {
+                match self.decoder.decode::<ServerMessage>(&mut self.recv_buffer) {
+                    Ok(Some(msg)) => {
+                        info!("ServerConn::process: got {:?}", msg);
+                        try!(self.process_msg(&msg, context));
+                    },
+                    Ok(None) => {
+                        break;
+                    },
+                    Err(e) => {
+                        return Err(e).chain_err(|| "Failed to decoder ServerMessage");
+                    },
+                }
+            }
         }
+
+        // Send any pending responses to client.
+        self.flush_pending_send()
     }
 
-    fn process(&mut self, poll: &mut mio::Poll, context: &Result<Option<cubeb::Context>>) -> Result<()> {
-        let r = self.connection.receive();
-        info!("ServerConn::process: got {:?}", r);
+    // Process a request coming from the client.
+    fn process_msg(&mut self, msg: &ServerMessage, context: &Result<cubeb::Context>) -> Result<()> {
+        let resp: ClientMessage = if let Ok(ref context) = *context {
+            if let ServerMessage::StreamInit(ref params) = *msg {
+                return self.process_stream_init(context, params);
+            };
+
+            match *msg {
+                ServerMessage::ClientConnect => {
+                    panic!("already connected");
+                },
+
+                ServerMessage::ClientDisconnect => {
+                    // TODO:
+                    //self.connection.client_disconnect();
+                    ClientMessage::ClientDisconnected
+                },
+
+                ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
+
+                ServerMessage::ContextGetMaxChannelCount => {
+                    context
+                        .max_channel_count()
+                        .map(ClientMessage::ContextMaxChannelCount)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetMinLatency(ref params) => {
+                    let format = cubeb::SampleFormat::from(params.format);
+                    let layout = cubeb::ChannelLayout::from(params.layout);
+
+                    let params = cubeb::StreamParamsBuilder::new()
+                        .format(format)
+                        .rate(u32::from(params.rate))
+                        .channels(u32::from(params.channels))
+                        .layout(layout)
+                        .take();
+
+                    context
+                        .min_latency(&params)
+                        .map(ClientMessage::ContextMinLatency)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetPreferredSampleRate => {
+                    context
+                        .preferred_sample_rate()
+                        .map(ClientMessage::ContextPreferredSampleRate)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetPreferredChannelLayout => {
+                    context
+                        .preferred_channel_layout()
+                        .map(|l| ClientMessage::ContextPreferredChannelLayout(l as _))
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::ContextGetDeviceEnumeration(device_type) => {
+                    context
+                        .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
+                        .map(|devices| {
+                            let v: Vec<DeviceInfo> = devices.iter()
+                                                            .map(|i| i.raw().into())
+                                                            .collect();
+                            for i in &v {
+                                self.device_ids.insert(i.devid);
+                            }
+                            ClientMessage::ContextEnumeratedDevices(v)
+                        })
+                        .unwrap_or_else(error)
+                },
 
-        if let &Ok(Some(ref ctx)) = context {
-            // TODO: Might need a simple state machine to deal with
-            // create/use/destroy ordering, etc.
-            // TODO: receive() and all this handling should be moved out
-            // of this event loop code.
-            let msg = try!(r);
-            let _ = try!(self.process_msg(&msg, ctx));
+                ServerMessage::StreamInit(_) => {
+                    panic!("StreamInit should have already been handled.");
+                },
+
+                ServerMessage::StreamDestroy(stm_tok) => {
+                    self.streams.remove(stm_tok);
+                    ClientMessage::StreamDestroyed
+                },
+
+                ServerMessage::StreamStart(stm_tok) => {
+                    let _ = self.streams[stm_tok].start();
+                    ClientMessage::StreamStarted
+                },
+
+                ServerMessage::StreamStop(stm_tok) => {
+                    let _ = self.streams[stm_tok].stop();
+                    ClientMessage::StreamStopped
+                },
+
+                ServerMessage::StreamGetPosition(stm_tok) => {
+                    self.streams[stm_tok]
+                        .position()
+                        .map(ClientMessage::StreamPosition)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamGetLatency(stm_tok) => {
+                    self.streams[stm_tok]
+                        .latency()
+                        .map(ClientMessage::StreamLatency)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamSetVolume(stm_tok, volume) => {
+                    self.streams[stm_tok]
+                        .set_volume(volume)
+                        .map(|_| ClientMessage::StreamVolumeSet)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamSetPanning(stm_tok, panning) => {
+                    self.streams[stm_tok]
+                        .set_panning(panning)
+                        .map(|_| ClientMessage::StreamPanningSet)
+                        .unwrap_or_else(error)
+                },
+
+                ServerMessage::StreamGetCurrentDevice(stm_tok) => {
+                    self.streams[stm_tok]
+                        .current_device()
+                        .map(|device| ClientMessage::StreamCurrentDevice(device.into()))
+                        .unwrap_or_else(error)
+                },
+
+                _ => {
+                    bail!("Unexpected Message");
+                },
+            }
         } else {
-            self.send_error(cubeb::Error::new());
+            error(cubeb::Error::new())
+        };
+
+        debug!("process_msg: req={:?}, resp={:?}", msg, resp);
+
+        self.queue_message(resp)
+    }
+
+    // Stream init is special, so it's been separated from process_msg.
+    fn process_stream_init(&mut self, context: &cubeb::Context, params: &StreamInitParams) -> Result<()> {
+        fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
+            params.and_then(|p| {
+                let raw = ffi::cubeb_stream_params::from(p);
+                Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
+            })
         }
 
-        poll.reregister(
-            &self.connection,
-            self.token.unwrap(),
-            mio::Ready::readable(),
-            mio::PollOpt::edge() | mio::PollOpt::oneshot()
-        ).unwrap();
+        fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
+            params
+                .map(|p| {
+                    let sample_size = match p.format() {
+                        cubeb::SampleFormat::S16LE |
+                        cubeb::SampleFormat::S16BE |
+                        cubeb::SampleFormat::S16NE => 2,
+                        cubeb::SampleFormat::Float32LE |
+                        cubeb::SampleFormat::Float32BE |
+                        cubeb::SampleFormat::Float32NE => 4,
+                    };
+                    let channel_count = p.channels() as u16;
+                    sample_size * channel_count
+                })
+                .unwrap_or(0u16)
+        }
+
+
+        if !self.device_ids.contains(&params.input_device) {
+            bail!("Invalid input_device passed to stream_init");
+        }
+        // TODO: Yuck!
+        let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
+
+        if !self.device_ids.contains(&params.output_device) {
+            bail!("Invalid output_device passed to stream_init");
+        }
+        // TODO: Yuck!
+        let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
+
+        let latency = params.latency_frames;
+        let mut builder = cubeb::StreamInitOptionsBuilder::new();
+        builder
+            .input_device(input_device)
+            .output_device(output_device)
+            .latency(latency);
+
+        if let Some(ref stream_name) = params.stream_name {
+            builder.stream_name(stream_name);
+        }
+        let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
+        if let Some(ref isp) = input_stream_params {
+            builder.input_stream_param(isp);
+        }
+        let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
+        if let Some(ref osp) = output_stream_params {
+            builder.output_stream_param(osp);
+        }
+        let params = builder.take();
+
+        let input_frame_size = frame_size_in_bytes(input_stream_params);
+        let output_frame_size = frame_size_in_bytes(output_stream_params);
+
+        let (conn1, conn2) = audioipc::Connection::pair()?;
+        info!("Created connection pair: {:?}-{:?}", conn1, conn2);
+
+        let (input_shm, input_file) = SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
+        let (output_shm, output_file) = SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
+
+        let err = match context.stream_init(
+            &params,
+            Callback {
+                input_frame_size: input_frame_size,
+                output_frame_size: output_frame_size,
+                connection: conn2,
+                input_shm: input_shm,
+                output_shm: output_shm
+            }
+        ) {
+            Ok(stream) => {
+                let stm_tok = match self.streams.vacant_entry() {
+                    Some(entry) => {
+                        debug!(
+                            "Registering stream {:?}",
+                            entry.index(),
+                        );
+
+                        entry.insert(stream).index()
+                    },
+                    None => {
+                        // TODO: Turn into error
+                        panic!("Failed to insert stream into slab. No entries");
+                    },
+                };
+
+                try!(self.queue_init_messages(
+                    stm_tok,
+                    conn1,
+                    input_file,
+                    output_file
+                ));
+                None
+            },
+            Err(e) => Some(error(e)),
+        };
+
+        if let Some(err) = err {
+            try!(self.queue_message(err))
+        }
 
         Ok(())
     }
 
-    fn process_msg(&mut self, msg: &ServerMessage, context: &cubeb::Context) -> Result<()> {
-        match msg {
-            &ServerMessage::ClientConnect => {
-                panic!("already connected");
-            },
-            &ServerMessage::ClientDisconnect => {
-                // TODO:
-                //self.connection.client_disconnect();
-                self.connection
-                    .send(ClientMessage::ClientDisconnected)
-                    .unwrap();
-            },
-
-            &ServerMessage::ContextGetBackendId => {},
-
-            &ServerMessage::ContextGetMaxChannelCount => {
-                match context.max_channel_count() {
-                    Ok(channel_count) => {
-                        self.connection
-                            .send(ClientMessage::ContextMaxChannelCount(channel_count))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetMinLatency(ref params) => {
-
-                let format = cubeb::SampleFormat::from(params.format);
-                let layout = cubeb::ChannelLayout::from(params.layout);
-
-                let params = cubeb::StreamParamsBuilder::new()
-                    .format(format)
-                    .rate(params.rate as _)
-                    .channels(params.channels as _)
-                    .layout(layout)
-                    .take();
-
-                match context.min_latency(&params) {
-                    Ok(latency) => {
-                        self.connection
-                            .send(ClientMessage::ContextMinLatency(latency))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetPreferredSampleRate => {
-                match context.preferred_sample_rate() {
-                    Ok(rate) => {
-                        self.connection
-                            .send(ClientMessage::ContextPreferredSampleRate(rate))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetPreferredChannelLayout => {
-                match context.preferred_channel_layout() {
-                    Ok(layout) => {
-                        self.connection
-                            .send(ClientMessage::ContextPreferredChannelLayout(layout as _))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::ContextGetDeviceEnumeration(device_type) => {
-                match context.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) {
-                    Ok(devices) => {
-                        let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
-                        self.connection
-                            .send(ClientMessage::ContextEnumeratedDevices(v))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-
-            &ServerMessage::StreamInit(ref params) => {
-                fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
-                    match params {
-                        Some(p) => {
-                            let raw = ffi::cubeb_stream_params::from(p);
-                            Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
-                        },
-                        None => None,
-                    }
-                }
-
-                fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
-                    match params.as_ref() {
-                        Some(p) => {
-                            let sample_size = match p.format() {
-                                cubeb::SampleFormat::S16LE |
-                                cubeb::SampleFormat::S16BE |
-                                cubeb::SampleFormat::S16NE => 2,
-                                cubeb::SampleFormat::Float32LE |
-                                cubeb::SampleFormat::Float32BE |
-                                cubeb::SampleFormat::Float32NE => 4,
-                            };
-                            let channel_count = p.channels() as u16;
-                            sample_size * channel_count
-                        },
-                        None => 0,
-                    }
-                }
-
-                // TODO: Yuck!
-                let input_device = unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
-                let output_device = unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
-                let latency = params.latency_frames;
-                let mut builder = cubeb::StreamInitOptionsBuilder::new();
-                builder
-                    .input_device(input_device)
-                    .output_device(output_device)
-                    .latency(latency);
+    fn queue_init_messages<T, U, V>(&mut self, stm_tok: usize, conn: T, input_file: U, output_file: V) -> Result<()>
+    where
+        T: IntoRawFd,
+        U: IntoRawFd,
+        V: IntoRawFd,
+    {
+        try!(self.queue_message_fd(
+            ClientMessage::StreamCreated(stm_tok),
+            conn
+        ));
+        try!(self.queue_message_fd(
+            ClientMessage::StreamCreatedInputShm,
+            input_file
+        ));
+        try!(self.queue_message_fd(
+            ClientMessage::StreamCreatedOutputShm,
+            output_file
+        ));
+        Ok(())
+    }
 
-                if let Some(ref stream_name) = params.stream_name {
-                    builder.stream_name(stream_name);
-                }
-                let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
-                if let Some(ref isp) = input_stream_params {
-                    builder.input_stream_param(isp);
-                }
-                let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
-                if let Some(ref osp) = output_stream_params {
-                    builder.output_stream_param(osp);
-                }
-                let params = builder.take();
-
-                let input_frame_size = frame_size_in_bytes(input_stream_params);
-                let output_frame_size = frame_size_in_bytes(output_stream_params);
-
-                let (conn1, conn2) = audioipc::Connection::pair()?;
-                info!("Created connection pair: {:?}-{:?}", conn1, conn2);
-
-                let (input_shm, input_file) =
-                    SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
-                let (output_shm, output_file) =
-                    SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
-
-                match context.stream_init(
-                    &params,
-                    Callback {
-                        input_frame_size: input_frame_size,
-                        output_frame_size: output_frame_size,
-                        connection: conn2,
-                        input_shm: input_shm,
-                        output_shm: output_shm,
-                    }
-                ) {
-                    Ok(stream) => {
-                        let stm_tok = match self.streams.vacant_entry() {
-                            Some(entry) => {
-                                debug!(
-                                    "Registering stream {:?}",
-                                    entry.index(),
-                                );
-
-                                entry.insert(stream).index()
-                            },
-                            None => {
-                                // TODO: Turn into error
-                                panic!("Failed to insert stream into slab. No entries");
-                            },
-                        };
-
-                        self.connection
-                            .send_with_fd(ClientMessage::StreamCreated(stm_tok), Some(conn1))
-                            .unwrap();
-                        // TODO: It'd be nicer to send these as part of
-                        // StreamCreated, but that requires changing
-                        // sendmsg/recvmsg to support multiple fds.
-                        self.connection
-                            .send_with_fd(ClientMessage::StreamCreatedInputShm, Some(input_file))
-                            .unwrap();
-                        self.connection
-                            .send_with_fd(ClientMessage::StreamCreatedOutputShm, Some(output_file))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
+    fn queue_message(&mut self, msg: ClientMessage) -> Result<()> {
+        debug!("queue_message: {:?}", msg);
+        encode::<ClientMessage>(&mut self.send_buffer, &msg).or_else(|e| {
+            Err(e).chain_err(|| "Failed to encode msg into send buffer")
+        })
+    }
 
-            &ServerMessage::StreamDestroy(stm_tok) => {
-                self.streams.remove(stm_tok);
-                self.connection
-                    .send(ClientMessage::StreamDestroyed)
-                    .unwrap();
-            },
-
-            &ServerMessage::StreamStart(stm_tok) => {
-                let _ = self.streams[stm_tok].start();
-                self.connection.send(ClientMessage::StreamStarted).unwrap();
-            },
-            &ServerMessage::StreamStop(stm_tok) => {
-                let _ = self.streams[stm_tok].stop();
-                self.connection.send(ClientMessage::StreamStopped).unwrap();
-            },
-            &ServerMessage::StreamGetPosition(stm_tok) => {
-                match self.streams[stm_tok].position() {
-                    Ok(position) => {
-                        self.connection
-                            .send(ClientMessage::StreamPosition(position))
-                            .unwrap();
-                    },
-                    Err(e) => {
-                        self.send_error(e);
-                    },
-                }
-            },
-            &ServerMessage::StreamGetLatency(stm_tok) => {
-                match self.streams[stm_tok].latency() {
-                    Ok(latency) => {
-                        self.connection
-                            .send(ClientMessage::StreamLatency(latency))
-                            .unwrap();
-                    },
-                    Err(e) => self.send_error(e),
-                }
-            },
-            &ServerMessage::StreamSetVolume(stm_tok, volume) => {
-                let _ = self.streams[stm_tok].set_volume(volume);
-                self.connection
-                    .send(ClientMessage::StreamVolumeSet)
-                    .unwrap();
-            },
-            &ServerMessage::StreamSetPanning(stm_tok, panning) => {
-                let _ = self.streams[stm_tok].set_panning(panning);
-                self.connection
-                    .send(ClientMessage::StreamPanningSet)
-                    .unwrap();
-            },
-            &ServerMessage::StreamGetCurrentDevice(stm_tok) => {
-                let err = match self.streams[stm_tok].current_device() {
-                    Ok(device) => {
-                        // TODO: Yuck!
-                        self.connection
-                            .send(ClientMessage::StreamCurrentDevice(device.into()))
-                            .unwrap();
-                        None
-                    },
-                    Err(e) => Some(e),
-                };
-                if let Some(e) = err {
-                    self.send_error(e);
-                }
-            },
-            _ => {
-                bail!("Unexpected Message");
-            },
-        }
+    // Since send_fd supports sending one RawFd at a time, queuing a
+    // message with a RawFd forces use to take the current send_buffer
+    // and move it pending queue.
+    fn queue_message_fd<FD: IntoRawFd>(&mut self, msg: ClientMessage, fd: FD) -> Result<()> {
+        let fd = fd.into_raw_fd();
+        debug!("queue_message_fd: {:?} {:?}", msg, fd);
+        try!(self.queue_message(msg));
+        self.take_pending_send(Some(fd));
         Ok(())
     }
 
-    fn send_error(&mut self, error: cubeb::Error) {
-        self.connection
-            .send(ClientMessage::ContextError(error.raw_code()))
-            .unwrap();
+    // Take the current messages in the send_buffer and move them to
+    // pending queue.
+    fn take_pending_send(&mut self, fd: Option<RawFd>) {
+        let pending = self.send_buffer.take().freeze();
+        debug!("take_pending_send: ({:?} {:?})", pending, fd);
+        self.pending_send.push_back((
+            pending,
+            fd.map(|fd| unsafe { AutoCloseFd::from_raw_fd(fd) })
+        ));
+    }
+
+    // Process the pending queue and send them to client.
+    fn flush_pending_send(&mut self) -> Result<Ready> {
+        debug!("flush_pending_send");
+        // take any pending messages in the send buffer.
+        if !self.send_buffer.is_empty() {
+            self.take_pending_send(None);
+        }
+
+        trace!("pending queue: {:?}", self.pending_send);
+
+        let mut result = Ready::readable();
+        let mut processed = 0;
+
+        for &mut (ref mut buf, ref mut fd) in &mut self.pending_send {
+            trace!("sending buf {:?}, fd {:?}", buf, fd);
+            let r = {
+                let mut src = Cursor::new(buf.as_ref());
+                let fd = match *fd {
+                    Some(ref fd) => Some(fd.as_raw_fd()),
+                    None => None,
+                };
+                try!(self.io.send_buf_fd(&mut src, fd))
+            };
+            match r {
+                Async::Ready(n) if n == buf.len() => {
+                    processed += 1;
+                },
+                Async::Ready(n) => {
+                    let _ = buf.split_to(n);
+                    let _ = fd.take();
+                    result.insert(Ready::writable());
+                    break;
+                },
+                Async::NotReady => {
+                    result.insert(Ready::writable());
+                },
+            }
+        }
+
+        debug!("processed {} buffers", processed);
+
+        self.pending_send = self.pending_send.split_off(processed);
+
+        trace!("pending queue: {:?}", self.pending_send);
+
+        Ok(result)
     }
 }
 
 pub struct Server {
     socket: UnixListener,
     // Ok(None)      - Server hasn't tried to create cubeb::Context.
     // Ok(Some(ctx)) - Server has successfully created cubeb::Context.
     // Err(_)        - Server has tried and failed to create cubeb::Context.
     //                 Don't try again.
-    context: Result<Option<cubeb::Context>>,
+    context: Option<Result<cubeb::Context>>,
     conns: Slab<ServerConn>
 }
 
 impl Server {
     pub fn new(socket: UnixListener) -> Server {
         Server {
             socket: socket,
-            context: Ok(None),
+            context: None,
             conns: Slab::with_capacity(16)
         }
     }
 
     fn accept(&mut self, poll: &mut mio::Poll) -> Result<()> {
         debug!("Server accepting connection");
 
         let client_socket = match self.socket.accept() {
@@ -481,83 +605,98 @@ impl Server {
             None => {
                 panic!("failed to insert connection");
             },
         };
 
         // Register the connection
         self.conns[token].token = Some(token);
         poll.register(
-            &self.conns[token].connection,
+            &self.conns[token].io,
             token,
             mio::Ready::readable(),
             mio::PollOpt::edge() | mio::PollOpt::oneshot()
         ).unwrap();
         /*
         let r = self.conns[token].receive();
         debug!("received {:?}", r);
         let r = self.conns[token].send(ClientMessage::ClientConnected);
         debug!("sent {:?} (ClientConnected)", r);
          */
 
         // Since we have a connection try creating a cubeb context. If
         // it fails, mark the failure with Err.
-        if let Ok(None) = self.context {
-            self.context = cubeb::Context::init("AudioIPC Server", None)
-                .and_then(|ctx| Ok(Some(ctx)))
-                .or_else(|err| Err(err.into()));
+        if self.context.is_none() {
+            self.context = Some(cubeb::Context::init("AudioIPC Server", None).or_else(|e| {
+                Err(e).chain_err(|| "Unable to create cubeb context.")
+            }))
         }
 
         Ok(())
     }
 
     pub fn poll(&mut self, poll: &mut mio::Poll) -> Result<()> {
         let mut events = mio::Events::with_capacity(16);
 
         match poll.poll(&mut events, None) {
             Ok(_) => {},
             Err(e) => error!("server poll error: {}", e),
         }
 
         for event in events.iter() {
             match event.token() {
                 SERVER => {
-                    match self.accept(poll) {
-                        Err(e) => {
-                            error!("server accept error: {}", e);
-                        },
-                        _ => {},
+                    if let Err(e) = self.accept(poll) {
+                        error!("server accept error: {}", e);
                     };
                 },
                 QUIT => {
                     info!("Quitting Audio Server loop");
                     bail!("quit");
                 },
                 token => {
-                    debug!("token {:?} ready", token);
+                    trace!("token {:?} ready", token);
+
+                    let context = self.context.as_ref().expect(
+                        "Shouldn't receive a message before accepting connection."
+                    );
+
+                    let mut readiness = Ready::readable();
 
-                    let r = self.conns[token].process(poll, &self.context);
+                    if event.readiness().is_readable() {
+                        let r = self.conns[token].process_read(context);
+                        trace!("got {:?}", r);
 
-                    debug!("got {:?}", r);
+                        if let Err(e) = r {
+                            debug!("dropped client {:?} due to error {:?}", token, e);
+                            self.conns.remove(token);
+                            continue;
+                        }
+                    };
 
-                    // TODO: Handle disconnection etc.
-                    // TODO: Should be handled at a higher level by a
-                    // disconnect message.
-                    if let Err(e) = r {
-                        debug!("dropped client {:?} due to error {:?}", token, e);
-                        self.conns.remove(token);
-                        continue;
-                    }
+                    if event.readiness().is_writable() {
+                        let r = self.conns[token].flush_pending_send();
+                        trace!("got {:?}", r);
 
-                    // poll.reregister(
-                    //     &self.conn(token).connection,
-                    //     token,
-                    //     mio::Ready::readable(),
-                    //     mio::PollOpt::edge() | mio::PollOpt::oneshot()
-                    // ).unwrap();
+                        match r {
+                            Ok(r) => readiness = r,
+                            Err(e) => {
+                                debug!("dropped client {:?} due to error {:?}", token, e);
+                                self.conns.remove(token);
+                                continue;
+                            },
+                        }
+                    };
+
+                    poll.reregister(
+                        &self.conns[token].io,
+                        token,
+                        readiness,
+                        mio::PollOpt::edge() | mio::PollOpt::oneshot()
+                    ).unwrap();
                 },
             }
         }
 
         Ok(())
     }
 }
 
@@ -582,28 +721,45 @@ pub fn run(running: Arc<AtomicBool>) -> 
         mio::PollOpt::edge()
     ).unwrap();
 
     loop {
         if !running.load(Ordering::SeqCst) {
             bail!("server quit due to ctrl-c");
         }
 
-        let _ = try!(server.poll(&mut poll));
+        try!(server.poll(&mut poll));
     }
 
     //poll.deregister(&server.socket).unwrap();
 }
 
+fn error(error: cubeb::Error) -> ClientMessage {
+    ClientMessage::ContextError(error.raw_code())
+}
+
+struct ServerWrapper {
+    thread_handle: std::thread::JoinHandle<()>,
+    sender_ctl: channel::SenderCtl,
+}
+
+impl ServerWrapper {
+    fn shutdown(self) {
+        // Dropping SenderCtl here will notify the other end.
+        drop(self.sender_ctl);
+        self.thread_handle.join().unwrap();
+    }
+}
+
 #[no_mangle]
 pub extern "C" fn audioipc_server_start() -> *mut c_void {
 
     let (tx, rx) = channel::ctl_pair();
 
-    thread::spawn(move || {
+    let handle = thread::spawn(move || {
         // Ignore result.
         let _ = std::fs::remove_file(audioipc::get_uds_path());
 
         // TODO: Use a SEQPACKET, wrap it in UnixStream?
         let mut poll = mio::Poll::new().unwrap();
         let mut server = Server::new(UnixListener::bind(audioipc::get_uds_path()).unwrap());
 
         poll.register(
@@ -612,25 +768,27 @@ pub extern "C" fn audioipc_server_start(
             mio::Ready::readable(),
             mio::PollOpt::edge()
         ).unwrap();
 
         poll.register(&rx, QUIT, mio::Ready::readable(), mio::PollOpt::edge())
             .unwrap();
 
         loop {
-            match server.poll(&mut poll) {
-                Err(_) => {
-                    return;
-                },
-                _ => (),
+            if server.poll(&mut poll).is_err() {
+                return;
             }
         }
     });
 
-    Box::into_raw(Box::new(tx)) as *mut _
+    let wrapper = ServerWrapper {
+        thread_handle: handle,
+        sender_ctl: tx
+    };
+
+    Box::into_raw(Box::new(wrapper)) as *mut _
 }
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
-    // Dropping SenderCtl here will notify the other end.
-    let _ = unsafe { Box::<channel::SenderCtl>::from_raw(p as *mut _) };
+    let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
+    wrapper.shutdown();
 }