Skip to content

Commit

Permalink
Do not attempt to provide not stringified objects to UI via xcom if p…
Browse files Browse the repository at this point in the history
…ickling is active (apache#42388)

* Do not attempt to provide not stringified objects to UI via xcom if pickling is active

* Add pytest
  • Loading branch information
jscheffl authored and joaopamaral committed Oct 21, 2024
1 parent 9df76d6 commit 64e7d28
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub

if stringify:
if stringify or conf.getboolean("core", "enable_xcom_pickling"):
return xcom_schema_string.dump(item)

return xcom_schema_native.dump(item)
2 changes: 2 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,8 @@ paths:
If set to true (default) the Any value will be returned as string, e.g. a Python representation
of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.
This parameter is not meaningful when using XCom pickling, then it is always returned as string.
*New in version 2.10.0*
responses:
"200":
Expand Down
30 changes: 30 additions & 0 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,36 @@ def test_should_respond_200_native(self):
"value": {"key": "value"},
}

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_should_respond_200_native_for_pickled(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}
self._create_xcom_entry(
dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": value_non_serializable_key}
)
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
environ_overrides={"REMOTE_USER": "test"},
)
assert 200 == response.status_code

current_data = response.json
current_data["timestamp"] = "TIMESTAMP"
assert current_data == {
"dag_id": dag_id,
"execution_date": execution_date,
"key": xcom_key,
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": f"{{'key': {str(value_non_serializable_key)}}}",
}

def test_should_raise_404_for_non_existent_xcom(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
Expand Down

0 comments on commit 64e7d28

Please sign in to comment.