Commit 2b5ca60
Merge commit '414c2585a8b646788e6d3cf715b638ab1a2f846c' into issue696
2 parents 1a0934d + 414c258
File tree: 9 files changed, +240 -23 lines changed


dev-requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -10,3 +10,4 @@ types-dataclasses==0.6.6
 backoff-stubs~=1.10
 pytest~=7.0.1
 types-beautifulsoup4==4.11.1
+types-python-dateutil~=2.8.2

granulate_utils/containers/container.py

Lines changed: 9 additions & 0 deletions
@@ -4,9 +4,16 @@
 #
 
 from dataclasses import dataclass
+from datetime import datetime
 from typing import Dict, List, Optional
 
 
+@dataclass
+class TimeInfo:
+    create_time: datetime  # Creation time of the container (UTC)
+    start_time: Optional[datetime]  # Start time of the container (UTC) - None=not started
+
+
 @dataclass
 class Container:
     """
@@ -22,6 +29,8 @@ class Container:
     running: bool
     # None if not requested / container is dead
    pid: Optional[int]
+    # None if not requested, make sure to pass all_info=True
+    time_info: Optional[TimeInfo]
 
 
 class ContainersClientInterface:
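
The new TimeInfo field makes container age and uptime computable directly from a fetched Container. A minimal consumer sketch (the helper below is illustrative, not part of this commit; time_info is only populated when the container was fetched with all_info=True):

from datetime import datetime, timezone
from typing import Optional

from granulate_utils.containers.container import Container


def container_uptime_seconds(container: Container) -> Optional[float]:
    # time_info is None unless all_info=True was passed; start_time is None
    # for containers that were created but never started.
    if container.time_info is None or container.time_info.start_time is None:
        return None
    return (datetime.now(tz=timezone.utc) - container.time_info.start_time).total_seconds()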

granulate_utils/containers/cri.py

Lines changed: 40 additions & 20 deletions
@@ -3,15 +3,17 @@
 # Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
 #
 import json
+from datetime import datetime, timezone
 from typing import List, Optional, Union
 
 import grpc  # type: ignore # no types-grpc sadly
 
-from granulate_utils.containers.container import Container, ContainersClientInterface
+from granulate_utils.containers.container import Container, ContainersClientInterface, TimeInfo
 from granulate_utils.exceptions import ContainerNotFound, CriNotAvailableError
 from granulate_utils.generated.containers.cri import api_pb2 as api_pb2  # type: ignore
 from granulate_utils.generated.containers.cri.api_pb2_grpc import RuntimeServiceStub  # type: ignore
 from granulate_utils.linux import ns
+from granulate_utils.type_utils import assert_cast
 
 RUNTIMES = (
     ("containerd", "/run/containerd/containerd.sock"),
@@ -76,31 +78,35 @@ def list_containers(self, all_info: bool) -> List[Container]:
         for container in stub.ListContainers(api_pb2.ListContainersRequest()).containers:
             if all_info:
                 # need verbose=True to get the info which contains the PID
-                status = stub.ContainerStatus(
-                    api_pb2.ContainerStatusRequest(container_id=container.id, verbose=True)
-                )
-                pid: Optional[int] = json.loads(status.info.get("info", "{}")).get("pid")
+                status_response = self._container_status_request(stub, container.id, verbose=True)
+                if status_response is None:
+                    # container probably went down
+                    continue
+                pid: Optional[int] = json.loads(status_response.info.get("info", "{}")).get("pid")
+                containers.append(self._create_container(status_response.status, pid, rt))
             else:
-                pid = None
-
-            containers.append(self._create_container(container, pid, rt))
+                containers.append(self._create_container(container, None, rt))
 
         return containers
 
+    def _container_status_request(
+        self, stub: RuntimeServiceStub, container_id: str, *, verbose: bool
+    ) -> Optional[api_pb2.ContainerStatusResponse]:
+        try:
+            return stub.ContainerStatus(api_pb2.ContainerStatusRequest(container_id=container_id, verbose=verbose))
+        except grpc._channel._InactiveRpcError as e:
+            if e.code() == grpc.StatusCode.NOT_FOUND:
+                return None
+            raise
+
     def get_container(self, container_id: str, all_info: bool) -> Container:
         for rt, path in self._runtimes.items():
             with RuntimeServiceWrapper(path) as stub:
-                try:
-                    status = stub.ContainerStatus(
-                        api_pb2.ContainerStatusRequest(container_id=container_id, verbose=all_info)
-                    )
-                except grpc._channel._InactiveRpcError as e:
-                    if e.code() == grpc.StatusCode.NOT_FOUND:
-                        continue
-                    raise
-
-                pid: Optional[int] = json.loads(status.info.get("info", "{}")).get("pid")
-                return self._create_container(status.status, pid, rt)
+                status_response = self._container_status_request(stub, container_id, verbose=all_info)
+                if status_response is None:
+                    continue
+                pid: Optional[int] = json.loads(status_response.info.get("info", "{}")).get("pid")
+                return self._create_container(status_response.status, pid, rt)
 
         raise ContainerNotFound(container_id)
 
@@ -109,13 +115,27 @@ def get_runtimes(self) -> List[str]:
 
     @classmethod
     def _create_container(
-        cls, container: Union[api_pb2.Container, api_pb2.ContainerStatus], pid: Optional[int], runtime: str
+        cls,
+        container: Union[api_pb2.Container, api_pb2.ContainerStatus],
+        pid: Optional[int],
+        runtime: str,
     ) -> Container:
+        time_info: Optional[TimeInfo] = None
+        if isinstance(container, api_pb2.ContainerStatus):
+            created_at_ns = assert_cast(int, container.created_at)
+            started_at_ns = assert_cast(int, container.started_at)
+            create_time = datetime.fromtimestamp(created_at_ns / 1e9, tz=timezone.utc)
+            start_time = None
+            # from ContainerStatus message docs, 0 == not started
+            if started_at_ns != 0:
+                start_time = datetime.fromtimestamp(started_at_ns / 1e9, tz=timezone.utc)
+            time_info = TimeInfo(create_time=create_time, start_time=start_time)
         return Container(
             runtime=runtime,
             name=cls._reconstruct_name(container),
             id=container.id,
             labels=container.labels,
             running=container.state == CONTAINER_RUNNING,
             pid=pid,
+            time_info=time_info,
         )
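
The timestamp handling above relies on CRI reporting created_at/started_at as Unix timestamps in nanoseconds, with 0 meaning "not started" per the ContainerStatus message docs. A standalone sketch of that conversion, using only the stdlib:

from datetime import datetime, timezone
from typing import Optional


def parse_cri_timestamp_ns(ts_ns: int) -> Optional[datetime]:
    # 0 == not started, per the CRI ContainerStatus docs.
    if ts_ns == 0:
        return None
    return datetime.fromtimestamp(ts_ns / 1e9, tz=timezone.utc)


assert parse_cri_timestamp_ns(0) is None
# 1.6e18 ns since the epoch is 2020-09-13T12:26:40 UTC.
assert parse_cri_timestamp_ns(1_600_000_000_000_000_000) == datetime(2020, 9, 13, 12, 26, 40, tzinfo=timezone.utc)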

granulate_utils/containers/docker.py

Lines changed: 16 additions & 2 deletions
@@ -3,13 +3,15 @@
 # Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
 #
 
+from datetime import datetime
 from typing import List, Optional
 
 import docker
 import docker.errors
 import docker.models.containers
+from dateutil.parser import isoparse
 
-from granulate_utils.containers.container import Container, ContainersClientInterface
+from granulate_utils.containers.container import Container, ContainersClientInterface, TimeInfo
 from granulate_utils.exceptions import ContainerNotFound
 from granulate_utils.linux import ns
 
@@ -35,15 +37,27 @@ def get_runtimes(self) -> List[str]:
         return ["docker"]
 
     @staticmethod
-    def _create_container(container: docker.models.containers.Container) -> Container:
+    def _parse_docker_ts(ts: str) -> Optional[datetime]:
+        assert ts.endswith("Z")  # assert UTC
+        if ts.startswith("0001"):  # None-value timestamp in docker is represented as "0001-01-01T00:00:00Z".
+            return None
+        return isoparse(ts)
+
+    @classmethod
+    def _create_container(cls, container: docker.models.containers.Container) -> Container:
         pid: Optional[int] = container.attrs["State"].get("Pid")
         if pid == 0:  # Docker returns 0 for dead containers
             pid = None
+        created = cls._parse_docker_ts(container.attrs["Created"])
+        assert created is not None
+        started_at = cls._parse_docker_ts(container.attrs["State"]["StartedAt"])
+        time_info = TimeInfo(create_time=created, start_time=started_at)
         return Container(
             runtime="docker",
             name=container.name,
             id=container.id,
             labels=container.labels,
             running=container.status == "running",
             pid=pid,
+            time_info=time_info,
         )
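
Docker serializes timestamps as RFC 3339 UTC strings and uses the zero value "0001-01-01T00:00:00Z" for fields such as State.StartedAt of a never-started container, which is why _parse_docker_ts maps it to None. A standalone sketch mirroring the committed rule:

from datetime import datetime
from typing import Optional

from dateutil.parser import isoparse


def parse_docker_ts(ts: str) -> Optional[datetime]:
    assert ts.endswith("Z")  # Docker reports UTC
    if ts.startswith("0001"):  # Docker's zero-value timestamp
        return None
    return isoparse(ts)


assert parse_docker_ts("0001-01-01T00:00:00Z") is None
print(parse_docker_ts("2023-05-01T12:00:00.123456Z"))  # 2023-05-01 12:00:00.123456+00:00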

granulate_utils/exceptions.py

Lines changed: 5 additions & 0 deletions
@@ -49,3 +49,8 @@ def __init__(self, process: Process):
 class AlreadyInCgroup(Exception):
     def __init__(self, subsystem: str, cgroup: str) -> None:
         super().__init__(f"{subsystem!r} subsystem is already in a predefined cgroup: {cgroup!r}")
+
+
+class DatabricksJobNameDiscoverException(Exception):
+    def __init__(self, msg: str) -> None:
+        super().__init__(msg)
Lines changed: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
+#
+# Copyright (c) Granulate. All rights reserved.
+# Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
+#
+
+import json
+import logging
+import os
+import time
+from typing import Dict, Optional
+
+import requests
+
+from granulate_utils.exceptions import DatabricksJobNameDiscoverException
+
+HOST_KEY_NAME = "*.sink.ganglia.host"
+DATABRICKS_METRICS_PROP_PATH = "/databricks/spark/conf/metrics.properties"
+CLUSTER_TAGS_KEY = "spark.databricks.clusterUsageTags.clusterAllTags"
+SPARKUI_APPS_URL = "http://{}/api/v1/applications"
+REQUEST_TIMEOUT = 5
+JOB_NAME_KEY = "RunName"
+DEFAULT_WEBUI_PORT = 40001
+DATABRICKS_JOBNAME_TIMEOUT_S = 2 * 60
+RETRY_INTERVAL_S = 1
+
+
+class DatabricksClient:
+    def __init__(self, logger: logging.LoggerAdapter) -> None:
+        self.logger = logger
+        self.logger.debug("Getting Databricks job name")
+        self.job_name = self.get_job_name()
+        if self.job_name is None:
+            self.logger.warning(
+                "Failed initializing Databricks client. Databricks job name will not be included in ephemeral clusters."
+            )
+        else:
+            self.logger.debug(f"Got Databricks job name: {self.job_name}")
+
+    def _request_get(self, url: str) -> requests.Response:
+        resp = requests.get(url, timeout=REQUEST_TIMEOUT)
+        resp.raise_for_status()
+        return resp
+
+    @staticmethod
+    def get_webui_address() -> Optional[str]:
+        with open(DATABRICKS_METRICS_PROP_PATH) as f:
+            properties = f.read()
+        try:
+            host = dict([line.split("=", 1) for line in properties.splitlines()])[HOST_KEY_NAME]
+        except KeyError as e:
+            if e.args[0] == HOST_KEY_NAME:
+                # Might happen while provisioning the cluster, retry.
+                return None
+            raise DatabricksJobNameDiscoverException(f"Failed to get Databricks webui address {properties=}") from e
+        except Exception as e:
+            raise DatabricksJobNameDiscoverException(f"Failed to get Databricks webui address {properties=}") from e
+        return f"{host}:{DEFAULT_WEBUI_PORT}"
+
+    def get_job_name(self) -> Optional[str]:
+        # Retry in case of a connection error, as the metrics server might not be up yet.
+        start_time = time.monotonic()
+        while time.monotonic() - start_time < DATABRICKS_JOBNAME_TIMEOUT_S:
+            try:
+                if cluster_metadata := self._cluster_all_tags_metadata():
+                    name = self._get_name_from_metadata(cluster_metadata)
+                    if name:
+                        self.logger.debug("Found name in metadata", job_name=name, cluster_metadata=cluster_metadata)
+                        return name
+                    else:
+                        self.logger.debug("Failed to extract name from metadata", cluster_metadata=cluster_metadata)
+                        return None
+                else:
+                    # No job name yet, retry.
+                    time.sleep(RETRY_INTERVAL_S)
+            except DatabricksJobNameDiscoverException:
+                self.logger.exception("Failed to get Databricks job name")
+                return None
+            except Exception:
+                self.logger.exception("Generic exception was raised during spark job name discovery")
+                return None
+        self.logger.info("Databricks get job name timeout, continuing...")
+        return None
+
+    @staticmethod
+    def _get_name_from_metadata(metadata: Dict[str, str]) -> Optional[str]:
+        if JOB_NAME_KEY in metadata:
+            return str(metadata[JOB_NAME_KEY]).replace(" ", "-").lower()
+        return None
+
+    def _cluster_all_tags_metadata(self) -> Optional[Dict[str, str]]:
+        """
+        Returns the `spark.databricks.clusterUsageTags.clusterAllTags` tags as a `Dict`.
+        """
+        if not os.path.isfile(DATABRICKS_METRICS_PROP_PATH):
+            # We want to retry in case the cluster is still initializing, and the file is not yet deployed.
+            return None
+        webui = self.get_webui_address()
+        if webui is None:
+            # retry
+            return None
+        # The API used: https://spark.apache.org/docs/latest/monitoring.html#rest-api
+        apps_url = SPARKUI_APPS_URL.format(webui)
+        self.logger.debug("Databricks SparkUI address", apps_url=apps_url)
+        try:
+            response = self._request_get(apps_url)
+        except requests.exceptions.RequestException:
+            # Request might fail in cases where the cluster is still initializing, retrying.
+            return None
+        try:
+            apps = response.json()
+        except Exception as e:
+            if "Spark is starting up. Please wait a while until it's ready" in response.text:
+                # Spark is still initializing, retrying.
+                # https://github.com/apache/spark/blob/38c41c/core/src/main/scala/org/apache/spark/ui/SparkUI.scala#L64
+                return None
+            else:
+                raise DatabricksJobNameDiscoverException(
+                    f"Failed to parse apps url response, query {response.text=}"
+                ) from e
+        if len(apps) == 0:
+            # apps might be empty because of initialization, retrying.
+            self.logger.debug("No apps yet, retrying.")
+            return None
+
+        env_url = f"{apps_url}/{apps[0]['id']}/environment"
+        try:
+            response = self._request_get(env_url)
+        except Exception as e:
+            # No reason for any exception, `environment` uri should be accessible if we have running apps.
+            raise DatabricksJobNameDiscoverException(f"Environment request failed {env_url=}") from e
+        try:
+            env = response.json()
+        except Exception as e:
+            raise DatabricksJobNameDiscoverException(f"Environment request failed {response.text=}") from e
+        props = env.get("sparkProperties")
+        if props is None:
+            raise DatabricksJobNameDiscoverException(f"sparkProperties was not found in {env=}")
+        for prop in props:
+            if prop[0] == CLUSTER_TAGS_KEY:
+                try:
+                    all_tags_value = json.loads(prop[1])
+                except Exception as e:
+                    raise DatabricksJobNameDiscoverException(f"Failed to parse {prop=}") from e
+                return {cluster_all_tag["key"]: cluster_all_tag["value"] for cluster_all_tag in all_tags_value}
+        else:
+            raise DatabricksJobNameDiscoverException(f"Failed to find {CLUSTER_TAGS_KEY=} in {props=}")

granulate_utils/metrics/spark.py

Lines changed: 5 additions & 1 deletion
@@ -122,7 +122,11 @@ def _get_standalone_apps(self) -> Dict[str, Tuple[str, str]]:
         metrics_json = rest_request_to_json(self._master_address, SPARK_MASTER_STATE_PATH)
         running_apps = {}
 
-        for app in metrics_json.get("activeapps", []):
+        activeapps = metrics_json.get("activeapps", [])
+        if activeapps == []:
+            self._logger.warning("No active apps found in Spark master state", metrics_json=metrics_json)
+
+        for app in activeapps:
             try:
                 app_id = app["id"]
                 app_name = app["name"]

granulate_utils/type_utils.py

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+#
+# Copyright (c) Granulate. All rights reserved.
+# Licensed under the AGPL3 License. See LICENSE.md in the project root for license information.
+#
+from typing import Any, Optional, Type, TypeVar
+
+T = TypeVar("T")
+
+
+def cast_away_optional(arg: Optional[T]) -> T:
+    assert arg is not None
+    return arg
+
+
+def assert_cast(typ: Type[T], arg: Any) -> T:
+    assert isinstance(arg, typ)
+    return arg
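
Both helpers are small runtime-checked narrowing aids for mypy: cast_away_optional strips Optional, and assert_cast narrows Any to a concrete type (as cri.py above uses it for protobuf integer fields). A quick usage sketch:

from typing import Optional

from granulate_utils.type_utils import assert_cast, cast_away_optional

maybe_pid: Optional[int] = 1234
pid: int = cast_away_optional(maybe_pid)  # typed as int; AssertionError if None

value: object = 42
count: int = assert_cast(int, value)  # typed as int; AssertionError if not an int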

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -8,3 +8,4 @@ typing-extensions>=4.1.0
 pyelftools~=0.28
 packaging~=23.1
 beautifulsoup4==4.11.1
+python-dateutil~=2.8.1
