blob: 9fb615bc3dcea76777e83c7eb0c238c85051241d [file] [log] [blame]
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::{Future, Async, Poll};
use futures::sync::oneshot;
#[cfg(feature = "runtime")]
use tokio_timer::Interval;
use common::Exec;
use super::Ver;
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Pool<T> {
// If the pool is disabled, this is None.
inner: Option<Arc<Mutex<PoolInner<T>>>>,
}
// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub(super) trait Poolable: Send + Sized + 'static {
fn is_open(&self) -> bool;
/// Reserve this connection.
///
/// Allows for HTTP/2 to return a shared reservation.
fn reserve(self) -> Reservation<Self>;
fn can_share(&self) -> bool;
}
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) enum Reservation<T> {
/// This connection could be used multiple times, the first one will be
/// reinserted into the `idle` pool, and the second will be given to
/// the `Checkout`.
Shared(T, T),
/// This connection requires unique access. It will be returned after
/// use is complete.
Unique(T),
}
/// Simple type alias in case the key type needs to be adjusted.
pub(super) type Key = Arc<String>;
struct PoolInner<T> {
// A flag that a connection is being estabilished, and the connection
// should be shared. This prevents making multiple HTTP/2 connections
// to the same host.
connecting: HashSet<Key>,
// These are internal Conns sitting in the event loop in the KeepAlive
// state, waiting to receive a new Request to send on the socket.
idle: HashMap<Key, Vec<Idle<T>>>,
max_idle_per_host: usize,
// These are outstanding Checkouts that are waiting for a socket to be
// able to send a Request one. This is used when "racing" for a new
// connection.
//
// The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
// for the Pool to receive an idle Conn. When a Conn becomes idle,
// this list is checked for any parked Checkouts, and tries to notify
// them that the Conn could be used instead of waiting for a brand new
// connection.
waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
// A oneshot channel is used to allow the interval to be notified when
// the Pool completely drops. That way, the interval can cancel immediately.
#[cfg(feature = "runtime")]
idle_interval_ref: Option<oneshot::Sender<::common::Never>>,
#[cfg(feature = "runtime")]
exec: Exec,
timeout: Option<Duration>,
}
// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);
// Newtypes to act as keyword arguments for `Pool::new`...
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Enabled(pub(super) bool);
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct IdleTimeout(pub(super) Option<Duration>);
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct MaxIdlePerHost(pub(super) usize);
impl<T> Pool<T> {
pub fn new(enabled: Enabled, timeout: IdleTimeout, max_idle: MaxIdlePerHost, __exec: &Exec) -> Pool<T> {
let inner = if enabled.0 {
Some(Arc::new(Mutex::new(PoolInner {
connecting: HashSet::new(),
idle: HashMap::new(),
#[cfg(feature = "runtime")]
idle_interval_ref: None,
max_idle_per_host: max_idle.0,
waiters: HashMap::new(),
#[cfg(feature = "runtime")]
exec: __exec.clone(),
timeout: timeout.0,
})))
} else {
None
};
Pool {
inner,
}
}
fn is_enabled(&self) -> bool {
self.inner.is_some()
}
#[cfg(test)]
pub(super) fn no_timer(&self) {
// Prevent an actual interval from being created for this pool...
#[cfg(feature = "runtime")]
{
let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
let (tx, _) = oneshot::channel();
inner.idle_interval_ref = Some(tx);
}
}
}
impl<T: Poolable> Pool<T> {
/// Returns a `Checkout` which is a future that resolves if an idle
/// connection becomes available.
pub fn checkout(&self, key: Key) -> Checkout<T> {
Checkout {
key,
pool: self.clone(),
waiter: None,
}
}
/// Ensure that there is only ever 1 connecting task for HTTP/2
/// connections. This does nothing for HTTP/1.
pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
if ver == Ver::Http2 {
if let Some(ref enabled) = self.inner {
let mut inner = enabled.lock().unwrap();
return if inner.connecting.insert(key.clone()) {
let connecting = Connecting {
key: key.clone(),
pool: WeakOpt::downgrade(enabled),
};
Some(connecting)
} else {
trace!("HTTP/2 connecting already in progress for {:?}", key);
None
};
}
}
// else
Some(Connecting {
key: key.clone(),
// in HTTP/1's case, there is never a lock, so we don't
// need to do anything in Drop.
pool: WeakOpt::none(),
})
}
#[cfg(test)]
fn locked(&self) -> ::std::sync::MutexGuard<PoolInner<T>> {
self
.inner
.as_ref()
.expect("enabled")
.lock()
.expect("lock")
}
#[cfg(feature = "runtime")]
#[cfg(test)]
pub(super) fn h1_key(&self, s: &str) -> Key {
Arc::new(s.to_string())
}
#[cfg(feature = "runtime")]
#[cfg(test)]
pub(super) fn idle_count(&self, key: &Key) -> usize {
self
.locked()
.idle
.get(key)
.map(|list| list.len())
.unwrap_or(0)
}
fn take(&self, key: &Key) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.as_ref()?.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let maybe_entry = inner.idle.get_mut(key)
.and_then(|list| {
trace!("take? {:?}: expiration = {:?}", key, expiration.0);
// A block to end the mutable borrow on list,
// so the map below can check is_empty()
{
let popper = IdlePopper {
key,
list,
};
popper.pop(&expiration)
}
.map(|e| (e, list.is_empty()))
});
let (entry, empty) = if let Some((e, empty)) = maybe_entry {
(Some(e), empty)
} else {
// No entry found means nuke the list for sure.
(None, true)
};
if empty {
//TODO: This could be done with the HashMap::entry API instead.
inner.idle.remove(key);
}
entry
};
entry.map(|e| self.reuse(key, e.value))
}
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
let (value, pool_ref) = if let Some(ref enabled) = self.inner {
match value.reserve() {
Reservation::Shared(to_insert, to_return) => {
let mut inner = enabled.lock().unwrap();
inner.put(connecting.key.clone(), to_insert, enabled);
// Do this here instead of Drop for Connecting because we
// already have a lock, no need to lock the mutex twice.
inner.connected(&connecting.key);
// prevent the Drop of Connecting from repeating inner.connected()
connecting.pool = WeakOpt::none();
// Shared reservations don't need a reference to the pool,
// since the pool always keeps a copy.
(to_return, WeakOpt::none())
},
Reservation::Unique(value) => {
// Unique reservations must take a reference to the pool
// since they hope to reinsert once the reservation is
// completed
(value, WeakOpt::downgrade(enabled))
},
}
} else {
// If pool is not enabled, skip all the things...
// The Connecting should have had no pool ref
debug_assert!(connecting.pool.upgrade().is_none());
(value, WeakOpt::none())
};
Pooled {
key: connecting.key.clone(),
is_reused: false,
pool: pool_ref,
value: Some(value)
}
}
fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
debug!("reuse idle connection for {:?}", key);
// TODO: unhack this
// In Pool::pooled(), which is used for inserting brand new connections,
// there's some code that adjusts the pool reference taken depending
// on if the Reservation can be shared or is unique. By the time
// reuse() is called, the reservation has already been made, and
// we just have the final value, without knowledge of if this is
// unique or shared. So, the hack is to just assume Ver::Http2 means
// shared... :(
let mut pool_ref = WeakOpt::none();
if !value.can_share() {
if let Some(ref enabled) = self.inner {
pool_ref = WeakOpt::downgrade(enabled);
}
}
Pooled {
is_reused: true,
key: key.clone(),
pool: pool_ref,
value: Some(value),
}
}
fn waiter(&self, key: Key, tx: oneshot::Sender<T>) {
debug_assert!(
self.is_enabled(),
"pool.waiter() should not be called if disabled",
);
trace!("checkout waiting for idle connection: {:?}", key);
self.inner
.as_ref()
.expect("pool.waiter() expects pool is enabled")
.lock()
.unwrap()
.waiters
.entry(key)
.or_insert(VecDeque::new())
.push_back(tx);
}
}
/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T: 'a> {
key: &'a Key,
list: &'a mut Vec<Idle<T>>,
}
impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
while let Some(entry) = self.list.pop() {
// If the connection has been closed, or is older than our idle
// timeout, simply drop it and keep looking...
if !entry.value.is_open() {
trace!("removing closed connection for {:?}", self.key);
continue;
}
// TODO: Actually, since the `idle` list is pushed to the end always,
// that would imply that if *this* entry is expired, then anything
// "earlier" in the list would *have* to be expired also... Right?
//
// In that case, we could just break out of the loop and drop the
// whole list...
if expiration.expires(entry.idle_at) {
trace!("removing expired connection for {:?}", self.key);
continue;
}
let value = match entry.value.reserve() {
Reservation::Shared(to_reinsert, to_checkout) => {
self.list.push(Idle {
idle_at: Instant::now(),
value: to_reinsert,
});
to_checkout
},
Reservation::Unique(unique) => {
unique
}
};
return Some(Idle {
idle_at: entry.idle_at,
value,
});
}
None
}
}
impl<T: Poolable> PoolInner<T> {
fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
if value.can_share() && self.idle.contains_key(&key) {
trace!("put; existing idle HTTP/2 connection for {:?}", key);
return;
}
trace!("put; add idle connection for {:?}", key);
let mut remove_waiters = false;
let mut value = Some(value);
if let Some(waiters) = self.waiters.get_mut(&key) {
while let Some(tx) = waiters.pop_front() {
if !tx.is_canceled() {
let reserved = value.take().expect("value already sent");
let reserved = match reserved.reserve() {
Reservation::Shared(to_keep, to_send) => {
value = Some(to_keep);
to_send
},
Reservation::Unique(uniq) => uniq,
};
match tx.send(reserved) {
Ok(()) => {
if value.is_none() {
break;
} else {
continue;
}
},
Err(e) => {
value = Some(e);
}
}
}
trace!("put; removing canceled waiter for {:?}", key);
}
remove_waiters = waiters.is_empty();
}
if remove_waiters {
self.waiters.remove(&key);
}
match value {
Some(value) => {
// borrow-check scope...
{
let idle_list = self
.idle
.entry(key.clone())
.or_insert(Vec::new());
if self.max_idle_per_host <= idle_list.len() {
trace!("max idle per host for {:?}, dropping connection", key);
return;
}
debug!("pooling idle connection for {:?}", key);
idle_list.push(Idle {
value: value,
idle_at: Instant::now(),
});
}
#[cfg(feature = "runtime")]
{
self.spawn_idle_interval(__pool_ref);
}
}
None => trace!("put; found waiter for {:?}", key),
}
}
/// A `Connecting` task is complete. Not necessarily successfully,
/// but the lock is going away, so clean up.
fn connected(&mut self, key: &Key) {
let existed = self.connecting.remove(key);
debug_assert!(
existed,
"Connecting dropped, key not in pool.connecting"
);
// cancel any waiters. if there are any, it's because
// this Connecting task didn't complete successfully.
// those waiters would never receive a connection.
self.waiters.remove(key);
}
#[cfg(feature = "runtime")]
fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
let (dur, rx) = {
if self.idle_interval_ref.is_some() {
return;
}
if let Some(dur) = self.timeout {
let (tx, rx) = oneshot::channel();
self.idle_interval_ref = Some(tx);
(dur, rx)
} else {
return
}
};
let start = Instant::now() + dur;
let interval = IdleInterval {
interval: Interval::new(start, dur),
pool: WeakOpt::downgrade(pool_ref),
pool_drop_notifier: rx,
};
if let Err(err) = self.exec.execute(interval) {
// This task isn't critical, so simply log and ignore.
warn!("error spawning connection pool idle interval: {}", err);
}
}
}
impl<T> PoolInner<T> {
/// Any `FutureResponse`s that were created will have made a `Checkout`,
/// and possibly inserted into the pool that it is waiting for an idle
/// connection. If a user ever dropped that future, we need to clean out
/// those parked senders.
fn clean_waiters(&mut self, key: &Key) {
let mut remove_waiters = false;
if let Some(waiters) = self.waiters.get_mut(key) {
waiters.retain(|tx| {
!tx.is_canceled()
});
remove_waiters = waiters.is_empty();
}
if remove_waiters {
self.waiters.remove(key);
}
}
}
#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
/// This should *only* be called by the IdleInterval.
fn clear_expired(&mut self) {
let dur = self.timeout.expect("interval assumes timeout");
let now = Instant::now();
//self.last_idle_check_at = now;
self.idle.retain(|key, values| {
values.retain(|entry| {
if !entry.value.is_open() {
trace!("idle interval evicting closed for {:?}", key);
return false;
}
if now - entry.idle_at > dur {
trace!("idle interval evicting expired for {:?}", key);
return false;
}
// Otherwise, keep this value...
true
});
// returning false evicts this key/val
!values.is_empty()
});
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Pool<T> {
Pool {
inner: self.inner.clone(),
}
}
}
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
value: Option<T>,
is_reused: bool,
key: Key,
pool: WeakOpt<Mutex<PoolInner<T>>>,
}
impl<T: Poolable> Pooled<T> {
pub fn is_reused(&self) -> bool {
self.is_reused
}
pub fn is_pool_enabled(&self) -> bool {
self.pool.0.is_some()
}
fn as_ref(&self) -> &T {
self.value.as_ref().expect("not dropped")
}
fn as_mut(&mut self) -> &mut T {
self.value.as_mut().expect("not dropped")
}
}
impl<T: Poolable> Deref for Pooled<T> {
type Target = T;
fn deref(&self) -> &T {
self.as_ref()
}
}
impl<T: Poolable> DerefMut for Pooled<T> {
fn deref_mut(&mut self) -> &mut T {
self.as_mut()
}
}
impl<T: Poolable> Drop for Pooled<T> {
fn drop(&mut self) {
if let Some(value) = self.value.take() {
if !value.is_open() {
// If we *already* know the connection is done here,
// it shouldn't be re-inserted back into the pool.
return;
}
if let Some(pool) = self.pool.upgrade() {
if let Ok(mut inner) = pool.lock() {
inner.put(self.key.clone(), value, &pool);
}
} else if !value.can_share() {
trace!("pool dropped, dropping pooled ({:?})", self.key);
}
// Ver::Http2 is already in the Pool (or dead), so we wouldn't
// have an actual reference to the Pool.
}
}
}
impl<T: Poolable> fmt::Debug for Pooled<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Pooled")
.field("key", &self.key)
.finish()
}
}
struct Idle<T> {
idle_at: Instant,
value: T,
}
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Checkout<T> {
key: Key,
pool: Pool<T>,
waiter: Option<oneshot::Receiver<T>>,
}
impl<T: Poolable> Checkout<T> {
fn poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error> {
static CANCELED: &str = "pool checkout failed";
if let Some(mut rx) = self.waiter.take() {
match rx.poll() {
Ok(Async::Ready(value)) => {
if value.is_open() {
Ok(Async::Ready(Some(self.pool.reuse(&self.key, value))))
} else {
Err(::Error::new_canceled(Some(CANCELED)))
}
},
Ok(Async::NotReady) => {
self.waiter = Some(rx);
Ok(Async::NotReady)
},
Err(_canceled) => Err(::Error::new_canceled(Some(CANCELED))),
}
} else {
Ok(Async::Ready(None))
}
}
fn add_waiter(&mut self) {
debug_assert!(self.pool.is_enabled());
if self.waiter.is_none() {
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task
self.pool.waiter(self.key.clone(), tx);
self.waiter = Some(rx);
}
}
}
impl<T: Poolable> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(pooled) = try_ready!(self.poll_waiter()) {
return Ok(Async::Ready(pooled));
}
let entry = self.pool.take(&self.key);
if let Some(pooled) = entry {
Ok(Async::Ready(pooled))
} else if !self.pool.is_enabled() {
Err(::Error::new_canceled(Some("pool is disabled")))
} else {
self.add_waiter();
Ok(Async::NotReady)
}
}
}
impl<T> Drop for Checkout<T> {
fn drop(&mut self) {
if self.waiter.take().is_some() {
if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
inner.clean_waiters(&self.key);
}
}
}
}
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> {
key: Key,
pool: WeakOpt<Mutex<PoolInner<T>>>,
}
impl<T: Poolable> Connecting<T> {
pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
debug_assert!(
self.pool.0.is_none(),
"Connecting::alpn_h2 but already Http2"
);
pool.connecting(&self.key, Ver::Http2)
}
}
impl<T: Poolable> Drop for Connecting<T> {
fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() {
// No need to panic on drop, that could abort!
if let Ok(mut inner) = pool.lock() {
inner.connected(&self.key);
}
}
}
}
struct Expiration(Option<Duration>);
impl Expiration {
fn new(dur: Option<Duration>) -> Expiration {
Expiration(dur)
}
fn expires(&self, instant: Instant) -> bool {
match self.0 {
Some(timeout) => instant.elapsed() > timeout,
None => false,
}
}
}
#[cfg(feature = "runtime")]
struct IdleInterval<T> {
interval: Interval,
pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleInterval to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
pool_drop_notifier: oneshot::Receiver<::common::Never>,
}
#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleInterval<T> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Interval is a Stream
use futures::Stream;
loop {
match self.pool_drop_notifier.poll() {
Ok(Async::Ready(n)) => match n {},
Ok(Async::NotReady) => (),
Err(_canceled) => {
trace!("pool closed, canceling idle interval");
return Ok(Async::Ready(()));
}
}
try_ready!(self.interval.poll().map_err(|err| {
error!("idle interval timer error: {}", err);
}));
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
trace!("idle interval checking for expired");
inner.clear_expired();
continue;
}
}
return Ok(Async::Ready(()));
}
}
}
impl<T> WeakOpt<T> {
fn none() -> Self {
WeakOpt(None)
}
fn downgrade(arc: &Arc<T>) -> Self {
WeakOpt(Some(Arc::downgrade(arc)))
}
fn upgrade(&self) -> Option<Arc<T>> {
self.0
.as_ref()
.and_then(Weak::upgrade)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use common::Exec;
use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt};
/// Test unique reservations.
#[derive(Debug, PartialEq, Eq)]
struct Uniq<T>(T);
impl<T: Send + 'static> Poolable for Uniq<T> {
fn is_open(&self) -> bool {
true
}
fn reserve(self) -> Reservation<Self> {
Reservation::Unique(self)
}
fn can_share(&self) -> bool {
false
}
}
fn c<T: Poolable>(key: Key) -> Connecting<T> {
Connecting {
key,
pool: WeakOpt::none(),
}
}
fn pool_no_timer<T>() -> Pool<T> {
pool_max_idle_no_timer(::std::usize::MAX)
}
fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
let pool = Pool::new(
super::Enabled(true),
super::IdleTimeout(Some(Duration::from_millis(100))),
super::MaxIdlePerHost(max_idle),
&Exec::Default,
);
pool.no_timer();
pool
}
#[test]
fn test_pool_checkout_smoke() {
let pool = pool_no_timer();
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled);
match pool.checkout(key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)),
_ => panic!("not ready"),
}
}
#[test]
fn test_pool_checkout_returns_none_if_expired() {
future::lazy(|| {
let pool = pool_no_timer();
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled);
::std::thread::sleep(pool.locked().timeout.unwrap());
assert!(pool.checkout(key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn test_pool_checkout_removes_expired() {
future::lazy(|| {
let pool = pool_no_timer();
let key = Arc::new("foo".to_string());
pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.locked().timeout.unwrap());
// checkout.poll() should clean out the expired
pool.checkout(key.clone()).poll().unwrap();
assert!(pool.locked().idle.get(&key).is_none());
Ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn test_pool_max_idle_per_host() {
future::lazy(|| {
let pool = pool_max_idle_no_timer(2);
let key = Arc::new("foo".to_string());
pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
// pooled and dropped 3, max_idle should only allow 2
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2));
Ok::<(), ()>(())
}).wait().unwrap();
}
#[cfg(feature = "runtime")]
#[test]
fn test_pool_timer_removes_expired() {
use std::time::Instant;
use tokio_timer::Delay;
let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap();
let pool = Pool::new(
super::Enabled(true),
super::IdleTimeout(Some(Duration::from_millis(100))),
super::MaxIdlePerHost(::std::usize::MAX),
&Exec::Default,
);
let key = Arc::new("foo".to_string());
// Since pool.pooled() will be calling spawn on drop, need to be sure
// those drops are called while `rt` is the current executor. To do so,
// call those inside a future.
rt.block_on(::futures::future::lazy(|| {
pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
Ok::<_, ()>(())
})).unwrap();
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
// Let the timer tick passed the expiration...
rt
.block_on(Delay::new(Instant::now() + Duration::from_millis(200)))
.expect("rt block_on 200ms");
assert!(pool.locked().idle.get(&key).is_none());
}
#[test]
fn test_pool_checkout_task_unparked() {
let pool = pool_no_timer();
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41));
let checkout = pool.checkout(key).join(future::lazy(move || {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
//
// this test makes sure that doing so will unpark the checkout
drop(pooled);
Ok(())
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), Uniq(41));
}
#[test]
fn test_pool_checkout_drop_cleans_up_waiters() {
future::lazy(|| {
let pool = pool_no_timer::<Uniq<i32>>();
let key = Arc::new("localhost:12345".to_string());
let mut checkout1 = pool.checkout(key.clone());
let mut checkout2 = pool.checkout(key.clone());
// first poll needed to get into Pool's parked
checkout1.poll().unwrap();
assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap();
assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
// on drop, clean up Pool
drop(checkout1);
assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
drop(checkout2);
assert!(pool.locked().waiters.get(&key).is_none());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
#[derive(Debug)]
struct CanClose {
val: i32,
closed: bool,
}
impl Poolable for CanClose {
fn is_open(&self) -> bool {
!self.closed
}
fn reserve(self) -> Reservation<Self> {
Reservation::Unique(self)
}
fn can_share(&self) -> bool {
false
}
}
#[test]
fn pooled_drop_if_closed_doesnt_reinsert() {
let pool = pool_no_timer();
let key = Arc::new("localhost:12345".to_string());
pool.pooled(c(key.clone()), CanClose {
val: 57,
closed: true,
});
assert!(!pool.locked().idle.contains_key(&key));
}
}