blob: c6ca48e187a0600bcd0f35b6ba926603aed0844e [file] [log] [blame]
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Test to prevent changes to SDK's JSON metadata schemas.
Ensures that developers will not unintentionally break down stream
customers by changing the JSON schemas without knowing the effect
the change will have.
"""
import argparse
import textwrap
import json
import os
import sys
from typing import Any
width = 4
def get_pretty_str(
changes: dict[tuple[str, str], Any],
str_list: list[str] | None = None,
indent: int = 0,
) -> str:
if not str_list:
str_list = []
for key, value in changes.items():
str_list.append(textwrap.indent(f"{str(key)}:", indent * " "))
if isinstance(value, dict):
get_pretty_str(value, str_list, indent + width)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
get_pretty_str(item, str_list, indent + width)
else:
str_list.append(
textwrap.indent(f"{str(value)}", (indent + width) * " ")
)
if indent == 0:
return "\n".join(str_list)
return ""
class GoldenMismatchError(Exception):
"""Exception raised when golden and current are not identical."""
def __init__(
self,
mismatch: list[tuple[str, str]],
breaks: dict[tuple[str, str], Any],
non_breaks: dict[tuple[str, str], Any],
) -> None:
self.breaks = breaks
self.non_breaks = non_breaks
cmd_str = ""
for path in mismatch:
cmd_str += "\n" + update_cmd(path[0], path[1])
self.cmd_str = cmd_str
def __str__(self) -> str:
# Notice of any changes detected, including those not yet classified
# as breaking or non-breaking.
ret_str = (
f"Detected changes to JSON Schemas.\n"
f"All breaking and non-breaking changes MUST result in a change to the “id” field.\n"
f"Please do not change the schemas without consulting"
f" with sdk-dev@fuchsia.dev.\n"
f"To prevent breaking SDK integrators, "
f"the contents of current schemas should match goldens.\n"
)
if self.breaks:
formatted_breaks = get_pretty_str(self.breaks)
ret_str += (
textwrap.indent("Breaking Changes:", width * " ")
+ "\n"
+ textwrap.indent(f"{formatted_breaks}", 2 * width * " ")
+ "\n"
)
if self.non_breaks:
formatted_non_breaks = get_pretty_str(self.non_breaks)
ret_str += (
textwrap.indent("Non-breaking Changes:", width * " ")
+ "\n"
+ textwrap.indent(f"{formatted_non_breaks}", 2 * width * " ")
+ "\n"
)
ret_str += (
f"If you have approval to make this change, run: {self.cmd_str}\n"
)
return ret_str
# TODO(https://fxbug.dev/42058424): Update JSON schemas check to be a compatibility test.
# class BreakingChangesError(Exception):
# Exception raised when breaking changes are detected.
# def __init__(self, changes):
# self.changes = changes
# cmd_str = ""
# for key in changes.keys():
# cmd_str += "\n" + update_cmd(key[1], key[0])
# self.cmd_str = cmd_str
# def __str__(self):
# return (
# f"Detected the following breaking changes:\n"
# f"{self.changes}\n"
# f"Please do not make this change without consulting"
# f" with sdk-dev@fuchsia.dev.\n"
# f"To prevent potential breaking SDK integrators, "
# f"the contents of current schemas should match goldens.\n"
# f"If you have approval to make this change, run:{self.cmd_str}\n")
# class NotifyNonBreakingChanges(Exception):
# Exception raised when non-breaking changes are detected.
# def __init__(self, changes):
# self.changes = changes
# cmd_str = ""
# for key in changes.keys():
# cmd_str += "\n" + update_cmd(key[1], key[0])
# self.cmd_str = cmd_str
# def __str__(self):
# return (
# f"Detected the following non-breaking changes:\n{self.changes}\n"
# f"If you want to continue with this change, run:{self.cmd_str}\n")
class InvalidJsonError(Exception):
"""Exception raised when invalid JSON in a golden file is detected."""
def __init__(self, invalid_schema: str) -> None:
self.invalid_schema = invalid_schema
def __str__(self) -> str:
return (
f"Detected invalid JSON Schema {self.invalid_schema}.\n"
f"Consult with sdk-dev@fuchsia.dev to update this golden file.\n"
)
class MissingInputError(Exception):
"""Exception raised when a missing golden,
current, or stamp file is detected."""
def __init__(self, missing: str, schema: bool = True) -> None:
self.missing = missing
self.schema = schema
def __str__(self) -> str:
if self.schema:
return (
f"Detected missing JSON Schema {self.missing}.\n"
f"Please consult with sdk-dev@fuchsia.dev if you are"
f" planning to remove a schema.\n"
f"If you have approval to make this change, remove the "
f"schema and corresponding golden file from the schema lists.\n"
)
return f"The verification file path appears to be missing:\n{self.missing}\n"
class SchemaListMismatchError(Exception):
"""Exception raised when the golden list and current
list contain a different number of schemas or when
there is a mismatch of the schema filenames."""
def __init__(
self, goldens: list[str], currents: list[str], err_type: int
) -> None:
self.goldens = goldens
self.currents = currents
self.err_type = err_type
def __str__(self) -> str:
if self.err_type == 0:
return (
f"Detected that the golden list contains "
f"a different number of schemas than the current list.\n"
f"Golden:\n{self.goldens}\nCurrent:\n{self.currents}\n"
f"Please make sure each schema has a corresponding golden file,"
f" and vice versa.\n"
)
else:
return (
f"Detected that filenames in the golden list, {self.goldens},"
f" do not correspond to those in the current list, "
f"{self.currents}.\n"
f"Please make sure each schema has a corresponding golden file,"
f" and vice versa.\n"
)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument(
"--golden",
nargs="+",
help="Files in the golden directory",
required=True,
)
parser.add_argument(
"--current",
nargs="+",
help="Files in the local directory",
required=True,
)
parser.add_argument(
"--stamp", help="Verification output file path", required=True
)
args = parser.parse_args()
err = None
try:
fail_on_breaking_changes(args.current, args.golden)
except (
SchemaListMismatchError,
MissingInputError,
InvalidJsonError,
GoldenMismatchError,
) as e:
err = e
try:
with open(args.stamp, "w") as stamp_file:
stamp_file.write("Verified!\n")
except FileNotFoundError:
err = MissingInputError(missing=args.stamp, schema=False)
if not err:
return 0
else:
print("ERROR: ", err)
# Force developers to acknowledge a stale golden.
return 1
def fail_on_breaking_changes(
current_list: list[str], golden_list: list[str]
) -> None:
"""Fails if current and golden aren't identical."""
if not len(golden_list) == len(current_list):
raise SchemaListMismatchError(
currents=current_list, goldens=golden_list, err_type=0
)
golden_list.sort()
current_list.sort()
total_non_breaking_changes = dict()
total_breaking_changes = dict()
mismatches: list[tuple[str, str]] = []
for schema in zip(golden_list, current_list):
if not (
(os.path.basename(schema[1]) + ".golden")
== os.path.basename(schema[0])
):
raise SchemaListMismatchError(
currents=current_list, goldens=golden_list, err_type=1
)
# In order to not be reliant on GN side-effects, the below try-except
# statements cover the case of a missing / invalid schema.
# Generally, before this script runs, a build error will occur in
# these cases.
try:
schema_file = open(schema[0], "r")
except FileNotFoundError:
raise MissingInputError(
missing=schema[0],
)
try:
schema_data = json.load(schema_file)
except json.decoder.JSONDecodeError:
raise InvalidJsonError(
invalid_schema=schema[0],
)
schema_file.close()
try:
curr_file = open(schema[1], "r")
except FileNotFoundError:
raise MissingInputError(
missing=schema[1],
)
try:
curr_data = json.load(curr_file)
except json.decoder.JSONDecodeError:
mismatches.append((schema[1], schema[0]))
curr_file.close()
non_breaking_changes: list[dict[str, Any]] = []
breaking_changes: list[dict[str, Any]] = []
compare_schema_structure(
curr_data, schema_data, non_breaking_changes, breaking_changes
)
if non_breaking_changes:
total_non_breaking_changes[schema] = non_breaking_changes
if breaking_changes:
total_breaking_changes[schema] = breaking_changes
if (
schema in total_breaking_changes
or schema in total_non_breaking_changes
):
if "id" in curr_data and curr_data["id"] == schema_data["id"]:
append_to_list(
total_breaking_changes[schema],
"All breaking changes must result in a change to the 'id' field",
schema_data["id"],
)
if not schema_data == curr_data:
mismatches.append((schema[1], schema[0]))
if mismatches:
raise GoldenMismatchError(
mismatch=mismatches,
breaks=total_breaking_changes,
non_breaks=total_non_breaking_changes,
)
return None
# Compare the input schemas' keys. Return breaking and non-breaking changes.
def compare_schema_structure(
curr_data: dict[str, Any],
gold_data: dict[str, Any],
non_breaking: list[dict[str, Any]] = [],
breaking_changes: list[dict[str, Any]] = [],
level: str = "root",
) -> None:
if isinstance(curr_data, dict) and isinstance(gold_data, dict):
curr_keys = set(curr_data.keys())
gold_keys = set(gold_data.keys())
if curr_data.keys() != gold_data.keys():
if gold_keys.difference(curr_keys):
append_to_list(
breaking_changes,
f"Missing keys of {level}",
set(gold_keys.difference(curr_keys)),
)
if curr_keys.difference(gold_keys):
append_to_list(
non_breaking,
f"New keys of {level}",
set(curr_keys.difference(gold_keys)),
)
check_keys = [
("required", list),
("enum", list),
("additionalProperties", bool),
("type", str),
]
for pair in check_keys:
if pair[0] in curr_keys and pair[0] in gold_keys:
check_json_value(
pair[0],
gold_data,
curr_data,
level,
pair[1],
breaking_changes,
non_breaking,
)
for key in gold_keys.intersection(curr_keys):
# Values under "properties" are held to different policies.
# These parameters must be checked at the level above
# "properties" in order to access the "required" list and
# "additionalProperties" value as well.
if key == "properties":
if isinstance(curr_data[key], dict) and isinstance(
gold_data[key], dict
):
curr_props = set(curr_data[key].keys())
gold_props = set(gold_data[key].keys())
if curr_data[key].keys() != gold_data[key].keys():
temp_breaks = set()
temp_nonbreaks = set()
# Removing a parameter is only a breaking change
# if additionalProperties is set to False
# or the parameter is listed under "required".
if gold_props.difference(curr_props):
for removed_key in set(
gold_props.difference(curr_props)
):
req = False
if (
"required" in gold_data
and removed_key in gold_data["required"]
):
req = True
temp_breaks.add(removed_key)
if (
not req
and "additionalProperties" in gold_data
):
if gold_data["additionalProperties"]:
temp_nonbreaks.add(removed_key)
else:
temp_breaks.add(removed_key)
# Adding a parameter is a non-breaking change unless
# the parameter is listed under "required".
if curr_props.difference(gold_props):
for added_key in set(
curr_props.difference(gold_props)
):
if "required" in gold_data:
if added_key in curr_data["required"]:
temp_breaks.add(added_key)
else:
temp_nonbreaks.add(added_key)
else:
temp_nonbreaks.add(added_key)
# If any parameters were non-breaking changes, append
# them all to the set of non-breaking changes.
if temp_nonbreaks:
append_to_list(
non_breaking,
f"Changed parameters on level {level}.properties",
temp_nonbreaks,
)
# If any parameters were breaking changes, append
# them all to the set of breaking changes.
if temp_breaks:
append_to_list(
breaking_changes,
f"Changed parameters on level {level}.properties",
temp_breaks,
)
# Pass in the parameters of "properties" recursively
# in order to avoid passing in "properties".
for k in gold_props.intersection(curr_props):
compare_schema_structure(
curr_data[key][k],
gold_data[key][k],
non_breaking,
breaking_changes,
f"{level}.{key}.{k}",
)
else:
compare_schema_structure(
curr_data[key],
gold_data[key],
non_breaking,
breaking_changes,
f"{level}.{key}",
)
elif isinstance(gold_data, dict):
append_to_list(
breaking_changes, f"Missing keys of {level}", set(gold_data.keys())
)
elif isinstance(curr_data, dict):
append_to_list(
non_breaking, f"New keys of {level}", set(curr_data.keys())
)
def check_json_value(
key: str,
gold_data: dict[str, Any],
curr_data: dict[str, Any],
level: str,
expected_type: type,
breaking_changes: list[dict[str, Any]],
non_breaking: list[dict[str, Any]],
) -> None:
"""Classify specific changes as breaking or
non-breaking accordingly, if they exist.
Args:
key: The key of the key-value JSON pair in question.
gold_data: JSON content of the golden schema file.
curr_data: JSON content of the current schema file.
level: Level of the change within the JSON, beginning with 'root'.
type_to_look_for: Expected type of the JSON value in question.
breaking_changes: List of breaking changes.
non_breaking: List of non-breaking changes.
"""
if expected_type is list:
if isinstance(curr_data[key], expected_type):
gold_data[key].sort()
curr_data[key].sort()
if gold_data[key] != curr_data[key]:
append_to_list(
breaking_changes,
f"'{key}' parameters changed on {level}",
{
"golden": set(gold_data[key]),
"current": set(curr_data[key]),
},
)
else:
append_to_list(
breaking_changes,
f"'{key}' parameters changed on {level}",
{
"golden": set(gold_data[key]),
"current": curr_data[key],
},
)
elif expected_type is bool:
if isinstance(curr_data[key], expected_type):
if curr_data[key] != gold_data[key]:
if curr_data[key]:
append_to_list(
non_breaking,
f"New value for '{key}' on {level}",
curr_data[key],
)
else:
append_to_list(
breaking_changes,
f"Value for '{key}' on {level} should be",
gold_data[key],
)
else:
append_to_list(
breaking_changes,
f"Value for '{key}' on {level} should be",
gold_data[key],
)
elif expected_type is str:
if isinstance(gold_data[key], expected_type):
if isinstance(curr_data[key], expected_type):
if curr_data[key] != gold_data[key]:
append_to_list(
breaking_changes,
f"Value for '{key}' on {level} should be",
gold_data[key],
)
else:
append_to_list(
breaking_changes,
f"Value for '{key}' on {level} should be",
gold_data[key],
)
def append_to_list(
list_var: list[dict[str, Any]], key: str, value: Any
) -> None:
list_var.append({key: value})
def update_cmd(current: str, golden: str) -> str:
"""For presentation only. Never execute the cmd output pragmatically because
it may present a security exploit."""
return 'cp "{}" "{}"'.format(
os.path.abspath(current), os.path.abspath(golden)
)
if __name__ == "__main__":
sys.exit(main())