# Source code for ocdsextensionregistry.versioned_release_schema

import warnings
from copy import deepcopy

import jsonref

from ocdsextensionregistry.exceptions import (
    VersionedReleaseItemsWarning,
    VersionedReleaseRefWarning,
    VersionedReleaseTypeWarning,
)

# The schema that replaces a versioned field: an array of objects, each holding a `value` alongside a `releaseDate`,
# `releaseID` and `releaseTag`. The empty `value` schema is a placeholder: this module deep-copies the template and
# then overwrites `value` with the field's own schema.
_VERSIONED_TEMPLATE = {
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "releaseDate": {"format": "date-time", "type": "string"},
            "releaseID": {"type": "string"},
            "value": {},
            "releaseTag": {"type": "array", "items": {"type": "string"}},
        },
    },
}
# The keywords that a field's schema must match for the field to be replaced with a `$ref` to the named definition
# (see `_get_common_definition_ref`). A `None` value means the keyword must be unset (or null) on the field; it is
# skipped when the definition itself is built from `_VERSIONED_TEMPLATE`.
_COMMON_VERSIONED_DEFINITIONS = {
    "StringNullUriVersioned": {
        "type": ["string", "null"],
        "format": "uri",
    },
    "StringNullDateTimeVersioned": {
        "type": ["string", "null"],
        "format": "date-time",
    },
    "StringNullVersioned": {
        "type": ["string", "null"],
        "format": None,
    },
}
# The `type` lists for which `_add_versioned_field` issues no VersionedReleaseTypeWarning. Each list is written in
# the order that `_cast_as_list` produces (alphabetical, with "null" last), because membership is tested with list
# equality.
_RECOGNIZED_TYPES = (
    # Array
    ["array"],
    ["array", "null"],  # optional string arrays
    # Object
    ["object"],
    ["object", "null"],  # /Organization/details
    # String
    ["string"],
    ["string", "null"],
    # Literal
    ["boolean", "null"],
    ["integer", "null"],
    ["number", "null"],
    # Mixed
    ["integer", "string"],
    ["integer", "string", "null"],
    ["number", "string", "null"],
)
# The keywords that are popped from every property and definition as the last step of building the versioned release
# schema, and that are ignored when deciding whether a field matches a common versioned definition.
_KEYWORDS_TO_REMOVE = (
    # Metadata keywords
    # https://tools.ietf.org/html/draft-fge-json-schema-validation-00#section-6
    "title",
    "description",
    "default",
    # Extended keywords
    # http://os4d.opendataservices.coop/development/schema/#extended-json-schema
    "omitWhenMerged",
    "wholeListMerge",
)


def _cast_as_list(value):
    if isinstance(value, str):
        return [value]
    return sorted(value, key=lambda entry: "~" if entry == "null" else entry)


def _get_common_definition_ref(item):
    """
    Find the common versioned definition that is equivalent to the ``item`` schema.

    The ``item`` matches a definition if it sets each of the definition's keywords to the expected value, and sets
    no other keywords besides those in ``_KEYWORDS_TO_REMOVE``.

    :param dict item: a field's schema
    :returns: a schema that references "StringNullUriVersioned", "StringNullDateTimeVersioned" or
              "StringNullVersioned", or ``None`` if the ``item`` matches none of them
    """
    for name, keywords in _COMMON_VERSIONED_DEFINITIONS.items():
        permitted = set(keywords).union(_KEYWORDS_TO_REMOVE)
        same_values = all(item.get(keyword) == expected for keyword, expected in keywords.items())
        if same_values and all(keyword in permitted for keyword in item):
            return {"$ref": f"#/definitions/{name}"}
    return None


def _remove_omit_when_merged(schema):
    """Remove properties that set ``omitWhenMerged``."""
    if isinstance(schema, list):
        for item in schema:
            _remove_omit_when_merged(item)
    elif isinstance(schema, dict):
        for key, value in schema.items():
            if key == "properties":
                for prop in list(value):
                    if value[prop].get("omitWhenMerged"):
                        del value[prop]
                        if prop in schema.get("required", []):
                            schema["required"].remove(prop)
            _remove_omit_when_merged(value)


def _update_refs_to_unversioned_definitions(schema):
    """Replace ``$ref`` values with unversioned definitions."""
    for key, value in schema.items():
        if key == "$ref":
            schema[key] = value + "Unversioned"
        elif isinstance(value, dict):
            _update_refs_to_unversioned_definitions(value)


