// blob: 32c73f0c69bc1e897818e11da95ceeba965ab7e6 [file] [log] [blame]
use std::fmt;
use std::io::{self, Read, Write, Cursor, ErrorKind};
use std::mem;
use std::net::{self, SocketAddr};
use std::sync::{Mutex, MutexGuard};
use io::would_block;
use miow::iocp::CompletionStatus;
use miow::net::*;
use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
use net::tcp::Shutdown;
use winapi::*;
use {Evented, Ready, Poll, PollOpt, Token};
use poll;
use sys::windows::from_raw_arc::FromRawArc;
use sys::windows::selector::{Overlapped, Registration};
use sys::windows::{wouldblock, Family};
/// TCP stream handle: a `StreamImp` (the shared I/O state) plus the slot that
/// holds this handle's `poll::Registration`.
pub struct TcpStream {
/// Separately stored implementation to ensure that the `Drop`
/// implementation on this type is only executed when it's actually dropped
/// (many clones of this `imp` are made).
imp: StreamImp,
/// Starts out as `None`; handed by reference to `register_socket`,
/// `reregister_socket` and `deregister` on the inner `iocp` registration.
registration: Mutex<Option<poll::Registration>>,
}
/// TCP listener handle: a `ListenerImp` (the shared I/O state) plus the slot
/// that holds this handle's `poll::Registration`.
pub struct TcpListener {
/// Shared implementation; clones of it are loaned out while an accept is in
/// flight (see `schedule_accept`).
imp: ListenerImp,
/// Starts out as `None`; handed by reference to `register_socket`,
/// `reregister_socket` and `deregister` on the inner `iocp` registration.
registration: Mutex<Option<poll::Registration>>,
}
/// Clonable handle to the reference-counted `StreamIo` shared between the
/// public `TcpStream` and the completion callbacks.
#[derive(Clone)]
struct StreamImp {
/// A stable address and synchronized access for all internals. This serves
/// to ensure that all `Overlapped` pointers are valid for a long period of
/// time as well as allowing completion callbacks to have access to the
/// internals without having ownership.
///
/// Note that the reference count also allows us to "loan out" copies to
/// completion ports while I/O is running to guarantee that this stays alive
/// until the I/O completes. You'll notice a number of calls to
/// `mem::forget` below, and these only happen on successful scheduling of
/// I/O and are paired with `overlapped2arc!` macro invocations in the
/// completion callbacks (to have a decrement match the increment).
inner: FromRawArc<StreamIo>,
}
/// Clonable handle to the reference-counted `ListenerIo` shared between the
/// public `TcpListener` and the `accept_done` completion callback.
#[derive(Clone)]
struct ListenerImp {
/// Shared listener state. A clone is forgotten (`mem::forget`) whenever an
/// accept is successfully scheduled and recovered again in `accept_done`
/// through `overlapped2arc!`.
inner: FromRawArc<ListenerIo>,
}
/// Shared per-stream data: the socket, the two `Overlapped` structures handed
/// to the OS, and the mutex-protected state machine.
struct StreamIo {
// Mutable state (registration, deferred connect address, read/write state).
inner: Mutex<StreamInner>,
// Created with `read_done` as its callback; passed to both
// `connect_overlapped` and `read_overlapped`.
read: Overlapped, // also used for connect
// Created with `write_done` as its callback; passed to `write_overlapped`.
write: Overlapped,
// The underlying standard-library socket all operations are issued on.
socket: net::TcpStream,
}
/// Shared per-listener data: the listening socket, the `Overlapped` structure
/// used for accepts, and the mutex-protected state.
struct ListenerIo {
// Mutable state (registration, accept state, address buffer).
inner: Mutex<ListenerInner>,
// Created with `accept_done` as its callback; passed to `accept_overlapped`.
accept: Overlapped,
// Selects `TcpBuilder::new_v4` or `TcpBuilder::new_v6` when creating the
// socket for the next accept.
family: Family,
// The underlying standard-library listening socket.
socket: net::TcpListener,
}
/// Mutex-protected mutable state of a stream.
struct StreamInner {
// Registration used to register the socket, get/set readiness and
// borrow/return I/O buffers (`get_buffer` / `put_buffer`).
iocp: Registration,
// Address supplied to `TcpStream::connect`; taken by `register`, which then
// starts the overlapped connect.
deferred_connect: Option<SocketAddr>,
// `Pending` holds the buffer handed to `read_overlapped`; `Ready` holds a
// cursor over the bytes that were received.
read: State<Vec<u8>, Cursor<Vec<u8>>>,
// `Pending` holds the buffer being written and the offset already sent.
// NOTE(review): no code visible in this file constructs `Ready` for writes.
write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
}
/// Mutex-protected mutable state of a listener.
struct ListenerInner {
// Registration used to register the socket and to get/set readiness.
iocp: Registration,
// `Pending` holds the socket passed to `accept_overlapped`; `Ready` holds
// that socket together with the peer address parsed in `accept_done`.
accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
// Address buffer filled by `accept_overlapped` and parsed in `accept_done`.
accept_buf: AcceptAddrsBuf,
}
/// Progress of a single overlapped operation. `T` is the data kept while the
/// operation is in flight and `U` is the value available once it has finished.
enum State<T, U> {
Empty, // no I/O operation in progress
Pending(T), // an I/O operation is in progress
Ready(U), // I/O has finished with this value
Error(io::Error), // there was an I/O error
}
impl TcpStream {
    /// Wraps `socket` in a fresh, unregistered stream.
    ///
    /// When `deferred_connect` is `Some`, `register` starts an overlapped
    /// connect to that address instead of scheduling a read.
    fn new(socket: net::TcpStream,
           deferred_connect: Option<SocketAddr>) -> TcpStream {
        TcpStream {
            registration: Mutex::new(None),
            imp: StreamImp {
                inner: FromRawArc::new(StreamIo {
                    read: Overlapped::new(read_done),
                    write: Overlapped::new(write_done),
                    socket: socket,
                    inner: Mutex::new(StreamInner {
                        iocp: Registration::new(),
                        deferred_connect: deferred_connect,
                        read: State::Empty,
                        write: State::Empty,
                    }),
                }),
            },
        }
    }

