// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        lsm_tree::merge,
        object_handle::ReadObjectHandle,
        serialized_types::{Version, Versioned, VersionedLatest},
    },
    anyhow::Error,
    async_trait::async_trait,
    async_utils::event::Event,
    serde::{Deserialize, Serialize},
    std::{fmt::Debug, sync::Arc},
};

/// Keys and values need to implement the following traits.  For merging, they need to implement
/// MergeableKey.  TODO: Use trait_alias when available.
pub trait Key:
    Clone
    + OrdUpperBound
    + Send
    + Sync
    + Versioned
    + VersionedLatest
    + Debug
    + std::marker::Unpin
    + 'static
{
}

pub trait RangeKey: Key {
    /// Returns if two keys overlap.
    fn overlaps(&self, other: &Self) -> bool;
}

impl<K> Key for K where
    K: Clone
        + OrdUpperBound
        + Send
        + Sync
        + Versioned
        + VersionedLatest
        + Debug
        + std::marker::Unpin
        + 'static
{
}

pub trait MergeableKey: Key + Eq + NextKey + OrdLowerBound {}
impl<K> MergeableKey for K where K: Key + Eq + NextKey + OrdLowerBound {}

pub trait Value:
    Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
impl<V> Value for V where
    V: Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}

/// ItemRef is a struct that contains references to key and value, which is useful since in many
/// cases since keys and values are stored separately so &Item is not possible.
#[derive(Debug, Serialize)]
pub struct ItemRef<'a, K, V> {
    pub key: &'a K,
    pub value: &'a V,
    pub sequence: u64,
}

impl<K: PartialEq, V: PartialEq> PartialEq for ItemRef<'_, K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key && self.value == other.value
    }
}

impl<K: PartialEq, V: PartialEq> Eq for ItemRef<'_, K, V> {}

impl<K: Clone, V: Clone> ItemRef<'_, K, V> {
    pub fn cloned(&self) -> Item<K, V> {
        Item { key: self.key.clone(), value: self.value.clone(), sequence: self.sequence }
    }
}

impl<'a, K, V> Clone for ItemRef<'a, K, V> {
    fn clone(&self) -> Self {
        ItemRef { key: self.key, value: self.value, sequence: self.sequence }
    }
}
impl<'a, K, V> Copy for ItemRef<'a, K, V> {}

/// Item is a struct that combines a key and a value.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Item<K, V> {
    pub key: K,
    pub value: V,
    /// |sequence| is a monotonically increasing sequence number for the Item, which is set when the
    /// Item is inserted into the tree.  In practice, this is the journal file offset at the time of
    /// committing the transaction containing the Item.  Note that two or more Items may share the
    /// same |sequence|.
    pub sequence: u64,
}

impl<K: PartialEq, V: PartialEq> PartialEq for Item<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key && self.value == other.value
    }
}

impl<K: PartialEq, V: PartialEq> Eq for Item<K, V> {}

impl<K, V> Item<K, V> {
    pub fn new(key: K, value: V) -> Item<K, V> {
        Item { key, value, sequence: 0u64 }
    }

    pub fn new_with_sequence(key: K, value: V, sequence: u64) -> Item<K, V> {
        Item { key, value, sequence }
    }

    pub fn as_item_ref(&self) -> ItemRef<'_, K, V> {
        self.into()
    }
}

impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> {
    fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> {
        ItemRef { key: &item.key, value: &item.value, sequence: item.sequence }
    }
}

/// The find functions will return items with keys that are greater-than or equal to the search key,
/// so for keys that are like extents, the keys should sort (via OrdUpperBound) using the end
/// of their ranges, and you should set the search key accordingly.
///
/// For example, let's say the tree holds extents 100..200, 200..250 and you want to perform a read
/// for range 150..250, you should search for 0..151 which will first return the extent 100..200
/// (and then the iterator can be advanced to 200..250 after). When merging, keys can overlap, so
/// consider the case where we want to merge an extent with range 100..300 with an existing extent
/// of 200..250. In that case, we want to treat the extent with range 100..300 as lower than the key
/// 200..250 because we'll likely want to split the extents (e.g. perhaps we want 100..200,
/// 200..250, 250..300), so for merging, we need to use a different comparison function and we deal
/// with that using the OrdLowerBound trait.
///
/// If your keys don't have overlapping ranges that need to be merged, then these can be the same as
/// std::cmp::Ord (use the DefaultOrdUpperBound and DefaultOrdLowerBound traits).

