-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathprocess_datapackage.py
More file actions
164 lines (148 loc) · 5.94 KB
/
process_datapackage.py
File metadata and controls
164 lines (148 loc) · 5.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import os
import sys
import json
from frictionless import Package
import pandas as pd
import json
def main():
    """
    Process every enabled data package listed in the input catalogue.

    Reads ``./datasets/input_catalogue.json`` and, for each entry whose
    ``process`` value is truthy, augments the package referenced by the
    entry's ``name`` and exports it to the entry's ``outName``.
    """
    datasets_path = "./datasets"
    catalogue_file = os.path.join(datasets_path, "input_catalogue.json")
    with open(catalogue_file, 'r') as handle:
        catalogue = json.load(handle)

    augmented = []
    for entry in catalogue:
        if entry['process']:
            source = entry['name']
            print('Processing Data Package:', source)
            augmented.append(augment_datapackage(source, entry['outName']))

    # The collected descriptors feed the (currently disabled) top-level
    # catalogue export:
    # top_level_catalogue_path = os.path.join(datasets_path, "output_catalogue.json")
    # with open(top_level_catalogue_path, "w") as top_level_schema_file:
    #     json.dump(augmented, top_level_schema_file, indent=4)
def augment_datapackage(in_path, out_path):
    """
    Augment a datapackage with additional metadata we expect.

    The following ``udi:`` custom properties are written:

    * package:     ``udi:name``, ``udi:path``
    * resource:    ``udi:row_count``, ``udi:column_count``
    * field:       ``udi:cardinality``, ``udi:unique``, ``udi:data_type``,
                   ``udi:overlapping_fields``
    * foreign key: ``udi:cardinality`` (``{"from": ..., "to": ...}`` with
                   values ``'one'`` or ``'many'``)

    Args:
        in_path: Location handed to frictionless ``Package``. It must
            contain at least two '/'-separated components; the
            second-to-last one is used as the package name.
        out_path: File the augmented package is exported to as JSON.

    Returns:
        The exported descriptor, re-read from ``out_path`` as parsed JSON.
    """
    folder = in_path.split('/')[-2]
    package = Package(in_path)
    package.custom['udi:name'] = folder
    package.custom['udi:path'] = './data/' + folder + '/'

    # report = package.validate()
    # if not report.valid:
    #     print("The package is not valid:")
    #     print(report.flatten(["rowPosition", "fieldPosition", "code"]))
    #     raise ValueError("Invalid datapackage. Please fix the errors and try again.")

    print('...updating metadata')
    for resource in package.resources:
        ephemeral_print(resource.name)
        df = resource.to_pandas()
        df = df.reset_index()
        rows, cols = df.shape
        resource.custom['udi:row_count'] = rows
        resource.custom['udi:column_count'] = cols

        for field in resource.schema.fields:
            if field.type == 'array' or rows == 0:
                # pandas.nunique does not work on arrays and
                # we don't use array types so we can ignore this.
                # An empty table has no values to count either.
                cardinality = 0
            else:
                cardinality = df[field.name].nunique()
            field.custom['udi:cardinality'] = cardinality
            field.custom['udi:unique'] = cardinality == rows
            field.custom['udi:data_type'] = infer_data_type(field)

        print('\n...finding field overlap')
        # Reduce the table to its distinct "which columns are populated"
        # patterns; that is all the overlap computation needs.
        presence = df.notnull().drop_duplicates()
        populated = presence.any(axis='index').to_dict()
        num_empty_cols = sum(1 for has_value in populated.values() if not has_value)

        for field in resource.schema.fields:
            # Keep only the patterns in which this field has a value.
            overlapping = presence[presence[field.name]]
            if overlapping.empty:
                # No overlapping fields
                field.custom['udi:overlapping_fields'] = []
                continue
            co_present = overlapping.any(axis='index').to_dict()
            related_fields = [name for name, seen in co_present.items() if seen]
            # Overlapping with every non-empty column is abbreviated to 'all'.
            if len(related_fields) == cols - num_empty_cols:
                related_fields = 'all'
            field.custom['udi:overlapping_fields'] = related_fields

    print('\n...updating relationships')
    # handle relationships in another pass so we can assume udi fields are populated
    for resource in package.resources:
        ephemeral_print(resource.name)
        for foreign_key in resource.schema.foreign_keys:
            reference = foreign_key['reference']
            from_unique = unique_multi_key(resource, foreign_key['fields'])
            to_resource = package.get_resource(reference['resource'])
            to_unique = unique_multi_key(to_resource, reference['fields'])
            foreign_key['udi:cardinality'] = {
                "from": 'one' if from_unique else 'many',
                "to": 'one' if to_unique else 'many',
            }

    print('\n...exporting')
    package.to_json(out_path)
    # Re-read through a context manager so the handle is closed promptly;
    # JSON descriptors are UTF-8, so do not depend on the locale default.
    with open(out_path, 'r', encoding='utf-8') as exported:
        return json.load(exported)
def unique_multi_key(resource, key_fields):
    """
    Check if the combination of fields in the dataframe is unique.

    Args:
        resource: Object exposing ``schema.fields`` (each field having
            ``name`` and ``custom``) and, for multi-field keys,
            ``to_pandas()``.
        key_fields: Field name(s) forming the key. Either a sequence of
            names or a single name given as a bare string.

    Returns:
        For a single field, the field's precomputed ``udi:unique`` custom
        value (``False`` if it has not been set). For several fields,
        whether every row has a distinct combination of values; ``False``
        when the table is empty.

    Raises:
        ValueError: If no fields are given, or a single key field is not
            present in the resource schema.
    """
    if not key_fields:
        raise ValueError("No fields provided")
    if isinstance(key_fields, str):
        # A lone field name would otherwise be treated as a sequence of
        # characters; normalise it to a one-element key.
        key_fields = [key_fields]

    if len(key_fields) == 1:
        key_field = key_fields[0]
        field = next(
            (f for f in resource.schema.fields if f.name == key_field),
            None,
        )
        if field is None:
            raise ValueError(f"Field '{key_field}' not found in resource schema")
        return field.custom.get('udi:unique', False)

    df = resource.to_pandas().reset_index()
    if df.empty:
        # Can't really determine based on empty data so give "safer" answer.
        return False
    # list(...) so tuples select columns instead of being read as one label.
    return len(df[list(key_fields)].drop_duplicates()) == df.shape[0]
def ephemeral_print(message):
    """
    Replace the current terminal line with a tab-indented message.

    No newline is written, so the next call overwrites this output.
    """
    stream = sys.stdout
    stream.write("\r\033[K")  # Return to line start and clear the line
    stream.write(f"\t{message}")
    stream.flush()
def infer_data_type(field):
    """
    Map a data package field to a simplified data type.

    The result is one of "ordinal", "nominal", "quantitative", "unknown"
    or "other". Custom ``categories`` / ``categoriesOrdered`` / ``enum``
    properties take precedence over the declared field type.
    https://datapackage.org/standard/table-schema/#field
    """
    custom = field.custom
    if custom.get("categories"):
        return "ordinal" if custom.get('categoriesOrdered') else "nominal"
    if custom.get('enum') is not None:
        return "nominal"

    declared = field.type
    if declared is None:
        return "unknown"
    if declared in ("integer", "number"):
        return "quantitative"
    # TODO: I could check the cardinality (for strings)
    if declared in ("boolean", "string"):
        return "nominal"
    return "other"
# Run the catalogue processing only when executed as a script, not on import.
if __name__ == "__main__":
    main()