# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from builtins import str
from past.builtins import basestring

import logging
import psutil
import re
import subprocess
import time
import threading

from mobly import utils

# Command to use for running ADB commands.
ADB = 'adb'

# adb gets confused if we try to manage bound ports in parallel, so anything to
# do with port forwarding must happen under this lock.
ADB_PORT_LOCK = threading.Lock()

# Qualified class name of the default instrumentation test runner.
DEFAULT_INSTRUMENTATION_RUNNER = 'com.android.common.support.test.runner.AndroidJUnitRunner'

# Adb getprop call should never take too long.
DEFAULT_GETPROP_TIMEOUT_SEC = 5
DEFAULT_GETPROPS_ATTEMPTS = 3
DEFAULT_GETPROPS_RETRY_SLEEP_SEC = 1


class Error(Exception):
    """Base error type for adb proxy module."""


class AdbError(Error):
    """Raised when an adb command encounters an error.

    Args:
        cmd: list of strings, the adb command executed.
        stdout: byte string, the raw stdout of the command.
        stderr: byte string, the raw stderr of the command.
        ret_code: int, the return code of the command.
        serial: string, the serial of the device the command is executed on.
            This is an empty string if the adb command is not specific to a
            device.
    """

    def __init__(self, cmd, stdout, stderr, ret_code, serial=''):
        self.cmd = cmd
        self.stdout = stdout
        self.stderr = stderr
        self.ret_code = ret_code
        self.serial = serial

    def __str__(self):
        return ('Error executing adb cmd "%s". ret: %d, stdout: %s, stderr: %s'
                ) % (utils.cli_cmd_to_string(
                    self.cmd), self.ret_code, self.stdout, self.stderr)


class AdbTimeoutError(Error):
    """Raised when an command did not complete within expected time.

    Args:
        cmd: list of strings, the adb command that timed out
        timeout: float, the number of seconds passed before timing out.
        serial: string, the serial of the device the command is executed on.
            This is an empty string if the adb command is not specific to a
            device.
    """

    def __init__(self, cmd, timeout, serial=''):
        self.cmd = cmd
        self.timeout = timeout
        self.serial = serial

    def __str__(self):
        return 'Timed out executing command "%s" after %ss.' % (
            utils.cli_cmd_to_string(self.cmd), self.timeout)


def list_occupied_adb_ports():
    """Lists all the host ports occupied by adb forward.

    This is useful because adb will silently override the binding if an attempt
    to bind to a port already used by adb was made, instead of throwing binding
    error. So one should always check what ports adb is using before trying to
    bind to a port with adb.

    Returns:
        A list of integers representing occupied host ports.
    """
    out = AdbProxy().forward('--list')
    clean_lines = str(out, 'utf-8').strip().split('\n')
    used_ports = []
    for line in clean_lines:
        tokens = line.split(' tcp:')
        if len(tokens) != 3:
            continue
        used_ports.append(int(tokens[1]))
    return used_ports


