1
1
import logging
2
2
import sys
3
3
import time
4
+ from datetime import datetime , timezone
4
5
from typing import Any , Callable , Iterable , Literal , Mapping , Optional , Union , get_args
5
6
6
7
from quixstreams .models import HeadersTuples
27
28
28
29
29
30
TimePrecision = Literal ["ms" , "ns" , "us" , "s" ]
31
+ TIME_PRECISION_LEN = {
32
+ "s" : 10 ,
33
+ "ms" : 13 ,
34
+ "ns" : 16 ,
35
+ "us" : 19 ,
36
+ }
30
37
31
38
InfluxDBValueMap = dict [str , Union [str , int , float , bool ]]
32
39
33
40
FieldsCallable = Callable [[InfluxDBValueMap ], Iterable [str ]]
34
41
MeasurementCallable = Callable [[InfluxDBValueMap ], str ]
35
42
TagsCallable = Callable [[InfluxDBValueMap ], Iterable [str ]]
36
-
43
+ TimeCallable = Callable [[ InfluxDBValueMap ], Optional [ Union [ str , int , datetime ]]]
37
44
38
45
FieldsSetter = Union [Iterable [str ], FieldsCallable ]
39
46
MeasurementSetter = Union [str , MeasurementCallable ]
40
47
TagsSetter = Union [Iterable [str ], TagsCallable ]
48
+ TimeSetter = Union [str , TimeCallable ]
41
49
42
50
43
51
class InfluxDB3Sink (BatchingSink ):
44
52
_TIME_PRECISIONS = {
53
+ "s" : WritePrecision .S ,
45
54
"ms" : WritePrecision .MS ,
46
55
"ns" : WritePrecision .NS ,
47
56
"us" : WritePrecision .US ,
48
- "s" : WritePrecision .S ,
49
57
}
50
58
51
59
def __init__ (
@@ -57,7 +65,7 @@ def __init__(
57
65
measurement : MeasurementSetter ,
58
66
fields_keys : FieldsSetter = (),
59
67
tags_keys : TagsSetter = (),
60
- time_key : Optional [str ] = None ,
68
+ time_setter : Optional [TimeSetter ] = None ,
61
69
time_precision : TimePrecision = "ms" ,
62
70
allow_missing_fields : bool = False ,
63
71
include_metadata_tags : bool = False ,
@@ -108,10 +116,12 @@ def __init__(
108
116
- If empty, no tags will be sent.
109
117
>***NOTE***: InfluxDB client always converts tag values to strings.
110
118
Default - `()`.
111
- :param time_key: a key to be used as "time" when writing to InfluxDB.
112
- By default, the record timestamp will be used with "ms" time precision.
113
- When using a custom key, you may need to adjust the `time_precision` setting
114
- to match.
119
+ :param time_setter: an optional column name to use as "time" for InfluxDB.
120
+ Also accepts a callable which receives the current message data and
121
+ returns either the desired time or `None` (use default).
122
+ The time can be an `int`, `string` (RFC3339 format), or `datetime`.
123
+ The time must match the `time_precision` argument if not a `datetime` object, else raises.
124
+ By default, a record's kafka timestamp with "ms" time precision is used.
115
125
:param time_precision: a time precision to use when writing to InfluxDB.
116
126
Possible values: "ms", "ns", "us", "s".
117
127
Default - `"ms"`.
@@ -173,31 +183,16 @@ def __init__(
173
183
},
174
184
}
175
185
self ._client : Optional [InfluxDBClient3 ] = None
176
- self ._measurement = self ._measurement_callable (measurement )
177
- self ._fields_keys = self ._fields_callable (fields_keys )
178
- self ._tags_keys = self ._tags_callable (tags_keys )
186
+ self ._measurement = _measurement_callable (measurement )
187
+ self ._fields_keys = _fields_callable (fields_keys )
188
+ self ._tags_keys = _tags_callable (tags_keys )
189
+ self ._time_setter = _time_callable (time_setter )
179
190
self ._include_metadata_tags = include_metadata_tags
180
- self ._time_key = time_key
181
191
self ._write_precision = self ._TIME_PRECISIONS [time_precision ]
182
192
self ._batch_size = batch_size
183
193
self ._allow_missing_fields = allow_missing_fields
184
194
self ._convert_ints_to_floats = convert_ints_to_floats
185
195
186
- def _measurement_callable (self , setter : MeasurementSetter ) -> MeasurementCallable :
187
- if callable (setter ):
188
- return setter
189
- return lambda value : setter
190
-
191
- def _fields_callable (self , setter : FieldsSetter ) -> FieldsCallable :
192
- if callable (setter ):
193
- return setter
194
- return lambda value : setter
195
-
196
- def _tags_callable (self , setter : TagsSetter ) -> TagsCallable :
197
- if callable (setter ):
198
- return setter
199
- return lambda value : setter
200
-
201
196
def setup (self ):
202
197
self ._client = InfluxDBClient3 (** self ._client_args )
203
198
try :
@@ -241,19 +236,20 @@ def write(self, batch: SinkBatch):
241
236
measurement = self ._measurement
242
237
fields_keys = self ._fields_keys
243
238
tags_keys = self ._tags_keys
244
- time_key = self ._time_key
239
+ time_setter = self ._time_setter
245
240
for write_batch in batch .iter_chunks (n = self ._batch_size ):
246
241
records = []
247
242
248
- min_timestamp = sys . maxsize
249
- max_timestamp = - 1
243
+ min_timestamp = None
244
+ max_timestamp = None
250
245
251
246
for item in write_batch :
252
247
value = item .value
253
248
# Evaluate these before we alter the value
254
249
_measurement = measurement (value )
255
250
_tags_keys = tags_keys (value )
256
251
_fields_keys = fields_keys (value )
252
+ ts = time_setter (value )
257
253
258
254
tags = {}
259
255
for tag_key in _tags_keys :
@@ -265,6 +261,24 @@ def write(self, batch: SinkBatch):
265
261
tag = value .pop (tag_key )
266
262
tags [tag_key ] = tag
267
263
264
+ if ts is None :
265
+ ts = item .timestamp
266
+ # Note: currently NOT validating the timestamp itself is valid
267
+ elif not isinstance (ts , valid := (str , int , datetime )):
268
+ raise TypeError (
269
+ f'InfluxDB3 "time" field expects: { valid } , got { type (ts )} '
270
+ )
271
+
272
+ if isinstance (ts , int ):
273
+ time_len = len (str (ts ))
274
+ expected = TIME_PRECISION_LEN [self ._write_precision ]
275
+ if time_len != expected :
276
+ raise ValueError (
277
+ f'`time_precision` of "{ self ._write_precision } " '
278
+ f"expects a { expected } -digit integer epoch, "
279
+ f"got { time_len } (timestamp: { ts } )."
280
+ )
281
+
268
282
if self ._include_metadata_tags :
269
283
tags ["__key" ] = item .key
270
284
tags ["__topic" ] = batch .topic
@@ -281,20 +295,19 @@ def write(self, batch: SinkBatch):
281
295
282
296
if self ._convert_ints_to_floats :
283
297
fields = {
284
- k : float (v ) if isinstance ( v , int ) else v
298
+ k : float (v ) if type ( v ) is int else v # avoids bool matching
285
299
for k , v in fields .items ()
286
300
}
287
301
288
- ts = value [time_key ] if time_key is not None else item .timestamp
289
302
record = {
290
303
"measurement" : _measurement ,
291
304
"tags" : tags ,
292
305
"fields" : fields ,
293
306
"time" : ts ,
294
307
}
295
308
records .append (record )
296
- min_timestamp = min (ts , min_timestamp )
297
- max_timestamp = max (ts , max_timestamp )
309
+ min_timestamp = min (ts , min_timestamp or _ts_min_default ( ts ) )
310
+ max_timestamp = max (ts , max_timestamp or _ts_max_default ( ts ) )
298
311
299
312
try :
300
313
_start = time .monotonic ()
@@ -317,3 +330,47 @@ def write(self, batch: SinkBatch):
317
330
retry_after = int (exc .retry_after )
318
331
) from exc
319
332
raise
333
+
334
+
335
+ def _ts_min_default (timestamp : Union [int , str , datetime ]):
336
+ if isinstance (timestamp , int ):
337
+ return sys .maxsize
338
+ elif isinstance (timestamp , str ):
339
+ return "~" # lexicographically largest ASCII char
340
+ elif isinstance (timestamp , datetime ):
341
+ return datetime .max .replace (tzinfo = timezone .utc )
342
+
343
+
344
+ def _ts_max_default (timestamp : Union [int , str , datetime ]):
345
+ if isinstance (timestamp , int ):
346
+ return - 1
347
+ elif isinstance (timestamp , str ):
348
+ return ""
349
+ elif isinstance (timestamp , datetime ):
350
+ return datetime .min .replace (tzinfo = timezone .utc )
351
+
352
+
353
+ def _measurement_callable (setter : MeasurementSetter ) -> MeasurementCallable :
354
+ if callable (setter ):
355
+ return setter
356
+ return lambda value : setter
357
+
358
+
359
+ def _fields_callable (setter : FieldsSetter ) -> FieldsCallable :
360
+ if callable (setter ):
361
+ return setter
362
+ return lambda value : setter
363
+
364
+
365
+ def _tags_callable (setter : TagsSetter ) -> TagsCallable :
366
+ if callable (setter ):
367
+ return setter
368
+ return lambda value : setter
369
+
370
+
371
+ def _time_callable (setter : Optional [TimeSetter ]) -> TimeCallable :
372
+ if callable (setter ):
373
+ return setter
374
+ if isinstance (setter , str ):
375
+ return lambda value : value [setter ] # type: ignore
376
+ return lambda value : None # the kafka timestamp will be used
0 commit comments