-
Notifications
You must be signed in to change notification settings - Fork 942
Expand file tree
/
Copy path__init__.py
More file actions
227 lines (184 loc) · 6.64 KB
/
__init__.py
File metadata and controls
227 lines (184 loc) · 6.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import io
from typing import Optional
from .schema_registry_client import (
ConfigCompatibilityLevel,
Metadata,
MetadataProperties,
MetadataTags,
RegisteredSchema,
Rule,
RuleKind,
RuleMode,
RuleParams,
RuleSet,
Schema,
SchemaRegistryClient,
AsyncSchemaRegistryClient,
SchemaRegistryError,
SchemaReference,
ServerConfig
)
from ..serialization import SerializationError, MessageField, SerializationContext
# Header names under which the schema GUID travels for the message key and the
# message value; header_schema_id_serializer writes them and
# dual_schema_id_deserializer looks them up.
_KEY_SCHEMA_ID = "__key_schema_id"
_VALUE_SCHEMA_ID = "__value_schema_id"
# NOTE(review): the magic-byte constants are not referenced by the code visible
# in this module; presumably they mark the framing version at the start of a
# prefixed payload -- confirm against their users.
_MAGIC_BYTE = 0
_MAGIC_BYTE_V0 = _MAGIC_BYTE  # alias of _MAGIC_BYTE
_MAGIC_BYTE_V1 = 1
# Names exported by ``from <package> import *``: the types re-exported from
# ``schema_registry_client`` followed by the helper functions defined below.
__all__ = [
    "ConfigCompatibilityLevel",
    "Metadata",
    "MetadataProperties",
    "MetadataTags",
    "RegisteredSchema",
    "Rule",
    "RuleKind",
    "RuleMode",
    "RuleParams",
    "RuleSet",
    "Schema",
    "SchemaRegistryClient",
    "AsyncSchemaRegistryClient",
    "SchemaRegistryError",
    "SchemaReference",
    "ServerConfig",
    "topic_subject_name_strategy",
    "topic_record_subject_name_strategy",
    "record_subject_name_strategy",
    # Public strategy defined in this module; listed so it is exported like
    # its three sibling subject-name strategies.
    "reference_subject_name_strategy",
    "header_schema_id_serializer",
    "prefix_schema_id_serializer",
    "dual_schema_id_deserializer",
    "prefix_schema_id_deserializer",
]
def topic_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]:
    """
    Builds a subject name of the form {topic}-key|value.

    Args:
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation; its ``topic`` and ``field`` attributes are read.

        record_name (Optional[str]): Record name. Not used by this strategy.

    Returns:
        str: ``ctx.topic`` and ``ctx.field`` joined by a hyphen.
    """
    # Plain ``+`` concatenation is kept on purpose: neither part goes through
    # str()/format(), so the result is exactly what the operands' own
    # addition produces.
    topic_prefix = ctx.topic + "-"
    return topic_prefix + ctx.field
def topic_record_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]:
    """
    Builds a subject name of the form {topic}-{record_name}.

    Args:
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation; only its ``topic`` attribute is read.

        record_name (Optional[str]): Record name.

    Returns:
        Optional[str]: The combined subject name, or None when no record name
        is given (``ctx`` is not touched in that case).
    """
    if record_name is None:
        return None
    topic_prefix = ctx.topic + "-"
    return topic_prefix + record_name
def record_subject_name_strategy(ctx, record_name: Optional[str]) -> Optional[str]:
    """
    Builds a subject name of the form {record_name}.

    Args:
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Not used by this strategy.

        record_name (Optional[str]): Record name.

    Returns:
        Optional[str]: ``record_name`` unchanged (None stays None).
    """
    # The subject is the record name itself, so a missing name passes through.
    return record_name
def reference_subject_name_strategy(ctx, schema_ref: SchemaReference) -> Optional[str]:
    """
    Builds a subject reference name of the form {reference name}.

    Args:
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Not used by this strategy.

        schema_ref (SchemaReference): SchemaReference instance.

    Returns:
        Optional[str]: ``schema_ref.name``, or None when ``schema_ref`` is None.
    """
    if schema_ref is None:
        return None
    return schema_ref.name
