Merge "Remove use of paramiko in Fuchsia SSH module"
diff --git a/acts/framework/acts/controllers/fuchsia_device.py b/acts/framework/acts/controllers/fuchsia_device.py
index 8a36c85..44419bf 100644
--- a/acts/framework/acts/controllers/fuchsia_device.py
+++ b/acts/framework/acts/controllers/fuchsia_device.py
@@ -54,7 +54,7 @@
 from acts.controllers.fuchsia_lib.netstack.netstack_lib import FuchsiaNetstackLib
 from acts.controllers.fuchsia_lib.package_server import PackageServer
 from acts.controllers.fuchsia_lib.session_manager_lib import FuchsiaSessionManagerLib
-from acts.controllers.fuchsia_lib.ssh import DEFAULT_SSH_PORT, DEFAULT_SSH_USER, SSHProvider, FuchsiaSSHError
+from acts.controllers.fuchsia_lib.ssh import DEFAULT_SSH_PORT, DEFAULT_SSH_USER, SSHConfig, SSHProvider, FuchsiaSSHError
 from acts.controllers.fuchsia_lib.syslog_lib import FuchsiaSyslogError
 from acts.controllers.fuchsia_lib.syslog_lib import create_syslog_process
 from acts.controllers.fuchsia_lib.utils_lib import flash
@@ -204,20 +204,28 @@
         self.sl4f_port: int = fd_conf_data.get("sl4f_port", 80)
         self.ssh_port: int = fd_conf_data.get("ssh_port", DEFAULT_SSH_PORT)
         self.ssh_config: Optional[str] = fd_conf_data.get("ssh_config", None)
-        self.ssh_priv_key: Optional[str] = fd_conf_data.get("ssh_priv_key", None)
-        self.authorized_file: Optional[str] = fd_conf_data.get("authorized_file_loc", None)
-        self.serial_number: Optional[str] = fd_conf_data.get("serial_number", None)
+        self.ssh_priv_key: Optional[str] = fd_conf_data.get(
+            "ssh_priv_key", None)
+        self.authorized_file: Optional[str] = fd_conf_data.get(
+            "authorized_file_loc", None)
+        self.serial_number: Optional[str] = fd_conf_data.get(
+            "serial_number", None)
         self.device_type: Optional[str] = fd_conf_data.get("device_type", None)
-        self.product_type: Optional[str] = fd_conf_data.get("product_type", None)
+        self.product_type: Optional[str] = fd_conf_data.get(
+            "product_type", None)
         self.board_type: Optional[str] = fd_conf_data.get("board_type", None)
-        self.build_number: Optional[str] = fd_conf_data.get("build_number", None)
+        self.build_number: Optional[str] = fd_conf_data.get(
+            "build_number", None)
         self.build_type: Optional[str] = fd_conf_data.get("build_type", None)
         self.server_path: Optional[str] = fd_conf_data.get("server_path", None)
-        self.specific_image: Optional[str] = fd_conf_data.get("specific_image", None)
-        self.ffx_binary_path: Optional[str] = fd_conf_data.get("ffx_binary_path", None)
+        self.specific_image: Optional[str] = fd_conf_data.get(
+            "specific_image", None)
+        self.ffx_binary_path: Optional[str] = fd_conf_data.get(
+            "ffx_binary_path", None)
         # Path to a tar.gz archive with pm and amber-files, as necessary for
         # starting a package server.
-        self.packages_archive_path: Optional[str] = fd_conf_data.get("packages_archive_path", None)
+        self.packages_archive_path: Optional[str] = fd_conf_data.get(
+            "packages_archive_path", None)
         self.mdns_name: Optional[str] = fd_conf_data.get("mdns_name", None)
 
         # Instead of the input ssh_config, a new config is generated with proper
@@ -481,7 +489,8 @@
                 raise FuchsiaConfigError(
                     'Must provide "ssh_priv_key: <file path>" in the device config'
                 )
