Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/v/cloud_io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,15 @@ redpanda_cc_library(

redpanda_cc_library(
name = "scheduler_types",
srcs = [
"scheduler_types.cc",
],
hdrs = [
"scheduler_types.h",
],
include_prefix = "cloud_io",
implementation_deps = [
"//src/v/strings:string_switch",
],
visibility = ["//visibility:public"],
deps = [
"//src/v/base",
Expand Down Expand Up @@ -193,6 +198,10 @@ redpanda_cc_library(
hdrs = [
"scheduler.h",
],
implementation_deps = [
":reservation_policy",
"//src/v/base",
],
include_prefix = "cloud_io",
visibility = ["//visibility:public"],
deps = [
Expand Down Expand Up @@ -225,6 +234,7 @@ redpanda_cc_library(
":reservation_policy_types",
":scheduler_policy",
":scheduler_types",
"//src/v/metrics",
"@seastar",
],
)
Expand Down
154 changes: 154 additions & 0 deletions src/v/cloud_io/reservation_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
#include "base/vassert.h"
#include "base/vlog.h"
#include "cloud_io/logger.h"
#include "config/configuration.h"
#include "metrics/metrics.h"
#include "metrics/prometheus_sanitize.h"
#include "ssx/sformat.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/metrics.hh>
#include <seastar/util/log.hh>

#include <fmt/ranges.h>
Expand All @@ -26,6 +31,7 @@
#include <functional>
#include <ranges>
#include <utility>
#include <vector>

namespace cloud_io {

Expand Down Expand Up @@ -64,6 +70,9 @@ reservation_policy<Clock>::reservation_policy(

_reclaim_timer.arm(reclaim_interval);

setup_metrics();
setup_public_metrics();

vlog(
log.info,
"reservation_policy initialized: capacity={} dwell={}s "
Expand All @@ -76,6 +85,8 @@ reservation_policy<Clock>::reservation_policy(
template<class Clock>
ss::future<> reservation_policy<Clock>::stop() {
_reclaim_timer.cancel();
_metrics.clear();
_public_metrics.clear();
for (auto& gs : _groups) {
gs.stop();
}
Expand Down Expand Up @@ -247,6 +258,11 @@ reservation_policy<Clock>::admit_immediate_total(group_id g) const noexcept {
return _groups[g].admit_immediate_total;
}

template<class Clock>
uint64_t reservation_policy<Clock>::canceled_total(group_id g) const noexcept {
return _groups[g].canceled_total;
}

template<class Clock>
size_t reservation_policy<Clock>::total_waiters() const noexcept {
return std::ranges::fold_left(
Expand All @@ -255,6 +271,14 @@ size_t reservation_policy<Clock>::total_waiters() const noexcept {
});
}

template<class Clock>
uint64_t reservation_policy<Clock>::total_canceled() const noexcept {
return std::ranges::fold_left(
_groups, uint64_t{0}, [](uint64_t acc, const auto& gs) {
return acc + gs.canceled_total;
});
}

template<class Clock>
void reservation_policy<Clock>::reclaim_idle_reservations() {
for (auto& gs : _groups) {
Expand Down Expand Up @@ -289,6 +313,136 @@ void reservation_policy<Clock>::put_common_slots(size_t n) noexcept {
_shared += n;
}

template<class Clock>
void reservation_policy<Clock>::setup_metrics() {
if (config::shard_local_cfg().disable_metrics()) {
return;
}

namespace sm = ss::metrics;
const auto group_name = prometheus_sanitize::metrics_name(
"cloud_io_scheduler");
constexpr auto group_label_key = "group_id";

_metrics.add_group(
group_name,
{
sm::make_gauge(
"available_slots",
[this] { return available_slots(); },
sm::description(
"Total slots currently available (shared + all reserved).")),
sm::make_gauge(
"total_capacity",
[this] { return total_capacity(); },
sm::description("Configured total slot capacity.")),
sm::make_gauge(
"total_waiters",
[this] { return total_waiters(); },
sm::description("Total fibers queued across all groups.")),
sm::make_counter(
"total_waiters_canceled",
[this] { return total_canceled(); },
sm::description(
"Total waiters that aborted while queued across all groups.")),
});

for (auto g : all_group_ids) {
const std::vector<sm::label_instance> labels{
sm::label(group_label_key)(ssx::sformat("{}", g))};

_metrics.add_group(
group_name,
{
sm::make_gauge(
"in_flight",
[this, g] { return in_flight(g); },
sm::description("Concurrent ops currently holding a slot."),
labels),
sm::make_gauge(
"waiters",
[this, g] { return waiters(g); },
sm::description("Fibers queued on this group."),
labels),
sm::make_counter(
"admit_total",
[this, g] { return admit_total(g); },
sm::description("Total admit() calls completed for this group."),
labels),
sm::make_counter(
"admit_immediate_total",
[this, g] { return admit_immediate_total(g); },
sm::description(
"admit() calls that took the fast path (no queue)."),
labels),
sm::make_counter(
"canceled_total",
[this, g] { return canceled_total(g); },
sm::description(
"Waiters that aborted while queued on this group."),
labels),
sm::make_gauge(
"current_reserved",
[this, g] { return current_reserved(g); },
sm::description(
"Runtime reservation size. Starts at target_reserved; "
"reclaimed when idle past dwell; rebuilt via refill."),
labels),
});
}
}

template<class Clock>
void reservation_policy<Clock>::setup_public_metrics() {
if (config::shard_local_cfg().disable_public_metrics()) {
return;
}

namespace sm = ss::metrics;
const auto group_name = prometheus_sanitize::metrics_name(
"cloud_io_scheduler");
constexpr auto group_label_key = "group_id";
const auto aggregate_labels = std::vector<sm::label>{sm::shard_label};

_public_metrics.add_group(
group_name,
{
sm::make_gauge(
"available_slots",
[this] { return available_slots(); },
sm::description(
"Total slots currently available (shared + all reserved)."))
.aggregate(aggregate_labels),
sm::make_gauge(
"total_capacity",
[this] { return total_capacity(); },
sm::description("Configured total slot capacity."))
.aggregate(aggregate_labels),
});

for (auto g : all_group_ids) {
const std::vector<sm::label_instance> labels{
sm::label(group_label_key)(ssx::sformat("{}", g))};

_public_metrics.add_group(
group_name,
{
sm::make_gauge(
"in_flight",
[this, g] { return in_flight(g); },
sm::description("Concurrent ops currently holding a slot."),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"waiters",
[this, g] { return waiters(g); },
sm::description("Fibers queued on this group."),
labels)
.aggregate(aggregate_labels),
});
}
}

template class reservation_policy<ss::lowres_clock>;
template class reservation_policy<ss::manual_clock>;

Expand Down
14 changes: 14 additions & 0 deletions src/v/cloud_io/reservation_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cloud_io/reservation_policy_types.h"
#include "cloud_io/scheduler_policy.h"
#include "cloud_io/scheduler_types.h"
#include "metrics/metrics.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
Expand Down Expand Up @@ -99,9 +100,15 @@ class reservation_policy final : public scheduler_policy {
/// Lifetime fast-path-admit count for a group.
uint64_t admit_immediate_total(group_id) const noexcept;

/// Lifetime count of waiters that aborted while queued for a group.
uint64_t canceled_total(group_id) const noexcept;

/// Total queued waiters across all groups.
size_t total_waiters() const noexcept;

/// Lifetime count of canceled waiters across all groups.
uint64_t total_canceled() const noexcept;

private:
/// Dispatch the next queued waiter on behalf of a release on
/// `releasing_group`. Three-tier choice:
Expand Down Expand Up @@ -133,6 +140,10 @@ class reservation_policy final : public scheduler_policy {

void put_common_slots(size_t n) noexcept;

void setup_metrics();

void setup_public_metrics();

size_t _current_total_capacity{0};
/// The common pool of slots. Any group can claim from it; releases
/// go back here unless refill diverts them into a group's
Expand All @@ -150,6 +161,9 @@ class reservation_policy final : public scheduler_policy {
static constexpr
typename Clock::duration reclaim_interval = std::chrono::seconds{1};
ss::timer<Clock> _reclaim_timer;

metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};

extern template class reservation_policy<ss::lowres_clock>;
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_io/reservation_policy_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ struct reservation_group_state {
uint64_t admit_total = 0;
uint64_t admit_immediate_total = 0;

/// Lifetime count of waiters that aborted while queued (incl. the
/// shutdown sweep in stop()). Bumped once per waiter in
/// reservation_waiter::cancel().
uint64_t canceled_total = 0;

/// Timestamp of the most recent `active` → `dwelling` transition.
/// nullopt if the group has never been `active`.
std::optional<time_point> inactive_since;
Expand Down Expand Up @@ -474,6 +479,7 @@ void reservation_waiter<Clock>::cancel() noexcept {
state == waiter_state::enqueued,
"reservation_waiter::cancel: linked waiter in non-enqueued state");
state = waiter_state::canceled;
++_gs->canceled_total;
p.set_exception(
std::make_exception_ptr(ss::abort_requested_exception{}));
_gs->maybe_mark_inactive();
Expand Down
6 changes: 4 additions & 2 deletions src/v/cloud_io/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
#include "cloud_io/scheduler.h"

#include "base/vassert.h"
#include "cloud_io/reservation_policy.h"
#include "cloud_io/scheduler_policy.h"

#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -50,7 +50,9 @@ scheduler::make_policy(size_t capacity, scheduler_config cfg) {
case policy_type::passthrough:
return std::make_unique<passthrough>(capacity);
case policy_type::reservation:
vassert(false, "reservation_policy not yet supported");
return std::make_unique<reservation_policy<>>(
capacity,
std::move(cfg.reservation).value_or(reservation_policy_config{}));
}
std::unreachable();
}
Expand Down
52 changes: 52 additions & 0 deletions src/v/cloud_io/scheduler_types.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "cloud_io/scheduler_types.h"

#include "strings/string_switch.h"

#include <charconv>

namespace cloud_io {

std::optional<group_target>
try_parse_target_spec(std::string_view spec) noexcept {
const auto colon = spec.find(':');
if (colon == std::string_view::npos || colon == 0) {
return std::nullopt;
}
const auto name = spec.substr(0, colon);
const auto value_str = spec.substr(colon + 1);
if (value_str.empty()) {
return std::nullopt;
}
uint32_t value = 0;
const auto [end, ec] = std::from_chars(
value_str.data(), value_str.data() + value_str.size(), value);
if (ec != std::errc{} || end != value_str.data() + value_str.size()) {
return std::nullopt;
}
const auto g = string_switch<std::optional<group_id>>(name)
.match(
to_string_view(group_id::producer_upload),
group_id::producer_upload)
.match(
to_string_view(group_id::consumer_fetch),
group_id::consumer_fetch)
.match(
to_string_view(group_id::default_group),
group_id::default_group)
.default_match(std::nullopt);
if (!g.has_value()) {
return std::nullopt;
}
return group_target{*g, value};
}

} // namespace cloud_io
Loading
Loading