diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py
index 76f3af96..1398c62b 100644
--- a/robusta_krr/core/integrations/kubernetes/__init__.py
+++ b/robusta_krr/core/integrations/kubernetes/__init__.py
@@ -23,6 +23,14 @@ from robusta_krr.core.models.result import ResourceAllocations
 from robusta_krr.utils.object_like_dict import ObjectLikeDict
+
+class LightweightJobInfo:
+    """Lightweight job object containing only the fields needed for GroupedJob processing."""
+    def __init__(self, name: str, namespace: str):
+        self.name = name
+        self.namespace = namespace
+
+
 from . import config_patch as _

 logger = logging.getLogger("krr")
@@ -261,6 +269,82 @@ def _should_list_resource(self, resource: str) -> bool:
             return True
         return resource in settings.resources

+    def _is_job_owned_by_cronjob(self, job: V1Job) -> bool:
+        """Check if a job is owned by a CronJob."""
+        return any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])
+
+    def _is_job_grouped(self, job: V1Job) -> bool:
+        """Check if a job has any of the grouping labels."""
+        if not settings.job_grouping_labels or not job.metadata.labels:
+            return False
+        return any(label in job.metadata.labels for label in settings.job_grouping_labels)
+
+    async def _list_namespaced_or_global_objects_batched(
+        self,
+        kind: KindLiteral,
+        all_namespaces_request: Callable,
+        namespaced_request: Callable,
+        namespace: str,
+        limit: Optional[int] = None,
+        continue_ref: Optional[str] = None,
+    ) -> tuple[list[Any], Optional[str]]:
+        logger.debug("Listing %s in %s with batching (limit=%s)", kind, self.cluster, limit)
+        loop = asyncio.get_running_loop()
+
+        try:
+            if namespace == "*":
+                requests = [
+                    loop.run_in_executor(
+                        self.executor,
+                        lambda: all_namespaces_request(
+                            watch=False,
+                            label_selector=settings.selector,
+                            limit=limit,
+                            _continue=continue_ref,
+                        ),
+                    )
+                ]
+            else:
+                requests = [
+                    loop.run_in_executor(
+                        self.executor,
+                        lambda: namespaced_request(
+                            namespace=namespace,
+                            watch=False,
+                            label_selector=settings.selector,
+                            limit=limit,
+                            _continue=continue_ref,
+                        ),
+                    )
+                ]
+
+            gathered_results = await asyncio.gather(*requests)
+
+            result = [
+                item
+                for request_result in gathered_results
+                for item in request_result.items
+            ]
+
+            next_continue_ref = None
+            if gathered_results:
+                next_continue_ref = getattr(gathered_results[0].metadata, '_continue', None)
+
+            return result, next_continue_ref
+
+        except ApiException as e:
+            if e.status == 410 and e.body:
+                # Continue token expired; try to recover a fresh token from the error body
+                import json
+                try:
+                    error_body = json.loads(e.body)
+                    new_continue_token = error_body.get("metadata", {}).get("continue")
+                    if new_continue_token:
+                        logger.info("Continue token expired for jobs listing. Continuing")
+                        return [], new_continue_token
+                except (json.JSONDecodeError, KeyError):
+                    pass
+            raise
+
     async def _list_namespaced_or_global_objects(
         self,
         kind: KindLiteral,
@@ -458,26 +542,57 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
             extract_containers=lambda item: item.spec.template.spec.containers,
         )

-    def _list_all_jobs(self) -> list[K8sObjectData]:
-        def filter_jobs(item):
-            # Skip jobs owned by CronJobs
-            if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []):
-                return False
+
+    async def _list_all_jobs(self) -> list[K8sObjectData]:
+        """List all jobs using batched loading."""
+        if not self._should_list_resource("Job"):
+            return []
+
+        namespaces = self.namespaces if self.namespaces != "*" else ["*"]
+        all_jobs = []
+        try:
+            batch_count = 0
+            for namespace in namespaces:
+                continue_ref: Optional[str] = None
+                while batch_count < settings.discovery_job_max_batches:
+                    jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
+                        kind="Job",
+                        all_namespaces_request=self.batch.list_job_for_all_namespaces,
+                        namespaced_request=self.batch.list_namespaced_job,
+                        namespace=namespace,
+                        limit=settings.discovery_job_batch_size,
+                        continue_ref=continue_ref,
+                    )
+                    continue_ref = next_continue_ref
+
+                    if not jobs_batch and continue_ref:
+                        # Continue token was refreshed after a 410; count it as a batch so the loop stays bounded
+                        batch_count += 1
+                        continue
+                    if not jobs_batch:
+                        # No more jobs to fetch; empty final batches are not counted
+                        break
+
+                    batch_count += 1
+                    for job in jobs_batch:
+                        if self._is_job_owned_by_cronjob(job):
+                            continue
+                        if self._is_job_grouped(job):
+                            continue
+                        for container in job.spec.template.spec.containers:
+                            all_jobs.append(self.__build_scannable_object(job, container, "Job"))
+                    if not continue_ref:
+                        break

-            # Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
-            if settings.job_grouping_labels and item.metadata.labels:
-                if any(label in item.metadata.labels for label in settings.job_grouping_labels):
-                    return False
+            logger.debug("Found %d regular jobs", len(all_jobs))
+            return all_jobs

-            return True
-
-        return self._list_scannable_objects(
-            kind="Job",
-            all_namespaces_request=self.batch.list_job_for_all_namespaces,
-            namespaced_request=self.batch.list_namespaced_job,
-            extract_containers=lambda item: item.spec.template.spec.containers,
-            filter_workflows=filter_jobs,
-        )
+        except Exception:
+            logger.error(
+                "Failed to run jobs discovery",
+                exc_info=True,
+            )
+            return all_jobs

     def _list_all_cronjobs(self) -> list[K8sObjectData]:
         return self._list_scannable_objects(
@@ -497,42 +612,82 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
             logger.debug("Skipping GroupedJob in cluster")
             return []

-        logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")
-
-        # Get all jobs that have any of the grouping labels
-        all_jobs = await self._list_namespaced_or_global_objects(
-            kind="Job",
-            all_namespaces_request=self.batch.list_job_for_all_namespaces,
-            namespaced_request=self.batch.list_namespaced_job,
-        )
+        logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)

         grouped_jobs = defaultdict(list)
-        for job in all_jobs:
-            if (job.metadata.labels and
-                not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
-
-                for label_name in settings.job_grouping_labels:
-                    if label_name in job.metadata.labels:
-                        label_value = job.metadata.labels[label_name]
-                        group_key = f"{label_name}={label_value}"
-                        grouped_jobs[group_key].append(job)
+        grouped_jobs_template = {}  # Keep only ONE full job per group as a template - needed for K8sObjectData
+        continue_ref: Optional[str] = None
+        batch_count = 0
+        namespaces = self.namespaces if self.namespaces != "*" else ["*"]
+        try:
+            batch_count = 0
+            for namespace in namespaces:
+                continue_ref = None
+                while batch_count < settings.discovery_job_max_batches:
+                    jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
+                        kind="Job",
+                        all_namespaces_request=self.batch.list_job_for_all_namespaces,
+                        namespaced_request=self.batch.list_namespaced_job,
+                        namespace=namespace,
+                        limit=settings.discovery_job_batch_size,
+                        continue_ref=continue_ref,
+                    )
+
+                    continue_ref = next_continue_ref
+
+                    if not jobs_batch and continue_ref:
+                        # Continue token was refreshed after a 410; count it as a batch so the loop stays bounded
+                        batch_count += 1
+                        continue
+                    if not jobs_batch:
+                        # No more jobs to fetch; empty final batches are not counted
+                        break
+
+                    batch_count += 1
+                    for job in jobs_batch:
+                        if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
+                            continue
+                        for label_name in settings.job_grouping_labels:
+                            if label_name not in job.metadata.labels:
+                                continue
+                            # Build the group key from the grouping label and its value
+                            label_value = job.metadata.labels[label_name]
+                            group_key = f"{label_name}={label_value}"
+                            lightweight_job = LightweightJobInfo(
+                                name=job.metadata.name,
+                                namespace=job.metadata.namespace
+                            )
+                            # Store lightweight job info only for grouped jobs
+                            grouped_jobs[group_key].append(lightweight_job)
+                            # Keep only ONE full job as template per group
+                            if group_key not in grouped_jobs_template:
+                                grouped_jobs_template[group_key] = job
+                    if not continue_ref:
+                        break
+
+        except Exception:
+            logger.error(
+                "Failed to run grouped jobs discovery",
+                exc_info=True,
+            )
+            raise

         result = []
         for group_name, jobs in grouped_jobs.items():
+            template_job = grouped_jobs_template[group_name]
+
             jobs_by_namespace = defaultdict(list)
             for job in jobs:
-                jobs_by_namespace[job.metadata.namespace].append(job)
+                jobs_by_namespace[job.namespace].append(job)

             for namespace, namespace_jobs in jobs_by_namespace.items():
                 limited_jobs = namespace_jobs[:settings.job_grouping_limit]

                 container_names = set()
-                for job in limited_jobs:
-                    for container in job.spec.template.spec.containers:
-                        container_names.add(container.name)
+                for container in template_job.spec.template.spec.containers:
+                    container_names.add(container.name)

                 for container_name in container_names:
-                    template_job = limited_jobs[0]
                     template_container = None
                     for container in template_job.spec.template.spec.containers:
                         if container.name == container_name:
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index 2cab35a4..eb8c1933 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -351,7 +351,7 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD
             del jobs
         elif object.kind == "GroupedJob":
             if hasattr(object._api_resource, '_grouped_jobs'):
-                pod_owners = [job.metadata.name for job in object._api_resource._grouped_jobs]
+                pod_owners = [job.name for job in object._api_resource._grouped_jobs]
                 pod_owner_kind = "Job"
             else:
                 pod_owners = [object.name]
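For readers unfamiliar with the limit/_continue pagination protocol that _list_namespaced_or_global_objects_batched relies on, here is a minimal standalone sketch of the same pattern using the official kubernetes Python client. The synchronous style, the batch size of 500, and the restart-on-410 strategy are assumptions for illustration only and are not part of this patch.

# Illustrative sketch (not part of the patch): paginated Job listing with the
# official kubernetes client, assuming a configured kubeconfig.
from kubernetes import client, config
from kubernetes.client.rest import ApiException

config.load_kube_config()
batch_api = client.BatchV1Api()

all_jobs = []
continue_token = None
while True:
    try:
        resp = batch_api.list_job_for_all_namespaces(
            watch=False,
            limit=500,                 # assumed batch size for the example
            _continue=continue_token,  # None on the first request
        )
    except ApiException as exc:
        if exc.status == 410:
            # The continue token expired; restart the listing from scratch.
            all_jobs.clear()
            continue_token = None
            continue
        raise
    all_jobs.extend(resp.items)
    continue_token = resp.metadata._continue
    if not continue_token:
        break

print(f"collected {len(all_jobs)} jobs")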
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index 62616c34..f131a971 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -53,6 +53,10 @@ class Config(pd.BaseSettings):
     # Threading settings
     max_workers: int = pd.Field(6, ge=1)

+    # Discovery settings
+    discovery_job_batch_size: int = pd.Field(1000, ge=1, description="Batch size for Kubernetes job API calls")
+    discovery_job_max_batches: int = pd.Field(50, ge=1, description="Maximum number of job batches to process to prevent infinite loops")
+
     # Job grouping settings
     job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")
     job_grouping_limit: int = pd.Field(500, ge=1, description="Maximum number of jobs/pods to query per GroupedJob group")
diff --git a/robusta_krr/main.py b/robusta_krr/main.py
index 8604b20b..6eda717c 100644
--- a/robusta_krr/main.py
+++ b/robusta_krr/main.py
@@ -232,6 +232,18 @@ def run_strategy(
         help="Maximum number of jobs/pods to query per GroupedJob group (default: 500).",
         rich_help_panel="Job Grouping Settings",
     ),
+    discovery_job_batch_size: int = typer.Option(
+        5000,
+        "--discovery-job-batch-size",
+        help="Batch size for Kubernetes job API calls (default: 5000).",
+        rich_help_panel="Job Discovery Settings",
+    ),
+    discovery_job_max_batches: int = typer.Option(
+        100,
+        "--discovery-job-max-batches",
+        help="Maximum number of job batches to process to prevent infinite loops (default: 100).",
+        rich_help_panel="Job Discovery Settings",
+    ),
     format: str = typer.Option(
         "table",
         "--formatter",
@@ -371,6 +383,8 @@ def run_strategy(
         max_workers=max_workers,
         job_grouping_labels=job_grouping_labels,
         job_grouping_limit=job_grouping_limit,
+        discovery_job_batch_size=discovery_job_batch_size,
+        discovery_job_max_batches=discovery_job_max_batches,
         format=format,
         show_cluster_name=show_cluster_name,
         verbose=verbose,
diff --git a/tests/test_grouped_jobs.py b/tests/test_grouped_jobs.py
index 283f6fa9..6d882691 100644
--- a/tests/test_grouped_jobs.py
+++ b/tests/test_grouped_jobs.py
@@ -10,9 +10,13 @@ def mock_config():
     config = MagicMock(spec=Config)
     config.job_grouping_labels = ["app", "team"]
     config.job_grouping_limit = 3  # Small limit for testing
+    config.discovery_job_batch_size = 1000
+    config.discovery_job_max_batches = 50
     config.max_workers = 4
     config.get_kube_client = MagicMock()
     config.resources = "*"
+    config.selector = None
+    config.namespaces = "*"  # Add namespaces setting
     return config


@@ -23,7 +27,14 @@ def mock_kubernetes_loader(mock_config):
     loader = ClusterLoader()
     loader.batch = MagicMock()
     loader.core = MagicMock()
+
+    # Mock executor to return a proper Future
+    from concurrent.futures import Future
+    mock_future = Future()
+    mock_future.set_result(None)  # Set a dummy result
     loader.executor = MagicMock()
+    loader.executor.submit.return_value = mock_future
+    loader._ClusterLoader__hpa_list = {}  # type: ignore # needed for mock
     return loader


@@ -60,16 +71,24 @@ async def test_list_all_groupedjobs_with_limit(mock_kubernetes_loader, mock_conf
         create_mock_job("job-9", "default", {"app": "backend"}),  # This should be excluded
     ]

-    # Mock the _list_namespaced_or_global_objects method
-    mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs)
+    # Mock the _list_namespaced_or_global_objects_batched method
+    async def mock_batched_method(*args, **kwargs):
+        # Create mock response objects that have the expected structure
+        mock_response = MagicMock()
+
mock_response.items = mock_jobs + mock_response.metadata = MagicMock() + mock_response.metadata._continue = None + return (mock_jobs, None) # Return (jobs, continue_token) + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method # Mock the __build_scannable_object method def mock_build_scannable_object(item, container, kind): obj = MagicMock() obj._api_resource = MagicMock() + obj.container = container.name return obj - mock_kubernetes_loader._KubernetesLoader__build_scannable_object = mock_build_scannable_object + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object # Patch the settings to use our mock config with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): @@ -88,23 +107,23 @@ def mock_build_scannable_object(item, container, kind): assert frontend_objects[0].namespace == "default" assert frontend_objects[0].container == "main-container" - # Verify we got 1 backend object (one per unique container name) + # Verify we got 1 backend object assert len(backend_objects) == 1 assert backend_objects[0].namespace == "default" assert backend_objects[0].container == "main-container" - # Verify all objects in each group have the same grouped_jobs list + # Verify all objects in each group have lightweight job info frontend_grouped_jobs = frontend_objects[0]._api_resource._grouped_jobs assert len(frontend_grouped_jobs) == 3 - assert frontend_grouped_jobs[0].metadata.name == "job-1" - assert frontend_grouped_jobs[1].metadata.name == "job-2" - assert frontend_grouped_jobs[2].metadata.name == "job-3" + assert frontend_grouped_jobs[0].name == "job-1" + assert frontend_grouped_jobs[1].name == "job-2" + assert frontend_grouped_jobs[2].name == "job-3" backend_grouped_jobs = backend_objects[0]._api_resource._grouped_jobs assert len(backend_grouped_jobs) == 3 - assert backend_grouped_jobs[0].metadata.name == "job-6" - assert backend_grouped_jobs[1].metadata.name == "job-7" - assert backend_grouped_jobs[2].metadata.name == "job-8" + assert backend_grouped_jobs[0].name == "job-6" + assert backend_grouped_jobs[1].name == "job-7" + assert backend_grouped_jobs[2].name == "job-8" @pytest.mark.asyncio @@ -119,14 +138,22 @@ async def test_list_all_groupedjobs_with_different_namespaces(mock_kubernetes_lo create_mock_job("job-4", "namespace-2", {"app": "frontend"}), ] - mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + async def mock_batched_method(*args, **kwargs): + # Create mock response objects that have the expected structure + mock_response = MagicMock() + mock_response.items = mock_jobs + mock_response.metadata = MagicMock() + mock_response.metadata._continue = None + return (mock_jobs, None) # Return (jobs, continue_token) + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method def mock_build_scannable_object(item, container, kind): obj = MagicMock() obj._api_resource = MagicMock() + obj.container = container.name return obj - mock_kubernetes_loader._KubernetesLoader__build_scannable_object = mock_build_scannable_object + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object # Patch the settings to use our mock config with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): @@ -166,14 +193,22 @@ async def test_list_all_groupedjobs_with_cronjob_owner_reference(mock_kubernetes # Add CronJob owner reference to the second job mock_jobs[1].metadata.owner_references = 
[MagicMock(kind="CronJob")] - mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + async def mock_batched_method(*args, **kwargs): + # Create mock response objects that have the expected structure + mock_response = MagicMock() + mock_response.items = mock_jobs + mock_response.metadata = MagicMock() + mock_response.metadata._continue = None + return (mock_jobs, None) # Return (jobs, continue_token) + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method def mock_build_scannable_object(item, container, kind): obj = MagicMock() obj._api_resource = MagicMock() + obj.container = container.name # Set the actual container name return obj - mock_kubernetes_loader._KubernetesLoader__build_scannable_object = mock_build_scannable_object + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object # Patch the settings to use our mock config with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): @@ -185,7 +220,7 @@ def mock_build_scannable_object(item, container, kind): obj = result[0] assert obj.name == "app=frontend" assert len(obj._api_resource._grouped_jobs) == 1 - assert obj._api_resource._grouped_jobs[0].metadata.name == "job-1" + assert obj._api_resource._grouped_jobs[0].name == "job-1" @pytest.mark.asyncio @@ -212,14 +247,22 @@ async def test_list_all_groupedjobs_multiple_labels(mock_kubernetes_loader, mock create_mock_job("job-3", "default", {"app": "api"}), ] - mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + async def mock_batched_method(*args, **kwargs): + # Create mock response objects that have the expected structure + mock_response = MagicMock() + mock_response.items = mock_jobs + mock_response.metadata = MagicMock() + mock_response.metadata._continue = None + return (mock_jobs, None) # Return (jobs, continue_token) + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method def mock_build_scannable_object(item, container, kind): obj = MagicMock() obj._api_resource = MagicMock() + obj.container = container.name return obj - mock_kubernetes_loader._KubernetesLoader__build_scannable_object = mock_build_scannable_object + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object # Patch the settings to use our mock config with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): @@ -236,3 +279,121 @@ def mock_build_scannable_object(item, container, kind): # Verify all objects have the same container name assert all(obj.container == "main-container" for obj in result) + + +@pytest.mark.asyncio +async def test_list_all_groupedjobs_job_in_multiple_groups(mock_kubernetes_loader, mock_config): + """Test that a job with multiple grouping labels is added to all matching groups""" + + # Create a job that matches multiple grouping labels + mock_jobs = [ + create_mock_job("job-1", "default", {"app": "frontend", "team": "web"}), + create_mock_job("job-2", "default", {"app": "backend", "team": "api"}), + ] + + async def mock_batched_method(*args, **kwargs): + # Create mock response objects that have the expected structure + mock_response = MagicMock() + mock_response.items = mock_jobs + mock_response.metadata = MagicMock() + mock_response.metadata._continue = None + return (mock_jobs, None) # Return (jobs, continue_token) + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method + + def 
mock_build_scannable_object(item, container, kind): + obj = MagicMock() + obj._api_resource = MagicMock() + obj.container = container.name + return obj + + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object + + # Patch the settings to use our mock config + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + # Call the method + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # Verify we got 4 objects (2 jobs × 2 labels each = 4 groups) + assert len(result) == 4 + + group_names = {g.name for g in result} + assert "app=frontend" in group_names + assert "app=backend" in group_names + assert "team=web" in group_names + assert "team=api" in group_names + + # Find each group and verify it contains the correct job + frontend_group = next(g for g in result if g.name == "app=frontend") + backend_group = next(g for g in result if g.name == "app=backend") + web_group = next(g for g in result if g.name == "team=web") + api_group = next(g for g in result if g.name == "team=api") + + # Verify job-1 is in both app=frontend and team=web groups + assert len(frontend_group._api_resource._grouped_jobs) == 1 + assert frontend_group._api_resource._grouped_jobs[0].name == "job-1" + + assert len(web_group._api_resource._grouped_jobs) == 1 + assert web_group._api_resource._grouped_jobs[0].name == "job-1" + + # Verify job-2 is in both app=backend and team=api groups + assert len(backend_group._api_resource._grouped_jobs) == 1 + assert backend_group._api_resource._grouped_jobs[0].name == "job-2" + + assert len(api_group._api_resource._grouped_jobs) == 1 + assert api_group._api_resource._grouped_jobs[0].name == "job-2" + + +@pytest.mark.asyncio +async def test_groupedjobs_respects_global_batch_limit_across_namespaces(mock_kubernetes_loader, mock_config): + """Verify batch_count is global: when limit reached, subsequent namespaces are not processed.""" + mock_config.namespaces = ["ns-1", "ns-2"] + mock_config.discovery_job_max_batches = 1 + + calls = [] + + async def mock_batched_method(*args, **kwargs): + calls.append(kwargs.get("namespace")) + # return empty batch and no continue token + return ([], None) + + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method + + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # No results; with empty batches not counted, both namespaces are attempted + assert result == [] + assert calls == ["ns-1", "ns-2"] + + +@pytest.mark.asyncio +async def test_groupedjobs_calls_each_explicit_namespace(mock_kubernetes_loader, mock_config): + """Ensure explicit namespaces list is iterated and passed to the batched call.""" + mock_config.namespaces = ["namespace-a", "namespace-b"] + mock_config.discovery_job_max_batches = 10 + + seen_namespaces = [] + + # Return one simple job per call and terminate with no continue token + async def mock_batched_method(*args, **kwargs): + ns = kwargs.get("namespace") + seen_namespaces.append(ns) + job = create_mock_job("job-1", ns if ns != "*" else "default", {"app": "frontend"}) + return ([job], None) + + mock_kubernetes_loader._list_namespaced_or_global_objects_batched = mock_batched_method + + def mock_build_scannable_object(item, container, kind): + obj = MagicMock() + obj._api_resource = MagicMock() + obj.container = container.name + return obj + + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object + 
+ with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # We expect one grouped object per namespace + assert len(result) == 2 + assert set(seen_namespaces) == {"namespace-a", "namespace-b"} diff --git a/tests/test_grouped_jobs_metrics_logic.py b/tests/test_grouped_jobs_metrics_logic.py new file mode 100644 index 00000000..e8e534f2 --- /dev/null +++ b/tests/test_grouped_jobs_metrics_logic.py @@ -0,0 +1,277 @@ +import pytest +from unittest.mock import MagicMock, patch +from datetime import timedelta + +from robusta_krr.core.integrations.kubernetes import LightweightJobInfo + + +def test_grouped_job_extracts_job_names(): + """Test that GroupedJob objects correctly expose job names for metrics queries""" + + # Create a mock GroupedJob object + grouped_job = MagicMock() + grouped_job.kind = "GroupedJob" + grouped_job.name = "app=frontend" + grouped_job.namespace = "default" + grouped_job.container = "main-container" + + # Mock the API resource with lightweight job info + grouped_job._api_resource = MagicMock() + grouped_job._api_resource._grouped_jobs = [ + LightweightJobInfo(name="job-1", namespace="default"), + LightweightJobInfo(name="job-2", namespace="default"), + LightweightJobInfo(name="job-3", namespace="default"), + ] + grouped_job._api_resource._label_filter = "app=frontend" + + # Test the logic that would be used in PrometheusMetricsService.load_pods + if grouped_job.kind == "GroupedJob": + if hasattr(grouped_job._api_resource, '_grouped_jobs'): + pod_owners = [job.name for job in grouped_job._api_resource._grouped_jobs] + pod_owner_kind = "Job" + else: + pod_owners = [grouped_job.name] + pod_owner_kind = grouped_job.kind + + # Verify the extracted job names + assert pod_owners == ["job-1", "job-2", "job-3"] + assert pod_owner_kind == "Job" + + # Verify the namespace + assert grouped_job.namespace == "default" + + +def test_grouped_job_with_different_namespaces(): + """Test that GroupedJob objects in different namespaces are handled correctly""" + + # Create grouped jobs in different namespaces + grouped_job_ns1 = MagicMock() + grouped_job_ns1.kind = "GroupedJob" + grouped_job_ns1.name = "app=frontend" + grouped_job_ns1.namespace = "namespace-1" + grouped_job_ns1._api_resource = MagicMock() + grouped_job_ns1._api_resource._grouped_jobs = [ + LightweightJobInfo(name="job-1", namespace="namespace-1"), + ] + + grouped_job_ns2 = MagicMock() + grouped_job_ns2.kind = "GroupedJob" + grouped_job_ns2.name = "app=frontend" + grouped_job_ns2.namespace = "namespace-2" + grouped_job_ns2._api_resource = MagicMock() + grouped_job_ns2._api_resource._grouped_jobs = [ + LightweightJobInfo(name="job-2", namespace="namespace-2"), + ] + + # Test the logic for both namespaces + for grouped_job in [grouped_job_ns1, grouped_job_ns2]: + if grouped_job.kind == "GroupedJob": + if hasattr(grouped_job._api_resource, '_grouped_jobs'): + pod_owners = [job.name for job in grouped_job._api_resource._grouped_jobs] + pod_owner_kind = "Job" + else: + pod_owners = [grouped_job.name] + pod_owner_kind = grouped_job.kind + + # Verify namespace-specific results + if grouped_job.namespace == "namespace-1": + assert pod_owners == ["job-1"] + assert grouped_job.namespace == "namespace-1" + elif grouped_job.namespace == "namespace-2": + assert pod_owners == ["job-2"] + assert grouped_job.namespace == "namespace-2" + + +def test_grouped_job_prometheus_query_construction(): + """Test that the Prometheus query is constructed correctly for 
GroupedJob""" + + # Create a mock GroupedJob object + grouped_job = MagicMock() + grouped_job.kind = "GroupedJob" + grouped_job.name = "app=frontend" + grouped_job.namespace = "default" + + # Mock the API resource with lightweight job info + grouped_job._api_resource = MagicMock() + grouped_job._api_resource._grouped_jobs = [ + LightweightJobInfo(name="job-1", namespace="default"), + LightweightJobInfo(name="job-2", namespace="default"), + LightweightJobInfo(name="job-3", namespace="default"), + ] + + # Simulate the logic from PrometheusMetricsService.load_pods + if grouped_job.kind == "GroupedJob": + if hasattr(grouped_job._api_resource, '_grouped_jobs'): + pod_owners = [job.name for job in grouped_job._api_resource._grouped_jobs] + pod_owner_kind = "Job" + else: + pod_owners = [grouped_job.name] + pod_owner_kind = grouped_job.kind + + # Construct the Prometheus query (simplified version) + owners_regex = "|".join(pod_owners) + cluster_label = "" # Simplified for testing + + expected_query = f""" + last_over_time( + kube_pod_owner{{ + owner_name=~"{owners_regex}", + owner_kind="{pod_owner_kind}", + namespace="{grouped_job.namespace}" + {cluster_label} + }}[1h] + ) + """ + + # Verify the query contains the expected elements + assert "job-1|job-2|job-3" in expected_query + assert 'owner_kind="Job"' in expected_query + assert 'namespace="default"' in expected_query + assert "kube_pod_owner" in expected_query + + +def test_grouped_job_batched_queries(): + """Test that batched queries are handled correctly for many jobs""" + + # Create a grouped job with many lightweight jobs + grouped_job = MagicMock() + grouped_job.kind = "GroupedJob" + grouped_job.name = "app=frontend" + grouped_job.namespace = "default" + + # Create 150 jobs (more than typical batch size of 100) + many_jobs = [LightweightJobInfo(name=f"job-{i}", namespace="default") for i in range(150)] + grouped_job._api_resource = MagicMock() + grouped_job._api_resource._grouped_jobs = many_jobs + + # Extract job names + if grouped_job.kind == "GroupedJob": + if hasattr(grouped_job._api_resource, '_grouped_jobs'): + pod_owners = [job.name for job in grouped_job._api_resource._grouped_jobs] + pod_owner_kind = "Job" + + # Simulate batching logic + batch_size = 100 + batches = [] + for i in range(0, len(pod_owners), batch_size): + batch = pod_owners[i:i + batch_size] + batches.append(batch) + + # Verify batching + assert len(batches) == 2 # 150 jobs split into 2 batches + assert len(batches[0]) == 100 # First batch has 100 jobs + assert len(batches[1]) == 50 # Second batch has 50 jobs + + # Verify all job names are included + all_batched_jobs = [job for batch in batches for job in batch] + assert len(all_batched_jobs) == 150 + assert all_batched_jobs == [f"job-{i}" for i in range(150)] + + +def test_grouped_job_fallback_logic(): + """Test the fallback logic when _grouped_jobs is not available""" + + # Create a GroupedJob without _grouped_jobs + grouped_job = MagicMock() + grouped_job.kind = "GroupedJob" + grouped_job.name = "app=frontend" + grouped_job.namespace = "default" + + # Create a mock API resource that doesn't have _grouped_jobs + api_resource = MagicMock() + # Explicitly remove the _grouped_jobs attribute + del api_resource._grouped_jobs + grouped_job._api_resource = api_resource + + # Test the fallback logic + if grouped_job.kind == "GroupedJob": + if hasattr(grouped_job._api_resource, '_grouped_jobs'): + pod_owners = [job.name for job in grouped_job._api_resource._grouped_jobs] + pod_owner_kind = "Job" + else: + pod_owners = 
[grouped_job.name] + pod_owner_kind = grouped_job.kind + + # Verify fallback behavior + assert pod_owners == ["app=frontend"] + assert pod_owner_kind == "GroupedJob" + + +def test_lightweight_job_info_structure(): + """Test that LightweightJobInfo has the correct structure""" + + # Create a LightweightJobInfo instance + job_info = LightweightJobInfo(name="test-job", namespace="test-namespace") + + # Verify the structure + assert job_info.name == "test-job" + assert job_info.namespace == "test-namespace" + + # Verify it's a simple data class + assert hasattr(job_info, 'name') + assert hasattr(job_info, 'namespace') + + # Test that it can be used in list comprehensions + job_infos = [ + LightweightJobInfo(name="job-1", namespace="default"), + LightweightJobInfo(name="job-2", namespace="default"), + ] + + job_names = [job.name for job in job_infos] + assert job_names == ["job-1", "job-2"] + + namespaces = [job.namespace for job in job_infos] + assert namespaces == ["default", "default"] + + +def test_grouped_job_multiple_groups_metrics_extraction(): + """Test that jobs appearing in multiple groups work correctly for metrics extraction""" + + # Create a job that appears in multiple groups + grouped_job_app = MagicMock() + grouped_job_app.kind = "GroupedJob" + grouped_job_app.name = "app=frontend" + grouped_job_app.namespace = "default" + grouped_job_app._api_resource = MagicMock() + grouped_job_app._api_resource._grouped_jobs = [ + LightweightJobInfo(name="job-1", namespace="default"), + LightweightJobInfo(name="job-2", namespace="default"), + ] + + grouped_job_team = MagicMock() + grouped_job_team.kind = "GroupedJob" + grouped_job_team.name = "team=web" + grouped_job_team.namespace = "default" + grouped_job_team._api_resource = MagicMock() + grouped_job_team._api_resource._grouped_jobs = [ + LightweightJobInfo(name="job-1", namespace="default"), # Same job appears in both groups + LightweightJobInfo(name="job-3", namespace="default"), + ] + + # Test metrics extraction for both groups + for grouped_job in [grouped_job_app, grouped_job_team]: + if grouped_job.kind == "GroupedJob": + if hasattr(grouped_job._api_resource, '_grouped_jobs'): + pod_owners = [job.name for job in grouped_job._api_resource._grouped_jobs] + pod_owner_kind = "Job" + else: + pod_owners = [grouped_job.name] + pod_owner_kind = grouped_job.kind + + # Verify the extracted job names + if grouped_job.name == "app=frontend": + assert pod_owners == ["job-1", "job-2"] + elif grouped_job.name == "team=web": + assert pod_owners == ["job-1", "job-3"] + + assert pod_owner_kind == "Job" + assert grouped_job.namespace == "default" + + # Verify that job-1 appears in both groups (this is the key behavior we're testing) + app_job_names = [job.name for job in grouped_job_app._api_resource._grouped_jobs] + team_job_names = [job.name for job in grouped_job_team._api_resource._grouped_jobs] + + assert "job-1" in app_job_names + assert "job-1" in team_job_names + assert len(app_job_names) == 2 + assert len(team_job_names) == 2
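As a footnote to the metrics-logic tests above, the sketch below shows how the LightweightJobInfo entries attached to a GroupedJob can be turned into batched owner_name regexes for kube_pod_owner queries. The build_owner_regexes helper and the batch size of 100 are illustrative assumptions; the actual query assembly lives in PrometheusMetricsService.load_pods.

# Illustrative sketch (not part of the patch): batching grouped-job names into
# owner_name regexes, mirroring the batching logic exercised by the tests above.
from robusta_krr.core.integrations.kubernetes import LightweightJobInfo


def build_owner_regexes(jobs: list[LightweightJobInfo], batch_size: int = 100) -> list[str]:
    """Split grouped-job names into owner_name regexes of at most batch_size names each."""
    names = [job.name for job in jobs]
    return ["|".join(names[i:i + batch_size]) for i in range(0, len(names), batch_size)]


if __name__ == "__main__":
    jobs = [LightweightJobInfo(name=f"job-{i}", namespace="default") for i in range(150)]
    for regex in build_owner_regexes(jobs):
        # Each regex would be interpolated into a selector such as:
        #   kube_pod_owner{owner_name=~"<regex>", owner_kind="Job", namespace="default"}
        print(regex.count("|") + 1, "job names in this batch")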