Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ void OutgoingMigration::Finish(const GenericError& error) {
}

bool should_cancel_flows = false;
absl::Cleanup on_exit([this]() { CloseSocket(); });

{
util::fb2::LockGuard lk(state_mu_);
switch (state_) {
Expand Down Expand Up @@ -313,6 +315,7 @@ void OutgoingMigration::SyncFb() {
break;
}

CloseSocket();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BorysTheDev is there any other place that I forgot to CloseSocket() ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so

VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
}

Expand Down
14 changes: 5 additions & 9 deletions src/server/protocol_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,6 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
ProtocolClient::~ProtocolClient() {
exec_st_.JoinErrorHandler();

// FIXME: We should close the socket explictly outside of the destructor. This currently
// breaks test_cancel_replication_immediately.
if (sock_) {
std::error_code ec;
sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
LOG_IF(ERROR, ec) << "Error closing socket " << ec;
}
#ifdef DFLY_USE_SSL
if (ssl_ctx_) {
SSL_CTX_free(ssl_ctx_);
Expand Down Expand Up @@ -235,6 +228,9 @@ void ProtocolClient::CloseSocket() {
auto ec = sock_->Shutdown(SHUT_RDWR);
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
}

auto ec = sock_->Close(); // Quietly close.
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
});
}
}
Expand Down Expand Up @@ -385,11 +381,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
}

uint64_t ProtocolClient::LastIoTime() const {
return last_io_time_;
return last_io_time_.load(std::memory_order_relaxed);
}

void ProtocolClient::TouchIoTime() {
last_io_time_ = Proactor()->GetMonotonicTimeNs();
last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/protocol_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ProtocolClient {
std::string last_cmd_;
std::string last_resp_;

uint64_t last_io_time_ = 0; // in ns, monotonic clock.
std::atomic<uint64_t> last_io_time_ = 0; // in ns, monotonic clock.

#ifdef DFLY_USE_SSL

Expand Down
19 changes: 3 additions & 16 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ GenericError Replica::Start() {
VLOG(1) << "Starting replication " << this;
ProactorBase* mythread = ProactorBase::me();
CHECK(mythread);
DCHECK(proactor_ == mythread);

auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError {
if (!exec_st_.IsRunning()) {
Expand Down Expand Up @@ -1212,9 +1213,7 @@ auto Replica::GetSummary() const -> Summary {
auto f = [this]() {
auto last_io_time = LastIoTime();

// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
// it's unlikely to cause a real bug.
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
for (const auto& flow : shard_flows_) {
last_io_time = std::max(last_io_time, flow->LastIoTime());
}

Expand Down Expand Up @@ -1246,19 +1245,7 @@ auto Replica::GetSummary() const -> Summary {
return res;
};

if (Sock())
return Proactor()->AwaitBrief(f);

/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
*/
return f();
return proactor_->AwaitBrief(f);
}

std::vector<uint64_t> Replica::GetReplicaOffset() const {
Expand Down
Loading