|
3 | 3 |
|
4 | 4 | The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries. |
5 | 5 | The "cwl-runner" tool is invoked to execute the CWL workflow. |
6 | | -Parameter cwl_workflow: the URL of the CWL workflow to execute. |
7 | | -Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs. |
| 6 | +Parameter stac_json: STAC JSON describing the granules to stage in, encoded as a JSON string or the URL of a JSON or YAML file. |
| 7 | +Parameter process_workflow: the URL of the CWL workflow to execute. |
| 8 | +Parameter process_args: JSON string containing the values for the processing workflow's inputs, or the URL of a JSON or YAML file. |
| 9 | +Note: stage-out settings (project, venue and staging bucket) are resolved at runtime from the |
| 10 | +environment and the SSM parameter store rather than passed as DAG parameters. |
8 | 11 | """ |
9 | 12 |
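For orientation, here is a minimal sketch of how the modular CWL container's entrypoint might parse the flags that the pod `arguments` list below passes in. The flag names (`-i`, `-s`, `-w`, `-j`, `-o`, `-d`, `-e`) come from this DAG; the entrypoint itself ships inside the Docker image and is an assumption here:

```python
# Hypothetical sketch of the container entrypoint's CLI, inferred from the
# pod "arguments" list in this DAG; the real entrypoint lives in the image.
import argparse

parser = argparse.ArgumentParser(description="Run stage-in, process and stage-out CWL workflows")
parser.add_argument("-i", dest="stage_in_workflow", help="URL of the stage-in CWL workflow")
parser.add_argument("-s", dest="stac_json", help="STAC JSON string or URL of a JSON/YAML file")
parser.add_argument("-w", dest="process_workflow", help="URL of the processing CWL workflow")
parser.add_argument("-j", dest="process_args", help="JSON string or URL with processing inputs")
parser.add_argument("-o", dest="stage_out_workflow", help="URL of the stage-out CWL workflow")
parser.add_argument("-d", dest="stage_out_args", help="JSON string with stage-out inputs")
parser.add_argument("-e", dest="ecr_login", help="ECR registry URI to log into, if any")
args = parser.parse_args()
```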
|
10 | 13 | import json |
|
25 | 28 | from airflow import DAG |
26 | 29 |
|
27 | 30 | # Task constants |
28 | | -UNITY_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-unity/stage-in-workflow.cwl" |
29 | | -DAAC_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-daac/stage-in-workflow.cwl" |
| 31 | +STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_in.cwl" |
| 32 | +STAGE_OUT_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_out.cwl" |
30 | 33 | LOCAL_DIR = "/shared-task-data" |
31 | 34 |
|
32 | 35 | # The path of the working directory where the CWL workflow is executed |
|
35 | 38 | WORKING_DIR = "/scratch" |
36 | 39 |
|
37 | 40 | # Default parameters |
38 | | -DEFAULT_CWL_WORKFLOW = ( |
39 | | - "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl" |
| 41 | +DEFAULT_STAC_JSON = "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json" |
| 42 | +DEFAULT_PROCESS_WORKFLOW = ( |
| 43 | + "https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl" |
40 | 44 | ) |
41 | | -DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"}) |
42 | | -DEFAULT_STAC_JSON_URL = "https://cmr.earthdata.nasa.gov/stac/LPCLOUD/collections/EMITL1BRAD_001/items?limit=2" |
43 | | -DEFAULT_INPUT_LOCATION = "daac" |
44 | | - |
| 45 | +DEFAULT_PROCESS_ARGS = json.dumps({"example_argument_empty": ""}) |
45 | 46 |
|
46 | 47 | # Alternative arguments to execute SBG Pre-Process |
47 | | -# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" |
48 | | -# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" |
| 48 | +# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" |
| 49 | +# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" |
49 | 50 |
|
50 | 51 | # Alternative arguments to execute SBG end-to-end |
51 | | -# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl" |
52 | | -# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml" |
| 52 | +# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl" |
| 53 | +# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml" |
53 | 54 |
|
54 | 55 | # Alternative arguments to execute SBG end-to-end |
55 | 56 | # unity_sps_sbg_debug.txt |
|
67 | 68 | # "ephemeral-storage": "30Gi" |
68 | 69 | # }, |
69 | 70 | ) |
70 | | -STAGE_IN_CONTAINER_RESOURCES = k8s.V1ResourceRequirements( |
71 | | - requests={ |
72 | | - "memory": "4Gi", |
73 | | - "cpu": "4", |
74 | | - "ephemeral-storage": "{{ params.request_storage }}", |
75 | | - } |
76 | | -) |
77 | 71 |
|
78 | 72 | # Default DAG configuration |
79 | 73 | dag_default_args = { |
|
95 | 89 | max_active_tasks=30, |
96 | 90 | default_args=dag_default_args, |
97 | 91 | params={ |
98 | | - "cwl_workflow": Param( |
99 | | - DEFAULT_CWL_WORKFLOW, type="string", title="CWL workflow", description="The CWL workflow URL" |
| 92 | + "stac_json": Param( |
| 93 | + DEFAULT_STAC_JSON, |
| 94 | + type="string", |
| 95 | + title="STAC JSON", |
| 96 | +        description="STAC JSON describing the granules to download, encoded as a JSON string or the URL of a JSON or YAML file", |
| 97 | + ), |
| 98 | + "process_workflow": Param( |
| 99 | + DEFAULT_PROCESS_WORKFLOW, |
| 100 | + type="string", |
| 101 | + title="Processing workflow", |
| 102 | + description="The processing workflow URL", |
100 | 103 | ), |
101 | | - "cwl_args": Param( |
102 | | - DEFAULT_CWL_ARGUMENTS, |
| 104 | + "process_args": Param( |
| 105 | + DEFAULT_PROCESS_ARGS, |
103 | 106 | type="string", |
104 | | - title="CWL workflow parameters", |
105 | | - description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"), |
| 107 | + title="Processing workflow parameters", |
| 108 | + description=( |
| 109 | +            "The processing job parameters encoded as a JSON string, " "or the URL of a JSON or YAML file" |
| 110 | + ), |
106 | 111 | ), |
107 | 112 | "request_memory": Param( |
108 | 113 | "4Gi", |
|
123 | 128 | title="Docker container storage", |
124 | 129 | ), |
125 | 130 | "use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"), |
126 | | - "stac_json_url": Param( |
127 | | - DEFAULT_STAC_JSON_URL, |
128 | | - type="string", |
129 | | - title="STAC JSON URL", |
130 | | - description="The URL to the STAC JSON document", |
131 | | - ), |
132 | | - "input_location": Param( |
133 | | - DEFAULT_INPUT_LOCATION, |
134 | | - type="string", |
135 | | - enum=["daac", "unity"], |
136 | | - title="Input data location", |
137 | | - description="Indicate whether input data should be retrieved from a DAAC or Unity", |
138 | | - ), |
139 | 131 | }, |
140 | 132 | ) |
141 | 133 |
|
142 | 134 |
|
143 | | -def setup(ti=None, **context): |
| 135 | +def create_local_dir(dag_run_id): |
144 | 136 | """ |
145 | | - Task that creates the working directory on the shared volume |
146 | | - and parses the input parameter values. |
| 137 | +    Create the local working directory for this DAG run's data. |
147 | 138 | """ |
148 | | - context = get_current_context() |
149 | | - dag_run_id = context["dag_run"].run_id |
150 | 139 | local_dir = f"{LOCAL_DIR}/{dag_run_id}" |
151 | | - logging.info(f"Creating directory: {local_dir}") |
152 | 140 | os.makedirs(local_dir, exist_ok=True) |
153 | 141 | logging.info(f"Created directory: {local_dir}") |
154 | 142 |
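For example, Airflow run ids look like `manual__<timestamp>` or `scheduled__<timestamp>`, so a manually triggered run produces a working directory along these lines (values illustrative):

```python
# Illustrative only: a manual trigger yields a run_id such as
#   "manual__2025-01-01T00:00:00+00:00"
# so the directory created under LOCAL_DIR ("/shared-task-data") is
#   /shared-task-data/manual__2025-01-01T00:00:00+00:00
create_local_dir("manual__2025-01-01T00:00:00+00:00")
```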
|
155 | | - # select the node pool based on what resources were requested |
| 143 | + |
| 144 | +def select_node_pool(ti, request_storage, request_memory, request_cpu): |
| 145 | + """ |
| 146 | +    Select the node pool based on the resources requested in the input parameters. |
| 147 | + """ |
156 | 148 | node_pool = unity_sps_utils.NODE_POOL_DEFAULT |
157 | | - storage = context["params"]["request_storage"] # 100Gi |
158 | | - storage = int(storage[0:-2]) # 100 |
159 | | - memory = context["params"]["request_memory"] # 32Gi |
160 | | - memory = int(memory[0:-2]) # 32 |
161 | | - cpu = int(context["params"]["request_cpu"]) # 8 |
| 149 | + storage = int(request_storage[0:-2]) # 100Gi -> 100 |
| 150 | + memory = int(request_memory[0:-2]) # 32Gi -> 32 |
| 151 | + cpu = int(request_cpu) # 8 |
162 | 152 |
|
163 | 153 | logging.info(f"Requesting storage={storage}Gi memory={memory}Gi CPU={cpu}") |
164 | 154 | if (storage > 30) or (memory > 32) or (cpu > 8): |
165 | 155 | node_pool = unity_sps_utils.NODE_POOL_HIGH_WORKLOAD |
166 | 156 | logging.info(f"Selecting node pool={node_pool}") |
167 | 157 | ti.xcom_push(key="node_pool_processing", value=node_pool) |
168 | 158 |
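A worked example of the selection logic above; note that the suffix stripping assumes a two-character `Gi` unit on the storage and memory strings:

```python
# Worked example of the node-pool thresholds (not part of the DAG):
assert int("100Gi"[0:-2]) == 100                # "100Gi" parses to 100
assert (100 > 30) or (4 > 32) or (4 > 8)        # 100Gi storage -> high-workload pool
assert not ((10 > 30) or (4 > 32) or (4 > 8))   # 10Gi/4Gi/4 CPU -> default pool
```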
|
169 | | - # select "use_ecr" argument and determine if ECR login is required |
170 | | - logging.info("Use ECR: %s", context["params"]["use_ecr"]) |
171 | | - if context["params"]["use_ecr"]: |
| 159 | + |
| 160 | +def select_ecr(ti, use_ecr): |
| 161 | + """ |
| 162 | + Determine if ECR login is required. |
| 163 | + """ |
| 164 | + logging.info("Use ECR: %s", use_ecr) |
| 165 | + if use_ecr: |
172 | 166 | ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"] |
173 | 167 | ti.xcom_push(key="ecr_login", value=ecr_login) |
174 | 168 | logging.info("ECR login: %s", ecr_login) |
175 | 169 |
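Note that `AIRFLOW_VAR_ECR_URI` follows Airflow's convention of exposing Variables through `AIRFLOW_VAR_`-prefixed environment variables, so the same value could also be read through the Variable API; a sketch assuming the variable is named `ecr_uri`:

```python
# Env vars prefixed with AIRFLOW_VAR_ are surfaced as Airflow Variables,
# so AIRFLOW_VAR_ECR_URI maps to the Variable "ecr_uri".
from airflow.models import Variable

ecr_login = Variable.get("ecr_uri")  # equivalent to os.environ["AIRFLOW_VAR_ECR_URI"]
```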
|
176 | | - # define stage in arguments |
177 | | - stage_in_args = {"download_dir": "input", "stac_json": context["params"]["stac_json_url"]} |
178 | 170 |
|
179 | | - # select stage in workflow based on input location |
180 | | - if context["params"]["input_location"] == "daac": |
181 | | - stage_in_workflow = DAAC_STAGE_IN_WORKFLOW |
182 | | - else: |
183 | | - stage_in_workflow = UNITY_STAGE_IN_WORKFLOW |
184 | | - ssm_client = boto3.client("ssm", region_name="us-west-2") |
185 | | - ss_acct_num = ssm_client.get_parameter(Name=unity_sps_utils.SS_ACT_NUM, WithDecryption=True)[ |
186 | | - "Parameter" |
187 | | - ]["Value"] |
188 | | - unity_client_id = ssm_client.get_parameter( |
189 | | - Name=f"arn:aws:ssm:us-west-2:{ss_acct_num}:parameter{unity_sps_utils.DS_CLIENT_ID_PARAM}", |
190 | | - WithDecryption=True, |
191 | | - )["Parameter"]["Value"] |
192 | | - stage_in_args["unity_client_id"] = unity_client_id |
| 171 | +def select_stage_out(ti): |
| 172 | +    """Retrieve stage-out parameters from the environment and the SSM parameter store.""" |
| 173 | + ssm_client = boto3.client("ssm", region_name="us-west-2") |
193 | 174 |
|
194 | | - ti.xcom_push(key="stage_in_workflow", value=stage_in_workflow) |
195 | | - logging.info("Stage In workflow selected: %s", stage_in_workflow) |
| 175 | + project = os.environ["AIRFLOW_VAR_UNITY_PROJECT"] |
| 176 | + venue = os.environ["AIRFLOW_VAR_UNITY_VENUE"] |
| 177 | + staging_bucket = ssm_client.get_parameter(Name=unity_sps_utils.DS_S3_BUCKET_PARAM, WithDecryption=True)[ |
| 178 | + "Parameter" |
| 179 | + ]["Value"] |
196 | 180 |
|
197 | | - ti.xcom_push(key="stage_in_args", value=stage_in_args) |
198 | | - logging.info("Stage in arguments selected: %s", stage_in_args) |
| 181 | + stage_out_args = json.dumps({"project": project, "venue": venue, "staging_bucket": staging_bucket}) |
| 182 | + logging.info(f"Selecting stage out args={stage_out_args}") |
| 183 | + ti.xcom_push(key="stage_out_args", value=stage_out_args) |
199 | 184 |
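The XCom value pushed here is a plain JSON string; with illustrative values (the real project, venue and bucket come from environment variables and SSM) it looks like:

```python
# Illustrative XCom payload for key "stage_out_args" (json imported above):
stage_out_args = json.dumps(
    {"project": "unity", "venue": "dev", "staging_bucket": "my-staging-bucket"}
)
# -> '{"project": "unity", "venue": "dev", "staging_bucket": "my-staging-bucket"}'
```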
|
200 | 185 |
|
201 | | -setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) |
| 186 | +def setup(ti=None, **context): |
| 187 | + """ |
| 188 | + Task that creates the working directory on the shared volume |
| 189 | + and parses the input parameter values. |
| 190 | + """ |
| 191 | + context = get_current_context() |
202 | 192 |
|
| 193 | + # create local working directory |
| 194 | + dag_run_id = context["dag_run"].run_id |
| 195 | + create_local_dir(dag_run_id) |
203 | 196 |
|
204 | | -cwl_task_stage_in = unity_sps_utils.SpsKubernetesPodOperator( |
205 | | - retries=0, |
206 | | - task_id="cwl_task_stage_in", |
207 | | - namespace=unity_sps_utils.POD_NAMESPACE, |
208 | | - name="cwl-task-pod", |
209 | | - image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE, |
210 | | - service_account_name="airflow-worker", |
211 | | - in_cluster=True, |
212 | | - get_logs=True, |
213 | | - startup_timeout_seconds=1800, |
214 | | - arguments=[ |
215 | | - "-w", |
216 | | - "{{ ti.xcom_pull(task_ids='Setup', key='stage_in_workflow') }}", |
217 | | - "-j", |
218 | | - "{{ ti.xcom_pull(task_ids='Setup', key='stage_in_args') }}", |
219 | | - "-e", |
220 | | - "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", |
221 | | - ], |
222 | | - container_security_context={"privileged": True}, |
223 | | - container_resources=STAGE_IN_CONTAINER_RESOURCES, |
224 | | - container_logs=True, |
225 | | - volume_mounts=[ |
226 | | - k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") |
227 | | - ], |
228 | | - volumes=[ |
229 | | - k8s.V1Volume( |
230 | | - name="workers-volume", |
231 | | - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"), |
232 | | - ) |
233 | | - ], |
234 | | - dag=dag, |
235 | | - node_selector={"karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_DEFAULT}, |
236 | | - labels={"app": unity_sps_utils.POD_LABEL}, |
237 | | - annotations={"karpenter.sh/do-not-disrupt": "true"}, |
238 | | - # note: 'affinity' cannot yet be templated |
239 | | - affinity=unity_sps_utils.get_affinity( |
240 | | - capacity_type=["spot"], |
241 | | - # instance_type=["t3.2xlarge"], |
242 | | - anti_affinity_label=unity_sps_utils.POD_LABEL, |
243 | | - ), |
244 | | - on_finish_action="keep_pod", |
245 | | - is_delete_operator_pod=False, |
246 | | -) |
| 197 | + # select the node pool based on what resources were requested |
| 198 | + select_node_pool( |
| 199 | + ti, |
| 200 | + context["params"]["request_storage"], |
| 201 | + context["params"]["request_memory"], |
| 202 | + context["params"]["request_cpu"], |
| 203 | + ) |
| 204 | + |
| 205 | + # select "use_ecr" argument and determine if ECR login is required |
| 206 | + select_ecr(ti, context["params"]["use_ecr"]) |
| 207 | + |
| 208 | +    # retrieve stage-out parameters: project, venue and staging bucket |
| 209 | + select_stage_out(ti) |
| 210 | + |
| 211 | + |
| 212 | +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) |
247 | 213 |
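With the setup task in place, a run can be triggered with overrides for any of the params above. A sketch using the Airflow stable REST API; the DAG id (`cwl_dag_modular`), host and credentials are illustrative assumptions, and omitted params keep their defaults:

```python
# Hypothetical trigger of this DAG via POST /api/v1/dags/{dag_id}/dagRuns.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/cwl_dag_modular/dagRuns",
    auth=("admin", "admin"),
    json={
        "conf": {
            "process_workflow": "https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl",
            "request_storage": "100Gi",  # large enough to route onto the high-workload pool
        }
    },
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```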
|
248 | 214 |
|
249 | 215 | cwl_task_processing = unity_sps_utils.SpsKubernetesPodOperator( |
250 | 216 | retries=0, |
251 | 217 | task_id="cwl_task_processing", |
252 | 218 | namespace=unity_sps_utils.POD_NAMESPACE, |
253 | 219 | name="cwl-task-pod", |
254 | | - image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE, |
| 220 | + image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE_MODULAR, |
255 | 221 | service_account_name="airflow-worker", |
256 | 222 | in_cluster=True, |
257 | 223 | get_logs=True, |
258 | 224 | startup_timeout_seconds=1800, |
259 | 225 | arguments=[ |
| 226 | + "-i", |
| 227 | + STAGE_IN_WORKFLOW, |
| 228 | + "-s", |
| 229 | + "{{ params.stac_json }}", |
260 | 230 | "-w", |
261 | | - "{{ params.cwl_workflow }}", |
| 231 | + "{{ params.process_workflow }}", |
262 | 232 | "-j", |
263 | | - "{{ params.cwl_args }}", |
| 233 | + "{{ params.process_args }}", |
| 234 | + "-o", |
| 235 | + STAGE_OUT_WORKFLOW, |
| 236 | + "-d", |
| 237 | + "{{ ti.xcom_pull(task_ids='Setup', key='stage_out_args') }}", |
264 | 238 | "-e", |
265 | 239 | "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", |
266 | 240 | ], |
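Once the Jinja templates render, the container is invoked with an argument vector along these lines; the stage-out JSON and ECR URI shown are illustrative:

```python
# Illustrative rendered pod arguments (all values resolved at run time):
rendered_arguments = [
    "-i", STAGE_IN_WORKFLOW,          # stage-in CWL workflow URL
    "-s", DEFAULT_STAC_JSON,          # params.stac_json
    "-w", DEFAULT_PROCESS_WORKFLOW,   # params.process_workflow
    "-j", DEFAULT_PROCESS_ARGS,       # params.process_args
    "-o", STAGE_OUT_WORKFLOW,         # stage-out CWL workflow URL
    "-d", '{"project": "unity", "venue": "dev", "staging_bucket": "my-staging-bucket"}',
    "-e", "123456789012.dkr.ecr.us-west-2.amazonaws.com",
]
```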
@@ -313,6 +287,5 @@ def cleanup(**context): |
313 | 287 | task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE |
314 | 288 | ) |
315 | 289 |
|
316 | | -chain( |
317 | | - setup_task.as_setup(), cwl_task_stage_in, cwl_task_processing, cleanup_task.as_teardown(setups=setup_task) |
318 | | -) |
| 290 | + |
| 291 | +chain(setup_task.as_setup(), cwl_task_processing, cleanup_task.as_teardown(setups=setup_task)) |