class AdbProxy(object):
    """Proxy class for ADB.

    For syntactic reasons, the '-' in adb commands need to be replaced with
    '_'. Can directly execute adb commands on an object:
    >> adb = AdbProxy(<serial>)
    >> adb.start_server()
    >> adb.devices() # will return the console output of "adb devices".

    By default, command args are expected to be an iterable which is passed
    directly to subprocess.Popen():
    >> adb.shell(['echo', 'a', 'b'])

    This way of launching commands is recommended by the subprocess
    documentation to avoid shell injection vulnerabilities and avoid having to
    deal with multiple layers of shell quoting and different shell environments
    between different OSes.

    If you really want to run the command through the system shell, this is
    possible by supplying shell=True, but try to avoid this if possible:
    >> adb.shell('cat /foo > /tmp/file', shell=True)
    """

    def __init__(self, serial=''):
        self.serial = serial

    def _exec_cmd(self, args, shell, timeout, stderr):
        """Executes adb commands.

        Args:
            args: string or list of strings, program arguments.
                See subprocess.Popen() documentation.
            shell: bool, True to run this command through the system shell,
                False to invoke it directly. See subprocess.Popen() docs.
            timeout: float, the number of seconds to wait before timing out.
                If not specified, no timeout takes effect.
            stderr: a Byte stream, like io.BytesIO, stderr of the command will
                be written to this object if provided.

        Returns:
            The output of the adb command run if exit code is 0.

        Raises:
            ValueError: timeout value is invalid.
            AdbError: The adb command exit code is not 0.
            AdbTimeoutError: The adb command timed out.
        """
        if timeout and timeout <= 0:
            raise ValueError('Timeout is not a positive value: %s' % timeout)
        try:
            (ret, out, err) = utils.run_command(args,
                                                shell=shell,
                                                timeout=timeout)
        except psutil.TimeoutExpired:
            raise AdbTimeoutError(cmd=args,
                                  timeout=timeout,
                                  serial=self.serial)

        if stderr:
            stderr.write(err)
        logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
                      utils.cli_cmd_to_string(args), out, err, ret)
        if ret == 0:
            return out
        else:
            raise AdbError(cmd=args,
                           stdout=out,
                           stderr=err,
                           ret_code=ret,
                           serial=self.serial)

    def _execute_and_process_stdout(self, args, shell, handler):
        """Executes adb commands and processes the stdout with a handler.

        Args:
            args: string or list of strings, program arguments.
                See subprocess.Popen() documentation.
            shell: bool, True to run this command through the system shell,
                False to invoke it directly. See subprocess.Popen() docs.
            handler: func, a function to handle adb stdout line by line.

        Returns:
            The stderr of the adb command run if exit code is 0.

        Raises:
            AdbError: The adb command exit code is not 0.
        """
        proc = subprocess.Popen(args,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                shell=shell,
                                bufsize=1)
        out = '[elided, processed via handler]'
        try:
            # Even if the process dies, stdout.readline still works
            # and will continue until it runs out of stdout to process.
            while True:
                line = proc.stdout.readline()
                if line:
                    handler(line)
                else:
                    break
        finally:
            # Note, communicate will not contain any buffered output.
            (unexpected_out, err) = proc.communicate()
            if unexpected_out:
                out = '[unexpected stdout] %s' % unexpected_out
                for line in unexpected_out.splitlines():
                    handler(line)

        ret = proc.returncode
        logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
                      utils.cli_cmd_to_string(args), out, err, ret)
        if ret == 0:
            return err
        else:
            raise AdbError(cmd=args, stdout=out, stderr=err, ret_code=ret)

    def _construct_adb_cmd(self, raw_name, args, shell):
        """Constructs an adb command with arguments for a subprocess call.

        Args:
            raw_name: string, the raw unsanitized name of the adb command to
                format.
            args: string or list of strings, arguments to the adb command.
                See subprocess.Proc() documentation.
            shell: bool, True to run this command through the system shell,
                False to invoke it directly. See subprocess.Proc() docs.

        Returns:
            The adb command in a format appropriate for subprocess. If shell is
                True, then this is a string; otherwise, this is a list of
                strings.
        """
        args = args or ''
        name = raw_name.replace('_', '-')
        if shell:
            args = utils.cli_cmd_to_string(args)
            # Add quotes around "adb" in case the ADB path contains spaces. This
            # is pretty common on Windows (e.g. Program Files).
            if self.serial:
                adb_cmd = '"%s" -s "%s" %s %s' % (ADB, self.serial, name, args)
            else:
                adb_cmd = '"%s" %s %s' % (ADB, name, args)
        else:
            adb_cmd = [ADB]
            if self.serial:
                adb_cmd.extend(['-s', self.serial])
            adb_cmd.append(name)
            if args:
                if isinstance(args, basestring):
                    adb_cmd.append(args)
                else:
                    adb_cmd.extend(args)
        return adb_cmd

    def _exec_adb_cmd(self, name, args, shell, timeout, stderr):
        adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
        out = self._exec_cmd(adb_cmd,
                             shell=shell,
                             timeout=timeout,
                             stderr=stderr)
        return out

    def _execute_adb_and_process_stdout(self, name, args, shell, handler):
        adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
        err = self._execute_and_process_stdout(adb_cmd,
                                               shell=shell,
                                               handler=handler)
        return err

    def _parse_getprop_output(self, output):
        """Parses the raw output of `adb shell getprop` into a dictionary.

        Args:
            output: byte str, the raw output of the `adb shell getprop` call.

        Returns:
            dict, name-value pairs of the properties.
        """
        output = output.decode('utf-8', errors='ignore').replace('\r\n', '\n')
        results = {}
        for line in output.split(']\n'):
            if not line:
                continue
            try:
                name, value = line.split(': ', 1)
            except ValueError:
                logging.debug('Failed to parse adb getprop line %s', line)
                continue
            name = name.strip()[1:-1]
            # Remove any square bracket from either end of the value string.
            if value and value[0] == '[':
                value = value[1:]
            results[name] = value
        return results

    @property
    def current_user_id(self):
        """Gets the ID of the current Android user.

        Some adb commands require the current user ID to work properly.
        This convenient method is meant for 

        Returns:
            int, the ID of the current user.
        """
        sdk_int = int(self.getprop('ro.build.version.sdk'))
        if sdk_int >= 24:
            return int(self.shell(['am', 'get-current-user']))
        if sdk_int >= 21:
            user_info_str = self.shell(['dumpsys', 'user']).decode('utf-8')
            return int(re.findall(r'\{(\d+):', user_info_str)[0])
        # Multi-user is not supported in SDK < 21, only user 0 exists.
        return 0

    def getprop(self, prop_name):
        """Get a property of the device.

        This is a convenience wrapper for `adb shell getprop xxx`.

        Args:
            prop_name: A string that is the name of the property to get.

        Returns:
            A string that is the value of the property, or None if the property
            doesn't exist.
        """
        return self.shell(
            ['getprop', prop_name],
            timeout=DEFAULT_GETPROP_TIMEOUT_SEC).decode('utf-8').strip()

    def getprops(self, prop_names):
        """Get multiple properties of the device.

        This is a convenience wrapper for `adb shell getprop`. Use this to
        reduce the number of adb calls when getting multiple properties.

        Args:
            prop_names: list of strings, the names of the properties to get.

        Returns:
            A dict containing name-value pairs of the properties requested, if
            they exist.
        """
        attempts = DEFAULT_GETPROPS_ATTEMPTS
        results = {}
        for attempt in range(attempts):
            # The ADB getprop command can randomly return empty string, so try
            # multiple times. This value should always be non-empty if the device
            # in a working state.
            raw_output = self.shell(['getprop'],
                                    timeout=DEFAULT_GETPROP_TIMEOUT_SEC)
            properties = self._parse_getprop_output(raw_output)
            if properties:
                for name in prop_names:
                    if name in properties:
                        results[name] = properties[name]
                break
            # Don't call sleep on the last attempt.
            if attempt < attempts - 1:
                time.sleep(DEFAULT_GETPROPS_RETRY_SLEEP_SEC)
        return results

    def has_shell_command(self, command):
        """Checks to see if a given check command exists on the device.

        Args:
            command: A string that is the name of the command to check.

        Returns:
            A boolean that is True if the command exists and False otherwise.
        """
        try:
            output = self.shell(['command', '-v',
                                 command]).decode('utf-8').strip()
            return command in output
        except AdbError:
            # If the command doesn't exist, then 'command -v' can return
            # an exit code > 1.
            return False

    def forward(self, args=None, shell=False):
        with ADB_PORT_LOCK:
            return self._exec_adb_cmd('forward',
                                      args,
                                      shell,
                                      timeout=None,
                                      stderr=None)

    def instrument(self, package, options=None, runner=None, handler=None):
        """Runs an instrumentation command on the device.

        This is a convenience wrapper to avoid parameter formatting.

        Example:

        .. code-block:: python

            device.instrument(
                'com.my.package.test',
                options = {
                    'class': 'com.my.package.test.TestSuite',
                },
            )

        Args:
            package: string, the package of the instrumentation tests.
            options: dict, the instrumentation options including the test
                class.
            runner: string, the test runner name, which defaults to
                DEFAULT_INSTRUMENTATION_RUNNER.
            handler: optional func, when specified the function is used to parse
                the instrumentation stdout line by line as the output is
                generated; otherwise, the stdout is simply returned once the
                instrumentation is finished.

        Returns:
            The stdout of instrumentation command or the stderr if the handler
                is set.
        """
        if runner is None:
            runner = DEFAULT_INSTRUMENTATION_RUNNER
        if options is None:
            options = {}

        options_list = []
        for option_key, option_value in options.items():
            options_list.append('-e %s %s' % (option_key, option_value))
        options_string = ' '.join(options_list)

        instrumentation_command = 'am instrument -r -w %s %s/%s' % (
            options_string, package, runner)
        logging.info('AndroidDevice|%s: Executing adb shell %s', self.serial,
                     instrumentation_command)
        if handler is None:
            return self._exec_adb_cmd('shell',
                                      instrumentation_command,
                                      shell=False,
                                      timeout=None,
                                      stderr=None)
        else:
            return self._execute_adb_and_process_stdout(
                'shell', instrumentation_command, shell=False, handler=handler)

    def __getattr__(self, name):
        def adb_call(args=None, shell=False, timeout=None, stderr=None):
            """Wrapper for an ADB command.

            Args:
                args: string or list of strings, arguments to the adb command.
                    See subprocess.Proc() documentation.
                shell: bool, True to run this command through the system shell,
                    False to invoke it directly. See subprocess.Proc() docs.
                timeout: float, the number of seconds to wait before timing out.
                    If not specified, no timeout takes effect.
                stderr: a Byte stream, like io.BytesIO, stderr of the command
                    will be written to this object if provided.

            Returns:
                The output of the adb command run if exit code is 0.
            """
            return self._exec_adb_cmd(name,
                                      args,
                                      shell=shell,
                                      timeout=timeout,
                                      stderr=stderr)

        return adb_call
