-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path41.DesignBulkDataImporter.py
More file actions
122 lines (91 loc) · 3.15 KB
/
41.DesignBulkDataImporter.py
File metadata and controls
122 lines (91 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Design a Bulk Data Importer that loads CSV/JSON files, validates each record,
# processes them concurrently, stores valid records in an in-memory datastore,
# and retries failed records once. The system must generate a summary report of
# total, success, and failed records. The entire solution should run as a single,
# self-contained file without any external database.
import json
import csv
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
# Shared in-memory store; DB_LOCK guards all access from worker threads.
DATA_STORE = []
DB_LOCK = Lock()


class DataStore:
    """Thread-safe facade over the module-level in-memory record list."""

    @staticmethod
    def save(record):
        """Append *record* to the store under the lock."""
        with DB_LOCK:
            DATA_STORE.append(record)

    @staticmethod
    def list_all():
        """Return a snapshot of all stored records.

        Fix: take the lock and copy, so callers never hold a reference to
        the live list while worker threads may still be appending to it.
        """
        with DB_LOCK:
            return list(DATA_STORE)
class BaseImporter:
    """Template for file importers: load -> validate -> process (concurrently),
    then retry each failed record once and return a summary dict.

    Subclasses override load() (and usually validate()).
    """

    def __init__(self, file_path):
        self.file_path = file_path
        # Records that failed first-pass validation; retried once in run().
        self.failed_records = []

    def load(self):
        """Return a list of record dicts from self.file_path."""
        raise NotImplementedError

    def validate(self, record):
        """Return (ok, error_message); base class accepts everything."""
        return True, ""

    def process(self, record):
        """Persist a valid record."""
        DataStore.save(record)

    def run(self):
        """Execute the full import pipeline and return a summary dict.

        Returns a dict with keys: total, success, failed, data.
        """
        # Fix: reset failure state so run() is repeatable on the same importer.
        self.failed_records = []
        records = self.load()
        with ThreadPoolExecutor(max_workers=4) as pool:
            # Consume the iterator so any worker exception propagates here
            # (previously the result list was bound to an unused variable).
            for _ in pool.map(self._process_record, records):
                pass
        # Single retry pass over first-round failures.
        still_failed = []
        for record in self.failed_records:
            ok, msg = self.validate(record)
            if ok:
                self.process(record)
            else:
                still_failed.append((record, msg))
        return {
            "total": len(records),
            "success": len(records) - len(still_failed),
            "failed": len(still_failed),
            "data": DataStore.list_all(),
        }

    def _process_record(self, record):
        """Validate one record; store it or queue it for retry.

        list.append is atomic in CPython, so concurrent appends are safe.
        """
        ok, msg = self.validate(record)
        if ok:
            self.process(record)
        else:
            self.failed_records.append(record)
class CSVImporter(BaseImporter):
    """Importer for CSV files; each row becomes a dict via csv.DictReader."""

    def load(self):
        """Materialize every CSV row as a dict."""
        with open(self.file_path, "r") as handle:
            return list(csv.DictReader(handle))

    def validate(self, record):
        """A row is valid when its 'id' column is present and non-empty."""
        if record.get("id"):
            return True, ""
        return False, "Missing Id"
class JSONImporter(BaseImporter):
    """Importer for JSON files containing a top-level list of record dicts."""

    def load(self):
        """Parse the whole JSON document in one shot."""
        with open(self.file_path, "r") as handle:
            return json.load(handle)

    def validate(self, record):
        """A record is valid when it carries an 'id' key (any value)."""
        return (True, "") if "id" in record else (False, "Missing Id")
class ImporterFactory:
    """Maps a file extension to the matching importer class."""

    @staticmethod
    def get_importer(file_path):
        """Return an importer for *file_path*.

        Raises ValueError for unsupported extensions (was a bare Exception;
        ValueError is a subclass, so existing broad handlers still work).
        Extension matching is now case-insensitive ('.CSV' works too).
        """
        suffix = file_path.lower()
        if suffix.endswith(".csv"):
            return CSVImporter(file_path)
        if suffix.endswith(".json"):
            return JSONImporter(file_path)
        raise ValueError("Unsupported file format")
if __name__ == "__main__":
    # Write a small demo CSV: two valid rows plus one with an empty id.
    sample_rows = "id,name\n1,Alice\n,Invalid\n2,Bob\n"
    with open("sample.csv", "w") as demo_file:
        demo_file.write(sample_rows)

    # The factory picks CSVImporter from the .csv extension.
    importer = ImporterFactory.get_importer("sample.csv")

    # Full pipeline: load -> validate -> process -> retry -> summary.
    output = importer.run()

    # Emit the structured summary as pretty-printed JSON.
    print("=== Import Summary ===")
    print(json.dumps(output, indent=2))  # Pretty-print final dictionary