// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Implementation of Generator thread and Generator trait.
//!
//! Generator thread accept a set of serializable arguments.
use {
    crate::file_target::FileBlockingTarget,
    crate::io_packet::IoPacketType,
    crate::issuer::{run_issuer, IssuerArgs},
    crate::log::Stats,
    crate::operations::{AvailableTargets, OperationType, PipelineStages, Target},
    crate::sequential_io_generator::SequentialIoGenerator,
    crate::verifier::{run_verifier, VerifierArgs},
    failure::Error,
    log::debug,
    serde_derive::{Deserialize, Serialize},
    std::{
        clone::Clone,
        collections::HashMap,
        ops::Range,
        sync::{
            mpsc::{channel, sync_channel, SyncSender},
            Arc, Condvar, Mutex,
        },
        thread::spawn,
        time::Instant,
    },
};

/// This structure provides a mechanism for issuer to block on commands from
/// generator or from verifiers. When command_count drops to zero, issuer blocks
/// on someone to wake them up.
/// When generator or verifier insert a command in issuer's channel they signal
/// the issuer to wake up.
#[derive(Clone)]
pub struct ActiveCommands {
    /// command_count indicates how many commands are in issuers queue.
    /// Mutex and condition variable protect and help to wait on the count.
    command_count: Arc<(Mutex<u64>, Condvar)>,
}

impl ActiveCommands {
    pub fn new() -> ActiveCommands {
        ActiveCommands { command_count: Arc::new((Mutex::new(0), Condvar::new())) }
    }

    /// Decrements number of active commands. Waits on the condition variable if
    /// command_count is zero. Returns true if command_count was zero and call
    /// was blocked.
    /// ```
    /// let mut count = ActiveCommands::new();
    ///
    /// Thread 1
    /// command_count.remove();
    /// cmd = receiver.try_recv();
    /// assert_eq!(cmd.is_ok());
    ///
    /// Thread 2
    /// sender.send(cmd);
    /// command_count.insert();
    /// ```
    pub fn decrement(&mut self) -> bool {
        let (lock, cvar) = &*self.command_count;
        let mut count = lock.lock().unwrap();
        let mut slept = false;

        while (*count) == 0 {
            slept = true;
            debug!("waiting to on command");
            count = cvar.wait(count).unwrap();
        }
        (*count) -= 1;
        slept
    }

    /// Increments command_count and notifies one waiter.
    pub fn increment(&mut self) {
        let &(ref lock, ref cvar) = &*self.command_count;
        let mut count = lock.lock().unwrap();
        (*count) += 1;
        cvar.notify_one();
    }

    /// Returns value of command_count. This returns a snap-shot in time value.
    /// By the time another action is performed based on previous value returned
    /// by count, the count may have changed. Currently, sender increments the
    /// count and reciever decrements it.
    pub fn count(&self) -> u64 {
        let &(ref lock, ref _cvar) = &*self.command_count;
        let count = lock.lock().unwrap();
        *count
    }
}

/// Generating an IoPacket involves several variants like
/// - data for the IO and it's checksum
/// - data size
/// - offset of the IO
/// - several other (future) things like file name, directory path.
/// When we want randomly generated IO to be repeatable, we need to generate
/// a random number from a seed and based on that random number, we derive
/// variants of the IO. A typical use of Generator would look something like
/// ```
///  let generator: Generator = create_my_awesome_generator();
///  while (disks_death) {
///     random_number = generator.generate_number();
///     io_range = generator.get_io_range();
///     io_type = generator.get_io_operation();
///     io_packet = create_io_packet(io_type, io_range);
///     generator.fill_buffer(io_packet);
///  }
/// ```
pub trait Generator {
    /// Generates a new [random] number and return it's value.
    /// TODO(auradkar): "It is a bit confusing that the generator is both providing random numbers,
    ///                  operations, and buffers.  Seems like it is operating at 3 different levels
    ///                  of abstraction... maybe split it into several different traits. "
    fn generate_number(&mut self) -> u64;

    /// Returns type of operation corresponding to the last generated [random]
    /// number
    fn get_io_operation(&self, allowed_ops: &Vec<OperationType>) -> OperationType;

    /// Returns Range (start and end] of IO operation. end - start gives the size
    /// of the IO
    fn get_io_range(&self) -> Range<u64>;

    /// Generates and fills the buf with data.
    fn fill_buffer(&self, buf: &mut Vec<u8>, sequence_number: u64, offset_range: Range<u64>);
}

