Allow registration of custom handles on Windows

This commit intends to extend the functionality of mio on Windows to support
custom handles being registered with the internal IOCP object. This in turn
should unlock the ability to work with named pipes, filesystem changes, or any
other IOCP-enabled object on Windows. Named pipes are in particular quite
important as they're often a foundational IPC mechanism on Windows.

This support is provided by exporting two new types in a `windows` module. A
`Binding` serves as the ability to register with the actual IOCP port in an
`Evented` implementation. Internally the `Binding` keeps track of what it
was last associated with to implement IOCP semantics. This may one day be
possible to make a zero-sized-type.

The second type, `Overlapped`, is exported as a contract that all overlapped I/O
operations must be executed with this particular type. This ensures that after
an event is received from an IOCP object we know what to do with it. Essentially
this is just a `OVERLAPPED` with a function pointer after it.

Along the way this exposes the `winapi` crate as a public dependency of `mio`.
The `OVERLAPPED_ENTRY` and `OVERLAPPED` types in `winapi` are exposed through
the `Overlapped` type that mio itself exports.

I've implemented [bindings to named pipes][bindings] and I've also got a
proof-of-concept [process management library][tokio-process] using these
bindings. So far it seems that this support in mio is sufficient for building up
these applications, and it all appears to be working so far.

I personally see this as a much bigger committment on the mio side of things
than the Unix implementation. The `EventedFd` type on Unix is quite small and
minimal, but the `Overlapped` and `binding` types on Windows are just
pieces of a larger puzzle when dealing with overlapped operations. Questions
about ownership of I/O objects arise along with the method of handling
completion status notifications. For now this is essentially binding mio to
stick to at least the same strategy for handling IOCP for the 0.6 series. A
major version bump of mio could perhaps change these semantics, but it may be
difficult to do so.

It seems, though, that the Windows semantics are unlikely to change much in the
near future. The overhead seems to have essentially reached its limit ("bolting
readiness on completion") and otherwise the ownership management seems
negligible.

Closes #252
Closes #320
diff --git a/Cargo.toml b/Cargo.toml
index b5419fe..11a5095 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,7 +29,7 @@
 
 [target.'cfg(windows)'.dependencies]
 winapi = "0.2.1"
-miow   = "0.1.3"
+miow   = "0.1.4"
 kernel32-sys = "0.2"
 
 [dev-dependencies]
diff --git a/src/lib.rs b/src/lib.rs
index 213c0e0..79c954b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -146,6 +146,62 @@
     };
 }
 
+/// Windows-only extensions to the mio crate.
+///
+/// Mio on windows is currently implemented with IOCP for a high-performance
+/// implementation of asynchronous I/O. Mio then provides TCP and UDP as sample
+/// bindings for the system to connect networking types to asynchronous I/O. On
+/// Unix this scheme is then also extensible to all other file descriptors with
+/// the `EventedFd` type, but on Windows no such analog is available. The
+/// purpose of this module, however, is to similarly provide a mechanism for
+/// foreign I/O types to get hooked up into the IOCP event loop.
+///
+/// This module provides two types for interfacing with a custom IOCP handle:
+///
+/// * `Binding` - this type is intended to govern binding with mio's `Poll`
+///   type. Each I/O object should contain an instance of `Binding` that's
+///   interfaced with for the implementation of the `Evented` trait. The
+///   `register`, `reregister`, and `deregister` methods for the `Evented` trait
+///   all have rough analogs with `Binding`.
+///
+///   Note that this type **does not handle readiness**. That is, this type does
+///   not handle whether sockets are readable/writable/etc. It's intended that
+///   IOCP types will internally manage this state with a `SetReadiness` type
+///   from the `poll` module. The `SetReadiness` is typically lazily created on
+///   the first time that `Evented::register` is called and then stored in the
+///   I/O object.
+///
+///   Also note that for types which represent streams of bytes the mio
+///   interface of *readiness* doesn't map directly to the Windows model of
+///   *completion*. This means that types will have to perform internal
+///   buffering to ensure that a readiness interface can be provided. For a
+///   sample implementation see the TCP/UDP modules in mio itself.
+///
+/// * `Overlapped` - this type is intended to be used as the concreate instances
+///   of the `OVERLAPPED` type that most win32 methods expect. It's crucial, for
+///   safety, that all asynchronous operations are initiated with an instance of
+///   `Overlapped` and not another instantiation of `OVERLAPPED`.
+///
+///   Mio's `Overlapped` type is created with a function pointer that receives
+///   a `OVERLAPPED_ENTRY` type when called. This `OVERLAPPED_ENTRY` type is
+///   defined in the `winapi` crate. Whenever a completion is posted to an IOCP
+///   object the `OVERLAPPED` that was signaled will be interpreted as
+///   `Overlapped` in the mio crate and this function pointer will be invoked.
+///   Through this function pointer, and through the `OVERLAPPED` pointer,
+///   implementations can handle management of I/O events.
+///
+/// When put together these two types enable custom Windows handles to be
+/// registered with mio's event loops. The `Binding` type is used to associate
+/// handles and the `Overlapped` type is used to execute I/O operations. When
+/// the I/O operations are completed a custom function pointer is called which
+/// typically modifies a `SetReadiness` set by `Evented` methods which will get
+/// later hooked into the mio event loop.
+#[cfg(windows)]
+pub mod windows {
+
+    pub use sys::{Overlapped, Binding};
+}
+
 // Conversion utilities
 mod convert {
     use std::time::Duration;
diff --git a/src/sys/mod.rs b/src/sys/mod.rs
index 9297f06..9638661 100644
--- a/src/sys/mod.rs
+++ b/src/sys/mod.rs
@@ -25,6 +25,8 @@
     TcpStream,
     TcpListener,
     UdpSocket,
+    Overlapped,
+    Binding,
 };
 
 #[cfg(windows)]
diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs
index d73be4b..6d493ef 100644
--- a/src/sys/windows/mod.rs
+++ b/src/sys/windows/mod.rs
@@ -142,8 +142,6 @@
 use kernel32;
 use winapi;
 
-use self::selector::Overlapped;
-
 mod awakener;
 #[macro_use]
 mod selector;
@@ -153,7 +151,7 @@
 mod buffer_pool;
 
 pub use self::awakener::Awakener;
-pub use self::selector::{Events, Selector};
+pub use self::selector::{Events, Selector, Overlapped, Binding};
 pub use self::tcp::{TcpStream, TcpListener};
 pub use self::udp::UdpSocket;
 
@@ -169,8 +167,7 @@
 unsafe fn cancel(socket: &AsRawSocket,
                  overlapped: &Overlapped) -> io::Result<()> {
     let handle = socket.as_raw_socket() as winapi::HANDLE;
-    let overlapped = overlapped.get_mut().raw();
-    let ret = kernel32::CancelIoEx(handle, overlapped);
+    let ret = kernel32::CancelIoEx(handle, overlapped.as_mut_ptr());
     if ret == 0 {
         Err(io::Error::last_os_error())
     } else {
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 6a592fb..bf74711 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -1,10 +1,12 @@
-use std::{cmp, io, mem, u32};
+use std::{cmp, io, u32};
 use std::cell::UnsafeCell;
 use std::os::windows::prelude::*;
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
 use std::time::Duration;
 
+use lazycell::AtomicLazyCell;
+
 use convert;
 use winapi::*;
 use miow;
@@ -13,7 +15,6 @@
 use event::{Event, Ready};
 use poll::{self, Poll};
 use sys::windows::buffer_pool::BufferPool;
-use sys::windows::from_raw_arc::FromRawArc;
 use {Token, PollOpt};
 
 /// Each Selector has a globally unique(ish) ID associated with it. This ID
@@ -93,11 +94,11 @@
             }
 
             let callback = unsafe {
-                (*(status.overlapped() as *mut Overlapped)).callback()
+                (*(status.overlapped() as *mut Overlapped)).callback
             };
 
             trace!("select; -> got overlapped");
-            callback(status);
+            callback(status.entry());
         }
 
         trace!("returning");
@@ -127,36 +128,169 @@
     }
 }
 
-/// A registration is stored in each I/O object which keeps track of how it is
-/// associated with a `Selector` above.
+// A registration is stored in each I/O object which keeps track of how it is
+// associated with a `Selector` above.
+//
+// Once associated with a `Selector`, a registration can never be un-associated
+// (due to IOCP requirements). This is actually implemented through the
+// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
+// level/edge/filtering business.
+/// A `Binding` is embedded in all I/O objects associated with a `Poll`
+/// object.
 ///
