
Commit 41f6ce2

Airflow: Implement suggestions by CodeRabbit, part 9
1 parent 0356ae7 commit 41f6ce2

3 files changed: +33 -26 lines changed


docs/integrate/airflow/data-retention-hot-cold.md

Lines changed: 5 additions & 4 deletions
@@ -98,7 +98,8 @@ Insert a sample row to validate shard allocation:
 INSERT INTO doc.raw_metrics (variable, timestamp, value, quality) VALUES ('water-flow', NOW() - '5 months'::INTERVAL, 12, 1);
 ```
 
-The `INSERT` implicitly creates a new partition with six shards. Because `cratedb01` and `cratedb02` are hot nodes, CrateDB allocates shards only on those nodes, not on `cratedb03` (cold). Verify this in the Admin UI under “Shards”:
+The `INSERT` implicitly creates a new partition with the table’s configured number of shards.
+Because `cratedb01` and `cratedb02` are hot nodes, CrateDB allocates shards only on those nodes, not on `cratedb03` (cold). Verify this in the Admin UI under “Shards”:
 
 ![CrateDB Admin UI “Shards” view showing primary and replica shards evenly distributed across hot nodes; no shards on the cold node](https://us1.discourse-cdn.com/flex020/uploads/crate/original/1X/ade3bbd61b56a642ee2493f2dca63a60cba7de1b.png){height=320px}
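Editor's note: beyond the Admin UI, shard placement can also be checked from SQL. The following is a minimal sketch, not part of the commit, assuming the `crate` Python client and a CrateDB node reachable at `localhost:4200`; it groups the shards of `doc.raw_metrics` by the node they were allocated on, using the `sys.shards` system table.

```python
# Hypothetical verification snippet; not part of the commit or the tutorial DAG.
from crate import client

conn = client.connect("localhost:4200")  # assumed CrateDB HTTP endpoint
cursor = conn.cursor()
# sys.shards lists every shard together with the node it is allocated on.
cursor.execute(
    """
    SELECT node['name'] AS node_name, count(*) AS shards
    FROM sys.shards
    WHERE schema_name = 'doc' AND table_name = 'raw_metrics'
    GROUP BY node['name']
    ORDER BY node_name
    """
)
for node_name, shards in cursor.fetchall():
    print(f"{node_name}: {shards} shard(s)")
conn.close()
```

With the hot/cold setup described above, only the hot nodes `cratedb01` and `cratedb02` should appear in the output.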

@@ -135,7 +136,7 @@ Use a basic Astronomer/Airflow setup as described in the {ref}`first post <airfl
 
 1. `get_policies`: A query on `doc.retention_policies` and `information_schema.table_partitions` identifies partitions affected by a retention policy.
 2. `map_policy`: A helper method transforming the output of `get_policies` into a Python `dict` data structure for easier handling.
-3. `reallocate_partitions`: Executes an SQL statement for each mapped policy: `ALTER TABLE <table> PARTITION (<partition key> = <partition value>) SET ("routing.allocation.require.storage" = 'cold');`
+3. `reallocate_partitions`: Executes an SQL statement for each mapped policy: `ALTER TABLE "<schema>"."<table>" PARTITION ("<partition key>" = <partition value>) SET ("routing.allocation.require.storage" = 'cold');`
 
 CrateDB then automatically initiates relocation of the affected partition to a node that fulfills the requirement (`cratedb03` in this setup).
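Editor's note: for illustration only, a minimal sketch, not part of the commit, of how step 3 could render the statement quoted above for one mapped policy. The dictionary keys are hypothetical; the actual DAG derives them from `get_policies` and `map_policy`.

```python
# Hypothetical helper; key names are illustrative, not the repository's.
def reallocation_sql(policy: dict) -> str:
    """Build the ALTER TABLE statement that moves one partition to cold storage."""
    return (
        f'ALTER TABLE "{policy["schema"]}"."{policy["table"]}" '
        f'PARTITION ("{policy["partition_column"]}" = {policy["partition_value"]}) '
        "SET (\"routing.allocation.require.storage\" = 'cold');"
    )


# Example: move one day's partition of doc.raw_metrics to the cold tier.
print(reallocation_sql({
    "schema": "doc",
    "table": "raw_metrics",
    "partition_column": "ts_day",
    "partition_value": 1719792000000,  # partition value as epoch milliseconds
}))
```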

@@ -168,5 +169,5 @@ INSERT INTO doc.retention_policies (table_schema, table_name, partition_column,
 
 ## Summary
 
-Reallocation builds on the earlier data‑retention policy and requires a single SQL statement.
-CrateDB handles the low‑level movement automatically. A multi‑stage policy is straightforward: first reallocate, then delete.
+Reallocation builds on the earlier data‑retention policy and uses a single SQL statement.
+CrateDB handles the movement automatically. A multi‑stage policy is straightforward: first reallocate, then delete.

docs/integrate/airflow/data-retention-policy.md

Lines changed: 23 additions & 17 deletions
@@ -3,11 +3,11 @@
 
 ## What is a Data Retention Policy?
 
-A data retention policy describes the practice of storing and managing data for a designated period of time. Once a data set completes its retention period, it should be deleted or archived, depending on requirements. Implementing data retention policies in the right way ensures compliance with existing guidelines and regulations, such as data privacy law, optimizes storage space by discarding outdated data and reduces storage costs.
+A data retention policy defines how long to keep data and what to do when it expires. Implement it to comply with data-privacy rules, reduce storage, and cut costs.
 
 ## Specification of a Data Retention Policy in CrateDB
 
-In the [previous guide](https://community.cratedb.com/t/cratedb-and-apache-airflow-part-one/901), we illustrated how to use CrateDB and Apache Airflow to automate periodic data export to a remote filesystem with the infrastructure provided by [Astronomer](https://www.astronomer.io/).
+The {ref}`previous guide <airflow-export-s3>` shows how to use CrateDB and Apache Airflow to automate periodic data export to a remote filesystem on [Astronomer](https://www.astronomer.io/).
 In this guide, we focus on a more complex use case: the implementation of an effective retention policy for time-series data. To define retention policies we create a new table in CrateDB with the following schema:
 
 ```sql
@@ -20,8 +20,8 @@ CREATE TABLE IF NOT EXISTS "doc"."retention_policies" (
 PRIMARY KEY ("table_schema", "table_name")
 );
 ```
-The retention policy requires the use of a partitioned table, as in CrateDB, data can be deleted in an efficient way by dropping partitions. Therefore, for each retention policy, we store table schema, table name, the partition column, and the retention period defining how many days data should be retained.
-The `strategy` column is reserved for future implementations of additional data retention policies. For now, we will always set it to the value `delete`.
+A retention policy assumes a partitioned table because CrateDB can delete data efficiently by dropping partitions. For each policy, store the table schema, table name, partition column, and the retention period in days.
+Use the `strategy` column for future retention strategies. For now, set it to `delete`.
 
 Next, define the table for storing demo data:

@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS "doc"."raw_metrics" (
 PARTITIONED BY ("ts_day");
 ```
 
-You may also use a different table. The important part is that data should be partitioned: in our case, we partition the table on the `ts_day` column. Finally, we store the retention policy of 1 day for demo data in the `retention_policies` table:
+You can use a different table. Ensure the table is partitioned; here, we partition on `ts_day`. Finally, insert a 1‑day demo policy into `retention_policies`:
 
 ```sql
 INSERT INTO retention_policies (table_schema, table_name, partition_column, retention_period, strategy) VALUES ('doc', 'raw_metrics', 'ts_day', 1, 'delete');
@@ -48,7 +48,8 @@ INSERT INTO retention_policies (table_schema, table_name, partition_column, rete
 Use [Apache Airflow](https://airflow.apache.org/) to automate deletions. Once a day, fetch policies from the database and delete data whose retention period expired.
 
 ### Retrieving Retention Policies
-The first step consists of a task that queries partitions affected by retention policies. We do this by joining `retention_policies` and `information_schema.table_partitions` tables and selecting values with expired retention periods. In CrateDB, `information_schema.table_partitions` [{ref}`documentation <crate-reference:is_table_partitions>`] contains information about all partitioned tables including the name of the table, schema, partition column, and the values of the partition.
+
+Create a task that queries partitions affected by retention policies.
 The resulting query is constructed as:
 ```sql
 SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
@@ -63,7 +64,7 @@ WHERE r.strategy = 'delete';
 To separate SQL logic from orchestration logic, we save the query as a file to `include/data_retention_retrieve_delete_policies.sql`.
 
 In the query, we use the `%(day)s` placeholder which will be substituted with the logical execution date. This is especially useful in case of failing workflow: the next time Airflow will pick up the date on which the job failed. This makes job runs consistent.
-To implement the query above we use a regular Python method, annotated with `@task` to make it executable by Airflow. The most important reason behind choosing this type of operator is the need to pass the query result to the next operator. In our case that would be the list of affected partitions. However, it would be natural to expect that we want to execute a query on CrateDB as an `SQLExecuteQueryOperator`, but since this operator always returns `None` as a result, we would not be able to access the query result outside the operator.
+Implement the query as a Python function decorated with `@task`.
 
 The implementation of the corresponding tasks looks as follows:
 ```python
@@ -80,12 +81,14 @@ def get_policies(ds=None):
 The first step is to create the function `get_policies` that takes as a parameter the logical date. The SQL statement gets loaded from a file. The `PostgresHook` establishes the connection with CrateDB. A `PostgresHook` takes the information from the `postgres_conn_id` and hooks us up with the CrateDB service. Then, the function executes the query and returns the result.
 
 ### Cross-Communication Between Tasks
-Before we continue into the implementation of the next task in Apache Airflow, we would like to give a brief overview of how the data is communicated between different tasks in a DAG. For this purpose, Airflow introduces the [XCom](https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) system. Simply speaking `XCom` can be seen as a small object with storage that allows tasks to `push` data into that storage that can be later used by a different task in the DAG.
+
+Before implementing the next task, briefly review how tasks exchange data in a DAG.
 
 XCom exchanges a small amount of data between tasks. Since Airflow 2.0, a Python task’s return value is stored in XCom. In our case, `get_policies` returns the partitions; the next task reads them via a reference to `get_policies` when defining dependencies.
 
 ### Applying Retention Policies
-Now that we retrieved the policies and Airflow automatically saved them via `XCom`, we need to create another task that will go through each element in the list and delete expired data.
+
+After retrieving the policies (stored in XCom), create another task that iterates over the list and deletes expired data.
 
 The `get_policies` task returns tuples with a positional index. As this makes further processing not very readable, we map tuples to a list with named indexes:
 ```python
@@ -97,7 +100,8 @@ def map_policy(policy):
 }
 ```
 
-In the DAG’s main method, use Airflow’s [dynamic task mapping](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html) to execute the same task several times with different parameters:
+In the DAG’s main method, use Airflow’s [dynamic task mapping]
+to execute the same task several times with different parameters:
 
 ```python
 SQLExecuteQueryOperator.partial(
@@ -154,11 +158,13 @@ The full DAG implementation of the data retention policy can be found in our [Gi
 
 ## Summary
 
-This guide introduced you on how to delete data with expired retention policies.
-The first part shows how to design policies in CrateDB and then, how to use
-Apache Airflow to automate data deletion.
+This guide showed how to delete data whose retention period expired.
+First, design policies in CrateDB. Then use Apache Airflow to automate the deletion.
+
+The DAG implementation is straightforward: one task extracts relevant policies;
+another one deletes the affected partitions.
+The {ref}`next guide <airflow-data-retention-hot-cold>` covers another real‑world
+example automated with Apache Airflow and CrateDB.
+
 
-The DAG implementation is fairly simple: the first task performs the extraction
-of relevant policies, while the second task makes sure that affected partitions
-are deleted. In the following guide, we will focus on another real-world
-example that can be automated with Apache Airflow and CrateDB.
+[dynamic task mapping]: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html
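Editor's note: to round off the picture, here is a minimal sketch, not part of the commit, of how the pieces discussed in this file (`get_policies`, `map_policy`, and the dynamically mapped `SQLExecuteQueryOperator`) might be wired together. The connection id, schedule, and DAG name are assumptions; the repository's DAG remains the reference implementation.

```python
# Illustrative sketch only; names such as "cratedb_connection" are assumptions.
from pathlib import Path

import pendulum
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_policies(ds=None):
    """Fetch expired partitions; the logical date is bound to %(day)s."""
    sql = Path("include/data_retention_retrieve_delete_policies.sql").read_text()
    hook = PostgresHook(postgres_conn_id="cratedb_connection")
    return hook.get_records(sql, parameters={"day": ds})


def map_policy(policy):
    """Give the positional tuple returned by get_policies named keys."""
    return {
        "table_fqn": policy[0],
        "column": policy[1],
        "value": policy[2],
    }


@dag(start_date=pendulum.datetime(2024, 1, 1), schedule="@daily", catchup=False)
def data_retention_delete():
    # Dynamic task mapping: run one DELETE per affected partition.
    SQLExecuteQueryOperator.partial(
        task_id="delete_partition",
        conn_id="cratedb_connection",
        sql="DELETE FROM {{ params.table_fqn }} WHERE {{ params.column }} = {{ params.value }};",
    ).expand(params=get_policies().map(map_policy))


data_retention_delete()
```

Here `map_policy` stays a plain helper applied to the XCom result via `.map()`, so only `get_policies` and the mapped operator appear as tasks.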

docs/integrate/airflow/index.md

Lines changed: 5 additions & 5 deletions
@@ -69,13 +69,13 @@ Astro is a managed Airflow service by [Astronomer].
 Define an Airflow DAG that downloads, processes, and stores data in CrateDB.
 :::
 
-:::{grid-item-card} Guide: Import Parquet files
+:::{grid-item-card} Import Parquet files
 :link: airflow-import-parquet
 :link-type: ref
 Define an Airflow DAG to import a Parquet file from S3 into CrateDB.
 :::
 
-:::{grid-item-card} Guide: Load stock market data
+:::{grid-item-card} Load stock market data
 :link: airflow-import-stock-market-data
 :link-type: ref
 Define an Airflow DAG to download, process, and store stock market data
@@ -91,20 +91,20 @@ into CrateDB.
 ::::{grid} 3
 :gutter: 2
 
-:::{grid-item-card} Guide: Export to S3
+:::{grid-item-card} Export to S3
 :link: airflow-export-s3
 :link-type: ref
 Export data from CrateDB to S3 on a schedule.
 :::
 
-:::{grid-item-card} Guide: Implement a data retention policy
+:::{grid-item-card} Implement a data retention policy
 :link: airflow-data-retention-policy
 :link-type: ref
 An effective retention policy for time-series data, relating to the practice of
 storing and managing data for a designated period of time.
 :::
 
-:::{grid-item-card} Guide: Implement a hot and cold storage data retention policy
+:::{grid-item-card} Implement a hot and cold storage data retention policy
 :link: airflow-data-retention-hot-cold
 :link-type: ref
 A hot/cold storage strategy is often motivated by a tradeoff between performance
