// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::constants::{MAX_RETRY_COUNT, RETRY_DELAY, SOCKET},
    crate::discovery::{TargetFinder, TargetFinderConfig},
    crate::logger::setup_logger,
    crate::mdns::MdnsTargetFinder,
    crate::ok_or_continue,
    crate::onet,
    crate::ssh::build_ssh_command,
    crate::target::{RCSConnection, Target, TargetCollection},
    anyhow::{anyhow, Context, Error},
    async_std::task,
    async_trait::async_trait,
    fidl::endpoints::{ClientEnd, RequestStream, ServiceMarker},
    fidl_fuchsia_developer_bridge::{
        DaemonError, DaemonMarker, DaemonRequest, DaemonRequestStream,
    },
    fidl_fuchsia_developer_remotecontrol::RemoteControlMarker,
    fidl_fuchsia_overnet::{
        ServiceConsumerProxyInterface, ServiceProviderRequest, ServiceProviderRequestStream,
    },
    futures::channel::mpsc,
    futures::lock::Mutex,
    futures::prelude::*,
    hoist::spawn,
    std::rc::Rc,
    std::sync::Arc,
    std::time::Duration,
};

/// Callback interface for code that wants to react to targets handled by the
/// daemon's receiver loop.
#[async_trait]
pub trait DiscoveryHook {
    /// Invoked by the receiver loop with the target returned from
    /// `TargetCollection::merge_insert` and the collection it was merged into.
    async fn on_new_target(&self, target: &Arc<Target>, tc: &Arc<TargetCollection>);
}

/// Discovery hook that brings up the host-pipe process and the remote control
/// service for a target whose overnet connection has not been started yet.
#[derive(Default)]
struct RCSActivatorHook {}

#[async_trait]
impl DiscoveryHook for RCSActivatorHook {
    /// Starts the host pipe (if the target has none) and then the remote
    /// control service. Failures are logged and leave `overnet_started` unset,
    /// so a later invocation will try again.
    async fn on_new_target(&self, target: &Arc<Target>, _tc: &Arc<TargetCollection>) {
        let addrs = target.clone_addrs().await;
        // The state lock is held for the rest of this function.
        let mut state = target.state.lock().await;
        if state.overnet_started {
            return;
        }

        {
            let mut pipe_slot = state.host_pipe.lock().await;

            if pipe_slot.is_none() {
                let connection = onet::connect_to_onet(&target, addrs).await;
                match connection {
                    Ok(c) => {
                        *pipe_slot = Some(c);
                    }
                    Err(e) => {
                        log::warn!(
                            "failed to start host-pipe process for '{}': {}",
                            target.nodename,
                            e
                        );
                        return;
                    }
                }
            }
        }

        if let Err(e) = Daemon::start_remote_control(&target).await {
            log::warn!("unable to start remote control for '{}': {}", target.nodename, e);
            return;
        }
        state.overnet_started = true;
    }
}

/// The ffx daemon: the set of known targets plus the hooks run for targets
/// received by the receiver loop. Cloning shares both via `Arc`.
#[derive(Clone)]
pub struct Daemon {
    /// Targets known to the daemon, shared with the spawned background tasks.
    target_collection: Arc<TargetCollection>,

    /// Hooks invoked by the receiver loop for every target it receives.
    discovered_target_hooks: Arc<Mutex<Vec<Rc<dyn DiscoveryHook>>>>,
}

impl Daemon {
    pub async fn new() -> Result<Daemon, Error> {
        log::info!("Starting daemon overnet server");
        let (tx, rx) = mpsc::unbounded::<Target>();
        let target_collection = Arc::new(TargetCollection::new());
        let discovered_target_hooks = Arc::new(Mutex::new(Vec::<Rc<dyn DiscoveryHook>>::new()));
        Daemon::spawn_receiver_loop(rx, target_collection.clone(), discovered_target_hooks.clone());
        Daemon::spawn_onet_discovery(target_collection.clone());
        let mut d = Daemon {
            target_collection: target_collection.clone(),
            discovered_target_hooks: discovered_target_hooks.clone(),
        };
        d.register_hook(RCSActivatorHook::default()).await;

        // MDNS must be started as late as possible to avoid races with registered
        // hooks.
        let config =
            TargetFinderConfig { broadcast_interval: Duration::from_secs(120), mdns_ttl: 255 };
        let mdns = MdnsTargetFinder::new(&config)?;
        mdns.start(&tx)?;

        Ok(d)
    }

