-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathavroutil.py
56 lines (51 loc) · 1.43 KB
/
avroutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from avro.schema import (
    Schema,
    Field,
    RecordSchema,
    PrimitiveSchema,
    LogicalSchema,
    ArraySchema,
    UnionSchema,
    TimestampMicrosSchema,
)
# Iceberg primitive type name -> Avro primitive type name.
# "timestamp" is intentionally not listed here: iceberg_to_avro_schema
# special-cases it before consulting this table.
PRIMITIVE_FIELD_TYPE_MAPPING_ICEBERG_TO_AVRO = {
    "boolean": "boolean",
    "binary": "bytes",
    "double": "double",
    "float": "float",
    "integer": "int",
    "long": "long",
    "string": "string",
}
# LOGICAL_FIELD_TYPE_MAPPING = {
# "timestamp": ("timestamp-micros", "long")
# }
def iceberg_to_avro_schema(schema, *, root=False, path="root"):
    """Convert a nested Iceberg-style schema description to an Avro schema.

    Args:
        schema: The description to convert. Supported shapes:

            * a ``dict`` mapping field names to nested descriptions; becomes
              a ``RecordSchema`` named ``path`` with one field per key;
            * a ``list`` whose first element describes the item type; becomes
              an ``ArraySchema`` (elements after the first are ignored);
            * the string ``"timestamp"``; becomes ``TimestampMicrosSchema``;
            * any key of ``PRIMITIVE_FIELD_TYPE_MAPPING_ICEBERG_TO_AVRO``;
              becomes the corresponding ``PrimitiveSchema``.
        root: When true, return the converted schema itself. When false (the
            default, also used for every nested call), wrap it in a union
            with ``null``.
        path: Dotted name of the current node. Used as the record name and as
            the prefix for child field names (``f"{path}.{key}"``) and array
            items (``f"{path}.items"``).

    Returns:
        The converted schema, wrapped in ``UnionSchema([null, schema])``
        unless ``root`` is true.

    Raises:
        KeyError: If a leaf type name is not a known primitive.
        IndexError: If a ``list`` description is empty.
    """
    # isinstance (rather than an exact type comparison) so that dict/list
    # subclasses such as OrderedDict are treated as records/arrays too.
    if isinstance(schema, dict):
        fields = []
        for key, value in schema.items():
            field_path = f"{path}.{key}"
            field_schema = iceberg_to_avro_schema(value, path=field_path)
            # NOTE(review): nested results are null-unions; confirm that
            # to_json() on a UnionSchema can be subscripted with "type" in
            # the installed avro version.
            # NOTE(review): the default is the string "null", not None --
            # confirm this is what avro expects for a nullable field.
            fields.append(
                Field(
                    field_schema.to_json()["type"],
                    field_path,
                    has_default=True,
                    default="null",
                )
            )
        avro_schema = RecordSchema(path, None, fields=fields)
    elif isinstance(schema, list):
        # Only the first element is consulted for the item type.
        item_schema = iceberg_to_avro_schema(schema[0], path=f"{path}.items")
        avro_schema = ArraySchema(items=item_schema.to_json()["type"])
    elif schema == "timestamp":
        avro_schema = TimestampMicrosSchema()
    else:
        avro_schema = PrimitiveSchema(
            PRIMITIVE_FIELD_TYPE_MAPPING_ICEBERG_TO_AVRO[schema],
        )

    if root:
        # Previously this path fell off the end of the function and returned
        # None; the root schema is returned unwrapped.
        return avro_schema
    return UnionSchema([PrimitiveSchema("null"), avro_schema])