// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

use super::error::StreamError;
use super::message::{Message, MAX_DATAGRAM_LEN};
use super::stats::LogStreamStats;
use crate::container::ComponentIdentity;
use fuchsia_async as fasync;
use fuchsia_zircon as zx;
use futures::io::{self, AsyncReadExt};
use std::{marker::PhantomData, sync::Arc};

/// An `Encoding` is able to parse a `Message` from raw bytes.
///
/// `parse_message` takes no `self`, so implementors act as type-level markers: the parser is
/// chosen by naming the implementing type (for example as the `E` parameter of
/// `LogMessageSocket<E>`), not by holding a value of it.
pub trait Encoding {
    /// Attempt to parse a message from the given buffer
    ///
    /// `source` is the identity of the component the message is attributed to (presumably the
    /// writer of `buf` -- confirm against callers). Returns a `StreamError` when no `Message`
    /// can be produced from `buf`.
    fn parse_message(source: &ComponentIdentity, buf: &[u8]) -> Result<Message, StreamError>;
}

/// An encoding that can parse the legacy [logger/syslog wire format]
///
/// This is a unit struct used only as a type-level marker; its `Encoding` implementation
/// delegates to `Message::from_logger`.
///
/// [logger/syslog wire format]: https://fuchsia.googlesource.com/fuchsia/+/HEAD/zircon/system/ulib/syslog/include/lib/syslog/wire_format.h
#[derive(Clone, Debug)]
pub struct LegacyEncoding;

/// An encoding that can parse the [structured log format]
///
/// This is a unit struct used only as a type-level marker; its `Encoding` implementation
/// delegates to `Message::from_structured`.
///
/// [structured log format]: https://fuchsia.dev/fuchsia-src/development/logs/encodings
#[derive(Clone, Debug)]
pub struct StructuredEncoding;

impl Encoding for LegacyEncoding {
    /// Parses `buf` as a legacy-format message by delegating to `Message::from_logger`,
    /// forwarding `source` unchanged. Any error from that call is returned as-is.
    fn parse_message(source: &ComponentIdentity, buf: &[u8]) -> Result<Message, StreamError> {
        Message::from_logger(source, buf)
    }
}

impl Encoding for StructuredEncoding {
    /// Parses `buf` as a structured-format message by delegating to `Message::from_structured`,
    /// forwarding `source` unchanged. Any error from that call is returned as-is.
    fn parse_message(source: &ComponentIdentity, buf: &[u8]) -> Result<Message, StreamError> {
        Message::from_structured(source, buf)
    }
}

/// Reads raw log messages from a socket and decodes each one using the encoding `E`.
///
/// Each call to `next` performs one read into an internal buffer and hands the bytes that were
/// read to `E::parse_message`.
#[must_use = "don't drop logs on the floor please!"]
pub struct LogMessageSocket<E> {
    /// Identity passed to `E::parse_message` for every message read from `socket`.
    source: Arc<ComponentIdentity>,
    /// Stats handle attached to every successfully parsed message via `Message::with_stats`.
    stats: Arc<LogStreamStats>,
    /// Async wrapper around the socket that messages are read from.
    socket: fasync::Socket,
    /// Scratch space reused for every read; a single read yields at most `MAX_DATAGRAM_LEN`
    /// bytes.
    buffer: [u8; MAX_DATAGRAM_LEN],
    /// Zero-sized marker that ties this socket to its encoding without storing an `E`.
    _encoder: PhantomData<E>,
}