    /// Appends `cb` to the hooks that the receiver loop runs for each target.
    pub async fn register_hook(&mut self, cb: impl DiscoveryHook + 'static) {
        let hook: Rc<dyn DiscoveryHook> = Rc::new(cb);
        self.discovered_target_hooks.lock().await.push(hook);
    }

    pub fn spawn_receiver_loop(
        mut rx: mpsc::UnboundedReceiver<Target>,
        tc: Arc<TargetCollection>,
        hooks: Arc<Mutex<Vec<Rc<dyn DiscoveryHook>>>>,
    ) {
        spawn(async move {
            loop {
                let target = rx.next().await.unwrap();
                let target_clone = tc.merge_insert(target).await;
                let tc_clone = tc.clone();
                let hooks_clone = (*hooks.lock().await).clone();
                spawn(async move {
                    futures::future::join_all(
                        hooks_clone.iter().map(|hook| hook.on_new_target(&target_clone, &tc_clone)),
                    )
                    .await;
                });
            }
        });
    }

    /// Test-only constructor: builds a daemon fed by `rx`, with no hooks
    /// registered and without starting overnet or mDNS discovery.
    #[cfg(test)]
    pub fn new_with_rx(rx: mpsc::UnboundedReceiver<Target>) -> Daemon {
        let daemon = Daemon {
            target_collection: Arc::new(TargetCollection::new()),
            discovered_target_hooks: Arc::new(Mutex::new(Vec::new())),
        };
        Daemon::spawn_receiver_loop(
            rx,
            daemon.target_collection.clone(),
            daemon.discovered_target_hooks.clone(),
        );
        daemon
    }

    /// Serves requests from `stream` one at a time until the stream ends.
    ///
    /// # Errors
    /// Returns the first error produced by reading the stream or by
    /// `handle_request`; no further requests are read after that.
    pub async fn handle_requests_from_stream(
        &self,
        mut stream: DaemonRequestStream,
        quiet: bool,
    ) -> Result<(), Error> {
        loop {
            match stream.try_next().await? {
                Some(req) => self.handle_request(req, quiet).await?,
                None => return Ok(()),
            }
        }
    }

    /// Spawns a task that repeatedly lists overnet peers and adds to `tc` every
    /// peer that advertises the remote control service and is not yet known.
    ///
    /// The task panics if the service consumer cannot be connected or if
    /// listing peers fails.
    pub fn spawn_onet_discovery(tc: Arc<TargetCollection>) {
        spawn(async move {
            let svc = hoist::connect_as_service_consumer().unwrap();
            loop {
                let peers = svc.list_peers().await.unwrap();
                for mut peer in peers {
                    // Only peers exposing the remote control service are targets.
                    let exposes_rcs = peer
                        .description
                        .services
                        .as_ref()
                        .map_or(false, |services| {
                            services.iter().any(|name| name == RemoteControlMarker::NAME)
                        });
                    if !exposes_rcs {
                        continue;
                    }
                    // Skip peers that are already in the collection.
                    if tc.get(peer.id.id.into()).await.is_some() {
                        continue;
                    }
                    let rcs_connection = ok_or_continue!(RCSConnection::new(&mut peer.id)
                        .await
                        .context("unable to convert proxy to target"));
                    let target = ok_or_continue!(
                        Target::from_rcs_connection(rcs_connection).await,
                        "unable to convert proxy to target",
                    );
                    tc.merge_insert(target).await;
                }
            }
        });
    }

