blob: bd81fbcd07906cb0e5a35f466b3c10a5427f07c4 [file] [log] [blame]
#![allow(deprecated)]
use std::{fmt, io};
use std::cell::UnsafeCell;
use std::os::windows::prelude::*;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;
use lazycell::AtomicLazyCell;
use winapi::*;
use miow;
use miow::iocp::{CompletionPort, CompletionStatus};
use event_imp::{Event, Evented, Ready};
use poll::{self, Poll};
use sys::windows::buffer_pool::BufferPool;
use {Token, PollOpt};
/// Each `Selector` has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that was previously associated
/// with a `Selector` attempts to register itself with a different `Selector`,
/// the operation will return with an error. This matches windows behavior.
///
/// The counter starts at zero; `Selector::new` adds one to the value it
/// fetches so that no selector is handed the ID `0`.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
/// The guts of the Windows event loop, this is the struct which actually owns
/// a completion port.
///
/// Internally this is just an `Arc`, and this allows handing out references to
/// the internals to I/O handles registered on this selector. This is
/// required to schedule I/O operations independently of being inside the event
/// loop (e.g. when a call to `write` is seen we're not "in the event loop").
pub struct Selector {
/// Reference-counted state shared with every handle returned by `clone_ref`
/// and with each `Binding` that registers on this selector.
inner: Arc<SelectorInner>,
}
/// State shared by all references to one `Selector`: its identifier, the
/// completion port, and the buffer pool.
struct SelectorInner {
/// Unique identifier of the `Selector`
id: usize,
/// The actual completion port that's used to manage all I/O
port: CompletionPort,
/// A pool of buffers usable by this selector.
///
/// Primitives will take buffers from this pool to perform I/O operations,
/// and once complete they'll be put back in. The pool sits behind a `Mutex`
/// because it is reached through shared references.
buffers: Mutex<BufferPool>,
}
impl Selector {
    /// Builds a selector around a freshly created completion port.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `CompletionPort::new`.
    pub fn new() -> io::Result<Selector> {
        // The counter starts at zero, so shift by one: a selector must never
        // be assigned the id 0.
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;

        let port = CompletionPort::new(1)?;
        let inner = SelectorInner {
            id: id,
            port: port,
            buffers: Mutex::new(BufferPool::new(256)),
        };
        Ok(Selector { inner: Arc::new(inner) })
    }

    /// Waits on the completion port and dispatches every completion received.
    ///
    /// Completions carrying a null `OVERLAPPED` pointer are treated as
    /// wakeups posted for `awakener`; all others have the callback stored in
    /// their `Overlapped` invoked. A wait that times out counts as zero
    /// completions rather than as an error.
    ///
    /// Returns `Ok(true)` when at least one wakeup was seen.
    ///
    /// # Panics
    ///
    /// Panics if a null-`OVERLAPPED` completion carries a token other than
    /// `awakener`.
    pub fn select(&self,
                  events: &mut Events,
                  awakener: Token,
                  timeout: Option<Duration>) -> io::Result<bool> {
        trace!("select; timeout={:?}", timeout);

        // Throw away the events of the previous round before collecting more.
        events.events.clear();

        trace!("polling IOCP");
        let completed = match self.inner.port.get_many(&mut events.statuses, timeout) {
            Ok(statuses) => statuses.len(),
            Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
            Err(e) => return Err(e),
        };

        let mut awakened = false;
        for status in &events.statuses[..completed] {
            let overlapped = status.overlapped();

            // A null pointer should only ever come from the awakener, and
            // there is only one awakener right now, so assert as such.
            if overlapped as usize == 0 {
                assert_eq!(status.token(), usize::from(awakener));
                awakened = true;
                continue;
            }

            let callback = unsafe { (*(overlapped as *mut Overlapped)).callback };
            trace!("select; -> got overlapped");
            callback(status.entry());
        }

        trace!("returning");
        Ok(awakened)
    }

    /// Borrows the `CompletionPort` owned by this selector.
    pub fn port(&self) -> &CompletionPort {
        &self.inner.port
    }

    /// Produces another handle to this selector; both handles refer to the
    /// same completion port and shared state.
    pub fn clone_ref(&self) -> Selector {
        let shared = self.inner.clone();
        Selector { inner: shared }
    }

