forked from azurede007/InsureETL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path main.py
More file actions
56 lines (43 loc) · 1.87 KB
/
main.py
File metadata and controls
56 lines (43 loc) · 1.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from utils.logger import get_logger
from utils.spark_session import get_spark
from utils.offsets import OffsetManager
from stages.ingest import IngestStage
from stages.transform import TransformStage
from stages.writeback import WriteBackStage
import yaml, sys, traceback
def load_config(path):
    """Load the pipeline's YAML configuration file.

    Args:
        path: Filesystem path of the YAML config file.

    Returns:
        The parsed top-level mapping as a ``dict``.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
        yaml.YAMLError: If the file is not valid YAML.
        ValueError: If the document is empty or its top level is not a mapping.
    """
    # Decode as UTF-8 explicitly rather than with the platform's locale
    # encoding, so the same file parses identically on every host.
    with open(path, encoding="utf-8") as f:
        # safe_load only constructs plain Python objects (no arbitrary tags).
        config = yaml.safe_load(f)
    # safe_load returns None for an empty file. Fail here with a clear message
    # instead of a "'NoneType' object is not subscriptable" error at first use.
    if not isinstance(config, dict):
        raise ValueError(
            f"config file {path!r} must contain a YAML mapping at the top level, "
            f"got {type(config).__name__}"
        )
    return config
def main(config_path="config.yml"):
    """Run the insurance ETL pipeline end to end.

    Steps:
      1. Load the YAML config.
      2. Create a Spark session.
      3. Incrementally ingest the ``claims``, ``customers``, ``policies`` and
         ``payments`` tables.
      4. Enrich them into a curated dataset and aggregate it into metrics.
      5. Show the metrics and write them back via ``WriteBackStage``.
      6. Advance the stored offset for each source table.

    Args:
        config_path: Path of the YAML config file. Defaults to ``"config.yml"``,
            the previously hard-coded value, resolved against the current
            working directory.

    Any exception raised by a pipeline step is logged together with its
    traceback, and the process exits with status 1. The Spark session, if one
    was created, is stopped on both the success and the failure path.
    """
    # (table name, key column) for each incrementally loaded source table.
    # The same pair is passed to load_incremental and update_offset, so the
    # two steps can never drift apart.
    tables = (
        ("claims", "claim_id"),
        ("customers", "customer_id"),
        ("policies", "policy_id"),
        ("payments", "payment_id"),
    )

    logger = get_logger("InsuranceETL")
    # Assigned inside the try; None means no session exists to clean up.
    spark = None
    try:
        config = load_config(config_path)
        spark = get_spark("Insurance_ETL_Pipeline")
        offset_mgr = OffsetManager(config["mysql"], spark)
        ingest = IngestStage(spark, config["mysql"], offset_mgr, logger)
        transform = TransformStage(logger)
        writeback = WriteBackStage(config["mysql"], logger)

        # Load each table's data into a DataFrame, in the order listed above.
        frames = {
            table: ingest.load_incremental(table, key) for table, key in tables
        }

        # Data curation.
        curated = transform.enrich(
            frames["claims"],
            frames["customers"],
            frames["policies"],
            frames["payments"],
        )
        # Aggregate and generate metrics.
        metrics = transform.generate_metrics(curated)
        metrics.show()

        # Write the metrics into the MySQL table.
        writeback.write_metrics(metrics)

        # Update the offsets in the etl_offset table. This runs only after the
        # write-back returned, so a failed run presumably re-ingests from the
        # previous offsets.
        # NOTE(review): if load_incremental returns lazy DataFrames,
        # update_offset may re-read the source tables here and see rows that
        # were inserted after the metrics were computed, advancing the offset
        # past unprocessed data. Confirm against IngestStage/OffsetManager
        # (caching, or capturing max keys before write-back, would avoid it).
        for table, key in tables:
            offset_mgr.update_offset(table, key, frames[table])

        logger.info("ETL Completed Successfully")
    except Exception as e:
        logger.error("ETL Failure: " + str(e))
        logger.error(traceback.format_exc())
        sys.exit(1)
    finally:
        # Runs before the SystemExit from sys.exit(1) propagates, so the
        # session is released on failure as well as on success.
        # Assumes get_spark returns a pyspark SparkSession — TODO confirm.
        if spark is not None:
            spark.stop()
if __name__ == "__main__":
    # Start the pipeline only when this file is executed as a script;
    # importing the module (e.g. from tests) has no side effects.
    main()