-
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathshortcuts.py
More file actions
129 lines (109 loc) · 4.53 KB
/
shortcuts.py
File metadata and controls
129 lines (109 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from __future__ import annotations
from typing import Any
from typing import Mapping
from typing import cast
from jsonschema.exceptions import best_match
from jsonschema.protocols import Validator
from referencing import Registry
from openapi_schema_validator._caches import ValidatorCache
from openapi_schema_validator._dialects import OAS31_BASE_DIALECT_ID
from openapi_schema_validator._dialects import OAS32_BASE_DIALECT_ID
from openapi_schema_validator.validators import OAS32Validator
from openapi_schema_validator.validators import (
build_enforce_properties_required_validator,
)
from openapi_schema_validator.validators import check_openapi_schema
_LOCAL_ONLY_REGISTRY = Registry()
_VALIDATOR_CACHE = ValidatorCache()
def _check_schema(
cls: type[Validator],
schema: dict[str, Any],
) -> None:
meta_schema = getattr(cls, "META_SCHEMA", None)
# jsonschema's default check_schema path does not accept a custom
# registry, so for OAS dialects we use the package registry
# explicitly to keep metaschema resolution local and deterministic.
if isinstance(meta_schema, dict) and meta_schema.get("$id") in (
OAS31_BASE_DIALECT_ID,
OAS32_BASE_DIALECT_ID,
):
check_openapi_schema(cls, schema)
else:
cls.check_schema(schema)
def validate(
instance: Any,
schema: Mapping[str, Any],
cls: type[Validator] = OAS32Validator,
*args: Any,
allow_remote_references: bool = False,
check_schema: bool = True,
enforce_properties_required: bool = False,
**kwargs: Any,
) -> None:
"""
Validate an instance against a given schema using the specified
validator class.
Unlike direct ``Validator(schema).validate(instance)`` usage, this helper
checks schema validity first.
Invalid schemas therefore raise ``SchemaError`` before any instance
validation occurs.
Args:
instance: Value to validate against ``schema``.
schema: OpenAPI schema mapping used for validation. Local references
(``#/...``) are resolved against this mapping.
cls: Validator class to use. Defaults to ``OAS32Validator``.
*args: Positional arguments forwarded to ``cls`` constructor.
allow_remote_references: If ``True`` and no explicit ``registry`` is
provided, allow jsonschema's default remote reference retrieval
behavior.
check_schema: If ``True`` (default), validate the provided schema
before validating ``instance``. If ``False``, skip schema
validation and run instance validation directly.
enforce_properties_required: If ``True``, all properties declared in
the schema's ``properties`` object are strictly required to be
present in the instance (except those marked as ``writeOnly`` or
``readOnly`` where appropriate), regardless of the schema's
``required`` array. Defaults to ``False``.
**kwargs: Keyword arguments forwarded to ``cls`` constructor
(for example ``registry`` and ``format_checker``). If omitted,
a local-only empty ``Registry`` is used to avoid implicit remote
reference retrieval.
Raises:
jsonschema.exceptions.SchemaError: If ``schema`` is invalid.
jsonschema.exceptions.ValidationError: If ``instance`` is invalid.
"""
if enforce_properties_required:
cls = build_enforce_properties_required_validator(cls) # type: ignore[arg-type]
schema_dict = cast(dict[str, Any], schema)
validator_kwargs = kwargs.copy()
if not allow_remote_references:
validator_kwargs.setdefault("registry", _LOCAL_ONLY_REGISTRY)
key = _VALIDATOR_CACHE.build_key(
schema=schema_dict,
cls=cls,
args=args,
kwargs=validator_kwargs,
allow_remote_references=allow_remote_references,
)
cached = _VALIDATOR_CACHE.get(key)
if cached is None:
if check_schema:
_check_schema(cls, schema_dict)
validator = cls(schema_dict, *args, **validator_kwargs)
cached = _VALIDATOR_CACHE.set(
key,
validator=validator,
schema_checked=check_schema,
)
elif check_schema and not cached.schema_checked:
_check_schema(cls, schema_dict)
_VALIDATOR_CACHE.mark_schema_checked(key)
else:
_VALIDATOR_CACHE.touch(key)
error = best_match(
cached.validator.evolve(schema=schema_dict).iter_errors(instance)
)
if error is not None:
raise error
def clear_validate_cache() -> None:
_VALIDATOR_CACHE.clear()