Fix deadlock issue with adb commands with timeout. (#558)

diff --git a/mobly/controllers/android_device_lib/adb.py b/mobly/controllers/android_device_lib/adb.py
index 15e2a5c..fee766c 100644
--- a/mobly/controllers/android_device_lib/adb.py
+++ b/mobly/controllers/android_device_lib/adb.py
@@ -21,6 +21,8 @@
 import subprocess
 import threading
 
+from mobly import utils
+
 # Command to use for running ADB commands.
 ADB = 'adb'
 
@@ -167,23 +169,17 @@
             AdbError: The adb command exit code is not 0.
             AdbTimeoutError: The adb command timed out.
         """
-        proc = subprocess.Popen(
-            args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell)
-        process = psutil.Process(proc.pid)
         if timeout and timeout <= 0:
             raise ValueError('Timeout is not a positive value: %s' % timeout)
-        if timeout and timeout > 0:
-            try:
-                process.wait(timeout=timeout)
-            except psutil.TimeoutExpired:
-                process.terminate()
-                raise AdbTimeoutError(
-                    cmd=args, timeout=timeout, serial=self.serial)
+        try:
+            (ret, out, err) = utils.run_command(
+                args, shell=shell, timeout=timeout)
+        except psutil.TimeoutExpired:
+            raise AdbTimeoutError(
+                cmd=args, timeout=timeout, serial=self.serial)
 
-        (out, err) = proc.communicate()
         if stderr:
             stderr.write(err)
-        ret = proc.returncode
         logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
                       cli_cmd_to_string(args), out, err, ret)
         if ret == 0:
diff --git a/mobly/utils.py b/mobly/utils.py
index bb2f316..c3c22c8 100644
--- a/mobly/utils.py
+++ b/mobly/utils.py
@@ -25,6 +25,7 @@
 import signal
 import string
 import subprocess
+import threading
 import time
 import traceback
 
@@ -283,6 +284,71 @@
         return return_vals
 
 
+def run_command(cmd,
+                stdout=None,
+                stderr=None,
+                shell=False,
+                timeout=None,
+                env=None):
+    """Runs a command in a subprocess.
+
+    This function is very similar to subprocess.check_output. The main
+    difference is that it returns the return code and std error output as well
+    as supporting a timeout parameter.
+
+    Args:
+        cmd: string or list of strings, the command to run.
+            See subprocess.Popen() documentation.
+        stdout: file handle, the file handle to write std out to. If None is
+            given, then subprocess.PIPE is used. See subprocess.Popen()
+            documentation.
+        stdee: file handle, the file handle to write std err to. If None is
+            given, then subprocess.PIPE is used. See subprocess.Popen()
+            documentation.
+        shell: bool, True to run this command through the system shell,
+            False to invoke it directly. See subprocess.Popen() docs.
+        timeout: float, the number of seconds to wait before timing out.
+            If not specified, no timeout takes effect.
+
+    Returns:
+        A 3-tuple of the consisting of the return code, the std output, and the
+            std error.
+
+    Raises:
+        psutil.TimeoutExpired: The command timed out.
+    """
+    # Only import psutil when actually needed.
+    # psutil may cause import error in certain env. This way the utils module
+    # doesn't crash upon import.
+    import psutil
+    if stdout is None:
+        stdout = subprocess.PIPE
+    if stderr is None:
+        stderr = subprocess.PIPE
+    process = psutil.Popen(
+        cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
+    timer = None
+    timer_triggered = threading.Event()
+    if timeout and timeout > 0:
+        # The wait method on process will hang when used with PIPEs with large
+        # outputs, so use a timer thread instead.
+
+        def timeout_expired():
+            timer_triggered.set()
+            process.terminate()
+
+        timer = threading.Timer(timeout, timeout_expired)
+        timer.start()
+    # If the command takes longer than the timeout, then the timer thread
+    # will kill the subprocess, which will make it terminate.
+    (out, err) = process.communicate()
+    if timer is not None:
+        timer.cancel()
+    if timer_triggered.is_set():
+        raise psutil.TimeoutExpired(timeout, pid=process.pid)
+    return (process.returncode, out, err)
+
+
 def start_standing_subprocess(cmd, shell=False, env=None):
     """Starts a long-running subprocess.
 
@@ -395,6 +461,12 @@
     wait_for_standing_subprocess() to keep waiting for it to terminate on its
     own.
 
+    If the corresponding subprocess command generates a large amount of output
+    and this method is called with a timeout value, then the command can hang
+    indefinitely. See http://go/pylib/subprocess.html#subprocess.Popen.wait
+
+    This function does not support Python 2.
+
     Args:
         p: Subprocess to wait for.
         timeout: An integer number of seconds to wait before timing out.
diff --git a/tests/mobly/controllers/android_device_lib/adb_test.py b/tests/mobly/controllers/android_device_lib/adb_test.py
index 9430795..fd34b6e 100755
--- a/tests/mobly/controllers/android_device_lib/adb_test.py
+++ b/tests/mobly/controllers/android_device_lib/adb_test.py
@@ -79,22 +79,23 @@
         mock_proc.returncode = 0
         return mock_popen
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_no_timeout_success(self, mock_psutil_process,
-                                         mock_Popen):
-        self._mock_process(mock_psutil_process, mock_Popen)
-
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_cmd_no_timeout_success(self, mock_run_command):
+        mock_run_command.return_value = (0,
+                                         MOCK_DEFAULT_STDOUT.encode('utf-8'),
+                                         MOCK_DEFAULT_STDERR.encode('utf-8'))
         out = adb.AdbProxy()._exec_cmd(
             ['fake_cmd'], shell=False, timeout=None, stderr=None)
         self.assertEqual(MOCK_DEFAULT_STDOUT, out.decode('utf-8'))
+        mock_run_command.assert_called_with(
+            ['fake_cmd'], shell=False, timeout=None)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_error_with_serial(self, mock_psutil_process, mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
-        # update return code to indicate command execution error
-        mock_popen.return_value.returncode = 1
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_cmd_error_with_serial(self, mock_run_command):
+        # Return 1 for retcode for error.
+        mock_run_command.return_value = (1,
+                                         MOCK_DEFAULT_STDOUT.encode('utf-8'),
+                                         MOCK_DEFAULT_STDERR.encode('utf-8'))
         mock_serial = 'ABCD1234'
         with self.assertRaisesRegex(adb.AdbError,
                                     'Error executing adb cmd .*') as context:
@@ -102,35 +103,35 @@
         self.assertEqual(context.exception.serial, mock_serial)
         self.assertIn(mock_serial, context.exception.cmd)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_error_without_serial(self, mock_psutil_process,
-                                           mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
-        # update return code to indicate command execution error
-        mock_popen.return_value.returncode = 1
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_cmd_error_without_serial(self, mock_run_command):
+        # Return 1 for retcode for error.
+        mock_run_command.return_value = (1,
+                                         MOCK_DEFAULT_STDOUT.encode('utf-8'),
+                                         MOCK_DEFAULT_STDERR.encode('utf-8'))
         with self.assertRaisesRegex(adb.AdbError,
                                     'Error executing adb cmd .*') as context:
             adb.AdbProxy()._exec_cmd(
                 ['fake_cmd'], shell=False, timeout=None, stderr=None)
         self.assertFalse(context.exception.serial)
+        mock_run_command.assert_called_with(
+            ['fake_cmd'], shell=False, timeout=None)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_with_timeout_success(self, mock_psutil_process,
-                                           mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_cmd_with_timeout_success(self, mock_run_command):
+        mock_run_command.return_value = (0,
+                                         MOCK_DEFAULT_STDOUT.encode('utf-8'),
+                                         MOCK_DEFAULT_STDERR.encode('utf-8'))
 
         out = adb.AdbProxy()._exec_cmd(
             ['fake_cmd'], shell=False, timeout=1, stderr=None)
         self.assertEqual(MOCK_DEFAULT_STDOUT, out.decode('utf-8'))
+        mock_run_command.assert_called_with(
+            ['fake_cmd'], shell=False, timeout=1)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_timed_out(self, mock_psutil_process, mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
-        mock_psutil_process.return_value.wait.side_effect = (
-            adb.psutil.TimeoutExpired('Timed out'))
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_cmd_timed_out(self, mock_run_command):
+        mock_run_command.side_effect = adb.psutil.TimeoutExpired('Timed out')
         mock_serial = '1234Abcd'
         with self.assertRaisesRegex(
                 adb.AdbTimeoutError, 'Timed out executing command "adb -s '
@@ -139,23 +140,15 @@
         self.assertEqual(context.exception.serial, mock_serial)
         self.assertIn(mock_serial, context.exception.cmd)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_timed_out_without_serial(self, mock_psutil_process,
-                                               mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
-        mock_psutil_process.return_value.wait.side_effect = (
-            adb.psutil.TimeoutExpired('Timed out'))
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_cmd_timed_out_without_serial(self, mock_run_command):
+        mock_run_command.side_effect = adb.psutil.TimeoutExpired('Timed out')
         with self.assertRaisesRegex(adb.AdbTimeoutError,
                                     'Timed out executing command "adb '
                                     'fake-cmd" after 0.01s.') as context:
             adb.AdbProxy().fake_cmd(timeout=0.01)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_cmd_with_negative_timeout_value(self, mock_psutil_process,
-                                                  mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
+    def test_exec_cmd_with_negative_timeout_value(self):
         with self.assertRaisesRegex(ValueError,
                                     'Timeout is not a positive value: -1'):
             adb.AdbProxy()._exec_cmd(
@@ -181,8 +174,8 @@
         self._mock_execute_and_process_stdout_process(mock_popen)
         mock_handler = mock.MagicMock()
         mock_popen.return_value.communicate = mock.Mock(
-            return_value=(unexpected_stdout,
-                          MOCK_DEFAULT_STDERR.encode('utf-8')))
+            return_value=(unexpected_stdout, MOCK_DEFAULT_STDERR.encode(
+                'utf-8')))
 
         err = adb.AdbProxy()._execute_and_process_stdout(
             ['fake_cmd'], shell=False, handler=mock_handler)
@@ -439,11 +432,11 @@
                 mock_execute_and_process_stdout.assert_called_once_with(
                     mock_adb_cmd, shell=False, handler=mock_handler)
 
-    @mock.patch('mobly.controllers.android_device_lib.adb.subprocess.Popen')
-    @mock.patch('mobly.controllers.android_device_lib.adb.psutil.Process')
-    def test_exec_adb_cmd_with_stderr_pipe(self, mock_psutil_process,
-                                           mock_popen):
-        self._mock_process(mock_psutil_process, mock_popen)
+    @mock.patch('mobly.utils.run_command')
+    def test_exec_adb_cmd_with_stderr_pipe(self, mock_run_command):
+        mock_run_command.return_value = (0,
+                                         MOCK_DEFAULT_STDOUT.encode('utf-8'),
+                                         MOCK_DEFAULT_STDERR.encode('utf-8'))
         stderr_redirect = io.BytesIO()
         out = adb.AdbProxy().shell(
             'arg1 arg2', shell=True, stderr=stderr_redirect)
diff --git a/tests/mobly/utils_test.py b/tests/mobly/utils_test.py
index b9b0c22..5217a96 100755
--- a/tests/mobly/utils_test.py
+++ b/tests/mobly/utils_test.py
@@ -37,15 +37,79 @@
 
     def setUp(self):
         system = platform.system()
-        self.sleep_cmd = 'timeout' if system == 'Windows' else 'sleep'
         self.tmp_dir = tempfile.mkdtemp()
 
     def tearDown(self):
         shutil.rmtree(self.tmp_dir)
 
+    def sleep_cmd(self, wait_millis):
+        if platform.system() == 'Windows':
+            return ['ping', 'localhost', '-n', '1', '-w', str(wait_millis)]
+        else:
+            return ['sleep', str(wait_millis / 1000.0)]
+
+    def test_run_command(self):
+        (ret, out, err) = utils.run_command(self.sleep_cmd(10))
+        self.assertEqual(ret, 0)
+
+    def test_run_command_with_timeout(self):
+        (ret, out, err) = utils.run_command(self.sleep_cmd(10), timeout=4)
+        self.assertEqual(ret, 0)
+
+    def test_run_command_with_timeout_expired(self):
+        with self.assertRaises(psutil.TimeoutExpired):
+            _ = utils.run_command(self.sleep_cmd(4000), timeout=0.01)
+
+    @mock.patch('threading.Timer')
+    @mock.patch('psutil.Popen')
+    def test_run_command_with_default_params(self, mock_Popen, mock_Timer):
+        mock_command = mock.MagicMock(spec=dict)
+        mock_proc = mock_Popen.return_value
+        mock_proc.communicate.return_value = ('fake_out', 'fake_err')
+        mock_proc.returncode = 0
+        out = utils.run_command(mock_command)
+        self.assertEqual(out, (0, 'fake_out', 'fake_err'))
+        mock_Popen.assert_called_with(
+            mock_command,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            shell=False,
+            env=None,
+        )
+        mock_Timer.assert_not_called()
+
+    @mock.patch('threading.Timer')
+    @mock.patch('psutil.Popen')
+    def test_run_command_with_custom_params(self, mock_Popen, mock_Timer):
+        mock_command = mock.MagicMock(spec=dict)
+        mock_stdout = mock.MagicMock(spec=int)
+        mock_stderr = mock.MagicMock(spec=int)
+        mock_shell = mock.MagicMock(spec=bool)
+        mock_timeout = 1234
+        mock_env = mock.MagicMock(spec=dict)
+        mock_proc = mock_Popen.return_value
+        mock_proc.communicate.return_value = ('fake_out', 'fake_err')
+        mock_proc.returncode = 127
+        out = utils.run_command(
+            mock_command,
+            stdout=mock_stdout,
+            stderr=mock_stderr,
+            shell=mock_shell,
+            timeout=mock_timeout,
+            env=mock_env)
+        self.assertEqual(out, (127, 'fake_out', 'fake_err'))
+        mock_Popen.assert_called_with(
+            mock_command,
+            stdout=mock_stdout,
+            stderr=mock_stderr,
+            shell=mock_shell,
+            env=mock_env,
+        )
+        mock_Timer.assert_called_with(1234, mock.ANY)
+
     def test_start_standing_subproc(self):
         try:
-            p = utils.start_standing_subprocess([self.sleep_cmd, '0.1'])
+            p = utils.start_standing_subprocess(self.sleep_cmd(10))
             p1 = psutil.Process(p.pid)
             self.assertTrue(p1.is_running())
         finally:
@@ -55,9 +119,9 @@
 
     @mock.patch('subprocess.Popen')
     def test_start_standing_subproc_without_env(self, mock_Popen):
-        p = utils.start_standing_subprocess(self.sleep_cmd)
+        p = utils.start_standing_subprocess(self.sleep_cmd(10))
         mock_Popen.assert_called_with(
-            self.sleep_cmd,
+            self.sleep_cmd(10),
             stdin=subprocess.PIPE,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
@@ -68,9 +132,9 @@
     @mock.patch('subprocess.Popen')
     def test_start_standing_subproc_with_custom_env(self, mock_Popen):
         mock_env = mock.MagicMock(spec=dict)
-        p = utils.start_standing_subprocess(self.sleep_cmd, env=mock_env)
+        p = utils.start_standing_subprocess(self.sleep_cmd(10), env=mock_env)
         mock_Popen.assert_called_with(
-            self.sleep_cmd,
+            self.sleep_cmd(10),
             stdin=subprocess.PIPE,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
@@ -79,13 +143,13 @@
         )
 
     def test_stop_standing_subproc(self):
-        p = utils.start_standing_subprocess([self.sleep_cmd, '4'])
+        p = utils.start_standing_subprocess(self.sleep_cmd(4000))
         p1 = psutil.Process(p.pid)
         utils.stop_standing_subprocess(p)
         self.assertFalse(p1.is_running())
 
     def test_stop_standing_subproc_wihtout_pipe(self):
-        p = subprocess.Popen([self.sleep_cmd, '4'])
+        p = subprocess.Popen(self.sleep_cmd(4000))
         self.assertIsNone(p.stdout)
         p1 = psutil.Process(p.pid)
         utils.stop_standing_subprocess(p)