-            self._ssh = SSHProvider(self.ip, self.ssh_port, self.ssh_priv_key)
+            self._ssh = SSHProvider(
+                SSHConfig(self.ip, self.ssh_priv_key, port=self.ssh_port))
         return self._ssh
 
     @ssh.deleter
@@ -548,22 +557,19 @@
             skip_status_code_check = 'true' == str(
                 cmd_dict.get('skip_status_code_check', False)).lower()
 
-            try:
-                if skip_status_code_check:
-                    self.log.info(
-                        f'Running command "{cmd}" and ignoring result.')
-                else:
-                    self.log.info(f'Running command "{cmd}".')
+            if skip_status_code_check:
+                self.log.info(f'Running command "{cmd}" and ignoring result.')
+            else:
+                self.log.info(f'Running command "{cmd}".')
 
-                result = self.ssh.run(
-                    cmd,
-                    timeout_sec=timeout,
-                    skip_status_code_check=skip_status_code_check)
+            try:
+                result = self.ssh.run(cmd, timeout_sec=timeout)
                 self.log.debug(result)
             except FuchsiaSSHError as e:
-                raise FuchsiaDeviceError(
-                    'Failed device specific commands for initial configuration'
-                ) from e
+                if not skip_status_code_check:
+                    raise FuchsiaDeviceError(
+                        'Failed device specific commands for initial configuration'
+                    ) from e
 
     def build_id(self, test_id):
         """Concatenates client_id and test_id to form a command_id
@@ -689,10 +695,13 @@
                 with utils.SuppressLogOutput():
                     self.clean_up_services()
                     self.stop_sl4f_on_fuchsia_device()
-                    self.ssh.run(
-                        'dm reboot',
-                        timeout_sec=FUCHSIA_RECONNECT_AFTER_REBOOT_TIME,
-                        skip_status_code_check=True)
+                    try:
+                        self.ssh.run(
+                            'dm reboot',
+                            timeout_sec=FUCHSIA_RECONNECT_AFTER_REBOOT_TIME)
+                    except FuchsiaSSHError as e:
+                        if 'closed by remote host' not in e.result.stderr:
+                            raise e
             else:
                 self.log.info('Calling SL4F reboot command.')
                 self.clean_up_services()
@@ -1239,6 +1248,7 @@
 
 
 class FuchsiaDeviceLoggerAdapter(logging.LoggerAdapter):
+
     def process(self, msg, kwargs):
         msg = "[FuchsiaDevice|%s] %s" % (self.extra["ip"], msg)
         return msg, kwargs
diff --git a/acts/framework/acts/controllers/fuchsia_lib/package_server.py b/acts/framework/acts/controllers/fuchsia_lib/package_server.py
index 7c763e6..bcc3f35 100644
--- a/acts/framework/acts/controllers/fuchsia_lib/package_server.py
+++ b/acts/framework/acts/controllers/fuchsia_lib/package_server.py
@@ -150,7 +150,8 @@
         Raises:
             TestAbortClass: when the timestamp.json file has expired
         """
-        with open(f'{self._packages_path}/repository/timestamp.json', 'r') as f:
+        with open(f'{self._packages_path}/repository/timestamp.json',
+                  'r') as f:
             data = json.load(f)
             expiresAtRaw = data["signed"]["expires"]
             expiresAt = datetime.strptime(expiresAtRaw, '%Y-%m-%dT%H:%M:%SZ')
@@ -187,28 +188,28 @@
         self.log.info(f'Serving packages on port {self._port}')
 
     def configure_device(self,
-                         device_ssh: SSHProvider,
+                         ssh: SSHProvider,
                          repo_name=DEFAULT_FUCHSIA_REPO_NAME) -> None:
         """Configure the device to use this package server.
 
         Args:
-            device_ssh: Device SSH transport channel
+            ssh: Device SSH transport channel
             repo_name: Name of the repo to alias this package server
         """
         # Remove any existing repositories that may be stale.
         try:
-            device_ssh.run(f'pkgctl repo rm fuchsia-pkg://{repo_name}')
+            ssh.run(f'pkgctl repo rm fuchsia-pkg://{repo_name}')
         except FuchsiaSSHError as e:
