blob: 5d4d3022d6e72620d0c0de4d44e4a64e1d27f5ea [file] [log] [blame]
#![cfg(all(windows, feature = "os-poll", feature = "os-ext"))]
use std::fs::OpenOptions;
use std::io::{self, Read, Write};
use std::os::windows::fs::OpenOptionsExt;
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
use std::time::Duration;
use mio::windows::NamedPipe;
use mio::{Events, Interest, Poll, Token};
use rand::Rng;
use winapi::shared::winerror::*;
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
fn _assert_kinds() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
_assert_send::<NamedPipe>();
_assert_sync::<NamedPipe>();
}
macro_rules! t {
($e:expr) => {
match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {}", stringify!($e), e),
}
};
}
fn server() -> (NamedPipe, String) {
let num: u64 = rand::thread_rng().gen();
let name = format!(r"\\.\pipe\my-pipe-{}", num);
let pipe = t!(NamedPipe::new(&name));
(pipe, name)
}
fn client(name: &str) -> NamedPipe {
let mut opts = OpenOptions::new();
opts.read(true)
.write(true)
.custom_flags(FILE_FLAG_OVERLAPPED);
let file = t!(opts.open(name));
unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) }
}
fn pipe() -> (NamedPipe, NamedPipe) {
let (pipe, name) = server();
(pipe, client(&name))
}
#[test]
fn writable_after_register() {
let (mut server, mut client) = pipe();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::WRITABLE | Interest::READABLE,
));
t!(poll
.registry()
.register(&mut client, Token(1), Interest::WRITABLE));
let mut events = Events::with_capacity(128);
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
assert!(events
.iter()
.any(|e| { e.token() == Token(0) && e.is_writable() }));
assert!(events
.iter()
.any(|e| { e.token() == Token(1) && e.is_writable() }));
}
#[test]
fn write_then_read() {
let (mut server, mut client) = pipe();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));
let mut events = Events::with_capacity(128);
t!(poll.poll(&mut events, None));
assert_eq!(t!(client.write(b"1234")), 4);
loop {
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
if let Some(event) = events.iter().find(|e| e.token() == Token(0)) {
if event.is_readable() {
break;
}
}
}
let mut buf = [0; 10];
assert_eq!(t!(server.read(&mut buf)), 4);
assert_eq!(&buf[..4], b"1234");
}
#[test]
fn connect_before_client() {
let (mut server, name) = server();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
let mut events = Events::with_capacity(128);
t!(poll.poll(&mut events, Some(Duration::new(0, 0))));
let e = events.iter().collect::<Vec<_>>();
assert_eq!(e.len(), 0);
assert_eq!(
server.connect().err().unwrap().kind(),
io::ErrorKind::WouldBlock
);
let mut client = client(&name);
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));
loop {
t!(poll.poll(&mut events, None));
let e = events.iter().collect::<Vec<_>>();
if let Some(event) = e.iter().find(|e| e.token() == Token(0)) {
if event.is_writable() {
break;
}
}
}
}
#[test]
fn connect_after_client() {
let (mut server, name) = server();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
let mut events = Events::with_capacity(128);
t!(poll.poll(&mut events, Some(Duration::new(0, 0))));
let e = events.iter().collect::<Vec<_>>();
assert_eq!(e.len(), 0);
let mut client = client(&name);
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));
t!(server.connect());
loop {
t!(poll.poll(&mut events, None));
let e = events.iter().collect::<Vec<_>>();
if let Some(event) = e.iter().find(|e| e.token() == Token(0)) {
if event.is_writable() {
break;
}
}
}
}
#[test]
fn write_disconnected() {
let mut poll = t!(Poll::new());
let (mut server, mut client) = pipe();
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));
drop(client);
let mut events = Events::with_capacity(128);
t!(poll.poll(&mut events, None));
assert!(events.iter().count() > 0);
// this should not hang
let mut i = 0;
loop {
i += 1;
assert!(i < 16, "too many iterations");
match server.write(&[0]) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
t!(poll.poll(&mut events, None));
assert!(events.iter().count() > 0);
}
Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => break,
e => panic!("{:?}", e),
}
}
}
#[test]
fn write_then_drop() {
let (mut server, mut client) = pipe();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));
assert_eq!(t!(client.write(b"1234")), 4);
drop(client);
let mut events = Events::with_capacity(128);
'outer: loop {
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
for event in &events {
if event.is_readable() && event.token() == Token(0) {
break 'outer;
}
}
}
let mut buf = [0; 10];
assert_eq!(t!(server.read(&mut buf)), 4);
assert_eq!(&buf[..4], b"1234");
}
#[test]
fn connect_twice() {
let (mut server, name) = server();
let mut c1 = client(&name);
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
t!(poll
.registry()
.register(&mut c1, Token(1), Interest::READABLE | Interest::WRITABLE,));
drop(c1);
let mut events = Events::with_capacity(128);
loop {
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
if let Some(event) = events.iter().find(|e| e.token() == Token(0)) {
if event.is_readable() {
let mut buf = [0; 10];
match server.read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Ok(0) => break,
res => panic!("{:?}", res),
}
}
}
}
t!(server.disconnect());
assert_eq!(
server.connect().err().unwrap().kind(),
io::ErrorKind::WouldBlock
);
let mut c2 = client(&name);
t!(poll
.registry()
.register(&mut c2, Token(2), Interest::READABLE | Interest::WRITABLE,));
'outer: loop {
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
for event in &events {
if event.is_writable() && event.token() == Token(0) {
break 'outer;
}
}
}
}
#[test]
fn reregister_deregister_before_register() {
let (mut pipe, _) = server();
let poll = t!(Poll::new());
assert_eq!(
poll.registry()
.reregister(&mut pipe, Token(0), Interest::READABLE,)
.unwrap_err()
.kind(),
io::ErrorKind::NotFound,
);
assert_eq!(
poll.registry().deregister(&mut pipe).unwrap_err().kind(),
io::ErrorKind::NotFound,
);
}
#[test]
fn reregister_deregister_different_poll() {
let (mut pipe, _) = server();
let poll1 = t!(Poll::new());
let poll2 = t!(Poll::new());
// Register with 1
t!(poll1
.registry()
.register(&mut pipe, Token(0), Interest::READABLE));
assert_eq!(
poll2
.registry()
.reregister(&mut pipe, Token(0), Interest::READABLE,)
.unwrap_err()
.kind(),
io::ErrorKind::AlreadyExists,
);
assert_eq!(
poll2.registry().deregister(&mut pipe).unwrap_err().kind(),
io::ErrorKind::AlreadyExists,
);
}