blob: 645f8b7b248b8684d9d5bf4ec08dd4dde3ba946e [file] [log] [blame]
#![cfg(all(unix, feature = "os-poll", feature = "os-ext"))]
use std::io::{Read, Write};
use std::process::{Command, Stdio};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
use mio::unix::pipe::{self, Receiver, Sender};
use mio::{Events, Interest, Poll, Token};
mod util;
use util::{assert_would_block, expect_events, ExpectEvent};
const RECEIVER: Token = Token(0);
const SENDER: Token = Token(1);
const DATA1: &[u8; 11] = b"Hello world";
#[test]
fn smoke() {
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(8);
let (mut sender, mut receiver) = pipe::new().unwrap();
let mut buf = [0; 20];
assert_would_block(receiver.read(&mut buf));
poll.registry()
.register(&mut receiver, RECEIVER, Interest::READABLE)
.unwrap();
poll.registry()
.register(&mut sender, SENDER, Interest::WRITABLE)
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(SENDER, Interest::WRITABLE)],
);
let n = sender.write(DATA1).unwrap();
assert_eq!(n, DATA1.len());
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(RECEIVER, Interest::READABLE)],
);
let n = receiver.read(&mut buf).unwrap();
assert_eq!(n, DATA1.len());
assert_eq!(&buf[..n], &*DATA1);
}
#[test]
fn event_when_sender_is_dropped() {
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(8);
let (mut sender, mut receiver) = pipe::new().unwrap();
poll.registry()
.register(&mut receiver, RECEIVER, Interest::READABLE)
.unwrap();
let barrier = Arc::new(Barrier::new(2));
let thread_barrier = barrier.clone();
let handle = thread::spawn(move || {
let n = sender.write(DATA1).unwrap();
assert_eq!(n, DATA1.len());
thread_barrier.wait();
thread_barrier.wait();
drop(sender);
thread_barrier.wait();
});
barrier.wait(); // Wait for the write to complete.
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(RECEIVER, Interest::READABLE)],
);
barrier.wait(); // Unblock the thread.
barrier.wait(); // Wait until the sending end is dropped.
expect_one_closed_event(&mut poll, &mut events, RECEIVER, true);
handle.join().unwrap();
}
#[test]
fn event_when_receiver_is_dropped() {
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(8);
let (mut sender, receiver) = pipe::new().unwrap();
poll.registry()
.register(&mut sender, SENDER, Interest::WRITABLE)
.unwrap();
let barrier = Arc::new(Barrier::new(2));
let thread_barrier = barrier.clone();
let handle = thread::spawn(move || {
thread_barrier.wait();
drop(receiver);
thread_barrier.wait();
});
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(SENDER, Interest::WRITABLE)],
);
barrier.wait(); // Unblock the thread.
barrier.wait(); // Wait until the receiving end is dropped.
expect_one_closed_event(&mut poll, &mut events, SENDER, false);
handle.join().unwrap();
}
#[test]
fn from_child_process_io() {
// `cat` simply echo everything that we write via standard in.
let mut child = Command::new("cat")
.env_clear()
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed to start `cat` command");
let mut sender = Sender::from(child.stdin.take().unwrap());
let mut receiver = Receiver::from(child.stdout.take().unwrap());
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(8);
poll.registry()
.register(&mut receiver, RECEIVER, Interest::READABLE)
.unwrap();
poll.registry()
.register(&mut sender, SENDER, Interest::WRITABLE)
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(SENDER, Interest::WRITABLE)],
);
let n = sender.write(DATA1).unwrap();
assert_eq!(n, DATA1.len());
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(RECEIVER, Interest::READABLE)],
);
let mut buf = [0; 20];
let n = receiver.read(&mut buf).unwrap();
assert_eq!(n, DATA1.len());
assert_eq!(&buf[..n], &*DATA1);
drop(sender);
expect_one_closed_event(&mut poll, &mut events, RECEIVER, true);
child.wait().unwrap();
}
#[test]
fn nonblocking_child_process_io() {
// `cat` simply echo everything that we write via standard in.
let mut child = Command::new("cat")
.env_clear()
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed to start `cat` command");
let sender = Sender::from(child.stdin.take().unwrap());
let mut receiver = Receiver::from(child.stdout.take().unwrap());
receiver.set_nonblocking(true).unwrap();
let mut buf = [0; 20];
assert_would_block(receiver.read(&mut buf));
drop(sender);
child.wait().unwrap();
}
/// Expected a closed event. If `read` is true is checks for `is_read_closed`,
/// otherwise for `is_write_closed`.
pub fn expect_one_closed_event(poll: &mut Poll, events: &mut Events, token: Token, read: bool) {
poll.poll(events, Some(Duration::from_secs(1))).unwrap();
let mut iter = events.iter();
let event = iter.next().unwrap();
assert_eq!(event.token(), token, "invalid token, event: {:#?}", event);
if read {
assert!(
event.is_read_closed(),
"expected closed or error, event: {:#?}",
event
);
} else {
assert!(
event.is_write_closed(),
"expected closed or error, event: {:#?}",
event
);
}
assert!(iter.next().is_none());
}