    /// Launches the remote-control-runner component on `target` over SSH.
    ///
    /// Makes up to `MAX_RETRY_COUNT` attempts, sleeping `RETRY_DELAY` after each
    /// unsuccessful one. An attempt counts as successful when the command's
    /// stdout starts with `Successfully`.
    ///
    /// # Errors
    /// Fails when the SSH command cannot be built or run, or when no attempt
    /// succeeded.
    async fn start_remote_control(target: &Target) -> Result<(), Error> {
        let runner_args = [
            "run",
            "fuchsia-pkg://fuchsia.com/remote-control-runner#meta/remote-control-runner.cmx",
        ];

        for _ in 0..MAX_RETRY_COUNT {
            // Addresses are re-read on every attempt.
            let mut ssh =
                build_ssh_command(target.clone_addrs().await, runner_args.to_vec()).await?;

            // NOTE(review): `output()` looks like a blocking call inside an async
            // fn (assuming `build_ssh_command` yields a `std::process::Command`)
            // -- confirm.
            let result = ssh
                .stdin(std::process::Stdio::null())
                .output()
                .context("Failed to SSH into device")?;
            if result.stdout.starts_with(b"Successfully") {
                return Ok(());
            }
            task::sleep(RETRY_DELAY).await;
        }

        Err(anyhow!("Starting RCS failed. Check target system logs for details."))
    }

    /// Returns the single cached target.
    ///
    /// Fails with "more than one target" when several targets are cached and
    /// with "no targets found" when the cache is empty.
    /// TODO(fxb/47843): Implement target lookup for commands to deprecate this
    /// function, and as a result remove the inner_lock() function.
    async fn target_from_cache(&self) -> Result<Arc<Target>, Error> {
        let cache = self.target_collection.inner_lock().await;
        if cache.len() > 1 {
            return Err(anyhow!("more than one target"));
        }

        let only_target = cache.values().next().map(|t| t.clone());
        only_target.ok_or_else(|| anyhow!("no targets found"))
    }

    pub async fn handle_request(&self, req: DaemonRequest, quiet: bool) -> Result<(), Error> {
        log::debug!("daemon received request: {:?}", req);
        match req {
            DaemonRequest::EchoString { value, responder } => {
                if !quiet {
                    log::info!("Received echo request for string {:?}", value);
                }
                responder.send(value.as_ref()).context("error sending response")?;
                if !quiet {
                    log::info!("echo response sent successfully");
                }
            }
            DaemonRequest::ListTargets { value, responder } => {
                if !quiet {
                    log::info!("Received list target request for '{:?}'", value);
                }
                // TODO(awdavies): Make this into a common format for easy
                // parsing.
                let response = match value.as_ref() {
                    "" => futures::future::join_all(
                        self.target_collection.targets().await.iter().map(|t| t.to_string_async()),
                    )
                    .await
                    .join("\n"),
                    _ => format!(
                        "{}",
                        match self.target_collection.get(value.into()).await {
                            Some(t) => t.to_string_async().await,
                            None => String::new(),
                        }
                    ),
                };
                responder.send(response.as_ref()).context("error sending response")?;
            }
            DaemonRequest::GetRemoteControl { remote, responder } => {
                let target = match self.target_from_cache().await {
                    Ok(t) => t,
                    Err(e) => {
                        log::warn!("{}", e);
                        responder
                            .send(&mut Err(DaemonError::TargetCacheError))
                            .context("sending error response")?;
                        return Ok(());
                    }
                };
                let mut target_state =
                    match target.wait_for_state_with_rcs(MAX_RETRY_COUNT, RETRY_DELAY).await {
                        Ok(state) => state,
                        Err(e) => {
                            log::warn!("{}", e);
                            responder
                                .send(&mut Err(DaemonError::TargetStateError))
                                .context("sending error response")?;
                            return Ok(());
                        }
                    };
                let mut response = target_state
                    .rcs
                    .as_mut()
                    .unwrap()
                    .copy_to_channel(remote.into_channel())
                    .map_err(|_| DaemonError::RcsConnectionError);
                responder.send(&mut response).context("error sending response")?;
            }
            DaemonRequest::Quit { responder } => {
                if !quiet {
                    log::info!("Received quit request.");
                }
                responder.send(true).context("error sending response")?;

                task::sleep(std::time::Duration::from_millis(10)).await;

                match std::fs::remove_file(SOCKET) {
                    Ok(()) => {}
                    Err(e) => log::error!("failed to remove socket file: {}", e),
                }

                let targets = self.target_collection.targets().await;

                for t in targets.iter() {
                    let t = t.clone();

                    let state = t.state.lock().await;
                    let mut child_lock = state.host_pipe.lock().await;
                    if let Some(mut child) = child_lock.take() {
                        child.kill()?;
                    }
                }

                std::process::exit(0);
            }
        }
        Ok(())
    }
}

