Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions historyserver/config/raycluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@ spec:
affinity:
containers:
- env:
- name: RAY_enable_ray_event
value: "true"
Comment on lines +23 to +24
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to enable all types of event, we should always enable it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is for enabling gcs level event

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should also be "1" instead of bool string

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- name: RAY_enable_core_worker_ray_event_to_aggregator
value: "1"
value: "true"
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR
value: "http://localhost:8084/v1/events"
# in ray 2.52.0, we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
# in ray 2.53.0 (noy yet done). we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,
TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT,
ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT"
image: rayproject/ray:2.52.0
imagePullPolicy: IfNotPresent
command:
Expand Down Expand Up @@ -105,10 +113,10 @@ spec:
workerGroupSpecs:
- groupName: cpu
maxReplicas: 1000
minReplicas: 0
minReplicas: 1
numOfHosts: 1
rayStartParams: {}
replicas: 0
replicas: 1
template:
metadata:
labels:
Expand All @@ -117,10 +125,18 @@ spec:
imagePullSecrets:
containers:
- env:
- name: RAY_enable_ray_event
value: "true"
- name: RAY_enable_core_worker_ray_event_to_aggregator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be "1", I dont think it accepts bool strings

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also note that even when this config is enabled, events will still go to GCS. to disable that we need to set RAY_enable_core_worker_task_event_to_gcs="0". but this will break cli and state API.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh TIL!

Copy link
Member Author

@Future-Outlier Future-Outlier Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you, will keep in mind, curretly we might need to enable it until we can fully support removed the GCS, since there are some data not provided from base event.

for example:

/api/data/datasets/{job_id}
/api/serve/applications/

value: "1"
value: "true"
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR
value: "http://localhost:8084/v1/events"
# in ray 2.52.0, we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
# in ray 2.53.0 (not yet done). we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,
TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT,
ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT"
image: rayproject/ray:2.52.0
command:
- 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="'
Expand Down
34 changes: 33 additions & 1 deletion historyserver/config/rayjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,39 @@ kind: RayJob
metadata:
name: rayjob
spec:
entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())"
entrypoint: |
python -c "
import ray
ray.init()

@ray.remote
def my_task(x):
return x * 2

@ray.remote
class Counter:
def __init__(self):
self.count = 0

def increment(self):
self.count += 1
return self.count

def get_count(self):
return self.count

task_result = ray.get(my_task.remote(1))
print(f'Task result: {task_result}')

counter = Counter.remote()
for i in range(1):
count = ray.get(counter.increment.remote())
print(f'Counter: {count}')

final_count = ray.get(counter.get_count.remote())
print(f'Final count: {final_count}')
print(f'Cluster resources: {ray.cluster_resources()}')
"
# Select the existing Ray cluster running the collector.
clusterSelector:
ray.io/cluster: raycluster-historyserver
24 changes: 22 additions & 2 deletions historyserver/pkg/collector/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ type EventServer struct {
mutex sync.Mutex
}

var eventTypesWithJobID = []string{
// Job Events (Driver Job)
"driverJobDefinitionEvent",
"driverJobLifecycleEvent",

// Task Events (Normal Task)
"taskDefinitionEvent",
"taskLifecycleEvent",
"taskProfileEvents",

// Actor Events (Actor Task + Actor Definition)
"actorTaskDefinitionEvent",
"actorDefinitionEvent",
}

func NewEventServer(writer storage.StorageWriter, rootDir, sessionDir, nodeID, clusterName, clusterID, sessionName string) *EventServer {
server := &EventServer{
events: make([]Event, 0),
Expand Down Expand Up @@ -408,9 +423,14 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool {

// getJobID gets jobID associated with event
func (es *EventServer) getJobID(eventData map[string]interface{}) string {
if jobID, hasJob := eventData["jobId"]; hasJob && jobID != "" {
return fmt.Sprintf("%v", jobID)
Comment on lines -411 to -412
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ray event structure never contains jobId at the top level of the event data.

Looking at actual Ray events, the structure is:

{
  "eventId": "...",
  "eventType": "TASK_DEFINITION_EVENT",
  "message": "",
  "sessionName": "...",
  "severity": "INFO",
  "sourceType": "CORE_WORKER",
  "timestamp": "...",
  "taskDefinitionEvent": {       // <-- Nested event object
    "jobId": "AgAAAA==",         // <-- jobId is ALWAYS here (nested)
    "language": "PYTHON",
    ...
  }
}

The jobId field is always nested inside the specific event type object (e.g., taskDefinitionEvent, actorDefinitionEvent, driverJobDefinitionEvent), never at the top level of eventData.

for _, eventType := range eventTypesWithJobID {
if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok {
if jobID, hasJob := nestedEvent["jobId"]; hasJob && jobID != "" {
return fmt.Sprintf("%v", jobID)
}
}
}

return ""
}

Expand Down
Loading