| |
| import test_appliance, sys, StringIO |
| |
| from yaml import * |
| import yaml |
| |
| class TestEmitterOnCanonical(test_appliance.TestAppliance): |
| |
| def _testEmitterOnCanonical(self, test_name, canonical_filename): |
| events = list(iter(Parser(Scanner(Reader(file(canonical_filename, 'rb')))))) |
| #writer = sys.stdout |
| writer = StringIO.StringIO() |
| emitter = Emitter(writer) |
| #print "-"*30 |
| #print "ORIGINAL DATA:" |
| #print file(canonical_filename, 'rb').read() |
| for event in events: |
| emitter.emit(event) |
| data = writer.getvalue() |
| new_events = list(parse(data)) |
| self.failUnlessEqual(len(events), len(new_events)) |
| for event, new_event in zip(events, new_events): |
| self.failUnlessEqual(event.__class__, new_event.__class__) |
| |
| TestEmitterOnCanonical.add_tests('testEmitterOnCanonical', '.canonical') |
| |
| class EventsConstructor(Constructor): |
| |
| def construct_event(self, node): |
| if isinstance(node, ScalarNode): |
| mapping = {} |
| else: |
| mapping = self.construct_mapping(node) |
| class_name = str(node.tag[1:])+'Event' |
| if class_name in ['AliasEvent', 'ScalarEvent', 'SequenceEvent', 'MappingEvent']: |
| mapping.setdefault('anchor', None) |
| if class_name in ['ScalarEvent', 'SequenceEvent', 'MappingEvent']: |
| mapping.setdefault('tag', None) |
| if class_name == 'ScalarEvent': |
| mapping.setdefault('value', '') |
| value = getattr(yaml, class_name)(**mapping) |
| return value |
| |
| EventsConstructor.add_constructor(None, EventsConstructor.construct_event) |
| |
| class TestEmitter(test_appliance.TestAppliance): |
| |
| def _testEmitter(self, test_name, events_filename): |
| events = load_document(file(events_filename, 'rb'), Constructor=EventsConstructor) |
| self._dump(events_filename, events) |
| writer = StringIO.StringIO() |
| emitter = Emitter(writer) |
| for event in events: |
| emitter.emit(event) |
| data = writer.getvalue() |
| new_events = list(parse(data)) |
| self.failUnlessEqual(len(events), len(new_events)) |
| for event, new_event in zip(events, new_events): |
| self.failUnlessEqual(event.__class__, new_event.__class__) |
| |
| def _dump(self, events_filename, events): |
| writer = sys.stdout |
| emitter = Emitter(writer) |
| print "="*30 |
| print "EVENTS:" |
| print file(events_filename, 'rb').read() |
| print '-'*30 |
| print "OUTPUT:" |
| for event in events: |
| emitter.emit(event) |
| |
| TestEmitter.add_tests('testEmitter', '.events') |
| |