Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 54 additions & 23 deletions src/plugins/libfabric/libfabric_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1339,20 +1339,42 @@ nixlLibfabricEngine::cmThread() {
NIXL_DEBUG << "ConnectionManagement thread started successfully";
NIXL_DEBUG << "Initial receives already posted in main thread, entering progress loop";

// Main progress loop - continuously process completions on all rails
while (!cm_thread_stop_.load()) {
NIXL_DEBUG << "CM: Thread started";

// Adaptive backoff state (per-thread)
static thread_local int backoff_us = 50; // start at 50 µs
static thread_local const int backoff_us_max = 2000; // cap at 2 ms

// Prefer blocking progress if supported (verbs with FI_WAIT_FD)
const bool blocking_supported = (rail_manager.getNumControlRails() > 0) &&
rail_manager.getControlRail(0).blocking_cq_sread_supported;

while (!cm_thread_stop_.load(std::memory_order_relaxed)) {
nixl_status_t status;
if (blocking_supported) {
// With blocking control CQ progress, rely on rail_manager to block up to its timeout
status = rail_manager.progressAllControlRails(true); // blocking=true inside rail path
} else {
// Non-blocking path: progress and adaptively back off on idle
status = rail_manager.progressAllControlRails(false);
}

nixl_status_t status = rail_manager.progressAllControlRails();
if (status == NIXL_SUCCESS) {
NIXL_DEBUG << "Processed completions on control rails";
} else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) {
NIXL_ERROR << "Failed to process completions on control rails";
return NIXL_ERR_BACKEND;
// Work was done: reset backoff
backoff_us = 50;
// Optionally continue immediately to drain more completions
continue;
}
// Sleep briefly to avoid spinning too aggressively when blocking cq read is not used
if (!rail_manager.getControlRail(0).blocking_cq_sread_supported) {
std::this_thread::sleep_for(std::chrono::nanoseconds(10));
if (status == NIXL_IN_PROG) {
// No completions available: sleep adaptively
std::this_thread::sleep_for(std::chrono::microseconds(backoff_us));
backoff_us = std::min(backoff_us * 2, backoff_us_max);
continue;
}

// Unexpected error: log and exit
NIXL_ERROR << "CM: Failed to process completions on control rails, status=" << status;
return NIXL_ERR_BACKEND;
}
NIXL_DEBUG << "ConnectionManagement thread exiting cleanly";
return NIXL_SUCCESS;
Expand All @@ -1366,24 +1388,33 @@ nixlLibfabricEngine::cmThread() {
nixl_status_t
nixlLibfabricEngine::progressThread() {
NIXL_DEBUG << "Progress thread started successfully for data rails only";
// Main progress loop - continuously process completions only on data rails
while (!progress_thread_stop_.load()) {
// Process completions only on data rails (non-blocking)
bool any_completions = false;
nixl_status_t status = rail_manager.progressActiveDataRails();

// Adaptive backoff layered over configured delay
static thread_local int backoff_us = static_cast<int>(progress_thread_delay_.count());
static thread_local const int backoff_us_min = 50; // floor at 50 µs
static thread_local const int backoff_us_max = 5000; // cap at 5 ms
if (backoff_us <= 0) backoff_us = backoff_us_min;

while (!progress_thread_stop_.load(std::memory_order_relaxed)) {
nixl_status_t status = rail_manager.progressActiveDataRails(); // non-blocking
if (status == NIXL_SUCCESS) {
any_completions = true;
NIXL_DEBUG << "Processed completions on data rails";
} else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) {
NIXL_ERROR << "Failed to process completions on data rails";
// Don't return error, continue for robustness
// Completions processed: reset backoff and continue draining
backoff_us = std::max(backoff_us_min, static_cast<int>(progress_thread_delay_.count()));
continue;
}
if (!any_completions) {
std::this_thread::sleep_for(progress_thread_delay_);
if (status == NIXL_IN_PROG) {
// Idle: sleep adaptively, increasing up to cap
std::this_thread::sleep_for(std::chrono::microseconds(backoff_us));
backoff_us = std::min(backoff_us * 2, backoff_us_max);
continue;
}
// Error: log and keep going for robustness (do not kill the progress thread)
NIXL_ERROR << "PT: Failed to process completions on data rails, status=" << status;
std::this_thread::sleep_for(std::chrono::microseconds(backoff_us_min));
}
NIXL_DEBUG << "Progress thread exiting cleanly";
NIXL_DEBUG << "PT: Thread exiting";
return NIXL_SUCCESS;

}

void
Expand Down
97 changes: 49 additions & 48 deletions src/utils/libfabric/libfabric_rail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <cstring>
#include <stdexcept>
#include <stack>
#include <thread>

#ifdef HAVE_SYNAPSEAI
#include <dlfcn.h>
Expand Down Expand Up @@ -700,8 +701,9 @@ nixlLibfabricRail::setXferIdCallback(std::function<void(uint32_t)> callback) {
nixl_status_t
nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const {
// Completion processing
struct fi_cq_data_entry completion;
memset(&completion, 0, sizeof(completion));
// Batch read to amortize lock and syscall overhead
struct fi_cq_data_entry entries[32];
memset(entries, 0, sizeof(entries));

int ret;

Expand All @@ -711,10 +713,10 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const {

if (use_blocking && blocking_cq_sread_supported) {
// Blocking read using fi_cq_sread (used by CM thread)
ret = fi_cq_sread(cq, &completion, 1, nullptr, NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC);
ret = fi_cq_sread(cq, entries, 1, nullptr, NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC);
} else {
// Non-blocking read (used by progress thread or fallback)
ret = fi_cq_read(cq, &completion, 1);
ret = fi_cq_read(cq, entries, 32);
}

if (ret < 0 && ret != -FI_EAGAIN) {
Expand All @@ -738,24 +740,25 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const {
}
// CQ lock released here - completion is now local data

if (ret == -FI_EAGAIN) {
if (ret == -FI_EAGAIN || ret == 0) {
return NIXL_IN_PROG; // No completions available
}

if (ret == 1) {
NIXL_TRACE << "Completion received on rail " << rail_id << " flags: " << std::hex
<< completion.flags << " data: " << completion.data
<< " context: " << completion.op_context << std::dec;

// Process completion using local data. Callbacks have their own thread safety
nixl_status_t status = processCompletionQueueEntry(&completion);
if (status != NIXL_SUCCESS) {
NIXL_ERROR << "Failed to process completion on rail " << rail_id;
return status;
if (ret > 0) {
bool ok = true;
for (int i = 0; i < ret; ++i) {
NIXL_TRACE << "Completion received on rail " << rail_id << " flags=" << std::hex
<< entries[i].flags << " data=" << entries[i].data
<< " context=" << entries[i].op_context << std::dec;
nixl_status_t status = processCompletionQueueEntry(&entries[i]);
if (status != NIXL_SUCCESS) {
NIXL_ERROR << "Failed to process completion on rail " << rail_id;
ok = false;
break;
}
}

NIXL_DEBUG << "Completion processed on rail " << rail_id;
return NIXL_SUCCESS;
return ok ? NIXL_SUCCESS : NIXL_ERR_BACKEND;
}

return NIXL_ERR_BACKEND; // Unexpected case
Expand Down Expand Up @@ -1077,7 +1080,7 @@ nixlLibfabricRail::postSend(uint64_t immediate_data,

if (ret == -FI_EAGAIN) {
// Resource temporarily unavailable - retry indefinitely for all providers
attempt++;
++attempt;

// Log every N attempts to avoid log spam
if (attempt % NIXL_LIBFABRIC_LOG_INTERVAL_ATTEMPTS == 0) {
Expand All @@ -1088,17 +1091,17 @@ nixlLibfabricRail::postSend(uint64_t immediate_data,
<< ", retrying (attempt " << attempt << ")";
}

// Exponential backoff with cap to avoid overwhelming the system
int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10),
NIXL_LIBFABRIC_MAX_RETRY_DELAY_US);

// Progress completion queue to drain pending completions before retry
nixl_status_t progress_status = progressCompletionQueue(false);
if (progress_status == NIXL_SUCCESS) {
NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry";
// Progress CQ a few times before backing off
if (attempt <= 8) {
(void)progressCompletionQueue(false);
} else {
int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms
if (blocking_cq_sread_supported)
(void)progressCompletionQueue(true);
else
std::this_thread::sleep_for(std::chrono::microseconds(delay_us));
}

usleep(delay_us);
continue;
} else {
// Other error - don't retry, fail immediately
Expand Down Expand Up @@ -1157,7 +1160,7 @@ nixlLibfabricRail::postWrite(const void *local_buffer,

if (ret == -FI_EAGAIN) {
// Resource temporarily unavailable - retry indefinitely for all providers
attempt++;
++attempt;

// Log every N attempts to avoid log spam
if (attempt % NIXL_LIBFABRIC_LOG_INTERVAL_ATTEMPTS == 0) {
Expand All @@ -1168,17 +1171,16 @@ nixlLibfabricRail::postWrite(const void *local_buffer,
<< ", retrying (attempt " << attempt << ")";
}

// Exponential backoff with cap to avoid overwhelming the system
int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10),
NIXL_LIBFABRIC_MAX_RETRY_DELAY_US);

// Progress completion queue to drain pending completions before retry
nixl_status_t progress_status = progressCompletionQueue(false);
if (progress_status == NIXL_SUCCESS) {
NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry";
// Progress CQ a few times before backing off
if (attempt <= 8) {
(void)progressCompletionQueue(false);
} else {
int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms
if (blocking_cq_sread_supported)
(void)progressCompletionQueue(true);
else
std::this_thread::sleep_for(std::chrono::microseconds(delay_us));
}

usleep(delay_us);
continue;
} else {
// Other error - don't retry, fail immediately
Expand Down Expand Up @@ -1245,17 +1247,16 @@ nixlLibfabricRail::postRead(void *local_buffer,
<< ", retrying (attempt " << attempt << ")";
}

// Exponential backoff with cap to avoid overwhelming the system
int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10),
NIXL_LIBFABRIC_MAX_RETRY_DELAY_US);

// Progress completion queue to drain pending completions before retry
nixl_status_t progress_status = progressCompletionQueue(false);
if (progress_status == NIXL_SUCCESS) {
NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry";
// Progress CQ a few times before backing off
if (attempt <= 8) {
(void)progressCompletionQueue(false);
} else {
int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms
if (blocking_cq_sread_supported)
(void)progressCompletionQueue(true);
else
std::this_thread::sleep_for(std::chrono::microseconds(delay_us));
}

usleep(delay_us);
continue;
} else {
// Other error - don't retry, fail immediately
Expand Down
6 changes: 3 additions & 3 deletions src/utils/libfabric/libfabric_rail_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,11 +658,11 @@ nixlLibfabricRailManager::progressActiveDataRails() {
}

nixl_status_t
nixlLibfabricRailManager::progressAllControlRails() {
nixlLibfabricRailManager::progressAllControlRails(bool blocking) {
bool any_completions = false;
for (size_t rail_id = 0; rail_id < num_control_rails_; ++rail_id) {
nixl_status_t status =
control_rails_[rail_id]->progressCompletionQueue(true); // Blocking for control rails
nixl_status_t status = control_rails_[rail_id]->progressCompletionQueue(
blocking); // Blocks only if requested by the caller and supported by the rail
if (status == NIXL_SUCCESS) {
any_completions = true;
NIXL_DEBUG << "Processed completion on control rail " << rail_id;
Expand Down
2 changes: 1 addition & 1 deletion src/utils/libfabric/libfabric_rail_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class nixlLibfabricRailManager {
* @return NIXL_SUCCESS if completions processed, NIXL_IN_PROG if none, error on failure
*/
nixl_status_t
progressAllControlRails();
progressAllControlRails(bool blocking);
/** Validate that all rails are properly initialized
* @return NIXL_SUCCESS if all rails initialized, error code otherwise
*/
Expand Down
Loading