    /// Creates a stream whose connect to `addr` is deferred until the stream
    /// is registered. Always returns `Ok`.
    pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
                   -> io::Result<TcpStream> {
        Ok(TcpStream::new(socket, Some(*addr)))
    }

    /// Forwards to `peer_addr` on the underlying socket.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.peer_addr()
    }

    /// Forwards to `local_addr` on the underlying socket.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.local_addr()
    }

    /// Clones the underlying socket and wraps it in a new, unregistered
    /// stream with no deferred connect.
    pub fn try_clone(&self) -> io::Result<TcpStream> {
        self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
    }

    /// Forwards to `shutdown` on the underlying socket.
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        self.imp.inner.socket.shutdown(how)
    }

    /// Forwards to `set_nodelay` on the underlying socket.
    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        self.imp.inner.socket.set_nodelay(nodelay)
    }

    /// Forwards to `nodelay` on the underlying socket.
    pub fn nodelay(&self) -> io::Result<bool> {
        self.imp.inner.socket.nodelay()
    }

    /// Forwards to `set_keepalive_ms` on the underlying socket.
    pub fn set_keepalive_ms(&self, millis: Option<u32>) -> io::Result<()> {
        self.imp.inner.socket.set_keepalive_ms(millis)
    }

    /// Forwards to `keepalive_ms` on the underlying socket.
    pub fn keepalive_ms(&self) -> io::Result<Option<u32>> {
        self.imp.inner.socket.keepalive_ms()
    }

    /// Forwards to `set_ttl` on the underlying socket.
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.imp.inner.socket.set_ttl(ttl)
    }

    /// Forwards to `ttl` on the underlying socket.
    pub fn ttl(&self) -> io::Result<u32> {
        self.imp.inner.socket.ttl()
    }

    /// Forwards to `take_error` on the underlying socket.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.imp.inner.socket.take_error()
    }

    /// Locks and returns the mutable stream state.
    fn inner(&self) -> MutexGuard<StreamInner> {
        self.imp.inner()
    }

    /// Work shared by `register` and `reregister` once the socket has been
    /// (re)registered with `interest`.
    fn post_register(&self, interest: Ready, me: &mut StreamInner) {
        if interest.is_readable() {
            self.imp.schedule_read(me);
        }
        // At least with epoll, if a socket is registered with an interest in
        // writing and it's immediately writable then a writable event is
        // generated immediately, so do so here.
        if interest.is_writable() {
            if let State::Empty = me.write {
                self.imp.add_readiness(me, Ready::writable());
            }
        }
    }

    /// Copies bytes from the most recently completed overlapped read into
    /// `buf`.
    ///
    /// Returns a would-block error while no completed read is available and
    /// returns (and clears) a stored read error otherwise. Whenever the
    /// buffered data is exhausted, or an error is handed out, the next read
    /// is scheduled.
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let mut me = self.inner();
        match mem::replace(&mut me.read, State::Empty) {
            State::Empty => Err(wouldblock()),
            State::Pending(buf) => {
                // Still in flight: put the state back untouched.
                me.read = State::Pending(buf);
                Err(wouldblock())
            }
            State::Ready(mut cursor) => {
                let amt = try!(cursor.read(buf));
                // Once the entire buffer has been consumed we need to
                // schedule the next read operation.
                if cursor.position() as usize == cursor.get_ref().len() {
                    me.iocp.put_buffer(cursor.into_inner());
                    self.imp.schedule_read(&mut me);
                } else {
                    me.read = State::Ready(cursor);
                }
                Ok(amt)
            }
            State::Error(e) => {
                self.imp.schedule_read(&mut me);
                Err(e)
            }
        }
    }

    /// Copies `buf` into an intermediate buffer and schedules an overlapped
    /// write of it, returning the number of bytes accepted.
    ///
    /// Returns `Ok(0)` for an empty `buf`, a would-block error while a write
    /// is still in flight or the socket is not registered, and the stored
    /// error if the previously scheduled write failed.
    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        // Nothing to send: don't occupy the single in-flight write slot (and
        // clear writable readiness) for a zero-byte operation.
        if buf.is_empty() {
            return Ok(0)
        }
        let mut me = self.inner();
        let me = &mut *me;
        match mem::replace(&mut me.write, State::Empty) {
            State::Empty => {}
            // `schedule_write` parks a failure here and signals writable so
            // the caller comes back. Hand the error out (mirroring `read`)
            // and leave the state `Empty`; answering "would block" instead
            // would hide the failure and wedge the stream permanently.
            State::Error(e) => return Err(e),
            other => {
                me.write = other;
                return Err(wouldblock())
            }
        }
        if !me.iocp.registered() {
            return Err(wouldblock())
        }
        let mut intermediate = me.iocp.get_buffer(64 * 1024);
        let amt = try!(intermediate.write(buf));
        self.imp.schedule_write(intermediate, 0, me);
        Ok(amt)
    }

    /// No-op: always returns `Ok(())`.
    pub fn flush(&self) -> io::Result<()> {
        Ok(())
    }
}
impl StreamImp {
fn inner(&self) -> MutexGuard<StreamInner> {
self.inner.inner.lock().unwrap()
}
fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
unsafe {
trace!("scheduling a connect");
try!(self.inner.socket.connect_overlapped(addr,
self.inner.read.get_mut()));
}
// see docs above on StreamImp.inner for rationale on forget
mem::forget(self.clone());
Ok(())
}
/// Issues a "read" operation for this socket, if applicable.
///
/// This is intended to be invoked from either a completion callback or a
/// normal context. The function is infallible because errors are stored
/// internally to be returned later.
///
/// It is required that this function is only called after the handle has
/// been registered with an event loop.
fn schedule_read(&self, me: &mut StreamInner) {
match me.read {
State::Empty => {}
State::Ready(_) | State::Error(_) => {
self.add_readiness(me, Ready::readable());
return;
}
_ => return,
}
me.iocp.set_readiness(me.iocp.readiness() & !Ready::readable());
let mut buf = me.iocp.get_buffer(64 * 1024);
let res = unsafe {
trace!("scheduling a read");
let cap = buf.capacity();
buf.set_len(cap);
self.inner.socket.read_overlapped(&mut buf,
self.inner.read.get_mut())
};
match res {
Ok(_) => {
// see docs above on StreamImp.inner for rationale on forget
me.read = State::Pending(buf);
mem::forget(self.clone());
}
Err(e) => {
// Like above, be sure to indicate that hup has happened
// whenever we get `ECONNRESET`
let mut set = Ready::readable();
if e.raw_os_error() == Some(WSAECONNRESET as i32) {
trace!("tcp stream at hup: econnreset");
set = set | Ready::hup();
}
me.read = State::Error(e);
self.add_readiness(me, set);
me.iocp.put_buffer(buf);
}
}
}
/// Similar to `schedule_read`, except that this issues, well, writes.
///
/// This function will continually attempt to write the entire contents of
/// the buffer `buf` until they have all been written. The `pos` argument is
/// the current offset within the buffer up to which the contents have
/// already been written.
///
/// A new writable event (e.g. allowing another write) will only happen once
/// the buffer has been written completely (or hit an error).
fn schedule_write(&self,
buf: Vec<u8>,
pos: usize,
me: &mut StreamInner) {
// About to write, clear any pending level triggered events
me.iocp.set_readiness(me.iocp.readiness() & !Ready::writable());
trace!("scheduling a write");
let err = unsafe {
self.inner.socket.write_overlapped(&buf[pos..],
self.inner.write.get_mut())
};
match err {
Ok(_) => {
// see docs above on StreamImp.inner for rationale on forget
me.write = State::Pending((buf, pos));
mem::forget(self.clone());
}
Err(e) => {
me.write = State::Error(e);
self.add_readiness(me, Ready::writable());
me.iocp.put_buffer(buf);
}
}
}
/// Pushes an event for this socket onto the selector its registered for.
///
/// When an event is generated on this socket, if it happened after the
/// socket was closed then we don't want to actually push the event onto our
/// selector as otherwise it's just a spurious notification.
fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
me.iocp.set_readiness(set | me.iocp.readiness());
}
}
/// Completion callback for the `read` overlapped structure.
///
/// That structure is shared by reads and connects, so the stored read state
/// decides which one finished: `State::Pending` means a read completed,
/// anything else means the connect completed.
fn read_done(status: &CompletionStatus) {
    // Reclaims the handle that was leaked with `mem::forget` when the
    // operation was scheduled.
    let me2 = StreamImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
    };
    let mut me = me2.inner();
    match mem::replace(&mut me.read, State::Empty) {
        State::Pending(mut buf) => {
            trace!("finished a read: {}", status.bytes_transferred());
            unsafe {
                buf.set_len(status.bytes_transferred() as usize);
            }
            me.read = State::Ready(Cursor::new(buf));
            // If we transferred 0 bytes then be sure to indicate that hup
            // happened.
            let mut e = Ready::readable();
            if status.bytes_transferred() == 0 {
                trace!("tcp stream at hup: 0-byte read");
                e = e | Ready::hup();
            }
            return me2.add_readiness(&mut me, e)
        }
        s => me.read = s,
    }
    // If a read didn't complete, then the connect must have just finished.
    trace!("finished a connect");
    match me2.inner.socket.connect_complete() {
        Ok(()) => {
            me2.add_readiness(&mut me, Ready::writable());
            me2.schedule_read(&mut me);
        }
        Err(e) => {
            // Callers normally wait for *writable* to learn that a connect
            // has finished, so a failed connect must raise it as well;
            // signalling only readable leaves a writable-only registration
            // waiting forever. The error itself is returned by the next
            // `read`.
            me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
            me.read = State::Error(e);
        }
    }
}
/// Completion callback for the `write` overlapped structure.
///
/// Advances the stored offset by the number of bytes transferred. When the
/// whole buffer has gone out writable readiness is set; otherwise the
/// remainder is scheduled again.
fn write_done(status: &CompletionStatus) {
    trace!("finished a write {}", status.bytes_transferred());
    // Reclaims the handle leaked by `schedule_write`.
    let imp = StreamImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
    };
    let mut state = imp.inner();

    let in_flight = mem::replace(&mut state.write, State::Empty);
    let (buf, pos) = if let State::Pending(pair) = in_flight {
        pair
    } else {
        unreachable!()
    };

    let sent_so_far = pos + (status.bytes_transferred() as usize);
    if sent_so_far != buf.len() {
        imp.schedule_write(buf, sent_so_far, &mut state);
    } else {
        imp.add_readiness(&mut state, Ready::writable());
    }
}
impl Evented for TcpStream {
    /// Registers the socket, then either starts the deferred connect (if one
    /// was requested) or performs the usual post-registration work.
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut state = self.inner();
        try!(state.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                        interest, opts, &self.registration));

        // A connect requested before registration is started now; its
        // completion callback takes care of readiness and the first read.
        let pending_connect = state.deferred_connect.take();
        if let Some(addr) = pending_connect {
            return self.imp.schedule_connect(&addr);
        }

        self.post_register(interest, &mut state);
        Ok(())
    }

    /// Re-registers the socket and repeats the post-registration work.
    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut state = self.inner();
        try!(state.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                          interest, opts, &self.registration));
        self.post_register(interest, &mut state);
        Ok(())
    }

    /// Forwards to `deregister` on the inner registration.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner()
            .iocp
            .deregister(poll, &self.registration)
    }
}
impl fmt::Debug for TcpStream {
    /// Writes the fixed placeholder `TcpStream { ... }`; no internal state is
    /// shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write the text directly. Calling `.fmt(f)` on the string literal
        // here resolves to `str`'s `Debug` impl, which wraps the placeholder
        // in quotes and escapes it.
        f.write_str("TcpStream { ... }")
    }
}
impl Drop for TcpStream {
fn drop(&mut self) {
// If we're still internally reading, we're no longer interested. Note
// though that we don't cancel any writes which may have been issued to
// preserve the same semantics as Unix.
//
// Note that "Empty" here may mean that a connect is pending, so we
// cancel even if that happens as well.
unsafe {
match self.inner().read {
State::Pending(_) | State::Empty => {
trace!("cancelling active TCP read");
drop(super::cancel(&self.imp.inner.socket,
&self.imp.inner.read));
}
State::Ready(_) | State::Error(_) => {}
}
}
}
}
impl TcpListener {
pub fn new(socket: net::TcpListener, addr: &SocketAddr)
-> io::Result<TcpListener> {
Ok(TcpListener::new_family(socket, match *addr {
SocketAddr::V4(..) => Family::V4,
SocketAddr::V6(..) => Family::V6,
}))
}
fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
TcpListener {
registration: Mutex::new(None),
imp: ListenerImp {
inner: FromRawArc::new(ListenerIo {
accept: Overlapped::new(accept_done),
family: family,
socket: socket,
inner: Mutex::new(ListenerInner {
iocp: Registration::new(),
accept: State::Empty,
accept_buf: AcceptAddrsBuf::new(),
}),
}),
},
}
}
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let mut me = self.inner();
let ret = match mem::replace(&mut me.accept, State::Empty) {
State::Empty => return Err(would_block()),
State::Pending(t) => {
me.accept = State::Pending(t);
return Err(would_block());
}
State::Ready((s, a)) => {
Ok((TcpStream::new(s, None), a))
}
State::Error(e) => Err(e),
};
self.imp.schedule_accept(&mut me);
return ret
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.imp.inner.socket.local_addr()
}
pub fn try_clone(&self) -> io::Result<TcpListener> {
self.imp.inner.socket.try_clone().map(|s| {
TcpListener::new_family(s, self.imp.inner.family)
})
}
pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
self.imp.inner.socket.set_only_v6(only_v6)
}
pub fn only_v6(&self) -> io::Result<bool> {
self.imp.inner.socket.only_v6()
}
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.imp.inner.socket.set_ttl(ttl)
}
pub fn ttl(&self) -> io::Result<u32> {
self.imp.inner.socket.ttl()
}
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.imp.inner.socket.take_error()
}
fn inner(&self) -> MutexGuard<ListenerInner> {
self.imp.inner()
}
}
impl ListenerImp {
    /// Locks the shared state mutex and returns the guard.
    ///
    /// Panics if the mutex has been poisoned.
    fn inner(&self) -> MutexGuard<ListenerInner> {
        let shared = &self.inner;
        shared.inner.lock().unwrap()
    }

