media/audioipc/audioipc/src/async.rs
author Matthew Gregan <kinetik@flim.org>
Wed, 06 Mar 2019 20:42:38 +0000
changeset 520578 0a78b7e72e1bf620cd70608abb7f31f389f87fcc
parent 498950 74f6186ded6d62b5cfd170cc099acf9d11419382
child 520589 873b90887e3af682383dea7a7e1fb52d6ec02960
permissions -rw-r--r--
Bug 1512445 - Import latest AudioIPC from upstream, including Windows backend. r=chunmin Differential Revision: https://phabricator.services.mozilla.com/D22153

// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

//! Various async helpers modelled after futures-rs and tokio-io.

use bytes::{Buf, BufMut};
#[cfg(unix)]
use futures::Async;
use futures::Poll;
#[cfg(unix)]
use iovec::IoVec;
#[cfg(unix)]
use msg::{RecvMsg, SendMsg};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};

/// A readable object that can additionally receive a message into a pair of
/// buffers in an async fashion.
///
/// This trait extends `AsyncRead` with a single required method,
/// `recv_msg_buf`.
pub trait AsyncRecvMsg: AsyncRead {
    /// Pull some bytes from this source into the specified `BufMut`, returning
    /// how many bytes were read together with an `i32` value.
    ///
    /// The `buf` provided will have bytes read into it and the internal cursor
    /// will be advanced if any bytes were read. Note that this method typically
    /// will not reallocate the buffer provided.
    ///
    /// `cmsg` is a second buffer of the same type as `buf`. In the unix
    /// implementation in this file it is handed to `recv_msg` alongside `buf`
    /// and advanced by the `cmsg` length that call reports, and the `i32` in
    /// the result is the `flags` value that call returns.
    // NOTE(review): `cmsg` presumably carries control (ancillary) message
    // data, judging by its name -- confirm against `msg::RecvMsg`.
    fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
    where
        Self: Sized,
        B: BufMut;
}

/// A trait for writable objects which operate in an async fashion and can
/// send a message built from a data buffer plus a `cmsg` buffer.
///
/// This trait extends `AsyncWrite` and indicates that an I/O object is
/// **nonblocking**, meaning that it will return an error instead of blocking
/// when bytes cannot currently be written, but hasn't closed. Specifically
/// this means that the `write` function for types that implement this trait
/// can have a few return values:
///
/// * `Ok(n)` means that `n` bytes of data were immediately written.
/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
///   written from the buffer provided. The I/O object is not currently
///   writable but may become writable in the future.
/// * `Err(e)` for other errors are standard I/O errors coming from the
///   underlying object.
pub trait AsyncSendMsg: AsyncWrite {
    /// Write a `Buf` into this value, returning how many bytes were written.
    ///
    /// Note that this method will advance the `buf` provided automatically by
    /// the number of bytes written.
    ///
    /// `cmsg` is taken by shared reference, so its cursor cannot be advanced
    /// by this call. The unix implementation in this file passes
    /// `cmsg.bytes()` to `send_msg` together with the data from `buf`, and
    /// yields `Async::NotReady` when the stream is not writable or the send
    /// fails with `ErrorKind::WouldBlock`.
    // NOTE(review): `cmsg` presumably carries control (ancillary) message
    // data, judging by its name -- confirm against `msg::SendMsg`.
    fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
    where
        Self: Sized,
        B: Buf,
        C: Buf;
}

////////////////////////////////////////////////////////////////////////////////

