// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::RWHandle,
    fuchsia_zircon::{self as zx, AsHandleRef},
    futures::ready,
    std::{
        fmt,
        future::Future,
        marker::{PhantomData, Unpin},
        pin::Pin,
        task::{Context, Poll},
    },
};

/// Marker trait for types that can be read/written with a `Fifo`.
/// Unsafe because not all types may be represented by arbitrary bit patterns.
pub unsafe trait FifoEntry {}

/// Identifies that the object may be used to write entries into a FIFO.
pub trait FifoWritable<W: FifoEntry>
where
    Self: Sized,
{
    /// Creates a future that transmits entries to be written.
    ///
    /// The returned future will return after an entry has been received on
    /// this fifo. The future will resolve to the fifo once all elements
    /// have been transmitted.
    ///
    /// An error during writing will cause the fifo to get
    /// destroyed and the status will be returned.
    fn write_entries<'a>(&'a self, entries: &'a [W]) -> WriteEntry<'a, Self, W> {
        WriteEntry::new(self, entries)
    }

    /// Writes entries to the fifo and registers this `Fifo` as
    /// needing a write on receiving a `zx::Status::SHOULD_WAIT`.
    ///
    /// Returns the number of elements processed.
    fn write(&self, cx: &mut Context<'_>, entries: &[W]) -> Poll<Result<usize, zx::Status>>;
}

/// Identifies that the object may be used to read entries from a FIFO.
pub trait FifoReadable<R: FifoEntry>
where
    Self: Sized,
{
    /// Creates a future that receives an entry to be written to the element
    /// provided.
    ///
    /// The returned future will return after an entry has been received on
    /// this fifo. The future will resolve to the fifo and the entry.
    ///
    /// An error during reading will cause the fifo and entry to get
    /// destroyed and the status will be returned.
    fn read_entry(&self) -> ReadEntry<'_, Self, R> {
        ReadEntry::new(self)
    }

    /// Reads an entry from the fifo and registers this `Fifo` as
    /// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
    fn read(&self, cx: &mut Context<'_>) -> Poll<Result<Option<R>, zx::Status>>;
}

/// An I/O object representing a `Fifo`.
pub struct Fifo<R: FifoEntry, W: FifoEntry = R> {
    handle: RWHandle<zx::Fifo>,
    read_marker: PhantomData<R>,
    write_marker: PhantomData<W>,
}

impl<R: FifoEntry, W: FifoEntry> AsRef<zx::Fifo> for Fifo<R, W> {
    fn as_ref(&self) -> &zx::Fifo {
        self.handle.get_ref()
    }
}

impl<R: FifoEntry, W: FifoEntry> AsHandleRef for Fifo<R, W> {
    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
        self.handle.get_ref().as_handle_ref()
    }
}

impl<R: FifoEntry, W: FifoEntry> From<Fifo<R, W>> for zx::Fifo {
    fn from(fifo: Fifo<R, W>) -> zx::Fifo {
        fifo.handle.into_inner()
    }
}

impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
    /// Creates a new `Fifo` from a previously-created `zx::Fifo`.
    pub fn from_fifo(fifo: zx::Fifo) -> Result<Self, zx::Status> {
        Ok(Fifo {
            handle: RWHandle::new(fifo)?,
            read_marker: PhantomData,
            write_marker: PhantomData,
        })
    }

    /// Test whether this fifo is ready to be written or not.
    ///
    /// If the fifo is *not* writable then the current task is scheduled to
    /// get a notification when the fifo does become writable. That is, this
    /// is only suitable for calling in a `Future::poll` method and will
    /// automatically handle ensuring a retry once the fifo is writable again.
    ///
    /// Returns `true` if the CLOSED signal has been received.
    pub fn poll_write(&self, cx: &mut Context<'_>) -> Poll<Result<bool, zx::Status>> {
        self.handle.poll_write(cx)
    }

    /// Writes entries to the fifo and registers this `Fifo` as
    /// needing a write on receiving a `zx::Status::SHOULD_WAIT`.
    ///
    /// Returns the number of elements processed.
    pub fn try_write(
        &self,
        cx: &mut Context<'_>,
        entries: &[W],
    ) -> Poll<Result<usize, zx::Status>> {
        let clear_closed = ready!(self.poll_write(cx)?);
        let elem_size = ::std::mem::size_of::<W>();
        let elembuf = unsafe {
            ::std::slice::from_raw_parts(entries.as_ptr() as *const u8, elem_size * entries.len())
        };
        match self.as_ref().write(elem_size, elembuf) {
            Err(e) => {
                if e == zx::Status::SHOULD_WAIT {
                    self.handle.need_write(cx, clear_closed)?;
                    Poll::Pending
                } else {
                    Poll::Ready(Err(e))
                }
            }
            Ok(count) => Poll::Ready(Ok(count)),
        }
    }

    /// Test whether this fifo is ready to be read or not.
    ///
    /// If the fifo is *not* readable then the current task is scheduled to
    /// get a notification when the fifo does become readable. That is, this
    /// is only suitable for calling in a `Future::poll` method and will
    /// automatically handle ensuring a retry once the fifo is readable again.
    ///
    /// Returns `true` if the CLOSED signal has been received.
    pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<bool, zx::Status>> {
        self.handle.poll_read(cx)
    }

    /// Reads an entry from the fifo and registers this `Fifo` as
    /// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
    pub fn try_read(&self, cx: &mut Context<'_>) -> Poll<Result<Option<R>, zx::Status>> {
        let clear_closed = ready!(self.handle.poll_read(cx)?);
        let mut element = ::std::mem::MaybeUninit::<R>::uninit();
        let elembuf = unsafe {
            ::std::slice::from_raw_parts_mut(
                element.as_mut_ptr() as *mut u8,
                ::std::mem::size_of::<R>(),
            )
        };

        match self.as_ref().read(::std::mem::size_of::<R>(), elembuf) {
            Err(e) => {
                if e == zx::Status::SHOULD_WAIT {
                    self.handle.need_read(cx, clear_closed)?;
                    return Poll::Pending;
                }
                if e == zx::Status::PEER_CLOSED {
                    return Poll::Ready(Ok(None));
                }
                return Poll::Ready(Err(e));
            }
            Ok(count) => {
                debug_assert_eq!(1, count);
                let element = unsafe { element.assume_init() };
                return Poll::Ready(Ok(Some(element)));
            }
        }
    }
}

