From c27b13100475f929a5500dbe8e8c4690ec33cf18 Mon Sep 17 00:00:00 2001
From: Eman Elsabban
Date: Wed, 8 Jan 2025 06:53:04 -0800
Subject: [PATCH 1/6] Add default behaviour for jobs that dont have some keys
 and handle the non-existent of some json_vals

---
 tron/core/action.py                  | 49 ++++++++++---------
 tron/core/actionrun.py               |  8 +--
 .../runstate/dynamodb_state_store.py |  8 ++-
 3 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/tron/core/action.py b/tron/core/action.py
index 03b9e6419..3b625d61e 100644
--- a/tron/core/action.py
+++ b/tron/core/action.py
@@ -117,37 +117,42 @@ def serialize_namedtuple(obj):
         try:
             return json.dumps(
                 {
-                    "command": state_data["command"],
-                    "cpus": state_data["cpus"],
-                    "mem": state_data["mem"],
-                    "disk": state_data["disk"],
-                    "cap_add": state_data["cap_add"],
-                    "cap_drop": state_data["cap_drop"],
+                    "command": state_data.get("command"),
+                    "cpus": state_data.get("cpus"),
+                    "mem": state_data.get("mem"),
+                    "disk": state_data.get("disk"),
+                    "cap_add": state_data.get("cap_add", []),
+                    "cap_drop": state_data.get("cap_drop", []),
                     "constraints": [
-                        serialize_namedtuple(constraint) for constraint in state_data["constraints"]
+                        serialize_namedtuple(constraint) for constraint in state_data.get("constraints", [])
                     ],  # convert each ConfigConstraint to dictionary, so it would be a list of dicts
-                    "docker_image": state_data["docker_image"],
+                    "docker_image": state_data.get("docker_image"),
                     "docker_parameters": [
-                        serialize_namedtuple(parameter) for parameter in state_data["docker_parameters"]
+                        serialize_namedtuple(parameter) for parameter in state_data.get("docker_parameters", [])
                     ],
-                    "env": state_data["env"],
-                    "secret_env": {key: serialize_namedtuple(val) for key, val in state_data["secret_env"].items()},
-                    "secret_volumes": [serialize_namedtuple(volume) for volume in state_data["secret_volumes"]],
+                    "env": state_data.get("env", {}),
+                    "secret_env": {
+                        key: serialize_namedtuple(val) for key, val in state_data.get("secret_env", {}).items()
+                    },
+                    "secret_volumes": [serialize_namedtuple(volume) for volume in state_data.get("secret_volumes", [])],
                     "projected_sa_volumes": [
-                        serialize_namedtuple(volume) for volume in state_data["projected_sa_volumes"]
+                        serialize_namedtuple(volume) for volume in state_data.get("projected_sa_volumes", [])
                     ],
                     "field_selector_env": {
-                        key: serialize_namedtuple(val) for key, val in state_data["field_selector_env"].items()
+                        key: serialize_namedtuple(val) for key, val in state_data.get("field_selector_env", {}).items()
                     },
-                    "extra_volumes": [serialize_namedtuple(volume) for volume in state_data["extra_volumes"]],
-                    "node_selectors": state_data["node_selectors"],
-                    "node_affinities": [serialize_namedtuple(affinity) for affinity in state_data["node_affinities"]],
-                    "labels": state_data["labels"],
-                    "annotations": state_data["annotations"],
-                    "service_account_name": state_data["service_account_name"],
-                    "ports": state_data["ports"],
+                    "extra_volumes": [serialize_namedtuple(volume) for volume in state_data.get("extra_volumes", [])],
+                    "node_selectors": state_data.get("node_selectors", {}),
+                    "node_affinities": [
+                        serialize_namedtuple(affinity) for affinity in state_data.get("node_affinities", [])
+                    ],
+                    "labels": state_data.get("labels", {}),
+                    "annotations": state_data.get("annotations", {}),
+                    "service_account_name": state_data.get("service_account_name"),
+                    "ports": state_data.get("ports", []),
                     "topology_spread_constraints": [
-                        serialize_namedtuple(constraint) for constraint in state_data["topology_spread_constraints"]
+                        serialize_namedtuple(constraint)
+                        for constraint in state_data.get("topology_spread_constraints", [])
                     ],
                 }
             )
diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py
index a37a83b15..5f067e251 100644
--- a/tron/core/actionrun.py
+++ b/tron/core/actionrun.py
@@ -185,8 +185,8 @@ def to_json(state_data: dict) -> Optional[str]:
                     "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None,
                     "rendered_command": state_data["rendered_command"],
                     "exit_status": state_data["exit_status"],
-                    "mesos_task_id": state_data["mesos_task_id"],
-                    "kubernetes_task_id": state_data["kubernetes_task_id"],
+                    "mesos_task_id": state_data.get("mesos_task_id"),
+                    "kubernetes_task_id": state_data.get("kubernetes_task_id"),
                 }
             )
         except KeyError:
@@ -811,12 +811,12 @@ def to_json(state_data: dict) -> Optional[str]:
                     "job_run_id": state_data["job_run_id"],
                     "action_name": state_data["action_name"],
                     "state": state_data["state"],
-                    "original_command": state_data["original_command"],
+                    "original_command": state_data.get("original_command"),
                     "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None,
                     "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None,
                     "node_name": state_data["node_name"],
                     "exit_status": state_data["exit_status"],
-                    "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data["attempts"]],
+                    "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data.get("attempts", [])],
                     "retries_remaining": state_data["retries_remaining"],
                     "retries_delay": (
                         state_data["retries_delay"].total_seconds() if state_data["retries_delay"] is not None else None
diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py
index 6f1b4efb4..1fb5d58cf 100644
--- a/tron/serialize/runstate/dynamodb_state_store.py
+++ b/tron/serialize/runstate/dynamodb_state_store.py
@@ -169,7 +169,13 @@ def _merge_items(self, first_items, remaining_items, read_json=False) -> dict:
                 raw_items[key] += bytes(val["val"]["B"])
             if read_json:
                 for json_val in item:
-                    json_items[key] = json_val["json_val"]["S"]
+                    try:
+                        json_items[key] = json_val["json_val"]["S"]
+                    except Exception:
+                        log.exception(f"json_val not found for key {key}")
+                        # fallback to pickled data if json_val fails to exist for any key
+                        # TODO: it would be nice if we can read the pickled data only for this failed key instead of all keys
+                        read_json = False
         if read_json:
             try:
                 log.info("read_json is enabled. Deserializing JSON items to restore them instead of pickled data.")

From f320de41add896ef57d688a3aa124a98f77beef3 Mon Sep 17 00:00:00 2001
From: Eman Elsabban
Date: Wed, 8 Jan 2025 07:19:58 -0800
Subject: [PATCH 2/6] Remove get from command

---
 tron/core/action.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tron/core/action.py b/tron/core/action.py
index 3b625d61e..cfd0b904a 100644
--- a/tron/core/action.py
+++ b/tron/core/action.py
@@ -117,7 +117,7 @@ def serialize_namedtuple(obj):
         try:
             return json.dumps(
                 {
-                    "command": state_data.get("command"),
+                    "command": state_data["command"],
                     "cpus": state_data.get("cpus"),
                     "mem": state_data.get("mem"),
                     "disk": state_data.get("disk"),

From 5808232328c448410569356dc5210e6ebed11900 Mon Sep 17 00:00:00 2001
From: Eman Elsabban
Date: Wed, 8 Jan 2025 07:26:30 -0800
Subject: [PATCH 3/6] cap add and drop should have values

---
 tron/core/action.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tron/core/action.py b/tron/core/action.py
index cfd0b904a..3ce6ce1de 100644
--- a/tron/core/action.py
+++ b/tron/core/action.py
@@ -121,8 +121,8 @@ def serialize_namedtuple(obj):
                     "cpus": state_data.get("cpus"),
                     "mem": state_data.get("mem"),
                     "disk": state_data.get("disk"),
-                    "cap_add": state_data.get("cap_add", []),
-                    "cap_drop": state_data.get("cap_drop", []),
+                    "cap_add": state_data["cap_add"],
+                    "cap_drop": state_data["cap_drop"],
                     "constraints": [
                         serialize_namedtuple(constraint) for constraint in state_data.get("constraints", [])
                     ],  # convert each ConfigConstraint to dictionary, so it would be a list of dicts

From 9adc64ec49fe8eccebe9c434e1defd2eef939775 Mon Sep 17 00:00:00 2001
From: Eman Elsabban
Date: Thu, 9 Jan 2025 12:09:37 -0800
Subject: [PATCH 4/6] removing the try/except since it was added in a separate
 pr

---
 tron/serialize/runstate/dynamodb_state_store.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py
index 1fb5d58cf..6f1b4efb4 100644
--- a/tron/serialize/runstate/dynamodb_state_store.py
+++ b/tron/serialize/runstate/dynamodb_state_store.py
@@ -169,13 +169,7 @@ def _merge_items(self, first_items, remaining_items, read_json=False) -> dict:
                 raw_items[key] += bytes(val["val"]["B"])
             if read_json:
                 for json_val in item:
-                    try:
-                        json_items[key] = json_val["json_val"]["S"]
-                    except Exception:
-                        log.exception(f"json_val not found for key {key}")
-                        # fallback to pickled data if json_val fails to exist for any key
-                        # TODO: it would be nice if we can read the pickled data only for this failed key instead of all keys
-                        read_json = False
+                    json_items[key] = json_val["json_val"]["S"]
         if read_json:
             try:
                 log.info("read_json is enabled. Deserializing JSON items to restore them instead of pickled data.")

From cde91b1ff8b226d98e3e6a81e386964240d13397 Mon Sep 17 00:00:00 2001
From: Eman Elsabban
Date: Thu, 9 Jan 2025 13:50:30 -0800
Subject: [PATCH 5/6] maybe not all gets are necessary

---
 tron/core/action.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tron/core/action.py b/tron/core/action.py
index 3ce6ce1de..efe4a5d83 100644
--- a/tron/core/action.py
+++ b/tron/core/action.py
@@ -118,15 +118,15 @@ def serialize_namedtuple(obj):
             return json.dumps(
                 {
                     "command": state_data["command"],
-                    "cpus": state_data.get("cpus"),
-                    "mem": state_data.get("mem"),
-                    "disk": state_data.get("disk"),
+                    "cpus": state_data["cpus"],
+                    "mem": state_data["mem"],
+                    "disk": state_data["disk"],
                     "cap_add": state_data["cap_add"],
                     "cap_drop": state_data["cap_drop"],
                     "constraints": [
                         serialize_namedtuple(constraint) for constraint in state_data.get("constraints", [])
                     ],  # convert each ConfigConstraint to dictionary, so it would be a list of dicts
-                    "docker_image": state_data.get("docker_image"),
+                    "docker_image": state_data["docker_image"],
                     "docker_parameters": [
                         serialize_namedtuple(parameter) for parameter in state_data.get("docker_parameters", [])
                     ],

From 27938cbac2a6e1c30aa88878e0d24e4245848f61 Mon Sep 17 00:00:00 2001
From: Eman Elsabban
Date: Fri, 17 Jan 2025 10:46:04 -0800
Subject: [PATCH 6/6] Adding comments

---
 tron/core/action.py    | 7 +++++++
 tron/core/actionrun.py | 5 +++++
 2 files changed, 12 insertions(+)

diff --git a/tron/core/action.py b/tron/core/action.py
index efe4a5d83..33b18c355 100644
--- a/tron/core/action.py
+++ b/tron/core/action.py
@@ -115,6 +115,13 @@ def serialize_namedtuple(obj):
             return obj
 
         try:
+            # NOTE: you'll notice that there's a lot of get() accesses of state_data for
+            # pretty common fields - this is because ActionCommandConfig is used by more
+            # than one type of ActionRun (Kubernetes, Mesos, SSH) and these generally look
+            # different. Alternatively, some of these fields are used by KubernetesActionRun
+            # but are relatively new and older runs do not have data for them.
+            # Once we get rid of the SSH and Mesos code as well as older runs in DynamoDB,
+            # we'll likely be able to clean this up.
             return json.dumps(
                 {
                     "command": state_data["command"],
diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py
index 5f067e251..efec9bcdf 100644
--- a/tron/core/actionrun.py
+++ b/tron/core/actionrun.py
@@ -185,6 +185,11 @@ def to_json(state_data: dict) -> Optional[str]:
                     "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None,
                     "rendered_command": state_data["rendered_command"],
                     "exit_status": state_data["exit_status"],
+                    # NOTE: mesos_task_id can be deleted once we delete all Mesos
+                    # code and run data - and kubernetes_task_id can then be
+                    # accessed unconditionally :)
+                    # (see note in ActionCommandConfig::to_json() for more
+                    # information about why we do this)
                     "mesos_task_id": state_data.get("mesos_task_id"),
                     "kubernetes_task_id": state_data.get("kubernetes_task_id"),
                 }