#[cfg(unix)]
impl AsyncRecvMsg for super::AsyncMessageStream {
    /// Receive one message into `buf` and `cmsg`.
    ///
    /// Yields `Async::NotReady` when the stream is not readable yet or when
    /// `recv_msg` fails with `ErrorKind::WouldBlock` (in which case read
    /// interest is re-registered via `need_read`). On success both buffers
    /// are advanced by the lengths `recv_msg` reported and the byte count for
    /// `buf` is returned together with the flags. Any other error is passed
    /// through unchanged.
    fn recv_msg_buf<B>(&mut self, buf: &mut B, cmsg: &mut B) -> Poll<(usize, i32), io::Error>
    where
        B: BufMut,
    {
        // Bail out early if the stream has not been flagged readable.
        match <super::AsyncMessageStream>::poll_read(self) {
            Async::NotReady => return Ok(Async::NotReady),
            Async::Ready(_) => {}
        }

        // The slices obtained from `buf` and `cmsg` inside this block are
        // passed straight to `recv_msg`; this code never reads from them.
        let outcome = unsafe {
            // An `IoVec` cannot wrap an empty slice, so every slot starts out
            // pointing at its own one-byte placeholder on the stack.
            // `bytes_vec_mut` then overwrites the slots it actually uses.
            let d0: &mut [u8] = &mut [0];
            let d1: &mut [u8] = &mut [0];
            let d2: &mut [u8] = &mut [0];
            let d3: &mut [u8] = &mut [0];
            let d4: &mut [u8] = &mut [0];
            let d5: &mut [u8] = &mut [0];
            let d6: &mut [u8] = &mut [0];
            let d7: &mut [u8] = &mut [0];
            let d8: &mut [u8] = &mut [0];
            let d9: &mut [u8] = &mut [0];
            let d10: &mut [u8] = &mut [0];
            let d11: &mut [u8] = &mut [0];
            let d12: &mut [u8] = &mut [0];
            let d13: &mut [u8] = &mut [0];
            let d14: &mut [u8] = &mut [0];
            let d15: &mut [u8] = &mut [0];
            let mut iovs: [&mut IoVec; 16] = [
                d0.into(),
                d1.into(),
                d2.into(),
                d3.into(),
                d4.into(),
                d5.into(),
                d6.into(),
                d7.into(),
                d8.into(),
                d9.into(),
                d10.into(),
                d11.into(),
                d12.into(),
                d13.into(),
                d14.into(),
                d15.into(),
            ];
            let filled = buf.bytes_vec_mut(&mut iovs);
            self.recv_msg(&mut iovs[..filled], cmsg.bytes_mut())
        };

        let (received, cmsg_len, flags) = match outcome {
            Ok(parts) => parts,
            Err(e) => {
                if e.kind() != io::ErrorKind::WouldBlock {
                    return Err(e);
                }
                // Not an error for the caller: ask to be woken when the
                // stream becomes readable again.
                self.need_read();
                return Ok(Async::NotReady);
            }
        };

        // Move both cursors past exactly what `recv_msg` reported.
        // NOTE(review): assumes `recv_msg` only reports bytes it actually
        // wrote into the respective buffers -- confirm against `msg::RecvMsg`.
        unsafe {
            buf.advance_mut(received);
            cmsg.advance_mut(cmsg_len);
        }
        Ok(Async::Ready((received, flags)))
    }
}

#[cfg(unix)]
impl AsyncSendMsg for super::AsyncMessageStream {
    /// Send the pending contents of `buf`, together with `cmsg.bytes()`, as
    /// one message.
    ///
    /// Yields `Async::NotReady` when the stream is not writable yet or when
    /// `send_msg` fails with `ErrorKind::WouldBlock` (in which case write
    /// interest is re-registered via `need_write`). On success `buf` is
    /// advanced by the number of bytes sent and that count is returned.
    /// `cmsg` is never advanced. Any other error is passed through unchanged.
    fn send_msg_buf<B, C>(&mut self, buf: &mut B, cmsg: &C) -> Poll<usize, io::Error>
    where
        B: Buf,
        C: Buf,
    {
        // Bail out early if the stream has not been flagged writable.
        match <super::AsyncMessageStream>::poll_write(self) {
            Async::NotReady => return Ok(Async::NotReady),
            Async::Ready(_) => {}
        }

        let sent = {
            // An `IoVec` cannot wrap an empty slice, so fill the array with a
            // shared one-byte placeholder; `bytes_vec` replaces the slots it
            // actually uses.
            static PLACEHOLDER: &[u8] = &[0];
            let filler = <&IoVec>::from(PLACEHOLDER);
            let mut iovs = [filler; 16];
            let used = buf.bytes_vec(&mut iovs);
            self.send_msg(&iovs[..used], cmsg.bytes())
        };

        match sent {
            Ok(written) => {
                buf.advance(written);
                Ok(Async::Ready(written))
            }
            Err(e) => {
                if e.kind() == io::ErrorKind::WouldBlock {
                    // Not an error for the caller: ask to be woken when the
                    // stream becomes writable again.
                    self.need_write();
                    Ok(Async::NotReady)
                } else {
                    Err(e)
                }
            }
        }
    }
}