blob: 87e3e00764ceeba74191d0e34facf45decdab142 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
extern crate futures;
use futures::prelude::*;
use futures::task::{self, AtomicWaker};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
// This file provides support for cancelable futures (thanks cramertj@!). A PR
// has been sent to futures-rs. Remove this once that lands and makes it into
// Fuchsia (see https://github.com/alexcrichton/futures-rs/issues/693).
/// A future which can be cancelled using a `CancelHandle`.
#[derive(Debug, Clone)]
pub struct Cancelable<T> {
future: T,
inner: Arc<CancelInner>,
}
impl<T> Cancelable<T>
where
T: Future<Item = ()>,
{
pub fn new(future: T, handle: &CancelHandle) -> Self {
Cancelable {
future,
inner: handle.inner.clone(),
}
}
}
/// A handle to a `Cancelable` future.
#[derive(Debug, Clone)]
pub struct CancelHandle {
inner: Arc<CancelInner>,
}
impl CancelHandle {
pub fn new() -> Self {
CancelHandle {
inner: Arc::new(CancelInner {
task: AtomicWaker::new(),
cancel: AtomicBool::new(false),
}),
}
}
}
// Inner type storing the task to awaken and a bool indicating that it
// should be cancelled.
#[derive(Debug)]
struct CancelInner {
task: AtomicWaker,
cancel: AtomicBool,
}
impl<T> Future for Cancelable<T>
where
T: Future<Item = ()>,
{
type Item = ();
type Error = T::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
if self.inner.cancel.load(Ordering::Acquire) {
Ok(Async::Ready(()))
} else {
match self.future.poll(cx) {
Ok(Async::Pending) => {
self.inner.task.register(cx.waker());
Ok(Async::Pending)
}
res => res,
}
}
}
}
impl CancelHandle {
/// Cancel the `Cancelable` future associated with this handle.
pub fn cancel(&self) {
self.inner.cancel.store(true, Ordering::Release);
self.inner.task.wake();
}
}