// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![recursion_limit = "512"]

mod id;
mod interrupter;
mod proxies;
mod services;
#[cfg(test)]
mod test;

use self::services::{
    active_session::ActiveSession,
    discovery::{filter::Filter, Discovery},
    publisher::Publisher,
};
use anyhow::{Context, Error};
use fidl::endpoints::{create_endpoints, create_request_stream};
use fidl_fuchsia_media::UsageReporterMarker;
use fidl_fuchsia_media_sessions2::*;
use fuchsia_async as fasync;
use fuchsia_component::{client, server::ServiceFs};
use fuchsia_inspect::component;
use futures::{channel::mpsc, prelude::*};
use std::sync::Arc;
use tracing::warn;

/// Crate-local shorthand for a `std::result::Result` whose error type is
/// `anyhow::Error`.
type Result<T> = std::result::Result<T, Error>;
/// Integer type for session identifiers. Not referenced in this file;
/// presumably used by the service submodules.
type SessionId = u64;

/// Buffer size passed to every `mpsc::channel` created in `main`: the player,
/// discovery-request and observer-request channels.
const CHANNEL_BUFFER_SIZE: usize = 100;

/// Limit on events sent to a client without an ACK. Not referenced in this
/// file; presumably enforced by the service submodules.
///
/// This number should be forgiving. If we lower it we may want to build some
/// in-process staging area for changes so we can send them to clients that ACK
/// late. At 20 though, clients that don't ACK can't reasonably expect to be
/// accommodated.
const MAX_EVENTS_SENT_WITHOUT_ACK: usize = 20;

/// Spawns `fut` as a detached local task. If the future resolves to an error,
/// the error is logged at warning level; a successful result is discarded.
fn spawn_log_error(fut: impl Future<Output = Result<()>> + 'static) {
    let logged = async move {
        if let Err(err) = fut.await {
            warn!(%err)
        }
    };
    fasync::Task::local(logged).detach()
}

/// Entry point of the Media Session service.
///
/// Creates the channels that connect the individual services, spawns the
/// discovery and active-session services as detached local tasks, registers
/// the FIDL protocol handlers under `svc`, publishes inspect data, and then
/// serves the outgoing directory until the `ServiceFs` stream ends.
///
/// # Panics
///
/// Panics if connecting to `UsageReporter`, creating the active session
/// service, or serving the outgoing directory handle fails.
#[fuchsia::main(logging_tags = ["mediasession"])]
async fn main() {
    tracing::info!("Initializing Fuchsia Media Session Service");

    let mut fs = ServiceFs::new_local();
    let inspector = component::inspector();
    let players_node = Arc::new(inspector.root().create_child("players"));

    // One channel per kind of traffic routed into the discovery service.
    let (player_tx, player_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
    let (discovery_tx, discovery_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
    let (observer_tx, observer_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);

    let usage_reporter =
        client::connect_to_protocol::<UsageReporterMarker>().expect("Connecting to UsageReporter");
    let discovery = Discovery::new(player_rx, usage_reporter);
    // Take the sessions info stream before `serve` consumes `discovery`.
    let sessions_info = discovery.sessions_info_stream(Filter::default());
    spawn_log_error(discovery.serve(discovery_rx, observer_rx));

    // The closure below gets its own sender so that `discovery_tx` remains
    // available for the Discovery protocol handler registered further down.
    let loopback_discovery_tx = discovery_tx.clone();
    let connect_to_session = move |session_id| {
        // Create an in-process Discovery connection and ask it for the session.
        let (client_end, requests) = create_request_stream::<DiscoveryMarker>()?;
        let proxy = client_end.into_proxy()?;
        let (session, session_server_end) = create_endpoints();
        proxy.connect_to_session(session_id, session_server_end)?;

        // Forward the requests of that connection into the discovery request
        // channel, the same way externally connected clients are handled.
        spawn_log_error(
            requests.err_into().forward(loopback_discovery_tx.clone().sink_err_into()),
        );

        Ok(session)
    };

    let (active_session, active_session_clients_tx) =
        ActiveSession::new(sessions_info, connect_to_session)
            .expect("Creating active session service");
    spawn_log_error(active_session.serve());

    fs.dir("svc")
        // Publisher: each connection is served by a new `Publisher` holding a
        // clone of the player sender and of the "players" inspect node.
        .add_fidl_service(move |requests| {
            let publisher = Publisher::new(player_tx.clone(), Arc::clone(&players_node));
            spawn_log_error(publisher.serve(requests));
        })
        // Discovery: requests are forwarded into the discovery request channel.
        .add_fidl_service(move |requests: DiscoveryRequestStream| {
            spawn_log_error(requests.err_into().forward(discovery_tx.clone().sink_err_into()));
        })
        // ObserverDiscovery: requests are forwarded into the observer request channel.
        .add_fidl_service(move |requests: ObserverDiscoveryRequestStream| {
            spawn_log_error(requests.err_into().forward(observer_tx.clone().sink_err_into()));
        })
        // ActiveSession: the whole request stream is handed to the active
        // session service as a new client.
        .add_fidl_service(move |requests: ActiveSessionRequestStream| {
            let mut clients_tx = active_session_clients_tx.clone();
            spawn_log_error(async move {
                clients_tx
                    .send(requests)
                    .await
                    .context("Sending new client to Active Session service")
            });
        });

    // Bound to a named variable so the returned value lives until `main` returns.
    let _inspect_task =
        inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());

    fs.take_and_serve_directory_handle().expect("To serve Media Session services");
    fs.collect::<()>().await;
}
