
Commit 60e5aa4

HOTFIX: Matomo config dump postgres
1 parent 7e8de8d commit 60e5aa4

3 files changed (+120, -39 lines)

dags/data_utils/doc_helpscout.py

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 from airflow.models import Variable
 from requests.auth import HTTPBasicAuth
 
-from data_utils.grist.grist_helper import get_grist_api
+from .grist.grist_helper import get_grist_api
 
 connection_helpscout = BaseHook.get_connection("helpscout")
 assert connection_helpscout.login is not None

dags/data_utils/matomo_pull/matomo_helper.py

Lines changed: 22 additions & 38 deletions
@@ -4,11 +4,7 @@
 
 from .matomo_campaign_helper import process_dataframe_for_campaign
 from .matomo_request_config import matomo_requests_config
-from ..postgres_helper import (
-    get_postgres_connection,
-    clean_data_in_postgres,
-    dump_data_to_postgres,
-)
+from .matomo_postgres_helper import get_postgres_connection, clean_data_in_postgres, dump_data_to_postgres
 from .matomo_url import get_matomo_base_url, construct_url
 import logging
 
@@ -19,13 +15,13 @@
 
 def parse_range_data(raw_data):
     for entry in raw_data:
-        if entry.get("subtable"):
-            for sub_entry in entry["subtable"]:
-                sub_entry["sub_type"] = entry["label"]
+        if entry.get('subtable'):
+            for sub_entry in entry['subtable']:
+                sub_entry['sub_type'] = entry["label"]
                 raw_data.append(sub_entry)
-            entry.pop("subtable")
-        if entry.get("goals"):
-            entry.pop("goals")
+            entry.pop('subtable')
+        if entry.get('goals'):
+            entry.pop('goals')
     return raw_data
 
 
@@ -34,11 +30,11 @@ def fetch_data_for_day(base_url, report_name, config, day):
     """Fetches data from Matomo for a specific day and returns it as a DataFrame."""
     url = construct_url(base_url, config, day)
     try:
-        response = http.request("GET", url)
-        raw_data = json.loads(response.data.decode("utf-8"))
+        response = http.request('GET', url)
+        raw_data = json.loads(response.data.decode('utf-8'))
 
         # Check if the response contains errors
-        if isinstance(raw_data, dict) and raw_data.get("result") == "error":
+        if isinstance(raw_data, dict) and raw_data.get('result') == 'error':
             error_message = f"Error fetching data for {report_name} on {day}: {raw_data.get('message')}"
             raise Exception(error_message)
 
@@ -49,58 +45,46 @@ def fetch_data_for_day(base_url, report_name, config, day):
         elif isinstance(raw_data, dict):
             data = pd.json_normalize(raw_data)
         else:
-            print(
-                f"Unexpected data format for {report_name} on {day}: {type(raw_data)}"
-            )
+            print(f"Unexpected data format for {report_name} on {day}: {type(raw_data)}")
             return pd.DataFrame()
         # Add the date field to each row
-        data["date"] = pd.to_datetime(day)
+        data['date'] = pd.to_datetime(day)
         data_processed = process_dataframe_for_campaign(data)
         return data_processed
 
     except Exception as e:
         error_message = f"Error fetching data for {report_name} on {day}: {str(e)}"
         raise Exception(error_message)
 
-
+# Fetch data from Matomo for each day in the date range and merge into a single DataFrame
 def fetch_data_from_matomo(base_url, report_name, config, start_date, end_date):
     """Fetches data from Matomo for each day in the specified range and merges it into a single DataFrame."""
-    date_range = (
-        pd.date_range(start=start_date, end=end_date).strftime("%Y-%m-%d").tolist()
-    )
-    all_data = [
-        fetch_data_for_day(base_url, report_name, config, day) for day in date_range
-    ]
+    date_range = pd.date_range(start=start_date, end=end_date).strftime('%Y-%m-%d').tolist()
+    all_data = [fetch_data_for_day(base_url, report_name, config, day) for day in date_range]
 
     # Combine all non-empty DataFrames into a single DataFrame
     valid_data = [df for df in all_data if not df.empty]
     if valid_data:
         return pd.concat(valid_data, ignore_index=True)
     else:
-        logger.warning(
-            f"No data fetched for report '{report_name}' between {start_date} and {end_date}."
-        )
+        logger.warning(f"No data fetched for report '{report_name}' between {start_date} and {end_date}.")
        return pd.DataFrame()
 
-
+# Main function to fetch and dump data
 def fetch_and_dump_data(matomo_site_id, database, day):
-    """
-    Main function to fetch and dump data
-    """
-    start_date = (pd.to_datetime(day) - pd.Timedelta(days=1)).strftime("%Y-%m-%d")
+
+    start_date = (pd.to_datetime(day) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
     end_date = start_date
     base_url = get_matomo_base_url(matomo_site_id)
-    connection = get_postgres_connection("matomo_postgres", database)
+    connection = get_postgres_connection(database)
 
     if not connection:
         error_message = "Cannot proceed without database connection."
         raise Exception(error_message)
 
     for report_name, config in matomo_requests_config.items():
         print(f"Fetching data for {report_name}...")
-        data = fetch_data_from_matomo(
-            base_url, report_name, config, start_date, end_date
-        )
+        data = fetch_data_from_matomo(base_url, report_name, config, start_date, end_date)
 
         if data is not None and not data.empty:
             # Clean existing data in the table before dumping new data
@@ -110,4 +94,4 @@ def fetch_and_dump_data(matomo_site_id, database, day):
         else:
             print(f"No data fetched for {report_name}, skipping clean and dump.")
 
-    connection.close()
+    connection.close()
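For context, a minimal sketch of how the updated entry point is called after this change: get_postgres_connection now resolves the 'matomo_postgres' Airflow connection itself, so fetch_and_dump_data only passes the target database name through. The site id, database name, date, and import path (assuming the dags/ folder is on the Python path) are illustrative assumptions, not part of the commit.

from data_utils.matomo_pull.matomo_helper import fetch_and_dump_data

# Pulls the reports configured in matomo_requests_config for the day before `day`
# (start_date = day - 1 day, end_date = start_date) and writes them to Postgres.
fetch_and_dump_data(
    matomo_site_id=1,       # illustrative Matomo site id
    database="matomo",      # illustrative database name
    day="2024-01-01",       # data for 2023-12-31 is fetched
)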
dags/data_utils/matomo_pull/matomo_postgres_helper.py

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+from sqlalchemy import create_engine, text
+from airflow.hooks.base import BaseHook
+from sqlalchemy import inspect
+
+
+def get_postgres_connection(database):
+    """Extracts PostgreSQL connection details from Airflow and establishes a connection."""
+    try:
+        # Retrieve the connection object using Airflow's BaseHook
+        connection = BaseHook.get_connection('matomo_postgres')
+
+        # Extract connection details
+        user = connection.login
+        password = connection.password
+        host = connection.host
+        port = connection.port
+
+        # Create the SQLAlchemy engine
+        engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")
+        conn = engine.connect()
+        print("Successfully connected to the PostgreSQL database using Airflow connection.")
+        return conn
+
+    except Exception as e:
+        print(f"Failed to connect to PostgreSQL using Airflow connection: {e}")
+        raise  # Raise exception to ensure the DAG fails if the connection cannot be established
+
+
+# Clean data in PostgreSQL within the date range
+def clean_data_in_postgres(connection, table_name, start_date, end_date):
+    """Deletes rows in the table where the 'date' is between the start_date and end_date."""
+    try:
+        delete_query = text(
+            f"DELETE FROM {table_name} WHERE date BETWEEN :start_date AND :end_date"
+        )
+        connection.execute(delete_query, {'start_date': start_date, 'end_date': end_date})
+        print(f"Cleaned data in {table_name} between {start_date} and {end_date}.")
+    except Exception as e:
+        print(f"Failed to clean data in {table_name}: {e}")
+
+
+# Dump DataFrame to PostgreSQL table
+def dump_data_to_postgres(connection, data, table_name):
+    """
+    Dumps the DataFrame into the specified PostgreSQL table, creating missing columns if necessary.
+
+    Parameters:
+    connection: SQLAlchemy engine or connection object
+        The connection to the PostgreSQL database.
+    data: pandas.DataFrame
+        The DataFrame containing the data to be dumped.
+    table_name: str
+        The name of the PostgreSQL table to insert the data into.
+    """
+
+    # Convert DataFrame columns to lowercase
+    data.columns = [col.lower() for col in data.columns]
+
+    try:
+        # Inspect the existing columns in the table
+        inspector = inspect(connection)
+        existing_columns = []
+        if table_name in inspector.get_table_names():
+            existing_columns = [col['name'] for col in inspector.get_columns(table_name)]
+
+
+        # Identify missing columns
+        missing_columns = set(data.columns) - set(existing_columns)
+
+        # Add missing columns to the table
+        for column in missing_columns:
+            dtype = data[column].dtype
+            if dtype == 'int64':
+                sql_type = 'INTEGER'
+            elif dtype == 'float64':
+                sql_type = 'FLOAT'
+            elif dtype == 'bool':
+                sql_type = 'BOOLEAN'
+            elif dtype == 'datetime64[ns]':
+                sql_type = 'TIMESTAMP'
+            else:
+                sql_type = 'TEXT'
+
+            alter_query = f'ALTER TABLE {table_name} ADD COLUMN {column} {sql_type};'
+            try:
+                connection.execute(alter_query)
+                print(f"Column added: {column} ({sql_type})")
+            except Exception as e:
+                print(f"Error adding column {column}: {e}")
+
+        # Insert the data into the table
+        data.to_sql(table_name, connection, if_exists='append', index=False)
+        print(f"Data for {table_name} dumped successfully into the table.")
+
+    except Exception as e:
+        # Log the error if the data dump fails
+        print(f"Failed to dump data into {table_name}: {e}")
