
Commit 4769d3e

Merge pull request #4 from tswast/airflow-demo
dags for uploading to bigquery with bigframes
2 parents 4f10073 + 03988f8

2 files changed: +314 -0 lines
Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
An example DAG for loading data from the US Census using BigQuery DataFrames
(aka bigframes). This DAG uses PythonOperator, so bigframes must be installed
in the Airflow environment itself.

I have tested this DAG on Cloud Composer 3 with Apache Airflow 2.10.5.

For local development:

    pip install 'apache-airflow[google]==2.10.5' bigframes
"""


import datetime

from airflow import models
from airflow.operators import bash
from airflow.operators.python import (
    PythonOperator,
)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2025, 6, 30),
}

GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "census_from_http_to_bigquery_python_operator_once",
    schedule_interval="@once",
    default_args=default_dag_args,
) as dag:
    download_upload = bash.BashOperator(
        task_id="download_upload",
        # See
        # https://www.census.gov/data/tables/time-series/demo/popest/2020s-counties-detail.html
        # for file paths and methodologies.
        bash_command=f"""
        wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv -P ~;
        gcloud storage cp ~/cc-est2024-agesex-all.csv {GCS_LOCATION}
        """,
    )

    def callable_virtualenv():
        """
        Example function that will be performed by the PythonOperator.

        Importing inside the function (rather than at the module level) keeps
        DAG parsing fast and means the file can still be parsed even if the
        library is not installed yet.
        """
        import datetime

        import bigframes.pandas as bpd

        BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
        GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"

        # =============================
        # Set up bigframes
        # =============================

        # Recommended: Partial ordering mode enables the best performance.
        bpd.options.bigquery.ordering_mode = "partial"

        # Recommended: Fail the operator if it accidentally downloads too many
        # rows to the client side from BigQuery. This can prevent your operator
        # from using too much memory.
        bpd.options.compute.maximum_result_rows = 10_000

        # Optional. An explicit project ID is not needed if the project can be
        # determined from the environment, such as in Cloud Composer, Google
        # Compute Engine, or when authenticated with the gcloud
        # application-default commands.
        # bpd.options.bigquery.project = "my-project-id"

        try:
            # By loading with the BigQuery engine, you can avoid having to read
            # the file into memory. This is because BigQuery is responsible for
            # parsing the file.
            df = bpd.read_csv(GCS_LOCATION, engine="bigquery")

            # Perform preprocessing. For example, you can map some coded data
            # into a form that is easier to understand.
            df_dates = df.assign(
                ESTIMATE_DATE=df["YEAR"].case_when(
                    caselist=[
                        (df["YEAR"].eq(1), datetime.date(2020, 4, 1)),
                        (df["YEAR"].eq(2), datetime.date(2020, 7, 1)),
                        (df["YEAR"].eq(3), datetime.date(2021, 7, 1)),
                        (df["YEAR"].eq(4), datetime.date(2022, 7, 1)),
                        (df["YEAR"].eq(5), datetime.date(2023, 7, 1)),
                        (df["YEAR"].eq(6), datetime.date(2024, 7, 1)),
                        (True, None),
                    ]
                ),
            ).drop(columns=["YEAR"])

            # TODO(developer): Add additional processing and cleanup as needed.

            # One of the benefits of using BigQuery DataFrames in your operators
            # is that it makes it easy to perform data validations.
            #
            # Note: cache() is optional, but if any of the preprocessing above
            # is complicated, it hints to BigQuery DataFrames to run those
            # steps first and avoid duplicating work.
            df_dates.cache()
            row_count, column_count = df_dates.shape
            assert row_count > 0
            assert column_count > 0
            assert not df_dates["ESTIMATE_DATE"].hasnans

            # TODO(developer): Add additional validations as needed.

            # Now that you have validated the data, it should be safe to write
            # to the final destination table.
            df_dates.to_gbq(
                BIGQUERY_DESTINATION,
                if_exists="replace",
                clustering_columns=["ESTIMATE_DATE", "STATE", "COUNTY"],
            )
        finally:
            # Closing the session is optional. Any temporary tables created
            # should be automatically cleaned up when the BigQuery session
            # closes after 24 hours, but closing the session explicitly can
            # help save on storage costs.
            bpd.close_session()

    bf_to_gbq = PythonOperator(
        task_id="bf_to_gbq",
        python_callable=callable_virtualenv,
    )

    download_upload >> bf_to_gbq
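
Note: the DAG above runs bigframes directly in the Airflow workers via PythonOperator, so the package must already be installed in the environment. Before deploying, the YEAR-to-date mapping can be spot-checked from any machine with bigframes installed and application-default credentials configured; a minimal sketch (the interactive session and the 5-row sample are assumptions, not part of the committed file):

    import bigframes.pandas as bpd

    bpd.options.bigquery.ordering_mode = "partial"
    df = bpd.read_csv(
        "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv",
        engine="bigquery",
    )
    # Pull a handful of arbitrary rows client-side to inspect the coded YEAR values.
    print(df[["YEAR", "STATE", "COUNTY"]].peek(5))
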
Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
An example DAG for loading data from the US Census using BigQuery DataFrames
(aka bigframes). This DAG uses PythonVirtualenvOperator for environments where
bigframes can't be installed for use from PythonOperator.

I have tested this DAG on Cloud Composer 3 with Apache Airflow 2.10.5.

For local development:

    pip install 'apache-airflow[google]==2.10.5' bigframes
"""


import datetime

from airflow import models
from airflow.operators import bash
from airflow.operators.python import (
    PythonVirtualenvOperator,
)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2025, 6, 30),
}

GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "census_from_http_to_bigquery_once",
    schedule_interval="@once",
    default_args=default_dag_args,
) as dag:
    download_upload = bash.BashOperator(
        task_id="download_upload",
        # See
        # https://www.census.gov/data/tables/time-series/demo/popest/2020s-counties-detail.html
        # for file paths and methodologies.
        bash_command=f"""
        wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv -P ~;
        gcloud storage cp ~/cc-est2024-agesex-all.csv {GCS_LOCATION}
        """,
    )

    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing inside the function (rather than at the module level) ensures
        that the library is not imported before it is installed in the virtual
        environment.
        """
        import datetime

        import bigframes.pandas as bpd

        BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
        GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"

        # =============================
        # Set up bigframes
        # =============================

        # Recommended: Partial ordering mode enables the best performance.
        bpd.options.bigquery.ordering_mode = "partial"

        # Recommended: Fail the operator if it accidentally downloads too many
        # rows to the client side from BigQuery. This can prevent your operator
        # from using too much memory.
        bpd.options.compute.maximum_result_rows = 10_000

        # Optional. An explicit project ID is not needed if the project can be
        # determined from the environment, such as in Cloud Composer, Google
        # Compute Engine, or when authenticated with the gcloud
        # application-default commands.
        # bpd.options.bigquery.project = "my-project-id"

        try:
            # By loading with the BigQuery engine, you can avoid having to read
            # the file into memory. This is because BigQuery is responsible for
            # parsing the file.
            df = bpd.read_csv(GCS_LOCATION, engine="bigquery")

            # Perform preprocessing. For example, you can map some coded data
            # into a form that is easier to understand.
            df_dates = df.assign(
                ESTIMATE_DATE=df["YEAR"].case_when(
                    caselist=[
                        (df["YEAR"].eq(1), datetime.date(2020, 4, 1)),
                        (df["YEAR"].eq(2), datetime.date(2020, 7, 1)),
                        (df["YEAR"].eq(3), datetime.date(2021, 7, 1)),
                        (df["YEAR"].eq(4), datetime.date(2022, 7, 1)),
                        (df["YEAR"].eq(5), datetime.date(2023, 7, 1)),
                        (df["YEAR"].eq(6), datetime.date(2024, 7, 1)),
                        (True, None),
                    ]
                ),
            ).drop(columns=["YEAR"])

            # TODO(developer): Add additional processing and cleanup as needed.

            # One of the benefits of using BigQuery DataFrames in your operators
            # is that it makes it easy to perform data validations.
            #
            # Note: cache() is optional, but if any of the preprocessing above
            # is complicated, it hints to BigQuery DataFrames to run those
            # steps first and avoid duplicating work.
            df_dates.cache()
            row_count, column_count = df_dates.shape
            assert row_count > 0
            assert column_count > 0
            assert not df_dates["ESTIMATE_DATE"].hasnans

            # TODO(developer): Add additional validations as needed.

            # Now that you have validated the data, it should be safe to write
            # to the final destination table.
            df_dates.to_gbq(
                BIGQUERY_DESTINATION,
                if_exists="replace",
                clustering_columns=["ESTIMATE_DATE", "STATE", "COUNTY"],
            )
        finally:
            # Closing the session is optional. Any temporary tables created
            # should be automatically cleaned up when the BigQuery session
            # closes after 24 hours, but closing the session explicitly can
            # help save on storage costs.
            bpd.close_session()

    bf_to_gbq = PythonVirtualenvOperator(
        task_id="bf_to_gbq",
        python_callable=callable_virtualenv,
        requirements=["bigframes==2.10.0"],
        system_site_packages=False,
    )

    download_upload >> bf_to_gbq
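
The two DAGs differ only in how bigframes reaches the worker: the first expects it preinstalled alongside Airflow (PythonOperator), while this one builds a throwaway virtualenv per task run (requirements=["bigframes==2.10.0"], system_site_packages=False). Either way, the callable can be smoke-tested outside Airflow once bigframes and apache-airflow are installed locally and credentials are configured; a minimal sketch, assuming the DAG file is saved as census_to_bigquery.py (the module name is an assumption):

    # Hypothetical local smoke test: runs the same load / validate / write steps.
    # Note that this writes to the real swast-scratch destination table.
    from census_to_bigquery import callable_virtualenv

    callable_virtualenv()
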
