blob: dfe261829362f8fda3c2d2934690d20df5ae0878 [file] [log] [blame]
import yaml._yaml, yaml
import types, pprint, tempfile, sys, os
# Keep the classes currently bound to the standard yaml names reachable under
# Py* aliases.  _set_up() rebinds the standard names to the C* classes and
# _tear_down() restores them from these aliases.
yaml.PyBaseLoader = yaml.BaseLoader
yaml.PySafeLoader = yaml.SafeLoader
yaml.PyLoader = yaml.Loader
yaml.PyBaseDumper = yaml.BaseDumper
yaml.PySafeDumper = yaml.SafeDumper
yaml.PyDumper = yaml.Dumper
# yaml.scan as it was at import time; _tear_down() puts it back.
old_scan = yaml.scan


def new_scan(stream, Loader=yaml.CLoader):
    """Forward to the saved yaml.scan, using yaml.CLoader unless overridden."""
    return old_scan(stream, Loader=Loader)
# yaml.parse as it was at import time; _tear_down() puts it back.
old_parse = yaml.parse


def new_parse(stream, Loader=yaml.CLoader):
    """Forward to the saved yaml.parse, using yaml.CLoader unless overridden."""
    return old_parse(stream, Loader=Loader)
# yaml.compose as it was at import time; _tear_down() puts it back.
old_compose = yaml.compose


def new_compose(stream, Loader=yaml.CLoader):
    """Forward to the saved yaml.compose, using yaml.CLoader unless overridden."""
    return old_compose(stream, Loader=Loader)
# yaml.compose_all as it was at import time; _tear_down() puts it back.
old_compose_all = yaml.compose_all


def new_compose_all(stream, Loader=yaml.CLoader):
    """Forward to the saved yaml.compose_all, using yaml.CLoader unless overridden."""
    return old_compose_all(stream, Loader=Loader)
# yaml.load as it was at import time; _tear_down() puts it back.  The
# new_safe_load wrapper also delegates to this saved function.
old_load = yaml.load


def new_load(stream, Loader=yaml.CLoader):
    """Forward to the saved yaml.load, using yaml.CLoader unless overridden."""
    return old_load(stream, Loader=Loader)
# yaml.load_all as it was at import time; _tear_down() puts it back.  The
# new_safe_load_all wrapper also delegates to this saved function.
old_load_all = yaml.load_all


def new_load_all(stream, Loader=yaml.CLoader):
    """Forward to the saved yaml.load_all, using yaml.CLoader unless overridden."""
    return old_load_all(stream, Loader=Loader)
# yaml.safe_load as it was at import time; only used by _tear_down() to
# restore the original binding.
old_safe_load = yaml.safe_load


def new_safe_load(stream):
    """Delegate to the saved yaml.load, forcing yaml.CSafeLoader."""
    return old_load(stream, Loader=yaml.CSafeLoader)
# yaml.safe_load_all as it was at import time; only used by _tear_down() to
# restore the original binding.
old_safe_load_all = yaml.safe_load_all


def new_safe_load_all(stream):
    """Delegate to the saved yaml.load_all, forcing yaml.CSafeLoader."""
    return old_load_all(stream, Loader=yaml.CSafeLoader)
# yaml.emit as it was at import time; _tear_down() puts it back.
old_emit = yaml.emit


def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the saved yaml.emit, using yaml.CDumper unless overridden.

    Extra keyword arguments are passed through untouched.
    """
    return old_emit(events, stream, Dumper=Dumper, **kwds)
# yaml.serialize as it was at import time; _tear_down() puts it back.
old_serialize = yaml.serialize


def new_serialize(node, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the saved yaml.serialize, using yaml.CDumper unless overridden.

    ``stream`` defaults to None, matching the sibling new_emit,
    new_serialize_all, new_dump and new_dump_all wrappers.  Previously it was
    a required argument, so once _set_up() installed this wrapper as
    yaml.serialize, a call such as ``yaml.serialize(node)`` raised TypeError.

    Extra keyword arguments are passed through untouched.
    """
    return old_serialize(node, stream, Dumper, **kwds)
# yaml.serialize_all as it was at import time; _tear_down() puts it back.
old_serialize_all = yaml.serialize_all