def header_schema_id_serializer(payload: bytes, ctx: Optional[SerializationContext], schema_id) -> bytes:
    """
    Stores the schema guid in the message headers and leaves the payload as is.

    Args:
        payload (bytes): The payload to serialize.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Its ``headers`` (a list or a dict) are modified in
            place.

        schema_id (SchemaId): The schema ID to serialize.

    Returns:
        bytes: The payload, unchanged.

    Raises:
        SerializationError: If ``ctx`` is None, if ``ctx.headers`` is None, or
            if the headers are neither a list nor a dict.
    """
    if ctx is None:
        raise SerializationError("SerializationContext is required for header_schema_id_serializer")

    target_headers = ctx.headers
    if target_headers is None:
        raise SerializationError("Missing headers")

    # Key and value schema ids are stored under different header names.
    if ctx.field == MessageField.KEY:
        header_name = _KEY_SCHEMA_ID
    else:
        header_name = _VALUE_SCHEMA_ID
    guid_bytes = schema_id.guid_to_bytes()

    if isinstance(target_headers, list):
        # List-style headers hold (name, value) tuples.
        target_headers.append((header_name, guid_bytes))
        return payload
    if isinstance(target_headers, dict):
        target_headers[header_name] = guid_bytes
        return payload
    raise SerializationError("Invalid headers type")
def prefix_schema_id_serializer(payload: bytes, ctx, schema_id) -> bytes:
    """
    Prepends the serialized schema id to the payload.

    Args:
        payload (bytes): The payload to serialize.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Not used by this serializer.

        schema_id (SchemaId): The schema ID to serialize.

    Returns:
        bytes: The payload prefixed with the schema id
    """
    id_prefix = schema_id.id_to_bytes()
    return id_prefix + payload
def dual_schema_id_deserializer(payload: bytes, ctx: Optional[SerializationContext], schema_id) -> io.BytesIO:
    """
    Reads the schema id from the message headers if present there, otherwise
    from the payload prefix.

    Args:
        payload (bytes): The payload to deserialize.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. May be None, in which case only the payload prefix is
            consulted.

        schema_id (SchemaId): The schema ID object to populate; its
            ``from_bytes`` method is called with the bytes to parse.

    Returns:
        io.BytesIO: A stream over the full payload when the schema id came
        from a header; otherwise whatever ``schema_id.from_bytes`` returns
        for the payload stream (the remainder after the prefix).
    """
    # Step 1: try to find the schema id among the headers.
    guid_from_header = None
    msg_headers = ctx.headers if ctx is not None else None
    if msg_headers is not None:
        wanted = _KEY_SCHEMA_ID if ctx.field == MessageField.KEY else _VALUE_SCHEMA_ID
        if isinstance(msg_headers, list):
            # List-style headers are (name, value) pairs; the first match wins.
            guid_from_header = next(
                (entry[1] for entry in msg_headers if entry[0] == wanted),
                None,
            )
        elif isinstance(msg_headers, dict):
            guid_from_header = msg_headers.get(wanted)

    # Step 2: parse from whichever source was found.
    if guid_from_header is None:
        # No header: the id is the payload prefix, so hand back the remainder.
        return schema_id.from_bytes(io.BytesIO(payload))

    schema_id.from_bytes(io.BytesIO(guid_from_header))  # type: ignore[arg-type]
    # The id lived in the header, so the payload carries no prefix to strip.
    return io.BytesIO(payload)
def prefix_schema_id_deserializer(payload: bytes, ctx, schema_id) -> io.BytesIO:
    """
    Reads the schema id from the payload prefix.

    Args:
        payload (bytes): The payload to deserialize.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. Not used by this deserializer.

        schema_id (SchemaId): The schema ID object to populate; its
            ``from_bytes`` method is called with a stream over the payload.

    Returns:
        io.BytesIO: Whatever ``schema_id.from_bytes`` returns for the payload
        stream.
    """
    payload_stream = io.BytesIO(payload)
    return schema_id.from_bytes(payload_stream)