Fuchsia port (#629)

* Fuchsia Shell

* Selector

* Working selector and awakener

* A bunch more crap

* Impl UdpSocket and add comments

* UdpSocket fixup

* Initial TCP

* All tests passing

* Start cleanup

* Separate out modules

* Add EventedHandle and fuchsia_channel test

* Remove unused [feature]

* Add only v6 methods and fixup reregistration of EventedFds

* Remove dependency on concurrent_hashmap

* Don't overwrite existing tokens

* panic -> err

* Remove unnecessary pub restrictions and document unsafety

* Fix privacy error and convert panic on invalid token into err

* Use saturating ops for time modification

* Cleanup EventedFd registration

* Replace Events Option with Vec

* Use accessors rather than pub fields

* Clear up unnecessary repeating events on reregistered handles

* Fix orderings

* Cleanup

* Backcompat with rust 1.9
diff --git a/Cargo.toml b/Cargo.toml
index c7fb8dd..4a37e21 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,10 @@
 net2     = "0.2.29"
 iovec    = "0.1.0"
 
+[target.'cfg(target_os = "fuchsia")'.dependencies]
+magenta = "0.1.0"
+magenta-sys = "0.1.0"
+
 [target.'cfg(unix)'.dependencies]
 libc   = "0.2.19"
 
diff --git a/src/deprecated/mod.rs b/src/deprecated/mod.rs
index a6002e4..124a2ee 100644
--- a/src/deprecated/mod.rs
+++ b/src/deprecated/mod.rs
@@ -5,7 +5,7 @@
 mod handler;
 mod notify;
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 pub mod unix;
 
 pub use self::event_loop::{
@@ -24,7 +24,7 @@
 pub use self::notify::{
     NotifyError,
 };
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 pub use self::unix::{
     pipe,
     PipeReader,
diff --git a/src/lib.rs b/src/lib.rs
index ca87ab8..2f26f60 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -85,6 +85,11 @@
 extern crate slab;
 extern crate iovec;
 
+#[cfg(target_os = "fuchsia")]
+extern crate magenta;
+#[cfg(target_os = "fuchsia")]
+extern crate magenta_sys;
+
 #[cfg(unix)]
 extern crate libc;
 
@@ -181,7 +186,7 @@
 #[doc(hidden)]
 pub use io::deprecated::would_block;
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 pub mod unix {
     //! Unix only extensions
     pub use sys::{
@@ -190,6 +195,12 @@
     pub use sys::unix::UnixReady;
 }
 
+#[cfg(target_os = "fuchsia")]
+pub mod fuchsia {
+    //! Fuchsia-only extensions
+    pub use sys::EventedHandle;
+}
+
 /// Windows-only extensions to the mio crate.
 ///
 /// Mio on windows is currently implemented with IOCP for a high-performance
diff --git a/src/net/tcp.rs b/src/net/tcp.rs
index 936570f..f3f222b 100644
--- a/src/net/tcp.rs
+++ b/src/net/tcp.rs
@@ -59,6 +59,18 @@
 
 use std::net::Shutdown;
 
+// TODO: remove when fuchsia's set_nonblocking is fixed in libstd
+#[cfg(target_os = "fuchsia")]
+fn set_nonblocking(stream: &net::TcpStream) -> io::Result<()> {
+    sys::set_nonblock(
+        ::std::os::unix::io::AsRawFd::as_raw_fd(stream))
+}
+#[cfg(not(target_os = "fuchsia"))]
+fn set_nonblocking(stream: &net::TcpStream) -> io::Result<()> {
+    stream.set_nonblocking(true)
+}
+
+
 impl TcpStream {
     /// Create a new TCP stream and issue a non-blocking connect to the
     /// specified address.
@@ -119,7 +131,8 @@
     /// it should already be connected via some other means (be it manually, the
     /// net2 crate, or the standard library).
     pub fn from_stream(stream: net::TcpStream) -> io::Result<TcpStream> {
-        try!(stream.set_nonblocking(true));
+        try!(set_nonblocking(&stream));
+
         Ok(TcpStream {
             sys: sys::TcpStream::from_stream(stream),
             selector_id: SelectorId::new(),
@@ -618,24 +631,24 @@
  *
  */
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd};
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl IntoRawFd for TcpStream {
     fn into_raw_fd(self) -> RawFd {
         self.sys.into_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl AsRawFd for TcpStream {
     fn as_raw_fd(&self) -> RawFd {
         self.sys.as_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl FromRawFd for TcpStream {
     unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
         TcpStream {
@@ -645,21 +658,21 @@
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl IntoRawFd for TcpListener {
     fn into_raw_fd(self) -> RawFd {
         self.sys.into_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl AsRawFd for TcpListener {
     fn as_raw_fd(&self) -> RawFd {
         self.sys.as_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl FromRawFd for TcpListener {
     unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
         TcpListener {
diff --git a/src/net/udp.rs b/src/net/udp.rs
index 820a46a..01f13cb 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -293,24 +293,24 @@
  *
  */
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd};
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl IntoRawFd for UdpSocket {
     fn into_raw_fd(self) -> RawFd {
         self.sys.into_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl AsRawFd for UdpSocket {
     fn as_raw_fd(&self) -> RawFd {
         self.sys.as_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl FromRawFd for UdpSocket {
     unsafe fn from_raw_fd(fd: RawFd) -> UdpSocket {
         UdpSocket {
diff --git a/src/poll.rs b/src/poll.rs
index 456f788..09575ff 100644
--- a/src/poll.rs
+++ b/src/poll.rs
@@ -3,15 +3,15 @@
 use std::{fmt, io, ptr, usize};
 use std::cell::UnsafeCell;
 use std::{mem, ops, isize};
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 use std::os::unix::io::AsRawFd;
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 use std::os::unix::io::RawFd;
 use std::sync::{Arc, Mutex, Condvar};
 use std::sync::atomic::{AtomicUsize, AtomicPtr, AtomicBool};
 use std::sync::atomic::Ordering::{self, Acquire, Release, AcqRel, Relaxed, SeqCst};
 use std::time::{Duration, Instant};
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 use sys::unix::UnixReady;
 
 // Poll is backed by two readiness queues. The first is a system readiness queue
@@ -1109,13 +1109,13 @@
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 fn registerable(interest: Ready) -> bool {
     let unixinterest = UnixReady::from(interest);
     unixinterest.is_readable() || unixinterest.is_writable() || unixinterest.is_aio()
 }
 
-#[cfg(not(unix))]
+#[cfg(not(all(unix, not(target_os = "fuchsia"))))]
 fn registerable(interest: Ready) -> bool {
     interest.is_readable() || interest.is_writable()
 }
@@ -1138,7 +1138,7 @@
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl AsRawFd for Poll {
     fn as_raw_fd(&self) -> RawFd {
         self.selector.as_raw_fd()
@@ -2519,7 +2519,7 @@
 }
 
 #[test]
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 pub fn as_raw_fd() {
     let poll = Poll::new().unwrap();
     assert!(poll.as_raw_fd() > 0);
diff --git a/src/sys/fuchsia/awakener.rs b/src/sys/fuchsia/awakener.rs
new file mode 100644
index 0000000..553e373
--- /dev/null
+++ b/src/sys/fuchsia/awakener.rs
@@ -0,0 +1,74 @@
+use {io, poll, Evented, Ready, Poll, PollOpt, Token};
+use sys::fuchsia::status_to_io_err;
+use magenta;
+use std::sync::{Arc, Mutex, Weak};
+
+pub struct Awakener {
+    /// Token and weak reference to the port on which Awakener was registered.
+    ///
+    /// When `Awakener::wakeup` is called, these are used to send a wakeup message to the port.
+    inner: Mutex<Option<(Token, Weak<magenta::Port>)>>,
+}
+
+impl Awakener {
+    /// Create a new `Awakener`.
+    pub fn new() -> io::Result<Awakener> {
+        Ok(Awakener {
+            inner: Mutex::new(None)
+        })
+    }
+
+    /// Send a wakeup signal to the `Selector` on which the `Awakener` was registered.
+    pub fn wakeup(&self) -> io::Result<()> {
+        let inner_locked = self.inner.lock().unwrap();
+        let &(token, ref weak_port) =
+            inner_locked.as_ref().expect("Called wakeup on unregistered awakener.");
+
+        let port = weak_port.upgrade().expect("Tried to wakeup a closed port.");
+
+        let status = 0; // arbitrary
+        let packet = magenta::Packet::from_user_packet(
+            token.0 as u64, status, magenta::UserPacket::from_u8_array([0; 32]));
+
+        port.queue(&packet).map_err(status_to_io_err)
+    }
+
+    pub fn cleanup(&self) {}
+}
+
+impl Evented for Awakener {
+    fn register(&self,
+                poll: &Poll,
+                token: Token,
+                _events: Ready,
+                _opts: PollOpt) -> io::Result<()>
+    {
+        let mut inner_locked = self.inner.lock().unwrap();
+        if inner_locked.is_some() {
+            panic!("Called register on already-registered Awakener.");
+        }
+        *inner_locked = Some((token, Arc::downgrade(poll::selector(poll).port())));
+
+        Ok(())
+    }
+
+    fn reregister(&self,
+                  poll: &Poll,
+                  token: Token,
+                  _events: Ready,
+                  _opts: PollOpt) -> io::Result<()>
+    {
+        let mut inner_locked = self.inner.lock().unwrap();
+        *inner_locked = Some((token, Arc::downgrade(poll::selector(poll).port())));
+
+        Ok(())
+    }
+
+    fn deregister(&self, _poll: &Poll) -> io::Result<()>
+    {
+        let mut inner_locked = self.inner.lock().unwrap();
+        *inner_locked = None;
+
+        Ok(())
+    }
+}
\ No newline at end of file
diff --git a/src/sys/fuchsia/eventedfd.rs b/src/sys/fuchsia/eventedfd.rs
new file mode 100644
index 0000000..33e5930
--- /dev/null
+++ b/src/sys/fuchsia/eventedfd.rs
@@ -0,0 +1,262 @@
+use {io, poll, Evented, Ready, Poll, PollOpt, Token};
+use libc;
+use magenta;
+use magenta::HandleBase;
+use sys::fuchsia::{DontDrop, poll_opts_to_wait_async, sys};
+use std::mem;
+use std::os::unix::io::RawFd;
+use std::sync::{Arc, Mutex};
+
+/// Properties of an `EventedFd`'s current registration
+#[derive(Debug)]
+pub struct EventedFdRegistration {
+    token: Token,
+    handle: DontDrop<magenta::Handle>,
+    rereg_signals: Option<(magenta::Signals, magenta::WaitAsyncOpts)>,
+}
+
+impl EventedFdRegistration {
+    unsafe fn new(token: Token,
+                  raw_handle: sys::mx_handle_t,
+                  rereg_signals: Option<(magenta::Signals, magenta::WaitAsyncOpts)>,
+                  ) -> Self
+    {
+        EventedFdRegistration {
+            token: token,
+            handle: DontDrop::new(magenta::Handle::from_raw(raw_handle)),
+            rereg_signals: rereg_signals
+        }
+    }
+
+    pub fn rereg_signals(&self) -> Option<(magenta::Signals, magenta::WaitAsyncOpts)> {
+        self.rereg_signals
+    }
+}
+
+/// An event-ed file descriptor. The file descriptor is owned by this structure.
+#[derive(Debug)]
+pub struct EventedFdInner {
+    /// Properties of the current registration.
+    registration: Mutex<Option<EventedFdRegistration>>,
+
+    /// Owned file descriptor.
+    ///
+    /// `fd` is closed on `Drop`, so modifying `fd` is a memory-unsafe operation.
+    fd: RawFd,
+
+    /// Owned `mxio_t` ponter.
+    mxio: *const sys::mxio_t,
+}
+
+impl EventedFdInner {
+    pub fn rereg_for_level(&self, port: &magenta::Port) {
+        let registration_opt = self.registration.lock().unwrap();
+        if let Some(ref registration) = *registration_opt {
+            if let Some((rereg_signals, rereg_opts)) = registration.rereg_signals {
+                let _res =
+                    registration
+                        .handle.inner_ref()
+                        .wait_async(port,
+                                    registration.token.0 as u64,
+                                    rereg_signals,
+                                    rereg_opts);
+            }
+        }
+    }
+
+    pub fn registration(&self) -> &Mutex<Option<EventedFdRegistration>> {
+        &self.registration
+    }
+
+    pub fn mxio(&self) -> &sys::mxio_t {
+        unsafe { &*self.mxio }
+    }
+}
+
+impl Drop for EventedFdInner {
+    fn drop(&mut self) {
+        unsafe {
+            sys::__mxio_release(self.mxio);
+            let _ = libc::close(self.fd);
+        }
+    }
+}
+
+// `EventedInner` must be manually declared `Send + Sync` because it contains a `RawFd` and a
+// `*const sys::mxio_t`. These are only used to make thread-safe system calls, so accessing
+// them is entirely thread-safe.
+//
+// Note: one minor exception to this are the calls to `libc::close` and `__mxio_release`, which
+// happen on `Drop`. These accesses are safe because `drop` can only be called at most once from
+// a single thread, and after it is called no other functions can be called on the `EventedFdInner`.
+unsafe impl Sync for EventedFdInner {}
+unsafe impl Send for EventedFdInner {}
+
+#[derive(Clone, Debug)]
+pub struct EventedFd {
+    pub inner: Arc<EventedFdInner>
+}
+
+impl EventedFd {
+    pub unsafe fn new(fd: RawFd) -> Self {
+        let mxio = sys::__mxio_fd_to_io(fd);
+        assert!(mxio != ::std::ptr::null(), "FileDescriptor given to EventedFd must be valid.");
+
+        EventedFd {
+            inner: Arc::new(EventedFdInner {
+                registration: Mutex::new(None),
+                fd: fd,
+                mxio: mxio,
+            })
+        }
+    }
+
+    fn handle_and_signals_for_events(&self, interest: Ready, opts: PollOpt)
+                -> (sys::mx_handle_t, magenta::Signals)
+    {
+        let epoll_events = ioevent_to_epoll(interest, opts);
+
+        unsafe {
+            let mut raw_handle: sys::mx_handle_t = mem::uninitialized();
+            let mut signals: sys::mx_signals_t = mem::uninitialized();
+            sys::__mxio_wait_begin(self.inner.mxio, epoll_events, &mut raw_handle, &mut signals);
+
+            (raw_handle, signals)
+        }
+    }
+
+    fn register_with_lock(
+        &self,
+        registration: &mut Option<EventedFdRegistration>,
+        poll: &Poll,
+        token: Token,
+        interest: Ready,
+        opts: PollOpt) -> io::Result<()>
+    {
+        if registration.is_some() {
+            return Err(io::Error::new(
+                io::ErrorKind::AlreadyExists,
+                "Called register on an already registered file descriptor."));
+        }
+
+        let (raw_handle, signals) = self.handle_and_signals_for_events(interest, opts);
+
+        let needs_rereg = opts.is_level() && !opts.is_oneshot();
+
+        // If we need to reregister, then each registration should be `oneshot`
+        let opts = opts | if needs_rereg { PollOpt::oneshot() } else { PollOpt::empty() };
+
+        let rereg_signals = if needs_rereg {
+            Some((signals, poll_opts_to_wait_async(opts)))
+        } else {
+            None
+        };
+
+        *registration = Some(
+            unsafe { EventedFdRegistration::new(token, raw_handle, rereg_signals) }
+        );
+
+        // We don't have ownership of the handle, so we can't drop it
+        let handle = DontDrop::new(unsafe { magenta::Handle::from_raw(raw_handle) });
+
+        let registered = poll::selector(poll)
+            .register_fd(handle.inner_ref(), self, token, signals, opts);
+
+        if registered.is_err() {
+            *registration = None;
+        }
+
+        registered
+    }
+
+    fn deregister_with_lock(
+        &self,
+        registration: &mut Option<EventedFdRegistration>,
+        poll: &Poll) -> io::Result<()>
+    {
+        let old_registration = if let Some(old_reg) = registration.take() {
+            old_reg
+        } else {
+            return Err(io::Error::new(
+                io::ErrorKind::NotFound,
+                "Called rereregister on an unregistered file descriptor."))
+        };
+
+        poll::selector(poll)
+            .deregister_fd(old_registration.handle.inner_ref(), old_registration.token)
+    }
+}
+
+impl Evented for EventedFd {
+    fn register(&self,
+                poll: &Poll,
+                token: Token,
+                interest: Ready,
+                opts: PollOpt) -> io::Result<()>
+    {
+        self.register_with_lock(
+            &mut *self.inner.registration.lock().unwrap(),
+            poll,
+            token,
+            interest,
+            opts)
+    }
+
+    fn reregister(&self,
+                  poll: &Poll,
+                  token: Token,
+                  interest: Ready,
+                  opts: PollOpt) -> io::Result<()>
+    {
+        // Take out the registration lock
+        let mut registration_lock = self.inner.registration.lock().unwrap();
+
+        // Deregister
+        self.deregister_with_lock(&mut *registration_lock, poll)?;
+
+        self.register_with_lock(
+            &mut *registration_lock,
+            poll,
+            token,
+            interest,
+            opts)
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        let mut registration_lock = self.inner.registration.lock().unwrap();
+        self.deregister_with_lock(&mut *registration_lock, poll)
+    }
+}
+
+fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
+    use event_imp::ready_from_usize;
+    const HUP: usize   = 0b01000;
+
+    let mut kind = 0;
+
+    if interest.is_readable() {
+        kind |= libc::EPOLLIN;
+    }
+
+    if interest.is_writable() {
+        kind |= libc::EPOLLOUT;
+    }
+
+    if interest.contains(ready_from_usize(HUP)) {
+        kind |= libc::EPOLLRDHUP;
+    }
+
+    if opts.is_edge() {
+        kind |= libc::EPOLLET;
+    }
+
+    if opts.is_oneshot() {
+        kind |= libc::EPOLLONESHOT;
+    }
+
+    if opts.is_level() {
+        kind &= !libc::EPOLLET;
+    }
+
+    kind as u32
+}
diff --git a/src/sys/fuchsia/handles.rs b/src/sys/fuchsia/handles.rs
new file mode 100644
index 0000000..fc70c15
--- /dev/null
+++ b/src/sys/fuchsia/handles.rs
@@ -0,0 +1,85 @@
+use {io, poll, Evented, Ready, Poll, PollOpt, Token};
+use magenta::HandleBase;
+use std::sync::Mutex;
+
+/// Wrapper for registering a `HandleBase` type with mio.
+#[derive(Debug)]
+pub struct EventedHandle<T> where T: HandleBase {
+    /// The handle to be registered.
+    handle: T,
+
+    /// The current `Token` with which the handle is registered with mio.
+    token: Mutex<Option<Token>>,
+}
+
+impl<T> EventedHandle<T> where T: HandleBase {
+    /// Create a new `EventedHandle` which can be registered with mio
+    /// in order to receive event notifications.
+    pub fn new(handle: T) -> Self {
+        EventedHandle {
+            handle: handle,
+            token: Mutex::new(None),
+        }
+    }
+
+    /// Get a reference to the underlying `HandleBase`.
+    pub fn get_ref(&self) -> &T {
+        &self.handle
+    }
+
+    /// Get a mutable reference to the underlying `HandleBase`.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.handle
+    }
+
+    /// Convert back into the inner `HandleBase`.
+    pub fn into_inner(self) -> T {
+        self.handle
+    }
+}
+
+impl<T> Evented for EventedHandle<T> where T: HandleBase {
+    fn register(&self,
+                poll: &Poll,
+                token: Token,
+                interest: Ready,
+                opts: PollOpt) -> io::Result<()>
+    {
+        let mut this_token = self.token.lock().unwrap();
+        {
+            poll::selector(poll).register_handle(&self.handle, token, interest, opts)?;
+            *this_token = Some(token);
+        }
+        Ok(())
+    }
+
+    fn reregister(&self,
+        poll: &Poll,
+        token: Token,
+        interest: Ready,
+        opts: PollOpt) -> io::Result<()>
+    {
+        let mut this_token = self.token.lock().unwrap();
+        {
+            poll::selector(poll).deregister_handle(&self.handle, token)?;
+            *this_token = None;
+            poll::selector(poll).register_handle(&self.handle, token, interest, opts)?;
+            *this_token = Some(token);
+        }
+        Ok(())
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        let mut this_token = self.token.lock().unwrap();
+        let token = if let Some(token) = *this_token { token } else {
+            return Err(io::Error::new(
+                io::ErrorKind::NotFound,
+                "Attempted to deregister an unregistered handle."))
+        };
+        {
+            poll::selector(poll).deregister_handle(&self.handle, token)?;
+            *this_token = None;
+        }
+        Ok(())
+    }
+}
diff --git a/src/sys/fuchsia/mod.rs b/src/sys/fuchsia/mod.rs
new file mode 100644
index 0000000..bb63338
--- /dev/null
+++ b/src/sys/fuchsia/mod.rs
@@ -0,0 +1,225 @@
+use {io, Ready, PollOpt};
+use libc;
+use magenta;
+use std::mem;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::ops::{Deref, DerefMut};
+use std::os::unix::io::RawFd;
+
+mod awakener;
+mod handles;
+mod eventedfd;
+mod net;
+mod selector;
+
+use self::eventedfd::{EventedFd, EventedFdInner};
+
+pub use self::awakener::Awakener;
+pub use self::handles::EventedHandle;
+pub use self::net::{TcpListener, TcpStream, UdpSocket};
+pub use self::selector::{Events, Selector};
+
+// Set non-blocking (workaround since the std version doesn't work in fuchsia)
+// TODO: fix the std version and replace this
+pub fn set_nonblock(fd: RawFd) -> io::Result<()> {
+    cvt(unsafe { libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) }).map(|_| ())
+}
+
+/// Workaround until fuchsia's recv_from is fixed
+unsafe fn recv_from(fd: RawFd, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+    let flags = 0;
+
+    let n = cvt(
+        libc::recv(fd,
+                   buf.as_mut_ptr() as *mut libc::c_void,
+                   buf.len(),
+                   flags)
+    )?;
+
+    // random address-- we don't use it
+    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+    Ok((n as usize, addr))
+}
+
+mod sys {
+    #![allow(non_camel_case_types)]
+    use std::os::unix::io::RawFd;
+    pub use magenta_sys::{mx_handle_t, mx_signals_t};
+
+    // 17 fn pointers we don't need for mio :)
+    pub type mxio_ops_t = [usize; 17];
+
+    pub type atomic_int_fast32_t = usize; // TODO: https://github.com/rust-lang/libc/issues/631
+
+    #[repr(C)]
+    pub struct mxio_t {
+        pub ops: *const mxio_ops_t,
+        pub magic: u32,
+        pub refcount: atomic_int_fast32_t,
+        pub dupcount: u32,
+        pub flags: u32,
+    }
+
+    #[link(name="mxio")]
+    extern {
+        pub fn __mxio_fd_to_io(fd: RawFd) -> *const mxio_t;
+        pub fn __mxio_release(io: *const mxio_t);
+
+        pub fn __mxio_wait_begin(
+            io: *const mxio_t,
+            events: u32,
+            handle_out: &mut mx_handle_t,
+            signals_out: &mut mx_signals_t,
+        );
+        pub fn __mxio_wait_end(
+            io: *const mxio_t,
+            signals: mx_signals_t,
+            events_out: &mut u32,
+        );
+    }
+}
+
+/// Convert from magenta::Status to io::Error.
+///
+/// Note: these conversions are done on a "best-effort" basis and may not necessarily reflect
+/// exactly equivalent error types.
+fn status_to_io_err(status: magenta::Status) -> io::Error {
+    use magenta::Status;
+
+    let err_kind: io::ErrorKind = match status {
+        Status::ErrInterruptedRetry => io::ErrorKind::Interrupted,
+        Status::ErrBadHandle => io::ErrorKind::BrokenPipe,
+        Status::ErrTimedOut => io::ErrorKind::TimedOut,
+        Status::ErrShouldWait => io::ErrorKind::WouldBlock,
+        Status::ErrPeerClosed => io::ErrorKind::ConnectionAborted,
+        Status::ErrNotFound => io::ErrorKind::NotFound,
+        Status::ErrAlreadyExists => io::ErrorKind::AlreadyExists,
+        Status::ErrAlreadyBound => io::ErrorKind::AddrInUse,
+        Status::ErrUnavailable => io::ErrorKind::AddrNotAvailable,
+        Status::ErrAccessDenied => io::ErrorKind::PermissionDenied,
+        Status::ErrIoRefused => io::ErrorKind::ConnectionRefused,
+        Status::ErrIoDataIntegrity => io::ErrorKind::InvalidData,
+
+        Status::ErrBadPath |
+        Status::ErrInvalidArgs |
+        Status::ErrOutOfRange |
+        Status::ErrWrongType => io::ErrorKind::InvalidInput,
+
+        Status::UnknownOther |
+        Status::ErrNext |
+        Status::ErrStop |
+        Status::ErrNoSpace |
+        Status::ErrFileBig |
+        Status::ErrNotFile |
+        Status::ErrNotDir |
+        Status::ErrIoDataLoss |
+        Status::ErrIo |
+        Status::ErrCanceled |
+        Status::ErrBadState |
+        Status::ErrBufferTooSmall |
+        Status::ErrBadSyscall |
+        Status::NoError |
+        Status::ErrInternal |
+        Status::ErrNotSupported |
+        Status::ErrNoResources |
+        Status::ErrNoMemory |
+        Status::ErrCallFailed
+        => io::ErrorKind::Other
+    }.into();
+
+    err_kind.into()
+}
+
+fn epoll_event_to_ready(epoll: u32) -> Ready {
+    let epoll = epoll as i32; // casts the bits directly
+    let mut kind = Ready::empty();
+
+    if (epoll & libc::EPOLLIN) != 0 || (epoll & libc::EPOLLPRI) != 0 {
+        kind = kind | Ready::readable();
+    }
+
+    if (epoll & libc::EPOLLOUT) != 0 {
+        kind = kind | Ready::writable();
+    }
+
+    kind
+
+    /* TODO: support?
+    // EPOLLHUP - Usually means a socket error happened
+    if (epoll & libc::EPOLLERR) != 0 {
+        kind = kind | UnixReady::error();
+    }
+
+    if (epoll & libc::EPOLLRDHUP) != 0 || (epoll & libc::EPOLLHUP) != 0 {
+        kind = kind | UnixReady::hup();
+    }
+    */
+}
+
+fn poll_opts_to_wait_async(poll_opts: PollOpt) -> magenta::WaitAsyncOpts {
+    if poll_opts.is_oneshot() {
+        magenta::WaitAsyncOpts::Once
+    } else {
+        magenta::WaitAsyncOpts::Repeating
+    }
+}
+
+trait IsMinusOne {
+    fn is_minus_one(&self) -> bool;
+}
+
+impl IsMinusOne for i32 {
+    fn is_minus_one(&self) -> bool { *self == -1 }
+}
+
+impl IsMinusOne for isize {
+    fn is_minus_one(&self) -> bool { *self == -1 }
+}
+
+fn cvt<T: IsMinusOne>(t: T) -> ::io::Result<T> {
+    use std::io;
+
+    if t.is_minus_one() {
+        Err(io::Error::last_os_error())
+    } else {
+        Ok(t)
+    }
+}
+
+/// Utility type to prevent the type inside of it from being dropped.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+struct DontDrop<T>(Option<T>);
+
+impl<T> DontDrop<T> {
+    fn new(t: T) -> DontDrop<T> {
+        DontDrop(Some(t))
+    }
+
+    fn inner_ref(&self) -> &T {
+        self.0.as_ref().unwrap()
+    }
+
+    fn inner_mut(&mut self) -> &mut T {
+        self.0.as_mut().unwrap()
+    }
+}
+
+impl<T> Deref for DontDrop<T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        self.inner_ref()
+    }
+}
+
+impl<T> DerefMut for DontDrop<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.inner_mut()
+    }
+}
+
+impl<T> Drop for DontDrop<T> {
+    fn drop(&mut self) {
+        let inner = self.0.take();
+        mem::forget(inner);
+    }
+}
diff --git a/src/sys/fuchsia/net.rs b/src/sys/fuchsia/net.rs
new file mode 100644
index 0000000..9b37d52
--- /dev/null
+++ b/src/sys/fuchsia/net.rs
@@ -0,0 +1,440 @@
+use {io, Evented, Ready, Poll, PollOpt, Token};
+use iovec::IoVec;
+use iovec::unix as iovec;
+use libc;
+use net2::TcpStreamExt;
+#[allow(unused_imports)] // only here for Rust 1.8
+use net2::UdpSocketExt;
+use sys::fuchsia::{recv_from, set_nonblock, EventedFd, DontDrop};
+use std::cmp;
+use std::io::{Read, Write};
+use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::os::unix::io::AsRawFd;
+use std::time::Duration;
+
+#[derive(Debug)]
+pub struct TcpStream {
+    io: DontDrop<net::TcpStream>,
+    evented_fd: EventedFd,
+}
+
+impl TcpStream {
+    pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
+        try!(set_nonblock(stream.as_raw_fd()));
+
+        let connected = stream.connect(addr);
+        match connected {
+            Ok(..) => {}
+            Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
+            Err(e) => return Err(e),
+        }
+
+        let evented_fd = unsafe { EventedFd::new(stream.as_raw_fd()) };
+
+        return Ok(TcpStream {
+            io: DontDrop::new(stream),
+            evented_fd: evented_fd,
+        })
+    }
+
+    pub fn from_stream(stream: net::TcpStream) -> TcpStream {
+        let evented_fd = unsafe { EventedFd::new(stream.as_raw_fd()) };
+
+        TcpStream {
+            io: DontDrop::new(stream),
+            evented_fd: evented_fd,
+        }
+    }
+
+    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+        self.io.peer_addr()
+    }
+
+    pub fn local_addr(&self) -> io::Result<SocketAddr> {
+        self.io.local_addr()
+    }
+
+    pub fn try_clone(&self) -> io::Result<TcpStream> {
+        self.io.try_clone().map(|s| {
+            let evented_fd = unsafe { EventedFd::new(s.as_raw_fd()) };
+            TcpStream {
+                io: DontDrop::new(s),
+                evented_fd: evented_fd,
+            }
+        })
+    }
+
+    pub fn shutdown(&self, how: net::Shutdown) -> io::Result<()> {
+        self.io.shutdown(how)
+    }
+
+    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
+        self.io.set_nodelay(nodelay)
+    }
+
+    pub fn nodelay(&self) -> io::Result<bool> {
+        self.io.nodelay()
+    }
+
+    pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
+        self.io.set_recv_buffer_size(size)
+    }
+
+    pub fn recv_buffer_size(&self) -> io::Result<usize> {
+        self.io.recv_buffer_size()
+    }
+
+    pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
+        self.io.set_send_buffer_size(size)
+    }
+
+    pub fn send_buffer_size(&self) -> io::Result<usize> {
+        self.io.send_buffer_size()
+    }
+
+    pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
+        self.io.set_keepalive(keepalive)
+    }
+
+    pub fn keepalive(&self) -> io::Result<Option<Duration>> {
+        self.io.keepalive()
+    }
+
+    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+        self.io.set_ttl(ttl)
+    }
+
+    pub fn ttl(&self) -> io::Result<u32> {
+        self.io.ttl()
+    }
+
+    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+        self.io.set_only_v6(only_v6)
+    }
+
+    pub fn only_v6(&self) -> io::Result<bool> {
+        self.io.only_v6()
+    }
+
+    pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+        self.io.set_linger(dur)
+    }
+
+    pub fn linger(&self) -> io::Result<Option<Duration>> {
+        self.io.linger()
+    }
+
+    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+        self.io.take_error()
+    }
+
+    pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+        unsafe {
+            let slice = iovec::as_os_slice_mut(bufs);
+            let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+            let rc = libc::readv(self.io.as_raw_fd(),
+                                 slice.as_ptr(),
+                                 len as libc::c_int);
+            if rc < 0 {
+                Err(io::Error::last_os_error())
+            } else {
+                Ok(rc as usize)
+            }
+        }
+    }
+
+    pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+        unsafe {
+            let slice = iovec::as_os_slice(bufs);
+            let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+            let rc = libc::writev(self.io.as_raw_fd(),
+                                  slice.as_ptr(),
+                                  len as libc::c_int);
+            if rc < 0 {
+                Err(io::Error::last_os_error())
+            } else {
+                Ok(rc as usize)
+            }
+        }
+    }
+}
+
+impl<'a> Read for &'a TcpStream {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.io.inner_ref().read(buf)
+    }
+}
+
+impl<'a> Write for &'a TcpStream {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.io.inner_ref().write(buf)
+    }
+    fn flush(&mut self) -> io::Result<()> {
+        self.io.inner_ref().flush()
+    }
+}
+
+impl Evented for TcpStream {
+    fn register(&self,
+                poll: &Poll,
+                token: Token,
+                interest: Ready,
+                opts: PollOpt) -> io::Result<()>
+    {
+        self.evented_fd.register(poll, token, interest, opts)
+    }
+
+    fn reregister(&self,
+                  poll: &Poll,
+                  token: Token,
+                  interest: Ready,
+                  opts: PollOpt) -> io::Result<()>
+    {
+        self.evented_fd.reregister(poll, token, interest, opts)
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        self.evented_fd.deregister(poll)
+    }
+}
+
+#[derive(Debug)]
+pub struct TcpListener {
+    io: DontDrop<net::TcpListener>,
+    evented_fd: EventedFd,
+}
+
+impl TcpListener {
+    pub fn new(inner: net::TcpListener, _addr: &SocketAddr) -> io::Result<TcpListener> {
+        set_nonblock(inner.as_raw_fd())?;
+
+        let evented_fd = unsafe { EventedFd::new(inner.as_raw_fd()) };
+
+        Ok(TcpListener {
+            io: DontDrop::new(inner),
+            evented_fd: evented_fd,
+        })
+    }
+
+    pub fn local_addr(&self) -> io::Result<SocketAddr> {
+        self.io.local_addr()
+    }
+
+    pub fn try_clone(&self) -> io::Result<TcpListener> {
+        self.io.try_clone().map(|io| {
+            let evented_fd = unsafe { EventedFd::new(io.as_raw_fd()) };
+            TcpListener {
+                io: DontDrop::new(io),
+                evented_fd: evented_fd,
+            }
+        })
+    }
+
+    pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
+        self.io.accept().and_then(|(s, a)| {
+            set_nonblock(s.as_raw_fd())?;
+            let evented_fd = unsafe { EventedFd::new(s.as_raw_fd()) };
+            return Ok((TcpStream {
+                io: DontDrop::new(s),
+                evented_fd: evented_fd,
+            }, a))
+        })
+    }
+
+    #[allow(deprecated)]
+    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+        self.io.set_only_v6(only_v6)
+    }
+
+    #[allow(deprecated)]
+    pub fn only_v6(&self) -> io::Result<bool> {
+        self.io.only_v6()
+    }
+
+    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+        self.io.set_ttl(ttl)
+    }
+
+    pub fn ttl(&self) -> io::Result<u32> {
+        self.io.ttl()
+    }
+
+    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+        self.io.take_error()
+    }
+}
+
+impl Evented for TcpListener {
+    fn register(&self,
+                poll: &Poll,
+                token: Token,
+                interest: Ready,
+                opts: PollOpt) -> io::Result<()>
+    {
+        self.evented_fd.register(poll, token, interest, opts)
+    }
+
+    fn reregister(&self,
+                  poll: &Poll,
+                  token: Token,
+                  interest: Ready,
+                  opts: PollOpt) -> io::Result<()>
+    {
+        self.evented_fd.reregister(poll, token, interest, opts)
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        self.evented_fd.deregister(poll)
+    }
+}
+
+#[derive(Debug)]
+pub struct UdpSocket {
+    io: DontDrop<net::UdpSocket>,
+    evented_fd: EventedFd,
+}
+
+impl UdpSocket {
+    pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> {
+        set_nonblock(socket.as_raw_fd())?;
+
+        let evented_fd = unsafe { EventedFd::new(socket.as_raw_fd()) };
+
+        Ok(UdpSocket {
+            io: DontDrop::new(socket),
+            evented_fd: evented_fd,
+        })
+    }
+
+    pub fn local_addr(&self) -> io::Result<SocketAddr> {
+        self.io.local_addr()
+    }
+
+    pub fn try_clone(&self) -> io::Result<UdpSocket> {
+        self.io.try_clone().and_then(|io| {
+            UdpSocket::new(io)
+        })
+    }
+
+    pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
+        self.io.send_to(buf, target)
+    }
+
+    pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+        unsafe { recv_from(self.io.as_raw_fd(), buf) }
+    }
+
+    pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
+        self.io.send(buf)
+    }
+
+    pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+        self.io.recv(buf)
+    }
+
+    pub fn connect(&self, addr: SocketAddr)
+                   -> io::Result<()> {
+        self.io.connect(addr)
+    }
+
+    pub fn broadcast(&self) -> io::Result<bool> {
+        self.io.broadcast()
+    }
+
+    pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
+        self.io.set_broadcast(on)
+    }
+
+    pub fn multicast_loop_v4(&self) -> io::Result<bool> {
+        self.io.multicast_loop_v4()
+    }
+
+    pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
+        self.io.set_multicast_loop_v4(on)
+    }
+
+    pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
+        self.io.multicast_ttl_v4()
+    }
+
+    pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
+        self.io.set_multicast_ttl_v4(ttl)
+    }
+
+    pub fn multicast_loop_v6(&self) -> io::Result<bool> {
+        self.io.multicast_loop_v6()
+    }
+
+    pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
+        self.io.set_multicast_loop_v6(on)
+    }
+
+    pub fn ttl(&self) -> io::Result<u32> {
+        self.io.ttl()
+    }
+
+    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+        self.io.set_ttl(ttl)
+    }
+
+    pub fn join_multicast_v4(&self,
+                             multiaddr: &Ipv4Addr,
+                             interface: &Ipv4Addr) -> io::Result<()> {
+        self.io.join_multicast_v4(multiaddr, interface)
+    }
+
+    pub fn join_multicast_v6(&self,
+                             multiaddr: &Ipv6Addr,
+                             interface: u32) -> io::Result<()> {
+        self.io.join_multicast_v6(multiaddr, interface)
+    }
+
+    pub fn leave_multicast_v4(&self,
+                              multiaddr: &Ipv4Addr,
+                              interface: &Ipv4Addr) -> io::Result<()> {
+        self.io.leave_multicast_v4(multiaddr, interface)
+    }
+
+    pub fn leave_multicast_v6(&self,
+                              multiaddr: &Ipv6Addr,
+                              interface: u32) -> io::Result<()> {
+        self.io.leave_multicast_v6(multiaddr, interface)
+    }
+
+    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+        self.io.set_only_v6(only_v6)
+    }
+
+    pub fn only_v6(&self) -> io::Result<bool> {
+        self.io.only_v6()
+    }
+
+
+    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+        self.io.take_error()
+    }
+}
+
+impl Evented for UdpSocket {
+    fn register(&self,
+                poll: &Poll,
+                token: Token,
+                interest: Ready,
+                opts: PollOpt) -> io::Result<()>
+    {
+        self.evented_fd.register(poll, token, interest, opts)
+    }
+
+    fn reregister(&self,
+                  poll: &Poll,
+                  token: Token,
+                  interest: Ready,
+                  opts: PollOpt) -> io::Result<()>
+    {
+        self.evented_fd.reregister(poll, token, interest, opts)
+    }
+
+    fn deregister(&self, poll: &Poll) -> io::Result<()> {
+        self.evented_fd.deregister(poll)
+    }
+}
diff --git a/src/sys/fuchsia/selector.rs b/src/sys/fuchsia/selector.rs
new file mode 100644
index 0000000..f008c48
--- /dev/null
+++ b/src/sys/fuchsia/selector.rs
@@ -0,0 +1,369 @@
+use {io, Event, PollOpt, Ready, Token};
+use sys::fuchsia::{
+    epoll_event_to_ready,
+    poll_opts_to_wait_async,
+    status_to_io_err,
+    EventedFd,
+    EventedFdInner,
+};
+use magenta;
+use magenta_sys;
+use magenta::HandleBase;
+use std::collections::hash_map;
+use std::fmt;
+use std::mem;
+use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
+use std::sync::{Arc, Mutex, Weak};
+use std::time::Duration;
+use sys;
+
+/// The kind of registration-- file descriptor or handle.
+///
+/// The last bit of a token is set to indicate the type of the registration.
+#[derive(Copy, Clone, Eq, PartialEq)]
+enum RegType {
+    Fd,
+    Handle,
+}
+
+fn key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64> {
+    let key = token.0 as u64;
+    let msb = 1u64 << 63;
+    if (key & msb) != 0 {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "Most-significant bit of token must remain unset."));
+    }
+
+    Ok(match reg_type {
+        RegType::Fd => key,
+        RegType::Handle => key | msb,
+    })
+}
+
+fn token_and_type_from_key(key: u64) -> (Token, RegType) {
+    let msb = 1u64 << 63;
+    (
+        Token((key & !msb) as usize),
+        if (key & msb) == 0 {
+            RegType::Fd
+        } else {
+            RegType::Handle
+        }
+    )
+}
+
+/// Each Selector has a globally unique(ish) ID associated with it. This ID
+/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
+/// registered with the `Selector`. If a type that is previously associated with
+/// a `Selector` attempts to register itself with a different `Selector`, the
+/// operation will return with an error. This matches windows behavior.
+static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+pub struct Selector {
+    id: usize,
+
+    /// Magenta object on which the handles have been registered, and on which events occur
+    port: Arc<magenta::Port>,
+
+    /// Whether or not `tokens_to_rereg` contains any elements. This is a best-effort attempt
+    /// used to prevent having to lock `tokens_to_rereg` when it is empty.
+    has_tokens_to_rereg: AtomicBool,
+
+    /// List of `Token`s corresponding to registrations that need to be reregistered before the
+    /// next `port::wait`. This is necessary to provide level-triggered behavior for
+    /// `Async::repeating` registrations.
+    ///
+    /// When a level-triggered `Async::repeating` event is seen, its token is added to this list so
+    /// that it will be reregistered before the next `port::wait` call, making `port::wait` return
+    /// immediately if the signal was high during the reregistration.
+    ///
+    /// Note: when used at the same time, the `tokens_to_rereg` lock should be taken out _before_
+    /// `token_to_fd`.
+    tokens_to_rereg: Mutex<Vec<Token>>,
+
+    /// Map from tokens to weak references to `EventedFdInner`-- a structure describing a
+    /// file handle, its associated `mxio` object, and its current registration.
+    token_to_fd: Mutex<hash_map::HashMap<Token, Weak<EventedFdInner>>>,
+}
+
+impl Selector {
+    pub fn new() -> io::Result<Selector> {
+        let port = Arc::new(
+            magenta::Port::create(magenta::PortOpts::Default)
+                .map_err(status_to_io_err)?
+        );
+
+        // offset by 1 to avoid choosing 0 as the id of a selector
+        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
+
+        let has_tokens_to_rereg = AtomicBool::new(false);
+        let tokens_to_rereg = Mutex::new(Vec::new());
+        let token_to_fd = Mutex::new(hash_map::HashMap::new());
+
+        Ok(Selector {
+            id: id,
+            port: port,
+            has_tokens_to_rereg: has_tokens_to_rereg,
+            tokens_to_rereg: tokens_to_rereg,
+            token_to_fd: token_to_fd,
+        })
+    }
+
+    pub fn id(&self) -> usize {
+        self.id
+    }
+
+    /// Returns a reference to the underlying port `Arc`.
+    pub fn port(&self) -> &Arc<magenta::Port> { &self.port }
+
+    /// Reregisters all registrations pointed to by the `tokens_to_rereg` list
+    /// if `has_tokens_to_rereg`.
+    fn reregister_handles(&self) -> io::Result<()> {
+        // We use `Ordering::Acquire` to make sure that we see all `tokens_to_rereg`
+        // written before the store using `Ordering::Release`.
+        if self.has_tokens_to_rereg.load(Ordering::Acquire) {
+            let mut tokens = self.tokens_to_rereg.lock().unwrap();
+            let token_to_fd = self.token_to_fd.lock().unwrap();
+            for token in tokens.drain(0..) {
+                if let Some(eventedfd) = token_to_fd.get(&token)
+                    .and_then(|h| h.upgrade()) {
+                    eventedfd.rereg_for_level(&self.port);
+                }
+            }
+            self.has_tokens_to_rereg.store(false, Ordering::Release);
+        }
+        Ok(())
+    }
+
+    pub fn select(&self,
+                  evts: &mut Events,
+                  _awakener: Token,
+                  timeout: Option<Duration>) -> io::Result<bool>
+    {
+        evts.events.drain(0..);
+
+        self.reregister_handles()?;
+
+        let deadline = match timeout {
+            Some(duration) => {
+                let nanos = duration.as_secs().saturating_mul(1_000_000_000)
+                                .saturating_add(duration.subsec_nanos() as u64);
+
+                magenta::deadline_after(nanos)
+            }
+            None => magenta::MX_TIME_INFINITE,
+        };
+
+        let packet = match self.port.wait(deadline) {
+            Ok(packet) => packet,
+            Err(magenta::Status::ErrTimedOut) => return Ok(false),
+            Err(e) => return Err(status_to_io_err(e)),
+        };
+
+        let observed_signals = match packet.contents() {
+            magenta::PacketContents::SignalOne(signal_packet) => {
+                signal_packet.observed()
+            }
+            magenta::PacketContents::SignalRep(signal_packet) => {
+                signal_packet.observed()
+            }
+            magenta::PacketContents::User(_user_packet) => {
+                // User packets are only ever sent by an Awakener
+                return Ok(true);
+            }
+        };
+
+        let key = packet.key();
+        let (token, reg_type) = token_and_type_from_key(key);
+
+        match reg_type {
+            RegType::Handle => {
+                // We can return immediately-- no lookup or registration necessary.
+                evts.events.push(Event::new(signals_to_ready(observed_signals), token));
+                Ok(false)
+            },
+            RegType::Fd => {
+                // Convert the signals to epoll events using __mxio_wait_end,
+                // and add to reregistration list if necessary.
+                let events: u32;
+                {
+                    let handle = if let Some(handle) =
+                    self.token_to_fd.lock().unwrap()
+                        .get(&token)
+                        .and_then(|h| h.upgrade()) {
+                        handle
+                    } else {
+                        // This handle is apparently in the process of removal.
+                        // It has been removed from the list, but port_cancel has not been called.
+                        return Ok(false);
+                    };
+
+                    events = unsafe {
+                        let mut events: u32 = mem::uninitialized();
+                        sys::fuchsia::sys::__mxio_wait_end(handle.mxio(), observed_signals, &mut events);
+                        events
+                    };
+
+                    // If necessary, queue to be reregistered before next port_await
+                    let needs_to_rereg = {
+                        let registration_lock = handle.registration().lock().unwrap();
+
+                        registration_lock
+                            .as_ref()
+                            .and_then(|r| r.rereg_signals())
+                            .is_some()
+                    };
+
+                    if needs_to_rereg {
+                        let mut tokens_to_rereg_lock = self.tokens_to_rereg.lock().unwrap();
+                        tokens_to_rereg_lock.push(token);
+                        // We use `Ordering::Release` to make sure that we see all `tokens_to_rereg`
+                        // written before the store.
+                        self.has_tokens_to_rereg.store(true, Ordering::Release);
+                    }
+                }
+
+                evts.events.push(Event::new(epoll_event_to_ready(events), token));
+                Ok(false)
+            },
+        }
+    }
+
+    /// Register event interests for the given IO handle with the OS
+    pub fn register_fd(&self,
+                       handle: &magenta::Handle,
+                       fd: &EventedFd,
+                       token: Token,
+                       signals: magenta::Signals,
+                       poll_opts: PollOpt) -> io::Result<()>
+    {
+        {
+            let mut token_to_fd = self.token_to_fd.lock().unwrap();
+            match token_to_fd.entry(token) {
+                hash_map::Entry::Occupied(_) =>
+                    return Err(io::Error::new(io::ErrorKind::AlreadyExists,
+                               "Attempted to register a filedescriptor on an existing token.")),
+                hash_map::Entry::Vacant(slot) => slot.insert(Arc::downgrade(&fd.inner)),
+            };
+        }
+
+        let wait_async_opts = poll_opts_to_wait_async(poll_opts);
+
+        let wait_res = handle.wait_async(&self.port, token.0 as u64, signals, wait_async_opts)
+            .map_err(status_to_io_err);
+
+        if wait_res.is_err() {
+            self.token_to_fd.lock().unwrap().remove(&token);
+        }
+
+        wait_res
+    }
+
+    /// Deregister event interests for the given IO handle with the OS
+    pub fn deregister_fd(&self, handle: &magenta::Handle, token: Token) -> io::Result<()> {
+        self.token_to_fd.lock().unwrap().remove(&token);
+
+        // We ignore NotFound errors since oneshots are automatically deregistered,
+        // but mio will attempt to deregister them manually.
+        self.port.cancel(&*handle, token.0 as u64)
+            .map_err(status_to_io_err)
+            .or_else(|e| if e.kind() == io::ErrorKind::NotFound {
+                Ok(())
+            } else {
+                Err(e)
+            })
+    }
+
+    pub fn register_handle<H>(&self,
+                              handle: &H,
+                              token: Token,
+                              interests: Ready,
+                              poll_opts: PollOpt) -> io::Result<()>
+        where H: magenta::HandleBase
+    {
+        if poll_opts.is_level() && !poll_opts.is_oneshot() {
+            return Err(io::Error::new(io::ErrorKind::InvalidInput,
+                      "Repeated level-triggered events are not supported on Fuchsia handles."));
+        }
+
+        handle.wait_async(&self.port,
+                          key_from_token_and_type(token, RegType::Handle)?,
+                          ready_to_signals(interests),
+                          poll_opts_to_wait_async(poll_opts))
+              .map_err(status_to_io_err)
+    }
+
+
+    pub fn deregister_handle<H>(&self, handle: &H, token: Token) -> io::Result<()>
+        where H: magenta::HandleBase
+    {
+        self.port.cancel(handle, key_from_token_and_type(token, RegType::Handle)?)
+                 .map_err(status_to_io_err)
+    }
+}
+
+fn ready_to_signals(ready: Ready) -> magenta::Signals {
+    // Get empty notifications for peer closing or other miscellaneous signals:
+    magenta_sys::MX_OBJECT_PEER_CLOSED |
+    magenta::MX_EVENT_SIGNALED |
+
+    if ready.is_writable() {
+        magenta_sys::MX_OBJECT_WRITABLE
+    } else {
+        magenta::MX_SIGNAL_NONE
+    } |
+
+    if ready.is_readable() {
+        magenta_sys::MX_OBJECT_READABLE
+    } else {
+        magenta::MX_SIGNAL_NONE
+    }
+}
+
+fn signals_to_ready(signals: magenta::Signals) -> Ready {
+    (if signals.contains(magenta_sys::MX_OBJECT_WRITABLE) {
+        Ready::writable()
+    } else {
+        Ready::empty()
+    }) |
+
+    (if signals.contains(magenta_sys::MX_OBJECT_READABLE) {
+        Ready::readable()
+    } else {
+        Ready::empty()
+    })
+}
+
+pub struct Events {
+    events: Vec<Event>
+}
+
+impl Events {
+    pub fn with_capacity(_u: usize) -> Events {
+        // The Fuchsia selector only handles one event at a time,
+        // so we ignore the default capacity and set it to one.
+        Events { events: Vec::with_capacity(1) }
+    }
+    pub fn len(&self) -> usize {
+        self.events.len()
+    }
+    pub fn capacity(&self) -> usize {
+        self.events.capacity()
+    }
+    pub fn is_empty(&self) -> bool {
+        self.events.is_empty()
+    }
+    pub fn get(&self, idx: usize) -> Option<Event> {
+        self.events.get(idx).map(|e| *e)
+    }
+    pub fn push_event(&mut self, event: Event) {
+        self.events.push(event)
+    }
+}
+
+impl fmt::Debug for Events {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(fmt, "Events {{ len: {} }}", self.len())
+    }
+}
diff --git a/src/sys/mod.rs b/src/sys/mod.rs
index cff0631..3f712d4 100644
--- a/src/sys/mod.rs
+++ b/src/sys/mod.rs
@@ -1,4 +1,4 @@
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 pub use self::unix::{
     Awakener,
     EventedFd,
@@ -12,11 +12,11 @@
     set_nonblock,
 };
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 #[cfg(feature = "with-deprecated")]
 pub use self::unix::UnixSocket;
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 pub mod unix;
 
 #[cfg(windows)]
