@@ -224,7 +224,6 @@ NodeManager::NodeManager(
224224 cluster_lease_manager_ (cluster_lease_manager),
225225 record_metrics_period_ms_ (config.record_metrics_period_ms),
226226 placement_group_resource_manager_ (placement_group_resource_manager),
227- next_resource_seq_no_ (0 ),
228227 ray_syncer_ (io_service_, self_node_id_.Binary()),
229228 worker_killing_policy_ (std::make_shared<GroupByOwnerIdWorkerKillingPolicy>()),
230229 memory_monitor_ (std::make_unique<MemoryMonitor>(
@@ -322,16 +321,29 @@ void NodeManager::RegisterGcs() {
322321 auto on_node_change_subscribe_done = [this ](Status status) {
323322 RAY_CHECK_OK (status);
324323
325- // Register resource manager and scheduler
324+ // RESOURCE_VIEW is used to synchronize available resources across Raylets.
325+ //
326+ // LocalResourceManager::CreateSyncMessage will be called periodically to collect
327+ // the local Raylet's usage to broadcast to others (via the GCS). The updates are
328+ // versioned inside of `LocalResourceManager` to avoid unnecessary broadcasts.
329+ //
330+ // NodeManager::ConsumeSyncMessage will be called when a sync message containing
331+ // other Raylets' resource usage is received.
326332 ray_syncer_.Register (
327333 /* message_type */ syncer::MessageType::RESOURCE_VIEW,
328334 /* reporter */ &cluster_resource_scheduler_.GetLocalResourceManager (),
329335 /* receiver */ this ,
330336 /* pull_from_reporter_interval_ms */
331337 report_resources_period_ms_);
332338
333- // Register a commands channel.
334- // It's only used for GC right now.
339+ // COMMANDS is used only to broadcast a global request to call the Python garbage
340+ // collector on all Raylets when the cluster is under memory pressure.
341+ //
342+ // Periodic collection is disabled, so this command is only broadcasted via
343+ // `OnDemandBroadcasting` (which will call NodeManager::CreateSyncMessage).
344+ //
345+ // NodeManager::ConsumeSyncMessage is called to execute the GC command from other
346+ // Raylets.
335347 ray_syncer_.Register (
336348 /* message_type */ syncer::MessageType::COMMANDS,
337349 /* reporter */ this ,
@@ -346,6 +358,9 @@ void NodeManager::RegisterGcs() {
346358 // If plasma store is under high pressure, we should try to schedule a global
347359 // gc.
348360 if (triggered_by_global_gc) {
361+ // Always increment the sync message version number so that all GC commands
362+ // are sent indiscriminately.
363+ gc_command_sync_version_++;
349364 ray_syncer_.OnDemandBroadcasting (syncer::MessageType::COMMANDS);
350365 }
351366 },
@@ -3032,19 +3047,25 @@ void NodeManager::ConsumeSyncMessage(
30323047
std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
    int64_t after_version, syncer::MessageType message_type) const {
  // Builds the COMMANDS sync message broadcast to other Raylets (a global-GC
  // request). This method is only called for the COMMANDS channel, as the
  // RESOURCE_VIEW channel goes through the LocalResourceManager.
  //
  // NOTE(review): `after_version` is not consulted here — the syncer's own
  // version filtering is assumed to handle de-duplication; confirm against the
  // RaySyncer contract.
  RAY_CHECK_EQ(message_type, syncer::MessageType::COMMANDS);

  // Serialize the COMMANDS message to a byte string to be nested inside the
  // sync message. `should_global_gc` is always true: this message is only
  // created when a global GC broadcast was requested.
  std::string serialized_commands_sync_msg;
  syncer::CommandsSyncMessage commands_sync_message;
  commands_sync_message.set_should_global_gc(true);
  // Signals deadlock-style resource exhaustion: `resource_deadlock_warned_`
  // is a counter, so any value >= 1 means the warning condition was hit.
  commands_sync_message.set_cluster_full_of_actors_detected(resource_deadlock_warned_ >=
                                                            1);
  RAY_CHECK(commands_sync_message.SerializeToString(&serialized_commands_sync_msg));

  // Populate the outer sync message. The version is a per-node counter
  // (`gc_command_sync_version_`, incremented before each on-demand broadcast)
  // rather than a timestamp, so every GC command is sent indiscriminately.
  syncer::RaySyncMessage msg;
  msg.set_version(gc_command_sync_version_);
  msg.set_node_id(self_node_id_.Binary());
  msg.set_message_type(syncer::MessageType::COMMANDS);
  msg.set_sync_message(std::move(serialized_commands_sync_msg));

  return std::make_optional(std::move(msg));
}
30503071
0 commit comments