Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 50 additions & 47 deletions src/plugins/posix/aio_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
#include <string.h>
#include <time.h>
#include <stdexcept>
#include <absl/strings/str_format.h>

aioQueue::aioQueue(int num_entries, nixl_xfer_op_t operation)
: aiocbs(num_entries),
num_entries(num_entries),
completed(num_entries),
num_completed(0),
num_submitted(0),
operation(operation) {
if (num_entries <= 0) {
throw std::runtime_error("Invalid number of entries for AIO queue");
Expand All @@ -40,8 +40,9 @@ aioQueue::aioQueue(int num_entries, nixl_xfer_op_t operation)

aioQueue::~aioQueue() {
// There should not be any in-flight I/Os at destruction time
if (num_submitted > num_completed) {
NIXL_ERROR << "Programming error: Destroying aioQueue with " << (num_submitted - num_completed) << " in-flight I/Os";
if (num_ios_outstanding > 0) {
NIXL_ERROR << "Programming error: Destroying aioQueue with " << num_ios_outstanding
<< " in-flight I/Os";
}

// Cancel any remaining I/Os
Expand All @@ -53,11 +54,13 @@ aioQueue::~aioQueue() {
}

nixl_status_t
aioQueue::submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) {
num_submitted = 0;
// Submit all I/Os at once
for (auto& aiocb : aiocbs) {
aioQueue::submitBatch(int start_idx, int count, int &submitted_count) {
// Submit the batch
submitted_count = 0;
for (int i = start_idx; i < start_idx + count; i++) {
auto &aiocb = aiocbs[i];
if (aiocb.aio_fildes == 0 || aiocb.aio_nbytes == 0) continue;

// Check if file descriptor is valid
if (aiocb.aio_fildes < 0) {
NIXL_ERROR << "Invalid file descriptor in AIO request";
Expand All @@ -73,35 +76,33 @@ aioQueue::submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) {

if (ret < 0) {
if (errno == EAGAIN) {
// If we hit the kernel limit, cancel all submitted I/Os and return error
NIXL_ERROR << "AIO submit failed: kernel queue full";
for (auto& cb : aiocbs) {
if (cb.aio_fildes != 0) {
aio_cancel(cb.aio_fildes, &cb);
}
}
return NIXL_ERR_BACKEND;
// If we hit the kernel limit, stop submitting and return partial success
NIXL_ERROR << absl::StrFormat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using NIXL_INFO here. Since EAGAIN is an expected condition that the batching logic handles gracefully (returning NIXL_SUCCESS with a partial count), logging it as an error may be misleading.

"AIO submit partial due to EAGAIN: %d/%d", submitted_count, count);
// Return success with the count we actually submitted
return NIXL_SUCCESS;
}
NIXL_PERROR << "AIO submit failed";
return NIXL_ERR_BACKEND;
}

num_submitted++;
submitted_count++;
}

completed.assign(num_entries, false);
num_completed = 0;
return NIXL_IN_PROG;
return NIXL_SUCCESS;
}

nixl_status_t aioQueue::checkCompleted() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`completed` is not reset; I think this may be a bug. Without this reset, subsequent transfers will skip checking I/Os because completed[i] remains true from the previous transfer, causing an infinite loop in checkCompleted().
We probably need something like:
num_completed = 0;
completed.assign(num_entries, false);

if (num_completed == num_entries)
// Check if all IOs are submitted and completed
if (num_ios_submitted_total >= num_ios_to_submit && num_ios_outstanding == 0) {
num_ios_submitted_total = 0;
num_ios_to_submit = 0;
return NIXL_SUCCESS;
}

// Check all submitted I/Os
for (int i = 0; i < num_entries; i++) {
if (completed[i] || aiocbs[i].aio_fildes == 0 || aiocbs[i].aio_nbytes == 0)
continue; // Skip completed I/Os
// Check all submitted I/Os for completion
for (int i = 0; i < num_ios_submitted_total; i++) {
if (completed[i]) continue; // Skip already completed I/Os

int status = aio_error(&aiocbs[i]);
if (status == 0) { // Operation completed
Expand All @@ -111,41 +112,43 @@ nixl_status_t aioQueue::checkCompleted() {
return NIXL_ERR_BACKEND;
}
num_completed++;
num_ios_outstanding--;
completed[i] = true;
} else if (status == EINPROGRESS) {
return NIXL_IN_PROG; // At least one operation still in progress
// Still in progress, continue checking others
continue;
} else {
NIXL_PERROR << "AIO error";
return NIXL_ERR_BACKEND;
}
}

return (num_completed == num_submitted) ? NIXL_SUCCESS : NIXL_IN_PROG;
return NIXL_IN_PROG; // Continue until all IOs are submitted and completed
}

nixl_status_t aioQueue::prepIO(int fd, void* buf, size_t len, off_t offset) {
// Find an unused control block
for (auto& aiocb : aiocbs) {
if (aiocb.aio_fildes == 0) {
// Check if file descriptor is valid
if (fd < 0) {
NIXL_ERROR << "Invalid file descriptor provided to prepareIO";
return NIXL_ERR_BACKEND;
}
// Check if file descriptor is valid
if (fd < 0) {
NIXL_ERROR << "Invalid file descriptor provided to prepareIO";
return NIXL_ERR_BACKEND;
}

// Check buffer and length
if (!buf || len == 0) {
NIXL_ERROR << "Invalid buffer or length provided to prepareIO";
return NIXL_ERR_BACKEND;
}
// Check buffer and length
if (!buf || len == 0) {
NIXL_ERROR << "Invalid buffer or length provided to prepareIO";
return NIXL_ERR_BACKEND;
}

aiocb.aio_fildes = fd;
aiocb.aio_buf = buf;
aiocb.aio_nbytes = len;
aiocb.aio_offset = offset;
return NIXL_SUCCESS;
}
if (num_ios_to_submit >= num_entries) {
NIXL_ERROR << "No available AIO control blocks";
return NIXL_ERR_BACKEND;
}
NIXL_ERROR << "No available AIO control blocks";
return NIXL_ERR_BACKEND;

auto &aiocb = aiocbs[num_ios_to_submit];
aiocb.aio_fildes = fd;
aiocb.aio_buf = buf;
aiocb.aio_nbytes = len;
aiocb.aio_offset = offset;
num_ios_to_submit++;
return NIXL_SUCCESS;
}
5 changes: 2 additions & 3 deletions src/plugins/posix/aio_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class aioQueue : public nixlPosixQueue {
std::vector<struct aiocb> aiocbs; // Array of AIO control blocks
int num_entries; // Total number of entries expected
std::vector<bool> completed; // Track completed I/Os
int num_completed; // Number of completed operations
int num_submitted; // Track number of submitted I/Os
int num_completed; // Number of completed operations
nixl_xfer_op_t operation; // Whether this is a read operation

// Delete copy and move operations
Expand All @@ -44,7 +43,7 @@ class aioQueue : public nixlPosixQueue {
aioQueue(int num_entries, nixl_xfer_op_t operation);
~aioQueue();
nixl_status_t
submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override;
submitBatch(int start_idx, int count, int &submitted_count) override;
nixl_status_t checkCompleted() override;
nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override;
};
Expand Down
51 changes: 24 additions & 27 deletions src/plugins/posix/linux_aio_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ linuxAioQueue::linuxAioQueue(int num_entries, nixl_xfer_op_t operation)
: io_ctx(io_context_t()),
ios(num_entries),
num_entries(num_entries),
num_ios_to_submit(0),
num_ios_to_complete(0),
operation(operation) {
if (num_entries <= 0) {
throw std::runtime_error("Invalid number of entries for AIO queue");
Expand All @@ -52,37 +50,26 @@ linuxAioQueue::~linuxAioQueue() {
}

nixl_status_t
linuxAioQueue::submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) {
if (!num_ios_to_submit) {
return NIXL_IN_PROG;
}

if (num_ios_to_complete) {
NIXL_ERROR << "previously submitted IO is not yet complete";
return NIXL_ERR_NOT_ALLOWED;
linuxAioQueue::submitBatch(int start_idx, int count, int &submitted_count) {
// Submit the batch starting from start_idx
int ret = io_submit(io_ctx, count, &ios_to_submit[start_idx]);
if (ret < 0) {
NIXL_ERROR << absl::StrFormat("linux_aio submit failed: %s", nixl_strerror(-ret));
submitted_count = 0;
return NIXL_ERR_BACKEND;
}

int ret = io_submit(io_ctx, num_ios_to_submit, ios_to_submit.data());
if (ret != num_ios_to_submit) {
if (ret < 0) {
NIXL_ERROR << absl::StrFormat("linux_aio submit failed: %s", nixl_strerror(-ret));
} else {
NIXL_ERROR << absl::StrFormat(
"linux_aio submit failed. Partial submission: %d/%d", num_ios_to_submit, ret);
}
return NIXL_ERR_BACKEND;
// io_submit can return partial submissions
submitted_count = ret;
if (ret != count) {
NIXL_ERROR << absl::StrFormat("linux_aio submit partial: %d/%d", ret, count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here: using NIXL_ERROR for a partial submission is misleading.

}

num_ios_to_complete = ret;
return NIXL_IN_PROG;
return NIXL_SUCCESS;
}

nixl_status_t
linuxAioQueue::checkCompleted() {
if (!num_ios_to_complete) {
return NIXL_SUCCESS;
}

struct io_event events[32];
int rc;
struct timespec timeout = {0, 0};
Expand All @@ -100,9 +87,19 @@ linuxAioQueue::checkCompleted() {
}
}

num_ios_to_complete -= rc;
num_ios_outstanding -= rc;
if (!num_ios_outstanding) {
// No outstanding IOs, check if we need to submit more
if (num_ios_submitted_total < num_ios_to_submit) {
// More IOs to submit, signal to caller to submit more
return NIXL_IN_PROG;
}
// All IOs submitted and completed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_ios_submitted_total needs to be zeroed here to enable the next sending attempt. Same applies to other queues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

num_ios_submitted_total = 0;
return NIXL_SUCCESS;
}

return num_ios_to_complete ? NIXL_IN_PROG : NIXL_SUCCESS;
return NIXL_IN_PROG; // Continue until all IOs are submitted and completed
}

nixl_status_t
Expand Down
4 changes: 1 addition & 3 deletions src/plugins/posix/linux_aio_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class linuxAioQueue : public nixlPosixQueue {
std::vector<struct iocb> ios; // Array of I/Os
int num_entries; // Total number of entries expected
std::vector<struct iocb *> ios_to_submit; // Array of I/Os to submit
int num_ios_to_submit; // Total number of entries to submit
int num_ios_to_complete; // Total number of entries to complete
nixl_xfer_op_t operation; // Whether this is a read operation

// Delete copy and move operations
Expand All @@ -47,7 +45,7 @@ class linuxAioQueue : public nixlPosixQueue {
linuxAioQueue(int num_entries, nixl_xfer_op_t operation);
~linuxAioQueue();
nixl_status_t
submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override;
submitBatch(int start_idx, int count, int &submitted_count) override;
nixl_status_t
checkCompleted() override;
nixl_status_t
Expand Down
1 change: 1 addition & 0 deletions src/plugins/posix/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ posix_sources = [
'posix_backend.cpp',
'posix_backend.h',
'posix_plugin.cpp',
'posix_queue.cpp',
'queue_factory_impl.cpp'
]

Expand Down
12 changes: 11 additions & 1 deletion src/plugins/posix/posix_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,17 @@ nixl_status_t nixlPosixBackendReqH::prepXfer() {
}

nixl_status_t nixlPosixBackendReqH::checkXfer() {
return queue->checkCompleted();
nixl_status_t status = queue->checkCompleted();

if (status == NIXL_IN_PROG) {
// Submit more IOs to maintain target outstanding count
nixl_status_t submit_status = queue->submit(local, remote);
if (submit_status == NIXL_ERR_BACKEND || submit_status == NIXL_ERR_NOT_ALLOWED) {
return submit_status;
}
}

return status;
}

nixl_status_t nixlPosixBackendReqH::postXfer() {
Expand Down
50 changes: 50 additions & 0 deletions src/plugins/posix/posix_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "posix_queue.h"
#include <algorithm>

/**
 * @brief Submit the next batch of prepared I/Os, bounded by MAX_IO_OUTSTANDING.
 *
 * Computes how many prepared-but-unsubmitted I/Os fit under the outstanding
 * limit, delegates the actual submission to the queue-specific submitBatch(),
 * and updates the shared submission/outstanding counters with the number of
 * I/Os that submitBatch() reports as actually submitted.
 *
 * The descriptor-list parameters are unused here.
 *
 * @return NIXL_IN_PROG when there was nothing to submit, no free slot, or the
 *         batch was handed off successfully (possibly only in part);
 *         otherwise the error status returned by submitBatch().
 */
nixl_status_t
nixlPosixQueue::submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) {
    // Every prepared I/O has already been submitted; nothing more to do here.
    if (num_ios_submitted_total >= num_ios_to_submit) {
        return NIXL_IN_PROG;
    }

    // Calculate how many more we can submit without exceeding the outstanding limit
    const int remaining = num_ios_to_submit - num_ios_submitted_total;
    const int slots_available = MAX_IO_OUTSTANDING - num_ios_outstanding;
    const int to_submit = std::min(remaining, slots_available);

    // Already at (or above) the outstanding limit: wait for completions first
    if (to_submit <= 0) {
        return NIXL_IN_PROG;
    }

    // Call queue-specific batch submission
    int actual_submitted = 0;
    const nixl_status_t status =
        submitBatch(num_ios_submitted_total, to_submit, actual_submitted);

    // Account for whatever was really submitted BEFORE looking at the status:
    // a batch can fail part-way through, and the I/Os submitted ahead of the
    // failure are still in flight and must stay visible in the counters.
    num_ios_submitted_total += actual_submitted;
    num_ios_outstanding += actual_submitted;

    if (status != NIXL_SUCCESS) {
        return status;
    }

    return NIXL_IN_PROG;
}
Loading