Bug 1512445 - Import latest AudioIPC from upstream, including Windows backend. r=chunmin
☠☠ backed out by 873b90887e3a ☠ ☠
authorMatthew Gregan <kinetik@flim.org>
Wed, 06 Mar 2019 20:42:38 +0000
changeset 520578 0a78b7e72e1bf620cd70608abb7f31f389f87fcc
parent 520577 9f04204160ffc4ee6337d628bee0527c4d85d794
child 520579 04c1982c1a7e914aeac4ff0fc1e5a9d280470407
push id10862
push userffxbld-merge
push dateMon, 11 Mar 2019 13:01:11 +0000
treeherdermozilla-beta@a2e7f5c935da [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerschunmin
bugs1512445
milestone67.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1512445 - Import latest AudioIPC from upstream, including Windows backend. r=chunmin Differential Revision: https://phabricator.services.mozilla.com/D22153
media/audioipc/README_MOZILLA
media/audioipc/audioipc/Cargo.toml
media/audioipc/audioipc/src/async.rs
media/audioipc/audioipc/src/fd_passing.rs
media/audioipc/audioipc/src/handle_passing.rs
media/audioipc/audioipc/src/lib.rs
media/audioipc/audioipc/src/messages.rs
media/audioipc/audioipc/src/messagestream_unix.rs
media/audioipc/audioipc/src/messagestream_win.rs
media/audioipc/audioipc/src/tokio_named_pipes.rs
media/audioipc/client/Cargo.toml
media/audioipc/client/src/context.rs
media/audioipc/client/src/lib.rs
media/audioipc/client/src/stream.rs
media/audioipc/register-collection-not-supported.patch
media/audioipc/server/Cargo.toml
media/audioipc/server/src/lib.rs
media/audioipc/server/src/server.rs
media/audioipc/update.sh
--- a/media/audioipc/README_MOZILLA
+++ b/media/audioipc/README_MOZILLA
@@ -1,8 +1,8 @@
 The source from this directory was copied from the audioipc-2
 git repository using the update.sh script.  The only changes
 made were those applied by update.sh and the addition of
 Makefile.in build files for the Mozilla build system.
 
 The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
 
-The git commit ID used was c2148b95128f5e54ef1d18e5e457a2fa45e7ac43 (2019-02-05 15:18:22 +1300)
+The git commit ID used was 9600391d94854d971ed723e3499c0ab8ede3045b (2019-03-05 15:05:43 +1300)
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -3,26 +3,31 @@ name = "audioipc"
 version = "0.2.4"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote Cubeb IPC"
 
 [dependencies]
-cubeb = "0.5.2"
+cubeb = "0.5.3"
 bincode = "1.0"
 bytes = "0.4"
 futures = "0.1.18"
 iovec = "0.1"
 libc = "0.2"
 log = "0.4"
 memmap = "0.5.2"
 scoped-tls = "0.1"
 serde = "1.*.*"
 serde_derive = "1.*.*"
 tokio-core = "0.1"
 tokio-io = "0.1"
 tokio-uds = "0.1.7"
+winapi = "0.3.6"
+mio-named-pipes = "=0.1.5"
 
 [dependencies.error-chain]
 version = "0.11.0"
 default-features = false
+
+[build-dependencies]
+cc = "1.0"
--- a/media/audioipc/audioipc/src/async.rs
+++ b/media/audioipc/audioipc/src/async.rs
@@ -1,22 +1,25 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 //! Various async helpers modelled after futures-rs and tokio-io.
 
 use bytes::{Buf, BufMut};
-use futures::{Async, Poll};
+#[cfg(unix)]
+use futures::Async;
+use futures::Poll;
+#[cfg(unix)]
 use iovec::IoVec;
+#[cfg(unix)]
 use msg::{RecvMsg, SendMsg};
 use std::io;
 use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_uds::UnixStream;
 
 pub trait AsyncRecvMsg: AsyncRead {
     /// Pull some bytes from this source into the specified `Buf`, returning
     /// how many bytes were read.
     ///
     /// The `buf` provided will have bytes read into it and the internal cursor
     /// will be advanced if any bytes were read. Note that this method typically
     /// will not reallocate the buffer provided.
@@ -49,22 +52,23 @@ pub trait AsyncSendMsg: AsyncWrite {
     where
         Self: Sized,
         B: Buf,
         C: Buf;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 
-impl AsyncRecvMsg for UnixStream {
+#[cfg(unix)]
+impl AsyncRecvMsg for super::AsyncMessageStream {
     fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
     where
         B: BufMut,
     {
-        if let Async::NotReady = <UnixStream>::poll_read(self) {
+        if let Async::NotReady = <super::AsyncMessageStream>::poll_read(self) {
             return Ok(Async::NotReady);
         }
         let r = unsafe {
             // The `IoVec` type can't have a 0-length size, so we create a bunch
             // of dummy versions on the stack with 1 length which we'll quickly
             // overwrite.
             let b1: &mut [u8] = &mut [0];
             let b2: &mut [u8] = &mut [0];
@@ -118,23 +122,24 @@ impl AsyncRecvMsg for UnixStream {
                 self.need_read();
                 Ok(Async::NotReady)
             }
             Err(e) => Err(e),
         }
     }
 }
 
-impl AsyncSendMsg for UnixStream {
+#[cfg(unix)]
+impl AsyncSendMsg for super::AsyncMessageStream {
     fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
     where
         B: Buf,
         C: Buf,
     {
-        if let Async::NotReady = <UnixStream>::poll_write(self) {
+        if let Async::NotReady = <super::AsyncMessageStream>::poll_write(self) {
             return Ok(Async::NotReady);
         }
         let r = {
             // The `IoVec` type can't have a zero-length size, so create a dummy
             // version from a 1-length slice which we'll overwrite with the
             // `bytes_vec` method.
             static DUMMY: &[u8] = &[0];
             let nom = <&IoVec>::from(DUMMY);
--- a/media/audioipc/audioipc/src/fd_passing.rs
+++ b/media/audioipc/audioipc/src/fd_passing.rs
@@ -3,18 +3,17 @@
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use async::{AsyncRecvMsg, AsyncSendMsg};
 use bytes::{Bytes, BytesMut, IntoBuf};
 use cmsg;
 use codec::Codec;
 use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
-use libc;
-use messages::AssocRawFd;
+use messages::AssocRawPlatformHandle;
 use std::collections::VecDeque;
 use std::os::unix::io::RawFd;
 use std::{fmt, io, mem};
 
 const INITIAL_CAPACITY: usize = 1024;
 const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
 const FDS_CAPACITY: usize = 16;
 
@@ -61,31 +60,31 @@ impl IncomingFds {
 #[derive(Debug)]
 struct Frame {
     msgs: Bytes,
     fds: Option<Bytes>,
 }
 
 /// A unified `Stream` and `Sink` interface over an I/O object, using
 /// the `Codec` trait to encode and decode the payload.
-pub struct FramedWithFds<A, C> {
+pub struct FramedWithPlatformHandles<A, C> {
     io: A,
     codec: C,
     // Stream
     read_buf: BytesMut,
     incoming_fds: IncomingFds,
     is_readable: bool,
     eof: bool,
     // Sink
     frames: VecDeque<Frame>,
     write_buf: BytesMut,
     outgoing_fds: BytesMut,
 }
 
-impl<A, C> FramedWithFds<A, C>
+impl<A, C> FramedWithPlatformHandles<A, C>
 where
     A: AsyncSendMsg,
 {
     // If there is a buffered frame, try to write it to `A`
     fn do_write(&mut self) -> Poll<(), io::Error> {
         trace!("do_write...");
         // Create a frame from any pending message in `write_buf`.
         if !self.write_buf.is_empty() {
@@ -157,78 +156,74 @@ where
 
         let msgs = self.write_buf.take().freeze();
         trace!("set_frame: msgs={:?} fds={:?}", msgs, fds);
 
         self.frames.push_back(Frame { msgs, fds });
     }
 }
 
-impl<A, C> Stream for FramedWithFds<A, C>
+impl<A, C> Stream for FramedWithPlatformHandles<A, C>
 where
     A: AsyncRecvMsg,
     C: Codec,
-    C::Out: AssocRawFd,
+    C::Out: AssocRawPlatformHandle,
 {
     type Item = C::Out;
     type Error = io::Error;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
         loop {
             // Repeatedly call `decode` or `decode_eof` as long as it is
             // "readable". Readable is defined as not having returned `None`. If
             // the upstream has returned EOF, and the decoder is no longer
             // readable, it can be assumed that the decoder will never become
             // readable again, at which point the stream is terminated.
             if self.is_readable {
                 if self.eof {
                     let mut item = try!(self.codec.decode_eof(&mut self.read_buf));
-                    item.take_fd(|| self.incoming_fds.take_fds());
+                    item.take_platform_handles(|| self.incoming_fds.take_fds());
                     return Ok(Some(item).into());
                 }
 
                 trace!("attempting to decode a frame");
 
                 if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) {
                     trace!("frame decoded from buffer");
-                    item.take_fd(|| self.incoming_fds.take_fds());
+                    item.take_platform_handles(|| self.incoming_fds.take_fds());
                     return Ok(Some(item).into());
                 }
 
                 self.is_readable = false;
             }
 
             assert!(!self.eof);
 
             // Otherwise, try to read more data and try again. Make sure we've
             // got room for at least one byte to read to ensure that we don't
             // get a spurious 0 that looks like EOF
             let (n, _) = try_ready!(
                 self.io
                     .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())
             );
 
-            // if flags != 0 {
-            //     error!("recv_msg_buf: flags = {:x}", flags)
-            // }
-
             if n == 0 {
                 self.eof = true;
             }
 
             self.is_readable = true;
         }
     }
 }
 
-impl<A, C> Sink for FramedWithFds<A, C>
+impl<A, C> Sink for FramedWithPlatformHandles<A, C>
 where
     A: AsyncSendMsg,
     C: Codec,
-    C::In: AssocRawFd + fmt::Debug,
+    C::In: AssocRawPlatformHandle + fmt::Debug,
 {
     type SinkItem = C::In;
     type SinkError = io::Error;
 
     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
         trace!("start_send: item={:?}", item);
 
         // If the buffer is already over BACKPRESSURE_THRESHOLD,
@@ -236,21 +231,21 @@ where
         // over BACKPRESSURE_THRESHOLD, then reject the send.
         if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
             try!(self.poll_complete());
             if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
                 return Ok(AsyncSink::NotReady(item));
             }
         }
 
-        let fds = item.fd();
+        let fds = item.platform_handles();
         try!(self.codec.encode(item, &mut self.write_buf));
         let fds = fds.and_then(|fds| {
             cmsg::builder(&mut self.outgoing_fds)
-                .rights(&fds[..])
+                .rights(&fds.0[..])
                 .finish()
                 .ok()
         });
 
         trace!("item fds: {:?}", fds);
 
         if fds.is_some() {
             // Enforce splitting sends on messages that contain file
@@ -273,18 +268,18 @@ where
     }
 
     fn close(&mut self) -> Poll<(), Self::SinkError> {
         try_ready!(self.poll_complete());
         self.io.shutdown()
     }
 }
 
-pub fn framed_with_fds<A, C>(io: A, codec: C) -> FramedWithFds<A, C> {
-    FramedWithFds {
+pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> {
+    FramedWithPlatformHandles {
         io: io,
         codec: codec,
         read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
         incoming_fds: IncomingFds::new(FDS_CAPACITY),
         is_readable: false,
         eof: false,
         frames: VecDeque::new(),
         write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
@@ -302,53 +297,65 @@ where
     let mut a = Default::default();
     <A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);
     a
 }
 
 fn close_fds(fds: &[RawFd]) {
     for fd in fds {
         unsafe {
-            libc::close(*fd);
+            super::close_platformhandle(*fd);
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
     use bytes::BufMut;
+    use libc;
+    use std;
 
-    const CMSG_BYTES: &[u8] =
-        b"\x1c\0\0\0\0\0\0\0\x01\0\0\0\x01\0\0\02\0\0\0[\0\0\0\\\0\0\0\xe5\xe5\xe5\xe5";
+    extern {
+        fn cmsghdr_bytes(size: *mut libc::size_t) -> *const libc::uint8_t;
+    }
+
+    fn cmsg_bytes() -> &'static [u8] {
+        let mut size = 0;
+        unsafe {
+            let ptr = cmsghdr_bytes(&mut size);
+            std::slice::from_raw_parts(ptr, size)
+        }
+    }
 
     #[test]
     fn single_cmsg() {
         let mut incoming = super::IncomingFds::new(16);
 
-        incoming.cmsg().put_slice(CMSG_BYTES);
+        incoming.cmsg().put_slice(cmsg_bytes());
         assert!(incoming.take_fds().is_some());
         assert!(incoming.take_fds().is_none());
     }
 
     #[test]
     fn multiple_cmsg_1() {
         let mut incoming = super::IncomingFds::new(16);
 
-        incoming.cmsg().put_slice(CMSG_BYTES);
+        incoming.cmsg().put_slice(cmsg_bytes());
         assert!(incoming.take_fds().is_some());
-        incoming.cmsg().put_slice(CMSG_BYTES);
+        incoming.cmsg().put_slice(cmsg_bytes());
         assert!(incoming.take_fds().is_some());
         assert!(incoming.take_fds().is_none());
     }
 
     #[test]
     fn multiple_cmsg_2() {
         let mut incoming = super::IncomingFds::new(16);
+        println!("cmsg_bytes() {}", cmsg_bytes().len());
 
-        incoming.cmsg().put_slice(CMSG_BYTES);
-        incoming.cmsg().put_slice(CMSG_BYTES);
+        incoming.cmsg().put_slice(cmsg_bytes());
+        incoming.cmsg().put_slice(cmsg_bytes());
         assert!(incoming.take_fds().is_some());
-        incoming.cmsg().put_slice(CMSG_BYTES);
+        incoming.cmsg().put_slice(cmsg_bytes());
         assert!(incoming.take_fds().is_some());
         assert!(incoming.take_fds().is_some());
         assert!(incoming.take_fds().is_none());
     }
 }
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/handle_passing.rs
@@ -0,0 +1,256 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use tokio_io::{AsyncRead, AsyncWrite};
+use bytes::{Bytes, BytesMut, IntoBuf};
+use codec::Codec;
+use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
+use messages::AssocRawPlatformHandle;
+use std::collections::VecDeque;
+use std::{fmt, io};
+
+const INITIAL_CAPACITY: usize = 1024;
+const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
+
+#[derive(Debug)]
+struct Frame {
+    msgs: Bytes,
+}
+
+/// A unified `Stream` and `Sink` interface over an I/O object, using
+/// the `Codec` trait to encode and decode the payload.
+pub struct FramedWithPlatformHandles<A, C> {
+    io: A,
+    codec: C,
+    // Stream
+    read_buf: BytesMut,
+    is_readable: bool,
+    eof: bool,
+    // Sink
+    frames: VecDeque<Frame>,
+    write_buf: BytesMut,
+}
+
+impl<A, C> FramedWithPlatformHandles<A, C>
+where
+    A: AsyncWrite,
+{
+    // If there is a buffered frame, try to write it to `A`
+    fn do_write(&mut self) -> Poll<(), io::Error> {
+        trace!("do_write...");
+        // Create a frame from any pending message in `write_buf`.
+        if !self.write_buf.is_empty() {
+            self.set_frame();
+        }
+
+        trace!("pending frames: {:?}", self.frames);
+
+        let mut processed = 0;
+
+        loop {
+            let n = match self.frames.front() {
+                Some(frame) => {
+                    trace!("sending msg {:?}", frame.msgs);
+                    let mut msgs = frame.msgs.clone().into_buf();
+                    try_ready!(self.io.write_buf(&mut msgs))
+                }
+                _ => {
+                    // No pending frames.
+                    return Ok(().into());
+                }
+            };
+
+            match self.frames.pop_front() {
+                Some(mut frame) => {
+                    processed += 1;
+
+                    if n != frame.msgs.len() {
+                        // If only part of the message was sent then
+                        // re-queue the remaining message at the head
+                        // of the queue. (Don't need to resend the
+                        // handles since they've been sent with the
+                        // first part.)
+                        drop(frame.msgs.split_to(n));
+                        self.frames.push_front(frame);
+                        break;
+                    }
+                }
+                _ => panic!(),
+            }
+        }
+        trace!("process {} frames", processed);
+        trace!("pending frames: {:?}", self.frames);
+
+        Ok(().into())
+    }
+
+    fn set_frame(&mut self) {
+        if self.write_buf.is_empty() {
+            trace!("set_frame: No pending messages...");
+            return;
+        }
+
+        let msgs = self.write_buf.take().freeze();
+        trace!("set_frame: msgs={:?}", msgs);
+
+        self.frames.push_back(Frame { msgs });
+    }
+}
+
+impl<A, C> Stream for FramedWithPlatformHandles<A, C>
+where
+    A: AsyncRead,
+    C: Codec,
+    C::Out: AssocRawPlatformHandle,
+{
+    type Item = C::Out;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        loop {
+            // Repeatedly call `decode` or `decode_eof` as long as it is
+            // "readable". Readable is defined as not having returned `None`. If
+            // the upstream has returned EOF, and the decoder is no longer
+            // readable, it can be assumed that the decoder will never become
+            // readable again, at which point the stream is terminated.
+            if self.is_readable {
+                if self.eof {
+                    let item = try!(self.codec.decode_eof(&mut self.read_buf));
+                    return Ok(Some(item).into());
+                }
+
+                trace!("attempting to decode a frame");
+
+                if let Some(item) = try!(self.codec.decode(&mut self.read_buf)) {
+                    trace!("frame decoded from buffer");
+                    return Ok(Some(item).into());
+                }
+
+                self.is_readable = false;
+            }
+
+            assert!(!self.eof);
+
+            // Otherwise, try to read more data and try again. Make sure we've
+            // got room for at least one byte to read to ensure that we don't
+            // get a spurious 0 that looks like EOF
+            let n = try_ready!(
+                self.io
+                    .read_buf(&mut self.read_buf)
+            );
+
+            if n == 0 {
+                self.eof = true;
+            }
+
+            self.is_readable = true;
+        }
+    }
+}
+
+impl<A, C> Sink for FramedWithPlatformHandles<A, C>
+where
+    A: AsyncWrite,
+    C: Codec,
+    C::In: AssocRawPlatformHandle + fmt::Debug,
+{
+    type SinkItem = C::In;
+    type SinkError = io::Error;
+
+    fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+        trace!("start_send: item={:?}", item);
+
+        // If the buffer is already over BACKPRESSURE_THRESHOLD,
+        // then attempt to flush it. If after flush it's *still*
+        // over BACKPRESSURE_THRESHOLD, then reject the send.
+        if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+            try!(self.poll_complete());
+            if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
+                return Ok(AsyncSink::NotReady(item));
+            }
+        }
+
+        let mut got_handles = false;
+        if let Some((handles, target_pid)) = item.platform_handles() {
+            got_handles = true;
+            let remote_handles = unsafe {
+                [duplicate_platformhandle(handles[0], target_pid)?,
+                 duplicate_platformhandle(handles[1], target_pid)?,
+                 duplicate_platformhandle(handles[2], target_pid)?]
+            };
+            trace!("item handles: {:?} remote_handles: {:?}", handles, remote_handles);
+            item.take_platform_handles(|| Some(remote_handles));
+        }
+
+        try!(self.codec.encode(item, &mut self.write_buf));
+
+        if got_handles {
+            // Enforce splitting sends on messages that contain file
+            // descriptors.
+            self.set_frame();
+        }
+
+        Ok(AsyncSink::Ready)
+    }
+
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        trace!("flushing framed transport");
+
+        try_ready!(self.do_write());
+
+        try_nb!(self.io.flush());
+
+        trace!("framed transport flushed");
+        Ok(().into())
+    }
+
+    fn close(&mut self) -> Poll<(), Self::SinkError> {
+        try_ready!(self.poll_complete());
+        self.io.shutdown()
+    }
+}
+
+pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> {
+    FramedWithPlatformHandles {
+        io: io,
+        codec: codec,
+        read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+        is_readable: false,
+        eof: false,
+        frames: VecDeque::new(),
+        write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
+    }
+}
+
+use winapi::um::{processthreadsapi, winnt, handleapi};
+use winapi::shared::minwindef::{DWORD, FALSE};
+use super::PlatformHandleType;
+
+// source_handle is effectively taken ownership of (consumed) and
+// closed when duplicate_platformhandle is called.
+// TODO: Make this transfer more explicit via the type system.
+unsafe fn duplicate_platformhandle(source_handle: PlatformHandleType,
+                                   target_pid: DWORD) -> Result<PlatformHandleType, std::io::Error> {
+    let source = processthreadsapi::GetCurrentProcess();
+    let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE,
+                                                FALSE,
+                                                target_pid);
+    if !super::valid_handle(target) {
+        return Err(std::io::Error::new(std::io::ErrorKind::Other, "invalid target process"));
+    }
+
+    let mut target_handle = std::ptr::null_mut();
+    let ok = handleapi::DuplicateHandle(source,
+                                        source_handle,
+                                        target,
+                                        &mut target_handle,
+                                        0,
+                                        FALSE,
+                                        winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS);
+    if ok == FALSE {
+        return Err(std::io::Error::new(std::io::ErrorKind::Other, "DuplicateHandle failed"));
+    }
+    Ok(target_handle)
+}
--- a/media/audioipc/audioipc/src/lib.rs
+++ b/media/audioipc/audioipc/src/lib.rs
@@ -23,39 +23,175 @@ extern crate libc;
 extern crate memmap;
 #[macro_use]
 extern crate scoped_tls;
 extern crate serde;
 extern crate tokio_core;
 #[macro_use]
 extern crate tokio_io;
 extern crate tokio_uds;
+#[cfg(windows)]
+extern crate winapi;
 
 mod async;
+#[cfg(unix)]
 mod cmsg;
 pub mod codec;
 pub mod core;
 #[allow(deprecated)]
 pub mod errors;
+#[cfg(unix)]
 pub mod fd_passing;
+#[cfg(unix)]
+pub use fd_passing as platformhandle_passing;
+#[cfg(windows)]
+pub mod handle_passing;
+#[cfg(windows)]
+pub use handle_passing as platformhandle_passing;
 pub mod frame;
 pub mod messages;
+#[cfg(unix)]
 mod msg;
 pub mod rpc;
 pub mod shm;
 
 pub use messages::{ClientMessage, ServerMessage};
 use std::env::temp_dir;
 use std::path::PathBuf;
 
+#[cfg(windows)]
+use std::os::windows::io::{FromRawHandle, IntoRawHandle};
+#[cfg(unix)]
+use std::os::unix::io::{FromRawFd, IntoRawFd};
+
 // This must match the definition of
 // ipc::FileDescriptor::PlatformHandleType in Gecko.
-#[cfg(target_os = "windows")]
-pub type PlatformHandleType = *mut std::os::raw::c_void;
-#[cfg(not(target_os = "windows"))]
+#[cfg(windows)]
+pub type PlatformHandleType = std::os::windows::raw::HANDLE;
+#[cfg(unix)]
 pub type PlatformHandleType = libc::c_int;
 
+// This stands in for RawFd/RawHandle.
+#[derive(Copy, Clone, Debug)]
+pub struct PlatformHandle(PlatformHandleType);
+
+unsafe impl Send for PlatformHandle {}
+
+// Custom serialization to treat HANDLEs as i64.  This is not valid in
+// general, but after sending the HANDLE value to a remote process we
+// use it to create a valid HANDLE via DuplicateHandle.
+// To avoid duplicating the serialization code, we're lazy and treat
+// file descriptors as i64 rather than i32.
+impl serde::Serialize for PlatformHandle {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_i64(self.0 as i64)
+    }
+}
+
+struct PlatformHandleVisitor;
+impl<'de> serde::de::Visitor<'de> for PlatformHandleVisitor {
+    type Value = PlatformHandle;
+
+    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+        formatter.write_str("an integer between -2^63 and 2^63")
+    }
+
+    fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        Ok(PlatformHandle::new(value as PlatformHandleType))
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for PlatformHandle {
+    fn deserialize<D>(deserializer: D) -> Result<PlatformHandle, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        deserializer.deserialize_i64(PlatformHandleVisitor)
+    }
+}
+
+#[cfg(unix)]
+fn valid_handle(handle: PlatformHandleType) -> bool {
+    handle >= 0
+}
+
+#[cfg(windows)]
+fn valid_handle(handle: PlatformHandleType) -> bool {
+    const INVALID_HANDLE_VALUE: PlatformHandleType = -1isize as PlatformHandleType;
+    const NULL_HANDLE_VALUE: PlatformHandleType = 0isize as PlatformHandleType;
+    handle != INVALID_HANDLE_VALUE && handle != NULL_HANDLE_VALUE
+}
+
+impl PlatformHandle {
+    pub fn new(raw: PlatformHandleType) -> PlatformHandle {
+        PlatformHandle(raw)
+    }
+
+    pub fn try_new(raw: PlatformHandleType) -> Option<PlatformHandle> {
+        if !valid_handle(raw) {
+            return None;
+        }
+        Some(PlatformHandle::new(raw))
+    }
+
+    #[cfg(windows)]
+    pub fn from<T: IntoRawHandle>(from: T) -> PlatformHandle {
+        PlatformHandle::new(from.into_raw_handle())
+    }
+
+    #[cfg(unix)]
+    pub fn from<T: IntoRawFd>(from: T) -> PlatformHandle {
+        PlatformHandle::new(from.into_raw_fd())
+    }
+
+    #[cfg(windows)]
+    pub unsafe fn into_file(self) -> std::fs::File {
+        std::fs::File::from_raw_handle(self.0)
+    }
+
+    #[cfg(unix)]
+    pub unsafe fn into_file(self) -> std::fs::File {
+        std::fs::File::from_raw_fd(self.0)
+    }
+
+    pub fn as_raw(&self) -> PlatformHandleType {
+        self.0
+    }
+
+    pub unsafe fn close(self) {
+        close_platformhandle(self.0);
+    }
+}
+
+#[cfg(unix)]
+unsafe fn close_platformhandle(handle: PlatformHandleType) {
+    libc::close(handle);
+}
+
+#[cfg(windows)]
+unsafe fn close_platformhandle(handle: PlatformHandleType) {
+    winapi::um::handleapi::CloseHandle(handle);
+}
+
 pub fn get_shm_path(dir: &str) -> PathBuf {
-    let pid = unsafe { libc::getpid() };
+    let pid = std::process::id();
     let mut temp = temp_dir();
     temp.push(&format!("cubeb-shm-{}-{}", pid, dir));
     temp
 }
+
+#[cfg(unix)]
+pub mod messagestream_unix;
+#[cfg(unix)]
+pub use messagestream_unix::*;
+
+#[cfg(windows)]
+pub mod messagestream_win;
+#[cfg(windows)]
+pub use messagestream_win::*;
+#[cfg(windows)]
+mod tokio_named_pipes;
--- a/media/audioipc/audioipc/src/messages.rs
+++ b/media/audioipc/audioipc/src/messages.rs
@@ -1,17 +1,18 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
+use PlatformHandle;
+use PlatformHandleType;
 use cubeb::{self, ffi};
 use std::ffi::{CStr, CString};
 use std::os::raw::{c_char, c_int, c_uint};
-use std::os::unix::io::RawFd;
 use std::ptr;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Device {
     pub output_name: Option<Vec<u8>>,
     pub input_name: Option<Vec<u8>>,
 }
 
@@ -164,25 +165,26 @@ fn opt_str(v: Option<Vec<u8>>) -> *mut c
         },
         None => ptr::null_mut(),
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct StreamCreate {
     pub token: usize,
-    pub fds: [RawFd; 3],
+    pub platform_handles: [PlatformHandle; 3],
+    pub target_pid: u32,
 }
 
 // Client -> Server messages.
 // TODO: Callbacks should be different messages types so
 // ServerConn::process_msg doesn't have a catch-all case.
 #[derive(Debug, Serialize, Deserialize)]
 pub enum ServerMessage {
-    ClientConnect,
+    ClientConnect(u32),
     ClientDisconnect,
 
     ContextGetBackendId,
     ContextGetMaxChannelCount,
     ContextGetMinLatency(StreamParams),
     ContextGetPreferredSampleRate,
     ContextGetDeviceEnumeration(ffi::cubeb_device_type),
 
@@ -236,42 +238,50 @@ pub enum CallbackReq {
 }
 
 #[derive(Debug, Deserialize, Serialize)]
 pub enum CallbackResp {
     Data(isize),
     State,
 }
 
-pub trait AssocRawFd {
-    fn fd(&self) -> Option<[RawFd; 3]> {
+pub trait AssocRawPlatformHandle {
+    fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> {
         None
     }
-    fn take_fd<F>(&mut self, _: F)
+
+    fn take_platform_handles<F>(&mut self, f: F)
     where
-        F: FnOnce() -> Option<[RawFd; 3]>,
-    {
+        F: FnOnce() -> Option<[PlatformHandleType; 3]> {
+        assert!(f().is_none());
     }
 }
 
-impl AssocRawFd for ServerMessage {}
-impl AssocRawFd for ClientMessage {
-    fn fd(&self) -> Option<[RawFd; 3]> {
+impl AssocRawPlatformHandle for ServerMessage {}
+
+impl AssocRawPlatformHandle for ClientMessage {
+    fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> {
         match *self {
-            ClientMessage::StreamCreated(ref data) => Some(data.fds),
+            ClientMessage::StreamCreated(ref data) => Some(([data.platform_handles[0].as_raw(),
+                                                             data.platform_handles[1].as_raw(),
+                                                             data.platform_handles[2].as_raw()],
+                                                            data.target_pid)),
             _ => None,
         }
     }
 
-    fn take_fd<F>(&mut self, f: F)
+    fn take_platform_handles<F>(&mut self, f: F)
     where
-        F: FnOnce() -> Option<[RawFd; 3]>,
+        F: FnOnce() -> Option<[PlatformHandleType; 3]>,
     {
         if let ClientMessage::StreamCreated(ref mut data) = *self {
-            data.fds = f().unwrap();
+            let handles = f().expect("platform_handles must be available when processing StreamCreated");
+            data.platform_handles = [PlatformHandle::new(handles[0]),
+                                     PlatformHandle::new(handles[1]),
+                                     PlatformHandle::new(handles[2])]
         }
     }
 }
 
 #[cfg(test)]
 mod test {
     use super::StreamParams;
     use cubeb::ffi;
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/messagestream_unix.rs
@@ -0,0 +1,96 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+use std::os::unix::io::{IntoRawFd, FromRawFd, AsRawFd, RawFd};
+use std::os::unix::net;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+#[derive(Debug)]
+pub struct MessageStream(net::UnixStream);
+pub struct AsyncMessageStream(tokio_uds::UnixStream);
+
+impl MessageStream {
+    fn new(stream: net::UnixStream) -> MessageStream {
+        MessageStream(stream)
+    }
+
+    pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
+        let pair = net::UnixStream::pair()?;
+        Ok((MessageStream::new(pair.0), MessageStream::new(pair.1)))
+    }
+
+    pub unsafe fn from_raw_fd(raw: super::PlatformHandleType) -> MessageStream {
+        MessageStream::new(net::UnixStream::from_raw_fd(raw))
+    }
+
+    pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result<AsyncMessageStream, std::io::Error> {
+        Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_stream(self.0, handle)?))
+    }
+}
+
+impl AsyncMessageStream {
+    fn new(stream: tokio_uds::UnixStream) -> AsyncMessageStream {
+        AsyncMessageStream(stream)
+    }
+
+    pub fn poll_read(&self) -> futures::Async<()> {
+        self.0.poll_read()
+    }
+
+    pub fn poll_write(&self) -> futures::Async<()> {
+        self.0.poll_write()
+    }
+
+    pub fn need_read(&self) {
+        self.0.need_read()
+    }
+
+    pub fn need_write(&self) {
+        self.0.need_write()
+    }
+}
+
+impl std::io::Read for AsyncMessageStream {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.0.read(buf)
+    }
+}
+
+impl std::io::Write for AsyncMessageStream {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.0.write(buf)
+    }
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.0.flush()
+    }
+}
+
+impl AsyncRead for AsyncMessageStream {
+    fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
+        <&tokio_uds::UnixStream>::read_buf(&mut &self.0, buf)
+    }
+}
+
+impl AsyncWrite for AsyncMessageStream {
+    fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
+        <&tokio_uds::UnixStream>::shutdown(&mut &self.0)
+    }
+
+    fn write_buf<B: bytes::Buf>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
+        <&tokio_uds::UnixStream>::write_buf(&mut &self.0, buf)
+    }
+}
+
+impl AsRawFd for AsyncMessageStream {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0.as_raw_fd()
+    }
+}
+
+impl IntoRawFd for MessageStream {
+    fn into_raw_fd(self) -> RawFd {
+        self.0.into_raw_fd()
+    }
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/messagestream_win.rs
@@ -0,0 +1,109 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license.  See the
+// accompanying file LICENSE for details
+
+extern crate mio_named_pipes;
+use std::os::windows::io::{IntoRawHandle, FromRawHandle, AsRawHandle, RawHandle};
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_named_pipes;
+
+#[derive(Debug)]
+pub struct MessageStream(mio_named_pipes::NamedPipe);
+pub struct AsyncMessageStream(tokio_named_pipes::NamedPipe);
+
+impl MessageStream {
+    fn new(stream: mio_named_pipes::NamedPipe) -> MessageStream {
+        MessageStream(stream)
+    }
+
+
+    pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
+        let pipe1 = mio_named_pipes::NamedPipe::new(get_pipe_name())?;
+        let pipe2 = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(pipe1.as_raw_handle()) };
+        Ok((MessageStream::new(pipe1), MessageStream::new(pipe2)))
+    }
+
+    pub unsafe fn from_raw_fd(raw: super::PlatformHandleType) -> MessageStream {
+        MessageStream::new(mio_named_pipes::NamedPipe::from_raw_handle(raw))
+    }
+
+    pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result<AsyncMessageStream, std::io::Error> {
+        Ok(AsyncMessageStream::new(tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?))
+    }
+}
+
+impl AsyncMessageStream {
+    fn new(stream: tokio_named_pipes::NamedPipe) -> AsyncMessageStream {
+        AsyncMessageStream(stream)
+    }
+
+    pub fn poll_read(&self) -> futures::Async<()> {
+        self.0.poll_read()
+    }
+
+    pub fn poll_write(&self) -> futures::Async<()> {
+        self.0.poll_write()
+    }
+
+    pub fn need_read(&self) {
+        self.0.need_read()
+    }
+
+    pub fn need_write(&self) {
+        self.0.need_write()
+    }
+}
+
+impl std::io::Read for AsyncMessageStream {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.0.read(buf)
+    }
+}
+
+impl std::io::Write for AsyncMessageStream {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.0.write(buf)
+    }
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.0.flush()
+    }
+}
+
+impl AsyncRead for AsyncMessageStream {
+    fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
+        <tokio_named_pipes::NamedPipe>::read_buf(&mut self.0, buf)
+    }
+}
+
+impl AsyncWrite for AsyncMessageStream {
+    fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
+        <tokio_named_pipes::NamedPipe>::shutdown(&mut self.0)
+    }
+
+    fn write_buf<B: bytes::Buf>(&mut self, buf: &mut B) -> futures::Poll<usize, std::io::Error> {
+        <tokio_named_pipes::NamedPipe>::write_buf(&mut self.0, buf)
+    }
+}
+
+impl AsRawHandle for AsyncMessageStream {
+    fn as_raw_handle(&self) -> RawHandle {
+        self.0.as_raw_handle()
+    }
+}
+
+impl IntoRawHandle for MessageStream {
+    fn into_raw_handle(self) -> RawHandle {
+        // XXX: Ideally this would call into_raw_handle.
+        self.0.as_raw_handle()
+    }
+}
+
+static PIPE_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+fn get_pipe_name() -> String {
+    let pid = std::process::id();
+    let pipe_id = PIPE_ID.fetch_add(1, Ordering::SeqCst);
+    format!("\\\\.\\pipe\\cubeb-pipe-{}-{}", pid, pipe_id)
+}
new file mode 100644
--- /dev/null
+++ b/media/audioipc/audioipc/src/tokio_named_pipes.rs
@@ -0,0 +1,175 @@
+// From https://github.com/alexcrichton/tokio-named-pipes/commit/3a22f8fc9a441b548aec25bd5df3b1e0ab99fabe
+// License MIT/Apache-2.0
+// Sloppily updated to be compatible with tokio_io
+// To be replaced with tokio_named_pipes crate after tokio 0.1 update.
+#![cfg(windows)]
+
+extern crate bytes;
+extern crate tokio_core;
+extern crate mio_named_pipes;
+extern crate futures;
+
+use std::ffi::OsStr;
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::os::windows::io::*;
+
+use futures::{Async, Poll};
+use bytes::{BufMut, Buf};
+#[allow(deprecated)]
+use tokio_core::io::Io;
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_core::reactor::{PollEvented, Handle};
+
+pub struct NamedPipe {
+    io: PollEvented<mio_named_pipes::NamedPipe>,
+}
+
+impl NamedPipe {
+    pub fn new<P: AsRef<OsStr>>(p: P, handle: &Handle) -> io::Result<NamedPipe> {
+        NamedPipe::_new(p.as_ref(), handle)
+    }
+
+    fn _new(p: &OsStr, handle: &Handle) -> io::Result<NamedPipe> {
+        let inner = try!(mio_named_pipes::NamedPipe::new(p));
+        NamedPipe::from_pipe(inner, handle)
+    }
+
+    pub fn from_pipe(pipe: mio_named_pipes::NamedPipe,
+                     handle: &Handle)
+                     -> io::Result<NamedPipe> {
+        Ok(NamedPipe {
+            io: try!(PollEvented::new(pipe, handle)),
+        })
+    }
+
+    pub fn connect(&self) -> io::Result<()> {
+        self.io.get_ref().connect()
+    }
+
+    pub fn disconnect(&self) -> io::Result<()> {
+        self.io.get_ref().disconnect()
+    }
+
+    pub fn need_read(&self) {
+        self.io.need_read()
+    }
+
+    pub fn need_write(&self) {
+        self.io.need_write()
+    }
+
+    pub fn poll_read(&self) -> Async<()> {
+        self.io.poll_read()
+    }
+
+    pub fn poll_write(&self) -> Async<()> {
+        self.io.poll_write()
+    }
+}
+
+impl Read for NamedPipe {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.io.read(buf)
+    }
+}
+
+impl Write for NamedPipe {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.io.write(buf)
+    }
+    fn flush(&mut self) -> io::Result<()> {
+        self.io.flush()
+    }
+}
+
+#[allow(deprecated)]
+impl Io for NamedPipe {
+    fn poll_read(&mut self) -> Async<()> {
+        <NamedPipe>::poll_read(self)
+    }
+
+    fn poll_write(&mut self) -> Async<()> {
+        <NamedPipe>::poll_write(self)
+    }
+}
+
+impl AsyncRead for NamedPipe {
+    unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+        false
+    }
+
+    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+        if NamedPipe::poll_read(self).is_not_ready() {
+            return Ok(Async::NotReady)
+        }
+
+        let mut stack_buf = [0u8; 1024];
+        let bytes_read = self.io.read(&mut stack_buf);
+        match bytes_read {
+            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                self.io.need_read();
+                return Ok(Async::NotReady);
+            },
+            Err(e) => Err(e),
+            Ok(bytes_read) => {
+                buf.put_slice(&stack_buf[0..bytes_read]);
+                Ok(Async::Ready(bytes_read))
+            }
+        }
+    }
+}
+
+impl AsyncWrite for NamedPipe {
+    fn shutdown(&mut self) -> Poll<(), io::Error> {
+        Ok(().into())
+    }
+
+    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+        if NamedPipe::poll_write(self).is_not_ready() {
+            return Ok(Async::NotReady)
+        }
+
+        let bytes_wrt = self.io.write(buf.bytes());
+        match bytes_wrt {
+            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                self.io.need_write();
+                return Ok(Async::NotReady);
+            },
+            Err(e) => Err(e),
+            Ok(bytes_wrt) => {
+                buf.advance(bytes_wrt);
+                Ok(Async::Ready(bytes_wrt))
+            }
+        }
+    }
+
+}
+
+impl<'a> Read for &'a NamedPipe {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        (&self.io).read(buf)
+    }
+}
+
+impl<'a> Write for &'a NamedPipe {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        (&self.io).write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        (&self.io).flush()
+    }
+}
+
+impl fmt::Debug for NamedPipe {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        self.io.get_ref().fmt(f)
+    }
+}
+
+impl AsRawHandle for NamedPipe {
+    fn as_raw_handle(&self) -> RawHandle {
+        self.io.get_ref().as_raw_handle()
+    }
+}
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -4,16 +4,16 @@ version = "0.4.0"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Cubeb Backend for talking to remote cubeb server."
 
 [dependencies]
 audioipc = { path="../audioipc" }