def _get_unversioned_pointers(schema, fields, pointer=""):
    """
    Collect the JSON Pointers to ``id`` fields that must not be versioned if the object is in an array.

    The pointers are added to ``fields`` in place; nothing is returned. The traversal stops descending at arrays
    that are whole list merge, either explicitly (``wholeListMerge``) or because their items aren't all objects.

    :param schema: a schema or any fragment of one. Item schemas with a ``__reference__`` attribute (like the
                   proxies in the ``jsonref.replace_refs`` result that the caller passes) are recorded under the
                   pointer of the definition they reference.
    :param set fields: the set to which the pointers are added
    :param str pointer: the JSON Pointer to ``schema``
    """
    if isinstance(schema, list):
        for index, item in enumerate(schema):
            _get_unversioned_pointers(item, fields, pointer=f"{pointer}/{index}")
    elif isinstance(schema, dict):
        # Follows the logic of _get_merge_rules in merge.py from ocds-merge.
        types = _cast_as_list(schema.get("type", []))

        # If an array is whole list merge, its items are unversioned.
        if "array" in types and schema.get("wholeListMerge"):
            return
        if "array" in types and (items := schema.get("items")):
            if isinstance(items, dict):
                item_types = _cast_as_list(items.get("type", []))
                # If an array mixes objects and non-objects, it is whole list merge.
                if any(item_type != "object" for item_type in item_types):
                    return
                # If it is an array of objects, any `id` fields are unversioned. An item schema that declares no
                # `properties` has no `id` field to record.
                if "id" in items.get("properties", {}):
                    # Drop the first character of the `$ref` (presumably the "#" of "#/definitions/...").
                    reference = items.__reference__["$ref"][1:] if hasattr(items, "__reference__") else pointer
                    fields.add(f"{reference}/properties/id")
            # This should only occur in tests.
            else:
                warnings.warn(VersionedReleaseItemsWarning(pointer, schema), stacklevel=2)

        for key, value in schema.items():
            _get_unversioned_pointers(value, fields, pointer=f"{pointer}/{key}")


def _add_versioned_fields(schema, unversioned_pointers, pointer=""):
    """Call ``_add_versioned_field`` on each field."""
    for key, value in schema.get("properties", {}).items():
        new_pointer = f"{pointer}/properties/{key}"
        _add_versioned_field(schema, unversioned_pointers, new_pointer, key, value)

    for key, value in schema.get("definitions", {}).items():
        new_pointer = f"{pointer}/definitions/{key}"
        _add_versioned_fields(value, unversioned_pointers, pointer=new_pointer)


def _add_versioned_field(schema, unversioned_pointers, pointer, key, value):
    """
    Perform the changes to the schema to refer to versioned/unversioned definitions.

    Depending on the field's schema, one of the following occurs:

    -  Nothing, if the field is in ``unversioned_pointers``, or is an array whose items are a ``$ref`` and that
       isn't whole list merge, or is ``/definitions/Amendment/properties/changes``.
    -  The field's ``$ref`` gets a "VersionedId" suffix, if the field has no type and references a definition whose
       ``id`` field is in ``unversioned_pointers``.
    -  ``schema["properties"][key]`` is replaced with a ``$ref`` to a common versioned definition, if it matches one.
    -  The field's own properties are versioned, if the field is an object with properties.
    -  Otherwise, ``schema["properties"][key]`` is replaced with a copy of ``_VERSIONED_TEMPLATE`` whose ``value``
       is a copy of the field's schema.

    Warnings are issued for unrecognized types, for untyped fields without ``$ref`` and for arrays without a dict
    ``items``.

    :param dict schema: the schema of the object on which the field is defined
    :param set unversioned_pointers: JSON Pointers to ``id`` fields to leave unversioned if the object is in an array
    :param str pointer: the field's pointer
    :param str key: the field's name
    :param dict value: the field's schema
    """
    # Skip unversioned fields.
    if pointer in unversioned_pointers:
        return

    types = _cast_as_list(value.get("type", []))

    # https://github.com/transpresupuestaria/ocds_related_projects_extension
    # planning.relatedProjects has no `type`. planning.relatedProjects.locations has `properties` as "array".
    if "properties" in value and types in ([], ["array"]):
        types = ["object"]

    # If a type is unrecognized, we might need to update this script.
    #
    # No special case is needed for a `type` written as ["string", "number", "null"]: `_cast_as_list` reorders it
    # to ["number", "string", "null"], which is recognized.
    if "$ref" not in value and types not in _RECOGNIZED_TYPES:
        warnings.warn(VersionedReleaseTypeWarning(pointer, types, value), stacklevel=2)

    # For example, if $ref is used.
    if not types:
        # Ignore the `amendment` field, which had no `id` field in OCDS 1.0.
        if "deprecated" not in value:
            if "$ref" in value:
                versioned_pointer = f"{value['$ref'][1:]}/properties/id"
                # If the `id` field is on an object not in an array, it needs to be versioned (like on `buyer`).
                if versioned_pointer in unversioned_pointers:
                    value["$ref"] = value["$ref"] + "VersionedId"
            # This should only occur in tests.
            else:
                warnings.warn(VersionedReleaseRefWarning(pointer, value), stacklevel=2)
        return

    # Reference a common versioned definition if possible, to limit the size of the schema.
    ref = _get_common_definition_ref(value)
    if ref:
        schema["properties"][key] = ref

    # Iterate into objects with properties like `Item.unit`. Otherwise, version objects with no properties as a
    # whole, like `Organization.details`.
    elif types == ["object"] and "properties" in value:
        _add_versioned_fields(value, unversioned_pointers, pointer=pointer)

    else:
        # Work on a copy, so that the field's original schema isn't modified by the `$ref` update below.
        new_value = deepcopy(value)

        if types == ["array"]:
            if (items := value.get("items")) and isinstance(items, dict):
                item_types = _cast_as_list(items.get("type", []))

                # See https://standard.open-contracting.org/latest/en/schema/merging/#whole-list-merge
                if value.get("wholeListMerge"):
                    # Update `$ref` to the unversioned definition.
                    if "$ref" in items:
                        new_value["items"]["$ref"] = items["$ref"] + "Unversioned"
                    # Otherwise, similarly, don't iterate over item properties.
                # See https://standard.open-contracting.org/latest/en/schema/merging/#lists
                elif "$ref" in items:
                    # Leave `$ref` to the versioned definition.
                    return
                # Exceptional case for deprecated `Amendment.changes`.
                elif item_types == ["object"] and pointer == "/definitions/Amendment/properties/changes":
                    return
                # Warn in case new combinations are added to the release schema.
                elif item_types != ["string"]:
                    # Note: Versioning the properties of un-$ref'erenced objects in arrays isn't implemented. However,
                    # this combination hasn't occurred, with the exception of `Amendment/changes`.
                    warnings.warn(VersionedReleaseTypeWarning(f"{pointer}/items", item_types, value), stacklevel=2)
            # This should only occur in tests.
            else:
                warnings.warn(VersionedReleaseItemsWarning(pointer, value), stacklevel=2)

        versioned = deepcopy(_VERSIONED_TEMPLATE)
        versioned["items"]["properties"]["value"] = new_value
        schema["properties"][key] = versioned


