blob: 0f0006261c26225a8e766f3b1ca4605f10f61ac2 [file] [log] [blame]
__all__ = ['Composer', 'ComposerError']
from error import MarkedYAMLError
from events import *
from nodes import *
class ComposerError(MarkedYAMLError):
pass
class Composer:
def __init__(self, parser):
self.parser = parser
self.all_anchors = {}
self.complete_anchors = {}
def check(self):
# If there are more documents available?
return not self.parser.check(StreamEndEvent)
def get(self):
# Get the root node of the next document.
if not self.parser.check(StreamEndEvent):
return self.compose_document()
def __iter__(self):
# Iterator protocol.
while not self.parser.check(StreamEndEvent):
yield self.compose_document()
def compose_document(self):
node = self.compose_node()
self.all_anchors = {}
self.complete_anchors = {}
return node
def compose_node(self):
if self.parser.check(AliasEvent):
event = self.parser.get()
anchor = event.anchor
if anchor not in self.all_anchors:
raise ComposerError(None, None, "found undefined alias %r"
% anchor.encode('utf-8'), event.start_marker)
if anchor not in self.complete_anchors:
collection_event = self.all_anchors[anchor]
raise ComposerError("while composing a collection",
collection_event.start_marker,
"found recursive anchor %r" % anchor.encode('utf-8'),
event.start_marker)
return self.complete_anchors[anchor]
event = self.parser.peek()
anchor = event.anchor
if anchor is not None:
if anchor in self.all_anchors:
raise ComposerError("found duplicate anchor %r; first occurence"
% anchor.encode('utf-8'), self.all_anchors[anchor].start_marker,
"second occurence", event.start_marker)
self.all_anchors[anchor] = event
if self.parser.check(ScalarEvent):
node = self.compose_scalar_node()
elif self.parser.check(SequenceEvent):
node = self.compose_sequence_node()
elif self.parser.check(MappingEvent):
node = self.compose_mapping_node()
if anchor is not None:
self.complete_anchors[anchor] = node
return node
def compose_scalar_node(self):
event = self.parser.get()
return ScalarNode(event.tag, event.value,
event.start_marker, event.end_marker)
def compose_sequence_node(self):
start_event = self.parser.get()
value = []
while not self.parser.check(CollectionEndEvent):
value.append(self.compose_node())
end_event = self.parser.get()
return SequenceNode(start_event.tag, value,
start_event.start_marker, end_event.end_marker)
def compose_mapping_node(self):
start_event = self.parser.get()
value = {}
while not self.parser.check(CollectionEndEvent):
key_event = self.parser.peek()
item_key = self.compose_node()
item_value = self.compose_node()
if item_key in value:
raise ComposerError("while composing a mapping", start_event.start_marker,
"found duplicate key", key_event.start_marker)
value[item_key] = item_value
end_event = self.parser.get()
return MappingNode(start_event.tag, value,
start_event.start_marker, end_event.end_marker)