#!/usr/bin/env python3.8
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import sys
import unittest
from io import StringIO

import test_env
from lib import args
from factory_fake import FakeFactory
from process_fake import FakeProcess


class TestCaseWithIO(unittest.TestCase):

    # Unit test "constructor" and "destructor"

    def setUp(self):
        sys.stdin = StringIO()
        self._stdout = StringIO()
        self._stderr = StringIO()

    def tearDown(self):
        sys.stdin = sys.__stdin__

    # Unit test utilities

    def set_input(self, *lines):
        sys.stdin.truncate(0)
        sys.stdin.seek(0)
        for line in lines:
            sys.stdin.write(line)
            sys.stdin.write('\n')
        sys.stdin.flush()
        sys.stdin.seek(0)

    # Unit test assertions

    def _dump_io(self, io):
        data = io.getvalue()
        io.truncate(0)
        io.seek(0)
        return data

    def _assert_io_equals(self, io, lines, n=-1):
        self.assertEqual(self._dump_io(io).split('\n')[:n], list(lines))

    def assertOut(self, lines, n=-1):
        """Checks that 'n' lines of stdout match 'lines'.

        Calling this method resets stdout. If 'n' is omitted all lines are
        checked. If 'n' is 0 and 'lines' is [], it passes trivially, but still
        resets stdout.
        """
        self._assert_io_equals(self._stdout, lines, n)

    def assertErr(self, lines, n=-1):
        """Checks that 'n' lines of stderr match 'lines'.

        Calling this method resets stdout. If 'n' is omitted all lines are
        checked. If 'n' is 0 and 'lines' is [], it passes trivially, but still
        resets stderr.
        """
        self._assert_io_equals(self._stderr, lines, n)

    def assertOutContains(self, *strings):
        """Check that all 'strings' are contained in the stdout. Resets stdout."""
        output = self._dump_io(self._stdout)
        for s in strings:
            self.assertIn(s, output)


class TestCaseWithFactory(TestCaseWithIO):
    """TestCase that provides common test context, utilities, and assertions."""

    # Unit test "constructor"

    def setUp(self):
        super(TestCaseWithFactory, self).setUp()
        self._factory = None

    # Unit test context, as aliases to the Factory.

    @property
    def factory(self):
        """The associated FakeFactory object."""
        if not self._factory:
            self._factory = FakeFactory()
            self.host.fd_out = self._stdout
            self.host.fd_err = self._stderr
        return self._factory

    @property
    def host(self):
        """The associated Host object."""
        return self.factory.host

    @property
    def parser(self):
        """The associated ArgParser object."""
        return self.factory.parser

    @property
    def buildenv(self):
        """The associated BuildEnv object."""
        return self.factory.buildenv

    @property
    def device(self):
        """The associated Device object."""
        return self.factory.device

    # Unit test utilities

    def _ssh_cmd(self, args):
        """Returns the command line arguments for an SSH commaned."""
        return ['ssh'] + self.device.ssh_opts() + [self.device.addr] + args

    def _scp_cmd(self, args):
        return ['scp'] + self.device.ssh_opts() + args

    def get_process(self, args, ssh=False):
        cmd = self._ssh_cmd(args) if ssh else args
        return self.host.create_process(cmd)

    def parse_args(self, *args):
        return self.parser.parse_args(args)

    def set_outputs(
            self,
            args,
            outputs,
            start=None,
            end=None,
            returncode=None,
            reset=True,
            ssh=False):
        """Sets what will be returned from the stdout and return code of a fake
        process.

        Providing a start and/or end will schedule the output to be added and/or
        removed, respectively, at a later time; see FakeProcess.schedule.
        Setting reset to True will replace any existing output for the command.
        Setting ssh to true will automatically add the necessary SSH arguments.
        """
        process = self.get_process(args, ssh=ssh)
        if reset:
            process.clear()
        process.schedule(
            '\n'.join(outputs), start=start, end=end, returncode=returncode)

    def set_running(self, url, refresh=True, duration=None):
        """Marks a packaged executable as running on device.

        If refresh is True, this will cause the device to refresh its URLs.
        If a duration is provided, the component will stop running
        after the given duration.
        """
        cmd = ['cs info']
        output = 'URL: {}'.format(url)
        end = None if not duration else self.host.elapsed + duration
        self.set_outputs(cmd, [output], end=end, reset=False, ssh=True)
        if refresh:
            self.device.has_cs_info(url, refresh)

    def touch_on_device(
            self, pathname, start=None, end=None, reset=False, size=1000):
        """Prepares the 'ls' response for a file and its parent directory."""
        parts = pathname.split('/')
        dirname = '/'.join(parts[:-1])
        output = '-rw-r--r-- 1 0 0 {} Dec 25 12:34 {}'.format(size, parts[-1])
        self.set_outputs(
            ['ls', '-l', dirname], [output],
            start=start,
            end=end,
            reset=reset,
            ssh=True)
        self.set_outputs(
            ['ls', '-l', pathname], [output],
            start=start,
            end=end,
            reset=reset,
            ssh=True)

    def symbolize_cmd(self):
        cmd = [
            self.buildenv.symbolizer_exec, '-llvm-symbolizer',
            self.buildenv.llvm_symbolizer
        ]
        for build_id_dir in self.buildenv.build_id_dirs:
            cmd += ['-build-id-dir', build_id_dir]
        return cmd

    def infra_testrunner_cmd(self, out_dir, test_file):
        cmd = [os.path.join(self.buildenv.build_dir, 'host_x64', 'testrunner')] \
            + ['-out-dir', out_dir] \
            + ['-use-runtests', '-per-test-timeout', '600s'] \
            + [test_file]
        return cmd

    # Unit test assertions

    def assertLogged(self, *logs):
        """Asserts logs were generated by calls to host.echo or host.error."""
        self.assertOut(logs)

    def assertError(self, expr, *logs):
        assert logs, 'Missing error message.'
        logs = ['ERROR: {}'.format(logs[0])
               ] + ['       {}'.format(log) for log in logs[1:]]
        with self.assertRaises(SystemExit):
            expr()
        self.assertErr(logs)

    def assertRan(self, *args):
        """Asserts a previous call was made to host.create_process."""
        self.assertIn(' '.join(args), self.host.processes.keys())

    def assertScpTo(self, *args):
        """Asserts a previous call was made to device.scp with args."""
        args = list(args)[:-1] + [self.device.scp_rpath(args[-1])]
        cmd = self._scp_cmd(args)
        self.assertRan(*cmd)

    def assertScpFrom(self, *args):
        """Asserts a previous call was made to device.scp with args."""
        args = [self.device.scp_rpath(arg) for arg in args[:-1]] + [args[-1]]
        cmd = self._scp_cmd(args)
        self.assertRan(*cmd)

    def assertSsh(self, *args):
        """Asserts a previous call was made to device.ssh with cmd."""
        cmd = self._ssh_cmd(list(args))
        self.assertRan(*cmd)