pub trait OrdUpperBound {
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering;
}

pub trait DefaultOrdUpperBound: OrdUpperBound + Ord {}

impl<T: DefaultOrdUpperBound> OrdUpperBound for T {
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering {
        // Default to using cmp.
        self.cmp(other)
    }
}

pub trait OrdLowerBound {
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}

pub trait DefaultOrdLowerBound: OrdLowerBound + Ord {}

impl<T: DefaultOrdLowerBound> OrdLowerBound for T {
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
        // Default to using cmp.
        self.cmp(other)
    }
}

/// The NextKey trait allows for an optimisation which allows the merger to avoid querying a layer
/// if it knows it has found the next possible key.  Consider the following example showing two
/// layers with range based keys.
///
///      +----------+------------+
///  0   |  0..100  |  100..200  |
///      +----------+------------+
///  1              |  100..200  |
///                 +------------+
///
/// If you search and find the 0..100 key, then only layer 0 will be touched.  If you then want to
/// advance to the 100..200 record, you can find it in layer 0 but unless you know that it
/// immediately follows the 0..100 key i.e. that there is no possible key, K, such that 0..100 < K <
/// 100..200, the merger has to consult all other layers to check.  next_key should return a key, N,
/// such that if the merger encounters a key that is <= N (using OrdLowerBound), it can stop
/// touching more layers.  The key N should also be the the key to search for in other layers if the
/// merger needs to do so.  In the example above, it should be a key that is > 0..100 and 99..100,
/// but <= 100..200 (using OrdUpperBound).  In practice, what this means is that for range based
/// keys, OrdUpperBound should use the end of the range, OrdLowerBound should use the start of the
/// range, and next_key should return end..end + 1.  This is purely an optimisation; the default
/// will be correct but not performant.
pub trait NextKey: Clone {
    fn next_key(&self) -> Option<Self> {
        None
    }
}

/// Layer is a trait that all layers need to implement (mutable and immutable).
#[async_trait]
pub trait Layer<K, V>: Send + Sync {
    fn handle(&self) -> Option<&dyn ReadObjectHandle> {
        None
    }

    /// Searches for a key. Bound::Excluded is not supported. Bound::Unbounded positions the
    /// iterator on the first item in the layer.
    async fn seek(&self, bound: std::ops::Bound<&K>)
        -> Result<BoxedLayerIterator<'_, K, V>, Error>;

    /// Locks the layer preventing it from being closed. This will never block i.e. there can be
    /// many locks concurrently.  The lock is purely advisory: seek will still work even if lock has
    /// not been called; it merely causes close to wait until all locks are released.  Returns None
    /// if close has been called for the layer.
    fn lock(&self) -> Option<Event>;

    /// Waits for existing locks readers to finish and then returns.  Subsequent calls to lock will
    /// return None.
    async fn close(&self);

    /// Returns the version number used by structs in this layer
    fn get_version(&self) -> Version;
}

/// MutableLayer is a trait that only mutable layers need to implement.
#[async_trait]
pub trait MutableLayer<K, V>: Layer<K, V> {
    fn as_layer(self: Arc<Self>) -> Arc<dyn Layer<K, V>>;

    /// Inserts the given item into the layer. The item *must* not already exist.
    async fn insert(&self, item: Item<K, V>);

    /// Merges the given item into the layer. `lower_bound` is the key to search for that should
    /// provide the first potential item to be merged with.
    async fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: merge::MergeFn<K, V>);

    /// Inserts or replaces an item.
    async fn replace_or_insert(&self, item: Item<K, V>);

    /// Returns the number of items in the layer.
    fn len(&self) -> usize;
}

/// Something that implements LayerIterator is returned by the seek function.
#[async_trait]
pub trait LayerIterator<K, V>: Send + Sync {
    /// Advances the iterator.
    async fn advance(&mut self) -> Result<(), Error>;

    /// Returns the current item. This will be None if called when the iterator is first crated i.e.
    /// before either seek or advance has been called, and None if the iterator has reached the end
    /// of the layer.
    fn get(&self) -> Option<ItemRef<'_, K, V>>;
}

