| //! HTTP Upgrades |
| //! |
| //! See [this example][example] showing how upgrades work with both |
| //! Clients and Servers. |
| //! |
| //! [example]: https://github.com/hyperium/hyper/blob/master/examples/upgrades.rs |
| |
| use std::any::TypeId; |
| use std::error::Error as StdError; |
| use std::fmt; |
| use std::io::{self, Read, Write}; |
| |
| use bytes::{Buf, BufMut, Bytes}; |
| use futures::{Async, Future, Poll}; |
| use futures::sync::oneshot; |
| use tokio_io::{AsyncRead, AsyncWrite}; |
| |
| use common::io::Rewind; |
| |
| /// An upgraded HTTP connection. |
| /// |
| /// This type holds a trait object internally of the original IO that |
| /// was used to speak HTTP before the upgrade. It can be used directly |
| /// as a `Read` or `Write` for convenience. |
| /// |
| /// Alternatively, if the exact type is known, this can be deconstructed |
| /// into its parts. |
| pub struct Upgraded { |
| io: Rewind<Box<Io + Send>>, |
| } |
| |
| /// A future for a possible HTTP upgrade. |
| /// |
| /// If no upgrade was available, or it doesn't succeed, yields an `Error`. |
| pub struct OnUpgrade { |
| rx: Option<oneshot::Receiver<::Result<Upgraded>>>, |
| } |
| |
| /// The deconstructed parts of an [`Upgraded`](Upgraded) type. |
| /// |
| /// Includes the original IO type, and a read buffer of bytes that the |
| /// HTTP state machine may have already read before completing an upgrade. |
| #[derive(Debug)] |
| pub struct Parts<T> { |
| /// The original IO object used before the upgrade. |
| pub io: T, |
| /// A buffer of bytes that have been read but not processed as HTTP. |
| /// |
| /// For instance, if the `Connection` is used for an HTTP upgrade request, |
| /// it is possible the server sent back the first bytes of the new protocol |
| /// along with the response upgrade. |
| /// |
| /// You will want to check for any existing bytes if you plan to continue |
| /// communicating on the IO object. |
| pub read_buf: Bytes, |
| _inner: (), |
| } |
| |
| pub(crate) struct Pending { |
| tx: oneshot::Sender<::Result<Upgraded>> |
| } |
| |
/// Error cause reported when an upgrade was expected but ended up being
/// canceled, for whatever reason.
///
/// Most likely the actual `Conn` future was never polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected(());
| |
| pub(crate) fn pending() -> (Pending, OnUpgrade) { |
| let (tx, rx) = oneshot::channel(); |
| ( |
| Pending { |
| tx, |
| }, |
| OnUpgrade { |
| rx: Some(rx), |
| }, |
| ) |
| } |
| |
| pub(crate) trait Io: AsyncRead + AsyncWrite + 'static { |
| fn __hyper_type_id(&self) -> TypeId { |
| TypeId::of::<Self>() |
| } |
| } |
| |
| impl Io + Send { |
| fn __hyper_is<T: Io>(&self) -> bool { |
| let t = TypeId::of::<T>(); |
| self.__hyper_type_id() == t |
| } |
| |
| fn __hyper_downcast<T: Io>(self: Box<Self>) -> Result<Box<T>, Box<Self>> { |
| if self.__hyper_is::<T>() { |
| // Taken from `std::error::Error::downcast()`. |
| unsafe { |
| let raw: *mut Io = Box::into_raw(self); |
| Ok(Box::from_raw(raw as *mut T)) |
| } |
| } else { |
| Err(self) |
| } |
| } |
| } |
| |
| impl<T: AsyncRead + AsyncWrite + 'static> Io for T {} |
| |
| // ===== impl Upgraded ===== |
| |
| impl Upgraded { |
| pub(crate) fn new(io: Box<Io + Send>, read_buf: Bytes) -> Self { |
| Upgraded { |
| io: Rewind::new_buffered(io, read_buf), |
| } |
| } |
| |
| /// Tries to downcast the internal trait object to the type passed. |
| /// |
| /// On success, returns the downcasted parts. On error, returns the |
| /// `Upgraded` back. |
| pub fn downcast<T: AsyncRead + AsyncWrite + 'static>(self) -> Result<Parts<T>, Self> { |
| let (io, buf) = self.io.into_inner(); |
| match io.__hyper_downcast() { |
| Ok(t) => Ok(Parts { |
| io: *t, |
| read_buf: buf, |
| _inner: (), |
| }), |
| Err(io) => Err(Upgraded { |
| io: Rewind::new_buffered(io, buf), |
| }) |
| } |
| } |
| } |
| |
| impl Read for Upgraded { |
| #[inline] |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| self.io.read(buf) |
| } |
| } |
| |
| impl Write for Upgraded { |
| #[inline] |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| self.io.write(buf) |
| } |
| |
| #[inline] |
| fn flush(&mut self) -> io::Result<()> { |
| self.io.flush() |
| } |
| } |
| |
| impl AsyncRead for Upgraded { |
| #[inline] |
| unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { |
| self.io.prepare_uninitialized_buffer(buf) |
| } |
| |
| #[inline] |
| fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| self.io.read_buf(buf) |
| } |
| } |
| |
| impl AsyncWrite for Upgraded { |
| #[inline] |
| fn shutdown(&mut self) -> Poll<(), io::Error> { |
| AsyncWrite::shutdown(&mut self.io) |
| } |
| |
| #[inline] |
| fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| self.io.write_buf(buf) |
| } |
| } |
| |
| impl fmt::Debug for Upgraded { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("Upgraded") |
| .finish() |
| } |
| } |
| |
| // ===== impl OnUpgrade ===== |
| |
| impl OnUpgrade { |
| pub(crate) fn none() -> Self { |
| OnUpgrade { |
| rx: None, |
| } |
| } |
| |
| pub(crate) fn is_none(&self) -> bool { |
| self.rx.is_none() |
| } |
| } |
| |
| impl Future for OnUpgrade { |
| type Item = Upgraded; |
| type Error = ::Error; |
| |
| fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |
| match self.rx { |
| Some(ref mut rx) => match rx.poll() { |
| Ok(Async::NotReady) => Ok(Async::NotReady), |
| Ok(Async::Ready(Ok(upgraded))) => Ok(Async::Ready(upgraded)), |
| Ok(Async::Ready(Err(err))) => Err(err), |
| Err(_oneshot_canceled) => Err( |
| ::Error::new_canceled(Some(UpgradeExpected(()))) |
| ), |
| }, |
| None => Err(::Error::new_user_no_upgrade()), |
| } |
| } |
| } |
| |
| impl fmt::Debug for OnUpgrade { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.debug_struct("OnUpgrade") |
| .finish() |
| } |
| } |
| |
| // ===== impl Pending ===== |
| |
| impl Pending { |
| pub(crate) fn fulfill(self, upgraded: Upgraded) { |
| trace!("pending upgrade fulfill"); |
| let _ = self.tx.send(Ok(upgraded)); |
| } |
| |
| /// Don't fulfill the pending Upgrade, but instead signal that |
| /// upgrades are handled manually. |
| pub(crate) fn manual(self) { |
| trace!("pending upgrade handled manually"); |
| let _ = self.tx.send(Err(::Error::new_user_manual_upgrade())); |
| } |
| } |
| |
| // ===== impl UpgradeExpected ===== |
| |
| impl fmt::Display for UpgradeExpected { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| f.write_str(self.description()) |
| } |
| } |
| |
| impl StdError for UpgradeExpected { |
| fn description(&self) -> &str { |
| "upgrade expected but not completed" |
| } |
| } |
| |