#!/usr/bin/env fuchsia-vendored-python
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions for working with remotetool.

This is a library and standalone executable script.
"""

import dataclasses
import difflib
import os
import shlex
import subprocess
import sys

from pathlib import Path
from typing import Any, Dict, Iterable, Sequence, Tuple

import fuchsia
import cl_utils

_SCRIPT_BASENAME = Path(__file__).name
_SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = fuchsia.project_root_dir()
PROJECT_ROOT_REL = cl_utils.relpath(PROJECT_ROOT, start=os.curdir)

_REPROXY_CFG = _SCRIPT_DIR / "fuchsia-reproxy.cfg"

_HOST_REMOTETOOL = fuchsia.RECLIENT_BINDIR / 'remotetool'

class ParseError(ValueError):

    def __init__(self, msg: str):
        super().__init__(msg)


def _must_partition_string(text: str, sep: str) -> Tuple[str, str]:
    before, found_sep, after = text.partition(sep)
    if found_sep != sep:
        raise ParseError(
            f"Expected but failed to find \"{sep}\" in text \"{text}\".")
    return before, after


def _must_partition_sequence(seq: Sequence[str],
                             sep: str) -> Tuple[Sequence[str], Sequence[str]]:
    before, found_sep, after = cl_utils.partition_sequence(seq, sep)
    if found_sep != sep:
        raise ParseError(f"Expected but failed to find line == \"{sep}\".")
    return before, after


def _parse_input_digest(line: str) -> Tuple[Path, str]:
    path, right = _must_partition_string(line, ':')
    ignored, right = _must_partition_string(right, ':')
    digest = right.lstrip().rstrip(']')
    return Path(path), digest


def _parse_output_digest(line: str) -> Tuple[Path, str]:
    path, right = _must_partition_string(line, ',')
    ignored, right = _must_partition_string(right, ':')
    digest = right.lstrip()
    return Path(path), digest


# TODO: move this to a library for abstract data operations
class DictionaryDiff(object):
    """Representation of a difference between dictionaries."""

    def __init__(self, left: Dict[Any, Any], right: Dict[Any, Any]):
        left_keys = set(left.keys())
        right_keys = set(right.keys())
        self._left_only = {k: left[k] for k in left_keys - right_keys}
        self._right_only = {k: right[k] for k in right_keys - left_keys}
        common_keys = left_keys.intersection(right_keys)
        paired = [(k, (left[k], right[k])) for k in common_keys]
        self._value_diffs = {k: (v[0], v[1]) for k, v in paired if v[0] != v[1]}
        self._matches = {k: v[0] for k, v in paired if v[0] == v[1]}

    @property
    def left_only(self) -> Dict[Any, Any]:
        return self._left_only

    @property
    def right_only(self) -> Dict[Any, Any]:
        return self._right_only

    @property
    def value_diffs(self) -> Dict[Any, Tuple[Any, Any]]:
        return self._value_diffs

    @property
    def matches(self) -> Dict[Any, Any]:
        return self._matches

    def report(self) -> Iterable[str]:
        yield from (f"left only: {k}: {v}" for k, v in self.left_only.items())
        yield from (f"right only: {k}: {v}" for k, v in self.right_only.items())
        yield from (
            f"different values: {k}: {v1} vs. {v2}"
            for k, (v1, v2) in self.value_diffs.items())


@dataclasses.dataclass
class ActionDifferences(object):
    command_unified_diffs: Sequence[str]
    input_diffs: DictionaryDiff
    platform_diffs: DictionaryDiff


@dataclasses.dataclass
class ShowActionResult(object):
    """Structured representation of `remotetool --operation show_action`."""

    command: Sequence[str]
    platform: Dict[str, str]
    inputs: Dict[Path, str]
    output_files: Dict[Path, str]

    def __eq__(self, other) -> bool:
        return self.command == other.command and self.platform == other.platform and self.inputs == other.inputs and self.output_files == other.output_files

    def diff(self, other: 'ShowActionResult') -> ActionDifferences:
        return ActionDifferences(
            command_unified_diffs=list(
                difflib.unified_diff(
                    self.command,
                    other.command,
                    fromfile="left action",
                    tofile="right action",
                )),
            input_diffs=DictionaryDiff(self.inputs, other.inputs),
            platform_diffs=DictionaryDiff(self.platform, other.platform),
        )


def parse_show_action_output(lines: Iterable[str]) -> ShowActionResult:
    """Parse the results of `remotetool --operation show_action`.

    Returns:
      ShowActionResult, containing command, inputs, outputs.

    Raises:
      ParseError if there are any parsing errors.
    """
    stripped_lines = [line.rstrip() for line in lines]
    preamble, remainder = _must_partition_sequence(stripped_lines, 'Command')
    command_section, remainder = _must_partition_sequence(remainder, 'Platform')
    platform_section, remainder = _must_partition_sequence(remainder, 'Inputs')

    # Could see 'Action Result' or 'No action result in cache.'.
    inputs_section, result_sep, remainder = cl_utils.partition_sequence(remainder, 'Action Result')

    # command_section has:
    # [0] '======='
    # [1] 'Command Digest: ...'
    # [2] <the command in one long line>
    # [3] <blank line>
    command = shlex.split(command_section[2].lstrip())

    # platform_section has:
    # [0] '======='
    # [1:] key=value (repeated)
    platform_parameters = {
        k: v for k, _, v in (
            line.lstrip().partition('=') for line in platform_section[1:-1])
    }

    # inputs_section has:
    # [0] '======='
    # [1] '[Root directory digest: ...]
    # [2:] <path>: [File digest: <digest>]
    # [-2] <blank line>
    # [-1] '------------------------------------------------------------------------'
    # Input paths are relative to the exec_root.
    # Inputs that come from other remote actions, might start with
    # 'set_by_reclient/a' (relative to exec_root, canonicalized).
    inputs = {
        k: v for k, v in (
            _parse_input_digest(line) for line in inputs_section[2:-2])
    }

    # result_section:
    # (not used)
    # This could also be absent with: 'No action result in cache.'

    output_files = dict()
    if result_sep:
        result_section, remainder = _must_partition_sequence(
            remainder, 'Output Files')
        output_files_section, remainder = _must_partition_sequence(
            remainder, 'Output Files From Directories')

        # output_files_section has:
        # [0] '======='
        # [1:] <path>: [File digest: <digest>]
        # [-1] <blank line>
        # Output file paths are relative to the working_dir (not exec_root).
        output_files = {
            k: v for k, v in (
                _parse_output_digest(line) for line in output_files_section[1:-1])
        }

    return ShowActionResult(
        command=command,
        inputs=inputs,
        output_files=output_files,
        platform=platform_parameters,
    )


def read_config_file_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Generic parser for reading flag files.

    Args:
      lines: lines of config from a file

    Returns:
      dictionary of key-value pairs read from the config file.
    """
    result = {}
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith('#'):
            key, sep, value = stripped.partition('=')
            if sep == '=':
                result[key] = value
    return result