    /// Returns the identifier assigned to this selector at creation.
    pub fn id(&self) -> usize {
        self.inner.id
    }
}
impl SelectorInner {
    /// Reports whether `self` and `other` are the very same object, judged by
    /// address rather than by contents.
    fn identical(&self, other: &SelectorInner) -> bool {
        let lhs: *const SelectorInner = self;
        let rhs: *const SelectorInner = other;
        lhs == rhs
    }
}
// A registration is stored in each I/O object which keeps track of how it is
// associated with a `Selector` above.
//
// Once associated with a `Selector`, a registration can never be un-associated
// (due to IOCP requirements). This is actually implemented through the
// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
// level/edge/filtering business.
/// A `Binding` is embedded in all I/O objects associated with a `Poll`
/// object.
///
/// Each registration keeps track of which selector the I/O object is
/// associated with, ensuring that implementations of `Evented` can be
/// conformant for the various methods on Windows.
///
/// If you're working with custom IOCP-enabled objects then you'll want to
/// ensure that one of these instances is stored in your object and used in the
/// implementation of `Evented`.
///
/// For more information about how to use this see the `windows` module
/// documentation in this crate.
pub struct Binding {
/// The selector this object is associated with. Starts out empty and is
/// filled by `register_handle` / `register_socket`; later calls compare the
/// stored selector against the one belonging to the `Poll` they are given.
selector: AtomicLazyCell<Arc<SelectorInner>>,
}
impl Binding {
    /// Creates a new blank binding ready to be inserted into an I/O
    /// object.
    ///
    /// Won't actually do anything until associated with a `Poll` loop.
    pub fn new() -> Binding {
        Binding { selector: AtomicLazyCell::new() }
    }

    /// Registers a new handle with the `Poll` specified, also assigning the
    /// `token` specified.
    ///
    /// This function is intended to be used as part of `Evented::register` for
    /// custom IOCP objects. It will add the specified handle to the internal
    /// IOCP object with the provided `token`. All future events generated by
    /// the handle provided will be received by the `Poll`'s internal IOCP
    /// object.
    ///
    /// # Errors
    ///
    /// Fails if this binding is already associated with a different `Poll`,
    /// or if adding the handle to the completion port fails.
    ///
    /// # Safety
    ///
    /// This function is unsafe as the `Poll` instance has assumptions about
    /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
    /// Specifically they must all be instances of the `Overlapped` type in
    /// this crate. More information about this can be found on the
    /// `windows` module in this crate.
    pub unsafe fn register_handle(&self,
                                  handle: &AsRawHandle,
                                  token: Token,
                                  poll: &Poll) -> io::Result<()> {
        self.associate(poll)?;
        poll::selector(poll).inner.port.add_handle(usize::from(token), handle)
    }

    /// Same as `register_handle` but for sockets.
    pub unsafe fn register_socket(&self,
                                  handle: &AsRawSocket,
                                  token: Token,
                                  poll: &Poll) -> io::Result<()> {
        self.associate(poll)?;
        poll::selector(poll).inner.port.add_socket(usize::from(token), handle)
    }

