blob: b4298b804342577af28be641b978a932d8620458 [file] [log] [blame]
# Events should obey the following grammar:
# stream ::= STREAM-START document* STREAM-END
# document ::= DOCUMENT-START node DOCUMENT-END
# node ::= SCALAR | sequence | mapping
# sequence ::= SEQUENCE-START node* SEQUENCE-END
# mapping ::= MAPPING-START (node node)* MAPPING-END
__all__ = ['Emitter', 'EmitterError']
from error import YAMLError
from events import *
class EmitterError(YAMLError):
pass
class Emitter:
def __init__(self, writer):
self.writer = writer
self.states = []
self.state = self.expect_stream_start
self.indents = []
self.indent = None
self.flow_level = 0
self.key_context = False
self.space = True
self.line = True
self.allow_inline_collection = False
self.allow_indentless_sequence = False
self.simple_key = False
self.event_queue = []
def emit(self, event):
if self.event_queue:
self.event_queue.append(event)
event = self.event_queue.pop(0)
self.state(event)
def push_back(self, event):
self.event_queue.insert(0, event)
def expect_stream_start(self, event):
if isinstance(event, StreamStartEvent):
self.state = self.expect_document_start
else:
raise EmitterError("expected StreamStartEvent, but got %s" % event.__class__.__name__)
def expect_document_start(self, event):
if isinstance(event, DocumentStartEvent):
self.write_document_start()
self.states.append(self.expect_document_end)
self.state = self.expect_root_node
self.allow_inline_collection = False
self.allow_indentless_sequence = False
elif isinstance(event, StreamEndEvent):
self.writer.flush()
self.state = self.expect_nothing
else:
raise EmitterError("expected DocumentStartEvent, but got %s" % event.__class__.__name__)
def expect_document_end(self, event):
if isinstance(event, DocumentEndEvent):
self.write_document_end()
self.state = self.expect_document_start
self.allow_inline_collection = False
self.allow_indentless_sequence = False
else:
raiseEmitterError("expected DocumentEndEvent, but got %s" % event.__class__.__name__)
def expect_root_node(self, event):
self.expect_node(event)
def expect_node(self, event):
empty = None
if isinstance(event, (SequenceEvent, MappingEvent)):
if not self.event_queue:
return self.push_back(event)
empty = isinstance(self.event_queue[0], CollectionEndEvent)
if isinstance(event, AliasEvent):
self.expect_alias(event)
elif isinstance(event, (ScalarEvent, SequenceEvent, MappingEvent)):
if event.anchor:
self.write_anchor("&", event.anchor)
self.allow_inline_collection = False
if event.tag not in [None, u'!']:
self.write_tag(event.tag)
self.allow_inline_collection = False
if isinstance(event, ScalarEvent):
self.expect_scalar(event)
elif isinstance(event, SequenceEvent):
self.expect_sequence(event, empty)
elif isinstance(event, MappingEvent):
self.expect_mapping(event, empty)
else:
raise EmitterError("Expected NodeEvent, but got %s" % event.__class__.__name__)
def expect_alias(self, event):
self.write_anchor("*", event.anchor)
self.state = self.states.pop()
def expect_scalar(self, event):
self.indents.append(self.indent)
if self.indent is None:
self.indent = 2
else:
self.indent += 2
self.write_scalar(event.value, event.implicit, event.style)
self.indent = self.indents.pop()
self.state = self.states.pop()
self.allow_inline_collection = False
self.allow_indentless_sequence = False
def expect_sequence(self, event, empty):
if self.flow_level or event.flow or empty:
self.write_indicator("[", need_space=True, provide_space=True)
self.flow_level += 1
self.indents.append(self.indent)
if self.indent is None:
self.indent = 2
else:
self.indent += 2
self.state = self.expect_first_flow_sequence_item
else:
self.indents.append(self.indent)
if self.indent is None:
self.indent = 0
else:
self.indent += 2
self.state = self.expect_first_block_sequence_item
def expect_mapping(self, event, empty):
if self.flow_level or event.flow or empty:
self.write_indicator("{", need_space=True, provide_space=True)
self.flow_level += 1
self.indents.append(self.indent)
if self.indent is None:
self.indent = 2
else:
self.indent += 2
self.state = self.expect_first_flow_mapping_key
else:
self.indents.append(self.indent)
if self.indent is None:
self.indent = 0
else:
self.indent += 2
self.state = self.expect_first_block_mapping_key
def expect_first_flow_sequence_item(self, event):
if isinstance(event, CollectionEndEvent):
self.indent = self.indents.pop()
self.write_indicator("]", provide_space=True)
self.flow_level -= 1
self.state = self.states.pop()
else:
self.write_indent()
self.states.append(self.expect_flow_sequence_item)
self.state = self.expect_node
self.expect_node(event)
def expect_flow_sequence_item(self, event):
if isinstance(event, CollectionEndEvent):
self.write_indicator(",")
self.indent = self.indents.pop()
self.write_indent()
self.write_indicator("]")
self.flow_level -= 1
self.state = self.states.pop()
else:
self.write_indicator(",")
self.write_indent()
self.states.append(self.expect_flow_sequence_item)
self.state = self.expect_node
self.expect_node(event)
def expect_first_block_sequence_item(self, event):
assert not isinstance(event, CollectionEndEvent)
if not self.allow_inline_collection:
if self.allow_indentless_sequence:
self.indent = self.indents.pop()
self.indents.append(self.indent)
self.write_indent()
self.write_indicator("-", need_space=True)
self.allow_indentless_sequence = False
self.allow_inline_collection = True
self.states.append(self.expect_block_sequence_item)
self.state = self.expect_node
self.expect_node(event)
def expect_block_sequence_item(self, event):
if isinstance(event, CollectionEndEvent):
self.indent = self.indents.pop()
self.state = self.states.pop()
else:
self.write_indent()
self.write_indicator("-", need_space=True)
self.allow_indentless_sequence = False
self.allow_inline_collection = True
self.states.append(self.expect_block_sequence_item)
self.state = self.expect_node
self.expect_node(event)
def expect_first_flow_mapping_key(self, event):
if isinstance(event, CollectionEndEvent):
self.indent = self.indents.pop()
self.write_indicator("}")
self.flow_level -= 1
self.state = self.states.pop()
else:
self.write_indent()
if self.is_simple(event):
self.simple_key = True
else:
self.write_indicator("?", need_space=True)
self.states.append(self.expect_flow_mapping_value)
self.state = self.expect_node
self.expect_node(event)
def expect_flow_mapping_key(self, event):
if isinstance(event, CollectionEndEvent):
self.indent = self.indents.pop()
self.write_indent()
self.write_indicator("}")
self.flow_level -= 1
self.state = self.states.pop()
else:
self.write_indicator(",")
self.write_indent()
if self.is_simple(event):
self.simple_key = True
else:
self.write_indicator("?", need_space=True)
self.states.append(self.expect_flow_mapping_value)
self.state = self.expect_node
self.expect_node(event)
def expect_flow_mapping_value(self, event):
if self.simple_key:
self.write_indicator(":", need_space=False)
self.simple_key = False
else:
self.write_indent()
self.write_indicator(":", need_space=True)
self.states.append(self.expect_flow_mapping_key)
self.state = self.expect_node
self.expect_node(event)
def expect_first_block_mapping_key(self, event):
assert not isinstance(event, CollectionEndEvent)
simple = self.is_simple(event)
if simple is None:
return self.push_back(event)
if not self.allow_inline_collection:
self.write_indent()
if self.is_simple(event):
self.allow_indentless_sequence = True
self.allow_inline_collection = False
self.simple_key = True
else:
self.write_indicator("?", need_space=True)
self.allow_indentless_sequence = True
self.allow_inline_collection = True
self.states.append(self.expect_block_mapping_value)
self.state = self.expect_node
self.expect_node(event)
def expect_block_mapping_key(self, event):
if isinstance(event, CollectionEndEvent):
self.indent = self.indents.pop()
self.state = self.states.pop()
else:
self.write_indent()
if self.is_simple(event):
self.allow_indentless_sequence = True
self.allow_inline_collection = False
self.simple_key = True
else:
self.write_indicator("?", need_space=True)
self.allow_indentless_sequence = True
self.allow_inline_collection = True
self.states.append(self.expect_block_mapping_value)
self.state = self.expect_node
self.expect_node(event)
def expect_block_mapping_value(self, event):
if self.simple_key:
self.write_indicator(":", need_space=False)
self.allow_indentless_sequence = True
self.allow_inline_collection = False
self.simple_key = False
else:
self.write_indent()
self.write_indicator(":", need_space=True)
self.allow_indentless_sequence = True
self.allow_inline_collection = True
self.states.append(self.expect_block_mapping_key)
self.state = self.expect_node
self.expect_node(event)
def expect_nothing(self, event):
raise EmitterError("expected nothing, but got %s" % event.__class__.__name__)
def write_document_start(self):
self.writer.write("%YAML 1.1")
self.write_line_break()
self.writer.write("---")
self.space = False
self.line = False
def write_document_end(self):
if not self.line:
self.write_line_break()
self.writer.write("...")
self.write_line_break()
def write_line_break(self):
self.writer.write('\n')
self.space = True
self.line = True
def write_anchor(self, indicator, name):
if not self.space:
self.writer.write(" ")
self.writer.write("%s%s" % (indicator, name))
self.space = False
self.line = False
def write_tag(self, tag):
if not self.space:
self.writer.write(" ")
if tag.startswith("tag:yaml.org,2002:"):
self.writer.write("!!%s" % tag[len("tag.yaml.org,2002:"):])
else:
self.writer.write("!<%s>" % tag)
self.space = False
self.line = False
def is_simple(self, event):
if not isinstance(event, ScalarEvent):
return False
if '\n' in event.value or len(event.value) > 128:
return False
if event.style and event.style in '|>':
return False
return True
def write_scalar(self, value, implicit, style):
if implicit:
if not self.space:
self.writer.write(" ")
self.writer.write(value.encode('utf-8'))
self.space = False
self.line = False
elif style in ['>', '|'] and not self.flow_level and not self.simple_key:
if not self.space:
self.writer.write(" ")
self.writer.write("|-")
self.write_line_break()
self.write_indent()
self.writer.write(value.encode('utf-8'))
self.write_line_break()
else:
if not self.space:
self.writer.write(" ")
self.writer.write("\"%s\"" % value.encode('utf-8'))
self.space = False
self.line = False
def write_indicator(self, indicator, need_space=False, provide_space=False):
if need_space and not self.space:
self.writer.write(" ")
self.writer.write(indicator)
self.space = provide_space
self.line = False
def write_indent(self):
if not self.line:
self.write_line_break()
if self.indent:
self.writer.write(" "*self.indent)
self.line = False
self.space = True