-            if not 'NOT_FOUND' in e.result.stderr:
+            if 'NOT_FOUND' not in e.result.stderr:
                 raise e
 
         # Configure the device with the new repository.
-        host_ip = find_host_ip(device_ssh.ip)
+        host_ip = find_host_ip(ssh.config.host_name)
         repo_url = f"http://{host_ip}:{self._port}"
-        device_ssh.run(
+        ssh.run(
             f"pkgctl repo add url -f 2 -n {repo_name} {repo_url}/config.json")
         self.log.info(
-            f'Added repo "{repo_name}" as {repo_url} on device {device_ssh.ip}'
+            f'Added repo "{repo_name}" as {repo_url} on device {ssh.config.host_name}'
         )
 
     def _wait_for_server(self, timeout_sec: int = 5) -> None:
diff --git a/acts/framework/acts/controllers/fuchsia_lib/ssh.py b/acts/framework/acts/controllers/fuchsia_lib/ssh.py
index fdd656f..9ed61a8 100644
--- a/acts/framework/acts/controllers/fuchsia_lib/ssh.py
+++ b/acts/framework/acts/controllers/fuchsia_lib/ssh.py
@@ -14,44 +14,32 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 
-import paramiko
-import paramiko.channel
-import socket
+import subprocess
 
-from contextlib import contextmanager
-from typing import Iterator
+from dataclasses import dataclass
+from typing import List, Union
 
 from acts import logger
 from acts import signals
 
 DEFAULT_SSH_USER: str = "fuchsia"
 DEFAULT_SSH_PORT: int = 22
-DEFAULT_SSH_TIMEOUT_SEC: int = 30
+DEFAULT_SSH_TIMEOUT_SEC: int = 60
+DEFAULT_SSH_CONNECT_TIMEOUT_SEC: int = 30
+DEFAULT_SSH_SERVER_ALIVE_INTERVAL: int = 30
 
 
-class FuchsiaSSHError(signals.TestError):
-    """A SSH command returned with a non-zero status code."""
+class SSHResult:
+    """Result of an SSH command."""
 
-    def __init__(self, command, result):
-        super().__init__(
-            f'SSH command "{command}" unexpectedly returned {result}')
-        self.result = result
-
-
-class FuchsiaSSHTransportError(signals.TestError):
-    """Failure to send an SSH command."""
-
-
-class SSHResults:
-    """Results from an SSH command."""
-
-    def __init__(self, stdout: paramiko.channel.ChannelFile,
-                 stderr: paramiko.channel.ChannelStderrFile) -> None:
-        """Create SSHResults from paramiko channels."""
-        self._raw_stdout = stdout.read()
-        self._stdout = self._raw_stdout.decode('utf-8', errors='replace')
-        self._stderr = stderr.read().decode('utf-8', errors='replace')
-        self._exit_status: int = stdout.channel.recv_exit_status()
+    def __init__(
+        self, process: Union[subprocess.CompletedProcess[bytes],
+                             subprocess.CalledProcessError]
+    ) -> None:
+        self._raw_stdout = process.stdout
+        self._stdout = process.stdout.decode('utf-8', errors='replace')
+        self._stderr = process.stderr.decode('utf-8', errors='replace')
+        self._exit_status: int = process.returncode
 
     def __str__(self):
         if self.exit_status == 0:
@@ -59,7 +47,7 @@
         return f'status {self.exit_status}, stdout: "{self.stdout}", stderr: "{self.stderr}"'
 
     @property
-    def stdout(self) -> bytes:
+    def stdout(self) -> str:
         return self._stdout
 
     @property
@@ -75,110 +63,139 @@
         return self._raw_stdout
 
 
+class FuchsiaSSHError(signals.TestError):
+    """A SSH command returned with a non-zero status code."""
+
+    def __init__(self, command: str, result: SSHResult):
+        super().__init__(
+            f'SSH command "{command}" unexpectedly returned {result}')
+        self.result = result
+
+
+class SSHTimeout(signals.TestError):
+    """A SSH command timed out."""
+
+    def __init__(self, err: subprocess.TimeoutExpired):
+        super().__init__(
+            f'SSH command "{err.command}" timed out after {err.timeout}s, '
+            f'stdout="{err.stdout}", stderr="{err.stderr}"')
+
+
+class FuchsiaSSHTransportError(signals.TestError):
+    """Failure to send an SSH command."""
+
+
+@dataclass
+class SSHConfig:
+    """SSH client config."""
+
+    # SSH flags. See ssh(1) for full details.
+    host_name: str
+    identity_file: str
+
+    ssh_binary: str = 'ssh'
+    config_file: str = '/dev/null'
+    port: int = 22
+    user: str = DEFAULT_SSH_USER
+
+    # SSH options. See ssh_config(5) for full details.
+    connect_timeout: int = DEFAULT_SSH_CONNECT_TIMEOUT_SEC
+    server_alive_interval: int = DEFAULT_SSH_SERVER_ALIVE_INTERVAL
+    strict_host_key_checking: bool = False
+    user_known_hosts_file: str = "/dev/null"
+
+    def full_command(self, command: str) -> List[str]:
+        return [
+            self.ssh_binary,
+            # SSH flags
+            '-i',
+            self.identity_file,
+            '-F',
+            self.config_file,
+            '-p',
+            str(self.port),
+            # SSH configuration options
+            '-o',
+            f'ConnectTimeout={self.connect_timeout}',
+            '-o',
+            f'ServerAliveInterval={self.server_alive_interval}',
+            '-o',
+            f'StrictHostKeyChecking={self.strict_host_key_checking}',
+            '-o',
+            f'UserKnownHostsFile={self.user_known_hosts_file}',
+            f'{self.user}@{self.host_name}'
+        ] + command.split()
+
+
 class SSHProvider:
     """Device-specific provider for SSH clients."""
 
-    def __init__(self,
-                 ip: str,
-                 port: int,
-                 private_key_file_name: str,
-                 timeout_sec: int = DEFAULT_SSH_TIMEOUT_SEC) -> None:
+    def __init__(self, config: SSHConfig) -> None:
         """
         Args:
-            ip: IP used by the SSH server on the device.
-            port: Port running the SSH server on the device.
-            private_key_file_name: File name of the SSH private key to use.
-            timeout_sec: Timeout to connect to the SSH server.
+            config: SSH client config
         """
-        logger_tag = f"ssh | {ip}"
-        if port != DEFAULT_SSH_PORT:
-            logger_tag += f':{port}'
+        logger_tag = f"ssh | {config.host_name}"
+        if config.port != DEFAULT_SSH_PORT:
+            logger_tag += f':{config.port}'
+
+        # Check if the private key exists
 
         self.log = logger.create_tagged_trace_logger(logger_tag)
-        self.ip = ip
-        self.port = port
-        self.private_key_file_name = private_key_file_name
-        self.timeout_sec = timeout_sec
+        self.config = config
 
     def run(self,
             command: str,
             timeout_sec: int = DEFAULT_SSH_TIMEOUT_SEC,
-            connect_retries: int = 3,
-            skip_status_code_check: bool = False) -> SSHResults:
+            connect_retries: int = 3) -> SSHResult:
         """Run a command on the device then exit.
 
         Args:
             command: String to send to the device.
             timeout_sec: Seconds to wait for the command to complete.
             connect_retries: Amount of times to retry connect on fail.
-            skip_status_code_check: Whether to check for an error status code.
 
         Raises:
             FuchsiaSSHError: if the SSH command returns a non-zero status code
-                and skip_status_code_check is False
+            FuchsiaSSHTimeout: if there is no response within timeout_sec
             FuchsiaSSHTransportError: if SSH fails to run the command
 
         Returns:
             SSHResults from the executed command.
         """
