feat: implement primary key check #15

Merged
9 changes: 8 additions & 1 deletion .dockerignore
@@ -1 +1,8 @@
data_check/.streamlit/secrets.toml
data_check/.streamlit/secrets.toml
.venv/
build/
.vscode/
.git/
.gitignore
.dockerignore
.DS_Store
5 changes: 5 additions & 0 deletions .gitignore
@@ -6,3 +6,8 @@
# Standard Python ignore list
*.py[cod]
__pycache__/

# Python package
build/
dist/
data_check.egg-info/
1 change: 1 addition & 0 deletions .python-version
@@ -0,0 +1 @@
3.9
8 changes: 4 additions & 4 deletions .vscode/launch.json
@@ -3,13 +3,13 @@
"configurations": [
{
"name": "debug streamlit",
"type": "python",
"type": "debugpy",
"request": "launch",
"python": "/Users/antoineballiet/Documents/GitHub/data-check/.venv/bin/python",
"program": "/Users/antoineballiet/Documents/GitHub/data-check/.venv/bin/streamlit",
"python": "${workspaceFolder}/.venv/bin/python",
"program": "${workspaceFolder}/.venv/bin/streamlit",
"args": [
"run",
"data_check/streamlit_app.py"
"${workspaceFolder}/data_check/streamlit_app.py"
]
}
]
8 changes: 3 additions & 5 deletions Dockerfile
@@ -9,14 +9,12 @@ RUN apt-get update && apt-get install -y \
git \
&& rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
COPY . .

RUN pip3 install -r requirements.txt

COPY data_check/ .
RUN pip install .

EXPOSE 8501

HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health

ENTRYPOINT ["streamlit", "run", "streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"]
ENTRYPOINT ["streamlit", "run", "data_check/streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"]
21 changes: 19 additions & 2 deletions data_check/data_processor.py
@@ -2,11 +2,12 @@
from typing import List, Tuple

import pandas as pd
from models.table import TableSchema
from query_client import QueryClient
from sqlglot import parse_one
from sqlglot.expressions import Select

from .models.table import TableSchema
from .query_client import QueryClient


class DataProcessor(ABC):
def __init__(
@@ -100,6 +101,11 @@ def get_query_insight_tables_primary_keys(self) -> Select:
"""Compare the primary keys of two tables"""
pass

@abstractmethod
def get_query_check_primary_keys_unique(self, table_name: str) -> Select:
"""Check if the primary keys are unique for a given row"""
pass

@abstractmethod
def get_query_exclusive_primary_keys(self, exclusive_to: str) -> Select:
pass
@@ -247,6 +253,17 @@ def get_plain_diff(
)
df = self.client.run_query_to_dataframe(query)
return query, df

def run_query_check_primary_keys_unique(self, table: str) -> Tuple[bool, str]:
"""Check if the primary keys are unique for a given row"""
query = self.get_query_check_primary_keys_unique(table_name=table)
df = self.client.run_query_to_dataframe(query)

if not df.empty:
error_message = f"Primary key is not unique for {table}: . You can use the query: {query.sql()} to check it."
return False, error_message

return True, ""

def run_query_compare_primary_keys(self) -> pd.DataFrame:
"""Compare the primary keys of two tables"""
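
The contract behind run_query_check_primary_keys_unique is deliberately simple: the generated query returns one row per duplicated key value, so an empty result means the key is unique. A standalone pandas sketch of that decision logic, with toy data and no BigQuery client involved:

import pandas as pd

# Toy stand-in for the query result: one row per primary key value that
# occurs more than once. An empty frame maps to the (True, "") return.
clean = pd.DataFrame({"total_rows": []})
assert clean.empty

# Two duplicated key values map to (False, "<message embedding the SQL>").
dirty = pd.DataFrame({"total_rows": [2, 3]})
assert not dirty.empty
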
26 changes: 19 additions & 7 deletions data_check/processors/bigquery.py
@@ -1,10 +1,12 @@
from data_processor import DataProcessor
from models.table import TableSchema
from processors.utils import add_suffix_to_column_names
from query.query_bq import QueryBigQuery
from sqlglot import alias, column, condition, func, parse_one, select
from sqlglot.expressions import Select

from data_check.data_processor import DataProcessor
from data_check.models.table import TableSchema
from data_check.query.query_bq import QueryBigQuery

from .utils import add_suffix_to_column_names


class BigQueryProcessor(DataProcessor):
def __init__(self, query1: str, query2: str) -> None:
@@ -70,7 +72,7 @@ def get_query_insight_tables_primary_keys(self) -> Select:
)

query = (
self.with_statement_query.with_("agg_diff_keys", as_=agg_diff_keys)
self.with_statement_query_sampled.with_("agg_diff_keys", as_=agg_diff_keys)
.select(
"total_rows",
"missing_primary_key_in_table1",
@@ -89,6 +91,16 @@ def get_query_insight_tables_primary_keys(self) -> Select:

return query

def get_query_check_primary_keys_unique(self, table_name: str) -> Select:
"""Check if the primary keys are unique for a given row"""
return (
self.with_statement_query_sampled.select(
alias(func("count", "*"), "total_rows"),
).from_(table_name, dialect=self.dialect).group_by(self.primary_key).having(
func("count", "*") > 1
)
)

def get_query_exclusive_primary_keys(
self, exclusive_to: str, limit: int = 500
) -> Select:
@@ -102,7 +114,7 @@ def get_query_exclusive_primary_keys(
)

return (
self.with_statement_query.select(
self.with_statement_query_sampled.select(
column(self.primary_key, table="table1"), *table1_columns_renamed
)
.from_("table1")
@@ -119,7 +131,7 @@ def get_query_exclusive_primary_keys(
)

return (
self.with_statement_query.select(
self.with_statement_query_sampled.select(
column(self.primary_key, table="table2"), *table1_columns_renamed
)
.from_("table2")
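
The HAVING clause above is assembled purely from sqlglot helpers and comparison-operator overloads. The same construction works standalone, minus the WITH clauses that with_statement_query_sampled wraps around the two input tables (illustrative table and key names):

from sqlglot import alias, func, select

# Standalone version of the duplicate-key query the processor builds.
query = (
    select(alias(func("count", "*"), "total_rows"))
    .from_("table1")
    .group_by("A")
    .having(func("count", "*") > 1)
)

print(query.sql(dialect="bigquery"))
# SELECT COUNT(*) AS total_rows FROM table1 GROUP BY A HAVING COUNT(*) > 1
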
11 changes: 5 additions & 6 deletions data_check/query/query_bq.py
@@ -1,16 +1,15 @@
from os import getenv
from threading import Thread

import pandas as pd
import streamlit as st
from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJob
from google.oauth2 import service_account
from google.cloud.bigquery._helpers import TimeoutType
from models.table import TableSchema
from query_client import QueryClient
from sqlglot.expressions import Select
from google.cloud.bigquery.job import (
QueryJob,
)

from data_check.models.table import TableSchema
from data_check.query_client import QueryClient

USE_STREAMLIT_SECRET = getenv("USE_STREAMLIT_SECRET", False)
TIMEOUT_BIGQUERY = 900 # 15 * 60 = 15 minutes
3 changes: 2 additions & 1 deletion data_check/query_client.py
@@ -1,9 +1,10 @@
from abc import ABC, abstractmethod

import pandas as pd
from models.table import TableSchema
from sqlglot.expressions import Select

from .models.table import TableSchema


class QueryClient(ABC):
###### ABSTRACT METHODS ######
24 changes: 19 additions & 5 deletions data_check/streamlit_app.py
@@ -1,8 +1,9 @@
import pandas as pd
import streamlit as st
from data_formatter import (highlight_diff_dataset, style_gradient,
style_percentage)
from processors.bigquery import BigQueryProcessor

from data_check.data_formatter import (highlight_diff_dataset, style_gradient,
style_percentage)
from data_check.processors.bigquery import BigQueryProcessor


class DataDiff:
@@ -152,7 +153,7 @@ def second_step(self):
)

st.selectbox(
"Select primary key:",
"Select primary key (must be unique for a given row):",
common_table_schema.columns_names,
key="temp_primary_key",
index=primary_key_select_index,
@@ -172,7 +173,7 @@ def second_step(self):
)

st.slider(
"Data sampling (only avaible for direct tables as input)",
"Data sampling (only available for direct tables as input)",
min_value=10,
max_value=100,
step=1,
@@ -204,6 +205,19 @@ def window(self):
)

if st.session_state.loaded_tables:

st.write("Checking primary keys are unique for a given row...")

primary_keys_unique_table1, error_message_table1 = processor.run_query_check_primary_keys_unique(table="table1")
primary_keys_unique_table2, error_message_table2 = processor.run_query_check_primary_keys_unique(table="table2")

if not primary_keys_unique_table1 or not primary_keys_unique_table2:
st.write("Primary keys are not unique for a given row ❌")
st.write(error_message_table1)
st.write(error_message_table2)
st.stop()

st.write("Primary keys are unique for a given row ✅")
# Using BigQueryClient to run queries, output primary keys in common and exclusive to each table on streamlit : display rows in table format
st.write("Analyzing primary keys...")
results_primary_keys = processor.run_query_compare_primary_keys()
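
The app-level change is a gate: both tables are checked before any comparison renders, and st.stop() halts the script run on failure. The same pattern in isolation, with a hypothetical processor argument standing in for the configured BigQueryProcessor:

import streamlit as st

def gate_on_unique_primary_keys(processor) -> None:
    """Stop the Streamlit run if either input table has duplicate primary keys."""
    ok1, err1 = processor.run_query_check_primary_keys_unique(table="table1")
    ok2, err2 = processor.run_query_check_primary_keys_unique(table="table2")
    if not (ok1 and ok2):
        st.write("Primary keys are not unique for a given row ❌")
        st.write(err1)
        st.write(err2)
        st.stop()  # nothing below this line runs on this script execution
    st.write("Primary keys are unique for a given row ✅")
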
26 changes: 24 additions & 2 deletions pyproject.toml
@@ -1,2 +1,24 @@
[tool.pytest.ini_options]
pythonpath = "data_check"
[project]
name = "data_check"
version = "0.1.1"
description = "Streamlit app to compare bigquery tables"
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"db-dtypes>=1.4.0",
"google-cloud-bigquery>=3.29.0",
"pandas>=2.2.3",
"pandas-gbq>=0.26.1",
"seaborn>=0.13.2",
"sqlglot>=26.4.1",
"streamlit==1.31.1",
"streamlit-tags>=1.2.8",
"tqdm>=4.67.1",
]

[dependency-groups]
dev = [
"isort>=6.0.0",
"pytest>=8.3.4",
"ruff>=0.9.4",
]
101 changes: 90 additions & 11 deletions tests/processors/test_bigquery.py
@@ -1,16 +1,27 @@
from data_check.processors.bigquery import BigQueryProcessor
import pytest

from data_check.models.table import ColumnSchema, TableSchema
from data_check.processors.bigquery import BigQueryProcessor

def test_bigquery_processor_init():
query1 = "select * from `my-project.my_dataset.table1`"
query2 = "select * from `my-project.my_dataset.table2`"
QUERY_1 = "select * from `my-project.my_dataset.table1`"
QUERY_2 = "select * from `my-project.my_dataset.table2`"

result = BigQueryProcessor(query1, query2)
# Create fixture for BigQueryProcessor
@pytest.fixture
def bigquery_processor() -> BigQueryProcessor:
processor = BigQueryProcessor(QUERY_1, QUERY_2)
processor.set_config_data(
primary_key="A",
columns_to_compare=["B", "C"],
sampling_rate=100,
)
return processor

assert result.query1 == query1
assert result.query2 == query2
assert result.dialect == "bigquery"
assert result.client.__class__.__name__ == "QueryBigQuery"
def test_bigquery_processor_init(bigquery_processor: BigQueryProcessor):
assert bigquery_processor.query1.sql() == 'SELECT * FROM "my-project"."my_dataset"."table1"'
assert bigquery_processor.query2.sql() == 'SELECT * FROM "my-project"."my_dataset"."table2"'
assert bigquery_processor.dialect == "bigquery"
assert bigquery_processor.client.__class__.__name__ == "QueryBigQuery"


def test_bigquery_processor_init_with_table():
@@ -19,5 +30,73 @@ def test_bigquery_processor_init_with_table():

result = BigQueryProcessor(table1, table2)

assert result.query1 == "select * from `my-project.my_dataset.table1`"
assert result.query2 == "select * from `my-project.my_dataset.table2`"
assert result.query1.sql() == 'SELECT * FROM my-project.my_dataset.table1'
assert result.query2.sql() == 'SELECT * FROM my-project.my_dataset.table2'

def test_get_query_plain_diff_tables():

processor = BigQueryProcessor("table1", "table2")
processor.set_config_data(
primary_key="A",
columns_to_compare=["B", "C"],
sampling_rate=100,
)

result = processor.get_query_plain_diff_tables(
common_table_schema=TableSchema(
table_name="common",
columns=[
ColumnSchema(name="B", field_type="INTEGER", mode="NULLABLE"),
ColumnSchema(name="C", field_type="STRING", mode="NULLABLE"),
],
)
)

assert (
result.sql()
== f"""WITH table1 AS (SELECT * FROM table1), table2 AS (SELECT * FROM table2), inner_merged AS (SELECT table1.A, table1.B AS B__1, table2.B AS B__2, table1.C AS C__1, table2.C AS C__2 FROM table1 INNER JOIN table2 USING (A)), final_result AS (SELECT * FROM inner_merged WHERE COALESCE(CAST(B__1 AS TEXT), 'none') <> COALESCE(CAST(B__2 AS TEXT), 'none') OR COALESCE(C__1, 'none') <> COALESCE(C__2, 'none')) SELECT * FROM final_result"""
)


def test_query_ratio_common_values_per_column():

processor = BigQueryProcessor("table1", "table2")
processor.set_config_data(
primary_key="A",
columns_to_compare=["B", "C"],
sampling_rate=100,
)

result = processor.query_ratio_common_values_per_column(
common_table_schema=TableSchema(
table_name="common",
columns=[
ColumnSchema(name="A", field_type="INTEGER", mode="NULLABLE"),
ColumnSchema(name="B", field_type="INTEGER", mode="NULLABLE"),
ColumnSchema(name="C", field_type="STRING", mode="NULLABLE"),
],
)
)

assert (
result.sql()
== f"WITH table1 AS (SELECT * FROM table1), table2 AS (SELECT * FROM table2), count_diff AS (SELECT COUNT(A) AS count_common, COUNT_IF(NOT COALESCE(CAST(table1.A AS TEXT), CAST(table2.A AS TEXT)) IS NULL) AS A_count_not_null, COUNT_IF(COALESCE(CAST(table1.A AS TEXT), 'none') = COALESCE(CAST(table2.A AS TEXT), 'non')) AS A, COUNT_IF(NOT COALESCE(CAST(table1.B AS TEXT), CAST(table2.B AS TEXT)) IS NULL) AS B_count_not_null, COUNT_IF(COALESCE(CAST(table1.B AS TEXT), 'none') = COALESCE(CAST(table2.B AS TEXT), 'non')) AS B, COUNT_IF(NOT COALESCE(table1.C, table2.C) IS NULL) AS C_count_not_null, COUNT_IF(COALESCE(table1.C, 'none') = COALESCE(table2.C, 'non')) AS C FROM table1 INNER JOIN table2 USING (A)), final_result AS (SELECT STRUCT(CASE WHEN count_common <> 0 THEN A_count_not_null / count_common ELSE NULL END AS ratio_not_null, CASE WHEN A_count_not_null <> 0 THEN A / A_count_not_null ELSE NULL END AS ratio_equal) AS A, STRUCT(CASE WHEN count_common <> 0 THEN B_count_not_null / count_common ELSE NULL END AS ratio_not_null, CASE WHEN B_count_not_null <> 0 THEN B / B_count_not_null ELSE NULL END AS ratio_equal) AS B, STRUCT(CASE WHEN count_common <> 0 THEN C_count_not_null / count_common ELSE NULL END AS ratio_not_null, CASE WHEN C_count_not_null <> 0 THEN C / C_count_not_null ELSE NULL END AS ratio_equal) AS C FROM count_diff) SELECT * FROM final_result"
)


def test_get_query_check_primary_keys_unique(bigquery_processor: BigQueryProcessor):
query1 = bigquery_processor.get_query_check_primary_keys_unique(table_name="table1")
assert query1.sql() == 'WITH table1 AS (SELECT * FROM "my-project"."my_dataset"."table1"), table2 AS (SELECT * FROM "my-project"."my_dataset"."table2") SELECT COUNT(*) AS total_rows FROM table1 GROUP BY A HAVING COUNT(*) > 1'


@pytest.mark.skip(reason="Need BigQuery credentials to run this test")
def test_run_query_check_primary_keys_unique():
processor = BigQueryProcessor("MY_TABLE", "MY_TABLE_2")
processor.set_config_data(
primary_key="user_id",
columns_to_compare=["user_id", "account_automation"],
sampling_rate=100,
)

result = processor.run_query_check_primary_keys_unique(table="table1")
assert result == (True, "")