def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the saved yaml.serialize_all, using yaml.CDumper unless overridden.

    Extra keyword arguments are passed through untouched.
    """
    return old_serialize_all(nodes, stream, Dumper=Dumper, **kwds)
# yaml.dump as it was at import time; _tear_down() puts it back.  The
# new_safe_dump wrapper also delegates to this saved function.
old_dump = yaml.dump


def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the saved yaml.dump, using yaml.CDumper unless overridden.

    Extra keyword arguments are passed through untouched.
    """
    return old_dump(data, stream, Dumper=Dumper, **kwds)
# yaml.dump_all as it was at import time; _tear_down() puts it back.  The
# new_safe_dump_all wrapper also delegates to this saved function.
old_dump_all = yaml.dump_all


def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
    """Forward to the saved yaml.dump_all, using yaml.CDumper unless overridden.

    Extra keyword arguments are passed through untouched.
    """
    return old_dump_all(documents, stream, Dumper=Dumper, **kwds)
# yaml.safe_dump as it was at import time; only used by _tear_down() to
# restore the original binding.
old_safe_dump = yaml.safe_dump


def new_safe_dump(data, stream=None, **kwds):
    """Delegate to the saved yaml.dump, forcing yaml.CSafeDumper.

    Extra keyword arguments are passed through untouched.
    """
    return old_dump(data, stream, Dumper=yaml.CSafeDumper, **kwds)
# yaml.safe_dump_all as it was at import time; only used by _tear_down() to
# restore the original binding.
old_safe_dump_all = yaml.safe_dump_all


def new_safe_dump_all(documents, stream=None, **kwds):
    """Delegate to the saved yaml.dump_all, forcing yaml.CSafeDumper.

    Extra keyword arguments are passed through untouched.
    """
    return old_dump_all(documents, stream, Dumper=yaml.CSafeDumper, **kwds)
def _set_up():
    """Point the standard yaml names at their C* / new_* replacements.

    Each of yaml.BaseLoader, SafeLoader, Loader, BaseDumper, SafeDumper and
    Dumper is rebound to the yaml attribute of the same name prefixed with
    ``C``, and every module-level helper (scan, parse, ..., safe_dump_all) is
    replaced by the matching new_* wrapper from this module.
    """
    for class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                       'BaseDumper', 'SafeDumper', 'Dumper'):
        setattr(yaml, class_name, getattr(yaml, 'C' + class_name))

    replacements = (
        ('scan', new_scan),
        ('parse', new_parse),
        ('compose', new_compose),
        ('compose_all', new_compose_all),
        ('load', new_load),
        ('load_all', new_load_all),
        ('safe_load', new_safe_load),
        ('safe_load_all', new_safe_load_all),
        ('emit', new_emit),
        ('serialize', new_serialize),
        ('serialize_all', new_serialize_all),
        ('dump', new_dump),
        ('dump_all', new_dump_all),
        ('safe_dump', new_safe_dump),
        ('safe_dump_all', new_safe_dump_all),
    )
    for function_name, replacement in replacements:
        setattr(yaml, function_name, replacement)
def _tear_down():
    """Undo _set_up(): restore the saved classes and helper functions.

    The six loader/dumper names are rebound from their yaml.Py* aliases and
    every module-level helper is rebound to the old_* reference captured when
    this module was imported.
    """
    for class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                       'BaseDumper', 'SafeDumper', 'Dumper'):
        setattr(yaml, class_name, getattr(yaml, 'Py' + class_name))

    originals = (
        ('scan', old_scan),
        ('parse', old_parse),
        ('compose', old_compose),
        ('compose_all', old_compose_all),
        ('load', old_load),
        ('load_all', old_load_all),
        ('safe_load', old_safe_load),
        ('safe_load_all', old_safe_load_all),
        ('emit', old_emit),
        ('serialize', old_serialize),
        ('serialize_all', old_serialize_all),
        ('dump', old_dump),
        ('dump_all', old_dump_all),
        ('safe_dump', old_safe_dump),
        ('safe_dump_all', old_safe_dump_all),
    )
    for function_name, original in originals:
        setattr(yaml, function_name, original)