/// GeneratorArgs contains only the fields that help generator make decisions
/// needed for re-playability. This structure can be serialized and saved
/// for possible later use.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GeneratorArgs {
    /// magic_number helps to identify that the block was written
    /// by the app.
    magic_number: u64,

    /// process_id helps to differentiate this run from other runs
    process_id: u64,

    /// Human friendly name for this thread.
    name: String,

    /// Unique identifier for each generator.
    generator_unique_id: u64,

    /// Target block size. For some Targets,
    /// IO might fail if size of IO is not a multiple of
    /// block_size. This size is also used to watermark the
    /// block with block header
    block_size: u64,

    /// MTU per IO that Target can handle.
    /// 0 represents N/A for this Target
    max_io_size: u64,

    /// Hard alignment requirements without which IOs might fail
    align: bool,

    /// Seed that will be used to generate IOs in this thread
    seed: u64,

    /// Name of the target on which generator will perform IOs.
    target_name: String,

    /// target_range describes the portion of the Target
    /// the generator is allowed to work on. Other instances
    /// of Target may work on different ranges within the same
    /// Target.
    /// All generated IoPacket's offset and length should
    /// fall in this range
    target_range: Range<u64>,

    /// Target type. When there are multiple target types in the apps, this
    /// will help us search and load the right target operations.
    target_type: AvailableTargets,

    /// The maximum allowed number of outstanding IOs that are generated and
    /// are in Issuer queue. This number does not limit IOs that belong to verify
    /// operation.
    issuer_queue_depth: usize,

    /// The number of IOs that need to be issued before we gracefully tear-down
    /// generator thread.
    /// TODO(auradkar): Introduce time bound exit criteria.
    max_io_count: u64,

    /// When true, the target access (read/write) are sequential with respect to
    /// offsets within the target and within a thread.
    sequential: bool,
}

impl GeneratorArgs {
    pub fn new(
        magic_number: u64,
        process_id: u64,
        id: u64,
        block_size: u64,
        max_io_size: u64,
        align: bool,
        seed: u64,
        target_name: String,
        target_range: Range<u64>,
        target_type: AvailableTargets,
        issuer_queue_depth: usize,
        max_io_count: u64,
        sequential: bool,
    ) -> GeneratorArgs {
        GeneratorArgs {
            name: format!("generator-{}", id),
            generator_unique_id: id,
            block_size,
            max_io_size,
            align,
            seed,
            target_name,
            target_range,
            target_type,
            issuer_queue_depth,
            magic_number,
            process_id,
            max_io_count,
            sequential,
        }
    }
}

/// Based on the input args this returns a set of allowed operations that
/// generator is allowed to issue. For now we only allow writes.
fn pick_operation_type(_args: &GeneratorArgs) -> Vec<OperationType> {
    vec![OperationType::Write]
}

/// Based on the input args this returns a generator that can generate requested
/// IO load.For now we only allow sequential io.
fn pick_generator_type(args: &GeneratorArgs, target_id: u64) -> Box<Generator> {
    if !args.sequential {
        panic!("Only sequential io generator is implemented at the moment");
    }

    Box::new(SequentialIoGenerator::new(
        args.magic_number,
        args.process_id,
        target_id,
        args.generator_unique_id,
        args.target_range.clone(),
        args.block_size,
        args.max_io_size,
        args.align,
    ))
}

/// Based on the input args, create_target searches available Targets and
/// creates an appropriate Target trait.
fn create_target(
    target_type: AvailableTargets,
    target_id: u64,
    target_name: String,
    offset_range: Range<u64>,
    start_instant: Instant,
) -> Arc<Box<Target + Send + Sync>> {
    // Manually check what is passed is what is supported.

    match target_type {
        AvailableTargets::FileTarget => {
            FileBlockingTarget::new(target_name, target_id, offset_range, start_instant)
        }
    }
}