-cubeb-backend = "0.5"
+cubeb-backend = "0.5.3"
 foreign-types = "0.3"
 futures = { version="0.1.18", default-features=false, features=["use_std"] }
 futures-cpupool = { version="0.1.8", default-features=false }
 libc = "0.2"
 log = "0.4"
 tokio-core = "0.1"
 tokio-uds = "0.1.7"
--- a/media/audioipc/client/src/context.rs
+++ b/media/audioipc/client/src/context.rs
@@ -1,43 +1,39 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use assert_not_in_callback;
 use audioipc::codec::LengthDelimitedCodec;
-use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
+use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
 use audioipc::{core, rpc};
 use audioipc::{messages, ClientMessage, ServerMessage};
 use cubeb_backend::{
     ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result,
     Stream, StreamParams, StreamParamsRef,
 };
 use futures::Future;
 use futures_cpupool::{self, CpuPool};
-use libc;
 use std::ffi::{CStr, CString};
 use std::os::raw::c_void;
-use std::os::unix::io::FromRawFd;
-use std::os::unix::net;
 use std::sync::mpsc;
 use std::thread;
 use std::{fmt, io, mem, ptr};
 use stream;
 use tokio_core::reactor::{Handle, Remote};
-use tokio_uds::UnixStream;
 use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
 
 struct CubebClient;
 
 impl rpc::Client for CubebClient {
     type Request = ServerMessage;
     type Response = ClientMessage;
-    type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+    type Transport = FramedWithPlatformHandles<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
 }
 
 macro_rules! t(
     ($e:expr) => (
         match $e {
             Ok(e) => e,
             Err(_) => return Err(Error::default())
         }
@@ -68,34 +64,34 @@ impl ClientContext {
 
     #[doc(hidden)]
     pub fn cpu_pool(&self) -> CpuPool {
         self.cpu_pool.clone()
     }
 }
 
 // TODO: encapsulate connect, etc inside audioipc.
-fn open_server_stream() -> Result<net::UnixStream> {
+fn open_server_stream() -> Result<audioipc::MessageStream> {
     unsafe {
         if let Some(fd) = G_SERVER_FD {
-            return Ok(net::UnixStream::from_raw_fd(fd));
+            return Ok(audioipc::MessageStream::from_raw_fd(fd.as_raw()));
         }
 
         Err(Error::default())
     }
 }
 
 impl ContextOps for ClientContext {
     fn init(_context_name: Option<&CStr>) -> Result<Context> {
         fn bind_and_send_client(
-            stream: UnixStream,
+            stream: audioipc::AsyncMessageStream,
             handle: &Handle,
             tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
         ) -> Option<()> {
-            let transport = framed_with_fds(stream, Default::default());
+            let transport = framed_with_platformhandles(stream, Default::default());
             let rpc = rpc::bind_client::<CubebClient>(transport, handle);
             // If send fails then the rx end has closed
             // which is unlikely here.
             let _ = tx_rpc.send(rpc);
             Some(())
         }
 
         assert_not_in_callback();
@@ -116,17 +112,17 @@ impl ContextOps for ClientContext {
 
         let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
             let handle = core::handle();
 
             register_thread();
 
             open_server_stream()
                 .ok()
-                .and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
+                .and_then(|stream| stream.into_tokio_ipc(&handle).ok())
                 .and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
                 .ok_or_else(|| {
                     io::Error::new(
                         io::ErrorKind::Other,
                         "Failed to open stream and create rpc.",
                     )
                 })
         }));
@@ -135,16 +131,18 @@ impl ContextOps for ClientContext {
 
         let cpupool = futures_cpupool::Builder::new()
             .name_prefix("AudioIPC")
             .after_start(register_thread)
             .pool_size(params.pool_size)
             .stack_size(params.stack_size)
             .create();
 
+        send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected)?;
+
         let ctx = Box::new(ClientContext {
             _ops: &CLIENT_OPS as *const _,
             rpc: rpc,
             core: core,
             cpu_pool: cpupool,
         });
         Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
     }
@@ -275,17 +273,17 @@ impl ContextOps for ClientContext {
 }
 
 impl Drop for ClientContext {
     fn drop(&mut self) {
         debug!("ClientContext dropped...");
         let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
         unsafe {
             if G_SERVER_FD.is_some() {
-                libc::close(super::G_SERVER_FD.take().unwrap());
+                G_SERVER_FD.take().unwrap().close();
             }
         }
     }
 }
 
 impl fmt::Debug for ClientContext {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("ClientContext")
--- a/media/audioipc/client/src/lib.rs
+++ b/media/audioipc/client/src/lib.rs
@@ -15,21 +15,20 @@ extern crate log;
 extern crate tokio_core;
 extern crate tokio_uds;
 
 #[macro_use]
 mod send_recv;
 mod context;
 mod stream;
 
-use audioipc::PlatformHandleType;
+use audioipc::{PlatformHandleType, PlatformHandle};
 use context::ClientContext;
 use cubeb_backend::{capi, ffi};
 use std::os::raw::{c_char, c_int};
-use std::os::unix::io::RawFd;
 use stream::ClientStream;
 
 type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;
 
 thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
 thread_local!(static CPUPOOL_INIT_PARAMS: InitParamsTls = std::cell::RefCell::new(None));
 
 // This must match the definition of AudioIpcInitParams in
@@ -78,36 +77,36 @@ fn set_cpupool_init_params<P>(params: P)
 where
     P: Into<Option<CpuPoolInitParams>>,
 {
     CPUPOOL_INIT_PARAMS.with(|p| {
         *p.borrow_mut() = params.into();
     });
 }
 
-static mut G_SERVER_FD: Option<RawFd> = None;
+static mut G_SERVER_FD: Option<PlatformHandle> = None;
 
 #[no_mangle]
 /// Entry point from C code.
 pub unsafe extern "C" fn audioipc_client_init(
     c: *mut *mut ffi::cubeb,
     context_name: *const c_char,
     init_params: *const AudioIpcInitParams,
 ) -> c_int {
     if init_params.is_null() {
         return cubeb_backend::ffi::CUBEB_ERROR;
     }
 
     let init_params = &*init_params;
 
-    // TODO: Windows portability (for fd).
     // TODO: Better way to pass extra parameters to Context impl.
     if G_SERVER_FD.is_some() {
-        panic!("audioipc client's server connection already initialized.");
+        return cubeb_backend::ffi::CUBEB_ERROR;
     }
-    if init_params.server_connection >= 0 {
-        G_SERVER_FD = Some(init_params.server_connection);
+    G_SERVER_FD = PlatformHandle::try_new(init_params.server_connection);
+    if G_SERVER_FD.is_none() {
+        return cubeb_backend::ffi::CUBEB_ERROR;
     }
 
     let cpupool_init_params = CpuPoolInitParams::init_with(&init_params);
     set_cpupool_init_params(cpupool_init_params);
     capi::capi_init::<ClientContext>(c, context_name)
 }
--- a/media/audioipc/client/src/stream.rs
+++ b/media/audioipc/client/src/stream.rs
@@ -7,23 +7,19 @@ use audioipc::codec::LengthDelimitedCode
 use audioipc::frame::{framed, Framed};
 use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
 use audioipc::rpc;
 use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
 use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
 use futures::Future;
 use futures_cpupool::{CpuFuture, CpuPool};
 use std::ffi::CString;
-use std::fs::File;
 use std::os::raw::c_void;
-use std::os::unix::io::FromRawFd;
-use std::os::unix::net;
 use std::ptr;
 use std::sync::mpsc;
-use tokio_uds::UnixStream;
 use ClientContext;
 use {assert_not_in_callback, set_in_callback};
 
 // TODO: Remove and let caller allocate based on cubeb backend requirements.
 const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
 
 pub struct Device(ffi::cubeb_device);
 
@@ -60,17 +56,17 @@ struct CallbackServer {
     user_ptr: usize,
     cpu_pool: CpuPool,
 }
 
 impl rpc::Server for CallbackServer {
     type Request = CallbackReq;
     type Response = CallbackResp;
     type Future = CpuFuture<Self::Response, ()>;
-    type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+    type Transport = Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
 
     fn process(&mut self, req: Self::Request) -> Self::Future {
         match req {
             CallbackReq::Data { nframes, input_frame_size, output_frame_size } => {
                 trace!(
                     "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}",
                     nframes,
                     input_frame_size,
@@ -150,31 +146,31 @@ impl<'ctx> ClientStream<'ctx> {
         assert_not_in_callback();
 
         let has_input = init_params.input_stream_params.is_some();
         let has_output = init_params.output_stream_params.is_some();
 
         let rpc = ctx.rpc();
         let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
 
-        debug!("token = {}, fds = {:?}", data.token, data.fds);
+        debug!("token = {}, handles = {:?}", data.token, data.platform_handles);
 
-        let stm = data.fds[0];
-        let stream = unsafe { net::UnixStream::from_raw_fd(stm) };
+        let stm = data.platform_handles[0];
+        let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) };
 
-        let input = data.fds[1];
-        let input_file = unsafe { File::from_raw_fd(input) };
+        let input = data.platform_handles[1];
+        let input_file = unsafe { input.into_file() };
         let input_shm = if has_input {
             Some(SharedMemSlice::from(&input_file, SHM_AREA_SIZE).unwrap())
         } else {
             None
         };
 
-        let output = data.fds[2];
-        let output_file = unsafe { File::from_raw_fd(output) };
+        let output = data.platform_handles[2];
+        let output_file = unsafe { output.into_file() };
         let output_shm = if has_output {
             Some(SharedMemMutSlice::from(&output_file, SHM_AREA_SIZE).unwrap())
         } else {
             None
         };
 
         let user_data = user_ptr as usize;
 
@@ -186,17 +182,17 @@ impl<'ctx> ClientStream<'ctx> {
             data_cb: data_callback,
             state_cb: state_callback,
             user_ptr: user_data,
             cpu_pool: cpu_pool,
         };
 
         let (wait_tx, wait_rx) = mpsc::channel();
         ctx.remote().spawn(move |handle| {
-            let stream = UnixStream::from_stream(stream, handle).unwrap();
+            let stream = stream.into_tokio_ipc(handle).unwrap();
             let transport = framed(stream, Default::default());
             rpc::bind_server(transport, server, handle);
             wait_tx.send(()).unwrap();
             Ok(())
         });
         wait_rx.recv().unwrap();
 
         let stream = Box::into_raw(Box::new(ClientStream {
deleted file mode 100644
--- a/media/audioipc/register-collection-not-supported.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs
---- a/media/audioipc/client/src/context.rs
-+++ b/media/audioipc/client/src/context.rs
-@@ -265,17 +265,17 @@ impl ContextOps for ClientContext {
- 
-     fn register_device_collection_changed(
-         &mut self,
-         _dev_type: DeviceType,
-         _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
-         _user_ptr: *mut c_void,
-     ) -> Result<()> {
-         assert_not_in_callback();
--        Ok(())
-+        Err(Error::not_supported())
-     }
- }
- 
- impl Drop for ClientContext {
-     fn drop(&mut self) {
-         debug!("ClientContext dropped...");
-         let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
-         unsafe {
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -4,17 +4,17 @@ version = "0.2.3"
 authors = [
         "Matthew Gregan <kinetik@flim.org>",
         "Dan Glastonbury <dan.glastonbury@gmail.com>"
         ]
 description = "Remote cubeb server"
 
 [dependencies]
 audioipc = { path = "../audioipc" }
-cubeb-core = "0.5.1"
+cubeb-core = "0.5.3"
 bytes = "0.4"
 lazycell = "^0.4"
 libc = "0.2"
 log = "0.4"
 slab = "0.3.0"
 futures = "0.1.18"
 tokio-core = "0.1"
 tokio-uds = "0.1.7"
--- a/media/audioipc/server/src/lib.rs
+++ b/media/audioipc/server/src/lib.rs
@@ -15,27 +15,24 @@ extern crate cubeb_core as cubeb;
 extern crate futures;
 extern crate lazycell;
 extern crate libc;
 extern crate slab;
 extern crate tokio_core;
 extern crate tokio_uds;
 
 use audioipc::core;
-use audioipc::fd_passing::framed_with_fds;
+use audioipc::platformhandle_passing::framed_with_platformhandles;
 use audioipc::rpc;
-use audioipc::PlatformHandleType;
+use audioipc::{MessageStream, PlatformHandle, PlatformHandleType};
 use futures::sync::oneshot;
 use futures::Future;
 use std::error::Error;
 use std::os::raw::c_void;
-use std::os::unix::io::IntoRawFd;
-use std::os::unix::net;
 use std::ptr;
-use tokio_uds::UnixStream;
 
 mod server;
 
 #[allow(deprecated)]
 pub mod errors {
     error_chain! {
         links {
             AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
@@ -97,38 +94,38 @@ pub extern "C" fn audioipc_server_start(
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleType {
     let (wait_tx, wait_rx) = oneshot::channel();
     let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };
 
     let cb_remote = wrapper.callback_thread.remote();
 
-    // We create a pair of connected unix domain sockets. One socket is
-    // registered with the reactor core, the other is returned to the
-    // caller.
-    net::UnixStream::pair()
+    // We create a connected pair of anonymous IPC endpoints. One side
+    // is registered with the reactor core, the other side is returned
+    // to the caller.
+    MessageStream::anonymous_ipc_pair()
         .and_then(|(sock1, sock2)| {
             // Spawn closure to run on same thread as reactor::Core
             // via remote handle.
             wrapper.core_thread.remote().spawn(|handle| {
                 trace!("Incoming connection");
-                UnixStream::from_stream(sock2, handle)
+                sock2.into_tokio_ipc(handle)
                     .and_then(|sock| {
-                        let transport = framed_with_fds(sock, Default::default());
+                        let transport = framed_with_platformhandles(sock, Default::default());
                         rpc::bind_server(transport, server::CubebServer::new(cb_remote), handle);
                         Ok(())
                     }).map_err(|_| ())
                     // Notify waiting thread that sock2 has been registered.
                     .and_then(|_| wait_tx.send(()))
             });
             // Wait for notification that sock2 has been registered
             // with reactor::Core.
             let _ = wait_rx.wait();
-            Ok(sock1.into_raw_fd())
-        }).unwrap_or(-1)
+            Ok(PlatformHandle::from(sock1).as_raw())
+        }).unwrap_or(-1isize as PlatformHandleType)
 }
 
 #[no_mangle]
 pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
     let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
     drop(wrapper);
 }
--- a/media/audioipc/server/src/server.rs
+++ b/media/audioipc/server/src/server.rs
@@ -1,17 +1,18 @@
 // Copyright © 2017 Mozilla Foundation
 //
 // This program is made available under an ISC-style license.  See the
 // accompanying file LICENSE for details
 
 use audioipc;
+use audioipc::{MessageStream, PlatformHandle};
 use audioipc::codec::LengthDelimitedCodec;
 use audioipc::core;
-use audioipc::fd_passing::FramedWithFds;
+use audioipc::platformhandle_passing::FramedWithPlatformHandles;
 use audioipc::frame::{framed, Framed};
 use audioipc::messages::{
     CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, ServerMessage, StreamCreate,
     StreamInitParams, StreamParams,
 };
 use audioipc::rpc;
 use audioipc::shm::{SharedMemReader, SharedMemWriter};
 use cubeb;
@@ -20,21 +21,18 @@ use futures::future::{self, FutureResult
 use futures::sync::oneshot;
 use futures::Future;
 use slab;
 use std::cell::RefCell;
 use std::convert::From;
 use std::ffi::{CStr, CString};
 use std::mem::{size_of, ManuallyDrop};
 use std::os::raw::{c_long, c_void};
-use std::os::unix::io::IntoRawFd;
-use std::os::unix::net;
 use std::{panic, slice};
 use tokio_core::reactor::Remote;
-use tokio_uds::UnixStream;
 
 use errors::*;
 
 fn error(error: cubeb::Error) -> ClientMessage {
     ClientMessage::Error(error.raw_code())
 }
 
 type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
@@ -60,17 +58,17 @@ const SHM_AREA_SIZE: usize = 2 * 1024 * 
 // The size in which the stream slab is grown.
 const STREAM_CONN_CHUNK_SIZE: usize = 64;
 
 struct CallbackClient;
 
 impl rpc::Client for CallbackClient {
     type Request = CallbackReq;
     type Response = CallbackResp;
-    type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+    type Transport = Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
 }
 
 struct ServerStreamCallbacks {
     /// Size of input frame in bytes
     input_frame_size: u16,
     /// Size of output frame in bytes
     output_frame_size: u16,
     /// Shared memory buffer for sending input data to client
@@ -137,45 +135,50 @@ impl Drop for ServerStream {
     }
 }
 
 type StreamSlab = slab::Slab<ServerStream, usize>;
 
 pub struct CubebServer {
     cb_remote: Remote,
     streams: StreamSlab,
+    remote_pid: Option<u32>,
 }
 
 impl rpc::Server for CubebServer {
     type Request = ServerMessage;
     type Response = ClientMessage;
     type Future = FutureResult<Self::Response, ()>;
-    type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+    type Transport = FramedWithPlatformHandles<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
 
     fn process(&mut self, req: Self::Request) -> Self::Future {
         let resp = with_local_context(|context| match *context {
             Err(_) => error(cubeb::Error::error()),
             Ok(ref context) => self.process_msg(context, &req),
         });
         future::ok(resp)
     }
 }
 
 impl CubebServer {
     pub fn new(cb_remote: Remote) -> Self {
         CubebServer {
             cb_remote: cb_remote,
             streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
+            remote_pid: None,
         }
     }
 
     // Process a request coming from the client.
     fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage {
         let resp: ClientMessage = match *msg {
-            ServerMessage::ClientConnect => panic!("already connected"),
+            ServerMessage::ClientConnect(pid) => {
+                self.remote_pid = Some(pid);
+                ClientMessage::ClientConnected
+            }
 
             ServerMessage::ClientDisconnect => {
                 // TODO:
                 //self.connection.client_disconnect();
                 ClientMessage::ClientDisconnected
             }
 
             ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
@@ -299,17 +302,17 @@ impl CubebServer {
                     sample_size * channel_count
                 }).unwrap_or(0u16)
         }
 
         // Create the callback handling struct which is attached the cubeb stream.
         let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
         let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
 
-        let (stm1, stm2) = net::UnixStream::pair()?;
+        let (stm1, stm2) = MessageStream::anonymous_ipc_pair()?;
         debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
         let (input_shm, input_file) =
             SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
         let (output_shm, output_file) =
             SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
 
         // This code is currently running on the Client/Server RPC
         // handling thread.  We need to move the registration of the
@@ -318,17 +321,17 @@ impl CubebServer {
 
         let id = core::handle().id();
 
         let (tx, rx) = oneshot::channel();
         self.cb_remote.spawn(move |handle| {
             // Ensure we're running on a loop different to the one
             // invoking spawn_fn.
             assert_ne!(id, handle.id());
-            let stream = UnixStream::from_stream(stm2, handle).unwrap();
+            let stream = stm2.into_tokio_ipc(handle).unwrap();
             let transport = framed(stream, Default::default());
             let rpc = rpc::bind_client::<CallbackClient>(transport, handle);
             drop(tx.send(rpc));
             Ok(())
         });
 
         let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
             Ok(rpc) => rpc,
@@ -397,21 +400,22 @@ impl CubebServer {
                         None => {
                             // TODO: Turn into error
                             panic!("Failed to insert stream into slab. No entries")
                         }
                     };
 
                     Ok(ClientMessage::StreamCreated(StreamCreate {
                         token: stm_tok,
-                        fds: [
-                            stm1.into_raw_fd(),
-                            input_file.into_raw_fd(),
-                            output_file.into_raw_fd(),
+                        platform_handles: [
+                            PlatformHandle::from(stm1),
+                            PlatformHandle::from(input_file),
+                            PlatformHandle::from(output_file),
                         ],
+                        target_pid: self.remote_pid.unwrap()
                     }))
                 }).map_err(|e| e.into())
         }
     }
 }
 
 // C callable callbacks
 unsafe extern "C" fn data_cb_c(
--- a/media/audioipc/update.sh
+++ b/media/audioipc/update.sh
@@ -6,16 +6,18 @@ cp -p $1/Cargo.toml .
 
 for crate in audioipc client server; do
     test -d $crate/src || mkdir -p $crate/src
     rm -fr $crate/*
     cp -pr $1/$crate/Cargo.toml $crate/
     cp -pr $1/$crate/src/ $crate/src/
 done
 
+rm audioipc/src/cmsghdr.c
+
 if [ -d $1/.git ]; then
   rev=$(cd $1 && git rev-parse --verify HEAD)
   date=$(cd $1 && git show -s --format=%ci HEAD)
   dirty=$(cd $1 && git diff-index --name-only HEAD)
 fi
 
 if [ -n "$rev" ]; then
   version=$rev
@@ -27,10 +29,8 @@ if [ -n "$rev" ]; then
   rm README_MOZILLA.bak
 else
   echo "Remember to update README_MOZILLA with the version details."
 fi
 
 echo "Applying gecko.patch on top of $rev"
 patch -p3 < gecko.patch
 
-echo "Applying register-collection-not-supported.patch on top of $rev"
-patch -p3 < register-collection-not-supported.patch