Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ray/ray_syncer/node_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr<const RaySyncMessage> message
<< (current ? current->version() : -1)
<< " message_version=" << message->version()
<< ", message_from=" << NodeID::FromBinary(message->node_id());

// Check whether newer version of this message has been received.
if (current && current->version() >= message->version()) {
RAY_LOG(INFO) << "Dropping sync message with stale version. latest version: "
<< current->version()
<< ", dropped message version: " << message->version();
return false;
}

Expand Down
18 changes: 11 additions & 7 deletions src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,18 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
}

auto &node_versions = GetNodeComponentVersions(message->node_id());
if (node_versions[message->message_type()] < message->version()) {
node_versions[message->message_type()] = message->version();
sending_buffer_[std::make_pair(message->node_id(), message->message_type())] =
std::move(message);
StartSend();
return true;
if (node_versions[message->message_type()] >= message->version()) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no behavior change, just reversed the early return logic

RAY_LOG(INFO) << "Dropping sync message with stale version. latest version: "
<< node_versions[message->message_type()]
<< ", dropped message version: " << message->version();
return false;
}
return false;

node_versions[message->message_type()] = message->version();
sending_buffer_[std::make_pair(message->node_id(), message->message_type())] =
std::move(message);
StartSend();
return true;
}

virtual ~RaySyncerBidiReactorBase() = default;
Expand Down
37 changes: 29 additions & 8 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ NodeManager::NodeManager(
cluster_lease_manager_(cluster_lease_manager),
record_metrics_period_ms_(config.record_metrics_period_ms),
placement_group_resource_manager_(placement_group_resource_manager),
next_resource_seq_no_(0),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was vestigial

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does vestigial mean lol

ray_syncer_(io_service_, self_node_id_.Binary()),
worker_killing_policy_(std::make_shared<GroupByOwnerIdWorkerKillingPolicy>()),
memory_monitor_(std::make_unique<MemoryMonitor>(
Expand Down Expand Up @@ -322,16 +321,29 @@ void NodeManager::RegisterGcs() {
auto on_node_change_subscribe_done = [this](Status status) {
RAY_CHECK_OK(status);

// Register resource manager and scheduler
// RESOURCE_VIEW is used to synchronize available resources across Raylets.
//
// LocalResourceManager::CreateSyncMessage will be called periodically to collect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good to mention that it's both periodically called and also on-demand when local resources change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it is not called on-demand! Inside of OnResourceOrStateChanged, we increment the version but we do not actually eagerly broadcast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh whattt, thanks for the clarification 🤯

// the local Raylet's usage to broadcast to others (via the GCS). The updates are
// versioned inside of `LocalResourceManager` to avoid unnecessary broadcasts.
//
// NodeManager::ConsumeSyncMessage will be called when a sync message containing
// other Raylets' resource usage is received.
ray_syncer_.Register(
/* message_type */ syncer::MessageType::RESOURCE_VIEW,
/* reporter */ &cluster_resource_scheduler_.GetLocalResourceManager(),
/* receiver */ this,
/* pull_from_reporter_interval_ms */
report_resources_period_ms_);

// Register a commands channel.
// It's only used for GC right now.
// COMMANDS is used only to broadcast a global request to call the Python garbage
// collector on all Raylets when the cluster is under memory pressure.
//
// Periodic collection is disabled, so this command is only broadcast via
// `OnDemandBroadcasting` (which will call NodeManager::CreateSyncMessage).
//
// NodeManager::ConsumeSyncMessage is called to execute the GC command from other
// Raylets.
ray_syncer_.Register(
/* message_type */ syncer::MessageType::COMMANDS,
/* reporter */ this,
Expand All @@ -346,6 +358,9 @@ void NodeManager::RegisterGcs() {
// If plasma store is under high pressure, we should try to schedule a global
// gc.
if (triggered_by_global_gc) {
// Always increment the sync message version number so that all GC commands
// are sent indiscriminately.
gc_command_sync_version_++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good to mention that even though we call OnDemandBroadcasting, it's only sent to the GCS and not to other raylets

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would also be good to mention, in BroadcastMessage or on the sync_reactors_ map, that a node manager has only one bidi reactor, which connects to the GCS, whereas the GCS has multiple bidi reactors, one for each node. This emphasizes that syncing is NOT all-to-all at the raylet level; it goes node -> GCS -> all nodes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included this where we initialize the syncer and in the ray_syncer_ field comment. Putting it here specifically felt odd because it applies to all usage of the syncer.

ray_syncer_.OnDemandBroadcasting(syncer::MessageType::COMMANDS);
}
},
Expand Down Expand Up @@ -3032,19 +3047,25 @@ void NodeManager::ConsumeSyncMessage(

std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
Copy link
Contributor

@Sparks0219 Sparks0219 Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we rename this to CreateSyncCommandsMessage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cannot because it's a virtual method to implement the sync broadcaster interface. I tried to do that already :'(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh... I see, both LocalResourceManager and NodeManager inherit from the syncer class and override this. RIP.

int64_t after_version, syncer::MessageType message_type) const {
// This method is only called for the COMMANDS channel, as the RESOURCE_VIEW
// channel goes through the LocalResourceManager.
RAY_CHECK_EQ(message_type, syncer::MessageType::COMMANDS);

// Serialize the COMMANDS message to a byte string to be nested inside the sync message.
std::string serialized_commands_sync_msg;
syncer::CommandsSyncMessage commands_sync_message;
commands_sync_message.set_should_global_gc(true);
commands_sync_message.set_cluster_full_of_actors_detected(resource_deadlock_warned_ >=
1);
RAY_CHECK(commands_sync_message.SerializeToString(&serialized_commands_sync_msg));

// Populate the sync message.
syncer::RaySyncMessage msg;
msg.set_version(absl::GetCurrentTimeNanos());
msg.set_version(gc_command_sync_version_);
msg.set_node_id(self_node_id_.Binary());
msg.set_message_type(syncer::MessageType::COMMANDS);
std::string serialized_msg;
RAY_CHECK(commands_sync_message.SerializeToString(&serialized_msg));
msg.set_sync_message(std::move(serialized_msg));
msg.set_sync_message(std::move(serialized_commands_sync_msg));

return std::make_optional(std::move(msg));
}

Expand Down
18 changes: 14 additions & 4 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,18 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Get the port of the node manager rpc server.
int GetServerPort() const { return node_manager_server_.GetPort(); }

// Consume a RaySyncer sync message from another Raylet.
//
// The two types of messages that are received are:
// - RESOURCE_VIEW: an update of the resources available on another Raylet.
// - COMMANDS: a request to run the Python garbage collector globally across Raylets.
void ConsumeSyncMessage(std::shared_ptr<const syncer::RaySyncMessage> message) override;

// Generate a RaySyncer sync message to be sent to other Raylets.
//
// This is currently only used to generate messages for the COMMANDS channel to request
// other Raylets to call the Python garbage collector, and is only called on demand
// (not periodically polled by the RaySyncer code).
std::optional<syncer::RaySyncMessage> CreateSyncMessage(
int64_t after_version, syncer::MessageType message_type) const override;

Expand Down Expand Up @@ -890,13 +900,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Manages all bundle-related operations.
PlacementGroupResourceManager &placement_group_resource_manager_;

/// Next resource broadcast seq no. Non-incrementing sequence numbers
/// indicate network issues (dropped/duplicated/ooo packets, etc).
int64_t next_resource_seq_no_;

/// Ray syncer for synchronization
syncer::RaySyncer ray_syncer_;

/// `version` for the RaySyncer COMMANDS channel. Monotonically incremented each time
/// we issue a GC command so that none of the messages are dropped.
int64_t gc_command_sync_version_ = 0;

/// The Policy for selecting the worker to kill when the node runs out of memory.
std::shared_ptr<WorkerKillingPolicy> worker_killing_policy_;

Expand Down