def test_c_version(verbose=False):
    """Check that get_version() and get_version_string() of yaml._yaml agree.

    The tuple returned by yaml._yaml.get_version() is formatted as
    "%s.%s.%s" and must equal yaml._yaml.get_version_string().  With verbose
    set, both values are printed first.

    The failure message used to reference a bare ``_yaml`` name that this
    module never binds, so a mismatch surfaced as NameError instead of
    AssertionError; it now goes through ``yaml._yaml`` like the rest of the
    function.
    """
    version = yaml._yaml.get_version()
    version_string = yaml._yaml.get_version_string()
    if verbose:
        print(version)
        print(version_string)
    assert ("%s.%s.%s" % version) == version_string, (version, version_string)
def test_deprecate_yaml_module():
    """Check the top-level ``_yaml`` module's package name and version type.

    NOTE(review): test_c_version formats yaml._yaml.get_version() with
    "%s.%s.%s", i.e. as a 3-tuple.  If ``_yaml.get_version`` is the same
    function, the ``str`` check below cannot hold -- confirm which is
    intended.
    """
    import _yaml as legacy_module
    package_name = legacy_module.__package__
    assert package_name == ''
    version = legacy_module.get_version()
    assert isinstance(version, str)
def _compare_scanners(py_data, c_data, verbose):
    """Assert that yaml.PyLoader and yaml.CLoader scan equal token streams.

    py_data is scanned with yaml.PyLoader and c_data with yaml.CLoader.  The
    tokens are compared pairwise by class, by ``value`` when the PyLoader
    token has that attribute, and (except for StreamEndToken) by the
    index/line/column of their start and end marks.  With verbose set, both
    token lists are pretty-printed afterwards, whether or not the comparison
    succeeded.
    """
    def position(mark):
        # The three mark fields that must agree between the two scanners.
        return (mark.index, mark.line, mark.column)

    expected = list(yaml.scan(py_data, Loader=yaml.PyLoader))
    actual = []
    try:
        # Collect one token at a time so that an error part-way through
        # still leaves the tokens seen so far for the verbose dump.
        for scanned in yaml.scan(c_data, Loader=yaml.CLoader):
            actual.append(scanned)
        counts = (len(expected), len(actual))
        assert counts[0] == counts[1], counts
        for py_token, c_token in zip(expected, actual):
            pair = (py_token, c_token)
            assert py_token.__class__ == c_token.__class__, pair
            if hasattr(py_token, 'value'):
                assert py_token.value == c_token.value, pair
            if not isinstance(py_token, yaml.StreamEndToken):
                py_start = position(py_token.start_mark)
                py_end = position(py_token.end_mark)
                c_start = position(c_token.start_mark)
                c_end = position(c_token.end_mark)
                assert py_start == c_start, (py_start, c_start)
                assert py_end == c_end, (py_end, c_end)
    finally:
        if verbose:
            print("PY_TOKENS:")
            pprint.pprint(expected)
            print("C_TOKENS:")
            pprint.pprint(actual)
def test_c_scanner(data_filename, canonical_filename, verbose=False):
    """Compare the two scanners on both fixture files, as streams and as strings.

    For data_filename and then canonical_filename, _compare_scanners is run
    once with two independent binary file objects and once with the file's
    contents.  Every file is now opened in a ``with`` block so its handle is
    closed as soon as the comparison finishes (or fails); previously all
    eight handles were left for the garbage collector.
    """
    for filename in (data_filename, canonical_filename):
        # Two separate handles: each scanner consumes its own stream.
        with open(filename, 'rb') as py_stream:
            with open(filename, 'rb') as c_stream:
                _compare_scanners(py_stream, c_stream, verbose)
        with open(filename, 'rb') as stream:
            contents = stream.read()
        _compare_scanners(contents, contents, verbose)