    /// Reregisters the handle provided from the `Poll` provided.
    ///
    /// This is intended to be used as part of `Evented::reregister` but note
    /// that this function does not currently reregister the provided handle
    /// with the `poll` specified. IOCP has a special binding for changing the
    /// token which has not yet been implemented. Instead this function should
    /// be used to assert that the call to `reregister` happened on the same
    /// `Poll` that was passed in to `register`.
    ///
    /// Eventually, though, the provided `handle` will be re-assigned to have
    /// the token `token` on the given `poll` assuming that it's been
    /// previously registered with it.
    ///
    /// # Errors
    ///
    /// Fails if this binding was never registered, or was registered with a
    /// different `Poll`.
    ///
    /// # Safety
    ///
    /// This function is unsafe for similar reasons to `register`. That is,
    /// there may be pending I/O events and such which aren't handled correctly.
    pub unsafe fn reregister_handle(&self,
                                    _handle: &AsRawHandle,
                                    _token: Token,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    /// Same as `reregister_handle`, but for sockets.
    pub unsafe fn reregister_socket(&self,
                                    _socket: &AsRawSocket,
                                    _token: Token,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    /// Deregisters the handle provided from the `Poll` provided.
    ///
    /// This is intended to be used as part of `Evented::deregister` but note
    /// that this function does not currently deregister the provided handle
    /// from the `poll` specified. IOCP has a special binding for that which has
    /// not yet been implemented. Instead this function should be used to assert
    /// that the call to `deregister` happened on the same `Poll` that was
    /// passed in to `register`.
    ///
    /// # Errors
    ///
    /// Fails if this binding was never registered, or was registered with a
    /// different `Poll`.
    ///
    /// # Safety
    ///
    /// This function is unsafe for similar reasons to `register`. That is,
    /// there may be pending I/O events and such which aren't handled correctly.
    pub unsafe fn deregister_handle(&self,
                                    _handle: &AsRawHandle,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    /// Same as `deregister_handle`, but for sockets.
    pub unsafe fn deregister_socket(&self,
                                    _socket: &AsRawSocket,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    /// Associates this binding with the selector of `poll` if it has no
    /// selector yet, then verifies that the stored selector is `poll`'s.
    fn associate(&self, poll: &Poll) -> io::Result<()> {
        let selector = poll::selector(poll);
        // A failed `fill` is deliberately ignored: whatever selector is stored
        // afterwards is validated by the check below.
        drop(self.selector.fill(selector.inner.clone()));
        self.check_same_selector(poll)
    }

    /// Succeeds only when this binding is associated with the selector that
    /// belongs to `poll`.
    ///
    /// A binding tied to another selector and a binding that was never
    /// registered are reported with distinct messages; both use
    /// `io::ErrorKind::Other`.
    fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
        let selector = poll::selector(poll);
        match self.selector.borrow() {
            Some(prev) if prev.identical(&selector.inner) => Ok(()),
            Some(_) => Err(other("socket already registered")),
            None => Err(other("socket not registered")),
        }
    }
}
impl fmt::Debug for Binding {
    /// Writes just the type name; no field is exposed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("Binding");
        builder.finish()
    }
}
/// Helper struct used for TCP and UDP which bundles a `binding` with a
/// `SetReadiness` handle.
pub struct ReadyBinding {
/// Tracks which selector, if any, this I/O object is associated with.
binding: Binding,
/// Handle used to publish readiness; `None` until `register_socket`
/// succeeds.
readiness: Option<poll::SetReadiness>,
}
impl ReadyBinding {
    /// Creates a new blank binding ready to be inserted into an I/O object.
    ///
    /// Won't actually do anything until associated with a `Selector` loop.
    pub fn new() -> ReadyBinding {
        ReadyBinding {
            binding: Binding::new(),
            readiness: None,
        }
    }

    /// Returns whether this binding has been associated with a selector
    /// yet.
    pub fn registered(&self) -> bool {
        self.readiness.is_some()
    }

    /// Acquires a buffer with at least `size` capacity.
    ///
    /// If associated with a selector, this will attempt to pull a buffer from
    /// that buffer pool. If not associated with a selector, this will allocate
    /// a fresh buffer.
    ///
    /// # Panics
    ///
    /// Panics if the selector's buffer-pool mutex is poisoned.
    pub fn get_buffer(&self, size: usize) -> Vec<u8> {
        match self.binding.selector.borrow() {
            Some(i) => i.buffers.lock().unwrap().get(size),
            None => Vec::with_capacity(size),
        }
    }

    /// Returns a buffer to this binding.
    ///
    /// If associated with a selector, this will push the buffer back into the
    /// selector's pool of buffers. Otherwise this will just drop the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the selector's buffer-pool mutex is poisoned.
    pub fn put_buffer(&self, buf: Vec<u8>) {
        if let Some(i) = self.binding.selector.borrow() {
            i.buffers.lock().unwrap().put(buf);
        }
    }

    /// Sets the readiness of this I/O object to a particular `set`.
    ///
    /// This is later used to fill out and respond to requests to `poll`. Note
    /// that this is all implemented through the `SetReadiness` structure in the
    /// `poll` module. Does nothing when no readiness handle has been set up
    /// yet.
    ///
    /// # Panics
    ///
    /// Panics if the underlying `SetReadiness::set_readiness` call fails.
    pub fn set_readiness(&self, set: Ready) {
        if let Some(ref i) = self.readiness {
            trace!("set readiness to {:?}", set);
            i.set_readiness(set).expect("event loop disappeared?");
        }
    }

    /// Queries what the current readiness of this I/O object is.
    ///
    /// This is what's being used to generate events returned by `poll`.
    /// Returns `Ready::empty()` when no readiness handle has been set up yet.
    pub fn readiness(&self) -> Ready {
        match self.readiness {
            Some(ref i) => i.readiness(),
            None => Ready::empty(),
        }
    }

    /// Implementation of the `Evented::register` function essentially.
    ///
    /// Returns an error if we're already registered with another event loop,
    /// and otherwise just reassociates ourselves with the event loop to
    /// possibly change tokens.
    ///
    /// On success the new `Registration` is stored in `registration` and the
    /// matching `SetReadiness` handle is kept in `self`.
    ///
    /// # Panics
    ///
    /// Panics if the `registration` mutex is poisoned.
    pub fn register_socket(&mut self,
                           socket: &AsRawSocket,
                           poll: &Poll,
                           token: Token,
                           events: Ready,
                           opts: PollOpt,
                           registration: &Mutex<Option<poll::Registration>>)
                           -> io::Result<()> {
        trace!("register {:?} {:?}", token, events);
        unsafe {
            self.binding.register_socket(socket, token, poll)?;
        }

        let (r, s) = poll::new_registration(poll, token, events, opts);
        self.readiness = Some(s);
        *registration.lock().unwrap() = Some(r);
        Ok(())
    }

    /// Implementation of `Evented::reregister` function.
    ///
    /// # Errors
    ///
    /// Fails if the binding check rejects `poll`, if no `Registration` has
    /// been stored in `registration`, or if the registration's own
    /// `reregister` fails.
    ///
    /// # Panics
    ///
    /// Panics if the `registration` mutex is poisoned.
    pub fn reregister_socket(&mut self,
                             socket: &AsRawSocket,
                             poll: &Poll,
                             token: Token,
                             events: Ready,
                             opts: PollOpt,
                             registration: &Mutex<Option<poll::Registration>>)
                             -> io::Result<()> {
        trace!("reregister {:?} {:?}", token, events);
        unsafe {
            self.binding.reregister_socket(socket, token, poll)?;
        }

        // The binding can already be tied to this selector while the slot is
        // still empty (a `register_socket` call that failed after the binding
        // was associated), so report an error instead of unwrapping.
        match *registration.lock().unwrap() {
            Some(ref mut r) => r.reregister(poll, token, events, opts),
            None => Err(io::Error::new(io::ErrorKind::Other, "socket not registered")),
        }
    }

    /// Implementation of the `Evented::deregister` function.
    ///
    /// Doesn't allow registration with another event loop, just shuts down
    /// readiness notifications and such.
    ///
    /// # Errors
    ///
    /// Fails if the binding check rejects `poll`, if no `Registration` has
    /// been stored in `registration`, or if the registration's own
    /// `deregister` fails.
    ///
    /// # Panics
    ///
    /// Panics if the `registration` mutex is poisoned.
    pub fn deregister(&mut self,
                      socket: &AsRawSocket,
                      poll: &Poll,
                      registration: &Mutex<Option<poll::Registration>>)
                      -> io::Result<()> {
        trace!("deregistering");
        unsafe {
            self.binding.deregister_socket(socket, poll)?;
        }

        // Same situation as in `reregister_socket`: an empty slot is an error,
        // not a reason to panic.
        match *registration.lock().unwrap() {
            Some(ref r) => r.deregister(poll),
            None => Err(io::Error::new(io::ErrorKind::Other, "socket not registered")),
        }
    }
}
/// Wraps the message `s` in an `io::Error` of kind `ErrorKind::Other`.
fn other(s: &str) -> io::Error {
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, s)
}
/// Storage for one round of polling: the raw completion statuses written by
/// the completion port plus the list of `Event`s exposed through `get`.
#[derive(Debug)]
pub struct Events {
/// Raw I/O event completions are filled in here by the call to `get_many`
/// on the completion port above. These are then processed to run callbacks
/// which figure out what to do after the event is done.
statuses: Box<[CompletionStatus]>,
/// Literal events returned by `get` to the upwards `EventLoop`. This file
/// doesn't really modify this (except for the awakener), instead almost all
/// events are filled in by the `ReadinessQueue` from the `poll` module.
events: Vec<Event>,
}
impl Events {
    /// Creates storage with room for `cap` completion statuses and an event
    /// list pre-allocated for `cap` events.
    pub fn with_capacity(cap: usize) -> Events {
        // The event list may grow past `cap`, since deferred events can be
        // pushed into it as well; that is harmless.
        let statuses = vec![CompletionStatus::zero(); cap].into_boxed_slice();
        let events = Vec::with_capacity(cap);
        Events {
            statuses: statuses,
            events: events,
        }
    }

