Fix error handling in NamedPipe::write
diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs
index a5688ce..8c81f38 100644
--- a/src/sys/windows/named_pipe.rs
+++ b/src/sys/windows/named_pipe.rs
@@ -1,6 +1,6 @@
-use crate::{poll, Registry};
 use crate::event::Source;
 use crate::sys::windows::{Event, Overlapped};
+use crate::{poll, Registry};
 use winapi::um::minwinbase::OVERLAPPED_ENTRY;
 
 use std::ffi::OsStr;
@@ -9,8 +9,8 @@
 use std::mem;
 use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
 use std::slice;
-use std::sync::atomic::{AtomicUsize, AtomicBool};
 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::sync::{Arc, Mutex};
 
 use crate::{Interest, Token};
@@ -128,9 +128,7 @@
 impl NamedPipe {
     /// Creates a new named pipe at the specified `addr` given a "reasonable
     /// set" of initial configuration options.
-    pub fn new<A: AsRef<OsStr>>(
-        addr: A,
-    ) -> io::Result<NamedPipe> {
+    pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
         let pipe = pipe::NamedPipe::new(addr)?;
         // Safety: nothing actually unsafe about this. The trait fn includes
         // `unsafe`.
@@ -226,9 +224,7 @@
 }
 
 impl FromRawHandle for NamedPipe {
-    unsafe fn from_raw_handle(
-        handle: RawHandle,
-    ) -> NamedPipe {
+    unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
         NamedPipe {
             inner: Arc::new(Inner {
                 // Safety: not really unsafe
@@ -281,9 +277,7 @@
         match mem::replace(&mut state.read, State::None) {
             // In theory not possible with `token` checked above,
             // but return would block for now.
-            State::None => {
-                Err(would_block())
-            }
+            State::None => Err(would_block()),
 
             // A read is in flight, still waiting for it to finish
             State::Pending(buf, amt) => {
@@ -324,7 +318,7 @@
 }
 
 impl<'a> Write for &'a NamedPipe {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {        
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         // Make sure there's no writes pending
         let mut io = self.inner.io.lock().unwrap();
 
@@ -334,6 +328,12 @@
 
         match io.write {
             State::None => {}
+            State::Err(_) => match mem::replace(&mut io.write, State::None) {
+                State::Err(e) => return Err(e),
+                // `io` is locked, so this branch is unreachable
+                _ => unreachable!(),
+            },
+            // any other state should be handled in `write_done`
             _ => {
                 return Err(would_block());
             }
@@ -342,17 +342,26 @@
         // Move `buf` onto the heap and fire off the write
         let mut owned_buf = self.inner.get_buffer();
         owned_buf.extend(buf);
-        Inner::schedule_write(&self.inner, owned_buf, 0, &mut io, None);
-        Ok(buf.len())
+        match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
+            // Some bytes are written immediately
+            Some(n) => Ok(n),
+            // Write operation is anqueued for whole buffer
+            None => Ok(buf.len()),
+        }
     }
 
-    fn flush(&mut self) -> io::Result<()> {      
-        Ok(())  
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
     }
 }
 
 impl Source for NamedPipe {
-    fn register(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
+    fn register(
+        &mut self,
+        registry: &Registry,
+        token: Token,
+        interest: Interest,
+    ) -> io::Result<()> {
         let mut io = self.inner.io.lock().unwrap();
 
         io.check_association(registry, false)?;
@@ -368,7 +377,10 @@
             io.cp = Some(poll::selector(registry).clone_port());
 
             let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
-            poll::selector(registry).inner.cp.add_handle(inner_token, &self.inner.handle)?;
+            poll::selector(registry)
+                .inner
+                .cp
+                .add_handle(inner_token, &self.inner.handle)?;
         }
 
         io.token = Some(token);
@@ -381,7 +393,12 @@
         Ok(())
     }
 
-    fn reregister(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> { 
+    fn reregister(
+        &mut self,
+        registry: &Registry,
+        token: Token,
+        interest: Interest,
+    ) -> io::Result<()> {
         let mut io = self.inner.io.lock().unwrap();
 
         io.check_association(registry, true)?;
@@ -491,19 +508,61 @@
         }
     }
 
-    fn schedule_write(me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, events: Option<&mut Vec<Event>>) {
+    /// Maybe schedules overlapped write operation.
+    ///
+    /// * `None` means that overlapped operation was enqueued
+    /// * `Some(n)` means that `n` bytes was immediately written.
+    ///   Note, that `write_done` will fire anyway to clean up the state.
+    fn maybe_schedule_write(
+        me: &Arc<Inner>,
+        buf: Vec<u8>,
+        pos: usize,
+        io: &mut Io,
+    ) -> io::Result<Option<usize>> {
         // Very similar to `schedule_read` above, just done for the write half.
         let e = unsafe {
             let overlapped = me.write.as_ptr() as *mut _;
             me.handle.write_overlapped(&buf[pos..], overlapped)
         };
 
+        // See `connect` above for the rationale behind `forget`
         match e {
-            // See `connect` above for the rationale behind `forget`
-            Ok(_) => {
-                io.write = State::Pending(buf, pos);
-                mem::forget(me.clone())
+            // `n` bytes are written immediately
+            Ok(Some(n)) => {
+                io.write = State::Ok(buf, pos);
+                mem::forget(me.clone());
+                Ok(Some(n))
             }
+            // write operation is enqueued
+            Ok(None) => {
+                io.write = State::Pending(buf, pos);
+                mem::forget(me.clone());
+                Ok(None)
+            }
+            Err(e) => Err(e),
+        }
+    }
+
+    fn schedule_write(
+        me: &Arc<Inner>,
+        buf: Vec<u8>,
+        pos: usize,
+        io: &mut Io,
+        events: Option<&mut Vec<Event>>,
+    ) {
+        match Inner::maybe_schedule_write(me, buf, pos, io) {
+            Ok(Some(_)) => {
+                // immediate result will be handled in `write_done`,
+                // so we'll reinterpret the `Ok` state
+                let state = mem::replace(&mut io.write, State::None);
+                io.write = match state {
+                    State::Ok(buf, pos) => State::Pending(buf, pos),
+                    // io is locked, so this branch is unreachable
+                    _ => unreachable!(),
+                };
+                mem::forget(me.clone());
+            }
+            Ok(None) => (),
             Err(e) => {
                 io.write = State::Err(e);
                 io.notify_writable(events);
@@ -610,6 +669,12 @@
     // then we're writable again and otherwise we schedule another write.
     let mut io = me.io.lock().unwrap();
     let (buf, pos) = match mem::replace(&mut io.write, State::None) {
+        // `Ok` here means, that the operation was completed immediately
+        // `bytes_transferred` is already reported to a client
+        State::Ok(..) => {
+            io.notify_writable(events);
+            return;
+        }
         State::Pending(buf, pos) => (buf, pos),
         _ => unreachable!(),
     };
@@ -638,18 +703,14 @@
 impl Io {
     fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
         match self.cp {
-            Some(ref cp) if !poll::selector(registry).same_port(cp) => {
-                Err(io::Error::new(
-                    io::ErrorKind::AlreadyExists,
-                    "I/O source already registered with a different `Registry`"
-                ))
-            }
-            None if required => {
-                Err(io::Error::new(
-                    io::ErrorKind::NotFound,
-                    "I/O source not registered with `Registry`"
-                ))
-            }
+            Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
+                io::ErrorKind::AlreadyExists,
+                "I/O source already registered with a different `Registry`",
+            )),
+            None if required => Err(io::Error::new(
+                io::ErrorKind::NotFound,
+                "I/O source not registered with `Registry`",
+            )),
             _ => Ok(()),
         }
     }
diff --git a/tests/win_named_pipe.rs b/tests/win_named_pipe.rs
index b1a1913..5d4d302 100644
--- a/tests/win_named_pipe.rs
+++ b/tests/win_named_pipe.rs
@@ -9,6 +9,7 @@
 use mio::windows::NamedPipe;
 use mio::{Events, Interest, Poll, Token};
 use rand::Rng;
+use winapi::shared::winerror::*;
 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
 
 fn _assert_kinds() {
@@ -178,6 +179,44 @@
 }
 
 #[test]
+fn write_disconnected() {
+    let mut poll = t!(Poll::new());
+    let (mut server, mut client) = pipe();
+    t!(poll.registry().register(
+        &mut server,
+        Token(0),
+        Interest::READABLE | Interest::WRITABLE,
+    ));
+    t!(poll.registry().register(
+        &mut client,
+        Token(1),
+        Interest::READABLE | Interest::WRITABLE,
+    ));
+
+    drop(client);
+
+    let mut events = Events::with_capacity(128);
+    t!(poll.poll(&mut events, None));
+    assert!(events.iter().count() > 0);
+
+    // this should not hang
+    let mut i = 0;
+    loop {
+        i += 1;
+        assert!(i < 16, "too many iterations");
+
+        match server.write(&[0]) {
+            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+                t!(poll.poll(&mut events, None));
+                assert!(events.iter().count() > 0);
+            }
+            Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => break,
+            e => panic!("{:?}", e),
+        }
+    }
+}
+
+#[test]
 fn write_then_drop() {
     let (mut server, mut client) = pipe();
     let mut poll = t!(Poll::new());