|  | #!/usr/bin/env python3.8 | 
|  | # Copyright 2019 The Fuchsia Authors. All rights reserved. | 
|  | # Use of this source code is governed by a BSD-style license that can be | 
|  | # found in the LICENSE file. | 
|  |  | 
|  | import json | 
|  | import subprocess | 
|  | from io import StringIO | 
|  |  | 
|  | import test_env | 
|  | from lib.process import Process | 
|  |  | 
|  |  | 
class FakeProcess(Process):
    """A fake for process creation and execution.

    Instead of actually running subprocesses, this class just records
    commands. Other fakes can additionally add canned responses.
    """

    def __init__(self, host, args, **kwargs):
        """Creates a fake process for the given command line.

        Args:
            host: Object used as the time source. This class reads its
                `elapsed` attribute and calls its `sleep()` method.
            args: Sequence of command line arguments. It must support
                slicing, since adjacent pairs are inspected.
            **kwargs: Accepted for signature compatibility; not forwarded
                to the base class.
        """
        self._host = host
        self._popen = FakeProcess.Popen(host)

        # Special case to emulate JSON output for `ffx` commands. We can't
        # just do type-based detection because of the empty-output case.
        if ('--machine', 'json') in zip(args[:-1], args[1:]):
            self._popen._output_json = True

        super(FakeProcess, self).__init__(args)

    @property
    def duration(self):
        """Time the fake process takes to complete once started."""
        return self._popen._duration

    @duration.setter
    def duration(self, duration):
        self._popen._duration = float(duration)

    @property
    def succeeds(self):
        """Whether the default return code is 0 (True) or 1 (False)."""
        return self._popen._succeeds

    @succeeds.setter
    def succeeds(self, succeeds):
        self._popen._succeeds = succeeds

    @property
    def inputs(self):
        """Everything written to the fake stdin, split on newlines."""
        return self._popen._stdin.getvalue().split('\n')

    def schedule(self, output, returncode=None, start=None, end=None):
        """Sets the output and/or error to be returned later.

        Between the start and end times, the output will appear in the
        process's stdout, and any specified returncode will override the
        default return code. If start is None, it behaves as if start is
        now. If end is None, the output is not removed once it is added.

        Args:
            output: The canned output to emit.
            returncode: Optional return code to report while active.
            start: Elapsed time at which the output becomes active.
            end: Elapsed time at which the output stops being active.
        """
        # Compare against None explicitly so that an explicit start of 0 is
        # honored rather than being mistaken for "unspecified".
        if start is None:
            start = self._host.elapsed
        self._popen._outputs.append((output, returncode, start, end))

    def clear(self):
        """Discards all scheduled outputs."""
        self._popen._outputs = []

    def popen(self):
        """Starts the fake process and returns its fake Popen object."""
        self._popen._start()
        return self._popen

    class Popen(object):
        """Fakes subprocess.Popen for FakeProcess.

        Unlike a real subprocess.Popen, this object always buffers stdio.
        """

        def __init__(self, host):
            """Creates an unstarted fake Popen using `host` for timekeeping."""
            self._host = host
            self._duration = 0.0
            # Elapsed time at which the process finishes; None when the
            # process is not currently pending (unstarted, done or killed).
            self._completion = None
            self._succeeds = True
            self._returncode = None

            # List of (output, returncode, start, end) tuples.
            self._outputs = []
            self._output_json = False
            self._stdin = StringIO()
            self._stdout = StringIO()
            self._stderr = StringIO()

        @property
        def stdin(self):
            """In-memory buffer standing in for the process's stdin."""
            return self._stdin

        @property
        def stdout(self):
            """In-memory buffer standing in for the process's stdout."""
            return self._stdout

        @property
        def stderr(self):
            """In-memory buffer standing in for the process's stderr."""
            return self._stderr

        @property
        def returncode(self):
            """The return code, or None if the process has not finished."""
            return self._returncode

        def _start(self):
            """Marks the process as running and resets per-run state."""
            assert self._completion is None, 'popen() called twice'
            self._completion = self._host.elapsed + self._duration
            self._returncode = None
            self._stdin.truncate(0)
            self._stdin.seek(0)

        def communicate(self, inputs=''):
            """Like subprocess.Popen.communicate().

            In particular, writes the string form of inputs to stdin, waits
            for the process to finish, and returns a tuple of the buffered
            stdout and stderr contents.
            """
            self._stdin.write(str(inputs))
            self.wait()
            return (self._stdout.getvalue(), self._stderr.getvalue())

        def poll(self):
            """Like subprocess.Popen.poll().

            This method will update the object's stdout according to the
            schedule and the elapsed time. It will set the returncode once
            its specified duration has elapsed, using the most recently-added
            return code set for the current time range, falling back to the
            `succeeds` property if none was specified.

            Returns:
                The return code, or None if the process is still running.
            """
            if self._returncode is not None:
                return self._returncode

            now = self._host.elapsed
            # A completion time of None means the process is not pending
            # (e.g. it was killed). Treat that as "already elapsed" instead
            # of comparing None with a number, which raises TypeError on
            # Python 3.
            if self._completion is not None and now < self._completion:
                return None

            self._stdout.truncate(0)
            self._stdout.seek(0)
            current_outputs = []
            for output, returncode, start, end in self._outputs:
                if now < start:
                    continue
                # Only a None end means "never removed"; an end of 0 is a
                # real bound.
                if end is not None and end <= now:
                    continue
                # An entry without a return code must not erase one that an
                # earlier active entry specified.
                if returncode is not None:
                    self._returncode = returncode
                current_outputs.append(output)

            if self._output_json:
                self._stdout.write(json.dumps(current_outputs))
            else:
                self._stdout.write('\n'.join(current_outputs))
                # NOTE(review): assumed to belong to the text-mode branch;
                # confirm the intended nesting of this trailing newline.
                self._stdout.write('\n')
            self._stdout.flush()

            # If no return code was explicitly specified for this time
            # period, fall back to the `succeeds` setting.
            if self._returncode is None:
                self._returncode = 0 if self._succeeds else 1
            self._completion = None
            return self._returncode

        def wait(self):
            """Sleeps on the host until completion, then polls.

            Returns:
                The result of poll() after any remaining duration has been
                slept through.
            """
            if self._completion is not None:
                self._host.sleep(self._completion - self._host.elapsed)
            return self.poll()

        def kill(self):
            """Stops waiting on the process by clearing its completion time.

            A subsequent poll() or wait() finalizes the process immediately.
            """
            self._completion = None