diff --git a/historyserver/config/raycluster.yaml b/historyserver/config/raycluster.yaml index d8eaaeed479..bd16764261d 100644 --- a/historyserver/config/raycluster.yaml +++ b/historyserver/config/raycluster.yaml @@ -20,10 +20,18 @@ spec: affinity: containers: - env: + - name: RAY_enable_ray_event + value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator - value: "1" + value: "true" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR value: "http://localhost:8084/v1/events" + # in ray 2.52.0, we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES + # in ray 2.53.0 (not yet done). we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES + value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT" image: rayproject/ray:2.52.0 imagePullPolicy: IfNotPresent command: @@ -105,10 +113,10 @@ spec: workerGroupSpecs: - groupName: cpu maxReplicas: 1000 - minReplicas: 0 + minReplicas: 1 numOfHosts: 1 rayStartParams: {} - replicas: 0 + replicas: 1 template: metadata: labels: @@ -117,10 +125,18 @@ spec: imagePullSecrets: containers: - env: + - name: RAY_enable_ray_event + value: "true" - name: RAY_enable_core_worker_ray_event_to_aggregator - value: "1" + value: "true" - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR value: "http://localhost:8084/v1/events" + # in ray 2.52.0, we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES + # in ray 2.53.0 (not yet done). 
we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES + - name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES + value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT" image: rayproject/ray:2.52.0 command: - 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="' diff --git a/historyserver/config/rayjob.yaml b/historyserver/config/rayjob.yaml index 2c7119f8285..dc5c5dbf041 100644 --- a/historyserver/config/rayjob.yaml +++ b/historyserver/config/rayjob.yaml @@ -3,7 +3,39 @@ kind: RayJob metadata: name: rayjob spec: - entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())" + entrypoint: | + python -c " + import ray + ray.init() + + @ray.remote + def my_task(x): + return x * 2 + + @ray.remote + class Counter: + def __init__(self): + self.count = 0 + + def increment(self): + self.count += 1 + return self.count + + def get_count(self): + return self.count + + task_result = ray.get(my_task.remote(1)) + print(f'Task result: {task_result}') + + counter = Counter.remote() + for i in range(1): + count = ray.get(counter.increment.remote()) + print(f'Counter: {count}') + + final_count = ray.get(counter.get_count.remote()) + print(f'Final count: {final_count}') + print(f'Cluster resources: {ray.cluster_resources()}') + " # Select the existing Ray cluster running the collector. 
clusterSelector: ray.io/cluster: raycluster-historyserver diff --git a/historyserver/pkg/collector/eventserver/eventserver.go b/historyserver/pkg/collector/eventserver/eventserver.go index b0143c63c96..6ae44d53058 100644 --- a/historyserver/pkg/collector/eventserver/eventserver.go +++ b/historyserver/pkg/collector/eventserver/eventserver.go @@ -45,6 +45,21 @@ type EventServer struct { mutex sync.Mutex } +var eventTypesWithJobID = []string{ + // Job Events (Driver Job) + "driverJobDefinitionEvent", + "driverJobLifecycleEvent", + + // Task Events (Normal Task) + "taskDefinitionEvent", + "taskLifecycleEvent", + "taskProfileEvents", + + // Actor Events (Actor Task + Actor Definition) + "actorTaskDefinitionEvent", + "actorDefinitionEvent", +} + func NewEventServer(writer storage.StorageWriter, rootDir, sessionDir, nodeID, clusterName, clusterID, sessionName string) *EventServer { server := &EventServer{ events: make([]Event, 0), @@ -408,9 +423,14 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool { // getJobID gets jobID associated with event func (es *EventServer) getJobID(eventData map[string]interface{}) string { - if jobID, hasJob := eventData["jobId"]; hasJob && jobID != "" { - return fmt.Sprintf("%v", jobID) + for _, eventType := range eventTypesWithJobID { + if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok { + if jobID, hasJob := nestedEvent["jobId"]; hasJob && jobID != "" { + return fmt.Sprintf("%v", jobID) + } + } } + return "" }