// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod ap;
pub mod client;
pub mod mesh;

use failure::{bail, format_err};
use fidl_fuchsia_wlan_mlme::{self as fidl_mlme, MlmeEvent, MlmeEventStream, MlmeProxy};
use fidl_fuchsia_wlan_stats::IfaceStats;
use fuchsia_async as fasync;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::select;
use log::warn;
use pin_utils::pin_mut;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use wlan_sme::{
    timer::{TimeEntry, TimedEvent},
    MlmeRequest, MlmeStream, Station,
};

use crate::fidl_util::is_peer_closed;
use crate::stats_scheduler::StatsRequest;
use crate::Never;

// The returned future successfully terminates when MLME closes the channel
async fn serve_mlme_sme<STA, SRS, TS>(
    proxy: MlmeProxy,
    mut event_stream: MlmeEventStream,
    station: Arc<Mutex<STA>>,
    mut mlme_stream: MlmeStream,
    stats_requests: SRS,
    time_stream: TS,
) -> Result<(), failure::Error>
where
    STA: Station,
    SRS: Stream<Item = StatsRequest> + Unpin,
    TS: Stream<Item = TimeEntry<<STA as wlan_sme::Station>::Event>> + Unpin,
{
    let (mut stats_sender, stats_receiver) = mpsc::channel(1);
    let stats_fut = serve_stats(proxy.clone(), stats_requests, stats_receiver);
    pin_mut!(stats_fut);
    let mut stats_fut = stats_fut.fuse();

    let mut timeout_stream = make_timer_stream(time_stream).fuse();

    loop {
        select! {
            // Fuse rationale: any `none`s in the MLME stream should result in
            // bailing immediately, so we don't need to track if we've seen a
            // `None` or not and can `fuse` directly in the `select` call.
            mlme_event = event_stream.next().fuse() => match mlme_event {
                // Handle the stats response separately since it is SME-independent
                Some(Ok(MlmeEvent::StatsQueryResp{ resp })) => handle_stats_resp(&mut stats_sender, resp)?,
                Some(Ok(other)) => station.lock().unwrap().on_mlme_event(other),
                Some(Err(ref e)) if is_peer_closed(e) => return Ok(()),
                None => return Ok(()),
                Some(Err(e)) => bail!("Error reading an event from MLME channel: {}", e),
            },
            mlme_req = mlme_stream.next().fuse() => match mlme_req {
                Some(req) => match forward_mlme_request(req, &proxy) {
                    Ok(()) => {},
                    Err(ref e) if is_peer_closed(e) => return Ok(()),
                    Err(e) => bail!("Error forwarding a request from SME to MLME: {}", e),
                },
                None => bail!("Stream of requests from SME to MLME has ended unexpectedly"),
            },
            timeout = timeout_stream.next() => match timeout {
                Some(timed_event) => station.lock().unwrap().on_timeout(timed_event),
                None => bail!("SME timer stream has ended unexpectedly"),
            },
            stats = stats_fut => stats?.into_any(),
        }
    }
}

fn make_timer_stream<E>(
    time_stream: impl Stream<Item = TimeEntry<E>>,
) -> impl Stream<Item = TimedEvent<E>> {
    time_stream
        .map(|(deadline, timed_event)| fasync::Timer::new(deadline).map(|_| timed_event))
        .buffer_unordered(usize::max_value())
}

