| # mako/codegen.py |
| # Copyright (C) 2006-2015 the Mako authors and contributors <see AUTHORS file> |
| # |
| # This module is part of Mako and is released under |
| # the MIT License: http://www.opensource.org/licenses/mit-license.php |
| |
| """provides functionality for rendering a parsetree constructing into module |
| source code.""" |
| |
| import time |
| import re |
| from mako.pygen import PythonPrinter |
| from mako import util, ast, parsetree, filters, exceptions |
| from mako import compat |
| |
| |
| MAGIC_NUMBER = 10 |
| |
| # names which are hardwired into the |
| # template and are not accessed via the |
| # context itself |
| TOPLEVEL_DECLARED = set(["UNDEFINED", "STOP_RENDERING"]) |
| RESERVED_NAMES = set(['context', 'loop']).union(TOPLEVEL_DECLARED) |
| |
| |
| def compile(node, |
| uri, |
| filename=None, |
| default_filters=None, |
| buffer_filters=None, |
| imports=None, |
| future_imports=None, |
| source_encoding=None, |
| generate_magic_comment=True, |
| disable_unicode=False, |
| strict_undefined=False, |
| enable_loop=True, |
| reserved_names=frozenset()): |
| """Generate module source code given a parsetree node, |
| uri, and optional source filename""" |
| |
| # if on Py2K, push the "source_encoding" string to be |
| # a bytestring itself, as we will be embedding it into |
| # the generated source and we don't want to coerce the |
| # result into a unicode object, in "disable_unicode" mode |
| if not compat.py3k and isinstance(source_encoding, compat.text_type): |
| source_encoding = source_encoding.encode(source_encoding) |
| |
| buf = util.FastEncodingBuffer() |
| |
| printer = PythonPrinter(buf) |
| _GenerateRenderMethod(printer, |
| _CompileContext(uri, |
| filename, |
| default_filters, |
| buffer_filters, |
| imports, |
| future_imports, |
| source_encoding, |
| generate_magic_comment, |
| disable_unicode, |
| strict_undefined, |
| enable_loop, |
| reserved_names), |
| node) |
| return buf.getvalue() |
| |
| |
| class _CompileContext(object): |
| |
| def __init__(self, |
| uri, |
| filename, |
| default_filters, |
| buffer_filters, |
| imports, |
| future_imports, |
| source_encoding, |
| generate_magic_comment, |
| disable_unicode, |
| strict_undefined, |
| enable_loop, |
| reserved_names): |
| self.uri = uri |
| self.filename = filename |
| self.default_filters = default_filters |
| self.buffer_filters = buffer_filters |
| self.imports = imports |
| self.future_imports = future_imports |
| self.source_encoding = source_encoding |
| self.generate_magic_comment = generate_magic_comment |
| self.disable_unicode = disable_unicode |
| self.strict_undefined = strict_undefined |
| self.enable_loop = enable_loop |
| self.reserved_names = reserved_names |
| |
| |
| class _GenerateRenderMethod(object): |
| |
| """A template visitor object which generates the |
| full module source for a template. |
| |
| """ |
| |
| def __init__(self, printer, compiler, node): |
| self.printer = printer |
| self.compiler = compiler |
| self.node = node |
| self.identifier_stack = [None] |
| self.in_def = isinstance(node, (parsetree.DefTag, parsetree.BlockTag)) |
| |
| if self.in_def: |
| name = "render_%s" % node.funcname |
| args = node.get_argument_expressions() |
| filtered = len(node.filter_args.args) > 0 |
| buffered = eval(node.attributes.get('buffered', 'False')) |
| cached = eval(node.attributes.get('cached', 'False')) |
| defs = None |
| pagetag = None |
| if node.is_block and not node.is_anonymous: |
| args += ['**pageargs'] |
| else: |
| defs = self.write_toplevel() |
| pagetag = self.compiler.pagetag |
| name = "render_body" |
| if pagetag is not None: |
| args = pagetag.body_decl.get_argument_expressions() |
| if not pagetag.body_decl.kwargs: |
| args += ['**pageargs'] |
| cached = eval(pagetag.attributes.get('cached', 'False')) |
| self.compiler.enable_loop = self.compiler.enable_loop or eval( |
| pagetag.attributes.get( |
| 'enable_loop', 'False') |
| ) |
| else: |
| args = ['**pageargs'] |
| cached = False |
| buffered = filtered = False |
| if args is None: |
| args = ['context'] |
| else: |
| args = [a for a in ['context'] + args] |
| |
| self.write_render_callable( |
| pagetag or node, |
| name, args, |
| buffered, filtered, cached) |
| |
| if defs is not None: |
| for node in defs: |
| _GenerateRenderMethod(printer, compiler, node) |
| |
| if not self.in_def: |
| self.write_metadata_struct() |
| |
| def write_metadata_struct(self): |
| self.printer.source_map[self.printer.lineno] = \ |
| max(self.printer.source_map) |
| struct = { |
| "filename": self.compiler.filename, |
| "uri": self.compiler.uri, |
| "source_encoding": self.compiler.source_encoding, |
| "line_map": self.printer.source_map, |
| } |
| self.printer.writelines( |
| '"""', |
| '__M_BEGIN_METADATA', |
| compat.json.dumps(struct), |
| '__M_END_METADATA\n' |
| '"""' |
| ) |
| |
| @property |
| def identifiers(self): |
| return self.identifier_stack[-1] |
| |
| def write_toplevel(self): |
| """Traverse a template structure for module-level directives and |
| generate the start of module-level code. |
| |
| """ |
| inherit = [] |
| namespaces = {} |
| module_code = [] |
| |
| self.compiler.pagetag = None |
| |
| class FindTopLevel(object): |
| |
| def visitInheritTag(s, node): |
| inherit.append(node) |
| |
| def visitNamespaceTag(s, node): |
| namespaces[node.name] = node |
| |
| def visitPageTag(s, node): |
| self.compiler.pagetag = node |
| |
| def visitCode(s, node): |
| if node.ismodule: |
| module_code.append(node) |
| |
| f = FindTopLevel() |
| for n in self.node.nodes: |
| n.accept_visitor(f) |
| |
| self.compiler.namespaces = namespaces |
| |
| module_ident = set() |
| for n in module_code: |
| module_ident = module_ident.union(n.declared_identifiers()) |
| |
| module_identifiers = _Identifiers(self.compiler) |
| module_identifiers.declared = module_ident |
| |
| # module-level names, python code |
| if self.compiler.generate_magic_comment and \ |
| self.compiler.source_encoding: |
| self.printer.writeline("# -*- coding:%s -*-" % |
| self.compiler.source_encoding) |
| |
| if self.compiler.future_imports: |
| self.printer.writeline("from __future__ import %s" % |
| (", ".join(self.compiler.future_imports),)) |
| self.printer.writeline("from mako import runtime, filters, cache") |
| self.printer.writeline("UNDEFINED = runtime.UNDEFINED") |
| self.printer.writeline("STOP_RENDERING = runtime.STOP_RENDERING") |
| self.printer.writeline("__M_dict_builtin = dict") |
| self.printer.writeline("__M_locals_builtin = locals") |
| self.printer.writeline("_magic_number = %r" % MAGIC_NUMBER) |
| self.printer.writeline("_modified_time = %r" % time.time()) |
| self.printer.writeline("_enable_loop = %r" % self.compiler.enable_loop) |
| self.printer.writeline( |
| "_template_filename = %r" % self.compiler.filename) |
| self.printer.writeline("_template_uri = %r" % self.compiler.uri) |
| self.printer.writeline( |
| "_source_encoding = %r" % self.compiler.source_encoding) |
| if self.compiler.imports: |
| buf = '' |
| for imp in self.compiler.imports: |
| buf += imp + "\n" |
| self.printer.writeline(imp) |
| impcode = ast.PythonCode( |
| buf, |
| source='', lineno=0, |
| pos=0, |
| filename='template defined imports') |
| else: |
| impcode = None |
| |
| main_identifiers = module_identifiers.branch(self.node) |
| module_identifiers.topleveldefs = \ |
| module_identifiers.topleveldefs.\ |
| union(main_identifiers.topleveldefs) |
| module_identifiers.declared.update(TOPLEVEL_DECLARED) |
| if impcode: |
| module_identifiers.declared.update(impcode.declared_identifiers) |
| |
| self.compiler.identifiers = module_identifiers |
| self.printer.writeline("_exports = %r" % |
| [n.name for n in |
| main_identifiers.topleveldefs.values()] |
| ) |
| self.printer.write_blanks(2) |
| |
| if len(module_code): |
| self.write_module_code(module_code) |
| |
| if len(inherit): |
| self.write_namespaces(namespaces) |
| self.write_inherit(inherit[-1]) |
| elif len(namespaces): |
| self.write_namespaces(namespaces) |
| |
| return list(main_identifiers.topleveldefs.values()) |
| |
| def write_render_callable(self, node, name, args, buffered, filtered, |
| cached): |
| """write a top-level render callable. |
| |
| this could be the main render() method or that of a top-level def.""" |
| |
| if self.in_def: |
| decorator = node.decorator |
| if decorator: |
| self.printer.writeline( |
| "@runtime._decorate_toplevel(%s)" % decorator) |
| |
| self.printer.start_source(node.lineno) |
| self.printer.writelines( |
| "def %s(%s):" % (name, ','.join(args)), |
| # push new frame, assign current frame to __M_caller |
| "__M_caller = context.caller_stack._push_frame()", |
| "try:" |
| ) |
| if buffered or filtered or cached: |
| self.printer.writeline("context._push_buffer()") |
| |
| self.identifier_stack.append( |
| self.compiler.identifiers.branch(self.node)) |
| if (not self.in_def or self.node.is_block) and '**pageargs' in args: |
| self.identifier_stack[-1].argument_declared.add('pageargs') |
| |
| if not self.in_def and ( |
| len(self.identifiers.locally_assigned) > 0 or |
| len(self.identifiers.argument_declared) > 0 |
| ): |
| self.printer.writeline("__M_locals = __M_dict_builtin(%s)" % |
| ','.join([ |
| "%s=%s" % (x, x) for x in |
| self.identifiers.argument_declared |
| ])) |
| |
| self.write_variable_declares(self.identifiers, toplevel=True) |
| |
| for n in self.node.nodes: |
| n.accept_visitor(self) |
| |
| self.write_def_finish(self.node, buffered, filtered, cached) |
| self.printer.writeline(None) |
| self.printer.write_blanks(2) |
| if cached: |
| self.write_cache_decorator( |
| node, name, |
| args, buffered, |
| self.identifiers, toplevel=True) |
| |
| def write_module_code(self, module_code): |
| """write module-level template code, i.e. that which |
| is enclosed in <%! %> tags in the template.""" |
| for n in module_code: |
| self.printer.start_source(n.lineno) |
| self.printer.write_indented_block(n.text) |
| |
| def write_inherit(self, node): |
| """write the module-level inheritance-determination callable.""" |
| |
| self.printer.writelines( |
| "def _mako_inherit(template, context):", |
| "_mako_generate_namespaces(context)", |
| "return runtime._inherit_from(context, %s, _template_uri)" % |
| (node.parsed_attributes['file']), |
| None |
| ) |
| |
| def write_namespaces(self, namespaces): |
| """write the module-level namespace-generating callable.""" |
| self.printer.writelines( |
| "def _mako_get_namespace(context, name):", |
| "try:", |
| "return context.namespaces[(__name__, name)]", |
| "except KeyError:", |
| "_mako_generate_namespaces(context)", |
| "return context.namespaces[(__name__, name)]", |
| None, None |
| ) |
| self.printer.writeline("def _mako_generate_namespaces(context):") |
| |
| for node in namespaces.values(): |
| if 'import' in node.attributes: |
| self.compiler.has_ns_imports = True |
| self.printer.start_source(node.lineno) |
| if len(node.nodes): |
| self.printer.writeline("def make_namespace():") |
| export = [] |
| identifiers = self.compiler.identifiers.branch(node) |
| self.in_def = True |
| |
| class NSDefVisitor(object): |
| |
| def visitDefTag(s, node): |
| s.visitDefOrBase(node) |
| |
| def visitBlockTag(s, node): |
| s.visitDefOrBase(node) |
| |
| def visitDefOrBase(s, node): |
| if node.is_anonymous: |
| raise exceptions.CompileException( |
| "Can't put anonymous blocks inside " |
| "<%namespace>", |
| **node.exception_kwargs |
| ) |
| self.write_inline_def(node, identifiers, nested=False) |
| export.append(node.funcname) |
| vis = NSDefVisitor() |
| for n in node.nodes: |
| n.accept_visitor(vis) |
| self.printer.writeline("return [%s]" % (','.join(export))) |
| self.printer.writeline(None) |
| self.in_def = False |
| callable_name = "make_namespace()" |
| else: |
| callable_name = "None" |
| |
| if 'file' in node.parsed_attributes: |
| self.printer.writeline( |
| "ns = runtime.TemplateNamespace(%r," |
| " context._clean_inheritance_tokens()," |
| " templateuri=%s, callables=%s, " |
| " calling_uri=_template_uri)" % |
| ( |
| node.name, |
| node.parsed_attributes.get('file', 'None'), |
| callable_name, |
| ) |
| ) |
| elif 'module' in node.parsed_attributes: |
| self.printer.writeline( |
| "ns = runtime.ModuleNamespace(%r," |
| " context._clean_inheritance_tokens()," |
| " callables=%s, calling_uri=_template_uri," |
| " module=%s)" % |
| ( |
| node.name, |
| callable_name, |
| node.parsed_attributes.get( |
| 'module', 'None') |
| ) |
| ) |
| else: |
| self.printer.writeline( |
| "ns = runtime.Namespace(%r," |
| " context._clean_inheritance_tokens()," |
| " callables=%s, calling_uri=_template_uri)" % |
| ( |
| node.name, |
| callable_name, |
| ) |
| ) |
| if eval(node.attributes.get('inheritable', "False")): |
| self.printer.writeline("context['self'].%s = ns" % (node.name)) |
| |
| self.printer.writeline( |
| "context.namespaces[(__name__, %s)] = ns" % repr(node.name)) |
| self.printer.write_blanks(1) |
| if not len(namespaces): |
| self.printer.writeline("pass") |
| self.printer.writeline(None) |
| |
| def write_variable_declares(self, identifiers, toplevel=False, limit=None): |
| """write variable declarations at the top of a function. |
| |
| the variable declarations are in the form of callable |
| definitions for defs and/or name lookup within the |
| function's context argument. the names declared are based |
| on the names that are referenced in the function body, |
| which don't otherwise have any explicit assignment |
| operation. names that are assigned within the body are |
| assumed to be locally-scoped variables and are not |
| separately declared. |
| |
| for def callable definitions, if the def is a top-level |
| callable then a 'stub' callable is generated which wraps |
| the current Context into a closure. if the def is not |
| top-level, it is fully rendered as a local closure. |
| |
| """ |
| |
| # collection of all defs available to us in this scope |
| comp_idents = dict([(c.funcname, c) for c in identifiers.defs]) |
| to_write = set() |
| |
| # write "context.get()" for all variables we are going to |
| # need that arent in the namespace yet |
| to_write = to_write.union(identifiers.undeclared) |
| |
| # write closure functions for closures that we define |
| # right here |
| to_write = to_write.union( |
| [c.funcname for c in identifiers.closuredefs.values()]) |
| |
| # remove identifiers that are declared in the argument |
| # signature of the callable |
| to_write = to_write.difference(identifiers.argument_declared) |
| |
| # remove identifiers that we are going to assign to. |
| # in this way we mimic Python's behavior, |
| # i.e. assignment to a variable within a block |
| # means that variable is now a "locally declared" var, |
| # which cannot be referenced beforehand. |
| to_write = to_write.difference(identifiers.locally_declared) |
| |
| if self.compiler.enable_loop: |
| has_loop = "loop" in to_write |
| to_write.discard("loop") |
| else: |
| has_loop = False |
| |
| # if a limiting set was sent, constraint to those items in that list |
| # (this is used for the caching decorator) |
| if limit is not None: |
| to_write = to_write.intersection(limit) |
| |
| if toplevel and getattr(self.compiler, 'has_ns_imports', False): |
| self.printer.writeline("_import_ns = {}") |
| self.compiler.has_imports = True |
| for ident, ns in self.compiler.namespaces.items(): |
| if 'import' in ns.attributes: |
| self.printer.writeline( |
| "_mako_get_namespace(context, %r)." |
| "_populate(_import_ns, %r)" % |
| ( |
| ident, |
| re.split(r'\s*,\s*', ns.attributes['import']) |
| )) |
| |
| if has_loop: |
| self.printer.writeline( |
| 'loop = __M_loop = runtime.LoopStack()' |
| ) |
| |
| for ident in to_write: |
| if ident in comp_idents: |
| comp = comp_idents[ident] |
| if comp.is_block: |
| if not comp.is_anonymous: |
| self.write_def_decl(comp, identifiers) |
| else: |
| self.write_inline_def(comp, identifiers, nested=True) |
| else: |
| if comp.is_root(): |
| self.write_def_decl(comp, identifiers) |
| else: |
| self.write_inline_def(comp, identifiers, nested=True) |
| |
| elif ident in self.compiler.namespaces: |
| self.printer.writeline( |
| "%s = _mako_get_namespace(context, %r)" % |
| (ident, ident) |
| ) |
| else: |
| if getattr(self.compiler, 'has_ns_imports', False): |
| if self.compiler.strict_undefined: |
| self.printer.writelines( |
| "%s = _import_ns.get(%r, UNDEFINED)" % |
| (ident, ident), |
| "if %s is UNDEFINED:" % ident, |
| "try:", |
| "%s = context[%r]" % (ident, ident), |
| "except KeyError:", |
| "raise NameError(\"'%s' is not defined\")" % |
| ident, |
| None, None |
| ) |
| else: |
| self.printer.writeline( |
| "%s = _import_ns.get" |
| "(%r, context.get(%r, UNDEFINED))" % |
| (ident, ident, ident)) |
| else: |
| if self.compiler.strict_undefined: |
| self.printer.writelines( |
| "try:", |
| "%s = context[%r]" % (ident, ident), |
| "except KeyError:", |
| "raise NameError(\"'%s' is not defined\")" % |
| ident, |
| None |
| ) |
| else: |
| self.printer.writeline( |
| "%s = context.get(%r, UNDEFINED)" % (ident, ident) |
| ) |
| |
| self.printer.writeline("__M_writer = context.writer()") |
| |
| def write_def_decl(self, node, identifiers): |
| """write a locally-available callable referencing a top-level def""" |
| funcname = node.funcname |
| namedecls = node.get_argument_expressions() |
| nameargs = node.get_argument_expressions(as_call=True) |
| |
| if not self.in_def and ( |
| len(self.identifiers.locally_assigned) > 0 or |
| len(self.identifiers.argument_declared) > 0): |
| nameargs.insert(0, 'context._locals(__M_locals)') |
| else: |
| nameargs.insert(0, 'context') |
| self.printer.writeline("def %s(%s):" % (funcname, ",".join(namedecls))) |
| self.printer.writeline( |
| "return render_%s(%s)" % (funcname, ",".join(nameargs))) |
| self.printer.writeline(None) |
| |
| def write_inline_def(self, node, identifiers, nested): |
| """write a locally-available def callable inside an enclosing def.""" |
| |
| namedecls = node.get_argument_expressions() |
| |
| decorator = node.decorator |
| if decorator: |
| self.printer.writeline( |
| "@runtime._decorate_inline(context, %s)" % decorator) |
| self.printer.writeline( |
| "def %s(%s):" % (node.funcname, ",".join(namedecls))) |
| filtered = len(node.filter_args.args) > 0 |
| buffered = eval(node.attributes.get('buffered', 'False')) |
| cached = eval(node.attributes.get('cached', 'False')) |
| self.printer.writelines( |
| # push new frame, assign current frame to __M_caller |
| "__M_caller = context.caller_stack._push_frame()", |
| "try:" |
| ) |
| if buffered or filtered or cached: |
| self.printer.writelines( |
| "context._push_buffer()", |
| ) |
| |
| identifiers = identifiers.branch(node, nested=nested) |
| |
| self.write_variable_declares(identifiers) |
| |
| self.identifier_stack.append(identifiers) |
| for n in node.nodes: |
| n.accept_visitor(self) |
| self.identifier_stack.pop() |
| |
| self.write_def_finish(node, buffered, filtered, cached) |
| self.printer.writeline(None) |
| if cached: |
| self.write_cache_decorator(node, node.funcname, |
| namedecls, False, identifiers, |
| inline=True, toplevel=False) |
| |
| def write_def_finish(self, node, buffered, filtered, cached, |
| callstack=True): |
| """write the end section of a rendering function, either outermost or |
| inline. |
| |
| this takes into account if the rendering function was filtered, |
| buffered, etc. and closes the corresponding try: block if any, and |
| writes code to retrieve captured content, apply filters, send proper |
| return value.""" |
| |
| if not buffered and not cached and not filtered: |
| self.printer.writeline("return ''") |
| if callstack: |
| self.printer.writelines( |
| "finally:", |
| "context.caller_stack._pop_frame()", |
| None |
| ) |
| |
| if buffered or filtered or cached: |
| if buffered or cached: |
| # in a caching scenario, don't try to get a writer |
| # from the context after popping; assume the caching |
| # implemenation might be using a context with no |
| # extra buffers |
| self.printer.writelines( |
| "finally:", |
| "__M_buf = context._pop_buffer()" |
| ) |
| else: |
| self.printer.writelines( |
| "finally:", |
| "__M_buf, __M_writer = context._pop_buffer_and_writer()" |
| ) |
| |
| if callstack: |
| self.printer.writeline("context.caller_stack._pop_frame()") |
| |
| s = "__M_buf.getvalue()" |
| if filtered: |
| s = self.create_filter_callable(node.filter_args.args, s, |
| False) |
| self.printer.writeline(None) |
| if buffered and not cached: |
| s = self.create_filter_callable(self.compiler.buffer_filters, |
| s, False) |
| if buffered or cached: |
| self.printer.writeline("return %s" % s) |
| else: |
| self.printer.writelines( |
| "__M_writer(%s)" % s, |
| "return ''" |
| ) |
| |
| def write_cache_decorator(self, node_or_pagetag, name, |
| args, buffered, identifiers, |
| inline=False, toplevel=False): |
| """write a post-function decorator to replace a rendering |
| callable with a cached version of itself.""" |
| |
| self.printer.writeline("__M_%s = %s" % (name, name)) |
| cachekey = node_or_pagetag.parsed_attributes.get('cache_key', |
| repr(name)) |
| |
| cache_args = {} |
| if self.compiler.pagetag is not None: |
| cache_args.update( |
| ( |
| pa[6:], |
| self.compiler.pagetag.parsed_attributes[pa] |
| ) |
| for pa in self.compiler.pagetag.parsed_attributes |
| if pa.startswith('cache_') and pa != 'cache_key' |
| ) |
| cache_args.update( |
| ( |
| pa[6:], |
| node_or_pagetag.parsed_attributes[pa] |
| ) for pa in node_or_pagetag.parsed_attributes |
| if pa.startswith('cache_') and pa != 'cache_key' |
| ) |
| if 'timeout' in cache_args: |
| cache_args['timeout'] = int(eval(cache_args['timeout'])) |
| |
| self.printer.writeline("def %s(%s):" % (name, ','.join(args))) |
| |
| # form "arg1, arg2, arg3=arg3, arg4=arg4", etc. |
| pass_args = [ |
| "%s=%s" % ((a.split('=')[0],) * 2) if '=' in a else a |
| for a in args |
| ] |
| |
| self.write_variable_declares( |
| identifiers, |
| toplevel=toplevel, |
| limit=node_or_pagetag.undeclared_identifiers() |
| ) |
| if buffered: |
| s = "context.get('local')."\ |
| "cache._ctx_get_or_create("\ |
| "%s, lambda:__M_%s(%s), context, %s__M_defname=%r)" % ( |
| cachekey, name, ','.join(pass_args), |
| ''.join(["%s=%s, " % (k, v) |
| for k, v in cache_args.items()]), |
| name |
| ) |
| # apply buffer_filters |
| s = self.create_filter_callable(self.compiler.buffer_filters, s, |
| False) |
| self.printer.writelines("return " + s, None) |
| else: |
| self.printer.writelines( |
| "__M_writer(context.get('local')." |
| "cache._ctx_get_or_create(" |
| "%s, lambda:__M_%s(%s), context, %s__M_defname=%r))" % |
| ( |
| cachekey, name, ','.join(pass_args), |
| ''.join(["%s=%s, " % (k, v) |
| for k, v in cache_args.items()]), |
| name, |
| ), |
| "return ''", |
| None |
| ) |
| |
| def create_filter_callable(self, args, target, is_expression): |
| """write a filter-applying expression based on the filters |
| present in the given filter names, adjusting for the global |
| 'default' filter aliases as needed.""" |
| |
| def locate_encode(name): |
| if re.match(r'decode\..+', name): |
| return "filters." + name |
| elif self.compiler.disable_unicode: |
| return filters.NON_UNICODE_ESCAPES.get(name, name) |
| else: |
| return filters.DEFAULT_ESCAPES.get(name, name) |
| |
| if 'n' not in args: |
| if is_expression: |
| if self.compiler.pagetag: |
| args = self.compiler.pagetag.filter_args.args + args |
| if self.compiler.default_filters: |
| args = self.compiler.default_filters + args |
| for e in args: |
| # if filter given as a function, get just the identifier portion |
| if e == 'n': |
| continue |
| m = re.match(r'(.+?)(\(.*\))', e) |
| if m: |
| ident, fargs = m.group(1, 2) |
| f = locate_encode(ident) |
| e = f + fargs |
| else: |
| e = locate_encode(e) |
| assert e is not None |
| target = "%s(%s)" % (e, target) |
| return target |
| |
| def visitExpression(self, node): |
| self.printer.start_source(node.lineno) |
| if len(node.escapes) or \ |
| ( |
| self.compiler.pagetag is not None and |
| len(self.compiler.pagetag.filter_args.args) |
| ) or \ |
| len(self.compiler.default_filters): |
| |
| s = self.create_filter_callable(node.escapes_code.args, |
| "%s" % node.text, True) |
| self.printer.writeline("__M_writer(%s)" % s) |
| else: |
| self.printer.writeline("__M_writer(%s)" % node.text) |
| |
| def visitControlLine(self, node): |
| if node.isend: |
| self.printer.writeline(None) |
| if node.has_loop_context: |
| self.printer.writeline('finally:') |
| self.printer.writeline("loop = __M_loop._exit()") |
| self.printer.writeline(None) |
| else: |
| self.printer.start_source(node.lineno) |
| if self.compiler.enable_loop and node.keyword == 'for': |
| text = mangle_mako_loop(node, self.printer) |
| else: |
| text = node.text |
| self.printer.writeline(text) |
| children = node.get_children() |
| # this covers the three situations where we want to insert a pass: |
| # 1) a ternary control line with no children, |
| # 2) a primary control line with nothing but its own ternary |
| # and end control lines, and |
| # 3) any control line with no content other than comments |
| if not children or ( |
| compat.all(isinstance(c, (parsetree.Comment, |
| parsetree.ControlLine)) |
| for c in children) and |
| compat.all((node.is_ternary(c.keyword) or c.isend) |
| for c in children |
| if isinstance(c, parsetree.ControlLine))): |
| self.printer.writeline("pass") |
| |
| def visitText(self, node): |
| self.printer.start_source(node.lineno) |
| self.printer.writeline("__M_writer(%s)" % repr(node.content)) |
| |
| def visitTextTag(self, node): |
| filtered = len(node.filter_args.args) > 0 |
| if filtered: |
| self.printer.writelines( |
| "__M_writer = context._push_writer()", |
| "try:", |
| ) |
| for n in node.nodes: |
| n.accept_visitor(self) |
| if filtered: |
| self.printer.writelines( |
| "finally:", |
| "__M_buf, __M_writer = context._pop_buffer_and_writer()", |
| "__M_writer(%s)" % |
| self.create_filter_callable( |
| node.filter_args.args, |
| "__M_buf.getvalue()", |
| False), |
| None |
| ) |
| |
| def visitCode(self, node): |
| if not node.ismodule: |
| self.printer.start_source(node.lineno) |
| self.printer.write_indented_block(node.text) |
| |
| if not self.in_def and len(self.identifiers.locally_assigned) > 0: |
| # if we are the "template" def, fudge locally |
| # declared/modified variables into the "__M_locals" dictionary, |
| # which is used for def calls within the same template, |
| # to simulate "enclosing scope" |
| self.printer.writeline( |
| '__M_locals_builtin_stored = __M_locals_builtin()') |
| self.printer.writeline( |
| '__M_locals.update(__M_dict_builtin([(__M_key,' |
| ' __M_locals_builtin_stored[__M_key]) for __M_key in' |
| ' [%s] if __M_key in __M_locals_builtin_stored]))' % |
| ','.join([repr(x) for x in node.declared_identifiers()])) |
| |
| def visitIncludeTag(self, node): |
| self.printer.start_source(node.lineno) |
| args = node.attributes.get('args') |
| if args: |
| self.printer.writeline( |
| "runtime._include_file(context, %s, _template_uri, %s)" % |
| (node.parsed_attributes['file'], args)) |
| else: |
| self.printer.writeline( |
| "runtime._include_file(context, %s, _template_uri)" % |
| (node.parsed_attributes['file'])) |
| |
| def visitNamespaceTag(self, node): |
| pass |
| |
| def visitDefTag(self, node): |
| pass |
| |
| def visitBlockTag(self, node): |
| if node.is_anonymous: |
| self.printer.writeline("%s()" % node.funcname) |
| else: |
| nameargs = node.get_argument_expressions(as_call=True) |
| nameargs += ['**pageargs'] |
| self.printer.writeline( |
| "if 'parent' not in context._data or " |
| "not hasattr(context._data['parent'], '%s'):" |
| % node.funcname) |
| self.printer.writeline( |
| "context['self'].%s(%s)" % (node.funcname, ",".join(nameargs))) |
| self.printer.writeline("\n") |
| |
| def visitCallNamespaceTag(self, node): |
| # TODO: we can put namespace-specific checks here, such |
| # as ensure the given namespace will be imported, |
| # pre-import the namespace, etc. |
| self.visitCallTag(node) |
| |
| def visitCallTag(self, node): |
| self.printer.writeline("def ccall(caller):") |
| export = ['body'] |
| callable_identifiers = self.identifiers.branch(node, nested=True) |
| body_identifiers = callable_identifiers.branch(node, nested=False) |
| # we want the 'caller' passed to ccall to be used |
| # for the body() function, but for other non-body() |
| # <%def>s within <%call> we want the current caller |
| # off the call stack (if any) |
| body_identifiers.add_declared('caller') |
| |
| self.identifier_stack.append(body_identifiers) |
| |
| class DefVisitor(object): |
| |
| def visitDefTag(s, node): |
| s.visitDefOrBase(node) |
| |
| def visitBlockTag(s, node): |
| s.visitDefOrBase(node) |
| |
| def visitDefOrBase(s, node): |
| self.write_inline_def(node, callable_identifiers, nested=False) |
| if not node.is_anonymous: |
| export.append(node.funcname) |
| # remove defs that are within the <%call> from the |
| # "closuredefs" defined in the body, so they dont render twice |
| if node.funcname in body_identifiers.closuredefs: |
| del body_identifiers.closuredefs[node.funcname] |
| |
| vis = DefVisitor() |
| for n in node.nodes: |
| n.accept_visitor(vis) |
| self.identifier_stack.pop() |
| |
| bodyargs = node.body_decl.get_argument_expressions() |
| self.printer.writeline("def body(%s):" % ','.join(bodyargs)) |
| |
| # TODO: figure out best way to specify |
| # buffering/nonbuffering (at call time would be better) |
| buffered = False |
| if buffered: |
| self.printer.writelines( |
| "context._push_buffer()", |
| "try:" |
| ) |
| self.write_variable_declares(body_identifiers) |
| self.identifier_stack.append(body_identifiers) |
| |
| for n in node.nodes: |
| n.accept_visitor(self) |
| self.identifier_stack.pop() |
| |
| self.write_def_finish(node, buffered, False, False, callstack=False) |
| self.printer.writelines( |
| None, |
| "return [%s]" % (','.join(export)), |
| None |
| ) |
| |
| self.printer.writelines( |
| # push on caller for nested call |
| "context.caller_stack.nextcaller = " |
| "runtime.Namespace('caller', context, " |
| "callables=ccall(__M_caller))", |
| "try:") |
| self.printer.start_source(node.lineno) |
| self.printer.writelines( |
| "__M_writer(%s)" % self.create_filter_callable( |
| [], node.expression, True), |
| "finally:", |
| "context.caller_stack.nextcaller = None", |
| None |
| ) |
| |
| |
| class _Identifiers(object): |
| |
| """tracks the status of identifier names as template code is rendered.""" |
| |
| def __init__(self, compiler, node=None, parent=None, nested=False): |
| if parent is not None: |
| # if we are the branch created in write_namespaces(), |
| # we don't share any context from the main body(). |
| if isinstance(node, parsetree.NamespaceTag): |
| self.declared = set() |
| self.topleveldefs = util.SetLikeDict() |
| else: |
| # things that have already been declared |
| # in an enclosing namespace (i.e. names we can just use) |
| self.declared = set(parent.declared).\ |
| union([c.name for c in parent.closuredefs.values()]).\ |
| union(parent.locally_declared).\ |
| union(parent.argument_declared) |
| |
| # if these identifiers correspond to a "nested" |
| # scope, it means whatever the parent identifiers |
| # had as undeclared will have been declared by that parent, |
| # and therefore we have them in our scope. |
| if nested: |
| self.declared = self.declared.union(parent.undeclared) |
| |
| # top level defs that are available |
| self.topleveldefs = util.SetLikeDict(**parent.topleveldefs) |
| else: |
| self.declared = set() |
| self.topleveldefs = util.SetLikeDict() |
| |
| self.compiler = compiler |
| |
| # things within this level that are referenced before they |
| # are declared (e.g. assigned to) |
| self.undeclared = set() |
| |
| # things that are declared locally. some of these things |
| # could be in the "undeclared" list as well if they are |
| # referenced before declared |
| self.locally_declared = set() |
| |
| # assignments made in explicit python blocks. |
| # these will be propagated to |
| # the context of local def calls. |
| self.locally_assigned = set() |
| |
| # things that are declared in the argument |
| # signature of the def callable |
| self.argument_declared = set() |
| |
| # closure defs that are defined in this level |
| self.closuredefs = util.SetLikeDict() |
| |
| self.node = node |
| |
| if node is not None: |
| node.accept_visitor(self) |
| |
| illegal_names = self.compiler.reserved_names.intersection( |
| self.locally_declared) |
| if illegal_names: |
| raise exceptions.NameConflictError( |
| "Reserved words declared in template: %s" % |
| ", ".join(illegal_names)) |
| |
| def branch(self, node, **kwargs): |
| """create a new Identifiers for a new Node, with |
| this Identifiers as the parent.""" |
| |
| return _Identifiers(self.compiler, node, self, **kwargs) |
| |
| @property |
| def defs(self): |
| return set(self.topleveldefs.union(self.closuredefs).values()) |
| |
| def __repr__(self): |
| return "Identifiers(declared=%r, locally_declared=%r, "\ |
| "undeclared=%r, topleveldefs=%r, closuredefs=%r, "\ |
| "argumentdeclared=%r)" %\ |
| ( |
| list(self.declared), |
| list(self.locally_declared), |
| list(self.undeclared), |
| [c.name for c in self.topleveldefs.values()], |
| [c.name for c in self.closuredefs.values()], |
| self.argument_declared) |
| |
| def check_declared(self, node): |
| """update the state of this Identifiers with the undeclared |
| and declared identifiers of the given node.""" |
| |
| for ident in node.undeclared_identifiers(): |
| if ident != 'context' and\ |
| ident not in self.declared.union(self.locally_declared): |
| self.undeclared.add(ident) |
| for ident in node.declared_identifiers(): |
| self.locally_declared.add(ident) |
| |
| def add_declared(self, ident): |
| self.declared.add(ident) |
| if ident in self.undeclared: |
| self.undeclared.remove(ident) |
| |
| def visitExpression(self, node): |
| self.check_declared(node) |
| |
| def visitControlLine(self, node): |
| self.check_declared(node) |
| |
| def visitCode(self, node): |
| if not node.ismodule: |
| self.check_declared(node) |
| self.locally_assigned = self.locally_assigned.union( |
| node.declared_identifiers()) |
| |
| def visitNamespaceTag(self, node): |
| # only traverse into the sub-elements of a |
| # <%namespace> tag if we are the branch created in |
| # write_namespaces() |
| if self.node is node: |
| for n in node.nodes: |
| n.accept_visitor(self) |
| |
| def _check_name_exists(self, collection, node): |
| existing = collection.get(node.funcname) |
| collection[node.funcname] = node |
| if existing is not None and \ |
| existing is not node and \ |
| (node.is_block or existing.is_block): |
| raise exceptions.CompileException( |
| "%%def or %%block named '%s' already " |
| "exists in this template." % |
| node.funcname, **node.exception_kwargs) |
| |
| def visitDefTag(self, node): |
| if node.is_root() and not node.is_anonymous: |
| self._check_name_exists(self.topleveldefs, node) |
| elif node is not self.node: |
| self._check_name_exists(self.closuredefs, node) |
| |
| for ident in node.undeclared_identifiers(): |
| if ident != 'context' and \ |
| ident not in self.declared.union(self.locally_declared): |
| self.undeclared.add(ident) |
| |
| # visit defs only one level deep |
| if node is self.node: |
| for ident in node.declared_identifiers(): |
| self.argument_declared.add(ident) |
| |
| for n in node.nodes: |
| n.accept_visitor(self) |
| |
| def visitBlockTag(self, node): |
| if node is not self.node and not node.is_anonymous: |
| |
| if isinstance(self.node, parsetree.DefTag): |
| raise exceptions.CompileException( |
| "Named block '%s' not allowed inside of def '%s'" |
| % (node.name, self.node.name), **node.exception_kwargs) |
| elif isinstance(self.node, |
| (parsetree.CallTag, parsetree.CallNamespaceTag)): |
| raise exceptions.CompileException( |
| "Named block '%s' not allowed inside of <%%call> tag" |
| % (node.name, ), **node.exception_kwargs) |
| |
| for ident in node.undeclared_identifiers(): |
| if ident != 'context' and \ |
| ident not in self.declared.union(self.locally_declared): |
| self.undeclared.add(ident) |
| |
| if not node.is_anonymous: |
| self._check_name_exists(self.topleveldefs, node) |
| self.undeclared.add(node.funcname) |
| elif node is not self.node: |
| self._check_name_exists(self.closuredefs, node) |
| for ident in node.declared_identifiers(): |
| self.argument_declared.add(ident) |
| for n in node.nodes: |
| n.accept_visitor(self) |
| |
| def visitTextTag(self, node): |
| for ident in node.undeclared_identifiers(): |
| if ident != 'context' and \ |
| ident not in self.declared.union(self.locally_declared): |
| self.undeclared.add(ident) |
| |
| def visitIncludeTag(self, node): |
| self.check_declared(node) |
| |
| def visitPageTag(self, node): |
| for ident in node.declared_identifiers(): |
| self.argument_declared.add(ident) |
| self.check_declared(node) |
| |
| def visitCallNamespaceTag(self, node): |
| self.visitCallTag(node) |
| |
| def visitCallTag(self, node): |
| if node is self.node: |
| for ident in node.undeclared_identifiers(): |
| if ident != 'context' and \ |
| ident not in self.declared.union( |
| self.locally_declared): |
| self.undeclared.add(ident) |
| for ident in node.declared_identifiers(): |
| self.argument_declared.add(ident) |
| for n in node.nodes: |
| n.accept_visitor(self) |
| else: |
| for ident in node.undeclared_identifiers(): |
| if ident != 'context' and \ |
| ident not in self.declared.union( |
| self.locally_declared): |
| self.undeclared.add(ident) |
| |
| |
| _FOR_LOOP = re.compile( |
| r'^for\s+((?:\(?)\s*[A-Za-z_][A-Za-z_0-9]*' |
| r'(?:\s*,\s*(?:[A-Za-z_][A-Za-z0-9_]*),??)*\s*(?:\)?))\s+in\s+(.*):' |
| ) |
| |
| |
| def mangle_mako_loop(node, printer): |
| """converts a for loop into a context manager wrapped around a for loop |
| when access to the `loop` variable has been detected in the for loop body |
| """ |
| loop_variable = LoopVariable() |
| node.accept_visitor(loop_variable) |
| if loop_variable.detected: |
| node.nodes[-1].has_loop_context = True |
| match = _FOR_LOOP.match(node.text) |
| if match: |
| printer.writelines( |
| 'loop = __M_loop._enter(%s)' % match.group(2), |
| 'try:' |
| # 'with __M_loop(%s) as loop:' % match.group(2) |
| ) |
| text = 'for %s in loop:' % match.group(1) |
| else: |
| raise SyntaxError("Couldn't apply loop context: %s" % node.text) |
| else: |
| text = node.text |
| return text |
| |
| |
| class LoopVariable(object): |
| |
| """A node visitor which looks for the name 'loop' within undeclared |
| identifiers.""" |
| |
| def __init__(self): |
| self.detected = False |
| |
| def _loop_reference_detected(self, node): |
| if 'loop' in node.undeclared_identifiers(): |
| self.detected = True |
| else: |
| for n in node.get_children(): |
| n.accept_visitor(self) |
| |
| def visitControlLine(self, node): |
| self._loop_reference_detected(node) |
| |
| def visitCode(self, node): |
| self._loop_reference_detected(node) |
| |
| def visitExpression(self, node): |
| self._loop_reference_detected(node) |