Commit d02f828

make dag where operators are split (#8)
* make dag where operators are split
* add missing task id
1 parent aea84da commit d02f828

File tree

2 files changed: +158 -10 lines changed

2025/census-data-airflow-bigframes/census_to_bigquery.py

Lines changed: 4 additions & 10 deletions

@@ -42,7 +42,7 @@
     "start_date": datetime.datetime(2025, 6, 30),
 }

-GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
+GCS_LOCATION = "gs://us-central1-bigframes-orche-5b3ec9ed-bucket/data/us-census/cc-est2024-agesex-all.csv"

 # Define a DAG (directed acyclic graph) of tasks.
 # Any task you create within the context manager is automatically added to the
@@ -63,19 +63,13 @@
         """,
     )

-    def callable_virtualenv():
-        """
-        Example function that will be performed in a virtual environment.
-
-        Importing at the module level ensures that it will not attempt to import the
-        library before it is installed.
-        """
+    def callable_python():
         import datetime

         import bigframes.pandas as bpd

         BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
-        GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
+        GCS_LOCATION = "gs://us-central1-bigframes-orche-5b3ec9ed-bucket/data/us-census/cc-est2024-agesex-all.csv"

         #=============================
         # Setup bigframes
@@ -149,7 +143,7 @@ def callable_virtualenv():

     bf_to_gbq = PythonOperator(
         task_id="bf_to_gbq",
-        python_callable=callable_virtualenv,
+        python_callable=callable_python,
     )

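In the diff above, the task keeps a plain PythonOperator and simply renames the callable, which assumes bigframes is already installed in the Airflow environment (as on Cloud Composer 3). For environments where it is not, a minimal sketch of the PythonVirtualenvOperator alternative hinted at by the old callable_virtualenv name might look like the following; this is not part of the commit, and the requirements pin is an assumption:

    # Hypothetical variant (not in this commit): run the callable in an isolated
    # virtualenv so bigframes is installed at task runtime. Assumes it replaces
    # bf_to_gbq inside census_to_bigquery.py, where callable_python is defined.
    from airflow.operators.python import PythonVirtualenvOperator

    bf_to_gbq = PythonVirtualenvOperator(
        task_id="bf_to_gbq",
        python_callable=callable_python,
        requirements=["bigframes"],  # installed into the task's virtualenv
        system_site_packages=False,
    )

Because callable_python does its imports inside the function body, the same callable works unchanged in either operator.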
Lines changed: 154 additions & 0 deletions

@@ -0,0 +1,154 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+An example DAG for loading data from the US Census using BigQuery DataFrames
+(aka bigframes), split into separate preprocess, validate-and-write, and
+cleanup tasks, using PythonOperator with bigframes installed in the environment.
+
+I have tested this DAG on Cloud Composer 3 with Apache Airflow 2.10.5.
+
+For local development:
+
+    pip install 'apache-airflow[google]==2.10.5' bigframes
+"""
+
+import datetime
+import json
+
+from airflow import models
+from airflow.operators import bash
+from airflow.operators.python import (
+    PythonOperator,
+)
+from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
+
+
+default_dag_args = {
+    # The start_date describes when a DAG is valid / can be run. Set this to a
+    # fixed point in time rather than dynamically, since it is evaluated every
+    # time a DAG is parsed. See:
+    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
+    "start_date": datetime.datetime(2025, 6, 30),
+}
+
+GCS_LOCATION = "gs://us-central1-bigframes-orche-5b3ec9ed-bucket/data/us-census/cc-est2024-agesex-all.csv"
+
+# Define a DAG (directed acyclic graph) of tasks.
+# Any task you create within the context manager is automatically added to the
+# DAG object.
+with models.DAG(
+    "census_from_http_to_bigquery_split_once",
+    schedule_interval="@once",
+    default_args=default_dag_args,
+) as dag:
+    download_upload = bash.BashOperator(
+        task_id="download_upload",
+        # See
+        # https://www.census.gov/data/tables/time-series/demo/popest/2020s-counties-detail.html
+        # for file paths and methodologies.
+        bash_command=f"""
+        wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv -P ~;
+        gcloud storage cp ~/cc-est2024-agesex-all.csv {GCS_LOCATION}
+        """,
+    )
+
+    def preprocess(task_instance):
+        import datetime
+
+        import bigframes.pandas as bpd
+
+        GCS_LOCATION = "gs://us-central1-bigframes-orche-5b3ec9ed-bucket/data/us-census/cc-est2024-agesex-all.csv"
+
+        bpd.options.bigquery.ordering_mode = "partial"
+        bpd.options.compute.maximum_result_rows = 10_000
+        # bpd.options.bigquery.project = "my-project-id"
+
+        try:
+            # By loading with the BigQuery engine, you can avoid having to read
+            # the file into memory. This is because BigQuery is responsible for
+            # parsing the file.
+            df = bpd.read_csv(GCS_LOCATION, engine="bigquery")
+
+            # Perform preprocessing. For example, you can map some coded data
+            # into a form that is easier to understand.
+            df_dates = df.assign(
+                ESTIMATE_DATE=df["YEAR"].case_when(
+                    caselist=[
+                        (df["YEAR"].eq(1), datetime.date(2020, 4, 1)),
+                        (df["YEAR"].eq(2), datetime.date(2020, 7, 1)),
+                        (df["YEAR"].eq(3), datetime.date(2021, 7, 1)),
+                        (df["YEAR"].eq(4), datetime.date(2022, 7, 1)),
+                        (df["YEAR"].eq(5), datetime.date(2023, 7, 1)),
+                        (df["YEAR"].eq(6), datetime.date(2024, 7, 1)),
+                        (True, None),
+                    ]
+                ),
+            ).drop(columns=["YEAR"])
+
+            task_instance.xcom_push(key="census_preprocessed_table", value=df_dates.to_gbq())
+        finally:
+            bpd.close_session()
+
+    def validate_and_write(task_instance):
+        import bigframes.pandas as bpd
+        bigquery_destination = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
+
+        bpd.options.bigquery.ordering_mode = "partial"
+        bpd.options.compute.maximum_result_rows = 10_000
+        # bpd.options.bigquery.project = "my-project-id"
+
+        try:
+            # Get the table ID from the previous step.
+            bigquery_source = task_instance.xcom_pull(
+                task_ids="bf_preprocess",
+                key="census_preprocessed_table",
+            )
+            df_dates = bpd.read_gbq(bigquery_source)
+
+            row_count, column_count = df_dates.shape
+            assert row_count > 0
+            assert column_count > 0
+            assert not df_dates["ESTIMATE_DATE"].hasnans
+
+            # TODO(developer): Add additional validations as needed.
+
+            df_dates.to_gbq(
+                bigquery_destination,
+                if_exists="replace",
+                clustering_columns=["ESTIMATE_DATE", "STATE", "COUNTY"],
+            )
+        finally:
+            bpd.close_session()
+
+
+    bf_preprocess = PythonOperator(
+        task_id="bf_preprocess",
+        python_callable=preprocess,
+    )
+
+    bf_validate_and_write = PythonOperator(
+        task_id="bf_validate_and_write",
+        python_callable=validate_and_write,
+    )
+
+    cleanup_preprocess_table = BigQueryDeleteTableOperator(
+        task_id="cleanup_preprocess_table",
+        deletion_dataset_table="{{ task_instance.xcom_pull(task_ids='bf_preprocess', key='census_preprocessed_table') }}",
+        # Always execute, even if the previous task failed.
+        # https://stackoverflow.com/a/44441890/101923
+        trigger_rule="all_done",
+    )
+
+    download_upload >> bf_preprocess >> bf_validate_and_write >> cleanup_preprocess_table
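The new DAG chains four tasks and passes the intermediate BigQuery table ID through XCom from bf_preprocess to both bf_validate_and_write and the cleanup step. For local iteration, one sketch (assuming the new file is importable as census_to_bigquery_split, a name not shown on this commit page, and that Google Cloud credentials are configured locally) is to run the whole chain in-process with Airflow's DAG.test():

    # Hypothetical local runner; the module name is an assumption since the
    # commit page does not show the new file's name.
    import datetime

    from census_to_bigquery_split import dag

    if __name__ == "__main__":
        # DAG.test() (Airflow 2.5+) executes the tasks in-process, without a
        # scheduler, following download_upload >> bf_preprocess >>
        # bf_validate_and_write >> cleanup_preprocess_table.
        dag.test(execution_date=datetime.datetime(2025, 6, 30))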
