diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index dd6ea922f789..c5ad9525bbdb 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -161,6 +161,8 @@ void OutgoingMigration::Finish(const GenericError& error) {
   }
 
   bool should_cancel_flows = false;
+  absl::Cleanup on_exit([this]() { CloseSocket(); });
+
   {
     util::fb2::LockGuard lk(state_mu_);
     switch (state_) {
@@ -313,6 +315,7 @@ void OutgoingMigration::SyncFb() {
       break;
   }
 
+  CloseSocket();
   VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
 }
 
diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index d20f73856677..166d439d2ae5 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -119,13 +119,6 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
 
 ProtocolClient::~ProtocolClient() {
   exec_st_.JoinErrorHandler();
-  // FIXME: We should close the socket explictly outside of the destructor. This currently
-  // breaks test_cancel_replication_immediately.
-  if (sock_) {
-    std::error_code ec;
-    sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
-    LOG_IF(ERROR, ec) << "Error closing socket " << ec;
-  }
 #ifdef DFLY_USE_SSL
   if (ssl_ctx_) {
     SSL_CTX_free(ssl_ctx_);
@@ -235,6 +228,9 @@ void ProtocolClient::CloseSocket() {
         auto ec = sock_->Shutdown(SHUT_RDWR);
         LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
       }
+
+      auto ec = sock_->Close();  // Quietly close.
+      LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
     });
   }
 }
@@ -385,11 +381,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
 }
 
 uint64_t ProtocolClient::LastIoTime() const {
-  return last_io_time_;
+  return last_io_time_.load(std::memory_order_relaxed);
 }
 
 void ProtocolClient::TouchIoTime() {
-  last_io_time_ = Proactor()->GetMonotonicTimeNs();
+  last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
 }
 
 }  // namespace dfly
diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h
index 7e7ddda036b7..53d487866657 100644
--- a/src/server/protocol_client.h
+++ b/src/server/protocol_client.h
@@ -132,7 +132,7 @@ class ProtocolClient {
   std::string last_cmd_;
   std::string last_resp_;
 
-  uint64_t last_io_time_ = 0;  // in ns, monotonic clock.
+  std::atomic<uint64_t> last_io_time_ = 0;  // in ns, monotonic clock.
 
 #ifdef DFLY_USE_SSL
 
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 7363c7163220..93ddf671ffe4 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -108,6 +108,7 @@ GenericError Replica::Start() {
   VLOG(1) << "Starting replication " << this;
   ProactorBase* mythread = ProactorBase::me();
   CHECK(mythread);
+  DCHECK(proactor_ == mythread);
 
   auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError {
     if (!exec_st_.IsRunning()) {
@@ -1212,9 +1213,7 @@ auto Replica::GetSummary() const -> Summary {
   auto f = [this]() {
     auto last_io_time = LastIoTime();
 
-    // Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
-    // it's unlikely to cause a real bug.
-    for (const auto& flow : shard_flows_) {  // Get last io time from all sub flows.
+    for (const auto& flow : shard_flows_) {
       last_io_time = std::max(last_io_time, flow->LastIoTime());
     }
 
@@ -1246,19 +1245,7 @@ auto Replica::GetSummary() const -> Summary {
     return res;
   };
 
-  if (Sock())
-    return Proactor()->AwaitBrief(f);
-
-  /**
-   * when this branch happens: there is a very short grace period
-   * where Sock() is not initialized, yet the server can
-   * receive ROLE/INFO commands. That period happens when launching
-   * an instance with '--replicaof' and then immediately
-   * sending a command.
-   *
-   * In that instance, we have to run f() on the current fiber.
-   */
-  return f();
+  return proactor_->AwaitBrief(f);
 }
 
 std::vector<uint64_t> Replica::GetReplicaOffset() const {
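Note for reviewers: the absl::Cleanup added to OutgoingMigration::Finish() is a scope guard, so CloseSocket() now runs on every exit path of the function, including the early returns taken under state_mu_. A minimal sketch of the idiom follows; Migration, its members, and the bool argument are illustrative stand-ins, not the real class:

#include <iostream>

#include "absl/cleanup/cleanup.h"

struct Migration {
  bool finished = false;

  void CloseSocket() { std::cout << "socket closed\n"; }

  void Finish(bool error) {
    // Invoked when Finish() returns, no matter which path exits first.
    absl::Cleanup on_exit([this] { CloseSocket(); });

    if (finished)
      return;  // Early exit: the cleanup still closes the socket.
    if (error) {
      finished = true;
      return;  // Error path: the cleanup still closes the socket.
    }
    finished = true;  // Normal path: socket is closed on scope exit.
  }
};

Combined with the explicit CloseSocket() at the end of SyncFb(), this is what lets the protocol_client.cc change delete the socket close from the ProtocolClient destructor.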
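Likewise, last_io_time_ becomes a std::atomic<uint64_t> because it can be read from a foreign thread (the comment removed from Replica::GetSummary() called that access unsafe). Relaxed ordering is enough for a stats timestamp: readers only need a reasonably fresh value, not synchronization with other memory operations. A self-contained sketch of the pattern, using illustrative names (IoTracker, NowNs) that are not from the codebase:

#include <atomic>
#include <chrono>
#include <cstdint>

class IoTracker {
 public:
  // Called from the connection's own thread on every I/O event.
  void TouchIoTime() {
    last_io_time_ns_.store(NowNs(), std::memory_order_relaxed);
  }

  // Safe to call from any thread, e.g. an INFO/ROLE handler.
  uint64_t LastIoTime() const {
    return last_io_time_ns_.load(std::memory_order_relaxed);
  }

 private:
  // Monotonic clock in ns, standing in for Proactor()->GetMonotonicTimeNs().
  static uint64_t NowNs() {
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
               std::chrono::steady_clock::now().time_since_epoch())
        .count();
  }

  std::atomic<uint64_t> last_io_time_ns_{0};
};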