class RemoteTool(object):

    def __init__(self, reproxy_cfg: Path):
        self._reproxy_cfg = reproxy_cfg

    def __eq__(self, other) -> bool:
        return self.config == other.config

    @property
    def config(self) -> Dict[str, str]:
        return self._reproxy_cfg

    def run(
        self,
        args: Sequence[str],
        show_command: bool = False,
        **kwargs,
    ) -> cl_utils.SubprocessResult:
        """Runs `remotetool` using the given reproxy configuration.

        Args:
          args: remotetool command-line arguments
          show_command: if True, print the full remotetool command before executing.
          **kwargs: additional arguments forwarded to cl_utils.subprocess_call.

        Returns:
          SubprocessResult with exit code and captured stdout.
        """
        service = self.config["service"]
        instance = self.config["instance"]
        auto_args = [
            f'--service={service}',
            f'--instance={instance}',
        ]

        # Infra builds use GCE credentials instead of ADC.
        gce_creds = os.environ.get("RBE_use_gce_credentials", None)
        if gce_creds:
            auto_args.append(f'--use_gce_credentials={gce_creds}')
        else:
            use_adc = self.config.get("use_application_default_credentials", None)
            if use_adc:
                auto_args.append(f"--use_application_default_credentials={use_adc}")

        command = [
            str(PROJECT_ROOT_REL / _HOST_REMOTETOOL),
        ] + auto_args + args
        command_str = cl_utils.command_quoted_str(command)
        if show_command:
            print(command_str)

        result = cl_utils.subprocess_call(command, **kwargs)
        if result.returncode != 0:
            for line in result.stderr:
                print(line)
            raise subprocess.CalledProcessError(result.returncode, command)
        return result

    def show_action(self, digest: str, **kwargs) -> ShowActionResult:
        """Reads parameters of a remote action using `remotetool`.

        Args:
          digest: the hash/size of the action to lookup.
          **kwargs: arguments forwarded to cl_utils.subprocess_call.

        Returns:
          ShowActionResult describing command, inputs, outputs.
        """
        args = ['--operation', 'show_action', '--digest', digest]
        final_kwargs = kwargs
        final_kwargs["quiet"] = True
        result = self.run(args, **final_kwargs)
        return parse_show_action_output(result.stdout)

    def download_blob(
            self, path: Path, digest: str,
            **kwargs) -> cl_utils.SubprocessResult:
        """Downloads a remote artifact.

        Args:
          path: location of output to download.
          digest: the hash/size of the blob to retrieve.
          **kwargs: arguments forwarded to cl_utils.subprocess_call.

        Returns:
          return code of remotetool
        """
        # TODO: use filelock.FileLock to guard against concurrent conflicts
        args = [
            '--operation', 'download_blob', '--digest', digest, '--path',
            str(path)
        ]
        final_kwargs = kwargs
        final_kwargs["quiet"] = True
        return self.run(args, **final_kwargs)

    def download_dir(
            self, path: Path, digest: str,
            **kwargs) -> cl_utils.SubprocessResult:
        """Downloads a remote directory of artifacts.

        Args:
          path: location of output to download.
          digest: the hash/size of the dir to retrieve.
          **kwargs: arguments forwarded to cl_utils.subprocess_call.

        Returns:
          return code of remotetool
        """
        # TODO: use filelock.FileLock to guard against concurrent conflicts
        args = [
            '--operation', 'download_dir', '--digest', digest, '--path',
            str(path)
        ]
        final_kwargs = kwargs
        final_kwargs["quiet"] = True
        return self.run(args, **final_kwargs)


def main(argv: Sequence[str]) -> int:
    with open(_REPROXY_CFG) as cfg:
        tool = RemoteTool(read_config_file_lines(cfg))
    result = tool.run(argv, show_command=True, quiet=False)
    return result.returncode


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