-        with self._client(connect_retries) as c:
+        err: Exception
+        for i in range(0, connect_retries):
             try:
-                _, stdout, stderr = c.exec_command(command,
-                                                   timeout=timeout_sec)
-                result = SSHResults(stdout, stderr)
-            except Exception as e:
-                raise FuchsiaSSHTransportError(
-                    f'Failed sending SSH command "{command}"') from e
-
-        if result.exit_status != 0 and not skip_status_code_check:
-            raise FuchsiaSSHError(command, result)
-
-        return result
-
-    @contextmanager
-    def _client(self, connect_retries: int) -> Iterator[paramiko.SSHClient]:
-        """Create a SSH client to the device.
-
-        Args:
-            connect_retries: Amount of times to retry connect on fail.
-
-        Returns:
-            A context manager around a paramiko.SSHClient that has already been
-            connected to the device.
-        """
-        client = paramiko.SSHClient()
-        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        try:
-            self._connect(client, connect_retries)
-            yield client
-        finally:
-            client.close()
-
-    def _connect(self, client: paramiko.SSHClient, retries: int) -> None:
-        """Connect the client to the device.
-
-        Args:
-            client: Paramiko SSH client
-            retries: Amount of times to retry on fail.
-        """
-        err: Exception = None
-        for i in range(0, retries):
-            try:
-                client.connect(hostname=self.ip,
-                               username=DEFAULT_SSH_USER,
-                               allow_agent=False,
-                               key_filename=self.private_key_file_name,
-                               port=self.port,
-                               timeout=self.timeout_sec,
-                               auth_timeout=self.timeout_sec,
-                               banner_timeout=self.timeout_sec)
-                client.get_transport().set_keepalive(1)
-                return
-            except FileNotFoundError as e:
-                raise signals.TestAbortClass('Private key not found') from e
-            except (paramiko.SSHException, paramiko.AuthenticationException,
-                    socket.timeout, socket.error, ConnectionRefusedError,
-                    ConnectionResetError) as e:
+                return self._run(command, timeout_sec)
+            except FuchsiaSSHTransportError as e:
                 err = e
                 self.log.warn(f'Connect failed: {e}')
         raise err
