diff --git a/src/ray/ray_syncer/node_state.cc b/src/ray/ray_syncer/node_state.cc index c046145cd7cb..78f368987964 100644 --- a/src/ray/ray_syncer/node_state.cc +++ b/src/ray/ray_syncer/node_state.cc @@ -63,8 +63,12 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr message << (current ? current->version() : -1) << " message_version=" << message->version() << ", message_from=" << NodeID::FromBinary(message->node_id()); + // Check whether newer version of this message has been received. if (current && current->version() >= message->version()) { + RAY_LOG(INFO) << "Dropping sync message with stale version. latest version: " + << current->version() + << ", dropped message version: " << message->version(); return false; } diff --git a/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h b/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h index 2de621c47a5b..8186fadedcbc 100644 --- a/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h +++ b/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h @@ -64,14 +64,18 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { } auto &node_versions = GetNodeComponentVersions(message->node_id()); - if (node_versions[message->message_type()] < message->version()) { - node_versions[message->message_type()] = message->version(); - sending_buffer_[std::make_pair(message->node_id(), message->message_type())] = - std::move(message); - StartSend(); - return true; + if (node_versions[message->message_type()] >= message->version()) { + RAY_LOG(INFO) << "Dropping sync message with stale version. 
latest version: " + << node_versions[message->message_type()] + << ", dropped message version: " << message->version(); + return false; } - return false; + + node_versions[message->message_type()] = message->version(); + sending_buffer_[std::make_pair(message->node_id(), message->message_type())] = + std::move(message); + StartSend(); + return true; } virtual ~RaySyncerBidiReactorBase() = default; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9e2a45641ba8..b84f4a39729c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -224,7 +224,6 @@ NodeManager::NodeManager( cluster_lease_manager_(cluster_lease_manager), record_metrics_period_ms_(config.record_metrics_period_ms), placement_group_resource_manager_(placement_group_resource_manager), - next_resource_seq_no_(0), ray_syncer_(io_service_, self_node_id_.Binary()), worker_killing_policy_(std::make_shared()), memory_monitor_(std::make_unique( @@ -322,7 +321,14 @@ void NodeManager::RegisterGcs() { auto on_node_change_subscribe_done = [this](Status status) { RAY_CHECK_OK(status); - // Register resource manager and scheduler + // RESOURCE_VIEW is used to synchronize available resources across Raylets. + // + // LocalResourceManager::CreateSyncMessage will be called periodically to collect + // the local Raylet's usage to broadcast to others (via the GCS). The updates are + // versioned inside of `LocalResourceManager` to avoid unnecessary broadcasts. + // + // NodeManager::ConsumeSyncMessage will be called when a sync message containing + // other Raylets' resource usage is received. ray_syncer_.Register( /* message_type */ syncer::MessageType::RESOURCE_VIEW, /* reporter */ &cluster_resource_scheduler_.GetLocalResourceManager(), @@ -330,8 +336,14 @@ void NodeManager::RegisterGcs() { /* pull_from_reporter_interval_ms */ report_resources_period_ms_); - // Register a commands channel. - // It's only used for GC right now. 
+ // COMMANDS is used only to broadcast a global request to call the Python garbage + // collector on all Raylets when the cluster is under memory pressure. + // + // Periodic collection is disabled, so this command is only broadcast via + // `OnDemandBroadcasting` (which will call NodeManager::CreateSyncMessage). + // + // NodeManager::ConsumeSyncMessage is called to execute the GC command from other + // Raylets. ray_syncer_.Register( /* message_type */ syncer::MessageType::COMMANDS, /* reporter */ this, @@ -346,6 +358,9 @@ void NodeManager::RegisterGcs() { // If plasma store is under high pressure, we should try to schedule a global // gc. if (triggered_by_global_gc) { + // Always increment the sync message version number so that all GC commands + // are sent indiscriminately. + gc_command_sync_version_++; ray_syncer_.OnDemandBroadcasting(syncer::MessageType::COMMANDS); } }, @@ -3032,19 +3047,25 @@ void NodeManager::ConsumeSyncMessage( std::optional NodeManager::CreateSyncMessage( int64_t after_version, syncer::MessageType message_type) const { + // This method is only called for the COMMANDS channel, as the RESOURCE_VIEW + // channel goes through the LocalResourceManager. RAY_CHECK_EQ(message_type, syncer::MessageType::COMMANDS); + // Serialize the COMMANDS message to a byte string to be nested inside the sync message. + std::string serialized_commands_sync_msg; syncer::CommandsSyncMessage commands_sync_message; commands_sync_message.set_should_global_gc(true); commands_sync_message.set_cluster_full_of_actors_detected(resource_deadlock_warned_ >= 1); + RAY_CHECK(commands_sync_message.SerializeToString(&serialized_commands_sync_msg)); + + // Populate the sync message. 
syncer::RaySyncMessage msg; - msg.set_version(absl::GetCurrentTimeNanos()); + msg.set_version(gc_command_sync_version_); msg.set_node_id(self_node_id_.Binary()); msg.set_message_type(syncer::MessageType::COMMANDS); - std::string serialized_msg; - RAY_CHECK(commands_sync_message.SerializeToString(&serialized_msg)); - msg.set_sync_message(std::move(serialized_msg)); + msg.set_sync_message(std::move(serialized_commands_sync_msg)); + return std::make_optional(std::move(msg)); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 843f37e28f5e..80a7eeed4d3e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -202,8 +202,18 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// Get the port of the node manager rpc server. int GetServerPort() const { return node_manager_server_.GetPort(); } + // Consume a RaySyncer sync message from another Raylet. + // + // The two types of messages that are received are: + // - RESOURCE_VIEW: an update of the resources available on another Raylet. + // - COMMANDS: a request to run the Python garbage collector globally across Raylets. void ConsumeSyncMessage(std::shared_ptr message) override; + // Generate a RaySyncer sync message to be sent to other Raylets. + // + // This is currently only used to generate messages for the COMMANDS channel to request + // other Raylets to call the Python garbage collector, and is only called on demand + // (not periodically polled by the RaySyncer code). std::optional CreateSyncMessage( int64_t after_version, syncer::MessageType message_type) const override; @@ -890,13 +900,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// Manages all bundle-related operations. PlacementGroupResourceManager &placement_group_resource_manager_; - /// Next resource broadcast seq no. Non-incrementing sequence numbers - /// indicate network issues (dropped/duplicated/ooo packets, etc). 
- int64_t next_resource_seq_no_; - /// Ray syncer for synchronization syncer::RaySyncer ray_syncer_; + /// `version` for the RaySyncer COMMANDS channel. Monotonically incremented each time + /// we issue a GC command so that none of the messages are dropped. + int64_t gc_command_sync_version_ = 0; + /// The Policy for selecting the worker to kill when the node runs out of memory. std::shared_ptr worker_killing_policy_;