impl LogMessageSocket<LegacyEncoding> {
    /// Builds a `LogMessageSocket` that decodes the legacy format from `socket`.
    ///
    /// `source` is the identity handed to the parser for every message, and `stats` is attached
    /// to every message that parses successfully.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `fasync::Socket::from_socket` if `socket` cannot be wrapped
    /// for async use.
    pub fn new(
        socket: zx::Socket,
        source: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> Result<Self, io::Error> {
        // Wrap the socket up front so a failure returns before the rest is assembled.
        let socket = fasync::Socket::from_socket(socket)?;
        let buffer = [0; MAX_DATAGRAM_LEN];
        Ok(Self { source, stats, socket, buffer, _encoder: PhantomData })
    }
}

impl LogMessageSocket<StructuredEncoding> {
    /// Builds a `LogMessageSocket` that decodes the structured log format from `socket`.
    ///
    /// `source` is the identity handed to the parser for every message, and `stats` is attached
    /// to every message that parses successfully.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `fasync::Socket::from_socket` if `socket` cannot be wrapped
    /// for async use.
    pub fn new_structured(
        socket: zx::Socket,
        source: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> Result<Self, io::Error> {
        // Wrap the socket up front so a failure returns before the rest is assembled.
        let socket = fasync::Socket::from_socket(socket)?;
        let buffer = [0; MAX_DATAGRAM_LEN];
        Ok(Self { source, stats, socket, buffer, _encoder: PhantomData })
    }
}

impl<E> LogMessageSocket<E>
where
    E: Encoding + Unpin,
{
    /// Reads the next message from the socket and decodes it with `E`.
    ///
    /// Performs a single read into the internal buffer, parses exactly the bytes that were read,
    /// and attaches this socket's stats to the resulting `Message`.
    ///
    /// # Errors
    ///
    /// * `StreamError::Closed` if the read returns zero bytes.
    /// * The read's I/O error, converted into a `StreamError`.
    /// * Whatever error `E::parse_message` reports for the bytes that were read.
    pub async fn next(&mut self) -> Result<Message, StreamError> {
        let bytes_read = self.socket.read(&mut self.buffer).await?;

        match bytes_read {
            // A zero-length read is reported as the stream having been closed.
            0 => Err(StreamError::Closed),
            // Only the prefix that was just filled is valid; the rest of the buffer is stale.
            filled => {
                let parsed = E::parse_message(&self.source, &self.buffer[..filled])?;
                Ok(parsed.with_stats(&self.stats))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::super::message::{
        fx_log_packet_t, LogsField, LogsHierarchy, LogsProperty, Message, Severity, METADATA_SIZE,
        TEST_IDENTITY,
    };
    use super::*;
    use diagnostics_stream::{
        encode::Encoder, Argument, Record, Severity as StreamSeverity, Value,
    };
    use std::io::Cursor;

    /// Writes the same legacy packet to a datagram socket twice and checks that each call to
    /// `next` yields the expected `Message`.
    #[fasync::run_until_stalled(test)]
    async fn logger_stream_test() {
        let (sin, sout) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
        let mut packet: fx_log_packet_t = Default::default();
        packet.metadata.pid = 1;
        packet.metadata.severity = 0x30; // INFO
        // Payload layout: byte 0 is 5 (presumably the tag's length prefix -- see the wire
        // format), bytes 1..6 are 'A' and bytes 7..12 are 'B'. The expectation below decodes
        // these as the tag "AAAAA" and the message "BBBBB".
        packet.data[0] = 5;
        packet.fill_data(1..6, 'A' as _);
        packet.fill_data(7..12, 'B' as _);

        let mut ls =
            LogMessageSocket::new(sout, TEST_IDENTITY.clone(), Default::default()).unwrap();
        sin.write(packet.as_bytes()).unwrap();
        // The message the reader is expected to produce for `packet`.
        let expected_p = Message::new(
            zx::Time::from_nanos(packet.metadata.time),
            Severity::Info,
            METADATA_SIZE + 6 /* tag */+ 6, /* msg */
            packet.metadata.dropped_logs as u64,
            &*TEST_IDENTITY,
            LogsHierarchy::new(
                "root",
                vec![
                    LogsProperty::Uint(LogsField::ProcessId, packet.metadata.pid),
                    LogsProperty::Uint(LogsField::ThreadId, packet.metadata.tid),
                    LogsProperty::String(LogsField::Tag, "AAAAA".to_string()),
                    LogsProperty::String(LogsField::Msg, "BBBBB".to_string()),
                ],
                vec![],
            ),
        );

        let result_message = ls.next().await.unwrap();
        assert_eq!(result_message, expected_p);

        // write one more time
        // A second identical packet must decode to the same message, i.e. the reader works
        // across consecutive reads that reuse its internal buffer.
        sin.write(packet.as_bytes()).unwrap();

        let result_message = ls.next().await.unwrap();
        assert_eq!(result_message, expected_p);
    }

    /// Encodes a structured record, writes it to a datagram socket twice, and checks that each
    /// call to `next` yields the expected `Message`.
    #[fasync::run_until_stalled(test)]
    async fn structured_logger_stream_test() {
        let (sin, sout) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
        let timestamp = 107;
        let record = Record {
            timestamp,
            severity: StreamSeverity::Fatal,
            arguments: vec![
                Argument { name: "key".to_string(), value: Value::Text("value".to_string()) },
                Argument { name: "tag".to_string(), value: Value::Text("tag-a".to_string()) },
            ],
        };
        // Encode into a fixed-size scratch buffer; only the bytes up to the cursor position
        // were written by the encoder, so only that prefix is sent over the socket.
        let mut buffer = Cursor::new(vec![0u8; 1024]);
        let mut encoder = Encoder::new(&mut buffer);
        encoder.write_record(&record).unwrap();
        let encoded = &buffer.get_ref()[..buffer.position() as usize];

        // The message the reader is expected to produce for `encoded`; the "tag" argument is
        // expected to surface as `LogsField::Tag`, the other one as `LogsField::Other`.
        let expected_p = Message::new(
            timestamp,
            Severity::Fatal,
            encoded.len(),
            0, // dropped logs
            &*TEST_IDENTITY,
            LogsHierarchy::new(
                "root",
                vec![
                    LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()),
                    LogsProperty::String(LogsField::Tag, "tag-a".to_string()),
                ],
                vec![],
            ),
        );

        let mut stream =
            LogMessageSocket::new_structured(sout, TEST_IDENTITY.clone(), Default::default())
                .unwrap();

        sin.write(encoded).unwrap();
        let result_message = stream.next().await.unwrap();
        assert_eq!(result_message, expected_p);

        // write again
        // A second identical write must decode to the same message.
        sin.write(encoded).unwrap();
        let result_message = stream.next().await.unwrap();
        assert_eq!(result_message, expected_p);
    }
}