////////////////////////////////////////////////////////////////////////////////
// Overnet Server implementation

/// Reads the next service provider request from `stream`, yielding `None` once
/// the stream has ended.
async fn next_request(
    stream: &mut ServiceProviderRequestStream,
) -> Result<Option<ServiceProviderRequest>, Error> {
    let next = stream.try_next().await;
    next.context("error running service provider server")
}

async fn exec_server(daemon: Daemon, quiet: bool) -> Result<(), Error> {
    let (s, p) = fidl::Channel::create().context("failed to create zx channel")?;
    let chan = fidl::AsyncChannel::from_channel(s).context("failed to make async channel")?;
    let mut stream = ServiceProviderRequestStream::from_channel(chan);
    hoist::publish_service(DaemonMarker::NAME, ClientEnd::new(p))?;
    while let Some(ServiceProviderRequest::ConnectToService {
        chan,
        info: _,
        control_handle: _control_handle,
    }) = next_request(&mut stream).await?
    {
        if !quiet {
            log::trace!("Received service request for service");
        }
        let chan =
            fidl::AsyncChannel::from_channel(chan).context("failed to make async channel")?;
        let daemon_clone = daemon.clone();
        spawn(async move {
            daemon_clone
                .handle_requests_from_stream(DaemonRequestStream::from_channel(chan), quiet)
                .await
                .unwrap_or_else(|err| panic!("fatal error handling request: {:?}", err));
        });
    }
    Ok(())
}

////////////////////////////////////////////////////////////////////////////////
// start

/// Reports whether something is accepting connections on the daemon socket.
pub fn is_daemon_running() -> bool {
    // Try to connect directly to the socket. This will fail if nothing is listening on the other side
    // (even if the path exists).
    std::os::unix::net::UnixStream::connect(SOCKET).is_ok()
}

/// Starts the daemon unless one is already running: sets up logging, starts
/// ascendd, builds the `Daemon` and serves requests until the server stops.
///
/// # Errors
/// Propagates failures from `Daemon::new` and `exec_server`.
pub async fn start() -> Result<(), Error> {
    if !is_daemon_running() {
        setup_logger("ffx.daemon").await;
        onet::start_ascendd().await;
        let daemon = Daemon::new().await?;
        exec_server(daemon, true).await?;
    }
    Ok(())
}

////////////////////////////////////////////////////////////////////////////////
// tests

#[cfg(test)]
mod test {
    use super::*;
    use crate::target::TargetState;
    use chrono::Utc;
    use fidl::endpoints::create_proxy;
    use fidl_fuchsia_developer_bridge::DaemonMarker;
    use fidl_fuchsia_developer_remotecontrol::{
        RemoteControlMarker, RemoteControlProxy, RemoteControlRequest,
    };
    use fidl_fuchsia_overnet_protocol::NodeId;
    use std::collections::HashSet;

    /// Test hook that attaches a fake RCS connection to each target and then
    /// signals on `ready_channel`.
    struct TestHookFakeRCS {
        /// Receives `true` every time the hook has finished with a target.
        ready_channel: mpsc::UnboundedSender<bool>,
    }

    impl TestHookFakeRCS {
        /// Creates a hook that reports completion on `ready_channel`.
        pub fn new(ready_channel: mpsc::UnboundedSender<bool>) -> Self {
            TestHookFakeRCS { ready_channel }
        }
    }

    #[async_trait]
    impl DiscoveryHook for TestHookFakeRCS {
        /// Installs a fake RCS connection on `target` and reports readiness.
        /// Panics if the target already has an RCS connection.
        async fn on_new_target(&self, target: &Arc<Target>, _tc: &Arc<TargetCollection>) {
            let mut state = target.state.lock().await;
            if state.rcs.is_some() {
                panic!("fake RCS should be set at most once");
            }
            let fake_rcs =
                RCSConnection::new_with_proxy(setup_fake_target_service(), &NodeId { id: 0u64 });
            state.rcs = Some(fake_rcs);
            self.ready_channel.unbounded_send(true).unwrap();
        }
    }

