Commit 05c97b2

feat: implement primary key check

Authored Feb 5, 2025 · 2 parents 4fbe31b + 20e1edc

15 files changed: +2,231 −123 lines

.dockerignore (+8 −1)

```diff
@@ -1 +1,8 @@
-data_check/.streamlit/secrets.toml
+data_check/.streamlit/secrets.toml
+.venv/
+build/
+.vscode/
+.git/
+.gitignore
+.dockerignore
+.DS_Store
```

.gitignore (+5)

```diff
@@ -6,3 +6,8 @@
 # Standard Python ignore list
 *.py[cod]
 __pycache__/
+
+# Python package
+build/
+dist/
+data_check.egg-info/
```

.python-version (+1)

```diff
@@ -0,0 +1 @@
+3.9
```

.vscode/launch.json (+4 −4)

```diff
@@ -3,13 +3,13 @@
     "configurations": [
         {
             "name": "debug streamlit",
-            "type": "python",
+            "type": "debugpy",
             "request": "launch",
-            "python": "/Users/antoineballiet/Documents/GitHub/data-check/.venv/bin/python",
-            "program": "/Users/antoineballiet/Documents/GitHub/data-check/.venv/bin/streamlit",
+            "python": "${workspaceFolder}/.venv/bin/python",
+            "program": "${workspaceFolder}/.venv/bin/streamlit",
             "args": [
                 "run",
-                "data_check/streamlit_app.py"
+                "${workspaceFolder}/data_check/streamlit_app.py"
             ]
         }
     ]
```

Dockerfile (+3 −5)

```diff
@@ -9,14 +9,12 @@ RUN apt-get update && apt-get install -y \
     git \
     && rm -rf /var/lib/apt/lists/*

-COPY requirements.txt .
+COPY . .

-RUN pip3 install -r requirements.txt
-
-COPY data_check/ .
+RUN pip install .

 EXPOSE 8501

 HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health

-ENTRYPOINT ["streamlit", "run", "streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"]
+ENTRYPOINT ["streamlit", "run", "data_check/streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"]
```

data_check/data_processor.py (+19 −2)

```diff
@@ -2,11 +2,12 @@
 from typing import List, Tuple

 import pandas as pd
-from models.table import TableSchema
-from query_client import QueryClient
 from sqlglot import parse_one
 from sqlglot.expressions import Select

+from .models.table import TableSchema
+from .query_client import QueryClient
+

 class DataProcessor(ABC):
     def __init__(
@@ -100,6 +101,11 @@ def get_query_insight_tables_primary_keys(self) -> Select:
         """Compare the primary keys of two tables"""
         pass

+    @abstractmethod
+    def get_query_check_primary_keys_unique(self, table_name: str) -> Select:
+        """Check if the primary keys are unique for a given row"""
+        pass
+
     @abstractmethod
     def get_query_exclusive_primary_keys(self, exclusive_to: str) -> Select:
         pass
@@ -247,6 +253,17 @@ def get_plain_diff(
         )
         df = self.client.run_query_to_dataframe(query)
         return query, df
+
+    def run_query_check_primary_keys_unique(self, table: str) -> Tuple[bool, str]:
+        """Check if the primary keys are unique for a given row"""
+        query = self.get_query_check_primary_keys_unique(table_name=table)
+        df = self.client.run_query_to_dataframe(query)
+
+        if not df.empty:
+            error_message = f"Primary key is not unique for {table}. You can use the query: {query.sql()} to check it."
+            return False, error_message
+
+        return True, ""

     def run_query_compare_primary_keys(self) -> pd.DataFrame:
         """Compare the primary keys of two tables"""
```

data_check/processors/bigquery.py (+19 −7)

```diff
@@ -1,10 +1,12 @@
-from data_processor import DataProcessor
-from models.table import TableSchema
-from processors.utils import add_suffix_to_column_names
-from query.query_bq import QueryBigQuery
 from sqlglot import alias, column, condition, func, parse_one, select
 from sqlglot.expressions import Select

+from data_check.data_processor import DataProcessor
+from data_check.models.table import TableSchema
+from data_check.query.query_bq import QueryBigQuery
+
+from .utils import add_suffix_to_column_names
+

 class BigQueryProcessor(DataProcessor):
     def __init__(self, query1: str, query2: str) -> None:
@@ -70,7 +72,7 @@ def get_query_insight_tables_primary_keys(self) -> Select:
         )

         query = (
-            self.with_statement_query.with_("agg_diff_keys", as_=agg_diff_keys)
+            self.with_statement_query_sampled.with_("agg_diff_keys", as_=agg_diff_keys)
             .select(
                 "total_rows",
                 "missing_primary_key_in_table1",
@@ -89,6 +91,16 @@ def get_query_insight_tables_primary_keys(self) -> Select:

         return query

+    def get_query_check_primary_keys_unique(self, table_name: str) -> Select:
+        """Check if the primary keys are unique for a given row"""
+        return (
+            self.with_statement_query_sampled.select(
+                alias(func("count", "*"), "total_rows"),
+            ).from_(table_name, dialect=self.dialect).group_by(self.primary_key).having(
+                func("count", "*") > 1
+            )
+        )
+
     def get_query_exclusive_primary_keys(
         self, exclusive_to: str, limit: int = 500
     ) -> Select:
@@ -102,7 +114,7 @@ def get_query_exclusive_primary_keys(
         )

         return (
-            self.with_statement_query.select(
+            self.with_statement_query_sampled.select(
                 column(self.primary_key, table="table1"), *table1_columns_renamed
             )
             .from_("table1")
@@ -119,7 +131,7 @@ def get_query_exclusive_primary_keys(
         )

         return (
-            self.with_statement_query.select(
+            self.with_statement_query_sampled.select(
                 column(self.primary_key, table="table2"), *table1_columns_renamed
             )
             .from_("table2")
```
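Note that the builders above now all start from `with_statement_query_sampled`, so every generated statement is prefixed with the sampled `table1`/`table2` CTEs. A small sketch of how sqlglot chains CTEs this way (the `raw.*` source tables are placeholders, not from the repo):

```python
from sqlglot import select

# Prefix a select with two CTEs, as the with_statement_query_* helpers do;
# raw.orders and raw.orders_v2 are hypothetical source tables.
query = (
    select("user_id")
    .from_("table1")
    .with_("table1", as_="SELECT * FROM raw.orders")
    .with_("table2", as_="SELECT * FROM raw.orders_v2")
)

print(query.sql(dialect="bigquery"))
# WITH table1 AS (SELECT * FROM raw.orders), table2 AS (SELECT * FROM raw.orders_v2) SELECT user_id FROM table1
```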

data_check/query/query_bq.py (+5 −6)

```diff
@@ -1,16 +1,15 @@
 from os import getenv
 from threading import Thread
+
 import pandas as pd
 import streamlit as st
 from google.cloud import bigquery
+from google.cloud.bigquery.job import QueryJob
 from google.oauth2 import service_account
-from google.cloud.bigquery._helpers import TimeoutType
-from models.table import TableSchema
-from query_client import QueryClient
 from sqlglot.expressions import Select
-from google.cloud.bigquery.job import (
-    QueryJob,
-)
+
+from data_check.models.table import TableSchema
+from data_check.query_client import QueryClient

 USE_STREAMLIT_SECRET = getenv("USE_STREAMLIT_SECRET", False)
 TIMEOUT_BIGQUERY = 900  # 15 * 60 = 15 minutes
```

data_check/query_client.py (+2 −1)

```diff
@@ -1,9 +1,10 @@
 from abc import ABC, abstractmethod

 import pandas as pd
-from models.table import TableSchema
 from sqlglot.expressions import Select

+from .models.table import TableSchema
+

 class QueryClient(ABC):
     ###### ABSTRACT METHODS ######
```

data_check/streamlit_app.py (+19 −5)

```diff
@@ -1,8 +1,9 @@
 import pandas as pd
 import streamlit as st
-from data_formatter import (highlight_diff_dataset, style_gradient,
-                            style_percentage)
-from processors.bigquery import BigQueryProcessor
+
+from data_check.data_formatter import (highlight_diff_dataset, style_gradient,
+                                       style_percentage)
+from data_check.processors.bigquery import BigQueryProcessor


 class DataDiff:
@@ -152,7 +153,7 @@ def second_step(self):
         )

         st.selectbox(
-            "Select primary key:",
+            "Select primary key (must be unique for a given row):",
             common_table_schema.columns_names,
             key="temp_primary_key",
             index=primary_key_select_index,
@@ -172,7 +173,7 @@ def second_step(self):
         )

         st.slider(
-            "Data sampling (only avaible for direct tables as input)",
+            "Data sampling (only available for direct tables as input)",
             min_value=10,
             max_value=100,
             step=1,
@@ -204,6 +205,19 @@ def window(self):
         )

         if st.session_state.loaded_tables:
+
+            st.write("Checking primary keys are unique for a given row...")
+
+            primary_keys_unique_table1, error_message_table1 = processor.run_query_check_primary_keys_unique(table="table1")
+            primary_keys_unique_table2, error_message_table2 = processor.run_query_check_primary_keys_unique(table="table2")
+
+            if not primary_keys_unique_table1 or not primary_keys_unique_table2:
+                st.write("Primary keys are not unique for a given row ❌")
+                st.write(error_message_table1)
+                st.write(error_message_table2)
+                st.stop()
+
+            st.write("Primary keys are unique for a given row ✅")
             # Using BigQueryClient to run queries, output primary keys in common and exclusive to each table on streamlit : display rows in table format
             st.write("Analyzing primary keys...")
             results_primary_keys = processor.run_query_compare_primary_keys()
```
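The gate works because `st.stop()` halts script execution for the current rerun, so none of the comparison steps below it run when either table fails the check. A self-contained sketch of the pattern, with `check_unique` standing in for `processor.run_query_check_primary_keys_unique`:

```python
from typing import Tuple

import streamlit as st

def check_unique(table: str) -> Tuple[bool, str]:
    """Stand-in for processor.run_query_check_primary_keys_unique."""
    return True, ""  # pretend the key is unique

ok1, err1 = check_unique("table1")
ok2, err2 = check_unique("table2")

if not (ok1 and ok2):
    st.write("Primary keys are not unique for a given row ❌")
    st.write(err1)
    st.write(err2)
    st.stop()  # halts this rerun; nothing below executes

st.write("Primary keys are unique for a given row ✅")
```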

pyproject.toml (+24 −2)

```diff
@@ -1,2 +1,24 @@
-[tool.pytest.ini_options]
-pythonpath = "data_check"
+[project]
+name = "data_check"
+version = "0.1.1"
+description = "Streamlit app to compare bigquery tables"
+readme = "README.md"
+requires-python = ">=3.9"
+dependencies = [
+    "db-dtypes>=1.4.0",
+    "google-cloud-bigquery>=3.29.0",
+    "pandas>=2.2.3",
+    "pandas-gbq>=0.26.1",
+    "seaborn>=0.13.2",
+    "sqlglot>=26.4.1",
+    "streamlit==1.31.1",
+    "streamlit-tags>=1.2.8",
+    "tqdm>=4.67.1",
+]
+
+[dependency-groups]
+dev = [
+    "isort>=6.0.0",
+    "pytest>=8.3.4",
+    "ruff>=0.9.4",
+]
```
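Packaging the project, together with the switch to absolute `data_check.*` imports throughout, is what lets the Docker image run `pip install .` and import the app from anywhere. A quick sanity check, assuming the package is installed (the queries are placeholders; constructing the processor may also require BigQuery credentials in your environment):

```python
# Import via the new absolute package layout after `pip install .`.
from data_check.processors.bigquery import BigQueryProcessor

processor = BigQueryProcessor(
    "select * from `my-project.my_dataset.table1`",  # placeholder query
    "select * from `my-project.my_dataset.table2`",  # placeholder query
)
print(processor.dialect)  # "bigquery"
```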

tests/processors/test_bigquery.py (+90 −11)

```diff
@@ -1,16 +1,27 @@
-from data_check.processors.bigquery import BigQueryProcessor
+import pytest

+from data_check.models.table import ColumnSchema, TableSchema
+from data_check.processors.bigquery import BigQueryProcessor

-def test_bigquery_processor_init():
-    query1 = "select * from `my-project.my_dataset.table1`"
-    query2 = "select * from `my-project.my_dataset.table2`"
+QUERY_1 = "select * from `my-project.my_dataset.table1`"
+QUERY_2 = "select * from `my-project.my_dataset.table2`"

-    result = BigQueryProcessor(query1, query2)
+# Create fixture for BigQueryProcessor
+@pytest.fixture
+def bigquery_processor() -> BigQueryProcessor:
+    processor = BigQueryProcessor(QUERY_1, QUERY_2)
+    processor.set_config_data(
+        primary_key="A",
+        columns_to_compare=["B", "C"],
+        sampling_rate=100,
+    )
+    return processor

-    assert result.query1 == query1
-    assert result.query2 == query2
-    assert result.dialect == "bigquery"
-    assert result.client.__class__.__name__ == "QueryBigQuery"
+def test_bigquery_processor_init(bigquery_processor: BigQueryProcessor):
+    assert bigquery_processor.query1.sql() == 'SELECT * FROM "my-project"."my_dataset"."table1"'
+    assert bigquery_processor.query2.sql() == 'SELECT * FROM "my-project"."my_dataset"."table2"'
+    assert bigquery_processor.dialect == "bigquery"
+    assert bigquery_processor.client.__class__.__name__ == "QueryBigQuery"


 def test_bigquery_processor_init_with_table():
@@ -19,5 +30,73 @@ def test_bigquery_processor_init_with_table():

     result = BigQueryProcessor(table1, table2)

-    assert result.query1 == "select * from `my-project.my_dataset.table1`"
-    assert result.query2 == "select * from `my-project.my_dataset.table2`"
+    assert result.query1.sql() == 'SELECT * FROM my-project.my_dataset.table1'
+    assert result.query2.sql() == 'SELECT * FROM my-project.my_dataset.table2'
+
+def test_get_query_plain_diff_tables():
+
+    processor = BigQueryProcessor("table1", "table2")
+    processor.set_config_data(
+        primary_key="A",
+        columns_to_compare=["B", "C"],
+        sampling_rate=100,
+    )
+
+    result = processor.get_query_plain_diff_tables(
+        common_table_schema=TableSchema(
+            table_name="common",
+            columns=[
+                ColumnSchema(name="B", field_type="INTEGER", mode="NULLABLE"),
+                ColumnSchema(name="C", field_type="STRING", mode="NULLABLE"),
+            ],
+        )
+    )
+
+    assert (
+        result.sql()
+        == f"""WITH table1 AS (SELECT * FROM table1), table2 AS (SELECT * FROM table2), inner_merged AS (SELECT table1.A, table1.B AS B__1, table2.B AS B__2, table1.C AS C__1, table2.C AS C__2 FROM table1 INNER JOIN table2 USING (A)), final_result AS (SELECT * FROM inner_merged WHERE COALESCE(CAST(B__1 AS TEXT), 'none') <> COALESCE(CAST(B__2 AS TEXT), 'none') OR COALESCE(C__1, 'none') <> COALESCE(C__2, 'none')) SELECT * FROM final_result"""
+    )
+
+
+def test_query_ratio_common_values_per_column():
+
+    processor = BigQueryProcessor("table1", "table2")
+    processor.set_config_data(
+        primary_key="A",
+        columns_to_compare=["B", "C"],
+        sampling_rate=100,
+    )
+
+    result = processor.query_ratio_common_values_per_column(
+        common_table_schema=TableSchema(
+            table_name="common",
+            columns=[
+                ColumnSchema(name="A", field_type="INTEGER", mode="NULLABLE"),
+                ColumnSchema(name="B", field_type="INTEGER", mode="NULLABLE"),
+                ColumnSchema(name="C", field_type="STRING", mode="NULLABLE"),
+            ],
+        )
+    )
+
+    assert (
+        result.sql()
+        == f"WITH table1 AS (SELECT * FROM table1), table2 AS (SELECT * FROM table2), count_diff AS (SELECT COUNT(A) AS count_common, COUNT_IF(NOT COALESCE(CAST(table1.A AS TEXT), CAST(table2.A AS TEXT)) IS NULL) AS A_count_not_null, COUNT_IF(COALESCE(CAST(table1.A AS TEXT), 'none') = COALESCE(CAST(table2.A AS TEXT), 'non')) AS A, COUNT_IF(NOT COALESCE(CAST(table1.B AS TEXT), CAST(table2.B AS TEXT)) IS NULL) AS B_count_not_null, COUNT_IF(COALESCE(CAST(table1.B AS TEXT), 'none') = COALESCE(CAST(table2.B AS TEXT), 'non')) AS B, COUNT_IF(NOT COALESCE(table1.C, table2.C) IS NULL) AS C_count_not_null, COUNT_IF(COALESCE(table1.C, 'none') = COALESCE(table2.C, 'non')) AS C FROM table1 INNER JOIN table2 USING (A)), final_result AS (SELECT STRUCT(CASE WHEN count_common <> 0 THEN A_count_not_null / count_common ELSE NULL END AS ratio_not_null, CASE WHEN A_count_not_null <> 0 THEN A / A_count_not_null ELSE NULL END AS ratio_equal) AS A, STRUCT(CASE WHEN count_common <> 0 THEN B_count_not_null / count_common ELSE NULL END AS ratio_not_null, CASE WHEN B_count_not_null <> 0 THEN B / B_count_not_null ELSE NULL END AS ratio_equal) AS B, STRUCT(CASE WHEN count_common <> 0 THEN C_count_not_null / count_common ELSE NULL END AS ratio_not_null, CASE WHEN C_count_not_null <> 0 THEN C / C_count_not_null ELSE NULL END AS ratio_equal) AS C FROM count_diff) SELECT * FROM final_result"
+    )
+
+
+def test_get_query_check_primary_keys_unique(bigquery_processor: BigQueryProcessor):
+    query1 = bigquery_processor.get_query_check_primary_keys_unique(table_name="table1")
+    assert query1.sql() == 'WITH table1 AS (SELECT * FROM "my-project"."my_dataset"."table1"), table2 AS (SELECT * FROM "my-project"."my_dataset"."table2") SELECT COUNT(*) AS total_rows FROM table1 GROUP BY A HAVING COUNT(*) > 1'


+@pytest.mark.skip(reason="Need BigQuery credentials to run this test")
+def test_run_query_check_primary_keys_unique():
+    processor = BigQueryProcessor("MY_TABLE", "MY_TABLE_2")
+    processor.set_config_data(
+        primary_key="user_id",
+        columns_to_compare=["user_id", "account_automation"],
+        sampling_rate=100,
+    )
+
+    result = processor.run_query_check_primary_keys_unique(table="table1")
+    assert result == (True, "")
```

tests/test_data_processor.py (−71)

This file was deleted.

tests/test_table.py (+2 −8)

```diff
@@ -19,11 +19,5 @@ def test_TableSchema_get_common_columns():
         ],
     )

-    assert table1.get_common_columns(other=table2) == [
-        ColumnSchema(name="col2", field_type="STRING", mode="NULLABLE"),
-        ColumnSchema(name="col3", field_type="STRING", mode="NULLABLE"),
-    ]
-    assert table2.get_common_columns(other=table1) == [
-        ColumnSchema(name="col2", field_type="STRING", mode="NULLABLE"),
-        ColumnSchema(name="col3", field_type="STRING", mode="NULLABLE"),
-    ]
+    assert set(table1.get_common_column_names(other=table2)) == set(["col2", "col3"])
+    assert set(table2.get_common_column_names(other=table1)) == set(["col2", "col3"])
```

uv.lock (+2,030)

Generated lockfile; diff not rendered.
