blob: efd41dd25ae08731dd3d21daefa6b5de6031192c [file] [log] [blame]
#!/usr/bin/env python
# coding=utf-8
# Copyright (c) 2015 Remko Tronçon (https://el-tramo.be)
# Copied from https://github.com/remko/pycotap/
#
# Released under the MIT license
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import unittest
import sys
import base64
if sys.hexversion >= 0x03000000:
from io import StringIO
else:
from StringIO import StringIO
# Log modes
class LogMode(object) :
LogToError, LogToDiagnostics, LogToYAML, LogToAttachment = range(4)
class TAPTestResult(unittest.TestResult):
def __init__(self, output_stream, error_stream, message_log, test_output_log):
super(TAPTestResult, self).__init__(self, output_stream)
self.output_stream = output_stream
self.error_stream = error_stream
self.orig_stdout = None
self.orig_stderr = None
self.message = None
self.test_output = None
self.message_log = message_log
self.test_output_log = test_output_log
self.output_stream.write("TAP version 13\n")
self._set_streams()
def printErrors(self):
self.print_raw("1..%d\n" % self.testsRun)
self._reset_streams()
def _set_streams(self):
self.orig_stdout = sys.stdout
self.orig_stderr = sys.stderr
if self.message_log == LogMode.LogToError:
self.message = self.error_stream
else:
self.message = StringIO()
if self.test_output_log == LogMode.LogToError:
self.test_output = self.error_stream
else:
self.test_output = StringIO()
if self.message_log == self.test_output_log:
self.test_output = self.message
sys.stdout = sys.stderr = self.test_output
def _reset_streams(self):
sys.stdout = self.orig_stdout
sys.stderr = self.orig_stderr
def print_raw(self, text):
self.output_stream.write(text)
self.output_stream.flush()
def print_result(self, result, test, directive = None):
self.output_stream.write("%s %d %s" % (result, self.testsRun, test.id()))
if directive:
self.output_stream.write(" # " + directive)
self.output_stream.write("\n")
self.output_stream.flush()
def ok(self, test, directive = None):
self.print_result("ok", test, directive)
def not_ok(self, test):
self.print_result("not ok", test)
def startTest(self, test):
super(TAPTestResult, self).startTest(test)
def stopTest(self, test):
super(TAPTestResult, self).stopTest(test)
if self.message_log == self.test_output_log:
logs = [(self.message_log, self.message, "output")]
else:
logs = [
(self.test_output_log, self.test_output, "test_output"),
(self.message_log, self.message, "message")
]
for log_mode, log, log_name in logs:
if log_mode != LogMode.LogToError:
output = log.getvalue()
if len(output):
if log_mode == LogMode.LogToYAML:
self.print_raw(" ---\n")
self.print_raw(" " + log_name + ": |\n")
self.print_raw(" " + output.rstrip().replace("\n", "\n ") + "\n")
self.print_raw(" ...\n")
elif log_mode == LogMode.LogToAttachment:
self.print_raw(" ---\n")
self.print_raw(" " + log_name + ":\n")
self.print_raw(" File-Name: " + log_name + ".txt\n")
self.print_raw(" File-Type: text/plain\n")
self.print_raw(" File-Content: " + base64.b64encode(output) + "\n")
self.print_raw(" ...\n")
else:
self.print_raw("# " + output.rstrip().replace("\n", "\n# ") + "\n")
log.truncate(0)
def addSuccess(self, test):
super(TAPTestResult, self).addSuccess(test)
self.ok(test)
def addError(self, test, err):
super(TAPTestResult, self).addError(test, err)
self.message.write(self.errors[-1][1] + "\n")
self.not_ok(test)
def addFailure(self, test, err):
super(TAPTestResult, self).addFailure(test, err)
self.message.write(self.failures[-1][1] + "\n")
self.not_ok(test)
def addSkip(self, test, reason):
super(TAPTestResult, self).addSkip(test, reason)
self.ok(test, "SKIP " + reason)
def addExpectedFailure(self, test, err):
super(TAPTestResult, self).addExpectedFailure(test, err)
self.ok(test)
def addUnexpectedSuccess(self, test):
super(TAPTestResult, self).addUnexpectedSuccess(test)
self.message.write("Unexpected success" + "\n")
self.not_ok(test)
class TAPTestRunner(object):
def __init__(self,
message_log = LogMode.LogToYAML,
test_output_log = LogMode.LogToDiagnostics,
output_stream = sys.stdout, error_stream = sys.stderr):
self.output_stream = output_stream
self.error_stream = error_stream
self.message_log = message_log
self.test_output_log = test_output_log
def run(self, test):
result = TAPTestResult(
self.output_stream,
self.error_stream,
self.message_log,
self.test_output_log)
test(result)
result.printErrors()
return result