impl<R: FifoEntry, W: FifoEntry> FifoReadable<R> for Fifo<R, W> {
    fn read(&self, cx: &mut Context<'_>) -> Poll<Result<Option<R>, zx::Status>> {
        self.try_read(cx)
    }
}

impl<R: FifoEntry, W: FifoEntry> FifoWritable<W> for Fifo<R, W> {
    fn write(&self, cx: &mut Context<'_>, entries: &[W]) -> Poll<Result<usize, zx::Status>> {
        self.try_write(cx, entries)
    }
}

impl<R: FifoEntry, W: FifoEntry> fmt::Debug for Fifo<R, W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.handle.get_ref().fmt(f)
    }
}

/// WriteEntry represents the future of one or more writes.
pub struct WriteEntry<'a, F, W> {
    fifo: &'a F,
    entries: &'a [W],
}

impl<'a, F, W> Unpin for WriteEntry<'a, F, W> {}

impl<'a, F: FifoWritable<W>, W: FifoEntry> WriteEntry<'a, F, W> {
    /// Create a new WriteEntry, which borrows the `FifoWritable` type
    /// until the future completes.
    pub fn new(fifo: &'a F, entries: &'a [W]) -> Self {
        WriteEntry { fifo, entries }
    }
}

impl<'a, F: FifoWritable<W>, W: FifoEntry> Future for WriteEntry<'a, F, W> {
    type Output = Result<(), zx::Status>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        while !this.entries.is_empty() {
            let advance = ready!(this.fifo.write(cx, this.entries)?);
            this.entries = &this.entries[advance..];
        }
        Poll::Ready(Ok(()))
    }
}

/// ReadEntry represents the future of a single read.
pub struct ReadEntry<'a, F, R> {
    fifo: &'a F,
    read_marker: PhantomData<R>,
}

impl<'a, F, W> Unpin for ReadEntry<'a, F, W> {}

impl<'a, F: FifoReadable<R>, R: FifoEntry> ReadEntry<'a, F, R> {
    /// Create a new ReadEntry, which borrows the `FifoReadable` type
    /// until the future completes.
    pub fn new(fifo: &'a F) -> ReadEntry<'_, F, R> {
        ReadEntry { fifo, read_marker: PhantomData }
    }
}

impl<'a, F: FifoReadable<R>, R: FifoEntry> Future for ReadEntry<'a, F, R> {
    type Output = Result<Option<R>, zx::Status>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.fifo.read(cx)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DurationExt, Executor, TimeoutExt, Timer};
    use fuchsia_zircon::prelude::*;
    use futures::future::try_join;
    use futures::prelude::*;

    #[derive(Clone, Debug, PartialEq, Eq)]
    #[repr(C)]
    struct entry {
        a: u32,
        b: u32,
    }
    unsafe impl FifoEntry for entry {}

    #[derive(Clone, Debug, PartialEq, Eq)]
    #[repr(C)]
    struct wrong_entry {
        a: u16,
    }
    unsafe impl FifoEntry for wrong_entry {}

