diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 59fa9f5acaaa..5ba0ffa71594 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -125,7 +125,7 @@ def get_xcom_entry( stub.value = XCom.deserialize_value(stub) item = stub - if stringify: + if stringify or conf.getboolean("core", "enable_xcom_pickling"): return xcom_schema_string.dump(item) return xcom_schema_native.dump(item) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index f6fe5ac0733a..8e9752f084ff 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2039,6 +2039,8 @@ paths: If set to true (default) the Any value will be returned as string, e.g. a Python representation of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored. + This parameter is not meaningful when using XCom pickling, then it is always returned as string. + *New in version 2.10.0* responses: "200": diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index 9f2d65250069..7a51714c5b29 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -174,6 +174,36 @@ def test_should_respond_200_native(self): "value": {"key": "value"}, } + @conf_vars({("core", "enable_xcom_pickling"): "True"}) + def test_should_respond_200_native_for_pickled(self): + dag_id = "test-dag-id" + task_id = "test-task-id" + execution_date = "2005-04-02T00:00:00+00:00" + xcom_key = "test-xcom-key" + execution_date_parsed = parse_execution_date(execution_date) + run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) + value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359} + self._create_xcom_entry( + dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": value_non_serializable_key} + ) + response = self.client.get( + f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert 200 == response.status_code + + current_data = response.json + current_data["timestamp"] = "TIMESTAMP" + assert current_data == { + "dag_id": dag_id, + "execution_date": execution_date, + "key": xcom_key, + "task_id": task_id, + "map_index": -1, + "timestamp": "TIMESTAMP", + "value": f"{{'key': {str(value_non_serializable_key)}}}", + } + def test_should_raise_404_for_non_existent_xcom(self): dag_id = "test-dag-id" task_id = "test-task-id"