    /// Returns `true` when no event is currently stored.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Number of events currently stored.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Number of events that can be stored before the list reallocates.
    pub fn capacity(&self) -> usize {
        self.events.capacity()
    }

    /// Returns a copy of the event at position `idx`, or `None` when `idx` is
    /// out of range.
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.events.get(idx).cloned()
    }

    /// Appends `event` to the end of the event list.
    pub fn push_event(&mut self, event: Event) {
        self.events.push(event);
    }
}
// Turns a pointer to a field embedded in a `$t` back into a pointer to the
// enclosing `$t` and hands it to `FromRawArc::from_raw`.
//
// Arguments:
//   $e     - pointer (or address) of the embedded field
//   $t     - the enclosing type
//   $field - (possibly dotted) path of the field inside `$t`
//
// The expansion relies on `offset_of!`, `mem` and `FromRawArc` being in scope
// where the macro is invoked, and (via `offset_of!`) dereferences a raw
// pointer, so it has to be expanded inside an `unsafe` context.
macro_rules! overlapped2arc {
($e:expr, $t:ty, $($field:ident).+) => ({
// Byte offset of the field from the start of `$t`.
let offset = offset_of!($t, $($field).+);
// Sanity check (debug builds only): the field must lie inside `$t`.
debug_assert!(offset < mem::size_of::<$t>());
// Step back from the field's address to the start of the enclosing `$t`.
FromRawArc::from_raw(($e as usize - offset) as *mut $t)
})
}
// Evaluates to the byte offset (as `usize`) of the field path `$field` inside
// the type `$t`.
//
// The offset is obtained by taking the address of the field in a `$t`
// notionally located at address 0. Because this dereferences a raw pointer,
// the macro has to be expanded inside an `unsafe` context.
macro_rules! offset_of {
($t:ty, $($field:ident).+) => (
&(*(0 as *const $t)).$($field).+ as *const _ as usize
)
}
// See sys::windows module docs for why this exists.
//
// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
// actually inside one of these structures so it can use the `Callback` stored
// right after it.
//
// We use repr(C) here to ensure that we can assume the overlapped pointer is
// at the start of the structure so we can just do a cast.
/// A wrapper around an internal instance over `miow::Overlapped` which is in
/// turn a wrapper around the Windows type `OVERLAPPED`.
///
/// This type is required to be used for all IOCP operations on handles that are
/// registered with an event loop. The event loop will receive notifications
/// over `OVERLAPPED` pointers that have completed, and it will cast that
/// pointer to a pointer to this structure and invoke the associated callback.
#[repr(C)]
pub struct Overlapped {
/// The wrapped overlapped structure. It has to remain the first field:
/// `Selector::select` casts the completed `OVERLAPPED` pointer directly to
/// `*mut Overlapped`.
inner: UnsafeCell<miow::Overlapped>,
/// Function invoked by `Selector::select` with the completion entry once an
/// operation using this structure completes.
callback: fn(&OVERLAPPED_ENTRY),
}
impl Overlapped {
    /// Builds an `Overlapped` around a zeroed `miow::Overlapped`, remembering
    /// `cb` as the function to run on completion.
    ///
    /// The value returned here has to be the `OVERLAPPED` handed to every I/O
    /// operation registered with mio's event loop; once such an operation
    /// completes, the event loop calls `cb`.
    pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
        let inner = UnsafeCell::new(miow::Overlapped::zero());
        Overlapped {
            inner: inner,
            callback: cb,
        }
    }

    /// Returns the raw `OVERLAPPED` pointer of the wrapped instance.
    ///
    /// Only a shared borrow is needed, which helps when the pointer must be
    /// passed down to winapi from a `&self` context.
    pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
        let wrapped: *mut miow::Overlapped = self.inner.get();
        unsafe { (*wrapped).raw() }
    }
}
impl fmt::Debug for Overlapped {
    /// Writes just the type name; neither field is exposed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut builder = f.debug_struct("Overlapped");
        builder.finish()
    }
}
// Overlapped's APIs are marked as unsafe as they must be used with caution to
// ensure thread safety. The structure itself is safe to send across threads.
unsafe impl Send for Overlapped {}
unsafe impl Sync for Overlapped {}