    /// Starts the next overlapped accept when the accept state is `Empty`;
    /// does nothing otherwise.
    ///
    /// A failure to create the socket or to start the accept is stored in
    /// `me.accept` and reported through readable readiness.
    fn schedule_accept(&self, me: &mut ListenerInner) {
        let idle = match me.accept {
            State::Empty => true,
            _ => false,
        };
        if !idle {
            return;
        }

        // A new accept is about to start, so drop the readable bit first.
        let without_readable = me.iocp.readiness() & !Ready::readable();
        me.iocp.set_readiness(without_readable);

        // Create a socket of the listener's family for the accept.
        let builder = match self.inner.family {
            Family::V4 => TcpBuilder::new_v4(),
            Family::V6 => TcpBuilder::new_v6(),
        };
        let started = match builder {
            Ok(builder) => {
                trace!("scheduling an accept");
                unsafe {
                    self.inner.socket.accept_overlapped(
                        &builder,
                        &mut me.accept_buf,
                        self.inner.accept.get_mut())
                }
            }
            Err(e) => Err(e),
        };

        match started {
            Err(e) => {
                me.accept = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
            Ok((socket, _)) => {
                // In flight: keep the socket in the state and leak one clone
                // of the handle until `accept_done` runs.
                me.accept = State::Pending(socket);
                mem::forget(self.clone());
            }
        }
    }

    /// Adds `set` to the readiness currently stored on the registration.
    fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
        let current = me.iocp.readiness();
        me.iocp.set_readiness(set | current);
    }
}
/// Completion callback for the `accept` overlapped structure.
///
/// Finishes the accept on the pending socket, extracts the peer address from
/// the address buffer, stores either `Ready((socket, addr))` or the error in
/// the accept state, and sets readable readiness.
fn accept_done(status: &CompletionStatus) {
    // Reclaims the handle leaked by `schedule_accept`.
    let imp = ListenerImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
    };
    let mut state = imp.inner();

    let in_flight = mem::replace(&mut state.accept, State::Empty);
    let socket = if let State::Pending(s) = in_flight {
        s
    } else {
        unreachable!()
    };
    trace!("finished an accept");

    let remote = match imp.inner.socket.accept_complete(&socket) {
        Ok(()) => match state.accept_buf.parse(&imp.inner.socket) {
            Ok(addrs) => match addrs.remote() {
                Some(addr) => Ok(addr),
                None => Err(io::Error::new(ErrorKind::Other,
                                           "could not obtain remote address")),
            },
            Err(e) => Err(e),
        },
        Err(e) => Err(e),
    };

    state.accept = match remote {
        Err(e) => State::Error(e),
        Ok(remote_addr) => State::Ready((socket, remote_addr)),
    };
    imp.add_readiness(&mut state, Ready::readable());
}
impl Evented for TcpListener {
    /// Registers the listening socket and schedules an accept.
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut state = self.inner();
        let listener = &self.imp.inner.socket;
        try!(state.iocp.register_socket(listener, poll, token,
                                        interest, opts, &self.registration));
        self.imp.schedule_accept(&mut state);
        Ok(())
    }

    /// Re-registers the listening socket and schedules an accept.
    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut state = self.inner();
        let listener = &self.imp.inner.socket;
        try!(state.iocp.reregister_socket(listener, poll, token,
                                          interest, opts, &self.registration));
        self.imp.schedule_accept(&mut state);
        Ok(())
    }

    /// Forwards to `deregister` on the inner registration.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner()
            .iocp
            .deregister(poll, &self.registration)
    }
}
impl fmt::Debug for TcpListener {
    /// Writes the fixed placeholder `TcpListener { ... }`; no internal state
    /// is shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write the text directly. Calling `.fmt(f)` on the string literal
        // here resolves to `str`'s `Debug` impl, which wraps the placeholder
        // in quotes and escapes it.
        f.write_str("TcpListener { ... }")
    }
}
impl Drop for TcpListener {
    /// Cancels the outstanding accept, if one is pending.
    fn drop(&mut self) {
        let state = self.inner();
        if let State::Pending(_) = state.accept {
            trace!("cancelling active TCP accept");
            // The result of the cancellation is intentionally ignored.
            let _ = unsafe {
                super::cancel(&self.imp.inner.socket, &self.imp.inner.accept)
            };
        }
    }
}