228 changes: 191 additions & 37 deletions robusta_krr/core/integrations/kubernetes/__init__.py
@@ -23,6 +23,14 @@
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.object_like_dict import ObjectLikeDict


class LightweightJobInfo:
"""Lightweight job object containing only the fields needed for GroupedJob processing."""
def __init__(self, name: str, namespace: str):
self.name = name
self.namespace = namespace


from . import config_patch as _

logger = logging.getLogger("krr")
@@ -261,6 +269,88 @@ def _should_list_resource(self, resource: str) -> bool:
return True
return resource in settings.resources

def _is_job_owned_by_cronjob(self, job: V1Job) -> bool:
"""Check if a job is owned by a CronJob."""
return any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])

def _is_job_grouped(self, job: V1Job) -> bool:
"""Check if a job has any of the grouping labels."""
if not settings.job_grouping_labels or not job.metadata.labels:
return False
return any(label in job.metadata.labels for label in settings.job_grouping_labels)

async def _list_namespaced_or_global_objects_batched(
self,
kind: KindLiteral,
all_namespaces_request: Callable,
namespaced_request: Callable,
limit: Optional[int] = None,
continue_ref: Optional[str] = None,
) -> tuple[list[Any], Optional[str]]:
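"""List objects of the given kind, fetching a single batch (page) per call.
Returns a tuple of (items, continue_token); continue_token is None when no further
results remain. If the continue token has expired (HTTP 410), the call retries with
the refreshed token taken from the error body.
"""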
logger.debug("Listing %s in %s with batching (limit=%d)", kind, self.cluster, limit)
loop = asyncio.get_running_loop()

try:
if self.namespaces == "*":
requests = [
loop.run_in_executor(
self.executor,
lambda: all_namespaces_request(
watch=False,
label_selector=settings.selector,
limit=limit,
_continue=continue_ref,
),
)
]
else:
requests = [
loop.run_in_executor(
self.executor,
lambda ns=namespace: namespaced_request(
namespace=ns,
watch=False,
label_selector=settings.selector,
limit=limit,
_continue=continue_ref,
),
)
for namespace in self.namespaces
]

result = [
item
for request_result in await asyncio.gather(*requests)
for item in request_result.items
]

next_continue_ref = None
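# The list response carries a continue token in metadata._continue when more results remain.
# Only the first request's token is tracked here, so pagination effectively follows the first
# listed namespace when an explicit namespace list is used.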
if requests:
first_result = await requests[0]
next_continue_ref = getattr(first_result.metadata, '_continue', None)

return result, next_continue_ref

except ApiException as e:
if e.status == 410 and e.body:
# 410 Gone: the continue token has expired; the API may return a refreshed token in the error body's metadata.continue field
import json
try:
error_body = json.loads(e.body)
new_continue_token = error_body.get("metadata", {}).get("continue")
if new_continue_token:
logger.info("Continue token expired while listing %s; retrying with refreshed token", kind)
return await self._list_namespaced_or_global_objects_batched(
kind=kind,
all_namespaces_request=all_namespaces_request,
namespaced_request=namespaced_request,
limit=limit,
continue_ref=new_continue_token,
)
except (json.JSONDecodeError, KeyError):
pass
raise

async def _list_namespaced_or_global_objects(
self,
kind: KindLiteral,
@@ -458,26 +548,55 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
extract_containers=lambda item: item.spec.template.spec.containers,
)

def _list_all_jobs(self) -> list[K8sObjectData]:
def filter_jobs(item):
# Skip jobs owned by CronJobs
if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []):
return False

async def _list_all_jobs(self) -> list[K8sObjectData]:
"""List all jobs using batched loading with 500 batch size."""
if not self._should_list_resource("Job"):
return []

all_jobs = []
try:
continue_ref: Optional[str] = None
batch_count = 0

# Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
if settings.job_grouping_labels and item.metadata.labels:
if any(label in item.metadata.labels for label in settings.job_grouping_labels):
return False
while batch_count < settings.discovery_job_max_batches:
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
limit=settings.discovery_job_batch_size,
continue_ref=continue_ref,
)

batch_count += 1
continue_ref = next_continue_ref

# Empty batch but a refreshed continue token was returned; fetch the next batch with it
if not jobs_batch and continue_ref:
continue

for job in jobs_batch:
if self._is_job_owned_by_cronjob(job):
continue

if self._is_job_grouped(job):
continue

for container in job.spec.template.spec.containers:
all_jobs.append(self.__build_scannable_object(job, container, "Job"))

if not continue_ref:
break

return True

return self._list_scannable_objects(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
extract_containers=lambda item: item.spec.template.spec.containers,
filter_workflows=filter_jobs,
)
logger.debug("Found %d regular jobs", len(all_jobs))
return all_jobs
except Exception:
logger.error(
"Failed to run jobs discovery; returning the jobs discovered so far",
exc_info=True,
)
return all_jobs

def _list_all_cronjobs(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
@@ -497,42 +616,77 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
logger.debug("Skipping GroupedJob in cluster")
return []

logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")

# Get all jobs that have any of the grouping labels
all_jobs = await self._list_namespaced_or_global_objects(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
)
logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)

grouped_jobs = defaultdict(list)
for job in all_jobs:
if (job.metadata.labels and
not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
grouped_jobs_template = {}  # Keep only ONE full V1Job per group as a template (needed to build K8sObjectData)
continue_ref: Optional[str] = None
batch_count = 0

try:
while batch_count < settings.discovery_job_max_batches:
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
limit=settings.discovery_job_batch_size,
continue_ref=continue_ref,
)

batch_count += 1
continue_ref = next_continue_ref

# Empty batch but a refreshed continue token was returned; fetch the next batch with it
if not jobs_batch and continue_ref:
continue

for label_name in settings.job_grouping_labels:
if label_name in job.metadata.labels:
for job in jobs_batch:
if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
continue

for label_name in settings.job_grouping_labels:
if label_name not in job.metadata.labels:
continue

# The group key combines the grouping label name with its value on this job
label_value = job.metadata.labels[label_name]
group_key = f"{label_name}={label_value}"
grouped_jobs[group_key].append(job)
lightweight_job = LightweightJobInfo(
name=job.metadata.name,
namespace=job.metadata.namespace
)
# Store lightweight job info only for grouped jobs
grouped_jobs[group_key].append(lightweight_job)
# Keep only ONE full job as template per group
if group_key not in grouped_jobs_template:
grouped_jobs_template[group_key] = job

if not continue_ref:
break

except Exception:
logger.error(
"Failed to run grouped jobs discovery",
exc_info=True,
)
raise

result = []
for group_name, jobs in grouped_jobs.items():
template_job = grouped_jobs_template[group_name]

jobs_by_namespace = defaultdict(list)
for job in jobs:
jobs_by_namespace[job.metadata.namespace].append(job)
jobs_by_namespace[job.namespace].append(job)

for namespace, namespace_jobs in jobs_by_namespace.items():
limited_jobs = namespace_jobs[:settings.job_grouping_limit]

container_names = set()
for job in limited_jobs:
for container in job.spec.template.spec.containers:
container_names.add(container.name)
for container in template_job.spec.template.spec.containers:
container_names.add(container.name)

for container_name in container_names:
template_job = limited_jobs[0]
template_container = None
for container in template_job.spec.template.spec.containers:
if container.name == container_name:
@@ -351,7 +351,7 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD
del jobs
elif object.kind == "GroupedJob":
if hasattr(object._api_resource, '_grouped_jobs'):
pod_owners = [job.metadata.name for job in object._api_resource._grouped_jobs]
pod_owners = [job.name for job in object._api_resource._grouped_jobs]
pod_owner_kind = "Job"
else:
pod_owners = [object.name]
4 changes: 4 additions & 0 deletions robusta_krr/core/models/config.py
@@ -53,6 +53,10 @@ class Config(pd.BaseSettings):
# Threading settings
max_workers: int = pd.Field(6, ge=1)

# Discovery settings
discovery_job_batch_size: int = pd.Field(1000, ge=1, description="Batch size for Kubernetes job API calls")
discovery_job_max_batches: int = pd.Field(50, ge=1, description="Maximum number of job batches to process to prevent infinite loops")
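# Together these bound a discovery pass to at most discovery_job_batch_size * discovery_job_max_batches jobs (50,000 with the defaults above).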

# Job grouping settings
job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")
job_grouping_limit: int = pd.Field(500, ge=1, description="Maximum number of jobs/pods to query per GroupedJob group")
14 changes: 14 additions & 0 deletions robusta_krr/main.py
@@ -232,6 +232,18 @@ def run_strategy(
help="Maximum number of jobs/pods to query per GroupedJob group (default: 500).",
rich_help_panel="Job Grouping Settings",
),
discovery_job_batch_size: int = typer.Option(
1000,
"--discovery-job-batch-size",
help="Batch size for Kubernetes job API calls (default: 1000).",
rich_help_panel="Job Discovery Settings",
),
discovery_job_max_batches: int = typer.Option(
50,
"--discovery-job-max-batches",
help="Maximum number of job batches to process to prevent infinite loops (default: 50).",
rich_help_panel="Job Discovery Settings",
),
format: str = typer.Option(
"table",
"--formatter",
@@ -371,6 +383,8 @@ def run_strategy(
max_workers=max_workers,
job_grouping_labels=job_grouping_labels,
job_grouping_limit=job_grouping_limit,
discovery_job_batch_size=discovery_job_batch_size,
discovery_job_max_batches=discovery_job_max_batches,
format=format,
show_cluster_name=show_cluster_name,
verbose=verbose,