WIP
diff --git a/jsonschema/_annotation.py b/jsonschema/_annotation.py
new file mode 100644
index 0000000..d83a35d
--- /dev/null
+++ b/jsonschema/_annotation.py
@@ -0,0 +1,99 @@
+"""
+Support for JSON Schema annotation collection.
+"""
+
+from collections import deque
+
+import attr
+
+from jsonschema._utils import __no_init_subclass__
+
+
+@attr.s
+class Annotator:
+    """
+    An annotator supervises validation of an instance, annotating as it goes.
+
+    Whereas validators, type checkers, format checkers and the like
+    are generally stateless, an annotator is *stateful*. It tracks
+    the incremental progress as validation –or more broadly pure
+    annotation– of an instance is progressing.
+    """
+
+    _validator = attr.ib(
+        repr=lambda validator: f"<{validator.__class__.__name__}>",
+        kw_only=True,
+    )
+
+    def __attrs_post_init__(self):
+        # Resolution scopes entered so far, innermost last. The bottom
+        # entry is the ID of the schema this annotator started from.
+        root_scope = self._validator.ID_OF(self._validator.schema)
+        self._scope_stack = deque([root_scope])
+
+    def descend(self, instance, schema, path=None, schema_path=None):
+        """
+        Validate ``instance`` against ``schema``, yielding each error.
+
+        When given, ``path`` and ``schema_path`` are prepended to the
+        respective paths of every error before it is yielded.
+        """
+        validator = attr.evolve(self._validator, schema=schema)
+        for error in validator.iter_errors(instance):
+            if path is not None:
+                error.path.appendleft(path)
+            if schema_path is not None:
+                error.schema_path.appendleft(schema_path)
+            yield error
+
+    __init_subclass__ = __no_init_subclass__
+
+    # TODO: IMPROVEME / belongs on ref resolver?
+    def scopes_moving_outward(self):
+        """
+        Yield ``(scope, schema)`` pairs from the innermost scope outward.
+
+        The first pair is the current scope with the current schema; the
+        rest come from resolving each enclosing scope on the stack. The
+        bottom (root) entry of the stack is not resolved.
+        """
+        resolver = self._validator.resolver
+        yield self._scope_stack[-1], self._validator.schema
+        # Iterate over a snapshot: a deque cannot be sliced, and the stack
+        # may change while this generator is suspended.
+        for each in list(self._scope_stack)[:0:-1]:
+            yield resolver.resolve(
+                ref=each,
+                resolution_scope=self._scope_stack[-1],
+            )
+
+    def descend_at_ref(self, instance, ref):
+        """
+        Resolve ``ref`` and validate ``instance`` against its target.
+
+        The resolved scope is pushed for the duration of the descent.
+        """
+        scope, resolved = self._validator.resolver.resolve(
+            ref=ref,
+            resolution_scope=self._scope_stack[-1],
+        )
+        self._scope_stack.append(scope)
+        try:
+            yield from self.descend(instance=instance, schema=resolved)
+        finally:
+            # Also runs when the consumer abandons the generator early,
+            # so an unfinished descent cannot leave a stale scope behind.
+            self._scope_stack.pop()
+
+    # TODO: REMOVEME
+    @property
+    def format_checker(self):
+        return self._validator.format_checker
+
+    @property
+    def is_valid(self):
+        return self._validator.is_valid
+
+    @property
+    def is_type(self):
+        return self._validator.is_type
diff --git a/jsonschema/_legacy_validators.py b/jsonschema/_legacy_validators.py
index b50b78f..11e18ae 100644
--- a/jsonschema/_legacy_validators.py
+++ b/jsonschema/_legacy_validators.py
@@ -206,14 +206,12 @@
)
-def recursiveRef(validator, recursiveRef, instance, schema):
- scope_stack = validator.resolver.scopes_stack_copy
- lookup_url, target = validator.resolver.resolution_scope, validator.schema
-
- for each in reversed(scope_stack[1:]):
- lookup_url, next_target = validator.resolver.resolve(each)
- if next_target.get("$recursiveAnchor"):
- target = next_target
+def recursiveRef(annotator, recursiveRef, instance, schema):
+ outward = (schema for _, schema in annotator.scopes_moving_outward())
+ target = next(outward)
+ for each in outward:
+ if each.get("$recursiveAnchor"):
+ target = each
else:
break
diff --git a/jsonschema/_utils.py b/jsonschema/_utils.py
index f3603c5..454609d 100644
--- a/jsonschema/_utils.py
+++ b/jsonschema/_utils.py
@@ -71,6 +71,17 @@
return vocabulary
+def __no_init_subclass__(*args, **kwargs):
+ """
+ Raise RuntimeError to tell users that subclassing is not supported.
+ """
+ raise RuntimeError(
+ "jsonschema classes do not support subclassing. "
+ "If an API is missing which prevents extension, please "
+ "file a ticket at https://github.com/Julian/jsonschema/issues."
+ )
+
+
def format_as_index(container, indices):
"""
Construct a single string containing indexing operations for the indices.
diff --git a/jsonschema/_validators.py b/jsonschema/_validators.py
index c6d0e9a..b24dc26 100644
--- a/jsonschema/_validators.py
+++ b/jsonschema/_validators.py
@@ -282,26 +282,14 @@
yield ValidationError(f"{instance!r} is not one of {enums!r}")
-def ref(validator, ref, instance, schema):
- resolve = getattr(validator.resolver, "resolve", None)
- if resolve is None:
- with validator.resolver.resolving(ref) as resolved:
- yield from validator.descend(instance, resolved)
- else:
- scope, resolved = validator.resolver.resolve(ref)
- validator.resolver.push_scope(scope)
-
- try:
- yield from validator.descend(instance, resolved)
- finally:
- validator.resolver.pop_scope()
+def ref(annotator, ref, instance, schema):
+ yield from annotator.descend_at_ref(instance=instance, ref=ref)
def dynamicRef(validator, dynamicRef, instance, schema):
_, fragment = urldefrag(dynamicRef)
- scope_stack = validator.resolver.scopes_stack_copy
- for url in scope_stack:
+ for url in []:
lookup_url = urljoin(url, dynamicRef)
with validator.resolver.resolving(lookup_url) as subschema:
if ("$dynamicAnchor" in subschema
@@ -309,8 +297,7 @@
yield from validator.descend(instance, subschema)
break
else:
- with validator.resolver.resolving(dynamicRef) as subschema:
- yield from validator.descend(instance, subschema)
+ yield from validator.descend_at_ref(instance, dynamicRef)
def type(validator, types, instance, schema):
diff --git a/jsonschema/tests/test_annotation.py b/jsonschema/tests/test_annotation.py
new file mode 100644
index 0000000..d625a0f
--- /dev/null
+++ b/jsonschema/tests/test_annotation.py
@@ -0,0 +1,156 @@
+from unittest import TestCase
+
+from jsonschema._annotation import Annotator
+from jsonschema.exceptions import UnknownType
+from jsonschema.validators import _LATEST_VERSION, extend
+
+
+class TestAnnotator(TestCase):
+ def test_descend(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ errors = {
+ error.message
+ for error in annotator.descend(instance=37, schema=False)
+ }
+ self.assertEqual(errors, {"False schema does not allow 37"})
+
+ def test_descend_multiple_errors(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ errors = {
+ error.message
+ for error in annotator.descend(
+ instance=37,
+ schema={"type": "string", "minimum": 38},
+ )
+ }
+ self.assertEqual(
+ errors, {
+ "37 is less than the minimum of 38",
+ "37 is not of type 'string'",
+ },
+ )
+
+ def test_descend_extend_path(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ errors = {
+ (
+ error.message,
+ tuple(error.absolute_path),
+ tuple(error.absolute_schema_path),
+ ) for error in annotator.descend(
+ instance={"b": {"c": 37}},
+ schema={
+ "properties": {"b": {"const": "a"}},
+ "minProperties": 2,
+ },
+ path="a",
+ )
+ }
+ self.assertEqual(
+ errors, {
+ (
+ "{'b': {'c': 37}} does not have enough properties",
+ ("a",),
+ ("minProperties",)
+ ),
+ (
+ "'a' was expected",
+ ("a", "b"),
+ ("properties", "b", "const"),
+ ),
+ },
+ )
+
+ def test_descend_extend_schema_path(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ errors = {
+ (
+ error.message,
+ tuple(error.absolute_path),
+ tuple(error.absolute_schema_path),
+ ) for error in annotator.descend(
+ instance={"b": {"c": 37}},
+ schema={
+ "properties": {"b": {"const": "a"}},
+ "minProperties": 2,
+ },
+ schema_path="no37",
+ )
+ }
+ self.assertEqual(
+ errors, {
+ (
+ "{'b': {'c': 37}} does not have enough properties",
+ (),
+ ("no37", "minProperties")
+ ),
+ (
+ "'a' was expected",
+ ("b",),
+ ("no37", "properties", "b", "const"),
+ ),
+ },
+ )
+
+ def test_descend_extend_both_paths(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ errors = {
+ (
+ error.message,
+ tuple(error.absolute_path),
+ tuple(error.absolute_schema_path),
+ ) for error in annotator.descend(
+ instance={"b": {"c": 37}},
+ schema={
+ "properties": {"b": {"const": "a"}},
+ "minProperties": 2,
+ },
+ path="foo",
+ schema_path="no37",
+ )
+ }
+ self.assertEqual(
+ errors, {
+ (
+ "{'b': {'c': 37}} does not have enough properties",
+ ("foo",),
+ ("no37", "minProperties")
+ ),
+ (
+ "'a' was expected",
+ ("foo", "b"),
+ ("no37", "properties", "b", "const"),
+ ),
+ },
+ )
+
+ def test_is_type(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ self.assertTrue(annotator.is_type("foo", "string"))
+
+ def test_is_not_type(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ self.assertFalse(annotator.is_type(37, "string"))
+
+ def test_is_unknown_type(self):
+ annotator = Annotator(validator=_LATEST_VERSION({}))
+ with self.assertRaises(UnknownType) as e:
+ self.assertFalse(annotator.is_type(37, "boopety"))
+ self.assertEqual(
+ vars(e.exception),
+ {"type": "boopety", "instance": 37, "schema": {}},
+ )
+
+ def test_repr(self):
+ validator = extend(_LATEST_VERSION)({})
+ annotator = Annotator(validator=validator)
+ self.assertEqual(
+ repr(annotator),
+ "Annotator(_validator=<Validator>)",
+ )
+
+ def test_it_does_not_allow_subclassing(self):
+ with self.assertRaises(RuntimeError) as e:
+ class NoNo(Annotator):
+ pass
+ self.assertIn("support subclassing", str(e.exception))
diff --git a/jsonschema/validators.py b/jsonschema/validators.py
index f94e9d0..2a21e21 100644
--- a/jsonschema/validators.py
+++ b/jsonschema/validators.py
@@ -6,10 +6,11 @@
from urllib.parse import unquote, urldefrag, urljoin, urlsplit
from urllib.request import urlopen
from warnings import warn
-import contextlib
import json
import warnings
+import attr
+
from jsonschema import (
_legacy_validators,
_types,
@@ -17,6 +18,7 @@
_validators,
exceptions,
)
+from jsonschema._annotation import Annotator
validators = {}
meta_schemas = _utils.URIDict()
@@ -143,6 +145,7 @@
a new `jsonschema.IValidator` class
"""
+ @attr.s
class Validator:
VALIDATORS = dict(validators)
@@ -151,22 +154,24 @@
TYPE_CHECKER = type_checker
ID_OF = staticmethod(id_of)
- def __init__(self, schema, resolver=None, format_checker=None):
- if resolver is None:
- resolver = RefResolver.from_schema(schema, id_of=id_of)
+ schema = attr.ib()
+ resolver = attr.ib(default=None)
+ format_checker = attr.ib(default=None)
- self.resolver = resolver
- self.format_checker = format_checker
- self.schema = schema
+ def __attrs_post_init__(self):
+ if self.resolver is None:
+ self.resolver = RefResolver.from_schema(
+ self.schema,
+ id_of=id_of,
+ )
@classmethod
def check_schema(cls, schema):
for error in cls(cls.META_SCHEMA).iter_errors(schema):
raise exceptions.SchemaError.create_from(error)
- def iter_errors(self, instance, _schema=None):
- if _schema is None:
- _schema = self.schema
+ def iter_errors(self, instance):
+ _schema = self.schema
if _schema is True:
return
@@ -180,38 +185,25 @@
)
return
- scope = id_of(_schema)
- if scope:
- self.resolver.push_scope(scope)
- try:
- for k, v in applicable_validators(_schema):
- validator = self.VALIDATORS.get(k)
- if validator is None:
- continue
+ annotator = Annotator(validator=self)
- errors = validator(self, v, instance, _schema) or ()
- for error in errors:
- # set details if not already set by the called fn
- error._set(
- validator=k,
- validator_value=v,
- instance=instance,
- schema=_schema,
- )
- if k not in {"if", "$ref"}:
- error.schema_path.appendleft(k)
- yield error
- finally:
- if scope:
- self.resolver.pop_scope()
+ for k, v in applicable_validators(_schema):
+ validator = self.VALIDATORS.get(k)
+ if validator is None:
+ continue
- def descend(self, instance, schema, path=None, schema_path=None):
- for error in self.iter_errors(instance, schema):
- if path is not None:
- error.path.appendleft(path)
- if schema_path is not None:
- error.schema_path.appendleft(schema_path)
- yield error
+ errors = validator(annotator, v, instance, _schema) or ()
+ for error in errors:
+ # set details if not already set by the called fn
+ error._set(
+ validator=k,
+ validator_value=v,
+ instance=instance,
+ schema=_schema,
+ )
+ if k not in {"if", "$ref"}:
+ error.schema_path.appendleft(k)
+ yield error
def validate(self, *args, **kwargs):
for error in self.iter_errors(*args, **kwargs):
@@ -608,7 +600,6 @@
self.cache_remote = cache_remote
self.handlers = dict(handlers)
- self._scopes_stack = [base_uri]
self.store = _utils.URIDict(_store_schema_list())
self.store.update(store)
self.store[base_uri] = referrer
@@ -634,124 +625,27 @@
return cls(base_uri=id_of(schema), referrer=schema, *args, **kwargs)
- def push_scope(self, scope):
- """
- Enter a given sub-scope.
-
- Treats further dereferences as being performed underneath the
- given scope.
- """
+ """ PUSH:
+ self._scopes_stack = [base_uri]
self._scopes_stack.append(
self._urljoin_cache(self.resolution_scope, scope),
)
+ POP:
+ self._scopes_stack.pop()
- def pop_scope(self):
- """
- Exit the most recent entered scope.
+ RESOLUTION SCOPE:
+ self._scopes_stack[-1]
- Treats further dereferences as being performed underneath the
- original scope.
-
- Don't call this method more times than `push_scope` has been
- called.
- """
- try:
- self._scopes_stack.pop()
- except IndexError:
- raise exceptions.RefResolutionError(
- "Failed to pop the scope from an empty stack. "
- "`pop_scope()` should only be called once for every "
- "`push_scope()`",
- )
-
- @property
- def resolution_scope(self):
- """
- Retrieve the current resolution scope.
- """
- return self._scopes_stack[-1]
-
- @property
- def scopes_stack_copy(self):
- """
- Retrieve a copy of the stack of resolution scopes.
- """
- return self._scopes_stack.copy()
-
- @property
- def base_uri(self):
- """
- Retrieve the current base URI, not including any fragment.
- """
+ BASE URI:
uri, _ = urldefrag(self.resolution_scope)
return uri
+ """
- @contextlib.contextmanager
- def in_scope(self, scope):
- """
- Temporarily enter the given scope for the duration of the context.
- """
- warnings.warn(
- "jsonschema.RefResolver.in_scope is deprecated and will be "
- "removed in a future release.",
- DeprecationWarning,
- )
- self.push_scope(scope)
- try:
- yield
- finally:
- self.pop_scope()
-
- @contextlib.contextmanager
- def resolving(self, ref):
- """
- Resolve the given ``ref`` and enter its resolution scope.
-
- Exits the scope on exit of this context manager.
-
- Arguments:
-
- ref (str):
-
- The reference to resolve
- """
-
- url, resolved = self.resolve(ref)
- self.push_scope(url)
- try:
- yield resolved
- finally:
- self.pop_scope()
-
- def _finditem(self, schema, key):
- results = []
- if isinstance(schema, dict):
- if key in schema:
- results.append(schema)
-
- for v in schema.values():
- if isinstance(v, dict):
- results += self._finditem(v, key)
-
- return results
-
- def resolve(self, ref):
+ def resolve(self, ref, resolution_scope):
"""
Resolve the given reference.
"""
- url = self._urljoin_cache(self.resolution_scope, ref).rstrip("/")
-
- uri, fragment = urldefrag(url)
-
- for subschema in self._finditem(self.referrer, "$id"):
- target_uri = self._urljoin_cache(
- self.resolution_scope, subschema["$id"],
- )
- if target_uri.rstrip("/") == uri.rstrip("/"):
- if fragment:
- subschema = self.resolve_fragment(subschema, fragment)
- return url, subschema
-
+ url = self._urljoin_cache(resolution_scope, ref)
return url, self._remote_cache(url)
def resolve_from_url(self, url):
@@ -784,20 +678,11 @@
a URI fragment to resolve within it
"""
- fragment = fragment.lstrip("/")
+ fragment = fragment.lstrip(u"/")
+ parts = unquote(fragment).split(u"/") if fragment else []
- if not fragment:
- return document
-
- for keyword in ["$anchor", "$dynamicAnchor"]:
- for subschema in self._finditem(document, keyword):
- if fragment == subschema[keyword]:
- return subschema
-
- # Resolve via path
- parts = unquote(fragment).split("/") if fragment else []
for part in parts:
- part = part.replace("~1", "/").replace("~0", "~")
+ part = part.replace(u"~1", u"/").replace(u"~0", u"~")
if isinstance(document, Sequence):
# Array indexes should be turned into integers