| use std::fmt; |
| use std::io; |
| use std::hash; |
| use std::mem; |
| use std::cmp; |
| use std::ops::{Deref, DerefMut}; |
| use std::sync::Arc; |
| |
| use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; |
| |
| use io::Io; |
| |
| const INITIAL_CAPACITY: usize = 8 * 1024; |
| |
| /// A reference counted buffer of bytes. |
| /// |
| /// An `EasyBuf` is a representation of a byte buffer where sub-slices of it can |
| /// be handed out efficiently, each with a `'static` lifetime which keeps the |
| /// data alive. The buffer also supports mutation but may require bytes to be |
| /// copied to complete the operation. |
| #[derive(Clone, Eq)] |
| pub struct EasyBuf { |
| buf: Arc<Vec<u8>>, |
| start: usize, |
| end: usize, |
| } |
| |
| /// An RAII object returned from `get_mut` which provides mutable access to the |
| /// underlying `Vec<u8>`. |
| pub struct EasyBufMut<'a> { |
| buf: &'a mut Vec<u8>, |
| end: &'a mut usize, |
| } |
| |
| impl EasyBuf { |
| /// Creates a new EasyBuf with no data and the default capacity. |
| pub fn new() -> EasyBuf { |
| EasyBuf::with_capacity(INITIAL_CAPACITY) |
| } |
| |
| /// Creates a new EasyBuf with `cap` capacity. |
| pub fn with_capacity(cap: usize) -> EasyBuf { |
| EasyBuf { |
| buf: Arc::new(Vec::with_capacity(cap)), |
| start: 0, |
| end: 0, |
| } |
| } |
| |
| /// Changes the starting index of this window to the index specified. |
| /// |
| /// Returns the windows back to chain multiple calls to this method. |
| /// |
| /// # Panics |
| /// |
| /// This method will panic if `start` is out of bounds for the underlying |
| /// slice or if it comes after the `end` configured in this window. |
| fn set_start(&mut self, start: usize) -> &mut EasyBuf { |
| assert!(start <= self.buf.as_ref().len()); |
| assert!(start <= self.end); |
| self.start = start; |
| self |
| } |
| |
| /// Changes the end index of this window to the index specified. |
| /// |
| /// Returns the windows back to chain multiple calls to this method. |
| /// |
| /// # Panics |
| /// |
| /// This method will panic if `end` is out of bounds for the underlying |
| /// slice or if it comes after the `end` configured in this window. |
| fn set_end(&mut self, end: usize) -> &mut EasyBuf { |
| assert!(end <= self.buf.len()); |
| assert!(self.start <= end); |
| self.end = end; |
| self |
| } |
| |
| /// Returns the number of bytes contained in this `EasyBuf`. |
| pub fn len(&self) -> usize { |
| self.end - self.start |
| } |
| |
| /// Returns the inner contents of this `EasyBuf` as a slice. |
| pub fn as_slice(&self) -> &[u8] { |
| self.as_ref() |
| } |
| |
| /// Splits the buffer into two at the given index. |
| /// |
| /// Afterwards `self` contains elements `[0, at)`, and the returned `EasyBuf` |
| /// contains elements `[at, len)`. |
| /// |
| /// This is an O(1) operation that just increases the reference count and |
| /// sets a few indexes. |
| /// |
| /// # Panics |
| /// |
| /// Panics if `at > len` |
| pub fn split_off(&mut self, at: usize) -> EasyBuf { |
| let mut other = EasyBuf { buf: self.buf.clone(), ..*self }; |
| let idx = self.start + at; |
| other.set_start(idx); |
| self.set_end(idx); |
| return other |
| } |
| |
| /// Splits the buffer into two at the given index. |
| /// |
| /// Afterwards `self` contains elements `[at, len)`, and the returned `EasyBuf` |
| /// contains elements `[0, at)`. |
| /// |
| /// This is an O(1) operation that just increases the reference count and |
| /// sets a few indexes. |
| /// |
| /// # Panics |
| /// |
| /// Panics if `at > len` |
| pub fn drain_to(&mut self, at: usize) -> EasyBuf { |
| let mut other = EasyBuf { buf: self.buf.clone(), ..*self }; |
| let idx = self.start + at; |
| other.set_end(idx); |
| self.set_start(idx); |
| return other |
| } |
| |
| /// Returns a mutable reference to the underlying growable buffer of bytes. |
| /// |
| /// If this `EasyBuf` is the only instance pointing at the underlying buffer |
| /// of bytes, a direct mutable reference will be returned. Otherwise the |
| /// contents of this `EasyBuf` will be reallocated in a fresh `Vec<u8>` |
| /// allocation with the same capacity as an `EasyBuf` created with `EasyBuf::new()`, |
| /// and that allocation will be returned. |
| /// |
| /// This operation **is not O(1)** as it may clone the entire contents of |
| /// this buffer. |
| /// |
| /// The returned `EasyBufMut` type implement `Deref` and `DerefMut` to |
| /// `Vec<u8>` can the byte buffer can be manipulated using the standard |
| /// `Vec<u8>` methods. |
| pub fn get_mut(&mut self) -> EasyBufMut { |
| // Fast path if we can get mutable access to our own current |
| // buffer. |
| // |
| // TODO: this should be a match or an if-let |
| if Arc::get_mut(&mut self.buf).is_some() { |
| let buf = Arc::get_mut(&mut self.buf).unwrap(); |
| buf.drain(self.end..); |
| buf.drain(..self.start); |
| self.start = 0; |
| return EasyBufMut { buf: buf, end: &mut self.end } |
| } |
| |
| // If we couldn't get access above then we give ourself a new buffer |
| // here. |
| let mut v = Vec::with_capacity(cmp::max(INITIAL_CAPACITY, self.as_ref().len())); |
| v.extend_from_slice(self.as_ref()); |
| self.start = 0; |
| self.buf = Arc::new(v); |
| EasyBufMut { |
| buf: Arc::get_mut(&mut self.buf).unwrap(), |
| end: &mut self.end, |
| } |
| } |
| } |
| |
| impl AsRef<[u8]> for EasyBuf { |
| fn as_ref(&self) -> &[u8] { |
| &self.buf[self.start..self.end] |
| } |
| } |
| |
| impl<'a> Deref for EasyBufMut<'a> { |
| type Target = Vec<u8>; |
| |
| fn deref(&self) -> &Vec<u8> { |
| self.buf |
| } |
| } |
| |
| impl<'a> DerefMut for EasyBufMut<'a> { |
| fn deref_mut(&mut self) -> &mut Vec<u8> { |
| self.buf |
| } |
| } |
| |
| impl From<Vec<u8>> for EasyBuf { |
| fn from(vec: Vec<u8>) -> EasyBuf { |
| let end = vec.len(); |
| EasyBuf { |
| buf: Arc::new(vec), |
| start: 0, |
| end: end, |
| } |
| } |
| } |
| |
| impl<T: AsRef<[u8]>> PartialEq<T> for EasyBuf { |
| fn eq(&self, other: &T) -> bool { |
| self.as_slice().eq(other.as_ref()) |
| } |
| } |
| |
| impl Ord for EasyBuf { |
| fn cmp(&self, other: &Self) -> cmp::Ordering { |
| self.as_slice().cmp(other.as_slice()) |
| } |
| } |
| |
| impl<T: AsRef<[u8]>> PartialOrd<T> for EasyBuf { |
| fn partial_cmp(&self, other: &T) -> Option<cmp::Ordering> { |
| self.as_slice().partial_cmp(other.as_ref()) |
| } |
| } |
| |
| impl hash::Hash for EasyBuf { |
| fn hash<H: hash::Hasher>(&self, state: &mut H) { |
| self.as_slice().hash(state) |
| } |
| } |
| |
| impl<'a> Drop for EasyBufMut<'a> { |
| fn drop(&mut self) { |
| *self.end = self.buf.len(); |
| } |
| } |
| |
| impl fmt::Debug for EasyBuf { |
| fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { |
| let bytes = self.as_ref(); |
| let len = self.len(); |
| if len < 10 { |
| write!(formatter, "EasyBuf{{len={}/{} {:?}}}", self.len(), self.buf.len(), bytes) |
| } else { // choose a more compact representation |
| write!(formatter, "EasyBuf{{len={}/{} [{}, {}, {}, {}, ..., {}, {}, {}, {}]}}", self.len(), self.buf.len(), bytes[0], bytes[1], bytes[2], bytes[3], bytes[len-4], bytes[len-3], bytes[len-2], bytes[len-1]) |
| } |
| } |
| } |
| |
| impl Into<Vec<u8>> for EasyBuf { |
| fn into(mut self) -> Vec<u8> { |
| mem::replace(self.get_mut().buf, vec![]) |
| } |
| } |
| |
| /// Encoding and decoding of frames via buffers. |
| /// |
| /// This trait is used when constructing an instance of `Framed`. It provides |
| /// two types: `In`, for decoded input frames, and `Out`, for outgoing frames |
| /// that need to be encoded. It also provides methods to actually perform the |
| /// encoding and decoding, which work with corresponding buffer types. |
| /// |
| /// The trait itself is implemented on a type that can track state for decoding |
| /// or encoding, which is particularly useful for streaming parsers. In many |
| /// cases, though, this type will simply be a unit struct (e.g. `struct |
| /// HttpCodec`). |
| pub trait Codec { |
| /// The type of decoded frames. |
| type In; |
| |
| /// The type of frames to be encoded. |
| type Out; |
| |
| /// Attempts to decode a frame from the provided buffer of bytes. |
| /// |
| /// This method is called by `Framed` whenever bytes are ready to be parsed. |
| /// The provided buffer of bytes is what's been read so far, and this |
| /// instance of `Decode` can determine whether an entire frame is in the |
| /// buffer and is ready to be returned. |
| /// |
| /// If an entire frame is available, then this instance will remove those |
| /// bytes from the buffer provided and return them as a decoded |
| /// frame. Note that removing bytes from the provided buffer doesn't always |
| /// necessarily copy the bytes, so this should be an efficient operation in |
| /// most circumstances. |
| /// |
| /// If the bytes look valid, but a frame isn't fully available yet, then |
| /// `Ok(None)` is returned. This indicates to the `Framed` instance that |
| /// it needs to read some more bytes before calling this method again. |
| /// |
| /// Finally, if the bytes in the buffer are malformed then an error is |
| /// returned indicating why. This informs `Framed` that the stream is now |
| /// corrupt and should be terminated. |
| fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>>; |
| |
| /// A default method available to be called when there are no more bytes |
| /// available to be read from the underlying I/O. |
| /// |
| /// This method defaults to calling `decode` and returns an error if |
| /// `Ok(None)` is returned. Typically this doesn't need to be implemented |
| /// unless the framing protocol differs near the end of the stream. |
| fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> { |
| match try!(self.decode(buf)) { |
| Some(frame) => Ok(frame), |
| None => Err(io::Error::new(io::ErrorKind::Other, |
| "bytes remaining on stream")), |
| } |
| } |
| |
| /// Encodes a frame into the buffer provided. |
| /// |
| /// This method will encode `msg` into the byte buffer provided by `buf`. |
| /// The `buf` provided is an internal buffer of the `Framed` instance and |
| /// will be written out when possible. |
| fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>; |
| } |
| |
| /// A unified `Stream` and `Sink` interface to an underlying `Io` object, using |
| /// the `Codec` trait to encode and decode frames. |
| /// |
| /// You can acquire a `Framed` instance by using the `Io::framed` adapter. |
| pub struct Framed<T, C> { |
| upstream: T, |
| codec: C, |
| eof: bool, |
| is_readable: bool, |
| rd: EasyBuf, |
| wr: Vec<u8>, |
| } |
| |
| impl<T: Io, C: Codec> Stream for Framed<T, C> { |
| type Item = C::In; |
| type Error = io::Error; |
| |
| fn poll(&mut self) -> Poll<Option<C::In>, io::Error> { |
| loop { |
| // If the read buffer has any pending data, then it could be |
| // possible that `decode` will return a new frame. We leave it to |
| // the decoder to optimize detecting that more data is required. |
| if self.is_readable { |
| if self.eof { |
| if self.rd.len() == 0 { |
| return Ok(None.into()) |
| } else { |
| let frame = try!(self.codec.decode_eof(&mut self.rd)); |
| return Ok(Async::Ready(Some(frame))) |
| } |
| } |
| trace!("attempting to decode a frame"); |
| if let Some(frame) = try!(self.codec.decode(&mut self.rd)) { |
| trace!("frame decoded from buffer"); |
| return Ok(Async::Ready(Some(frame))); |
| } |
| self.is_readable = false; |
| } |
| |
| assert!(!self.eof); |
| |
| // Otherwise, try to read more data and try again |
| // |
| // TODO: shouldn't read_to_end, that may read a lot |
| let before = self.rd.len(); |
| let ret = self.upstream.read_to_end(&mut self.rd.get_mut()); |
| match ret { |
| Ok(_n) => self.eof = true, |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
| if self.rd.len() == before { |
| return Ok(Async::NotReady) |
| } |
| } |
| Err(e) => return Err(e), |
| } |
| self.is_readable = true; |
| } |
| } |
| } |
| |
| impl<T: Io, C: Codec> Sink for Framed<T, C> { |
| type SinkItem = C::Out; |
| type SinkError = io::Error; |
| |
| fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> { |
| // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's |
| // *still* over 8KiB, then apply backpressure (reject the send). |
| const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; |
| if self.wr.len() > BACKPRESSURE_BOUNDARY { |
| try!(self.poll_complete()); |
| if self.wr.len() > BACKPRESSURE_BOUNDARY { |
| return Ok(AsyncSink::NotReady(item)); |
| } |
| } |
| |
| try!(self.codec.encode(item, &mut self.wr)); |
| Ok(AsyncSink::Ready) |
| } |
| |
| fn poll_complete(&mut self) -> Poll<(), io::Error> { |
| trace!("flushing framed transport"); |
| |
| while !self.wr.is_empty() { |
| trace!("writing; remaining={}", self.wr.len()); |
| let n = try_nb!(self.upstream.write(&self.wr)); |
| if n == 0 { |
| return Err(io::Error::new(io::ErrorKind::WriteZero, |
| "failed to write frame to transport")); |
| } |
| self.wr.drain(..n); |
| } |
| |
| // Try flushing the underlying IO |
| try_nb!(self.upstream.flush()); |
| |
| trace!("framed transport flushed"); |
| return Ok(Async::Ready(())); |
| } |
| |
| fn close(&mut self) -> Poll<(), io::Error> { |
| try_ready!(self.poll_complete()); |
| Ok(().into()) |
| } |
| } |
| |
| pub fn framed<T, C>(io: T, codec: C) -> Framed<T, C> { |
| Framed { |
| upstream: io, |
| codec: codec, |
| eof: false, |
| is_readable: false, |
| rd: EasyBuf::new(), |
| wr: Vec::with_capacity(INITIAL_CAPACITY), |
| } |
| } |
| |
| impl<T, C> Framed<T, C> { |
| |
| /// Returns a reference to the underlying I/O stream wrapped by `Framed`. |
| /// |
| /// Note that care should be taken to not tamper with the underlying stream |
| /// of data coming in as it may corrupt the stream of frames otherwise being |
| /// worked with. |
| pub fn get_ref(&self) -> &T { |
| &self.upstream |
| } |
| |
| /// Returns a mutable reference to the underlying I/O stream wrapped by |
| /// `Framed`. |
| /// |
| /// Note that care should be taken to not tamper with the underlying stream |
| /// of data coming in as it may corrupt the stream of frames otherwise being |
| /// worked with. |
| pub fn get_mut(&mut self) -> &mut T { |
| &mut self.upstream |
| } |
| |
| /// Consumes the `Framed`, returning its underlying I/O stream. |
| /// |
| /// Note that care should be taken to not tamper with the underlying stream |
| /// of data coming in as it may corrupt the stream of frames otherwise being |
| /// worked with. |
| pub fn into_inner(self) -> T { |
| self.upstream |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::{INITIAL_CAPACITY, EasyBuf}; |
| use std::mem; |
| |
| #[test] |
| fn debug_empty_easybuf() { |
| let buf: EasyBuf = vec![].into(); |
| assert_eq!("EasyBuf{len=0/0 []}", format!("{:?}", buf)); |
| } |
| |
| #[test] |
| fn debug_small_easybuf() { |
| let buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into(); |
| assert_eq!("EasyBuf{len=6/6 [1, 2, 3, 4, 5, 6]}", format!("{:?}", buf)); |
| } |
| |
| #[test] |
| fn debug_small_easybuf_split() { |
| let mut buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into(); |
| let split = buf.split_off(4); |
| assert_eq!("EasyBuf{len=4/6 [1, 2, 3, 4]}", format!("{:?}", buf)); |
| assert_eq!("EasyBuf{len=2/6 [5, 6]}", format!("{:?}", split)); |
| } |
| |
| #[test] |
| fn debug_large_easybuf() { |
| let vec: Vec<u8> = (0u8..255u8).collect(); |
| let buf: EasyBuf = vec.into(); |
| assert_eq!("EasyBuf{len=255/255 [0, 1, 2, 3, ..., 251, 252, 253, 254]}", format!("{:?}", buf)); |
| } |
| |
| #[test] |
| fn easybuf_get_mut_sliced() { |
| let vec: Vec<u8> = (0u8..10u8).collect(); |
| let mut buf: EasyBuf = vec.into(); |
| buf.split_off(9); |
| buf.drain_to(3); |
| assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]); |
| } |
| |
| #[test] |
| fn easybuf_get_mut_sliced_allocating_at_least_initial_capacity() { |
| let vec: Vec<u8> = (0u8..10u8).collect(); |
| let mut buf: EasyBuf = vec.into(); |
| buf.split_off(9); |
| buf.drain_to(3); |
| // Clone to make shared |
| let clone = buf.clone(); |
| assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]); |
| assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY); |
| mem::drop(clone); // prevent unused warning |
| } |
| |
| #[test] |
| fn easybuf_get_mut_sliced_allocating_required_capacity() { |
| let vec: Vec<u8> = (0..INITIAL_CAPACITY * 2).map(|_|0u8).collect(); |
| let mut buf: EasyBuf = vec.into(); |
| buf.drain_to(INITIAL_CAPACITY / 2); |
| let clone = buf.clone(); |
| assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY + INITIAL_CAPACITY / 2); |
| mem::drop(clone) |
| } |
| |
| #[test] |
| fn easybuf_into_vec_simple() { |
| let vec: Vec<u8> = (0u8..10u8).collect(); |
| let reference = vec.clone(); |
| let buf: EasyBuf = vec.into(); |
| let original_pointer = buf.buf.as_ref().as_ptr(); |
| let result: Vec<u8> = buf.into(); |
| assert_eq!(result, reference); |
| let new_pointer = result.as_ptr(); |
| assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec"); |
| } |
| |
| #[test] |
| fn easybuf_into_vec_sliced() { |
| let vec: Vec<u8> = (0u8..10u8).collect(); |
| let mut buf: EasyBuf = vec.into(); |
| let original_pointer = buf.buf.as_ref().as_ptr(); |
| buf.split_off(9); |
| buf.drain_to(3); |
| let result: Vec<u8> = buf.into(); |
| let reference: Vec<u8> = (3u8..9u8).collect(); |
| assert_eq!(result, reference); |
| let new_pointer = result.as_ptr(); |
| assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec"); |
| } |
| |
| #[test] |
| fn easybuf_into_vec_sliced_allocating() { |
| let vec: Vec<u8> = (0u8..10u8).collect(); |
| let mut buf: EasyBuf = vec.into(); |
| let original_pointer = buf.buf.as_ref().as_ptr(); |
| // Create a clone to create second reference to this EasyBuf and force allocation |
| let original = buf.clone(); |
| buf.split_off(9); |
| buf.drain_to(3); |
| let result: Vec<u8> = buf.into(); |
| let reference: Vec<u8> = (3u8..9u8).collect(); |
| assert_eq!(result, reference); |
| let original_reference: EasyBuf =(0u8..10u8).collect::<Vec<u8>>().into(); |
| assert_eq!(original.as_ref(), original_reference.as_ref()); |
| let new_pointer = result.as_ptr(); |
| assert_ne!(original_pointer, new_pointer, "A new vec should be allocated"); |
| } |
| |
| #[test] |
| fn easybuf_equality_same_underlying_vec() { |
| let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into(); |
| assert_eq!(buf, buf); |
| let other = buf.drain_to(5); |
| assert_ne!(buf, other); |
| |
| let buf: EasyBuf = (0u8..5).collect::<Vec<_>>().into(); |
| assert_eq!(buf, other); |
| } |
| |
| #[test] |
| fn easybuf_equality_different_underlying_vec() { |
| let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into(); |
| let mut other: EasyBuf = (0u8..10).collect::<Vec<_>>().into(); |
| assert_eq!(buf, other); |
| |
| buf = buf.drain_to(5); |
| assert_ne!(buf, other); |
| |
| other = other.drain_to(5); |
| assert_eq!(buf, other); |
| } |
| } |