+
+    def _run(self, command: str, timeout_sec: int) -> SSHResult:
+        full_command = self.config.full_command(command)
+        try:
+            process = subprocess.run(full_command,
+                                     capture_output=True,
+                                     timeout=timeout_sec,
+                                     check=True)
+        except subprocess.CalledProcessError as e:
+            if e.returncode == 255:
+                stderr = e.stderr.decode('utf-8', errors='replace')
+                if 'Name or service not known' in stderr or 'Host does not exist' in stderr:
+                    raise FuchsiaSSHTransportError(
+                        f'Hostname {self.config.host_name} cannot be resolved to an address'
+                    ) from e
+                if 'Connection timed out' in stderr:
+                    raise FuchsiaSSHTransportError(
+                        f'Failed to establish a connection to {self.config.remote} within {timeout_sec}s'
+                    ) from e
+                if 'Connection refused' in stderr:
+                    raise FuchsiaSSHTransportError(
+                        f'Connection refused by {self.config.remote}') from e
+
+            raise FuchsiaSSHError(command, SSHResult(e)) from e
+        except subprocess.TimeoutExpired as e:
+            raise SSHTimeout(e) from e
+
+        return SSHResult(process)
diff --git a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
index dd39ede..4e620b9 100644
--- a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
+++ b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
@@ -27,6 +27,7 @@
 
 from acts import utils
 from acts.controllers.fuchsia_lib.base_lib import DeviceOffline
+from acts.controllers.fuchsia_lib.ssh import FuchsiaSSHError
 from acts.libs.proc import job
 from acts.utils import get_fuchsia_mdns_ipv6_address
 
@@ -270,10 +271,12 @@
             # after this command.  There is no check so if there is an
             # expectation of the device being in fastboot, then some
             # other check needs to be done.
