From b0172bc2ca8b67bc15d42c4d7960a795915f8e3d Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 5 Jan 2026 21:10:54 +0800 Subject: [PATCH 01/29] [historyserver] Fix getJobID for job event collection Signed-off-by: Future-Outlier --- historyserver/config/raycluster.yaml | 8 +++-- historyserver/config/rayjob.yaml | 34 ++++++++++++++++++- .../pkg/collector/eventserver/eventserver.go | 18 ++++++++++ 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml index d8eaaeed479..242968eb740 100644 --- a/historyserver/config/raycluster.yaml +++ b/historyserver/config/raycluster.yaml @@ -20,6 +20,8 @@ spec: affinity: containers: - env: + - name: RAY_enable_ray_event + value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator value: "1" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR @@ -105,10 +107,10 @@ spec: workerGroupSpecs: - groupName: cpu maxReplicas: 1000 - minReplicas: 0 + minReplicas: 1 numOfHosts: 1 rayStartParams: {} - replicas: 0 + replicas: 1 template: metadata: labels: @@ -117,6 +119,8 @@ spec: imagePullSecrets: containers: - env: + - name: RAY_enable_ray_event + value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator value: "1" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR diff --git a/historyserver/config/rayjob.yaml b/historyserver/config/rayjob.yaml index 2c7119f8285..dc5c5dbf041 100644 --- a/historyserver/config/rayjob.yaml +++ b/historyserver/config/rayjob.yaml @@ -3,7 +3,39 @@ kind: RayJob metadata: name: rayjob spec: - entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())" + entrypoint: | + python -c " + import ray + ray.init() + + @ray.remote + def my_task(x): + return x * 2 + + @ray.remote + class Counter: + def __init__(self): + self.count = 0 + + def increment(self): + self.count += 1 + return self.count + + def get_count(self): + return self.count + + task_result = ray.get(my_task.remote(1)) + print(f'Task result: {task_result}') + + counter = Counter.remote() + for i in range(1): + count = ray.get(counter.increment.remote()) + print(f'Counter: {count}') + + final_count = ray.get(counter.get_count.remote()) + print(f'Final count: {final_count}') + print(f'Cluster resources: {ray.cluster_resources()}') + " # Select the existing Ray cluster running the collector. 
  clusterSelector:
    ray.io/cluster: raycluster-historyserver
diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go
index b0143c63c96..f5c4ef621c2 100644
--- a/historyserver/pkg/collector/eventserver/eventserver.go
+++ b/historyserver/pkg/collector/eventserver/eventserver.go
@@ -411,6 +411,24 @@ func (es *EventServer) getJobID(eventData map[string]interface{}) string {
 	if jobID, hasJob := eventData["jobId"]; hasJob && jobID != "" {
 		return fmt.Sprintf("%v", jobID)
 	}
+
+	eventTypesWithJobID := []string{
+		"driverJobDefinitionEvent",
+		"driverJobLifecycleEvent",
+		"taskDefinitionEvent",
+		"taskLifecycleEvent",
+		"actorTaskDefinitionEvent",
+		"actorDefinitionEvent",
+	}
+
+	for _, eventType := range eventTypesWithJobID {
+		if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok {
+			if jobID, hasJob := nestedEvent["jobId"]; hasJob && jobID != "" {
+				return fmt.Sprintf("%v", jobID)
+			}
+		}
+	}
+
 	return ""
 }

From eeb7a7fca4406908a693cc06a216a72863f7a3eb Mon Sep 17 00:00:00 2001
From: Future-Outlier
Date: Mon, 5 Jan 2026 21:44:58 +0800
Subject: [PATCH 02/29] add Jia-Wei as co-author, since he debugged this with me

Signed-off-by: Future-Outlier
Co-authored-by: Jia-Wei Jiang

From 0ff5bcffb22598d9895be3b42cca8737ac85dd5f Mon Sep 17 00:00:00 2001
From: Future-Outlier
Date: Mon, 5 Jan 2026 21:52:07 +0800
Subject: [PATCH 03/29] remove unused code

Signed-off-by: Future-Outlier
---
 historyserver/pkg/collector/eventserver/eventserver.go | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go
index f5c4ef621c2..e6e4a0b79f7 100644
--- a/historyserver/pkg/collector/eventserver/eventserver.go
+++ b/historyserver/pkg/collector/eventserver/eventserver.go
@@ -408,10 +408,6 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool {
 
 // getJobID gets jobID associated with event
 func (es *EventServer) getJobID(eventData map[string]interface{}) string {
-	if jobID, hasJob := eventData["jobId"]; hasJob && jobID != "" {
-		return fmt.Sprintf("%v", jobID)
-	}
-
 	eventTypesWithJobID := []string{
 		"driverJobDefinitionEvent",
 		"driverJobLifecycleEvent",

From 5aea3052bc3373835d81b69f59d4c867272f6217 Mon Sep 17 00:00:00 2001
From: Future-Outlier
Date: Mon, 5 Jan 2026 22:11:14 +0800
Subject: [PATCH 04/29] apply rueian's advice

Signed-off-by: Future-Outlier
---
 .../pkg/collector/eventserver/eventserver.go | 13 ++-----------
 1 file changed, 2 insertions(+), 11 deletions(-)

diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go
index e6e4a0b79f7..19274b78125 100644
--- a/historyserver/pkg/collector/eventserver/eventserver.go
+++ b/historyserver/pkg/collector/eventserver/eventserver.go
@@ -408,17 +408,8 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool {
 
 // getJobID gets jobID associated with event
 func (es *EventServer) getJobID(eventData map[string]interface{}) string {
-	eventTypesWithJobID := []string{
-		"driverJobDefinitionEvent",
-		"driverJobLifecycleEvent",
-		"taskDefinitionEvent",
-		"taskLifecycleEvent",
-		"actorTaskDefinitionEvent",
-		"actorDefinitionEvent",
-	}
-
-	for _, eventType := range eventTypesWithJobID {
-		if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok {
+	for _, value := range eventData {
+		if nestedEvent, ok := value.(map[string]interface{}); ok {
 			if jobID, hasJob := 
nestedEvent["jobId"]; hasJob && jobID != "" { return fmt.Sprintf("%v", jobID) } From 5f547a2f8ad13b94f26e4cda37f033f3609075e9 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 5 Jan 2026 22:17:57 +0800 Subject: [PATCH 05/29] add task profile event example Signed-off-by: Future-Outlier --- historyserver/config/raycluster.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml index 242968eb740..6ae73f71785 100644 --- a/historyserver/config/raycluster.yaml +++ b/historyserver/config/raycluster.yaml @@ -23,7 +23,9 @@ spec: - name: RAY_enable_ray_event value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator - value: "1" + value: "true" + - name: RAY_enable_timeline + value: "true" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR value: "http://localhost:8084/v1/events" image: rayproject/ray:2.52.0 @@ -122,7 +124,9 @@ spec: - name: RAY_enable_ray_event value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator - value: "1" + value: "true" + - name: RAY_enable_timeline + value: "true" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR value: "http://localhost:8084/v1/events" image: rayproject/ray:2.52.0 From 01771609afa8c2697f9f01a56ed2c8a75bfb0e59 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 5 Jan 2026 22:35:09 +0800 Subject: [PATCH 06/29] revert back oneof solution Signed-off-by: Future-Outlier --- historyserver/config/raycluster.yaml | 12 ++++++++---- .../pkg/collector/eventserver/eventserver.go | 13 +++++++++++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml index 6ae73f71785..4a512f01f13 100644 --- a/historyserver/config/raycluster.yaml +++ b/historyserver/config/raycluster.yaml @@ -24,10 +24,12 @@ spec: value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator value: "true" - - name: RAY_enable_timeline - value: "true" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR value: "http://localhost:8084/v1/events" + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES + value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT" image: rayproject/ray:2.52.0 imagePullPolicy: IfNotPresent command: @@ -125,10 +127,12 @@ spec: value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator value: "true" - - name: RAY_enable_timeline - value: "true" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR value: "http://localhost:8084/v1/events" + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES + value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT" image: rayproject/ray:2.52.0 command: - 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="' diff --git a/historyserver/pkg/collector/eventserver/eventserver.go 
b/historyserver/pkg/collector/eventserver/eventserver.go index 19274b78125..e6e4a0b79f7 100644 --- a/historyserver/pkg/collector/eventserver/eventserver.go +++ b/historyserver/pkg/collector/eventserver/eventserver.go @@ -408,8 +408,17 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool { // getJobID gets jobID associated with event func (es *EventServer) getJobID(eventData map[string]interface{}) string { - for _, value := range eventData { - if nestedEvent, ok := value.(map[string]interface{}); ok { + eventTypesWithJobID := []string{ + "driverJobDefinitionEvent", + "driverJobLifecycleEvent", + "taskDefinitionEvent", + "taskLifecycleEvent", + "actorTaskDefinitionEvent", + "actorDefinitionEvent", + } + + for _, eventType := range eventTypesWithJobID { + if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok { if jobID, hasJob := nestedEvent["jobId"]; hasJob && jobID != "" { return fmt.Sprintf("%v", jobID) } From 420ce9a7f1c1b5ad9405be670f6c00069d6c0498 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 5 Jan 2026 22:43:40 +0800 Subject: [PATCH 07/29] add task profile event Signed-off-by: Future-Outlier --- historyserver/pkg/collector/eventserver/eventserver.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go index e6e4a0b79f7..5c0ca4dd059 100644 --- a/historyserver/pkg/collector/eventserver/eventserver.go +++ b/historyserver/pkg/collector/eventserver/eventserver.go @@ -409,10 +409,16 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool { // getJobID gets jobID associated with event func (es *EventServer) getJobID(eventData map[string]interface{}) string { eventTypesWithJobID := []string{ + // Job Events (Driver Job) "driverJobDefinitionEvent", "driverJobLifecycleEvent", + + // Task Events (Normal Task) "taskDefinitionEvent", "taskLifecycleEvent", + "taskProfileEvents", + + // Actor Events (Actor Task + Actor Definition) "actorTaskDefinitionEvent", "actorDefinitionEvent", } From 0621ba422d1ba940495041257135116b6fd8bce2 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Mon, 5 Jan 2026 23:44:47 +0800 Subject: [PATCH 08/29] test: Test event type coverage in happy path Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 125 +++++++++++++++++++---- 1 file changed, 107 insertions(+), 18 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 3fe16341405..fd1e58480ae 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -2,6 +2,7 @@ package e2e import ( "context" + "encoding/json" "fmt" "os/exec" "path/filepath" @@ -18,8 +19,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" - rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" . 
"github.com/ray-project/kuberay/ray-operator/test/support" ) @@ -35,6 +34,9 @@ const ( // Ray cluster rayClusterManifestPath = "../../config/raycluster.yaml" rayClusterID = "default" + + // Ray job + rayJobManifestPath = "../../config/rayjob.yaml" ) func TestCollector(t *testing.T) { @@ -49,10 +51,10 @@ func TestCollector(t *testing.T) { name: "Happy path: Logs and events should be uploaded to S3 on deletion", testFunc: testCollectorUploadOnGracefulShutdown, }, - { - name: "Simulate OOMKilled behavior: Single session single node logs and events should be uploaded to S3 after the ray-head container is restarted", - testFunc: testCollectorSeparatesFilesBySession, - }, + // { + // name: "Simulate OOMKilled behavior: Single session single node logs and events should be uploaded to S3 after the ray-head container is restarted", + // testFunc: testCollectorSeparatesFilesBySession, + // }, } for _, tt := range tests { @@ -66,20 +68,23 @@ func TestCollector(t *testing.T) { } } -// testCollectorUploadOnGracefulShutdown verifies that logs and node_events are successfully uploaded to S3 on cluster deletion. +// testCollectorUploadOnGracefulShutdown verifies that logs, node_events, and job_events are successfully uploaded to S3 on cluster deletion. // // The test case follows these steps: // 1. Prepare test environment by applying a Ray cluster with the collector // 2. Submit a Ray job to the existing Ray cluster // 3. Get the sessionID and nodeID for further verification // 4. Delete the Ray cluster to trigger log uploading and event flushing on deletion. When the Ray cluster is deleted, -// logs and node_events are processed as follows: +// logs, node_events, and job_events are processed as follows: // - logs: Trigger RayLogHandler.processSessionLatestLog to process logs under /tmp/ray/session_latest -// - node_events: Trigger EventServer.flushEvents to process in-memory events +// - node_events: Trigger EventServer.flushEvents, which calls es.flushNodeEventsForHour to process in-memory node events +// - job_events: Trigger EventServer.flushEvents, which calls es.flushJobEventsForHour to process in-memory job events // -// 5. Verify logs and node_events are successfully uploaded to S3. Expected S3 path structure: +// 5. Verify logs, node_events, and job_events are successfully uploaded to S3. Expected S3 path structure: // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/logs/... // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/node_events/... +// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AgAAAA==/... +// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AQAAAA==/... // // 6. Delete S3 bucket to ensure test isolation func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { @@ -347,14 +352,12 @@ func applyRayCluster(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.Ra // applyRayJobToCluster applies a Ray job to the existing Ray cluster. func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) *rayv1.RayJob { - jobScript := "import ray; ray.init(); print(ray.cluster_resources())" - rayJobAC := rayv1ac.RayJob("ray-job", namespace.Name). - WithSpec(rayv1ac.RayJobSpec(). - WithClusterSelector(map[string]string{utils.RayClusterLabelKey: rayCluster.Name}). - WithEntrypoint(fmt.Sprintf("python -c %q", jobScript)). 
- WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) - - rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + rayJobFromYaml := DeserializeRayJobYAML(test, rayJobManifestPath) + rayJobFromYaml.Namespace = namespace.Name + + rayJob, err := test.Client().Ray().RayV1(). + RayJobs(namespace.Name). + Create(test.Ctx(), rayJobFromYaml, metav1.CreateOptions{}) g.Expect(err).NotTo(HaveOccurred()) LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) @@ -414,6 +417,36 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize) }, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix) } + + // Verify event type coverage. + LogWithTimestamp(test.T(), "Verifying event type coverage") + events := []struct { + prefix string + requiredTypes []string + }{ + { + prefix: sessionPrefix + "node_events", + requiredTypes: []string{"NODE_DEFINITION_EVENT", "NODE_LIFECYCLE_EVENT", "ACTOR_LIFECYCLE_EVENT", "TASK_LIFECYCLE_EVENT"}, + }, + { + prefix: sessionPrefix + "job_events/AgAAAA==", + requiredTypes: []string{"DRIVER_JOB_DEFINITION_EVENT", "DRIVER_JOB_LIFECYCLE_EVENT", "TASK_DEFINITION_EVENT", "TASK_LIFECYCLE_EVENT", "ACTOR_DEFINITION_EVENT", "ACTOR_TASK_DEFINITION_EVENT"}, + }, + { + prefix: sessionPrefix + "job_events/AQAAAA==", + requiredTypes: []string{"ACTOR_DEFINITION_EVENT", "TASK_DEFINITION_EVENT"}, + }, + } + for _, event := range events { + events, err := loadRayEventsFromS3(test, s3Client, s3BucketName, event.prefix) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Loaded %d events from %s", len(events), event.prefix) + LogWithTimestamp(test.T(), "Events: %+v", events) + + assertEventTypesPresent(test, g, events, event.requiredTypes) + } + } // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink @@ -473,3 +506,59 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co } return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } + +type RayEvent struct { + EventID string `json:"eventId"` + EventType string `json:"eventType"` + Timestamp string `json:"timestamp"` +} + +func loadRayEventsFromS3(test Test, s3Client *s3.S3, bucket string, prefix string) ([]RayEvent, error) { + objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + }) + if err != nil { + return nil, err + } + + // Find the first file object for content verification. + var fileObj *s3.Object + for _, obj := range objects.Contents { + if !strings.HasSuffix(aws.StringValue(obj.Key), "/") { + fileObj = obj + break + } + } + + var events []RayEvent + fileKey := *fileObj.Key + content, err := s3Client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(fileKey), + }) + if err != nil { + return nil, err + } + defer content.Body.Close() + + var batch []RayEvent + if err := json.NewDecoder(content.Body).Decode(&batch); err != nil { + return nil, err + } + events = append(events, batch...) 
+ + return events, nil +} + +func assertEventTypesPresent(test Test, g *WithT, events []RayEvent, requiredTypes []string) { + found := map[string]bool{} + + for _, event := range events { + found[event.EventType] = true + } + + for _, requiredType := range requiredTypes { + g.Expect(found[requiredType]).To(BeTrue(), "Event type %s not found", requiredType) + } +} From de153d135dd1406e130ed819d412a109884e7cbf Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Tue, 6 Jan 2026 19:56:41 +0800 Subject: [PATCH 09/29] refactor: Remove redundant code Signed-off-by: JiangJiaWei1103 --- .../pkg/collector/eventserver/eventserver.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go index de0dc728c67..6ae44d53058 100644 --- a/historyserver/pkg/collector/eventserver/eventserver.go +++ b/historyserver/pkg/collector/eventserver/eventserver.go @@ -423,21 +423,6 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool { // getJobID gets jobID associated with event func (es *EventServer) getJobID(eventData map[string]interface{}) string { - eventTypesWithJobID := []string{ - // Job Events (Driver Job) - "driverJobDefinitionEvent", - "driverJobLifecycleEvent", - - // Task Events (Normal Task) - "taskDefinitionEvent", - "taskLifecycleEvent", - "taskProfileEvents", - - // Actor Events (Actor Task + Actor Definition) - "actorTaskDefinitionEvent", - "actorDefinitionEvent", - } - for _, eventType := range eventTypesWithJobID { if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok { if jobID, hasJob := nestedEvent["jobId"]; hasJob && jobID != "" { From d1c2b1841b8c80d41d69f24d72b5cceb308a9622 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Tue, 6 Jan 2026 22:52:24 +0800 Subject: [PATCH 10/29] test: Verify event type coverage of aggregated node and job events Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 121 ++++++++++++----------- 1 file changed, 64 insertions(+), 57 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index fd1e58480ae..3e4d5db79a7 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -39,6 +39,28 @@ const ( rayJobManifestPath = "../../config/rayjob.yaml" ) +// rayEventTypes includes all potential event types defined in Ray: +// https://github.com/ray-project/ray/blob/3b41c97fa90c58b0b72c0026f57005b92310160d/src/ray/protobuf/public/events_base_event.proto#L49-L61 +var rayEventTypes = []string{ + "ACTOR_DEFINITION_EVENT", + "ACTOR_LIFECYCLE_EVENT", + "ACTOR_TASK_DEFINITION_EVENT", + "DRIVER_JOB_DEFINITION_EVENT", + "DRIVER_JOB_LIFECYCLE_EVENT", + "TASK_DEFINITION_EVENT", + "TASK_LIFECYCLE_EVENT", + "TASK_PROFILE_EVENT", + "NODE_DEFINITION_EVENT", + "NODE_LIFECYCLE_EVENT", +} + +// rayEvent contains specific fields in the Ray event JSON schema. For now, we keep only two fields, +// eventId and eventType while ensuring future extensibility (e.g., sessionName, timestamp, sourceType, etc.). +type rayEvent struct { + EventID string `json:"eventId"` + EventType string `json:"eventType"` +} + func TestCollector(t *testing.T) { // Share a single S3 client among subtests. s3Client := ensureS3Client(t) @@ -86,6 +108,8 @@ func TestCollector(t *testing.T) { // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AgAAAA==/... 
// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AQAAAA==/... // +// For detailed verification logic, please refer to verifyS3SessionDirs. +// // 6. Delete S3 bucket to ensure test isolation func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { rayCluster := prepareTestEnv(test, g, namespace, s3Client) @@ -108,7 +132,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) - // Verify logs and node_events are successfully uploaded to S3. + // Verify logs, node_events, and job_events are successfully uploaded to S3. verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) // Delete S3 bucket to ensure test isolation. @@ -150,6 +174,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 + // TODO(jwj): Force delete workers. LogWithTimestamp(test.T(), "Killing main process of ray-head container to trigger a restart") g.Eventually(func(gg Gomega) { headPod, err := GetHeadPod(test, rayCluster) @@ -372,13 +397,20 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC return rayJob } -// verifyS3SessionDirs verifies that directories logs/ and node_events/ exist under a session prefix in S3. -// This helper function checks that each directory contains at least one object. -// Additionally, it verifies that specific files have content: -// - logs//raylet.out must exist and have content > 0 bytes -// - node_events/_ must exist and have content > 0 bytes (suffix can be ignored for verification) +// verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. +// There are two phases of verification: +// 1. Verify file contents in logs/ directory +// - logs//raylet.out must exist and have content > 0 bytes +// - TODO(jwj): Complete docs. +// +// 2. Verify event type coverage in node_events/ and job_events/ directories +// - Aggregate all events from node_events/ and job_events/ directories +// - Verify that all potential event types are present in the aggregated events +// +// NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage. func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) { - dirs := []string{"logs", "node_events"} + // TODO(jwj): Separate verification for logs and events. + dirs := []string{"logs"} for _, dir := range dirs { dirPrefix := sessionPrefix + dir + "/" @@ -418,35 +450,15 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str }, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix) } - // Verify event type coverage. 
- LogWithTimestamp(test.T(), "Verifying event type coverage") - events := []struct { - prefix string - requiredTypes []string - }{ - { - prefix: sessionPrefix + "node_events", - requiredTypes: []string{"NODE_DEFINITION_EVENT", "NODE_LIFECYCLE_EVENT", "ACTOR_LIFECYCLE_EVENT", "TASK_LIFECYCLE_EVENT"}, - }, - { - prefix: sessionPrefix + "job_events/AgAAAA==", - requiredTypes: []string{"DRIVER_JOB_DEFINITION_EVENT", "DRIVER_JOB_LIFECYCLE_EVENT", "TASK_DEFINITION_EVENT", "TASK_LIFECYCLE_EVENT", "ACTOR_DEFINITION_EVENT", "ACTOR_TASK_DEFINITION_EVENT"}, - }, - { - prefix: sessionPrefix + "job_events/AQAAAA==", - requiredTypes: []string{"ACTOR_DEFINITION_EVENT", "TASK_DEFINITION_EVENT"}, - }, - } - for _, event := range events { - events, err := loadRayEventsFromS3(test, s3Client, s3BucketName, event.prefix) + LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) + uploadedEvents := []rayEvent{} + for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { + dirPrefix := sessionPrefix + dir + "/" + events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) g.Expect(err).NotTo(HaveOccurred()) - - LogWithTimestamp(test.T(), "Loaded %d events from %s", len(events), event.prefix) - LogWithTimestamp(test.T(), "Events: %+v", events) - - assertEventTypesPresent(test, g, events, event.requiredTypes) + uploadedEvents = append(uploadedEvents, events...) } - + assertAllEventTypesCovered(test, g, uploadedEvents) } // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink @@ -507,13 +519,8 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } -type RayEvent struct { - EventID string `json:"eventId"` - EventType string `json:"eventType"` - Timestamp string `json:"timestamp"` -} - -func loadRayEventsFromS3(test Test, s3Client *s3.S3, bucket string, prefix string) ([]RayEvent, error) { +// loadRayEventsFromS3 loads Ray events from S3. +func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) { objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), @@ -522,17 +529,19 @@ func loadRayEventsFromS3(test Test, s3Client *s3.S3, bucket string, prefix strin return nil, err } - // Find the first file object for content verification. - var fileObj *s3.Object + // Find the first file object for loading Ray events. + var fileKey string for _, obj := range objects.Contents { - if !strings.HasSuffix(aws.StringValue(obj.Key), "/") { - fileObj = obj + if key := aws.StringValue(obj.Key); !strings.HasSuffix(key, "/") { + fileKey = key break } } + if fileKey == "" { + return nil, fmt.Errorf("no file object found in directory %s", prefix) + } - var events []RayEvent - fileKey := *fileObj.Key + // Get and decode the file object content into Ray events. content, err := s3Client.GetObject(&s3.GetObjectInput{ Bucket: aws.String(bucket), Key: aws.String(fileKey), @@ -542,23 +551,21 @@ func loadRayEventsFromS3(test Test, s3Client *s3.S3, bucket string, prefix strin } defer content.Body.Close() - var batch []RayEvent - if err := json.NewDecoder(content.Body).Decode(&batch); err != nil { + var events []rayEvent + if err := json.NewDecoder(content.Body).Decode(&events); err != nil { return nil, err } - events = append(events, batch...) 
- return events, nil } -func assertEventTypesPresent(test Test, g *WithT, events []RayEvent, requiredTypes []string) { - found := map[string]bool{} - +// assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. +func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { + foundEventTypes := map[string]bool{} for _, event := range events { - found[event.EventType] = true + foundEventTypes[event.EventType] = true } - for _, requiredType := range requiredTypes { - g.Expect(found[requiredType]).To(BeTrue(), "Event type %s not found", requiredType) + for _, eventType := range rayEventTypes { + g.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) } } From 14a9b522ab0931d91cb5091f3c483aaa57d5d1dc Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Tue, 6 Jan 2026 23:42:19 +0800 Subject: [PATCH 11/29] test: Force kill worker container and verify event coverage Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 64 +++++++++++++++--------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 3e4d5db79a7..ea92888823f 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -73,10 +73,10 @@ func TestCollector(t *testing.T) { name: "Happy path: Logs and events should be uploaded to S3 on deletion", testFunc: testCollectorUploadOnGracefulShutdown, }, - // { - // name: "Simulate OOMKilled behavior: Single session single node logs and events should be uploaded to S3 after the ray-head container is restarted", - // testFunc: testCollectorSeparatesFilesBySession, - // }, + { + name: "Simulate OOMKilled behavior: Single session single node logs and events should be uploaded to S3 after the ray-head container is restarted", + testFunc: testCollectorSeparatesFilesBySession, + }, } for _, tt := range tests { @@ -174,24 +174,21 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 - // TODO(jwj): Force delete workers. 
- LogWithTimestamp(test.T(), "Killing main process of ray-head container to trigger a restart") - g.Eventually(func(gg Gomega) { - headPod, err := GetHeadPod(test, rayCluster) - gg.Expect(err).NotTo(HaveOccurred()) - _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"kill", "1"}) - gg.Expect(stderr.String()).To(BeEmpty()) - }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of ray-head container") - - LogWithTimestamp(test.T(), "Waiting for ray-head container to restart and become ready") - g.Eventually(func(gg Gomega) { - updatedPod, err := GetHeadPod(test, rayCluster) - gg.Expect(err).NotTo(HaveOccurred()) - rayHeadStatus, err := getContainerStatusByName(updatedPod, "ray-head") - gg.Expect(err).NotTo(HaveOccurred()) - gg.Expect(rayHeadStatus.RestartCount).To(BeNumerically(">", 0)) - gg.Expect(rayHeadStatus.Ready).To(BeTrue()) - }, TestTimeoutShort).Should(Succeed(), "ray-head container should restart and become ready") + killContainerAndWaitForRestart(test, g, rayCluster, "ray-head", func() (*corev1.Pod, error) { + return GetHeadPod(test, rayCluster) + }) + // TODO(jwj): Clarify the automatic restart mechanism. + // Force kill the ray-worker container before automatic restart. + killContainerAndWaitForRestart(test, g, rayCluster, "ray-worker", func() (*corev1.Pod, error) { + workerPods, err := GetWorkerPods(test, rayCluster) + if err != nil { + return nil, err + } + if len(workerPods) == 0 { + return nil, fmt.Errorf("no worker pods found") + } + return &workerPods[0], nil + }) // Verify the old session logs have been processed on disk. dirs := []string{"prev-logs", "persist-complete-logs"} @@ -208,7 +205,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 }, TestTimeoutMedium).Should(Succeed(), "Session directory %s should exist in %s", sessionID, dirPath) } - // Verify logs and node_events are successfully uploaded to S3. + // Verify logs, node_events, and job_events are successfully uploaded to S3. verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) deleteS3Bucket(test, g, s3Client) @@ -506,6 +503,27 @@ fi` } +// killContainerAndWaitForRestart kills the main process of a container and waits for the container to restart and become ready. +func killContainerAndWaitForRestart(test Test, g *WithT, rayCluster *rayv1.RayCluster, containerName string, getPod func() (*corev1.Pod, error)) { + LogWithTimestamp(test.T(), "Killing main process of %s container to trigger a restart", containerName) + g.Eventually(func(gg Gomega) { + pod, err := getPod() + gg.Expect(err).NotTo(HaveOccurred()) + _, stderr := ExecPodCmd(test, pod, containerName, []string{"kill", "1"}) + gg.Expect(stderr.String()).To(BeEmpty()) + }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of %s container", containerName) + + LogWithTimestamp(test.T(), "Waiting for %s container to restart and become ready", containerName) + g.Eventually(func(gg Gomega) { + pod, err := getPod() + gg.Expect(err).NotTo(HaveOccurred()) + containerStatus, err := getContainerStatusByName(pod, containerName) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(containerStatus.RestartCount).To(BeNumerically(">", 0)) + gg.Expect(containerStatus.Ready).To(BeTrue()) + }, TestTimeoutShort).Should(Succeed(), "%s container should restart and become ready", containerName) +} + // getContainerStatusByName retrieves the container status by container name. // NOTE: ContainerStatuses order doesn't guarantee to match Spec.Containers order. 
// For more details, please refer to the following link: From 598dbfd5157b44fd1ba4f35fcd87ad1b2e660ca0 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 7 Jan 2026 22:22:34 +0800 Subject: [PATCH 12/29] refactor: Create an WorkerPods adapter and remove redundancy Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 32 +++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index ea92888823f..21a8ba750c4 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -174,21 +174,10 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 - killContainerAndWaitForRestart(test, g, rayCluster, "ray-head", func() (*corev1.Pod, error) { - return GetHeadPod(test, rayCluster) - }) + killContainerAndWaitForRestart(test, g, HeadPod(test, rayCluster), "ray-head") // TODO(jwj): Clarify the automatic restart mechanism. // Force kill the ray-worker container before automatic restart. - killContainerAndWaitForRestart(test, g, rayCluster, "ray-worker", func() (*corev1.Pod, error) { - workerPods, err := GetWorkerPods(test, rayCluster) - if err != nil { - return nil, err - } - if len(workerPods) == 0 { - return nil, fmt.Errorf("no worker pods found") - } - return &workerPods[0], nil - }) + killContainerAndWaitForRestart(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") // Verify the old session logs have been processed on disk. dirs := []string{"prev-logs", "persist-complete-logs"} @@ -503,8 +492,23 @@ fi` } +// FirstWorkerPod returns a function that gets the first worker pod from the Ray cluster. +// It adapts the WorkerPods function to be used with functions expecting a single pod. +func FirstWorkerPod(test Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) { + return func() (*corev1.Pod, error) { + workerPods, err := GetWorkerPods(test, rayCluster) + if err != nil { + return nil, err + } + if len(workerPods) == 0 { + return nil, fmt.Errorf("no worker pods found") + } + return &workerPods[0], nil + } +} + // killContainerAndWaitForRestart kills the main process of a container and waits for the container to restart and become ready. 
-func killContainerAndWaitForRestart(test Test, g *WithT, rayCluster *rayv1.RayCluster, containerName string, getPod func() (*corev1.Pod, error)) { +func killContainerAndWaitForRestart(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) { LogWithTimestamp(test.T(), "Killing main process of %s container to trigger a restart", containerName) g.Eventually(func(gg Gomega) { pod, err := getPod() From 7492761fe53be782ef10ad9e62cad35214b9e89c Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 7 Jan 2026 22:50:04 +0800 Subject: [PATCH 13/29] test: Check both head and worker logs Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 44 +++++++++++++----------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 21a8ba750c4..1e80d2fdc21 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -117,9 +117,11 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev // Submit a Ray job to the existing cluster. _ = applyRayJobToCluster(test, g, namespace, rayCluster) + // Define variables for constructing S3 object prefix. clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionID := getSessionIDFromHeadPod(test, g, rayCluster) - nodeID := getNodeIDFromHeadPod(test, g, rayCluster) + headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head") + workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID) // Delete the Ray cluster to trigger log uploading and event flushing on deletion. @@ -133,7 +135,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) // Verify logs, node_events, and job_events are successfully uploaded to S3. - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID) // Delete S3 bucket to ensure test isolation. deleteS3Bucket(test, g, s3Client) @@ -165,7 +167,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionID := getSessionIDFromHeadPod(test, g, rayCluster) - nodeID := getNodeIDFromHeadPod(test, g, rayCluster) + headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head") + workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID) // NOTE: We use `kill 1` to simulate Kubernetes OOMKilled behavior. @@ -195,7 +198,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 } // Verify logs, node_events, and job_events are successfully uploaded to S3. - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID) deleteS3Bucket(test, g, s3Client) } @@ -386,7 +389,8 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. // There are two phases of verification: // 1. 
Verify file contents in logs/ directory -// - logs//raylet.out must exist and have content > 0 bytes +// - logs// must exist and have content > 0 bytes +// - logs// must exist and have content > 0 bytes // - TODO(jwj): Complete docs. // // 2. Verify event type coverage in node_events/ and job_events/ directories @@ -394,11 +398,10 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // - Verify that all potential event types are present in the aggregated events // // NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage. -func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) { - // TODO(jwj): Separate verification for logs and events. - dirs := []string{"logs"} - for _, dir := range dirs { - dirPrefix := sessionPrefix + dir + "/" +func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, headNodeID string, workerNodeID string) { + LogWithTimestamp(test.T(), "Verifying there's at least one non-empty object in both head and worker logs/ directories under %s", sessionPrefix) + for _, nodeID := range []string{headNodeID, workerNodeID} { + dirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, nodeID) g.Eventually(func(gg Gomega) { // Verify the directory has at least one object. @@ -410,21 +413,20 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str gg.Expect(err).NotTo(HaveOccurred()) keyCount := aws.Int64Value(objects.KeyCount) gg.Expect(keyCount).To(BeNumerically(">", 0)) - LogWithTimestamp(test.T(), "Verified directory %s under %s has at least one object", dir, sessionPrefix) + LogWithTimestamp(test.T(), "Verified directory %s has at least one object", dirPrefix) // Find the first file object for content verification. - var fileObj *s3.Object + var fileKey string for _, obj := range objects.Contents { - if !strings.HasSuffix(aws.StringValue(obj.Key), "/") { - fileObj = obj + if key := aws.StringValue(obj.Key); !strings.HasSuffix(key, "/") { + fileKey = key break } } - gg.Expect(fileObj).NotTo(BeNil(), "No file object found in directory %s", dirPrefix) + gg.Expect(fileKey).NotTo(BeEmpty(), "No file object found in directory %s", dirPrefix) // Verify the file has content by checking file size. - fileKey := *fileObj.Key - LogWithTimestamp(test.T(), "Checking file: %s", fileKey) + LogWithTimestamp(test.T(), "Verifying file %s has content (> 0 bytes)", fileKey) obj, err := s3Client.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s3BucketName), Key: aws.String(fileKey), @@ -470,9 +472,9 @@ fi` return sessionID } -// getNodeIDFromHeadPod retrieves the nodeID from the Ray head pod by reading /tmp/ray/raylet_node_id. -func getNodeIDFromHeadPod(test Test, g *WithT, rayCluster *rayv1.RayCluster) string { - headPod, err := GetHeadPod(test, rayCluster) +// getNodeIDFromPod retrieves the nodeID from the Ray head or worker pod by reading /tmp/ray/raylet_node_id. +func getNodeIDFromPod(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) string { + pod, err := getPod() g.Expect(err).NotTo(HaveOccurred()) getNodeIDCmd := `if [ -f "/tmp/ray/raylet_node_id" ]; then @@ -481,7 +483,7 @@ else echo "raylet_node_id not found" exit 1 fi` - output, _ := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", getNodeIDCmd}) + output, _ := ExecPodCmd(test, pod, containerName, []string{"sh", "-c", getNodeIDCmd}) // Parse output to extract the nodeID. 
nodeID := strings.TrimSpace(output.String()) From 9611a42c4f7d8b432e1e1a34ae5a270a63b740ef Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Fri, 9 Jan 2026 13:55:35 +0800 Subject: [PATCH 14/29] refactor: Use eventually to wrap coverage check Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 38 +++++++++++------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 21a8ba750c4..52de1824418 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -437,14 +437,24 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str } LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) - uploadedEvents := []rayEvent{} - for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { - dirPrefix := sessionPrefix + dir + "/" - events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) - g.Expect(err).NotTo(HaveOccurred()) - uploadedEvents = append(uploadedEvents, events...) - } - assertAllEventTypesCovered(test, g, uploadedEvents) + g.Eventually(func(gg Gomega) { + uploadedEvents := []rayEvent{} + for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { + dirPrefix := sessionPrefix + dir + "/" + events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) + gg.Expect(err).NotTo(HaveOccurred()) + uploadedEvents = append(uploadedEvents, events...) + } + + // Verify that all potential event types are present in the events uploaded to S3. + foundEventTypes := map[string]bool{} + for _, event := range uploadedEvents { + foundEventTypes[event.EventType] = true + } + for _, eventType := range rayEventTypes { + gg.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) + } + }, TestTimeoutShort).Should(Succeed(), "Failed to verify all event types are covered") } // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink @@ -579,15 +589,3 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv } return events, nil } - -// assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. -func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { - foundEventTypes := map[string]bool{} - for _, event := range events { - foundEventTypes[event.EventType] = true - } - - for _, eventType := range rayEventTypes { - g.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) - } -} From 86e73da281d9de4fc0c371ac476e70185e4b2870 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Mon, 12 Jan 2026 20:26:01 +0800 Subject: [PATCH 15/29] test: Check raylet.out and gcs_server.out Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 71 +++++++++++------------- 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 1e80d2fdc21..230bb8448a5 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -389,9 +389,8 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. 
 // There are two phases of verification:
 // 1. Verify file contents in logs/ directory
-// - logs// must exist and have content > 0 bytes
-// - logs// must exist and have content > 0 bytes
-// - TODO(jwj): Complete docs.
+// - For the head node, verify raylet.out and gcs_server.out exist and have content > 0 bytes
+// - For the worker node, verify raylet.out exists and has content > 0 bytes
 //
 // 2. Verify event type coverage in node_events/ and job_events/ directories
 // - Aggregate all events from node_events/ and job_events/ directories
 // - Verify that all potential event types are present in the aggregated events
 //
 // NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage.
 func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, headNodeID string, workerNodeID string) {
-	LogWithTimestamp(test.T(), "Verifying there's at least one non-empty object in both head and worker logs/ directories under %s", sessionPrefix)
-	for _, nodeID := range []string{headNodeID, workerNodeID} {
-		dirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, nodeID)
+	// Verify file contents in logs/ directory.
+	headLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, headNodeID)
+	workerLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, workerNodeID)
+	LogWithTimestamp(test.T(), "Verifying raylet.out and gcs_server.out exist in head log directory %s", headLogDirPrefix)
+	for _, fileName := range []string{"raylet.out", "gcs_server.out"} {
+		assertNonEmptyFileExist(test, g, s3Client, headLogDirPrefix, fileName)
 	}
+	LogWithTimestamp(test.T(), "Verifying raylet.out exists in worker log directory %s", workerLogDirPrefix)
+	assertNonEmptyFileExist(test, g, s3Client, workerLogDirPrefix, "raylet.out")
+
+	// Verify event type coverage in node_events/ and job_events/ directories.
LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) uploadedEvents := []rayEvent{} for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { @@ -582,6 +555,26 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv return events, nil } +// assertNonEmptyFileExist verifies that a file exists and has content (> 0 bytes). +// For a Ray cluster with one head node and one worker node, there are two logs/ directories to verify: +// - logs//... +// - logs//... +func assertNonEmptyFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) { + fileKey := fmt.Sprintf("%s/%s", nodeLogDirPrefix, fileName) + LogWithTimestamp(test.T(), "Verifying file %s has content (> 0 bytes)", fileKey) + g.Eventually(func(gg Gomega) { + // Verify the file has content by checking file size. + obj, err := s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(s3BucketName), + Key: aws.String(fileKey), + }) + gg.Expect(err).NotTo(HaveOccurred()) + fileSize := aws.Int64Value(obj.ContentLength) + gg.Expect(fileSize).To(BeNumerically(">", 0)) + LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize) + }, TestTimeoutMedium).Should(Succeed(), "Failed to verify file %s has content (> 0 bytes)", fileKey) +} + // assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { foundEventTypes := map[string]bool{} From ecf3648ab440ef1c99f3266d6851281b22304f46 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Mon, 12 Jan 2026 20:29:42 +0800 Subject: [PATCH 16/29] docs: Correct docs Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 230bb8448a5..c304d7b3bdd 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -556,9 +556,9 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv } // assertNonEmptyFileExist verifies that a file exists and has content (> 0 bytes). -// For a Ray cluster with one head node and one worker node, there are two logs/ directories to verify: -// - logs//... -// - logs//... 
+// For a Ray cluster with one head node and one worker node, there are two log directories to verify: +// - logs// +// - logs// func assertNonEmptyFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) { fileKey := fmt.Sprintf("%s/%s", nodeLogDirPrefix, fileName) LogWithTimestamp(test.T(), "Verifying file %s has content (> 0 bytes)", fileKey) From 9f70a218f6d4f55c2810d106f995127cc98a8e32 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Tue, 13 Jan 2026 23:27:27 +0800 Subject: [PATCH 17/29] refactor: List subdirs of job_events rather than hardcoding Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 34 +++++++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 52de1824418..e675dc633d8 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -437,11 +437,20 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str } LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) + nodeEventDirPrefix := fmt.Sprintf("%snode_events/", sessionPrefix) + jobEventDirPrefix := fmt.Sprintf("%sjob_events/", sessionPrefix) + + // Enumerate all event directories to be aggregated for verification. + eventDirPrefixes := []string{nodeEventDirPrefix} + jobEventDirPrefixes, err := listS3SubdirPrefixes(s3Client, s3BucketName, jobEventDirPrefix) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Found %d job event subdir prefixes: %v", len(jobEventDirPrefixes), jobEventDirPrefixes) + eventDirPrefixes = append(eventDirPrefixes, jobEventDirPrefixes...) + g.Eventually(func(gg Gomega) { uploadedEvents := []rayEvent{} - for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { - dirPrefix := sessionPrefix + dir + "/" - events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) + for _, eventDirPrefix := range eventDirPrefixes { + events, err := loadRayEventsFromS3(s3Client, s3BucketName, eventDirPrefix) gg.Expect(err).NotTo(HaveOccurred()) uploadedEvents = append(uploadedEvents, events...) } @@ -499,7 +508,6 @@ fi` g.Expect(nodeID).NotTo(BeEmpty(), "nodeID should not be empty") return nodeID - } // FirstWorkerPod returns a function that gets the first worker pod from the Ray cluster. @@ -551,6 +559,24 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } +// listS3SubdirPrefixes lists all subdirectory prefixes in the S3 bucket under the given prefix. +func listS3SubdirPrefixes(s3Client *s3.S3, bucket string, prefix string) ([]string, error) { + objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + Delimiter: aws.String("/"), + }) + if err != nil { + return nil, err + } + + subdirPrefixes := []string{} + for _, subdirPrefix := range objects.CommonPrefixes { + subdirPrefixes = append(subdirPrefixes, *subdirPrefix.Prefix) + } + return subdirPrefixes, nil +} + // loadRayEventsFromS3 loads Ray events from S3. 
func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) { objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ From 84519bc9a8e9ba360c67d62a9533456bdec8aeb5 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Tue, 13 Jan 2026 23:36:00 +0800 Subject: [PATCH 18/29] fix: Wait for async job events flushing on worker Signed-off-by: JiangJiaWei1103 --- historyserver/config/rayjob.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/historyserver/config/rayjob.yaml b/historyserver/config/rayjob.yaml index dc5c5dbf041..c4577438fc9 100644 --- a/historyserver/config/rayjob.yaml +++ b/historyserver/config/rayjob.yaml @@ -6,6 +6,7 @@ spec: entrypoint: | python -c " import ray + import time ray.init() @ray.remote @@ -35,6 +36,8 @@ spec: final_count = ray.get(counter.get_count.remote()) print(f'Final count: {final_count}') print(f'Cluster resources: {ray.cluster_resources()}') + + time.sleep(5) " # Select the existing Ray cluster running the collector. clusterSelector: From c796bfce7fc56919754024bcb3311a543ff2d285 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 00:20:25 +0800 Subject: [PATCH 19/29] test: Consolidate tests by checking non-empty list Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index e675dc633d8..a8b2200b399 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -439,19 +439,20 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) nodeEventDirPrefix := fmt.Sprintf("%snode_events/", sessionPrefix) jobEventDirPrefix := fmt.Sprintf("%sjob_events/", sessionPrefix) - - // Enumerate all event directories to be aggregated for verification. eventDirPrefixes := []string{nodeEventDirPrefix} - jobEventDirPrefixes, err := listS3SubdirPrefixes(s3Client, s3BucketName, jobEventDirPrefix) - g.Expect(err).NotTo(HaveOccurred()) - LogWithTimestamp(test.T(), "Found %d job event subdir prefixes: %v", len(jobEventDirPrefixes), jobEventDirPrefixes) - eventDirPrefixes = append(eventDirPrefixes, jobEventDirPrefixes...) - g.Eventually(func(gg Gomega) { + // Enumerate all job event directories to be aggregated for verification. + jobEventDirPrefixes, err := listS3SubdirPrefixes(s3Client, s3BucketName, jobEventDirPrefix) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(jobEventDirPrefixes).NotTo(BeEmpty()) + LogWithTimestamp(test.T(), "Found %d job event subdir prefixes: %v", len(jobEventDirPrefixes), jobEventDirPrefixes) + eventDirPrefixes = append(eventDirPrefixes, jobEventDirPrefixes...) + uploadedEvents := []rayEvent{} for _, eventDirPrefix := range eventDirPrefixes { events, err := loadRayEventsFromS3(s3Client, s3BucketName, eventDirPrefix) gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(events).NotTo(BeEmpty()) uploadedEvents = append(uploadedEvents, events...) 
} From c3f73b121b21c512d76814c0ba1955cf4b4e30e0 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 00:29:32 +0800 Subject: [PATCH 20/29] fix: Aggregate all event files not just the first file obj Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 42 ++++++++++++------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index a8b2200b399..839f8f8a83f 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -580,6 +580,9 @@ func listS3SubdirPrefixes(s3Client *s3.S3, bucket string, prefix string) ([]stri // loadRayEventsFromS3 loads Ray events from S3. func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) { + var events []rayEvent + + // List all file objects in the directory. objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), @@ -588,31 +591,28 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv return nil, err } - // Find the first file object for loading Ray events. - var fileKey string for _, obj := range objects.Contents { - if key := aws.StringValue(obj.Key); !strings.HasSuffix(key, "/") { - fileKey = key - break + fileKey := aws.StringValue(obj.Key) + if strings.HasSuffix(fileKey, "/") { + continue } - } - if fileKey == "" { - return nil, fmt.Errorf("no file object found in directory %s", prefix) - } - // Get and decode the file object content into Ray events. - content, err := s3Client.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(fileKey), - }) - if err != nil { - return nil, err - } - defer content.Body.Close() + // Get the file object content and decode it into Ray events. + content, err := s3Client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(fileKey), + }) + if err != nil { + return nil, err + } + defer content.Body.Close() - var events []rayEvent - if err := json.NewDecoder(content.Body).Decode(&events); err != nil { - return nil, err + var fileEvents []rayEvent + if err := json.NewDecoder(content.Body).Decode(&fileEvents); err != nil { + return nil, err + } + events = append(events, fileEvents...) } + return events, nil } From 302d903edad05ecfc44957b609e1f7e08b01bfc4 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 00:33:45 +0800 Subject: [PATCH 21/29] fix: Avoid redundant appends Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 839f8f8a83f..02d28d4caeb 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -439,9 +439,9 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) nodeEventDirPrefix := fmt.Sprintf("%snode_events/", sessionPrefix) jobEventDirPrefix := fmt.Sprintf("%sjob_events/", sessionPrefix) - eventDirPrefixes := []string{nodeEventDirPrefix} g.Eventually(func(gg Gomega) { - // Enumerate all job event directories to be aggregated for verification. + // Enumerate all event directories to be aggregated for verification. 
+ eventDirPrefixes := []string{nodeEventDirPrefix} jobEventDirPrefixes, err := listS3SubdirPrefixes(s3Client, s3BucketName, jobEventDirPrefix) gg.Expect(err).NotTo(HaveOccurred()) gg.Expect(jobEventDirPrefixes).NotTo(BeEmpty()) From 6237c07641ebecd0e625363ede3bc9483bf42ff5 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 14 Jan 2026 19:21:52 +0800 Subject: [PATCH 22/29] list job Signed-off-by: Future-Outlier --- historyserver/test/e2e/collector_test.go | 62 ++++++++++++++++++++---- 1 file changed, 53 insertions(+), 9 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index c304d7b3bdd..640a71693e0 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -178,8 +178,6 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 killContainerAndWaitForRestart(test, g, HeadPod(test, rayCluster), "ray-head") - // TODO(jwj): Clarify the automatic restart mechanism. - // Force kill the ray-worker container before automatic restart. killContainerAndWaitForRestart(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") // Verify the old session logs have been processed on disk. @@ -389,7 +387,7 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. // There are two phases of verification: // 1. Verify file contents in logs/ directory -// - For the head node, verify raylet.out and gcs_server.out exist and have content > 0 bytes +// - For the head node, verify raylet.out, gcs_server.out, and monitor.out exist and have content > 0 bytes // - For the worker node, verify raylet.out exists and have content > 0 bytes // // 2. Verify event type coverage in node_events/ and job_events/ directories @@ -402,8 +400,8 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str headLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, headNodeID) workerLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, workerNodeID) - LogWithTimestamp(test.T(), "Verifying raylet.out and gcs_server.out exist in head log directory %s", headLogDirPrefix) - for _, fileName := range []string{"raylet.out", "gcs_server.out"} { + LogWithTimestamp(test.T(), "Verifying raylet.out, gcs_server.out, and monitor.out exist in head log directory %s", headLogDirPrefix) + for _, fileName := range []string{"raylet.out", "gcs_server.out", "monitor.out"} { assertNonEmptyFileExist(test, g, s3Client, headLogDirPrefix, fileName) } @@ -413,12 +411,28 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str // Verify event type coverage in node_events/ and job_events/ directories. LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) uploadedEvents := []rayEvent{} - for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { - dirPrefix := sessionPrefix + dir + "/" - events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) + + // Load events from node_events directory. 
+ nodeEventsPrefix := sessionPrefix + "node_events/" + nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix) + g.Expect(err).NotTo(HaveOccurred()) + uploadedEvents = append(uploadedEvents, nodeEvents...) + LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents)) + + // Dynamically discover and load events from job_events directories. + jobEventsPrefix := sessionPrefix + "job_events/" + jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs) + + for _, jobDir := range jobDirs { + jobDirPrefix := jobEventsPrefix + jobDir + "/" + jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix) g.Expect(err).NotTo(HaveOccurred()) - uploadedEvents = append(uploadedEvents, events...) + uploadedEvents = append(uploadedEvents, jobEvents...) + LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir) } + assertAllEventTypesCovered(test, g, uploadedEvents) } @@ -586,3 +600,33 @@ func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { g.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) } } + +// listS3Directories lists all directories (prefixes) under the given S3 prefix. +// In S3, directories are simulated using prefixes and delimiters. +// For example, given prefix "log/cluster/session/job_events/", this function returns ["AgAAAA==", "AQAAAA=="] +// which are the jobID directories under job_events/. +func listS3Directories(s3Client *s3.S3, bucket string, prefix string) ([]string, error) { + result, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + Delimiter: aws.String("/"), + }) + if err != nil { + return nil, fmt.Errorf("failed to list S3 directories under %s: %w", prefix, err) + } + + // Extract directory names from CommonPrefixes. + var directories []string + for _, commonPrefix := range result.CommonPrefixes { + fullPrefix := aws.StringValue(commonPrefix.Prefix) + // Extract the directory name by removing the parent prefix and trailing slash. + // Example: "log/cluster/session/job_events/AgAAAA==/" -> "AgAAAA==" + dirName := strings.TrimPrefix(fullPrefix, prefix) + dirName = strings.TrimSuffix(dirName, "/") + if dirName != "" { + directories = append(directories, dirName) + } + } + + return directories, nil +} From 5c8fc59822b52f9ed76d3ebf35f31bd77dcb63ea Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 20:07:36 +0800 Subject: [PATCH 23/29] fix: Explicitly close content body to avoid resource leaks Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 02d28d4caeb..36c83facaa8 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -605,13 +605,13 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv if err != nil { return nil, err } - defer content.Body.Close() var fileEvents []rayEvent if err := json.NewDecoder(content.Body).Decode(&fileEvents); err != nil { return nil, err } events = append(events, fileEvents...) 
+ content.Body.Close() } return events, nil From 10237ecedf1026b77875f50445934b6d3c0ee764 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 20:11:14 +0800 Subject: [PATCH 24/29] docs: Remove redundant notes Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 36c83facaa8..e9825c0a20d 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -175,8 +175,6 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 killContainerAndWaitForRestart(test, g, HeadPod(test, rayCluster), "ray-head") - // TODO(jwj): Clarify the automatic restart mechanism. - // Force kill the ray-worker container before automatic restart. killContainerAndWaitForRestart(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") // Verify the old session logs have been processed on disk. From 0e28fa20246a9f5d9a430a2a5f1439a30782dd14 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 20:26:40 +0800 Subject: [PATCH 25/29] fix: Close content on failure to prevent rsc leak Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 96 +++++++++++++++--------- 1 file changed, 60 insertions(+), 36 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index e9825c0a20d..c2dd68f3ff6 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -434,35 +434,33 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str }, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix) } + // Verify event type coverage in node_events/ and job_events/ directories. LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) - nodeEventDirPrefix := fmt.Sprintf("%snode_events/", sessionPrefix) - jobEventDirPrefix := fmt.Sprintf("%sjob_events/", sessionPrefix) - g.Eventually(func(gg Gomega) { - // Enumerate all event directories to be aggregated for verification. - eventDirPrefixes := []string{nodeEventDirPrefix} - jobEventDirPrefixes, err := listS3SubdirPrefixes(s3Client, s3BucketName, jobEventDirPrefix) - gg.Expect(err).NotTo(HaveOccurred()) - gg.Expect(jobEventDirPrefixes).NotTo(BeEmpty()) - LogWithTimestamp(test.T(), "Found %d job event subdir prefixes: %v", len(jobEventDirPrefixes), jobEventDirPrefixes) - eventDirPrefixes = append(eventDirPrefixes, jobEventDirPrefixes...) + uploadedEvents := []rayEvent{} - uploadedEvents := []rayEvent{} - for _, eventDirPrefix := range eventDirPrefixes { - events, err := loadRayEventsFromS3(s3Client, s3BucketName, eventDirPrefix) - gg.Expect(err).NotTo(HaveOccurred()) - gg.Expect(events).NotTo(BeEmpty()) - uploadedEvents = append(uploadedEvents, events...) - } + // Load events from node_events directory. + nodeEventsPrefix := sessionPrefix + "node_events/" + nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix) + g.Expect(err).NotTo(HaveOccurred()) + uploadedEvents = append(uploadedEvents, nodeEvents...) 
+ LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents)) - // Verify that all potential event types are present in the events uploaded to S3. - foundEventTypes := map[string]bool{} - for _, event := range uploadedEvents { - foundEventTypes[event.EventType] = true - } - for _, eventType := range rayEventTypes { - gg.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) - } - }, TestTimeoutShort).Should(Succeed(), "Failed to verify all event types are covered") + // Dynamically discover and load events from job_events directories. + jobEventsPrefix := sessionPrefix + "job_events/" + jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(jobDirs).NotTo(BeEmpty()) + LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs) + + for _, jobDir := range jobDirs { + jobDirPrefix := jobEventsPrefix + jobDir + "/" + jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix) + g.Expect(err).NotTo(HaveOccurred()) + uploadedEvents = append(uploadedEvents, jobEvents...) + LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir) + } + + assertAllEventTypesCovered(test, g, uploadedEvents) } // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink @@ -558,22 +556,34 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } -// listS3SubdirPrefixes lists all subdirectory prefixes in the S3 bucket under the given prefix. -func listS3SubdirPrefixes(s3Client *s3.S3, bucket string, prefix string) ([]string, error) { - objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ +// listS3Directories lists all directories (prefixes) under the given S3 prefix. +// In S3, directories are simulated using prefixes and delimiters. +// For example, given prefix "log/cluster/session/job_events/", this function returns ["AgAAAA==", "AQAAAA=="] +// which are the jobID directories under job_events/. +func listS3Directories(s3Client *s3.S3, bucket string, prefix string) ([]string, error) { + result, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefix), Delimiter: aws.String("/"), }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to list S3 directories under %s: %w", prefix, err) } - subdirPrefixes := []string{} - for _, subdirPrefix := range objects.CommonPrefixes { - subdirPrefixes = append(subdirPrefixes, *subdirPrefix.Prefix) + // Extract directory names from CommonPrefixes. + var directories []string + for _, commonPrefix := range result.CommonPrefixes { + fullPrefix := aws.StringValue(commonPrefix.Prefix) + // Extract the directory name by removing the parent prefix and trailing slash. + // Example: "log/cluster/session/job_events/AgAAAA==/" -> "AgAAAA==" + dirName := strings.TrimPrefix(fullPrefix, prefix) + dirName = strings.TrimSuffix(dirName, "/") + if dirName != "" { + directories = append(directories, dirName) + } } - return subdirPrefixes, nil + + return directories, nil } // loadRayEventsFromS3 loads Ray events from S3. 
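The explicit content.Body.Close() calls introduced in the next hunk are needed because a defer placed inside the object loop would only run when loadRayEventsFromS3 itself returns, so every response body would stay open until the whole prefix had been read. A common alternative is to move the per-object read into a small helper so the defer fires once per object. The sketch below is illustrative only: the package name, the helper name readEventsObject, and the stand-in rayEvent struct are not part of this patch; it assumes the same aws-sdk-go v1 client that collector_test.go already uses.

package eventsutil

import (
	"encoding/json"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// rayEvent stands in for the struct defined in collector_test.go; only the fields
// the caller needs would be declared here.
type rayEvent struct {
	EventType string
}

// readEventsObject downloads one S3 object and decodes it into Ray events.
// Because the defer lives inside this helper, the response body is closed as soon
// as each object has been decoded, on both the success and the error path.
func readEventsObject(s3Client *s3.S3, bucket string, key string) ([]rayEvent, error) {
	content, err := s3Client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get object %s: %w", key, err)
	}
	defer content.Body.Close()

	var events []rayEvent
	if err := json.NewDecoder(content.Body).Decode(&events); err != nil {
		return nil, fmt.Errorf("failed to decode Ray events from %s: %w", key, err)
	}
	return events, nil
}

Either style works for a test helper; the patch keeps the loop flat and closes the body explicitly instead.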
@@ -606,11 +616,25 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv var fileEvents []rayEvent if err := json.NewDecoder(content.Body).Decode(&fileEvents); err != nil { - return nil, err + content.Body.Close() + return nil, fmt.Errorf("failed to decode Ray events from %s: %w", fileKey, err) } - events = append(events, fileEvents...) content.Body.Close() + + events = append(events, fileEvents...) } return events, nil } + +// assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. +func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { + foundEventTypes := map[string]bool{} + for _, event := range events { + foundEventTypes[event.EventType] = true + } + + for _, eventType := range rayEventTypes { + g.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) + } +} From 48681abc368f7db7c837af557285e9d36edd2c1c Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 20:34:06 +0800 Subject: [PATCH 26/29] docs: Update helper usage Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index c2dd68f3ff6..109a422d832 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -115,7 +115,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev rayCluster := prepareTestEnv(test, g, namespace, s3Client) // Submit a Ray job to the existing cluster. - _ = applyRayJobToCluster(test, g, namespace, rayCluster) + _ = applyRayJobAndWaitForCompletion(test, g, namespace) clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionID := getSessionIDFromHeadPod(test, g, rayCluster) @@ -161,7 +161,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 rayCluster := prepareTestEnv(test, g, namespace, s3Client) // Submit a Ray job to the existing cluster. - _ = applyRayJobToCluster(test, g, namespace, rayCluster) + _ = applyRayJobAndWaitForCompletion(test, g, namespace) clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionID := getSessionIDFromHeadPod(test, g, rayCluster) @@ -359,8 +359,9 @@ func applyRayCluster(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.Ra return rayCluster } -// applyRayJobToCluster applies a Ray job to the existing Ray cluster. -func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) *rayv1.RayJob { +// applyRayJobAndWaitForCompletion applies a Ray job to the existing Ray cluster and waits for it to complete successfully. +// In the RayJob manifest, the clusterSelector is set to the existing Ray cluster, raycluster-historyserver. 
+func applyRayJobAndWaitForCompletion(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.RayJob { rayJobFromYaml := DeserializeRayJobYAML(test, rayJobManifestPath) rayJobFromYaml.Namespace = namespace.Name From e64079829fc3488d56990debe14307329efc559d Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 21:16:41 +0800 Subject: [PATCH 27/29] test: Test log file existence only Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 640a71693e0..421b6585cb8 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -387,8 +387,8 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. // There are two phases of verification: // 1. Verify file contents in logs/ directory -// - For the head node, verify raylet.out, gcs_server.out, and monitor.out exist and have content > 0 bytes -// - For the worker node, verify raylet.out exists and have content > 0 bytes +// - For the head node, verify raylet.out, gcs_server.out, and monitor.out exist +// - For the worker node, verify raylet.out exists // // 2. Verify event type coverage in node_events/ and job_events/ directories // - Aggregate all events from node_events/ and job_events/ directories @@ -402,11 +402,11 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str LogWithTimestamp(test.T(), "Verifying raylet.out, gcs_server.out, and monitor.out exist in head log directory %s", headLogDirPrefix) for _, fileName := range []string{"raylet.out", "gcs_server.out", "monitor.out"} { - assertNonEmptyFileExist(test, g, s3Client, headLogDirPrefix, fileName) + assertFileExist(test, g, s3Client, headLogDirPrefix, fileName) } LogWithTimestamp(test.T(), "Verifying raylet.out exists in worker log directory %s", workerLogDirPrefix) - assertNonEmptyFileExist(test, g, s3Client, workerLogDirPrefix, "raylet.out") + assertFileExist(test, g, s3Client, workerLogDirPrefix, "raylet.out") // Verify event type coverage in node_events/ and job_events/ directories. LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) @@ -569,24 +569,21 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv return events, nil } -// assertNonEmptyFileExist verifies that a file exists and has content (> 0 bytes). +// assertFileExist verifies that a file object exists under the given log directory prefix. // For a Ray cluster with one head node and one worker node, there are two log directories to verify: // - logs// // - logs// -func assertNonEmptyFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) { +func assertFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) { fileKey := fmt.Sprintf("%s/%s", nodeLogDirPrefix, fileName) - LogWithTimestamp(test.T(), "Verifying file %s has content (> 0 bytes)", fileKey) + LogWithTimestamp(test.T(), "Verifying file %s exists", fileKey) g.Eventually(func(gg Gomega) { - // Verify the file has content by checking file size. 
- obj, err := s3Client.HeadObject(&s3.HeadObjectInput{ + _, err := s3Client.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s3BucketName), Key: aws.String(fileKey), }) gg.Expect(err).NotTo(HaveOccurred()) - fileSize := aws.Int64Value(obj.ContentLength) - gg.Expect(fileSize).To(BeNumerically(">", 0)) - LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize) - }, TestTimeoutMedium).Should(Succeed(), "Failed to verify file %s has content (> 0 bytes)", fileKey) + LogWithTimestamp(test.T(), "Verified file %s exists", fileKey) + }, TestTimeoutMedium).Should(Succeed(), "Failed to verify file %s exists", fileKey) } // assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. From 706199c44b72b1ed009127c6467cadc983927965 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 14 Jan 2026 21:19:42 +0800 Subject: [PATCH 28/29] style: Remove trailing slash Signed-off-by: JiangJiaWei1103 --- historyserver/test/e2e/collector_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 421b6585cb8..18e05100cc0 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -397,8 +397,8 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC // NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage. func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, headNodeID string, workerNodeID string) { // Verify file contents in logs/ directory. - headLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, headNodeID) - workerLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, workerNodeID) + headLogDirPrefix := fmt.Sprintf("%slogs/%s", sessionPrefix, headNodeID) + workerLogDirPrefix := fmt.Sprintf("%slogs/%s", sessionPrefix, workerNodeID) LogWithTimestamp(test.T(), "Verifying raylet.out, gcs_server.out, and monitor.out exist in head log directory %s", headLogDirPrefix) for _, fileName := range []string{"raylet.out", "gcs_server.out", "monitor.out"} { From 6f4710553fafa5198aed18d9d77ff6f0beba5045 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 15 Jan 2026 20:07:57 +0800 Subject: [PATCH 29/29] docs: State why we use sleep Signed-off-by: JiangJiaWei1103 --- historyserver/config/rayjob.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/historyserver/config/rayjob.yaml b/historyserver/config/rayjob.yaml index 305ce1a6274..3ce3619364a 100644 --- a/historyserver/config/rayjob.yaml +++ b/historyserver/config/rayjob.yaml @@ -37,6 +37,8 @@ spec: print(f'Final count: {final_count}') print(f'Cluster resources: {ray.cluster_resources()}') + # For now, events on the worker nodes aren't sent to the collector when calling ray.shutdown(). + # As a workaround, we explicitly wait for 5 seconds to ensure events are sent. time.sleep(5) " # Select the existing Ray cluster running the collector.
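The sleep above covers the producer side: the worker's events have to reach the aggregator before the driver script exits. On the verification side, the log-file assertions in collector_test.go already tolerate upload latency by polling S3 inside g.Eventually; the sketch below shows the same polling idea as a plain helper for the job_events/ prefix, assuming the aws-sdk-go v1 client the test already uses. The package name, the function name waitForJobEventDirs, and the timeout and interval parameters are illustrative, not part of this patch.

package eventsutil

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// waitForJobEventDirs polls S3 until at least one jobID directory (reported by S3
// as a common prefix) appears under the session's job_events/ prefix, or the
// timeout expires.
func waitForJobEventDirs(s3Client *s3.S3, bucket string, jobEventsPrefix string, timeout time.Duration, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		result, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
			Bucket:    aws.String(bucket),
			Prefix:    aws.String(jobEventsPrefix),
			Delimiter: aws.String("/"),
		})
		if err != nil {
			return fmt.Errorf("failed to list %s: %w", jobEventsPrefix, err)
		}
		// A jobID directory shows up as a common prefix once its first event file is flushed.
		if len(result.CommonPrefixes) > 0 {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("no job event directories under %s after %s", jobEventsPrefix, timeout)
		}
		time.Sleep(interval)
	}
}

Polling on the test side only makes the verification robust to upload latency; it does not remove the need for the entrypoint-side wait, since the worker must still flush its events before the driver exits.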