blob: 0d33081f424ebbd4b3cf5a415b6244d5f6824f56 [file] [log] [blame]
#!/usr/bin/env python3.8
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import subprocess
from io import StringIO
import test_env
from lib.process import Process
class FakeProcess(Process):
"""A fake for process creation and execution.
Instead of actually running subprocesses, this class just records
commands. Other fakes can additionally add canned responses.
"""
def __init__(self, host, args, **kwargs):
self._host = host
self._popen = FakeProcess.Popen(host)
# Special case to emulate JSON output for `ffx` commands. We can't just
# do type-based detection because of the empty-output case.
if ('--machine', 'json') in zip(args[:-1], args[1:]):
self._popen._output_json = True
super(FakeProcess, self).__init__(args)
@property
def duration(self):
return self._popen._duration
@duration.setter
def duration(self, duration):
self._popen._duration = float(duration)
@property
def succeeds(self):
return self._popen._succeeds
@succeeds.setter
def succeeds(self, succeeds):
self._popen._succeeds = succeeds
@property
def inputs(self):
return self._popen._stdin.getvalue().split('\n')
def schedule(self, output, returncode=None, start=None, end=None):
"""Sets the output and/or error to be returned later.
Between the start and end times, the output will appear in the process's
stdout, and any specified returncode will override the default return
code. If start is None, it behaves as if start is now. If end
is None, the output is not removed once it is added.
"""
if not start:
start = self._host.elapsed
self._popen._outputs.append((output, returncode, start, end))
def clear(self):
self._popen._outputs = []
def popen(self):
self._popen._start()
return self._popen
class Popen(object):
"""Fakes subprocess.Popen for FakeProcess.
Unlike a real subprocess.Popen, this object always buffers stdio.
"""
def __init__(self, host):
self._host = host
self._duration = 0.0
self._completion = None
self._succeeds = True
self._returncode = None
self._outputs = []
self._output_json = False
self._stdin = StringIO()
self._stdout = StringIO()
self._stderr = StringIO()
@property
def stdin(self):
return self._stdin
@property
def stdout(self):
return self._stdout
@property
def stderr(self):
return self._stderr
@property
def returncode(self):
return self._returncode
def _start(self):
assert self._completion is None, 'popen() called twice'
self._completion = self._host.elapsed + self._duration
self._returncode = None
self._stdin.truncate(0)
self._stdin.seek(0)
def communicate(self, inputs=''):
"""Like subprocess.Popen.communicate().
In particular, writes bytes from inputs to stdin, and returns a
tuple of stdout and stderr.
"""
self._stdin.write(str(inputs))
self.wait()
return (self._stdout.getvalue(), self._stderr.getvalue())
def poll(self):
"""Like subprocess.Popen.poll().
This method will update the object's stdout and stderr according to
the schedule and the elapsed time. It will set the returncode once
its specified duration has elapsed, using the most recently-added
return code set for the current time range, falling back to the
`succeeds` property if none was specified.
"""
now = self._host.elapsed
if self.returncode is None and self._completion <= now:
self._stdout.truncate(0)
self._stdout.seek(0)
current_outputs = []
for output, returncode, start, end in self._outputs:
if now < start:
continue
if end and end <= now:
continue
self._returncode = returncode
current_outputs.append(output)
if self._output_json:
self._stdout.write(json.dumps(current_outputs))
else:
self._stdout.write('\n'.join(current_outputs))
self._stdout.write('\n')
self._stdout.flush()
# If no return code was explicitly specified for this time
# period, fall back to the `succeeds` setting
if self._returncode is None:
self._returncode = 0 if self._succeeds else 1
self._completion = None
return self.returncode
def wait(self):
if self._completion is not None:
self._host.sleep(self._completion - self._host.elapsed)
return self.poll()
def kill(self):
self._completion = None