@@ -33,3 +33,18 @@
 
 #[cfg(windows)]
 mod windows;
+
+#[cfg(target_os = "fuchsia")]
+pub use self::fuchsia::{
+    Awakener,
+    Events,
+    EventedHandle,
+    Selector,
+    TcpStream,
+    TcpListener,
+    UdpSocket,
+    set_nonblock,
+};
+
+#[cfg(target_os = "fuchsia")]
+mod fuchsia;
diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs
index ee6f5ce..a45893b 100644
--- a/src/sys/unix/mod.rs
+++ b/src/sys/unix/mod.rs
@@ -3,10 +3,10 @@
 #[macro_use]
 pub mod dlsym;
 
-#[cfg(any(target_os = "linux", target_os = "android", target_os = "fuchsia"))]
+#[cfg(any(target_os = "linux", target_os = "android"))]
 mod epoll;
 
-#[cfg(any(target_os = "linux", target_os = "android", target_os = "fuchsia"))]
+#[cfg(any(target_os = "linux", target_os = "android"))]
 pub use self::epoll::{Events, Selector};
 
 #[cfg(any(target_os = "bitrig", target_os = "dragonfly",
diff --git a/src/udp.rs b/src/udp.rs
index 388e823..d99919a 100644
--- a/src/udp.rs
+++ b/src/udp.rs
@@ -280,24 +280,24 @@
  *
  */
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd};
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl IntoRawFd for UdpSocket {
     fn into_raw_fd(self) -> RawFd {
         self.sys.into_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl AsRawFd for UdpSocket {
     fn as_raw_fd(&self) -> RawFd {
         self.sys.as_raw_fd()
     }
 }
 
-#[cfg(unix)]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
 impl FromRawFd for UdpSocket {
     unsafe fn from_raw_fd(fd: RawFd) -> UdpSocket {
         UdpSocket {
diff --git a/test/mod.rs b/test/mod.rs
index 9d7a962..16bc087 100644
--- a/test/mod.rs
+++ b/test/mod.rs
@@ -10,6 +10,9 @@
 extern crate slab;
 extern crate tempdir;
 
+#[cfg(target_os = "fuchsia")]
+extern crate magenta;
+
 pub use ports::localhost;
 
 mod test_custom_evented;
@@ -51,8 +54,13 @@
 #[cfg(any(target_os = "macos", target_os = "linux"))]
 mod test_broken_pipe;
 
+#[cfg(any(target_os = "fuchsia"))]
+mod test_fuchsia_handles;
+
 use bytes::{Buf, MutBuf};
 use std::io::{self, Read, Write};
+use std::time::Duration;
+use mio::{Event, Events, Poll};
 
 pub trait TryRead {
     fn try_read_buf<B: MutBuf>(&mut self, buf: &mut B) -> io::Result<Option<usize>>
@@ -165,3 +173,32 @@
     use std::time::Duration;
     thread::sleep(Duration::from_millis(ms));
 }
+
+pub fn expect_events(poll: &Poll,
+                     event_buffer: &mut Events,
+                     poll_try_count: usize,
+                     mut expected: Vec<Event>)
+{
+    const MS: u64 = 1_000;
+
+    for _ in 0..poll_try_count {
+        poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
+        for event in event_buffer.iter() {
+            let pos_opt = match expected.iter().position(|exp_event| {
+                (event.token() == exp_event.token()) &&
+                event.kind().contains(exp_event.kind())
+            }) {
+                Some(x) => Some(x),
+                None => None,
+            };
+            if let Some(pos) = pos_opt { expected.remove(pos); }
+        }
+
+        if expected.len() == 0 {
+            break;
+        }
+    }
+
+    assert!(expected.len() == 0, "The following expected events were not found: {:?}", expected);
+}
+
diff --git a/test/test_echo_server.rs b/test/test_echo_server.rs
index dc02e96..c564099 100644
--- a/test/test_echo_server.rs
+++ b/test/test_echo_server.rs
@@ -214,9 +214,12 @@
             Err(e) => debug!("not implemented; client err={:?}", e)
         }
 
-        assert!(self.interest.is_readable() || self.interest.is_writable(), "actual={:?}", self.interest);
-        event_loop.reregister(&self.sock, self.token, self.interest,
-                              PollOpt::edge() | PollOpt::oneshot())
+        if self.interest.is_readable() || self.interest.is_writable() {
+            try!(event_loop.reregister(&self.sock, self.token, self.interest,
+                                  PollOpt::edge() | PollOpt::oneshot()));
+        }
+
+        Ok(())
     }
 
     fn next_msg(&mut self, event_loop: &mut EventLoop<Echo>) -> io::Result<()> {
diff --git a/test/test_fuchsia_handles.rs b/test/test_fuchsia_handles.rs
new file mode 100644
index 0000000..26fd974
--- /dev/null
+++ b/test/test_fuchsia_handles.rs
@@ -0,0 +1,30 @@
+use mio::*;
+use mio::fuchsia::EventedHandle;
+use magenta;
+use std::time::Duration;
+
+const MS: u64 = 1_000;
+
+#[test]
+pub fn test_fuchsia_channel() {
+    let poll = Poll::new().unwrap();
+    let mut event_buffer = Events::with_capacity(1);
+    let event_buffer = &mut event_buffer;
+
+    let (channel0, channel1) = magenta::Channel::create(magenta::ChannelOpts::Normal).unwrap();
+    let channel1_evented = EventedHandle::new(channel1);
+
+    poll.register(&channel1_evented, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
+
+    poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
+    assert_eq!(event_buffer.len(), 0);
+
+    channel0.write(&[1, 2, 3], &mut vec![], 0).unwrap();
+
+    poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
+    let event = event_buffer.get(0).unwrap();
+    assert_eq!(event.token(), Token(1));
+    assert!(event.readiness().is_readable());
+
+    poll.deregister(&channel1_evented).unwrap();
+}
\ No newline at end of file
diff --git a/test/test_poll_channel.rs b/test/test_poll_channel.rs
index ecaa140..39b2cc6 100644
--- a/test/test_poll_channel.rs
+++ b/test/test_poll_channel.rs
@@ -1,4 +1,4 @@
-use {sleep_ms};
+use {expect_events, sleep_ms};
 use mio::*;
 use std::sync::mpsc::TryRecvError;
 use std::thread;
@@ -238,13 +238,10 @@
     // Sleep a bit to ensure it arrives at dest
     sleep_ms(250);
 
-    poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap();
-    let e = filter(&events, Token(0));
-
-    assert_eq!(1, e.len());
-
-    let e = filter(&events, Token(1));
-    assert_eq!(1, e.len());
+    expect_events(&poll, &mut events, 2, vec![
+        Event::new(Ready::none(), Token(0)),
+        Event::new(Ready::none(), Token(1)),
+    ]);
 }
 
 #[test]
@@ -285,7 +282,3 @@
         }
     }
 }
-
-fn filter(events: &Events, token: Token) -> Vec<Event> {
-    events.iter().filter(|e| e.token() == token).collect()
-}
diff --git a/test/test_tcp.rs b/test/test_tcp.rs
index 028cba2..93eccde 100644
--- a/test/test_tcp.rs
+++ b/test/test_tcp.rs
@@ -509,7 +509,12 @@
             if event.token() == Token(3) {
                 assert!(event.kind().is_readable());
 
-                server.read(&mut buf).unwrap_err();
+                match server.read(&mut buf) {
+                    Ok(0) |
+                    Err(_) => {},
+
+                    Ok(x) => panic!("expected empty buffer but read {} bytes", x),
+                }
                 return;
             }
         }