fn run_generator(
    args: &GeneratorArgs,
    to_issuer: &SyncSender<IoPacketType>,
    active_commands: &mut ActiveCommands,
    start_instant: Instant,
    io_map: Arc<Mutex<HashMap<u64, IoPacketType>>>,
) -> Result<(), Error> {
    // Generator specific target unique id.
    let target_id = 0;

    // An array of allowed operations that helps generator to pick an operation
    // based on generated random number.
    let allowed_operations = pick_operation_type(args);

    // IO sequence number. Order of IOs issued need not be same as order they arrive at
    // verifier and get logged. While replaying, this number helps us determine order
    // to issue IOs irrespective of the order they are read from replay log.
    let io_sequence_number = 0;

    // The generator's stage in lifetime of an IO
    let stage = PipelineStages::Generate;

    let mut gen = pick_generator_type(&args, target_id);

    let target = create_target(
        args.target_type,
        target_id,
        args.target_name.clone(),
        args.target_range.clone(),
        start_instant,
    );

    for io_sequence_number in 1..(args.max_io_count + 1) {
        if active_commands.count() == 0 {
            debug!("{} running slow.", args.name);
        }

        let io_seed = gen.generate_number();
        let io_range = gen.get_io_range();
        let op_type = gen.get_io_operation(&allowed_operations);

        let mut io_packet =
            target.create_io_packet(op_type, io_sequence_number, io_seed, io_range, target.clone());
        io_packet.timestamp_stage_start(stage);

        let io_offset_range = io_packet.io_offset_range().clone();
        gen.fill_buffer(io_packet.buffer_mut(), io_sequence_number, io_offset_range);
        {
            let mut map = io_map.lock().unwrap();
            map.insert(io_sequence_number, io_packet.clone());
        }
        io_packet.timestamp_stage_end(stage);
        to_issuer.send(io_packet).expect("error sending command");
        active_commands.increment();
    }

    let io_packet =
        target.create_io_packet(OperationType::Exit, io_sequence_number, 4, 0..1, target.clone());
    to_issuer.send(io_packet).expect("error sending exit command");
    active_commands.increment();
    Ok(())
}

/// Function that creates verifier and issuer thread. It build channels for them to communicate.
/// This thread assumes the role of generator.
pub fn run_load(
    args: GeneratorArgs,
    start_instant: Instant,
    stats: Arc<Mutex<Stats>>,
) -> Result<(), Error> {
    // Channel used to send commands from generator to issuer
    // This is the only bounded channel. The throttle control happens over this channel.
    // TODO(auradkar): Considering ActiveCommands and this channel are so tightly related, should
    // this channel be part of the ActiveCommand implementation?

    let (gi_to_issuer, gi_from_generator) = sync_channel(args.issuer_queue_depth);

    // Channel used to send commands from issuer to verifier
    let (iv_to_verifier, iv_from_issuer) = channel();

    // Channel used to send commands from verifier to generator
    let (vi_to_issuer, vi_from_verifier) = channel();

    // A hashmap of all outstanding IOs. Shared between generator and verifier.
    // Generator inserts entries and verifier removes it.
    let io_map = Arc::new(Mutex::new(HashMap::new()));

    // Mechanism to notify issuer of IOs.
    let mut active_commands = ActiveCommands::new();

    // Thread handle to wait on for joining.
    let mut thread_handles = vec![];

    // Create Issuer
    let issuer_args = IssuerArgs::new(
        format!("issues-{}", args.generator_unique_id),
        0,
        gi_from_generator,
        iv_to_verifier,
        vi_from_verifier,
        active_commands.clone(),
    );
    thread_handles.push(spawn(move || run_issuer(issuer_args)));

    // Create verifier
    let verifier_args = VerifierArgs::new(
        format!("verifier-{}", args.generator_unique_id),
        0,
        iv_from_issuer,
        vi_to_issuer,
        false,
        io_map.clone(),
        stats.clone(),
        active_commands.clone(),
    );
    thread_handles.push(spawn(move || run_verifier(verifier_args)));

    run_generator(&args, &gi_to_issuer, &mut active_commands, start_instant, io_map)?;

    for handle in thread_handles {
        handle.join().unwrap()?;
    }
    stats.lock().unwrap().stop_clock();
    Ok(())
}

#[cfg(test)]
mod tests {
    use {
        crate::generator::ActiveCommands,
        std::thread::sleep,
        std::{thread, time},
    };

    #[test]
    fn active_command_test() {
        let mut command_count = ActiveCommands::new();
        assert_eq!(command_count.count(), 0);

        command_count.increment();
        assert_eq!(command_count.count(), 1);

        command_count.increment();
        assert_eq!(command_count.count(), 2);

        assert_eq!(command_count.decrement(), false);
        assert_eq!(command_count.count(), 1);

        assert_eq!(command_count.decrement(), false);
        assert_eq!(command_count.count(), 0);
    }

    #[test]
    fn active_command_block_test() {
        let mut command_count = ActiveCommands::new();
        assert_eq!(command_count.count(), 0);
        let mut command_count_copy = command_count.clone();

        command_count.increment();

        let thd = thread::spawn(move || {
            sleep(time::Duration::from_secs(1));
            // First repay will wake the other threads sleeping borrower.
            command_count_copy.increment();
        });

        // On first call we dont block as the we find it immediately
        assert_eq!(command_count.decrement(), false);

        // On second call we block as the thread that is supposed to increment in
        // sleeping for a second.
        assert_eq!(command_count.decrement(), true);
        let _ = thd.join();

        // command count should be zero now
        assert_eq!(command_count.count(), 0);
    }
}