class TestCaseWithFuzzer(TestCaseWithFactory):

    # Unit test "constructor"

    def setUp(self):
        super(TestCaseWithFuzzer, self).setUp()
        self.create_fuzzer('check', 'fake-package1/fake-target1')

    def tearDown(self):
        super(TestCaseWithFuzzer, self).tearDown()
        self._fuzzer = None

    # Unit test context.

    @property
    def fuzzer(self):
        """The most recently created Fuzzer object."""
        assert self._fuzzer, 'No fuzzer created.'
        return self._fuzzer

    @property
    def corpus(self):
        return self.fuzzer.corpus

    @property
    def dictionary(self):
        return self.fuzzer.dictionary

    @property
    def ns(self):
        return self.fuzzer.ns

    # Unit test utilities

    def create_fuzzer(self, *args, **kwargs):
        resolve = kwargs.pop('resolve', True)
        include_tests = kwargs.pop('include_tests', False)
        assert not kwargs, 'Unexpected keyword argument(s): {}'.format(kwargs)
        args = self.parse_args(*args)
        self._fuzzer = self.factory.create_fuzzer(
            args, include_tests=include_tests)
        if resolve:
            self.resolve_fuzzer()
        else:
            base_cmx = self.ns.base_abspath(
                'meta/{}.cmx'.format(self.fuzzer.executable))
            cmd = ['test', '-f', base_cmx]
            process = self.get_process(cmd, ssh=True)
            process.succeeds = False
        self.create_log()
        return self.fuzzer

    def resolve_fuzzer(self):
        cmd = ['pkgctl', 'pkg-status', self.fuzzer.package_url]
        merkle = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
        package_path = '/pkgfs/versions/{}'.format(merkle)
        self.set_outputs(
            cmd, [
                'Package in registered TUF repo: yes (merkle={})'.format(
                    merkle),
                'Package on disk: yes (path={})'.format(package_path)
            ],
            ssh=True)

    def create_log(self, start=None, end=None):
        self.touch_on_device(
            self.ns.data_abspath('fuzz-[0-9].log'),
            start=start,
            end=end,
            reset=True)