    /// Test-side handles for feeding targets to a daemon and observing when the
    /// fake RCS hook has processed them.
    struct TargetControlChannels {
        /// Yields `true` each time the fake RCS hook finishes with a target.
        target_ready_channel: mpsc::UnboundedReceiver<bool>,
        /// Delivers targets to the daemon's receiver loop.
        target_detected_channel: mpsc::UnboundedSender<Target>,
    }

    impl TargetControlChannels {
        /// Sends `t` to the daemon and waits for the hook's readiness signal.
        pub async fn send_target(&mut self, t: Target) {
            self.target_detected_channel.unbounded_send(t).unwrap();
            let ready = self.next_target_ready().await;
            assert!(ready);
        }

        /// Waits for the next readiness signal; panics if the channel is closed.
        pub async fn next_target_ready(&mut self) -> bool {
            let signal = self.target_ready_channel.next().await;
            signal.unwrap()
        }
    }

    /// Spawns a test daemon serving `stream` with the fake RCS hook registered
    /// and returns the channels used to drive it.
    async fn spawn_daemon_server_with_target_ctrl(
        stream: DaemonRequestStream,
    ) -> TargetControlChannels {
        let (target_tx, target_rx) = mpsc::unbounded::<Target>();
        let (ready_tx, ready_rx) = mpsc::unbounded::<bool>();
        let ctrl = TargetControlChannels {
            target_ready_channel: ready_rx,
            target_detected_channel: target_tx,
        };

        spawn(async move {
            let mut daemon = Daemon::new_with_rx(target_rx);
            daemon.register_hook(TestHookFakeRCS::new(ready_tx)).await;
            if let Err(err) = daemon.handle_requests_from_stream(stream, false).await {
                panic!("Fatal error handling request: {:?}", err);
            }
        });

        ctrl
    }

    /// Like `spawn_daemon_server_with_target_ctrl`, but also sends a target
    /// named "foobar" and waits until the hook has processed it.
    async fn spawn_daemon_server_with_fake_target(
        stream: DaemonRequestStream,
    ) -> TargetControlChannels {
        let mut ctrl = spawn_daemon_server_with_target_ctrl(stream).await;
        let first_target = Target::new("foobar", Utc::now());
        ctrl.send_target(first_target).await;
        ctrl
    }

    /// Creates a `RemoteControlProxy` backed by a spawned fake server that
    /// answers `StartComponent` with `Ok(())` and panics on any other request.
    fn setup_fake_target_service() -> RemoteControlProxy {
        let (proxy, mut stream) =
            fidl::endpoints::create_proxy_and_stream::<RemoteControlMarker>().unwrap();

        spawn(async move {
            // `Ok(None)` means the stream has ended (the proxy was dropped); stop
            // serving instead of treating it as an unexpected request.
            while let Ok(Some(req)) = stream.try_next().await {
                match req {
                    RemoteControlRequest::StartComponent { responder, .. } => {
                        let _ = responder.send(&mut Ok(())).context("sending ok response");
                    }
                    _ => panic!("unexpected RemoteControl request in fake target service"),
                }
            }
        });

        proxy
    }