# NOTE(review): presumably read by the test runner to select fixture files by
# extension and to skip files marked '.skip-ext' -- confirm in test_appliance.
test_c_scanner.unittest = ['.data', '.canonical']
test_c_scanner.skip = ['.skip-ext']
def _compare_parsers(py_data, c_data, verbose):
    """Assert that yaml.PyLoader and yaml.CLoader parse equal event streams.

    py_data is parsed with yaml.PyLoader and c_data with yaml.CLoader.  The
    events are compared pairwise on a fixed list of attributes; an attribute
    missing from an event counts as None.  With verbose set, both event lists
    are pretty-printed afterwards, whether or not the comparison succeeded.
    """
    compared_attributes = ('__class__', 'anchor', 'tag', 'implicit',
                           'value', 'explicit', 'version', 'tags')
    expected = list(yaml.parse(py_data, Loader=yaml.PyLoader))
    actual = []
    try:
        # Collect one event at a time so that an error part-way through
        # still leaves the events seen so far for the verbose dump.
        for parsed in yaml.parse(c_data, Loader=yaml.CLoader):
            actual.append(parsed)
        counts = (len(expected), len(actual))
        assert counts[0] == counts[1], counts
        for py_event, c_event in zip(expected, actual):
            for attribute in compared_attributes:
                py_value = getattr(py_event, attribute, None)
                c_value = getattr(c_event, attribute, None)
                assert py_value == c_value, (py_event, c_event, attribute)
    finally:
        if verbose:
            print("PY_EVENTS:")
            pprint.pprint(expected)
            print("C_EVENTS:")
            pprint.pprint(actual)
def test_c_parser(data_filename, canonical_filename, verbose=False):
    """Compare the two parsers on both fixture files, as streams and as strings.

    For data_filename and then canonical_filename, _compare_parsers is run
    once with two independent binary file objects and once with the file's
    contents.  Every file is now opened in a ``with`` block so its handle is
    closed as soon as the comparison finishes (or fails); previously all
    eight handles were left for the garbage collector.
    """
    for filename in (data_filename, canonical_filename):
        # Two separate handles: each parser consumes its own stream.
        with open(filename, 'rb') as py_stream:
            with open(filename, 'rb') as c_stream:
                _compare_parsers(py_stream, c_stream, verbose)
        with open(filename, 'rb') as stream:
            contents = stream.read()
        _compare_parsers(contents, contents, verbose)


# NOTE(review): presumably read by the test runner to select fixture files by
# extension and to skip files marked '.skip-ext' -- confirm in test_appliance.
test_c_parser.unittest = ['.data', '.canonical']
test_c_parser.skip = ['.skip-ext']
def _compare_emitters(data, verbose):
    """Round-trip events through yaml.CDumper and check they survive.

    data is parsed with yaml.PyLoader, the events are emitted with
    yaml.CDumper, and the emitted text is parsed again with both
    yaml.PyLoader and yaml.CLoader.  Each re-parsed stream must match the
    original events on a fixed list of attributes, where a missing attribute
    counts as None.  With verbose set, the emitted text is printed right
    after emitting and all three event lists are pretty-printed at the end.
    """
    compared_attributes = ('__class__', 'anchor', 'tag', 'implicit',
                           'value', 'explicit', 'version', 'tags')
    interchangeable_tags = (None, u'!')

    events = list(yaml.parse(data, Loader=yaml.PyLoader))
    c_data = yaml.emit(events, Dumper=yaml.CDumper)
    if verbose:
        print(c_data)
    py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader))
    c_events = list(yaml.parse(c_data, Loader=yaml.CLoader))
    try:
        total = len(events)
        assert total == len(py_events), (total, len(py_events))
        assert total == len(c_events), (total, len(c_events))
        for event, py_event, c_event in zip(events, py_events, c_events):
            for attribute in compared_attributes:
                value = getattr(event, attribute, None)
                py_value = getattr(py_event, attribute, None)
                c_value = getattr(c_event, attribute, None)
                # A tag of None or '!' on all three sides is treated as the
                # same thing and not compared further.
                if attribute == 'tag' and all(
                        item in interchangeable_tags
                        for item in (value, py_value, c_value)):
                    continue
                # 'explicit' is not compared when either re-parsed event
                # reports it as set.
                if attribute == 'explicit' and (py_value or c_value):
                    continue
                assert value == py_value, (event, py_event, attribute)
                assert value == c_value, (event, c_event, attribute)
    finally:
        if verbose:
            print("EVENTS:")
            pprint.pprint(events)
            print("PY_EVENTS:")
            pprint.pprint(py_events)
            print("C_EVENTS:")
            pprint.pprint(c_events)
def test_c_emitter(data_filename, canonical_filename, verbose=False):
    """Run _compare_emitters on the contents of both fixture files.

    Each file is read inside a ``with`` block so its handle is closed right
    after reading; previously the handles were left for the garbage
    collector.
    """
    for filename in (data_filename, canonical_filename):
        with open(filename, 'rb') as stream:
            contents = stream.read()
        _compare_emitters(contents, verbose)