-/// Once associated with a `Selector`, a registration can never be un-associated
-/// (due to IOCP requirements). This is actually implemented through the
-/// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
-/// level/edge/filtering business.
-pub struct Registration {
-    inner: Option<RegistrationInner>,
+/// Each registration keeps track of which selector the I/O object is
+/// associated with, ensuring that implementations of `Evented` can be
+/// conformant for the various methods on Windows.
+///
+/// If you're working with custom IOCP-enabled objects then you'll want to
+/// ensure that one of these instances is stored in your object and used in the
+/// implementation of `Evented`.
+///
+/// For more information about how to use this see the `windows` module
+/// documentation in this crate.
+pub struct Binding {
+    selector: AtomicLazyCell<Arc<SelectorInner>>,
 }
 
-struct RegistrationInner {
-    set_readiness: poll::SetReadiness,
-    selector: Arc<SelectorInner>,
+impl Binding {
+    /// Creates a new blank binding ready to be inserted into an I/O
+    /// object.
+    ///
+    /// Won't actually do anything until associated with an `Poll` loop.
+    pub fn new() -> Binding {
+        Binding { selector: AtomicLazyCell::new() }
+    }
+
+    /// Registers a new handle with the `Poll` specified, also assigning the
+    /// `token` specified.
+    ///
+    /// This function is intended to be used as part of `Evented::register` for
+    /// custom IOCP objects. It will add the specified handle to the internal
+    /// IOCP object with the provided `token`. All future events generated by
+    /// the handled provided will be recieved by the `Poll`'s internal IOCP
+    /// object.
+    ///
+    /// # Unsafety
+    ///
+    /// This function is unsafe as the `Poll` instance has assumptions about
+    /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
+    /// Specifically they must all be instances of the `Overlapped` type in
+    /// this crate. More information about this can be found on the
+    /// `windows` module in this crate.
+    pub unsafe fn register_handle(&self,
+                                  handle: &AsRawHandle,
+                                  token: Token,
+                                  poll: &Poll) -> io::Result<()> {
+        let selector = poll::selector(poll);
+
+        // Ignore errors, we'll see them on the next line.
+        drop(self.selector.fill(selector.inner.clone()));
+        try!(self.check_same_selector(poll));
+
+        selector.inner.port.add_handle(usize::from(token), handle)
+    }
+
+    /// Same as `register_handle` but for sockets.
+    pub unsafe fn register_socket(&self,
+                                  handle: &AsRawSocket,
+                                  token: Token,
+                                  poll: &Poll) -> io::Result<()> {
+        let selector = poll::selector(poll);
+        drop(self.selector.fill(selector.inner.clone()));
+        try!(self.check_same_selector(poll));
+        selector.inner.port.add_socket(usize::from(token), handle)
+    }
+
+    /// Reregisters the handle provided from the `Poll` provided.
+    ///
+    /// This is intended to be used as part of `Evented::reregister` but note
+    /// that this function does not currently reregister the provided handle
+    /// with the `poll` specified. IOCP has a special binding for changing the
+    /// token which has not yet been implemented. Instead this function should
+    /// be used to assert that the call to `reregister` happened on the same
+    /// `Poll` that was passed into to `register`.
+    ///
+    /// Eventually, though, the provided `handle` will be re-assigned to have
+    /// the token `token` on the given `poll` assuming that it's been
+    /// previously registered with it.
+    ///
+    /// # Unsafety
+    ///
+    /// This function is unsafe for similar reasons to `register`. That is,
+    /// there may be pending I/O events and such which aren't handled correctly.
+    pub unsafe fn reregister_handle(&self,
+                                    _handle: &AsRawHandle,
+                                    _token: Token,
+                                    poll: &Poll) -> io::Result<()> {
+        self.check_same_selector(poll)
+    }
+
+    /// Same as `reregister_handle`, but for sockets.
+    pub unsafe fn reregister_socket(&self,
+                                    _socket: &AsRawSocket,
+                                    _token: Token,
+                                    poll: &Poll) -> io::Result<()> {
+        self.check_same_selector(poll)
+    }
+
+    /// Deregisters the handle provided from the `Poll` provided.
+    ///
+    /// This is intended to be used as part of `Evented::deregister` but note
+    /// that this function does not currently deregister the provided handle
+    /// from the `poll` specified. IOCP has a special binding for that which has
+    /// not yet been implemented. Instead this function should be used to assert
+    /// that the call to `deregister` happened on the same `Poll` that was
+    /// passed into to `register`.
+    ///
+    /// # Unsafety
+    ///
+    /// This function is unsafe for similar reasons to `register`. That is,
+    /// there may be pending I/O events and such which aren't handled correctly.
+    pub unsafe fn deregister_handle(&self,
+                                    _handle: &AsRawHandle,
+                                    poll: &Poll) -> io::Result<()> {
+        self.check_same_selector(poll)
+    }
+
+    /// Same as `deregister_handle`, but for sockets.
+    pub unsafe fn deregister_socket(&self,
+                                    _socket: &AsRawSocket,
+                                    poll: &Poll) -> io::Result<()> {
+        self.check_same_selector(poll)
+    }
+
+    fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
+        let selector = poll::selector(poll);
+        match self.selector.borrow() {
+            Some(prev) if prev.identical(&selector.inner) => Ok(()),
+            Some(_) |
+            None => Err(other("socket already registered")),
+        }
+    }
 }
 
-impl Registration {
-    /// Creates a new blank registration ready to be inserted into an I/O object.
+/// Helper struct used for TCP and UDP which bundles a `binding` with a
+/// `SetReadiness` handle.
+pub struct ReadyBinding {
+    binding: Binding,
+    readiness: Option<poll::SetReadiness>,
+}
+
+impl ReadyBinding {
+    /// Creates a new blank binding ready to be inserted into an I/O object.
     ///
     /// Won't actually do anything until associated with an `Selector` loop.
-    pub fn new() -> Registration {
-        Registration {
-            inner: None,
+    pub fn new() -> ReadyBinding {
+        ReadyBinding {
+            binding: Binding::new(),
+            readiness: None,
         }
     }
 
-    /// Returns whether this registration has been associated with a selector
+    /// Returns whether this binding has been associated with a selector
     /// yet.
     pub fn registered(&self) -> bool {
-        self.inner.is_some()
+        self.readiness.is_some()
     }
 
     /// Acquires a buffer with at least `size` capacity.
@@ -165,19 +299,19 @@
     /// that buffer pool. If not associated with a selector, this will allocate
     /// a fresh buffer.
     pub fn get_buffer(&self, size: usize) -> Vec<u8> {
-        match self.inner {
-            Some(ref i) => i.selector.buffers.lock().unwrap().get(size),
+        match self.binding.selector.borrow() {
+            Some(i) => i.buffers.lock().unwrap().get(size),
             None => Vec::with_capacity(size),
         }
     }
 
-    /// Returns a buffer to this registration.
+    /// Returns a buffer to this binding.
     ///
     /// If associated with a selector, this will push the buffer back into the
     /// selector's pool of buffers. Otherwise this will just drop the buffer.
     pub fn put_buffer(&self, buf: Vec<u8>) {
-        if let Some(ref i) = self.inner {
-            i.selector.buffers.lock().unwrap().put(buf);
+        if let Some(i) = self.binding.selector.borrow() {
+            i.buffers.lock().unwrap().put(buf);
         }
     }
 
@@ -187,10 +321,9 @@
     /// that this is all implemented through the `SetReadiness` structure in the
     /// `poll` module.
     pub fn set_readiness(&self, set: Ready) {
-        if let Some(ref i) = self.inner {
+        if let Some(ref i) = self.readiness {
             trace!("set readiness to {:?}", set);
-            let s = &i.set_readiness;
-            s.set_readiness(set).expect("event loop disappeared?");
+            i.set_readiness(set).expect("event loop disappeared?");
         }
     }
 
@@ -198,8 +331,8 @@
     ///
     /// This is what's being used to generate events returned by `poll`.
     pub fn readiness(&self) -> Ready {
-        match self.inner {
-            Some(ref i) => i.set_readiness.readiness(),
+        match self.readiness {
+            Some(ref i) => i.readiness(),
             None => Ready::none(),
         }
     }
@@ -213,42 +346,14 @@
                            socket: &AsRawSocket,
                            poll: &Poll,
                            token: Token,
-                           interest: Ready,
+                           events: Ready,
                            opts: PollOpt,
                            registration: &Mutex<Option<poll::Registration>>)
                            -> io::Result<()> {
-        trace!("register {:?} {:?}", token, interest);
-        try!(self.associate(poll, token, interest, opts, registration));
-        let selector = poll::selector(poll);
-        try!(selector.inner.port.add_socket(usize::from(token), socket));
-        Ok(())
-    }
-
-    /// Implementation of `Evented::reregister` function.
-    pub fn reregister_socket(&mut self,
-                             _socket: &AsRawSocket,
-                             poll: &Poll,
-                             token: Token,
-                             interest: Ready,
-                             opts: PollOpt,
-                             registration: &Mutex<Option<poll::Registration>>)
-                             -> io::Result<()> {
-        trace!("reregister {:?} {:?}", token, interest);
-        if self.inner.is_none() {
-            return Err(other("cannot reregister unregistered socket"))
+        trace!("register {:?} {:?}", token, events);
+        unsafe {
+            try!(self.binding.register_socket(socket, token, poll));
         }
-        try!(self.associate(poll, token, interest, opts, registration));
-        Ok(())
-    }
-
-    fn associate(&mut self,
-                 poll: &Poll,
-                 token: Token,
-                 events: Ready,
-                 opts: PollOpt,
-                 registration: &Mutex<Option<poll::Registration>>)
-                 -> io::Result<()> {
-        let selector = poll::selector(poll);
 
         // To keep the same semantics as epoll, if I/O objects are interested in
         // being readable then they're also interested in listening for hup
@@ -257,37 +362,36 @@
         }  else {
             events
         };
+        let (r, s) = poll::Registration::new(poll, token, events, opts);
+        self.readiness = Some(s);
+        *registration.lock().unwrap() = Some(r);
+        Ok(())
+    }
 
-        match self.inner {
-            // Ensure that we're only ever associated with at most one event
-            // loop. IOCP doesn't allow a handle to ever be associated with more
-            // than one event loop.
-            Some(ref i) if !i.selector.identical(&selector.inner) => {
-                return Err(other("socket already registered"));
-            }
-
-            // If we're already registered, then just update the existing
-            // registration.
-            Some(_) => {
-                let registration = registration.lock().unwrap();
-                let registration = registration.as_ref().unwrap();
-                trace!("updating existing registration node");
-                registration.update(poll, token, events, opts)
-            }
-
-            // Create a new registration and we'll soon be added to the
-            // completion port for IOCP as well.
-            None => {
-                trace!("allocating new registration node");
-                let (r, s) = poll::Registration::new(poll, token, events, opts);
-                self.inner = Some(RegistrationInner {
-                    set_readiness: s,
-                    selector: selector.inner.clone(),
-                });
-                *registration.lock().unwrap() = Some(r);
-                Ok(())
-            }
+    /// Implementation of `Evented::reregister` function.
+    pub fn reregister_socket(&mut self,
+                             socket: &AsRawSocket,
+                             poll: &Poll,
+                             token: Token,
+                             events: Ready,
+                             opts: PollOpt,
+                             registration: &Mutex<Option<poll::Registration>>)
+                             -> io::Result<()> {
+        trace!("reregister {:?} {:?}", token, events);
+        unsafe {
+            try!(self.binding.reregister_socket(socket, token, poll));
         }
+
+        // To keep the same semantics as epoll, if I/O objects are interested in
+        // being readable then they're also interested in listening for hup
+        let events = if events.is_readable() {
+            events | Ready::hup()
+        }  else {
+            events
+        };
+        registration.lock().unwrap()
+                    .as_mut().unwrap()
+                    .update(poll, token, events, opts)
     }
 
     /// Implementation of the `Evented::deregister` function.
@@ -295,23 +399,18 @@
     /// Doesn't allow registration with another event loop, just shuts down
     /// readiness notifications and such.
     pub fn deregister(&mut self,
+                      socket: &AsRawSocket,
                       poll: &Poll,
                       registration: &Mutex<Option<poll::Registration>>)
                       -> io::Result<()> {
         trace!("deregistering");
-        let selector = poll::selector(poll);
-        match self.inner {
-            Some(ref mut i) => {
-                let registration = registration.lock().unwrap();
-                let registration = registration.as_ref().unwrap();
-                if !selector.inner.identical(&i.selector) {
-                    return Err(other("socket already registered"));
-                }
-                try!(registration.deregister(poll));
-                Ok(())
-            }
-            None => Err(other("socket not registered")),
+        unsafe {
+            try!(self.binding.deregister_socket(socket, poll));
         }
+
+        registration.lock().unwrap()
+                    .as_ref().unwrap()
+                    .deregister(poll)
     }
 }
 
@@ -361,10 +460,11 @@
 }
 
 macro_rules! overlapped2arc {
-    ($e:expr, $t:ty, $($field:ident).+) => (
-        ::sys::windows::selector::Overlapped::cast_to_arc::<$t>($e,
-                offset_of!($t, $($field).+))
-    )
+    ($e:expr, $t:ty, $($field:ident).+) => ({
+        let offset = offset_of!($t, $($field).+);
+        debug_assert!(offset < mem::size_of::<$t>());
+        FromRawArc::from_raw(($e as usize - offset) as *mut $t)
+    })
 }
 
 macro_rules! offset_of {
@@ -373,42 +473,50 @@
     )
 }
 
-pub type Callback = fn(&CompletionStatus);
-
-/// See sys::windows module docs for why this exists.
+// See sys::windows module docs for why this exists.
+//
+// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
+// actually inside one of these structures so it can use the `Callback` stored
+// right after it.
+//
+// We use repr(C) here to ensure that we can assume the overlapped pointer is
+// at the start of the structure so we can just do a cast.
+/// A wrapper around an internal instance over `miow::Overlapped` which is in
+/// turn a wrapper around the Windows type `OVERLAPPED`.
 ///
-/// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
-/// actually inside one of these structures so it can use the `Callback` stored
-/// right after it.
-///
-/// We use repr(C) here to ensure that we can assume the overlapped pointer is
-/// at the start of the structure so we can just do a cast.
+/// This type is required to be used for all IOCP operations on handles that are
+/// registered with an event loop. The event loop will receive notifications
+/// over `OVERLAPPED` pointers that have completed, and it will cast that
+/// pointer to a pointer to this structure and invoke the associated callback.
 #[repr(C)]
 pub struct Overlapped {
     inner: UnsafeCell<miow::Overlapped>,
-    callback: Callback,
+    callback: fn(&OVERLAPPED_ENTRY),
 }
 
 impl Overlapped {
-    pub fn new(cb: Callback) -> Overlapped {
+    /// Creates a new `Overlapped` which will invoke the provided `cb` callback
+    /// whenever it's triggered.
+    ///
+    /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all
+    /// I/O operations that are registered with mio's event loop. When the I/O
+    /// operation associated with an `OVERLAPPED` pointer completes the event
+    /// loop will invoke the function pointer provided by `cb`.
+    pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
         Overlapped {
             inner: UnsafeCell::new(miow::Overlapped::zero()),
             callback: cb,
         }
     }
 
-    pub unsafe fn get_mut(&self) -> &mut miow::Overlapped {
-        &mut *self.inner.get()
-    }
-
-    pub unsafe fn cast_to_arc<T>(overlapped: *mut miow::Overlapped,
-                                 offset: usize) -> FromRawArc<T> {
-        debug_assert!(offset < mem::size_of::<T>());
-        FromRawArc::from_raw((overlapped as usize - offset) as *mut T)
-    }
-
-    pub unsafe fn callback(&self) -> &Callback {
-        &self.callback
+    /// Get the underlying `Overlapped` instance as a raw pointer.
+    ///
+    /// This can be useful when only a shared borrow is held and the overlapped
+    /// pointer needs to be passed down to winapi.
+    pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
+        unsafe {
+            (*self.inner.get()).raw()
+        }
     }
 }
 
diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs
index ffdcafc..dea9af3 100644
--- a/src/sys/windows/tcp.rs
+++ b/src/sys/windows/tcp.rs
@@ -4,7 +4,7 @@
 use std::net::{self, SocketAddr};
 use std::sync::{Mutex, MutexGuard};
 
-use io::would_block;
+use miow;
 use miow::iocp::CompletionStatus;
 use miow::net::*;
 use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
@@ -12,9 +12,10 @@
 use winapi::*;
 
 use {Evented, Ready, Poll, PollOpt, Token};
+use io::would_block;
 use poll;
 use sys::windows::from_raw_arc::FromRawArc;
-use sys::windows::selector::{Overlapped, Registration};
+use sys::windows::selector::{Overlapped, ReadyBinding};
 use sys::windows::{wouldblock, Family};
 
 pub struct TcpStream {
@@ -66,14 +67,14 @@
 }
 
 struct StreamInner {
-    iocp: Registration,
+    iocp: ReadyBinding,
     deferred_connect: Option<SocketAddr>,
     read: State<(), ()>,
     write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
 }
 
 struct ListenerInner {
-    iocp: Registration,
+    iocp: ReadyBinding,
     accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
     accept_buf: AcceptAddrsBuf,
 }
@@ -96,7 +97,7 @@
                     write: Overlapped::new(write_done),
                     socket: socket,
                     inner: Mutex::new(StreamInner {
-                        iocp: Registration::new(),
+                        iocp: ReadyBinding::new(),
                         deferred_connect: deferred_connect,
                         read: State::Empty,
                         write: State::Empty,
@@ -244,8 +245,8 @@
     fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
         unsafe {
             trace!("scheduling a connect");
-            try!(self.inner.socket.connect_overlapped(addr,
-                                                      self.inner.read.get_mut()));
+            let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr());
+            try!(self.inner.socket.connect_overlapped(addr, overlapped));
         }
         // see docs above on StreamImp.inner for rationale on forget
         mem::forget(self.clone());
@@ -273,8 +274,8 @@
 
         trace!("scheduling a read");
         let res = unsafe {
-            self.inner.socket.read_overlapped(&mut [],
-                                              self.inner.read.get_mut())
+            let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr());
+            self.inner.socket.read_overlapped(&mut [], overlapped)
         };
         match res {
             // TODO: investigate better handling `Ok(true)`
@@ -336,8 +337,8 @@
 
         trace!("scheduling a write");
         let err = unsafe {
-            self.inner.socket.write_overlapped(&buf[pos..],
-                                               self.inner.write.get_mut())
+            let overlapped = miow::Overlapped::from_raw(self.inner.write.as_mut_ptr());
+            self.inner.socket.write_overlapped(&buf[pos..], overlapped)
         };
         match err {
             Ok(_) => {
@@ -363,7 +364,8 @@
     }
 }
 
-fn read_done(status: &CompletionStatus) {
+fn read_done(status: &OVERLAPPED_ENTRY) {
+    let status = CompletionStatus::from_entry(status);
     let me2 = StreamImp {
         inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
     };
@@ -394,7 +396,8 @@
     }
 }
 
-fn write_done(status: &CompletionStatus) {
+fn write_done(status: &OVERLAPPED_ENTRY) {
+    let status = CompletionStatus::from_entry(status);
     trace!("finished a write {}", status.bytes_transferred());
     let me2 = StreamImp {
         inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
@@ -440,7 +443,8 @@
     }
 
     fn deregister(&self, poll: &Poll) -> io::Result<()> {
-        self.inner().iocp.deregister(poll, &self.registration)
+        self.inner().iocp.deregister(&self.imp.inner.socket,
+                                     poll, &self.registration)
     }
 }
 
@@ -489,7 +493,7 @@
                     family: family,
                     socket: socket,
                     inner: Mutex::new(ListenerInner {
-                        iocp: Registration::new(),
+                        iocp: ReadyBinding::new(),
                         accept: State::Empty,
                         accept_buf: AcceptAddrsBuf::new(),
                     }),
@@ -572,8 +576,9 @@
             Family::V6 => TcpBuilder::new_v6(),
         }.and_then(|builder| unsafe {
             trace!("scheduling an accept");
+            let overlapped = miow::Overlapped::from_raw(self.inner.accept.as_mut_ptr());
             self.inner.socket.accept_overlapped(&builder, &mut me.accept_buf,
-                                                self.inner.accept.get_mut())
+                                                overlapped)
         });
         match res {
             Ok((socket, _)) => {
@@ -594,7 +599,8 @@
     }
 }
 
-fn accept_done(status: &CompletionStatus) {
+fn accept_done(status: &OVERLAPPED_ENTRY) {
+    let status = CompletionStatus::from_entry(status);
     let me2 = ListenerImp {
         inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
     };
@@ -639,7 +645,8 @@
     }
 
     fn deregister(&self, poll: &Poll) -> io::Result<()> {
-        self.inner().iocp.deregister(poll, &self.registration)
+        self.inner().iocp.deregister(&self.imp.inner.socket,
+                                     poll, &self.registration)
     }
 }
 
diff --git a/src/sys/windows/udp.rs b/src/sys/windows/udp.rs
index d90be9e..6dd0346 100644
--- a/src/sys/windows/udp.rs
+++ b/src/sys/windows/udp.rs
@@ -13,6 +13,7 @@
 #[allow(unused_imports)]
 use net2::{UdpBuilder, UdpSocketExt};
 use winapi::*;
+use miow;
 use miow::iocp::CompletionStatus;
 use miow::net::SocketAddrBuf;
 use miow::net::UdpSocketExt as MiowUdpSocketExt;
@@ -20,7 +21,7 @@
 use {Evented, Ready, Poll, PollOpt, Token};
 use poll;
 use sys::windows::from_raw_arc::FromRawArc;
-use sys::windows::selector::{Overlapped, Registration};
+use sys::windows::selector::{Overlapped, ReadyBinding};
 
 pub struct UdpSocket {
     imp: Imp,
@@ -40,7 +41,7 @@
 }
 
 struct Inner {
-    iocp: Registration,
+    iocp: ReadyBinding,
     read: State<Vec<u8>, Vec<u8>>,
     write: State<Vec<u8>, (Vec<u8>, usize)>,
     read_buf: SocketAddrBuf,
@@ -63,7 +64,7 @@
                     write: Overlapped::new(send_done),
                     socket: socket,
                     inner: Mutex::new(Inner {
-                        iocp: Registration::new(),
+                        iocp: ReadyBinding::new(),
                         read: State::Empty,
                         write: State::Empty,
                         read_buf: SocketAddrBuf::new(),
@@ -108,8 +109,9 @@
         let amt = try!(owned_buf.write(buf));
         try!(unsafe {
             trace!("scheduling a send");
+            let overlapped = miow::Overlapped::from_raw(self.imp.inner.write.as_mut_ptr());
             self.imp.inner.socket.send_to_overlapped(&owned_buf, target,
-                                                     self.imp.inner.write.get_mut())
+                                                     overlapped)
         });
         me.write = State::Pending(owned_buf);
         mem::forget(self.imp.clone());
@@ -252,8 +254,9 @@
             trace!("scheduling a read");
             let cap = buf.capacity();
             buf.set_len(cap);
+            let overlapped = miow::Overlapped::from_raw(self.inner.read.as_mut_ptr());
             self.inner.socket.recv_from_overlapped(&mut buf, &mut me.read_buf,
-                                                   self.inner.read.get_mut())
+                                                   overlapped)
         };
         match res {
             Ok(_) => {
@@ -296,7 +299,8 @@
     }
 
     fn deregister(&self, poll: &Poll) -> io::Result<()> {
-        self.inner().iocp.deregister(poll, &self.registration)
+        self.inner().iocp.deregister(&self.imp.inner.socket,
+                                     poll, &self.registration)
     }
 }
 
@@ -327,7 +331,8 @@
     }
 }
 
-fn send_done(status: &CompletionStatus) {
+fn send_done(status: &OVERLAPPED_ENTRY) {
+    let status = CompletionStatus::from_entry(status);
     trace!("finished a send {}", status.bytes_transferred());
     let me2 = Imp {
         inner: unsafe { overlapped2arc!(status.overlapped(), Io, write) },
@@ -337,7 +342,8 @@
     me2.add_readiness(&mut me, Ready::writable());
 }
 
-fn recv_done(status: &CompletionStatus) {
+fn recv_done(status: &OVERLAPPED_ENTRY) {
+    let status = CompletionStatus::from_entry(status);
     trace!("finished a recv {}", status.bytes_transferred());
     let me2 = Imp {
         inner: unsafe { overlapped2arc!(status.overlapped(), Io, read) },