-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqualityTests.py
More file actions
70 lines (57 loc) · 2.47 KB
/
qualityTests.py
File metadata and controls
70 lines (57 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#### Data quality check functions for Spark dataframes ####
def check_unique_keys(df, table_name="", primary_key=""):
    """
    Check that the primary key of a spark dataframe is unique.

    The total number of rows is compared with the number of rows left after
    dropping duplicates on the key column(s); the key is unique when both
    counts are equal.

    Args:
        df: Spark dataframe to validate.
        table_name: Name used only in the printed / raised messages.
        primary_key: Key column name, or a list/tuple of column names for a
            composite key.

    Raises:
        ValueError: If no key column is given, or if duplicated key values
            are detected.
    """
    # Accept a single column name as well as a composite key.
    if isinstance(primary_key, str):
        key_columns = [primary_key] if primary_key else []
    else:
        key_columns = list(primary_key or [])
    if not key_columns:
        # An empty name cannot identify a key column; fail with a clear message.
        raise ValueError("primary_key must name at least one column")

    num_rows = df.count()
    num_unique_identifier_rows = df.dropDuplicates(subset=key_columns).count()
    if num_rows != num_unique_identifier_rows:
        raise ValueError("{} uniqueness violated , duplicated data detected".format(table_name))
    print("{} has unique rows, and primary key constraint is not violated".format(table_name))
def ensure_no_nulls(df, column=""):
    """
    Check that a spark dataframe column contains no null values.

    Args:
        df: Spark dataframe to validate.
        column: Name of the column to check. It is interpolated verbatim
            into the filter expression "<column> is NULL".

    Raises:
        ValueError: If no column is given, or if the column contains nulls.
    """
    if not column:
        # An empty name would build the malformed expression " is NULL".
        raise ValueError("column must be a non-empty column name")

    null_counts = df.filter("{} is NULL".format(column)).count()
    if null_counts != 0:
        # Adjacent literals instead of a backslash continuation inside the
        # string, so source indentation can never end up in the message.
        raise ValueError(
            "{} violated the null constraint, "
            "cannot contain nulls".format(column)
        )
    print("{} doesnt have any nulls".format(column))
def check_data_type(df, column="", datatype=""):
    """
    Check that a spark dataframe column has the expected datatype.

    The schema is scanned for the first field named `column` and the string
    form of its datatype is compared with `datatype`.

    Args:
        df: Spark dataframe whose schema is inspected.
        column: Name of the column to look up in the schema.
        datatype: Expected value of str(field.dataType) for that column.
            NOTE(review): the exact string form may differ between pyspark
            versions - confirm the expected format against the one in use.

    Raises:
        ValueError: If the column exists but its datatype does not match.
        KeyError: If the column is not present in the schema.
    """
    for field in df.schema:
        if field.name != column:
            continue
        if str(field.dataType) != datatype:
            # Explicit "\n" keeps the two-line message free of any source
            # indentation.
            raise ValueError(
                "datatype mismatch detected for column {} .\n"
                "Expected {} but got {}".format(column, datatype, field.dataType)
            )
        print("datatype match for column {} having {} values".format(column, datatype))
        return
    # The whole schema was scanned without finding the column: report it
    # instead of silently passing the check.
    raise KeyError("column {} not found in dataframe schema".format(column))
def check_greater_that_zero(df):
    """
    Check that a spark dataframe has at least one row.

    Args:
        df: Spark dataframe to validate.

    Raises:
        ValueError: If the dataframe has no rows.
    """
    # Fetching at most one row is enough to decide emptiness, so the whole
    # table does not have to be counted.
    if not df.head(1):
        raise ValueError("Table has 0 rows, data may not have loaded correctly")
    print("Greater than 0 test passed for the table")
def match_source_input(df_input, df_output):
    """
    Compare the row count of the data before pushing with the row count of
    the data found at the destination.

    Only the two row counts are compared; the content of the rows is not
    inspected.

    Args:
        df_input: Dataframe holding the data before it was pushed.
        df_output: Dataframe holding the data read back from the destination.

    Raises:
        ValueError: If the two row counts differ.
    """
    source_rows = df_input.count()
    destination_rows = df_output.count()
    if source_rows != destination_rows:
        raise ValueError("Data at source doesnt match data at destination")
    print("Data pushed has complete data")