@@ -518,6 +523,7 @@
 }
 
  #[test]
+ #[cfg_attr(target_os = "fuchsia", ignore)]
  fn connect_error() {
      let poll = Poll::new().unwrap();
      let mut events = Events::with_capacity(16);
diff --git a/test/test_tcp_level.rs b/test/test_tcp_level.rs
index 2cc9286..d0d228b 100644
--- a/test/test_tcp_level.rs
+++ b/test/test_tcp_level.rs
@@ -1,4 +1,4 @@
-use {sleep_ms, TryRead};
+use {expect_events, sleep_ms, TryRead};
 use mio::*;
 use mio::tcp::*;
 use std::io::Write;
@@ -76,10 +76,10 @@
     // Sleep a bit to ensure it arrives at dest
     sleep_ms(250);
 
-    let _ = poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
-    let events: Vec<Event> = (0..pevents.len()).map(|i| pevents.get(i).unwrap()).collect();
-    assert!(events.len() == 2, "actual={:?}", events);
-    assert_eq!(filter(&pevents, Token(1))[0], Event::new(Ready::writable(), Token(1)));
+    expect_events(&poll, &mut pevents, 2, vec![
+        Event::new(Ready::readable(), Token(0)),
+        Event::new(Ready::writable(), Token(1)),
+    ]);
 
     // Server side of socket
     let (mut s1_tx, _) = l.accept().unwrap();
@@ -87,10 +87,9 @@
     // Sleep a bit to ensure it arrives at dest
     sleep_ms(250);
 
-    poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
-    let events = filter(&pevents, Token(1));
-    assert_eq!(events.len(), 1);
-    assert_eq!(events[0], Event::new(Ready::writable(), Token(1)));
+    expect_events(&poll, &mut pevents, 2, vec![
+        Event::new(Ready::writable(), Token(1))
+    ]);
 
     // Register the socket
     poll.register(&s1_tx, Token(123), Ready::readable(), PollOpt::edge()).unwrap();
@@ -107,10 +106,9 @@
     debug!("looking at rx end ----------");
 
     // Poll rx end
-    poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
-    let events = filter(&pevents, Token(1));
-    assert!(events.len() == 1, "actual={:?}", events);
-    assert_eq!(events[0], Event::new(Ready::readable() | Ready::writable(), Token(1)));
+    expect_events(&poll, &mut pevents, 2, vec![
+        Event::new(Ready::readable(), Token(1))
+    ]);
 
     debug!("reading ----------");
 
@@ -121,12 +119,10 @@
 
     assert_eq!(res, b"hello world!");
 
-    debug!("checking read is gone ----------");
+    debug!("checking just read ----------");
 
-    poll.poll(&mut pevents, Some(Duration::from_millis(MS))).unwrap();
-    let events = filter(&pevents, Token(1));
-    assert!(events.len() == 1);
-    assert_eq!(events[0], Event::new(Ready::writable(), Token(1)));
+    expect_events(&poll, &mut pevents, 1, vec![
+        Event::new(Ready::writable(), Token(1))]);
 
     // Closing the socket clears all active level events
     drop(s1);
diff --git a/test/test_udp_level.rs b/test/test_udp_level.rs
index 454dde8..43055ee 100644
--- a/test/test_udp_level.rs
+++ b/test/test_udp_level.rs
@@ -1,14 +1,13 @@
 use mio::*;
 use mio::udp::*;
-use sleep_ms;
-use std::time::Duration;
-
-const MS: u64 = 1_000;
+use {expect_events, sleep_ms};
 
 #[test]
 pub fn test_udp_level_triggered() {
     let poll = Poll::new().unwrap();
+    let poll = &poll;
     let mut events = Events::with_capacity(1024);
+    let events = &mut events;
 
     // Create the listener
     let tx = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
@@ -17,16 +16,12 @@
     poll.register(&tx, Token(0), Ready::all(), PollOpt::level()).unwrap();
     poll.register(&rx, Token(1), Ready::all(), PollOpt::level()).unwrap();
 
+
     for _ in 0..2 {
-        poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap();
-
-        let tx_events = filter(&events, Token(0));
-        assert_eq!(1, tx_events.len());
-        assert_eq!(tx_events[0], Event::new(Ready::writable(), Token(0)));
-
-        let rx_events = filter(&events, Token(1));
-        assert_eq!(1, rx_events.len());
-        assert_eq!(rx_events[0], Event::new(Ready::writable(), Token(1)));
+        expect_events(poll, events, 2, vec![
+            Event::new(Ready::writable(), Token(0)),
+            Event::new(Ready::writable(), Token(1)),
+        ]);
     }
 
     tx.send_to(b"hello world!", &rx.local_addr().unwrap()).unwrap();
@@ -34,40 +29,23 @@
     sleep_ms(250);
 
     for _ in 0..2 {
-        poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap();
-        let rx_events = filter(&events, Token(1));
-        assert_eq!(1, rx_events.len());
-        assert_eq!(rx_events[0], Event::new(Ready::readable() | Ready::writable(), Token(1)));
+        expect_events(poll, events, 2, vec![
+            Event::new(Ready::readable() | Ready::writable(), Token(1))
+        ]);
     }
 
     let mut buf = [0; 200];
-    while rx.recv_from(&mut buf).unwrap().is_some() {
-    }
+    while rx.recv_from(&mut buf).unwrap().is_some() {}
 
     for _ in 0..2 {
-        poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap();
-        let rx_events = filter(&events, Token(1));
-        assert_eq!(1, rx_events.len());
-        assert_eq!(rx_events[0], Event::new(Ready::writable(), Token(1)));
+        expect_events(poll, events, 4, vec![Event::new(Ready::writable(), Token(1))]);
     }
 
     tx.send_to(b"hello world!", &rx.local_addr().unwrap()).unwrap();
     sleep_ms(250);
 
-    poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap();
-    let rx_events = filter(&events, Token(1));
-    assert_eq!(1, rx_events.len());
-    assert_eq!(rx_events[0], Event::new(Ready::readable() | Ready::writable(), Token(1)));
+    expect_events(poll, events, 10,
+                  vec![Event::new(Ready::readable() | Ready::writable(), Token(1))]);
 
     drop(rx);
-
-    poll.poll(&mut events, Some(Duration::from_millis(MS))).unwrap();
-    let rx_events = filter(&events, Token(1));
-    assert!(rx_events.is_empty());
-}
-
-fn filter(events: &Events, token: Token) -> Vec<Event> {
-    (0..events.len()).map(|i| events.get(i).unwrap())
-                     .filter(|e| e.token() == token)
-                     .collect()
 }
diff --git a/test/test_udp_socket.rs b/test/test_udp_socket.rs
index 4113a76..f3b4ac2 100644
--- a/test/test_udp_socket.rs
+++ b/test/test_udp_socket.rs
@@ -152,6 +152,7 @@
 
     let tx_addr = tx.local_addr().unwrap();
     let rx_addr = rx.local_addr().unwrap();
+
     assert!(tx.connect(rx_addr).is_ok());
     assert!(rx.connect(tx_addr).is_ok());