third_party/rust/futures/tests/unsync.rs
author Bastien Orivel <eijebong@bananium.fr>
Wed, 22 Aug 2018 20:19:04 +0200
changeset 488156 03df1a267ea54f1024d3f015f4bf781477262c92
parent 459638 53cea1ff8415c833c631bfa0a417bcfade0dd062
permissions -rw-r--r--
Bug 1484462 - Revendor Rust dependencies. r=ato

#![cfg(feature = "use_std")]

extern crate futures;

mod support;

use futures::prelude::*;
use futures::unsync::oneshot;
use futures::unsync::mpsc::{self, SendError};
use futures::future::lazy;
use futures::stream::{iter_ok, unfold};

use support::local_executor::Core;

#[test]
fn mpsc_send_recv() {
    let (tx, rx) = mpsc::channel::<i32>(1);
    let mut rx = rx.wait();

    tx.send(42).wait().unwrap();

    assert_eq!(rx.next(), Some(Ok(42)));
    assert_eq!(rx.next(), None);
}

#[test]
fn mpsc_rx_notready() {
    let (_tx, mut rx) = mpsc::channel::<i32>(1);

    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::NotReady);
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

#[test]
fn mpsc_rx_end() {
    let (_, mut rx) = mpsc::channel::<i32>(1);

    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::Ready(None));
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