pub type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>;

#[async_trait]
pub trait LayerIteratorFilter<'a, K, V> {
    /// Mirrors std::iter::Iterator::filter.
    async fn filter<P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>(
        self,
        predicate: P,
    ) -> Result<Filter<'a, K, V, P>, Error>;
}

#[async_trait]
impl<'a, K, V> LayerIteratorFilter<'a, K, V> for BoxedLayerIterator<'a, K, V> {
    async fn filter<P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>(
        self,
        predicate: P,
    ) -> Result<Filter<'a, K, V, P>, Error> {
        Filter::new(self, predicate).await
    }
}

#[async_trait]
impl<'iter, K, V> LayerIterator<K, V> for BoxedLayerIterator<'iter, K, V> {
    async fn advance(&mut self) -> Result<(), Error> {
        (**self).advance().await
    }
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (**self).get()
    }
}

/// Mutable layers need an iterator that implements this in order to make merge_into work.
#[async_trait]
pub(super) trait LayerIteratorMut<K, V>: LayerIterator<K, V> {
    /// Casts to super-traits.
    fn as_iterator_mut(&mut self) -> &mut dyn LayerIterator<K, V>;
    fn as_iterator(&self) -> &dyn LayerIterator<K, V>;

    /// Erases the item that the iterator is currently pointing at. Afterwards, the iterator will
    /// be pointing at the item that follows.
    fn erase(&mut self);

    /// Inserts the given item immediately prior to the item the iterator is currently pointing at.
    fn insert(&mut self, item: Item<K, V>);

    /// Commits changes and waits for any existing readers to finish.
    async fn commit_and_wait(&mut self);
}

/// Trait for writing new layers.
#[async_trait]
pub trait LayerWriter<K, V>
where
    K: Debug + Send + Versioned + Sync,
    V: Debug + Send + Versioned + Sync,
{
    /// Writes the given item to this layer.
    async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error>;

    /// Flushes any buffered items to the backing storage.
    async fn flush(&mut self) -> Result<(), Error>;
}

/// A helper trait that converts arrays of layers into arrays of references to layers.
pub trait IntoLayerRefs<'a, K, V, T: AsRef<U> + 'a, U: ?Sized>
where
    Self: IntoIterator<Item = &'a T>,
{
    fn into_layer_refs(self) -> Box<[&'a dyn Layer<K, V>]>;
}

// Generic implementation where we need the cast to &dyn Layer.
impl<'a, K, V, T: AsRef<U>, U: Layer<K, V> + 'a> IntoLayerRefs<'a, K, V, T, U> for &'a [T] {
    fn into_layer_refs(self) -> Box<[&'a dyn Layer<K, V>]> {
        let refs: Vec<_> = self.iter().map(|x| x.as_ref() as &dyn Layer<K, V>).collect();
        refs.into_boxed_slice()
    }
}

// Generic implementation where we already have &dyn Layer.
impl<'a, K, V, T: AsRef<dyn Layer<K, V>>> IntoLayerRefs<'a, K, V, T, dyn Layer<K, V>>
    for &'a [T]
{
    fn into_layer_refs(self) -> Box<[&'a dyn Layer<K, V>]> {
        let refs: Vec<_> = self.iter().map(|x| x.as_ref()).collect();
        refs.into_boxed_slice()
    }
}

pub struct Filter<'a, K, V, P> {
    iter: BoxedLayerIterator<'a, K, V>,
    predicate: P,
}

impl<'a, K, V, P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync> Filter<'a, K, V, P> {
    async fn new(
        iter: BoxedLayerIterator<'a, K, V>,
        predicate: P,
    ) -> Result<Filter<'a, K, V, P>, Error> {
        let mut filter = Filter { iter, predicate };
        filter.skip_filtered().await?;
        Ok(filter)
    }

    async fn skip_filtered(&mut self) -> Result<(), Error> {
        loop {
            match self.iter.get() {
                Some(item) if !(self.predicate)(item) => {}
                _ => return Ok(()),
            }
            self.iter.advance().await?;
        }
    }
}

#[async_trait]
impl<K: Send + Sync, V: Send + Sync, P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync>
    LayerIterator<K, V> for Filter<'_, K, V, P>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.iter.advance().await?;
        self.skip_filtered().await
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.iter.get()
    }
}
