11"""CrateDB target sink class, which handles writing streams."""
22
3- import datetime
43import os
5- import time
64from typing import List , Optional , Union
75
86import sqlalchemy as sa
@@ -20,9 +18,6 @@ class CrateDBSink(PostgresSink):
2018
2119 connector_class = CrateDBConnector
2220
23- soft_delete_column_name = "__sdc_deleted_at"
24- version_column_name = "__sdc_table_version"
25-
2621 def __init__ (self , * args , ** kwargs ):
2722 """Initialize SQL Sink. See super class for more details."""
2823 super ().__init__ (* args , ** kwargs )
@@ -32,91 +27,6 @@ def __init__(self, *args, **kwargs):
3227 # operations on the target table.
3328 self .strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT
3429
35- # Record processing
36-
37- def _add_sdc_metadata_to_record (
38- self ,
39- record : dict ,
40- message : dict ,
41- context : dict ,
42- ) -> None :
43- """Populate metadata _sdc columns from incoming record message.
44-
45- Record metadata specs documented at:
46- https://sdk.meltano.com/en/latest/implementation/record_metadata.html
47-
48- Args:
49- record: Individual record in the stream.
50- message: The record message.
51- context: Stream partition or context dictionary.
52- """
53- record ["__sdc_extracted_at" ] = message .get ("time_extracted" )
54- record ["__sdc_received_at" ] = datetime .datetime .now (
55- tz = datetime .timezone .utc ,
56- ).isoformat ()
57- record ["__sdc_batched_at" ] = (
58- context .get ("batch_start_time" , None ) or datetime .datetime .now (tz = datetime .timezone .utc )
59- ).isoformat ()
60- record ["__sdc_deleted_at" ] = record .get ("__sdc_deleted_at" )
61- record ["__sdc_sequence" ] = int (round (time .time () * 1000 ))
62- record ["__sdc_table_version" ] = message .get ("version" )
63- record ["__sdc_sync_started_at" ] = self .sync_started_at
64-
65- def _add_sdc_metadata_to_schema (self ) -> None :
66- """Add _sdc metadata columns.
67-
68- Record metadata specs documented at:
69- https://sdk.meltano.com/en/latest/implementation/record_metadata.html
70- """
71- properties_dict = self .schema ["properties" ]
72- for col in (
73- "__sdc_extracted_at" ,
74- "__sdc_received_at" ,
75- "__sdc_batched_at" ,
76- "__sdc_deleted_at" ,
77- ):
78- properties_dict [col ] = {
79- "type" : ["null" , "string" ],
80- "format" : "date-time" ,
81- }
82- for col in ("__sdc_sequence" , "__sdc_table_version" , "__sdc_sync_started_at" ):
83- properties_dict [col ] = {"type" : ["null" , "integer" ]}
84-
85- def _remove_sdc_metadata_from_schema (self ) -> None :
86- """Remove _sdc metadata columns.
87-
88- Record metadata specs documented at:
89- https://sdk.meltano.com/en/latest/implementation/record_metadata.html
90- """
91- properties_dict = self .schema ["properties" ]
92- for col in (
93- "__sdc_extracted_at" ,
94- "__sdc_received_at" ,
95- "__sdc_batched_at" ,
96- "__sdc_deleted_at" ,
97- "__sdc_sequence" ,
98- "__sdc_table_version" ,
99- "__sdc_sync_started_at" ,
100- ):
101- properties_dict .pop (col , None )
102-
103- def _remove_sdc_metadata_from_record (self , record : dict ) -> None :
104- """Remove metadata _sdc columns from incoming record message.
105-
106- Record metadata specs documented at:
107- https://sdk.meltano.com/en/latest/implementation/record_metadata.html
108-
109- Args:
110- record: Individual record in the stream.
111- """
112- record .pop ("__sdc_extracted_at" , None )
113- record .pop ("__sdc_received_at" , None )
114- record .pop ("__sdc_batched_at" , None )
115- record .pop ("__sdc_deleted_at" , None )
116- record .pop ("__sdc_sequence" , None )
117- record .pop ("__sdc_table_version" , None )
118- record .pop ("__sdc_sync_started_at" , None )
119-
12030 def process_batch (self , context : dict ) -> None :
12131 """Process a batch with the given batch context.
12232
0 commit comments