fn forward_mlme_request(req: MlmeRequest, proxy: &MlmeProxy) -> Result<(), fidl::Error> {
    match req {
        MlmeRequest::Scan(mut req) => proxy.start_scan(&mut req),
        MlmeRequest::Join(mut req) => proxy.join_req(&mut req),
        MlmeRequest::Authenticate(mut req) => proxy.authenticate_req(&mut req),
        MlmeRequest::AuthResponse(mut resp) => proxy.authenticate_resp(&mut resp),
        MlmeRequest::Associate(mut req) => proxy.associate_req(&mut req),
        MlmeRequest::AssocResponse(mut resp) => proxy.associate_resp(&mut resp),
        MlmeRequest::Deauthenticate(mut req) => proxy.deauthenticate_req(&mut req),
        MlmeRequest::Eapol(mut req) => proxy.eapol_req(&mut req),
        MlmeRequest::SetKeys(mut req) => proxy.set_keys_req(&mut req),
        MlmeRequest::SetCtrlPort(mut req) => proxy.set_controlled_port(&mut req),
        MlmeRequest::Start(mut req) => proxy.start_req(&mut req),
        MlmeRequest::Stop(mut req) => proxy.stop_req(&mut req),
        MlmeRequest::SendMpOpenAction(mut req) => proxy.send_mp_open_action(&mut req),
        MlmeRequest::SendMpConfirmAction(mut req) => proxy.send_mp_confirm_action(&mut req),
        MlmeRequest::MeshPeeringEstablished(mut req) => proxy.mesh_peering_established(&mut req),
    }
}

fn handle_stats_resp(
    stats_sender: &mut mpsc::Sender<IfaceStats>,
    resp: fidl_mlme::StatsQueryResponse,
) -> Result<(), failure::Error> {
    stats_sender.try_send(resp.stats).or_else(|e| {
        if e.is_full() {
            // We only expect one response from MLME per each request, so the bounded
            // queue of size 1 should always suffice.
            warn!("Received an extra GetStatsResp from MLME, discarding");
            Ok(())
        } else {
            Err(format_err!("Failed to send a message to stats future"))
        }
    })
}

async fn serve_stats<S>(
    proxy: MlmeProxy,
    mut stats_requests: S,
    mut responses: mpsc::Receiver<IfaceStats>,
) -> Result<Never, failure::Error>
where
    S: Stream<Item = StatsRequest> + Unpin,
{
    while let Some(req) = await!(stats_requests.next()) {
        proxy
            .stats_query_req()
            .map_err(|e| format_err!("Failed to send a StatsReq to MLME: {}", e))?;
        match await!(responses.next()) {
            Some(response) => req.reply(response),
            None => bail!("Stream of stats responses has ended unexpectedly"),
        };
    }
    Err(format_err!("Stream of stats requests has ended unexpectedly"))
}

#[cfg(test)]
mod tests {
    use super::*;
    use {
        fuchsia_zircon::{self as zx, prelude::DurationNum},
        futures::channel::mpsc::{self, UnboundedSender},
        futures::Poll,
        pin_utils::pin_mut,
    };

    type Event = u32;

    #[test]
    fn test_timer() {
        let mut exec = fasync::Executor::new().expect("failed to create an executor");
        let fut = async {
            let (timer, time_stream) = mpsc::unbounded::<TimeEntry<Event>>();
            let mut timeout_stream = make_timer_stream(time_stream);
            let now = zx::Time::get(zx::ClockId::Monotonic);
            schedule(&timer, now + 40.millis(), 0);
            schedule(&timer, now + 10.millis(), 1);
            schedule(&timer, now + 20.millis(), 2);
            schedule(&timer, now + 30.millis(), 3);

            let mut events = vec![];
            for _ in 0u32..4 {
                match await!(timeout_stream.next()) {
                    Some(event) => events.push(event.event),
                    None => panic!("timer terminates prematurely"),
                }
            }
            events
        };
        pin_mut!(fut);
        for _ in 0u32..4 {
            assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
            assert!(exec.wake_next_timer().is_some());
        }
        match exec.run_until_stalled(&mut fut) {
            Poll::Ready(events) => assert_eq!(events, vec![1, 2, 3, 0]),
            Poll::Pending => panic!("expect future to complete"),
        }
    }

    fn schedule(timer: &UnboundedSender<TimeEntry<Event>>, deadline: zx::Time, event: Event) {
        let entry = (deadline, TimedEvent { id: 0, event });
        timer.unbounded_send(entry).expect("expect send successful");
    }
}
