# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Sensor schema validation tooling."""
from collections.abc import Sequence
import copy
import importlib.resources
import logging
from pathlib import Path
import jsonschema # type: ignore
import jsonschema.exceptions # type: ignore
import yaml
_METADATA_SCHEMA = yaml.safe_load(
importlib.resources.read_text("pw_sensor", "metadata_schema.json")
)
_DEPENDENCY_SCHEMA = yaml.safe_load(
importlib.resources.read_text("pw_sensor", "dependency_schema.json")
)
_RESOLVED_SCHEMA = yaml.safe_load(
importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)
class Validator:
"""
Context used for validating metadata dictionaries.
    What the validator is:
    - A system for resolving and verifying that declared sensor metadata is
      well defined and correctly formatted
    - A utility for resolving any and all dependencies of a specified
      metadata file

    What the validator is NOT:
    - A code generator
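
    A minimal usage sketch (the include path and metadata file name here are
    hypothetical):

        validator = Validator(include_paths=[Path("sensor_defs")])
        with open("bma4xx.yaml", mode="r", encoding="utf-8") as f:
            resolved = validator.validate(metadata=yaml.safe_load(f))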
"""
def __init__(
self,
include_paths: Sequence[Path] | None = None,
log_level: int = logging.WARNING,
) -> None:
"""
Construct a Validator with some context of the current run.
Args:
include_paths: An optional list of directories in which to resolve
dependencies
log_level: A desired logging level (defaults to logging.WARNING)
"""
self._include_paths = include_paths if include_paths else []
self._logger = logging.getLogger(self.__class__.__name__)
self._logger.setLevel(log_level)
def validate(self, metadata: dict) -> dict:
"""
        Accept a structured metadata description. This dictionary must first
        pass the schema provided in metadata_schema.json. Additionally, every
        channel used by the sensor must be defined in exactly one of the
        dependencies.
Example YAML:
deps:
- "pw_sensor/channels.yaml"
compatible:
org: "Bosch"
part: "BMA4xx
supported-buses:
- i2c
channels:
acceleration: []
die_temperature: []
Args:
            metadata: Structured sensor data; this will NOT be modified.
Returns:
            A dictionary of attributes, channels, triggers, and units, along
            with a single sensor entry, all matching the schema in
            resolved_schema.json.
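
            An illustrative sketch of the result's shape (values elided; the
            authoritative definition is resolved_schema.json):

                attributes: { ... }
                channels: { ... }
                triggers: { ... }
                units: { ... }
                sensors:
                  "Bosch,BMA4xx":
                    compatible: {org: "Bosch", part: "BMA4xx"}
                    supported-buses: [i2c]
                    channels: { ... }
                    attributes: [ ... ]
                    triggers: [ ... ]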
Raises:
RuntimeError: An error in the schema validation or a missing
definition.
FileNotFoundError: One of the dependencies was not found.
"""
result: dict = {
"attributes": {},
"channels": {},
"triggers": {},
"units": {},
"sensors": {},
}
        # Deep copy so that nested lists/dicts in the caller's metadata are
        # never mutated during resolution.
        metadata = copy.deepcopy(metadata)
# Validate the incoming schema
try:
jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA)
except jsonschema.exceptions.ValidationError as e:
raise RuntimeError(
"ERROR: Malformed sensor metadata YAML:\n"
f"{yaml.safe_dump(metadata, indent=2)}"
) from e
        # Resolve all the dependencies; afterwards, 'result' will contain the
        # resolved definitions with any missing defaults filled in
self._resolve_dependencies(metadata=metadata, out=result)
self._logger.debug(
"Resolved dependencies:\n%s", yaml.safe_dump(result, indent=2)
)
# Resolve all channel entries
self._resolve_channels(metadata=metadata, out=result)
# Resolve all attribute entries
self._resolve_attributes(metadata=metadata, out=result)
# Resolve all trigger entries
self._resolve_triggers(metadata=metadata, out=result)
compatible = metadata.pop("compatible")
supported_buses = metadata.pop("supported-buses")
channels = metadata.pop("channels")
attributes = metadata.pop("attributes")
triggers = metadata.pop("triggers")
result["sensors"][f"{compatible['org']},{compatible['part']}"] = {
"compatible": compatible,
"supported-buses": supported_buses,
"channels": channels,
"attributes": attributes,
"triggers": triggers,
"description": metadata.get("description", ""),
"extras": metadata.get("extras", {}),
}
# Validate the final output before returning
try:
jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA)
except jsonschema.exceptions.ValidationError as e:
raise RuntimeError(
"ERROR: Malformed output YAML: "
f"{yaml.safe_dump(result, indent=2)}"
) from e
return result
def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
"""
        Given a list of dependencies, ensure that each of them exists and
        matches the schema provided in dependency_schema.json. Once loaded, the
content of the definition file will be resolved (filling in any missing
fields that can be inherited) and the final result will be placed in the
'out' dictionary.
Args:
metadata: The full sensor metadata passed to the validate function
out: Output dictionary where the resolved dependencies should be
stored
Raises:
RuntimeError: An error in the schema validation or a missing
definition.
FileNotFoundError: One of the dependencies was not found.
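
        Example dependency file (an illustrative sketch; the authoritative
        format is dependency_schema.json):

            units:
              acceleration:
                symbol: "m/s^2"
            channels:
              acceleration:
                description: "Acceleration in 3 axes"
                units: "acceleration"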
"""
deps: None | list[str] = metadata.get("deps")
if not deps:
self._logger.debug("No dependencies found, skipping imports")
return
merged_deps: dict = {
"attributes": {},
"channels": {},
"triggers": {},
"units": {},
}
for dep in deps:
# Load each of the dependencies, then merge them. This avoids any
# include dependency order issues.
dep_file = self._get_dependency_file(dep)
with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
dep_yaml = yaml.safe_load(dep_yaml_file)
try:
jsonschema.validate(
instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
)
except jsonschema.exceptions.ValidationError as e:
raise RuntimeError(
"ERROR: Malformed dependency YAML: "
f"{yaml.safe_dump(dep_yaml, indent=2)}"
) from e
# Merge all the loaded values into 'merged_deps'
for category in merged_deps:
self._merge_deps(
category=category,
dep_yaml=dep_yaml,
merged_deps=merged_deps,
)
# Backfill any default values from the merged dependencies and put them
# into 'out'
self._backfill_declarations(declarations=merged_deps, out=out)
@staticmethod
def _merge_deps(category: str, dep_yaml: dict, merged_deps: dict) -> None:
"""
Pull all properties from dep_yaml[category] and put them into
merged_deps after validating that no key duplicates exist.
Args:
category: The index of dep_yaml and merged_deps to merge
dep_yaml: The newly loaded dependency YAML
merged_deps: The accumulated dependency map
"""
for key, value in dep_yaml.get(category, {}).items():
assert (
key not in merged_deps[category]
), f"'{key}' was already found under '{category}'"
merged_deps[category][key] = value
def _backfill_declarations(self, declarations: dict, out: dict) -> None:
"""
Add any missing properties of a declaration object.
Args:
declarations: The top level declarations dictionary loaded from the
dependency file.
out: The already resolved map of all defined dependencies
"""
self._backfill_units(declarations=declarations, out=out)
self._backfill_channels(declarations=declarations, out=out)
self._backfill_attributes(declarations=declarations, out=out)
self._backfill_triggers(declarations=declarations, out=out)
@staticmethod
def _backfill_units(declarations: dict, out: dict) -> None:
"""
Move units from 'declarations' to 'out' while also filling in any
default values.
Args:
declarations: The original YAML declaring units.
            out: Output dictionary where we'll add the key "units" with the
                result.
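
        For example (illustrative), a declared unit:

            frequency: {symbol: "Hz"}

        would be backfilled to:

            frequency: {symbol: "Hz", name: "Hz", description: ""}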
"""
if out.get("units") is None:
out["units"] = {}
resolved_units: dict = out["units"]
if not declarations.get("units"):
return
for units_id, unit in declarations["units"].items():
# Copy unit to resolved_units and fill any default values
assert resolved_units.get(units_id) is None
resolved_units[units_id] = unit
if not unit.get("name"):
unit["name"] = unit["symbol"]
if unit.get("description") is None:
unit["description"] = ""
@staticmethod
def _backfill_attributes(declarations: dict, out: dict) -> None:
"""
        Move attributes from 'declarations' to 'out' while also filling in any
default values.
Args:
declarations: The original YAML declaring attributes.
out: Output dictionary where we'll add the key "attributes" with the
result.
"""
if out.get("attributes") is None:
out["attributes"] = {}
resolved_attributes: dict = out["attributes"]
if not declarations.get("attributes"):
return
for attr_id, attribute in declarations["attributes"].items():
# Copy attribute to resolved_attributes and fill any default values
assert resolved_attributes.get(attr_id) is None
resolved_attributes[attr_id] = attribute
if not attribute.get("name"):
attribute["name"] = attr_id
if not attribute.get("description"):
attribute["description"] = ""
@staticmethod
def _backfill_channels(declarations: dict, out: dict) -> None:
"""
Move channels from 'declarations' to 'out' while also filling in any
default values.
Args:
declarations: The original YAML declaring channels.
out: Output dictionary where we'll add the key "channels" with the
result.
"""
if out.get("channels") is None:
out["channels"] = {}
resolved_channels: dict = out["channels"]
if not declarations.get("channels"):
return
for chan_id, channel in declarations["channels"].items():
# Copy channel to resolved_channels and fill any default values
assert resolved_channels.get(chan_id) is None
resolved_channels[chan_id] = channel
if not channel.get("name"):
channel["name"] = chan_id
if not channel.get("description"):
channel["description"] = ""
assert channel["units"] in out["units"], (
f"'{channel['units']}' not found in\n"
+ f"{yaml.safe_dump(out.get('units', {}), indent=2)}"
)
@staticmethod
def _backfill_triggers(declarations: dict, out: dict) -> None:
"""
        Move triggers from 'declarations' to 'out' while also filling in any
default values.
Args:
declarations: The original YAML declaring triggers.
out: Output dictionary where we'll add the key "triggers" with the
result.
"""
if out.get("triggers") is None:
out["triggers"] = {}
resolved_triggers: dict = out["triggers"]
if not declarations.get("triggers"):
return
for trigger_id, trigger in declarations["triggers"].items():
# Copy trigger to resolved_triggers and fill any default values
assert resolved_triggers.get(trigger_id) is None
resolved_triggers[trigger_id] = trigger
if not trigger.get("name"):
trigger["name"] = trigger_id
if not trigger.get("description"):
trigger["description"] = ""
def _resolve_attributes(self, metadata: dict, out: dict) -> None:
"""
        For each attribute in the metadata, verify that a matching definition
        exists in the resolved output. For example, an entry might look like:

            attribute: "sample_rate"
            channel: "acceleration"
            units: "frequency"

        We would then check that the 'sample_rate' key exists in
        out/attributes, the 'acceleration' key in out/channels, and the
        'frequency' key in out/units (all of which were already resolved by
        _resolve_dependencies).

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get attribute, channel, and unit
                definitions

        Raises:
            AssertionError: A referenced attribute, channel, or unit was not
                defined in the resolved dependencies.
"""
attributes: list | None = metadata.get("attributes")
if not attributes:
metadata["attributes"] = []
self._logger.debug("No attributes found, skipping")
return
attribute: dict
for attribute in attributes:
assert attribute["attribute"] in out["attributes"]
assert attribute["channel"] in out["channels"]
assert attribute["units"] in out["units"]
def _resolve_channels(self, metadata: dict, out: dict) -> None:
"""
For each channel in the metadata, find the matching definition in the
'out/channels' entry and use the data to fill any missing information.
For example, if an entry exists that looks like:
            acceleration: []
We would then try and find the 'acceleration' key in the out/channels
dict (which was already validated by _resolve_dependencies). Since the
example above does not override any fields, we would copy the 'name',
'description', and 'units' from the definition into the channel entry.
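
        An illustrative before/after sketch (the filled-in values come from
        the resolved channel definition):

            # Before:
            acceleration: []
            # After:
            acceleration:
              - name: "acceleration"
                description: "Acceleration in 3 axes"
                units: "acceleration"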
Args:
metadata: The full sensor metadata passed to the validate function
out: The current output, used to get channel definitions
Raises:
RuntimeError: An error in the schema validation or a missing
definition.
"""
channels: dict | None = metadata.get("channels")
if not channels:
self._logger.debug("No channels found, skipping")
metadata["channels"] = {}
return
channel_name: str
indices: list[dict]
for channel_name, indices in channels.items():
# channel_name must have been resolved by now.
if out["channels"].get(channel_name) is None:
raise RuntimeError(
f"Failed to find a definition for '{channel_name}', did you"
" forget a dependency?"
)
channel = out["channels"][channel_name]
# The content of 'channel' came from the 'out/channels' dict which
# was already validated and every field added if missing. At this
# point it's safe to access the channel's name, description, and
# units.
if not indices:
indices.append({})
index: dict
for index in indices:
if not index.get("name"):
index["name"] = channel["name"]
if not index.get("description"):
index["description"] = channel["description"]
# Always use the same units
index["units"] = channel["units"]
def _resolve_triggers(self, metadata: dict, out: dict) -> None:
"""
        For each trigger in the metadata, verify that a matching definition
        exists in the 'out/triggers' entry. For example, if the metadata
        contains:

            triggers:
              - data_ready

        We would then check that the 'data_ready' key exists in the
        out/triggers dict (which was already resolved by
        _resolve_dependencies).
Args:
metadata: The full sensor metadata passed to the validate function
out: The current output, used to get trigger definitions
Raises:
            AssertionError: A referenced trigger was not defined in the
                resolved dependencies.
"""
triggers: list | None = metadata.get("triggers")
if not triggers:
metadata["triggers"] = []
self._logger.debug("No triggers found, skipping")
return
        for trigger_name in triggers:
            assert (
                trigger_name in out["triggers"]
            ), f"Trigger '{trigger_name}' was not defined"
def _get_dependency_file(self, dep: str) -> Path:
"""
Search for a dependency file and return the full path to it if found.
Args:
dep: The dependency string as provided by the metadata yaml.
Returns:
The dependency file as a Path object if found.
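
        For example (hypothetical values): with include_paths=[Path("/defs")],
        a dep of "pw_sensor/channels.yaml" resolves to
        /defs/pw_sensor/channels.yaml if that file exists.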
Raises:
FileNotFoundError: One of the dependencies was not found.
"""
error_string = f"Failed to find {dep} using search paths:"
# Check if a full path was used
if Path(dep).is_file():
return Path(dep)
# Search all the include paths
for path in self._include_paths:
if (path / dep).is_file():
return path / dep
error_string += f"\n- {path}"
raise FileNotFoundError(error_string)