diff --git a/src/plugins/posix/aio_queue.cpp b/src/plugins/posix/aio_queue.cpp index 460ead307..83d33fbd8 100644 --- a/src/plugins/posix/aio_queue.cpp +++ b/src/plugins/posix/aio_queue.cpp @@ -22,13 +22,13 @@ #include #include #include +#include aioQueue::aioQueue(int num_entries, nixl_xfer_op_t operation) : aiocbs(num_entries), num_entries(num_entries), completed(num_entries), num_completed(0), - num_submitted(0), operation(operation) { if (num_entries <= 0) { throw std::runtime_error("Invalid number of entries for AIO queue"); @@ -40,8 +40,9 @@ aioQueue::aioQueue(int num_entries, nixl_xfer_op_t operation) aioQueue::~aioQueue() { // There should not be any in-flight I/Os at destruction time - if (num_submitted > num_completed) { - NIXL_ERROR << "Programming error: Destroying aioQueue with " << (num_submitted - num_completed) << " in-flight I/Os"; + if (num_ios_outstanding > 0) { + NIXL_ERROR << "Programming error: Destroying aioQueue with " << num_ios_outstanding + << " in-flight I/Os"; } // Cancel any remaining I/Os @@ -53,11 +54,13 @@ aioQueue::~aioQueue() { } nixl_status_t -aioQueue::submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { - num_submitted = 0; - // Submit all I/Os at once - for (auto& aiocb : aiocbs) { +aioQueue::submitBatch(int start_idx, int count, int &submitted_count) { + // Submit the batch + submitted_count = 0; + for (int i = start_idx; i < start_idx + count; i++) { + auto &aiocb = aiocbs[i]; if (aiocb.aio_fildes == 0 || aiocb.aio_nbytes == 0) continue; + // Check if file descriptor is valid if (aiocb.aio_fildes < 0) { NIXL_ERROR << "Invalid file descriptor in AIO request"; @@ -73,35 +76,33 @@ aioQueue::submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { if (ret < 0) { if (errno == EAGAIN) { - // If we hit the kernel limit, cancel all submitted I/Os and return error - NIXL_ERROR << "AIO submit failed: kernel queue full"; - for (auto& cb : aiocbs) { - if (cb.aio_fildes != 0) { - aio_cancel(cb.aio_fildes, &cb); - } - } - return NIXL_ERR_BACKEND; + // If we hit the kernel limit, stop submitting and return partial success + NIXL_ERROR << absl::StrFormat( + "AIO submit partial due to EAGAIN: %d/%d", submitted_count, count); + // Return success with the count we actually submitted + return NIXL_SUCCESS; } NIXL_PERROR << "AIO submit failed"; return NIXL_ERR_BACKEND; } - num_submitted++; + submitted_count++; } - completed.assign(num_entries, false); - num_completed = 0; - return NIXL_IN_PROG; + return NIXL_SUCCESS; } nixl_status_t aioQueue::checkCompleted() { - if (num_completed == num_entries) + // Check if all IOs are submitted and completed + if (num_ios_submitted_total >= num_ios_to_submit && num_ios_outstanding == 0) { + num_ios_submitted_total = 0; + num_ios_to_submit = 0; return NIXL_SUCCESS; + } - // Check all submitted I/Os - for (int i = 0; i < num_entries; i++) { - if (completed[i] || aiocbs[i].aio_fildes == 0 || aiocbs[i].aio_nbytes == 0) - continue; // Skip completed I/Os + // Check all submitted I/Os for completion + for (int i = 0; i < num_ios_submitted_total; i++) { + if (completed[i]) continue; // Skip already completed I/Os int status = aio_error(&aiocbs[i]); if (status == 0) { // Operation completed @@ -111,41 +112,43 @@ nixl_status_t aioQueue::checkCompleted() { return NIXL_ERR_BACKEND; } num_completed++; + num_ios_outstanding--; completed[i] = true; } else if (status == EINPROGRESS) { - return NIXL_IN_PROG; // At least one operation still in progress + // Still in progress, continue checking others + continue; } else { NIXL_PERROR << "AIO error"; return NIXL_ERR_BACKEND; } } - return (num_completed == num_submitted) ? NIXL_SUCCESS : NIXL_IN_PROG; + return NIXL_IN_PROG; // Continue until all IOs are submitted and completed } nixl_status_t aioQueue::prepIO(int fd, void* buf, size_t len, off_t offset) { - // Find an unused control block - for (auto& aiocb : aiocbs) { - if (aiocb.aio_fildes == 0) { - // Check if file descriptor is valid - if (fd < 0) { - NIXL_ERROR << "Invalid file descriptor provided to prepareIO"; - return NIXL_ERR_BACKEND; - } + // Check if file descriptor is valid + if (fd < 0) { + NIXL_ERROR << "Invalid file descriptor provided to prepareIO"; + return NIXL_ERR_BACKEND; + } - // Check buffer and length - if (!buf || len == 0) { - NIXL_ERROR << "Invalid buffer or length provided to prepareIO"; - return NIXL_ERR_BACKEND; - } + // Check buffer and length + if (!buf || len == 0) { + NIXL_ERROR << "Invalid buffer or length provided to prepareIO"; + return NIXL_ERR_BACKEND; + } - aiocb.aio_fildes = fd; - aiocb.aio_buf = buf; - aiocb.aio_nbytes = len; - aiocb.aio_offset = offset; - return NIXL_SUCCESS; - } + if (num_ios_to_submit >= num_entries) { + NIXL_ERROR << "No available AIO control blocks"; + return NIXL_ERR_BACKEND; } - NIXL_ERROR << "No available AIO control blocks"; - return NIXL_ERR_BACKEND; + + auto &aiocb = aiocbs[num_ios_to_submit]; + aiocb.aio_fildes = fd; + aiocb.aio_buf = buf; + aiocb.aio_nbytes = len; + aiocb.aio_offset = offset; + num_ios_to_submit++; + return NIXL_SUCCESS; } diff --git a/src/plugins/posix/aio_queue.h b/src/plugins/posix/aio_queue.h index cba8e9a0f..6e0dc2f0e 100644 --- a/src/plugins/posix/aio_queue.h +++ b/src/plugins/posix/aio_queue.h @@ -30,8 +30,7 @@ class aioQueue : public nixlPosixQueue { std::vector aiocbs; // Array of AIO control blocks int num_entries; // Total number of entries expected std::vector completed; // Track completed I/Os - int num_completed; // Number of completed operations - int num_submitted; // Track number of submitted I/Os + int num_completed; // Number of completed operations nixl_xfer_op_t operation; // Whether this is a read operation // Delete copy and move operations @@ -44,7 +43,7 @@ class aioQueue : public nixlPosixQueue { aioQueue(int num_entries, nixl_xfer_op_t operation); ~aioQueue(); nixl_status_t - submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override; + submitBatch(int start_idx, int count, int &submitted_count) override; nixl_status_t checkCompleted() override; nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override; }; diff --git a/src/plugins/posix/linux_aio_queue.cpp b/src/plugins/posix/linux_aio_queue.cpp index eb69bbfca..1ff3a8afe 100644 --- a/src/plugins/posix/linux_aio_queue.cpp +++ b/src/plugins/posix/linux_aio_queue.cpp @@ -27,8 +27,6 @@ linuxAioQueue::linuxAioQueue(int num_entries, nixl_xfer_op_t operation) : io_ctx(io_context_t()), ios(num_entries), num_entries(num_entries), - num_ios_to_submit(0), - num_ios_to_complete(0), operation(operation) { if (num_entries <= 0) { throw std::runtime_error("Invalid number of entries for AIO queue"); @@ -52,37 +50,26 @@ linuxAioQueue::~linuxAioQueue() { } nixl_status_t -linuxAioQueue::submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { - if (!num_ios_to_submit) { - return NIXL_IN_PROG; - } - - if (num_ios_to_complete) { - NIXL_ERROR << "previously submitted IO is not yet complete"; - return NIXL_ERR_NOT_ALLOWED; +linuxAioQueue::submitBatch(int start_idx, int count, int &submitted_count) { + // Submit the batch starting from start_idx + int ret = io_submit(io_ctx, count, &ios_to_submit[start_idx]); + if (ret < 0) { + NIXL_ERROR << absl::StrFormat("linux_aio submit failed: %s", nixl_strerror(-ret)); + submitted_count = 0; + return NIXL_ERR_BACKEND; } - int ret = io_submit(io_ctx, num_ios_to_submit, ios_to_submit.data()); - if (ret != num_ios_to_submit) { - if (ret < 0) { - NIXL_ERROR << absl::StrFormat("linux_aio submit failed: %s", nixl_strerror(-ret)); - } else { - NIXL_ERROR << absl::StrFormat( - "linux_aio submit failed. Partial submission: %d/%d", num_ios_to_submit, ret); - } - return NIXL_ERR_BACKEND; + // io_submit can return partial submissions + submitted_count = ret; + if (ret != count) { + NIXL_ERROR << absl::StrFormat("linux_aio submit partial: %d/%d", ret, count); } - num_ios_to_complete = ret; - return NIXL_IN_PROG; + return NIXL_SUCCESS; } nixl_status_t linuxAioQueue::checkCompleted() { - if (!num_ios_to_complete) { - return NIXL_SUCCESS; - } - struct io_event events[32]; int rc; struct timespec timeout = {0, 0}; @@ -100,9 +87,19 @@ linuxAioQueue::checkCompleted() { } } - num_ios_to_complete -= rc; + num_ios_outstanding -= rc; + if (!num_ios_outstanding) { + // No outstanding IOs, check if we need to submit more + if (num_ios_submitted_total < num_ios_to_submit) { + // More IOs to submit, signal to caller to submit more + return NIXL_IN_PROG; + } + // All IOs submitted and completed + num_ios_submitted_total = 0; + return NIXL_SUCCESS; + } - return num_ios_to_complete ? NIXL_IN_PROG : NIXL_SUCCESS; + return NIXL_IN_PROG; // Continue until all IOs are submitted and completed } nixl_status_t diff --git a/src/plugins/posix/linux_aio_queue.h b/src/plugins/posix/linux_aio_queue.h index 4f48a3d00..3fa93bb27 100644 --- a/src/plugins/posix/linux_aio_queue.h +++ b/src/plugins/posix/linux_aio_queue.h @@ -31,8 +31,6 @@ class linuxAioQueue : public nixlPosixQueue { std::vector ios; // Array of I/Os int num_entries; // Total number of entries expected std::vector ios_to_submit; // Array of I/Os to submit - int num_ios_to_submit; // Total number of entries to submit - int num_ios_to_complete; // Total number of entries to complete nixl_xfer_op_t operation; // Whether this is a read operation // Delete copy and move operations @@ -47,7 +45,7 @@ class linuxAioQueue : public nixlPosixQueue { linuxAioQueue(int num_entries, nixl_xfer_op_t operation); ~linuxAioQueue(); nixl_status_t - submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override; + submitBatch(int start_idx, int count, int &submitted_count) override; nixl_status_t checkCompleted() override; nixl_status_t diff --git a/src/plugins/posix/meson.build b/src/plugins/posix/meson.build index c093815cd..378e76a2f 100644 --- a/src/plugins/posix/meson.build +++ b/src/plugins/posix/meson.build @@ -25,6 +25,7 @@ posix_sources = [ 'posix_backend.cpp', 'posix_backend.h', 'posix_plugin.cpp', + 'posix_queue.cpp', 'queue_factory_impl.cpp' ] diff --git a/src/plugins/posix/posix_backend.cpp b/src/plugins/posix/posix_backend.cpp index 92e12a1b1..49267a842 100644 --- a/src/plugins/posix/posix_backend.cpp +++ b/src/plugins/posix/posix_backend.cpp @@ -217,7 +217,17 @@ nixl_status_t nixlPosixBackendReqH::prepXfer() { } nixl_status_t nixlPosixBackendReqH::checkXfer() { - return queue->checkCompleted(); + nixl_status_t status = queue->checkCompleted(); + + if (status == NIXL_IN_PROG) { + // Submit more IOs to maintain target outstanding count + nixl_status_t submit_status = queue->submit(local, remote); + if (submit_status == NIXL_ERR_BACKEND || submit_status == NIXL_ERR_NOT_ALLOWED) { + return submit_status; + } + } + + return status; } nixl_status_t nixlPosixBackendReqH::postXfer() { diff --git a/src/plugins/posix/posix_queue.cpp b/src/plugins/posix/posix_queue.cpp new file mode 100644 index 000000000..9df20b791 --- /dev/null +++ b/src/plugins/posix/posix_queue.cpp @@ -0,0 +1,50 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "posix_queue.h" +#include + +nixl_status_t +nixlPosixQueue::submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { + // If nothing left to submit, we're done + if (num_ios_submitted_total >= num_ios_to_submit) { + return NIXL_IN_PROG; + } + + // Calculate how many more we can submit to reach target outstanding + int remaining = num_ios_to_submit - num_ios_submitted_total; + int slots_available = MAX_IO_OUTSTANDING - num_ios_outstanding; + int to_submit = std::min(remaining, slots_available); + + // Nothing to submit if we're already at target outstanding + if (to_submit <= 0) { + return NIXL_IN_PROG; + } + + // Call queue-specific batch submission + int actual_submitted = 0; + nixl_status_t status = submitBatch(num_ios_submitted_total, to_submit, actual_submitted); + if (status != NIXL_SUCCESS) { + return status; + } + + // Update tracking with actual number submitted + num_ios_submitted_total += actual_submitted; + num_ios_outstanding += actual_submitted; + + return NIXL_IN_PROG; +} diff --git a/src/plugins/posix/posix_queue.h b/src/plugins/posix/posix_queue.h index d9251897c..7fc52e0a5 100644 --- a/src/plugins/posix/posix_queue.h +++ b/src/plugins/posix/posix_queue.h @@ -24,19 +24,29 @@ // Abstract base class for async I/O operations class nixlPosixQueue { - public: - virtual ~nixlPosixQueue() = default; - virtual nixl_status_t - submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) = 0; - virtual nixl_status_t checkCompleted() = 0; - virtual nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) = 0; +protected: + int num_ios_to_submit = 0; + int num_ios_submitted_total = 0; + int num_ios_outstanding = 0; + static constexpr int MAX_IO_OUTSTANDING = 16; - enum class queue_t { - AIO, - URING, - POSIXAIO, - UNSUPPORTED, - }; +public: + virtual ~nixlPosixQueue() = default; + virtual nixl_status_t + submitBatch(int start_idx, int count, int &submitted_count) = 0; + virtual nixl_status_t + checkCompleted() = 0; + virtual nixl_status_t + prepIO(int fd, void *buf, size_t len, off_t offset) = 0; + nixl_status_t + submit(const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote); + + enum class queue_t { + AIO, + URING, + POSIXAIO, + UNSUPPORTED, + }; }; #endif // POSIX_QUEUE_H diff --git a/src/plugins/posix/uring_queue.cpp b/src/plugins/posix/uring_queue.cpp index a36a70968..53ddc2208 100644 --- a/src/plugins/posix/uring_queue.cpp +++ b/src/plugins/posix/uring_queue.cpp @@ -75,13 +75,13 @@ nixl_status_t UringQueue::init(int entries, const io_uring_params& params) { return NIXL_SUCCESS; } -UringQueue::UringQueue(int num_entries, const io_uring_params& params, nixl_xfer_op_t operation) - : num_entries(num_entries) - , num_completed(0) - , prep_op(operation == NIXL_READ ? - reinterpret_cast(io_uring_prep_read) : - reinterpret_cast(io_uring_prep_write)) -{ +UringQueue::UringQueue(int num_entries, const io_uring_params ¶ms, nixl_xfer_op_t operation) + : num_entries(num_entries), + num_completed(0), + descriptors(num_entries), + prep_op(operation == NIXL_READ ? + reinterpret_cast(io_uring_prep_read) : + reinterpret_cast(io_uring_prep_write)) { if (num_entries <= 0) { throw std::invalid_argument("Invalid number of entries for UringQueue"); } @@ -94,38 +94,43 @@ UringQueue::~UringQueue() { } nixl_status_t -UringQueue::submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) { - for (auto [local_it, remote_it] = std::make_pair (local.begin(), remote.begin()); - local_it != local.end() && remote_it != remote.end(); - ++local_it, ++remote_it) { - int fd = remote_it->devId; - void *buf = reinterpret_cast (local_it->addr); - size_t len = local_it->len; - off_t offset = remote_it->addr; +UringQueue::submitBatch(int start_idx, int count, int &submitted_count) { + // Prepare SQEs for the batch + for (int i = 0; i < count; i++) { + int idx = start_idx + i; + auto &desc = descriptors[idx]; struct io_uring_sqe *sqe = io_uring_get_sqe (&uring); if (!sqe) { NIXL_ERROR << "Failed to get io_uring submission queue entry"; + submitted_count = 0; return NIXL_ERR_BACKEND; } - prep_op (sqe, fd, buf, len, offset); + prep_op(sqe, desc.fd, desc.buf, desc.len, desc.offset); } + // Submit the batch int ret = io_uring_submit(&uring); - if (ret != num_entries) { - if (ret < 0) { - NIXL_ERROR << absl::StrFormat("io_uring submit failed: %s", nixl_strerror(-ret)); - } else { - NIXL_ERROR << absl::StrFormat("io_uring submit failed. Partial submission: %d/%d", num_entries, ret); - } + if (ret < 0) { + NIXL_ERROR << absl::StrFormat("io_uring submit failed: %s", nixl_strerror(-ret)); + submitted_count = 0; return NIXL_ERR_BACKEND; } - num_completed = 0; - return NIXL_IN_PROG; + + // io_uring_submit can return partial submissions + submitted_count = ret; + if (ret != count) { + NIXL_ERROR << absl::StrFormat("io_uring submit partial: %d/%d", ret, count); + } + + return NIXL_SUCCESS; } nixl_status_t UringQueue::checkCompleted() { - if (num_completed == num_entries) { + // Check if all IOs are submitted and completed + if (num_ios_submitted_total >= num_ios_to_submit && num_completed == num_ios_to_submit) { + num_ios_submitted_total = 0; + num_ios_to_submit = 0; return NIXL_SUCCESS; } @@ -147,12 +152,20 @@ nixl_status_t UringQueue::checkCompleted() { // Mark all seen io_uring_cq_advance(&uring, count); num_completed += count; + num_ios_outstanding -= count; - logOnPercentStep(num_completed, num_entries); + logOnPercentStep(num_completed, num_ios_to_submit); - return (num_completed == num_entries) ? NIXL_SUCCESS : NIXL_IN_PROG; + return NIXL_IN_PROG; // Continue until all IOs are submitted and completed } nixl_status_t UringQueue::prepIO(int fd, void* buf, size_t len, off_t offset) { + if (num_ios_to_submit >= num_entries) { + NIXL_ERROR << "No available io_uring entries"; + return NIXL_ERR_BACKEND; + } + + descriptors[num_ios_to_submit] = {fd, buf, len, offset}; + num_ios_to_submit++; return NIXL_SUCCESS; } diff --git a/src/plugins/posix/uring_queue.h b/src/plugins/posix/uring_queue.h index 61d66be37..79e45a5bd 100644 --- a/src/plugins/posix/uring_queue.h +++ b/src/plugins/posix/uring_queue.h @@ -30,9 +30,17 @@ typedef void (*io_uring_prep_func_t)(struct io_uring_sqe*, int, const void*, uns class UringQueue : public nixlPosixQueue { private: + struct io_descriptor { + int fd; + void *buf; + size_t len; + off_t offset; + }; + struct io_uring uring; // The io_uring instance for async I/O operations const int num_entries; // Total number of entries expected in this ring int num_completed; // Number of completed operations so far + std::vector descriptors; // Stored descriptor information io_uring_prep_func_t prep_op; // Pointer to prep function // Initialize the queue with the given parameters @@ -48,7 +56,7 @@ class UringQueue : public nixlPosixQueue { UringQueue(int num_entries, const struct io_uring_params& params, nixl_xfer_op_t operation); ~UringQueue(); nixl_status_t - submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) override; + submitBatch(int start_idx, int count, int &submitted_count) override; nixl_status_t checkCompleted() override; nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override; };