def _remove_metadata_and_extended_keywords(schema):
    """Remove metadata and extended keywords from properties and definitions."""
    if isinstance(schema, list):
        for item in schema:
            _remove_metadata_and_extended_keywords(item)
    elif isinstance(schema, dict):
        for key, value in schema.items():
            if key in {"definitions", "properties"}:
                for subschema in value.values():
                    for keyword in _KEYWORDS_TO_REMOVE:
                        subschema.pop(keyword, None)
            _remove_metadata_and_extended_keywords(value)


def get_versioned_release_schema(schema, tag):
    """
    Return the versioned release schema.

    :param dict schema: the release schema, with ``definitions`` and an ``ocid`` property. It is deep-copied, not
                        modified.
    :param str tag: the tag to interpolate into the ``id`` of the returned schema
    :returns: the versioned release schema
    :rtype: dict
    """
    schema = deepcopy(schema)

    # Update schema metadata.
    schema["id"] = f"https://standard.open-contracting.org/schema/{tag}/versioned-release-validation-schema.json"
    schema["title"] = "Schema for a compiled, versioned Open Contracting Release."

    # Release IDs, dates and tags appear alongside values in the versioned release schema.
    _remove_omit_when_merged(schema)

    # Create unversioned copies of all definitions.
    unversioned_definitions = {k + "Unversioned": deepcopy(v) for k, v in schema["definitions"].items()}
    _update_refs_to_unversioned_definitions(unversioned_definitions)

    # Determine which `id` fields occur on objects in arrays.
    unversioned_pointers = set()
    _get_unversioned_pointers(jsonref.replace_refs(schema), unversioned_pointers)

    # Omit `ocid` from versioning. (It is re-added after the other properties.)
    ocid = schema["properties"].pop("ocid")
    _add_versioned_fields(schema, unversioned_pointers)
    schema["properties"]["ocid"] = ocid

    # Add the common versioned definitions.
    for name, keywords in _COMMON_VERSIONED_DEFINITIONS.items():
        versioned = deepcopy(_VERSIONED_TEMPLATE)
        for keyword, value in keywords.items():
            # Skip keywords like `"format": None`, which only express that the keyword must be unset.
            if value:
                versioned["items"]["properties"]["value"][keyword] = value
        schema["definitions"][name] = versioned

    # Add missing definitions. Each pass resolves all references eagerly; the first unresolvable reference names the
    # definition to add, until all references resolve.
    suffix = "VersionedId"
    while True:
        try:
            jsonref.replace_refs(schema, lazy_load=False)
            break
        except jsonref.JsonRefError as e:
            name = e.cause.args[0]

            if name.endswith(suffix):
                # Add a copy of a definition with a versioned `id` field, using the same logic as before.
                base_name = name[: -len(suffix)]
                definition = deepcopy(schema["definitions"][base_name])
                pointer = f"/definitions/{base_name}/properties/id"
                pointers = unversioned_pointers - {pointer}
                _add_versioned_field(definition, pointers, pointer, "id", definition["properties"]["id"])
            else:
                # Add a copy of a definition with no versioned fields.
                definition = unversioned_definitions[name]

            schema["definitions"][name] = definition

    # Remove all metadata and extended keywords.
    _remove_metadata_and_extended_keywords(schema)

    return schema