#[test]
fn mpsc_tx_clone_weak_rc() {
    let (tx, mut rx) = mpsc::channel::<i32>(1); // rc = 1

    let tx_clone = tx.clone(); // rc = 2
    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::NotReady);
        Ok(()) as Result<(), ()>
    }).wait().unwrap();

    drop(tx); // rc = 1
    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::NotReady);
        Ok(()) as Result<(), ()>
    }).wait().unwrap();

    drop(tx_clone); // rc = 0
    lazy(|| {
        assert_eq!(rx.poll().unwrap(), Async::Ready(None));
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

#[test]
fn mpsc_tx_notready() {
    let (tx, _rx) = mpsc::channel::<i32>(1);
    let tx = tx.send(1).wait().unwrap();
    lazy(move || {
        assert!(tx.send(2).poll().unwrap().is_not_ready());
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

#[test]
fn mpsc_tx_err() {
    let (tx, _) = mpsc::channel::<i32>(1);
    lazy(move || {
        assert!(tx.send(2).poll().is_err());
        Ok(()) as Result<(), ()>
    }).wait().unwrap();
}

#[test]
fn mpsc_backpressure() {
    let (tx, rx) = mpsc::channel::<i32>(1);
    lazy(move || {
        iter_ok(vec![1, 2, 3])
            .forward(tx)
            .map_err(|e: SendError<i32>| panic!("{}", e))
            .join(rx.take(3).collect().map(|xs| {
                assert_eq!(xs, [1, 2, 3]);
            }))
    }).wait().unwrap();
}

#[test]
fn mpsc_unbounded() {
    let (tx, rx) = mpsc::unbounded::<i32>();
    lazy(move || {
        iter_ok(vec![1, 2, 3])
            .forward(tx)
            .map_err(|e: SendError<i32>| panic!("{}", e))
            .join(rx.take(3).collect().map(|xs| {
                assert_eq!(xs, [1, 2, 3]);
            }))
    }).wait().unwrap();
}

#[test]
fn mpsc_recv_unpark() {
    let core = Core::new();
    let (tx, rx) = mpsc::channel::<i32>(1);
    let tx2 = tx.clone();
    core.spawn(rx.collect().map(|xs| assert_eq!(xs, [1, 2])));
    core.spawn(lazy(move || tx.send(1).map(|_| ()).map_err(|e| panic!("{}", e))));
    core.run(lazy(move || tx2.send(2))).unwrap();
}

#[test]
fn mpsc_send_unpark() {
    let core = Core::new();
    let (tx, rx) = mpsc::channel::<i32>(1);
    let (donetx, donerx) = oneshot::channel();
    core.spawn(iter_ok(vec![1, 2]).forward(tx)
        .then(|x: Result<_, SendError<i32>>| {
            assert!(x.is_err());
            donetx.send(()).unwrap();
            Ok(())
        }));
    core.spawn(lazy(move || { let _ = rx; Ok(()) }));
    core.run(donerx).unwrap();
}

#[test]
fn spawn_sends_items() {
    let core = Core::new();
    let stream = unfold(0, |i| Some(Ok::<_,u8>((i, i + 1))));
    let rx = mpsc::spawn(stream, &core, 1);
    assert_eq!(core.run(rx.take(4).collect()).unwrap(),
               [0, 1, 2, 3]);
}

#[test]
fn spawn_kill_dead_stream() {
    use std::thread;
    use std::time::Duration;
    use futures::future::Either;

    // a stream which never returns anything (maybe a remote end isn't
    // responding), but dropping it leads to observable side effects
    // (like closing connections, releasing limited resources, ...)
    #[derive(Debug)]
    struct Dead {
        // when dropped you should get Err(oneshot::Canceled) on the
        // receiving end
        done: oneshot::Sender<()>,
    }
    impl Stream for Dead {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            Ok(Async::NotReady)
        }
    }

    // need to implement a timeout for the test, as it would hang
    // forever right now
    let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        let _ = timeout_tx.send(());
    });

    let core = Core::new();
    let (done_tx, done_rx) = oneshot::channel();
    let stream = Dead{done: done_tx};
    let rx = mpsc::spawn(stream, &core, 1);
    let res = core.run(
        Ok::<_, ()>(())
        .into_future()
        .then(move |_| {
            // now drop the spawned stream: maybe some timeout exceeded,
            // or some connection on this end was closed by the remote
            // end.
            drop(rx);
            // and wait for the spawned stream to release its resources
            done_rx
        })
        .select2(timeout_rx)
    );
    match res {
        Err(Either::A((oneshot::Canceled, _))) => (),
        _ => {
            panic!("dead stream wasn't canceled");
        },
    }
}


/// Test case for PR #768 (issue #766).
/// The issue was:
/// Given that an empty channel is polled by the Receiver, and the only Sender
/// gets dropped without sending anything, then the Receiver would get stuck.

#[test]
fn dropped_sender_of_unused_channel_notifies_receiver() {
    let core = Core::new();
    type FUTURE = Box<futures::Future<Item=u8, Error=()>>;

    // Constructs the channel which we want to test, and two futures which
    // act on that channel.
    let pair = |reverse| -> Vec<FUTURE> {
        // This is the channel which we want to test.
        let (tx, rx) = mpsc::channel::<u8>(1);
        let mut futures: Vec<FUTURE> = vec![
            Box::new(futures::stream::iter_ok(vec![])
                .forward(tx)
                .map_err(|_: mpsc::SendError<u8>| ())
                .map(|_| 42)
            ),
            Box::new(rx.fold((), |_, _| Ok(()))
                .map(|_| 24)
            ),
        ];
        if reverse {
            futures.reverse();
        }
        futures
    };

    let make_test_future = |reverse| -> Box<Future<Item=Vec<u8>, Error=()>> {
        let f = futures::future::join_all(pair(reverse));

        // Use a timeout. This is not meant to test the `sync::oneshot` but
        // merely uses it to implement this timeout.
        let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::<Vec<u8>>();
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(1000));
            let x = timeout_tx.send(vec![0]);
            assert!(x.is_err(), "Test timed out.");
        });

        Box::new(f.select(timeout_rx.map_err(|_|()))
            .map_err(|x| x.0)
            .map(|x| x.0)
        )
    };

    // The order of the tested futures is important to test fix of PR #768.
    // We want future_2 to poll on the Receiver before the Sender is dropped.
    let result = core.run(make_test_future(false));
    assert!(result.is_ok());
    assert_eq!(vec![42, 24], result.unwrap());

    // Test also the other ordering:
    let result = core.run(make_test_future(true));
    assert!(result.is_ok());
    assert_eq!(vec![24, 42], result.unwrap());
}