    /// The daemon echoes back the string it was sent.
    #[test]
    fn test_echo() {
        let (daemon_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
        let expected = "test-echo";
        hoist::run(async move {
            // Keep the control channels alive for the duration of the test.
            let _ctrl = spawn_daemon_server_with_target_ctrl(stream).await;
            let reply = daemon_proxy.echo_string(expected).await.unwrap();
            assert_eq!(reply, expected);
        });
    }

    /// Requesting remote control must fail when more than one target is known.
    #[test]
    fn test_getting_rcs_multiple_targets() -> Result<(), Error> {
        let (daemon_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
        let (_, remote_server_end) = create_proxy::<RemoteControlMarker>()?;
        hoist::run(async move {
            let mut ctrl = spawn_daemon_server_with_fake_target(stream).await;
            // A second target makes the single-target lookup ambiguous.
            ctrl.send_target(Target::new("bazmumble", Utc::now())).await;
            let outcome = daemon_proxy.get_remote_control(remote_server_end).await.unwrap();
            if outcome.is_ok() {
                panic!("failure expected for multiple targets");
            }
        });

        Ok(())
    }

    /// Listing with an empty query names every target; listing an unknown
    /// name mentions none of them.
    #[test]
    fn test_list_targets() -> Result<(), Error> {
        let (daemon_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();
        hoist::run(async move {
            let known_names = ["foobar", "baz", "quux"];
            let mut ctrl = spawn_daemon_server_with_fake_target(stream).await;
            ctrl.send_target(Target::new("baz", Utc::now())).await;
            ctrl.send_target(Target::new("quux", Utc::now())).await;

            // TODO(awdavies): This check is in lieu of having an
            // established format for the list_targets output.
            let all_targets = daemon_proxy.list_targets("").await.unwrap();
            for name in known_names.iter() {
                assert!(all_targets.contains(*name));
            }

            let unknown_target = daemon_proxy.list_targets("mlorp").await.unwrap();
            for name in known_names.iter() {
                assert!(!unknown_target.contains(*name));
            }
        });
        Ok(())
    }

    /// Quit replies `true` and the socket file is gone afterwards.
    // NOTE(review): the Quit handler calls `std::process::exit(0)` shortly after
    // replying, so the assertions below presumably race with process exit --
    // confirm.
    #[test]
    fn test_quit() -> Result<(), Error> {
        let (daemon_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<DaemonMarker>().unwrap();

        // Start from a clean slate: remove any stale socket file.
        let socket_path = std::path::Path::new(SOCKET);
        if socket_path.is_file() {
            std::fs::remove_file(SOCKET).unwrap();
        }

        hoist::run(async move {
            let _ctrl = spawn_daemon_server_with_fake_target(stream).await;
            let quit_acknowledged = daemon_proxy.quit().await.unwrap();

            assert!(quit_acknowledged);

            assert!(!std::path::Path::new(SOCKET).is_file());
        });

        Ok(())
    }

    /// Hook that checks the target is already in the collection, in its
    /// freshly created state, when hooks run.
    struct TestHookFirst {
        /// Receives `true` once the hook's checks have passed.
        callbacks_done: mpsc::UnboundedSender<bool>,
    }

    #[async_trait]
    impl DiscoveryHook for TestHookFirst {
        async fn on_new_target(&self, target: &Arc<Target>, tc: &Arc<TargetCollection>) {
            // This will crash if the target isn't already inserted.
            let lookup = tc.get(target.nodename.clone().into()).await;
            let stored = lookup.unwrap().clone();
            assert_eq!(stored.nodename, "nothin");
            assert_eq!(*stored.state.lock().await, TargetState::new());
            assert_eq!(*stored.addrs.lock().await, HashSet::new());
            self.callbacks_done.unbounded_send(true).unwrap();
        }
    }

    /// Hook that only reports it was called.
    struct TestHookSecond {
        /// Receives `true` each time the hook runs.
        callbacks_done: mpsc::UnboundedSender<bool>,
    }

    #[async_trait]
    impl DiscoveryHook for TestHookSecond {
        async fn on_new_target(&self, _target: &Arc<Target>, _tc: &Arc<TargetCollection>) {
            let sent = self.callbacks_done.unbounded_send(true);
            sent.unwrap();
        }
    }

    /// Both registered hooks run once for a target sent to the daemon.
    #[test]
    fn test_receive_target() {
        hoist::run(async move {
            let (done_tx, mut done_rx) = mpsc::unbounded::<bool>();
            let (target_tx, target_rx) = mpsc::unbounded::<Target>();
            let mut daemon = Daemon::new_with_rx(target_rx);
            daemon.register_hook(TestHookFirst { callbacks_done: done_tx.clone() }).await;
            daemon.register_hook(TestHookSecond { callbacks_done: done_tx }).await;
            target_tx.unbounded_send(Target::new("nothin", Utc::now())).unwrap();
            // One completion signal is expected from each hook.
            for _ in 0..2 {
                assert!(done_rx.next().await.unwrap());
            }
        });
    }
}