    #[test]
    fn can_read_write() {
        let mut exec = Executor::new().expect("failed to create executor");
        let elements: &[entry; 1] = &[entry { a: 10, b: 20 }];

        let (tx, rx) =
            zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
        let (tx, rx) = (
            Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
            Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
        );

        let receive_future = rx.read_entry().map_ok(|entry| {
            assert_eq!(elements[0], entry.expect("peer closed"));
        });

        // add a timeout to receiver so if test is broken it doesn't take forever
        let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));

        // Sends an entry after the timeout has passed
        let sender = Timer::new(10.millis().after_now()).then(|()| tx.write_entries(elements));

        let done = try_join(receiver, sender);
        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
    }

    #[test]
    fn read_wrong_size() {
        let mut exec = Executor::new().expect("failed to create executor");
        let elements: &[entry; 1] = &[entry { a: 10, b: 20 }];

        let (tx, rx) =
            zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
        let (tx, rx) = (
            Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
            Fifo::<wrong_entry>::from_fifo(rx).expect("failed to create async rx fifo"),
        );

        let receive_future = rx.read_entry().map_ok(|_entry| panic!("read should have failed"));

        // add a timeout to receiver so if test is broken it doesn't take forever
        let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));

        // Sends an entry after the timeout has passed
        let sender = Timer::new(10.millis().after_now()).then(|()| tx.write_entries(elements));

        let done = try_join(receiver, sender);
        let res = exec.run_singlethreaded(done);
        match res {
            Err(zx::Status::OUT_OF_RANGE) => (),
            _ => panic!("did not get out-of-range error"),
        }
    }

    #[test]
    fn write_wrong_size() {
        let mut exec = Executor::new().expect("failed to create executor");
        let elements: &[wrong_entry; 1] = &[wrong_entry { a: 10 }];

        let (tx, rx) =
            zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
        let (tx, _rx) = (
            Fifo::<wrong_entry>::from_fifo(tx).expect("failed to create async tx fifo"),
            Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
        );

        let sender = Timer::new(10.millis().after_now()).then(|()| tx.write_entries(elements));

        let res = exec.run_singlethreaded(sender);
        match res {
            Err(zx::Status::OUT_OF_RANGE) => (),
            _ => panic!("did not get out-of-range error"),
        }
    }

    #[test]
    fn write_into_full() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let mut exec = Executor::new().expect("failed to create executor");
        let elements: &[entry; 3] =
            &[entry { a: 10, b: 20 }, entry { a: 30, b: 40 }, entry { a: 50, b: 60 }];

        let (tx, rx) =
            zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
        let (tx, rx) = (
            Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
            Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
        );

        // Use `writes_completed` to verify that not all writes
        // are transmitted at once, and the last write is actually blocked.
        let writes_completed = AtomicUsize::new(0);
        let sender = async {
            tx.write_entries(&elements[..2]).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            tx.write_entries(&elements[2..]).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            Ok::<(), zx::Status>(())
        };

        // Wait 10 ms, then read the messages from the fifo.
        let receive_future = async {
            Timer::new(10.millis().after_now()).await;
            let entry = rx.read_entry().await?;
            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
            assert_eq!(elements[0], entry.expect("peer closed"));
            let entry = rx.read_entry().await?;
            // At this point, the last write may or may not have
            // been written.
            assert_eq!(elements[1], entry.expect("peer closed"));
            let entry = rx.read_entry().await?;
            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
            assert_eq!(elements[2], entry.expect("peer closed"));
            Ok::<(), zx::Status>(())
        };

        // add a timeout to receiver so if test is broken it doesn't take forever
        let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));

        let done = try_join(receiver, sender);

        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
    }

    #[test]
    fn write_more_than_full() {
        let mut exec = Executor::new().expect("failed to create executor");
        let elements: &[entry; 3] =
            &[entry { a: 10, b: 20 }, entry { a: 30, b: 40 }, entry { a: 50, b: 60 }];

        let (tx, rx) =
            zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
        let (tx, rx) = (
            Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
            Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
        );

        let sender = tx.write_entries(elements);

        // Wait 10 ms, then read the messages from the fifo.
        let receive_future = async {
            Timer::new(10.millis().after_now()).await;
            let entry = rx.read_entry().await?;
            assert_eq!(elements[0], entry.expect("peer closed"));
            let entry = rx.read_entry().await?;
            assert_eq!(elements[1], entry.expect("peer closed"));
            let entry = rx.read_entry().await?;
            assert_eq!(elements[2], entry.expect("peer closed"));
            Ok::<(), zx::Status>(())
        };

        // add a timeout to receiver so if test is broken it doesn't take forever
        let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));

        let done = try_join(receiver, sender);

        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
    }
}