# NOTE(review): presumably read by the test runner to select fixture files by
# extension and to skip files marked '.skip-ext' -- confirm in test_appliance.
test_c_emitter.unittest = ['.data', '.canonical']
test_c_emitter.skip = ['.skip-ext']
def test_large_file(verbose=False):
    """Load a generated document of just over 2**31 characters with yaml.CLoader.

    The test is a no-op unless sys.maxsize exceeds 2**32 and the
    PYYAML_TEST_GROUP environment variable equals 'all'.  Otherwise it writes
    2**(SIZE_FILE - SIZE_ITERATION - SIZE_LINE) + 1 chunks to a temporary
    file, each chunk being 2**SIZE_ITERATION lines of 2**SIZE_LINE
    characters, and loads the file.  ``verbose`` is accepted but not used.
    """
    SIZE_LINE = 24
    SIZE_ITERATION = 0
    SIZE_FILE = 31

    wide_enough = sys.maxsize > 2**32
    if not wide_enough or os.environ.get('PYYAML_TEST_GROUP', '') != 'all':
        return

    # '-' + padding + '{}\n' adds up to exactly 2**SIZE_LINE characters.
    line = '-' + ' ' * (2**SIZE_LINE - 4) + '{}\n'
    chunk = line * (2**SIZE_ITERATION)
    chunk_count = 2**(SIZE_FILE - SIZE_ITERATION - SIZE_LINE) + 1
    with tempfile.TemporaryFile() as temp_file:
        for _ in range(chunk_count):
            temp_file.write(chunk)
        temp_file.seek(0)
        yaml.load(temp_file, Loader=yaml.CLoader)


test_large_file.unittest = None
def wrap_ext_function(function):
    """Return a variant of a test function that runs between _set_up/_tear_down.

    The wrapper calls _set_up(), runs ``function`` with the given arguments
    and always calls _tear_down() afterwards; the wrapped function's return
    value is discarded.  The wrapper is named ``<func_name>_ext``, copies the
    ``unittest`` attribute of ``function`` and gets the function's ``skip``
    list (empty if absent) with '.skip-ext' appended.
    """
    ext_name = '%s_ext' % function.func_name

    def wrapper(*args, **kwds):
        _set_up()
        try:
            function(*args, **kwds)
        finally:
            _tear_down()

    try:
        wrapper.func_name = ext_name
    except TypeError:
        # NOTE(review): presumably tolerates interpreters that reject
        # assigning func_name -- confirm; unittest_name below carries the
        # name either way.
        pass
    wrapper.unittest_name = ext_name
    wrapper.unittest = function.unittest
    inherited_skip = getattr(function, 'skip', [])
    wrapper.skip = inherited_skip + ['.skip-ext']
    return wrapper
def wrap_ext(collections):
    """Register ``*_ext`` variants of every test function in ``collections``.

    ``collections`` is a single collection or a list of them; each one is
    either a dict or an object whose ``vars()`` are used.  Every plain
    function carrying a ``unittest`` attribute is wrapped with
    wrap_ext_function, visiting names in sorted order, and the wrapper is
    stored in this module's globals under its ``unittest_name``.  A name that
    already exists in the globals trips an assertion.
    """
    if not isinstance(collections, list):
        collections = [collections]

    wrapped = []
    for collection in collections:
        namespace = collection if isinstance(collection, dict) else vars(collection)
        for key in sorted(namespace.keys()):
            candidate = namespace[key]
            if isinstance(candidate, types.FunctionType) and hasattr(candidate, 'unittest'):
                wrapped.append(wrap_ext_function(candidate))

    module_globals = globals()
    for function in wrapped:
        assert function.unittest_name not in module_globals
        module_globals[function.unittest_name] = function
import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \
test_emitter, test_representer, test_recursive, test_input_output
# Add a ``*_ext`` wrapper to this module's globals for every function with a
# ``unittest`` attribute found in the imported test modules; each wrapper runs
# its test between _set_up() and _tear_down().
wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor,
test_emitter, test_representer, test_recursive, test_input_output])
# When executed as a script, hand this module's globals to
# test_appliance.run().
if __name__ == '__main__':
    import test_appliance
    test_appliance.run(globals())