-
Notifications
You must be signed in to change notification settings - Fork 336
/
Copy pathjson_schema.py
437 lines (354 loc) · 14.5 KB
/
json_schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
import json
import re
import typing as _t
import pydantic_core
import typing_extensions as _ta
from pydantic import BaseModel
from pydantic.json_schema import GenerateJsonSchema, JsonSchemaValue
from pydantic_core import core_schema
from .components.forms import (
FormField,
FormFieldBoolean,
FormFieldFile,
FormFieldInput,
FormFieldSelect,
FormFieldSelectSearch,
FormFieldTextarea,
InputHtmlType,
)
if _t.TYPE_CHECKING:
from .forms import SelectOption
else:
SelectOption = dict
__all__ = 'model_json_schema_to_fields', 'SchemeLocation'
class GenerateJsonSchemaWithDefaultFactory(GenerateJsonSchema):
"""Custom JSON schema including default_factory value as in
https://github.com/pydantic/pydantic/blob/main/pydantic/json_schema.py#L1046
"""
def default_schema(self, schema: core_schema.WithDefaultSchema) -> JsonSchemaValue:
"""Generates a JSON schema that matches a schema with a default value.
Args:
schema: The core schema.
Returns:
The generated JSON schema.
"""
json_schema = self.generate_inner(schema['schema'])
if 'default' in schema:
default = schema['default']
elif 'default_factory' in schema:
default = schema['default_factory']()
else:
return json_schema
try:
encoded_default = self.encode_default(default)
except pydantic_core.PydanticSerializationError:
self.emit_warning(
'non-serializable-default',
f'Default value {default} is not JSON serializable; excluding default from JSON schema',
)
# Return the inner schema, as though there was no default
return json_schema
if '$ref' in json_schema:
# Since reference schemas do not support child keys, we wrap the reference schema in a single-case allOf:
return {'allOf': [json_schema], 'default': encoded_default}
else:
json_schema['default'] = encoded_default
return json_schema
def model_json_schema_to_fields(model: _t.Type[BaseModel]) -> _t.List[FormField]:
schema = _t.cast(JsonSchemaObject, model.model_json_schema(schema_generator=GenerateJsonSchemaWithDefaultFactory))
defs = schema.get('$defs', {})
return list(json_schema_obj_to_fields(schema, [], [], defs))
JsonSchemaInput: _ta.TypeAlias = (
'JsonSchemaString | JsonSchemaStringEnum | JsonSchemaFile | JsonSchemaTextarea | JsonSchemaInt | JsonSchemaNumber'
)
JsonSchemaField: _ta.TypeAlias = 'JsonSchemaInput | JsonSchemaBool'
JsonSchemaConcrete: _ta.TypeAlias = 'JsonSchemaField | JsonSchemaArray | JsonSchemaObject'
JsonSchemaAny: _ta.TypeAlias = 'JsonSchemaConcrete | JsonSchemaAnyOf | JsonSchemaAllOf | JsonSchemaRef'
class JsonSchemaBase(_t.TypedDict, total=False):
title: str
description: str
class JsonSchemaString(JsonSchemaBase):
type: _ta.Required[_t.Literal['string']]
default: str
format: _t.Literal['date', 'date-time', 'time', 'email', 'uri', 'uuid', 'password']
class JsonSchemaStringEnum(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['string']]
enum: _ta.Required[_t.List[str]]
default: str
placeholder: str
enum_labels: _t.Dict[str, str]
class JsonSchemaStringSearch(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['string']]
search_url: _ta.Required[str]
placeholder: str
initial: SelectOption
class JsonSchemaFile(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['string']]
format: _ta.Required[_t.Literal['binary']]
accept: str
class JsonSchemaTextarea(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['string']]
format: _ta.Required[_t.Literal['textarea']]
rows: int
cols: int
default: str
placeholder: str
class JsonSchemaBool(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['boolean']]
default: bool
mode: _t.Literal['checkbox', 'switch']
class JsonSchemaInt(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['integer']]
default: int
minimum: int
exclusiveMinimum: int
maximum: int
exclusiveMaximum: int
multipleOf: int
class JsonSchemaNumber(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['number']]
default: float
minimum: float
exclusiveMinimum: float
maximum: float
exclusiveMaximum: float
multipleOf: float
class JsonSchemaArray(JsonSchemaBase, total=False):
type: _ta.Required[_t.Literal['array']]
uniqueItems: bool
minItems: int
maxItems: int
prefixItems: _t.List[JsonSchemaAny]
items: JsonSchemaAny
search_url: str
placeholder: str
JsonSchemaDefs: _ta.TypeAlias = 'dict[str, JsonSchemaConcrete]'
JsonSchemaObject = _t.TypedDict(
'JsonSchemaObject',
{
'type': _ta.Required[_t.Literal['object']],
'properties': _t.Dict[str, JsonSchemaAny],
'$defs': JsonSchemaDefs,
'required': _t.List[str],
'title': str,
'description': str,
},
total=False,
)
class JsonSchemaNull(JsonSchemaBase):
type: _t.Literal['null']
class JsonSchemaAnyOf(JsonSchemaBase):
anyOf: _t.List[JsonSchemaAny]
class JsonSchemaAllOf(JsonSchemaBase):
allOf: _t.List[JsonSchemaAny]
JsonSchemaRef = _t.TypedDict('JsonSchemaRef', {'$ref': str})
SchemeLocation: _ta.TypeAlias = '_t.List[str | int]'
def json_schema_obj_to_fields(
schema: JsonSchemaObject, loc: SchemeLocation, title: _t.List[str], defs: JsonSchemaDefs
) -> _t.Iterable[FormField]:
required = set(schema.get('required', []))
if properties := schema.get('properties'):
for key, value in properties.items():
yield from json_schema_any_to_fields(value, loc + [key], title, key in required, defs)
def json_schema_any_to_fields(
schema: JsonSchemaAny, loc: SchemeLocation, title: _t.List[str], required: bool, defs: JsonSchemaDefs
) -> _t.Iterable[FormField]:
schema, required = deference_json_schema(schema, defs, required)
title = title + [schema.get('title') or loc_to_title(loc)]
if schema_is_field(schema):
yield json_schema_field_to_field(schema, loc, title, required)
elif schema_is_array(schema):
yield from json_schema_array_to_fields(schema, loc, title, required, defs)
else:
assert schema_is_object(schema), f'Unexpected schema type {schema}'
yield from json_schema_obj_to_fields(schema, loc, title, defs)
def json_schema_field_to_field(
schema: JsonSchemaField, loc: SchemeLocation, title: _t.List[str], required: bool
) -> FormField:
name = loc_to_name(loc)
if schema['type'] == 'boolean':
return FormFieldBoolean(
name=name,
title=title,
required=required,
initial=schema.get('default'),
description=schema.get('description'),
mode=schema.get('mode', 'checkbox'),
)
elif field := special_string_field(schema, name, title, required, False):
return field
else:
return FormFieldInput(
name=name,
title=title,
html_type=input_html_type(schema),
required=required,
initial=schema.get('default'),
autocomplete=schema.get('autocomplete'),
description=schema.get('description'),
)
def loc_to_title(loc: SchemeLocation) -> str:
return as_title(loc[-1])
def json_schema_array_to_fields(
schema: JsonSchemaArray, loc: SchemeLocation, title: _t.List[str], required: bool, defs: JsonSchemaDefs
) -> _t.Iterable[FormField]:
items_schema = schema.get('items')
if items_schema:
items_schema, required = deference_json_schema(items_schema, defs, required)
for field_name in 'search_url', 'placeholder', 'description':
if value := schema.get(field_name):
items_schema[field_name] = value # type: ignore
if field := special_string_field(items_schema, loc_to_name(loc), title, required, True):
yield field
return
# for fixed length tuples (min_items == max_items), where all items are required,
# we "inline" the fields into the list of form fields
if (min_items := schema.get('minItems')) and min_items == schema.get('maxItems'):
if items := schema.get('prefixItems'):
for i, item in enumerate(items):
fields = list(json_schema_any_to_fields(item, loc + [i], title, required, defs))
if any(not f.required for f in fields):
raise NotImplementedError(
'Tuples with optional fields are not yet supported, '
'see https://github.com/pydantic/FastUI/pull/52'
)
yield from fields
return
raise NotImplementedError('Array fields are not fully supported, see https://github.com/pydantic/FastUI/pull/52')
def special_string_field(
schema: JsonSchemaConcrete, name: str, title: _t.List[str], required: bool, multiple: bool
) -> _t.Union[FormField, None]:
if schema['type'] == 'string':
if schema.get('format') == 'binary':
return FormFieldFile(
name=name,
title=title,
required=required,
multiple=multiple,
accept=schema.get('accept'),
description=schema.get('description'),
)
elif schema.get('format') == 'textarea':
return FormFieldTextarea(
name=name,
title=title,
required=required,
rows=schema.get('rows'),
cols=schema.get('cols'),
placeholder=schema.get('placeholder'),
initial=schema.get('initial'),
description=schema.get('description'),
autocomplete=schema.get('autocomplete'),
)
elif enum := schema.get('enum'):
enum_labels = schema.get('enum_labels', {})
return FormFieldSelect(
name=name,
title=title,
placeholder=schema.get('placeholder'),
required=required,
multiple=multiple,
options=[SelectOption(value=v, label=enum_labels.get(v) or as_title(v)) for v in enum],
initial=schema.get('default'),
description=schema.get('description'),
autocomplete=schema.get('autocomplete'),
)
elif search_url := schema.get('search_url'):
return FormFieldSelectSearch(
search_url=search_url,
name=name,
title=title,
placeholder=schema.get('placeholder'),
required=required,
multiple=multiple,
initial=schema.get('initial'),
description=schema.get('description'),
)
def loc_to_name(loc: SchemeLocation) -> str:
"""
Convert a loc to a string if any item contains a '.' or the first item starts with '[' then encode with JSON,
otherwise join with '.'.
The sister method `name_to_loc` is in `form_extra.py`.
"""
if any(isinstance(v, str) and '.' in v for v in loc):
return json.dumps(loc)
elif isinstance(loc[0], str) and loc[0].startswith('['):
return json.dumps(loc)
else:
return '.'.join(str(v) for v in loc)
def deference_json_schema(
schema: JsonSchemaAny, defs: JsonSchemaDefs, required: bool
) -> _t.Tuple[JsonSchemaConcrete, bool]:
"""
Convert a schema which might be a reference or union to a concrete schema.
"""
if ref := schema.get('$ref'):
defs = defs or {}
def_schema = defs[ref.rsplit('/')[-1]]
if def_schema is None:
raise ValueError(f'Invalid $ref "{ref}", not found in {defs}')
else:
return def_schema.copy(), required # clone dict to avoid attribute leakage via shared schema.
elif any_of := schema.get('anyOf'):
if len(any_of) == 2 and sum(s.get('type') == 'null' for s in any_of) == 1:
# If anyOf is a single type and null, then it is optional
not_null_schema = next(s for s in any_of if s.get('type') != 'null')
# copy everything except `anyOf` across to the new schema
# TODO is this right?
for key, value in schema.items(): # type: ignore
if key not in {'anyOf'}:
not_null_schema[key] = value # type: ignore
return deference_json_schema(not_null_schema, defs, False)
else:
raise NotImplementedError('`anyOf` schemas which are not simply `X | None` are not yet supported')
elif all_of := schema.get('allOf'):
all_of = _t.cast(_t.List[JsonSchemaAny], all_of)
if len(all_of) == 1:
new_schema, required = deference_json_schema(all_of[0], defs, required)
new_schema.update({k: v for k, v in schema.items() if k != 'allOf'}) # type: ignore
return new_schema, required
else:
raise NotImplementedError('`allOf` schemas with more than 1 choice are not yet supported')
else:
return _t.cast(JsonSchemaConcrete, schema), required
def as_title(s: _t.Any) -> str:
return re.sub(r'[\-_]', ' ', str(s)).title()
type_lookup: _t.Dict[str, InputHtmlType] = {
'string': 'text',
'string-date': 'date',
'string-date-time': 'datetime-local',
'string-time': 'time',
'string-email': 'email',
'string-uri': 'url',
'string-uuid': 'text',
'string-password': 'password',
'number': 'number',
'integer': 'number',
}
def input_html_type(schema: JsonSchemaField) -> InputHtmlType:
"""
Convert a schema into an HTML type
"""
key = schema['type']
if key == 'string':
if string_format := schema.get('format'):
key = f'string-{string_format}'
try:
return type_lookup[key]
except KeyError as e:
raise ValueError(f'Unknown schema: {schema}') from e
def schema_is_field(schema: JsonSchemaConcrete) -> _ta.TypeGuard[JsonSchemaField]:
"""
Determine if a schema is a field `JsonSchemaField`
"""
return schema['type'] in {'string', 'number', 'integer', 'boolean'}
def schema_is_array(schema: JsonSchemaConcrete) -> _ta.TypeGuard[JsonSchemaArray]:
"""
Determine if a schema is an array `JsonSchemaArray`
"""
return schema['type'] == 'array'
def schema_is_object(schema: JsonSchemaConcrete) -> _ta.TypeGuard[JsonSchemaObject]:
"""
Determine if a schema is an object `JsonSchemaObject`
"""
return schema['type'] == 'object'