-            fuchsia_device.ssh.run(
-                'dm rb',
-                timeout_sec=fuchsia_reconnect_after_reboot_time,
-                skip_status_code_check=True)
+            try:
+                fuchsia_device.ssh.run(
+                    'dm rb', timeout_sec=fuchsia_reconnect_after_reboot_time)
+            except FuchsiaSSHError as e:
+                if 'closed by remote host' not in e.result.stderr:
+                    raise e
     else:
         pass
         ## Todo: Add elif for SL4F if implemented in SL4F
diff --git a/acts/framework/acts/controllers/iperf_client.py b/acts/framework/acts/controllers/iperf_client.py
index 6f57ea7..04801e8 100644
--- a/acts/framework/acts/controllers/iperf_client.py
+++ b/acts/framework/acts/controllers/iperf_client.py
@@ -24,7 +24,7 @@
 from acts import utils
 from acts.controllers.adb_lib.error import AdbCommandError
 from acts.controllers.android_device import AndroidDevice
-from acts.controllers.fuchsia_lib.ssh import SSHProvider, SSHResults
+from acts.controllers.fuchsia_lib.ssh import SSHProvider, SSHResult
 from acts.controllers.iperf_server import _AndroidDeviceBridge
 from acts.controllers.fuchsia_lib.utils_lib import create_ssh_connection
 from acts.controllers.fuchsia_lib.utils_lib import ssh_is_connected
@@ -235,8 +235,7 @@
                         ssh_config=self._ssh_settings.ssh_config)
                 _, cmd_result_stdout, cmd_result_stderr = (
                     self._ssh_session.exec_command(iperf_cmd, timeout=timeout))
-                iperf_process = SSHResults(cmd_result_stdout,
-                                           cmd_result_stderr)
+                iperf_process = SSHResult(cmd_result_stdout, cmd_result_stderr)
             else:
                 iperf_process = self._ssh_session.run(iperf_cmd,
                                                       timeout=timeout)
diff --git a/acts_tests/acts_contrib/test_utils/fuchsia/utils.py b/acts_tests/acts_contrib/test_utils/fuchsia/utils.py
index 936dfba..f9837b7 100644
--- a/acts_tests/acts_contrib/test_utils/fuchsia/utils.py
+++ b/acts_tests/acts_contrib/test_utils/fuchsia/utils.py
@@ -72,7 +72,10 @@
     finally:
         if remove_file_after_check:
             fd.log.info(f'Remove the downloaded file {file_path}')
-            fd.ssh.run(f'rm {file_path}', skip_status_code_check=True)
+            try:
+                fd.ssh.run(f'rm {file_path}')
+            except FuchsiaSSHError:
+                pass
 
 
 def _generate_file_directory_and_file_name(url, out_path):
diff --git a/acts_tests/tests/google/fuchsia/bt/ep/BtFuchsiaEPTest.py b/acts_tests/tests/google/fuchsia/bt/ep/BtFuchsiaEPTest.py
index 507cf84..f360422 100644
--- a/acts_tests/tests/google/fuchsia/bt/ep/BtFuchsiaEPTest.py
+++ b/acts_tests/tests/google/fuchsia/bt/ep/BtFuchsiaEPTest.py
@@ -20,6 +20,7 @@
 
 from acts import signals
 from acts.base_test import BaseTestClass
+from acts.controllers.fuchsia_lib.ssh import FuchsiaSSHError
 from acts.test_decorators import test_tracker_info
 from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size
 from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name
@@ -68,12 +69,11 @@
         """
         ssh_timeout = 30
         for fd in self.fuchsia_devices:
-            fd.ssh.run("killall bt-a2dp*",
-                       timeout=ssh_timeout,
-                       skip_status_code_check=True)
-            fd.ssh.run("killall bt-avrcp*",
-                       timeout=ssh_timeout,
-                       skip_status_code_check=True)
+            try:
+                fd.ssh.run("killall bt-a2dp*", timeout_sec=ssh_timeout)
+                fd.ssh.run("killall bt-avrcp*", timeout_sec=ssh_timeout)
+            except FuchsiaSSHError:
+                pass
 
     def _unbond_all_known_devices(self):
         """For all Fuchsia devices, unbond any known pairings.