Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/v/cloud_io/tests/db_s3_imposter_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ ss::future<> write_iobuf_to_stream(iobuf buf, ss::output_stream<char> out) {
co_await out.close();
}

// Bodies up to this size are inlined into the reply's sstring content,
// which causes seastar's httpd to set Content-Length automatically.
// cloud_io::remote::download_stream requires Content-Length, so any test
// that drives it through this imposter must keep payloads under this cap.
// Larger bodies are streamed via chunked transfer encoding.
inline constexpr size_t k_small_response_threshold = 100 * 1024;

} // namespace

// Async HTTP handler backed by the LSM database. Holds references to
Expand Down Expand Up @@ -164,8 +171,18 @@ struct db_s3_imposter_fixture::handler : ss::httpd::handler_base {
result->trim_back(result->size_bytes() - (end - start + 1));
}

// Stream the body from the iobuf fragments rather than
// linearizing to sstring (which has a 128KiB size limit).
repl.set_content_type("xml");

// For bodies that fit in an sstring, inline them so seastar's httpd
// sets Content-Length automatically; download_stream consumers (e.g.
// range GETs) require Content-Length and don't accept chunked
// transfer encoding. Larger bodies fall through to streaming chunked
// encoding -- tests using such payloads must use download_object, not
// download_stream.
if (result->size_bytes() <= k_small_response_threshold) {
repl._content = result->linearize_to_string();
co_return;
}
repl.write_body(
"xml",
ss::http::body_writer_type([result = std::move(*result)](
Expand Down
53 changes: 53 additions & 0 deletions src/v/cloud_io/tests/db_s3_imposter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,41 @@ class db_s3_imposter_test
.get();
}

// Downloads an object via remote().download_stream() (rather than
// download_object) and drains the streamed body into a string.
// download_stream requires a Content-Length on the response.
//
// key:   object name; it is passed through object_key() before the request.
// range: optional byte range forwarded unchanged to download_stream; when
//        left as nullopt no range is requested.
//
// Returns the download_result reported by download_stream paired with the
// bytes that were read from the response stream.
std::pair<download_result, ss::sstring> download_via_stream(
  std::string_view key,
  std::optional<cloud_storage_clients::http_byte_range> range
  = std::nullopt) {
    retry_chain_node rtc(never_abort, 5s, 100ms);
    ss::sstring out;
    auto res = remote()
                 .download_stream(
                   {.bucket = bucket_name,
                    .key = object_key(key),
                    .parent_rtc = rtc},
                   [&out](
                     this auto,
                     uint64_t content_length,
                     ss::input_stream<char> in) -> ss::future<uint64_t> {
                       // Drain the stream; an empty buffer marks its end.
                       while (true) {
                           auto buf = co_await in.read();
                           if (buf.empty()) {
                               break;
                           }
                           out.append(buf.get(), buf.size());
                       }
                       co_await in.close();
                       // Hand back the content_length we were given.
                       co_return content_length;
                   },
                   "test-download-stream",
                   /*acquire_hydration_units=*/true,
                   range)
                 .get();
    // `out` is not used after this point, so move it into the returned pair
    // instead of copying the whole downloaded body.
    return {res, std::move(out)};
}

private:
std::unique_ptr<scoped_remote> _scoped;
};
Expand Down Expand Up @@ -285,3 +320,21 @@ TEST_F(db_s3_imposter_test, multipart_abort) {

EXPECT_EQ(head("aborted/obj"), download_result::notfound);
}

// download_stream needs a Content-Length, which the imposter can only set on
// its small-body inline path -- see k_small_response_threshold in
// db_s3_imposter_fixture.cc for why. These keep payloads small to stay on it.
// Uploads a small object and reads it back in full through the
// download_via_stream helper, with no byte range supplied.
TEST_F(db_s3_imposter_test, download_stream_whole_object) {
    ASSERT_EQ(upload("ds/whole", "hello world"), upload_result::success);

    const auto downloaded = download_via_stream("ds/whole");

    // .first is the download status, .second is the streamed body.
    EXPECT_EQ(downloaded.first, download_result::success);
    EXPECT_EQ(downloaded.second, "hello world");
}

// Uploads ten digits and fetches only the {2, 5} byte range through the
// download_via_stream helper; the expected body "2345" shows the range is
// treated as inclusive on both ends.
TEST_F(db_s3_imposter_test, download_stream_byte_range) {
    ASSERT_EQ(upload("ds/range", "0123456789"), upload_result::success);

    const cloud_storage_clients::http_byte_range wanted{2, 5};
    const auto downloaded = download_via_stream("ds/range", wanted);

    // .first is the download status, .second is the streamed body.
    EXPECT_EQ(downloaded.first, download_result::success);
    EXPECT_EQ(downloaded.second, "2345");
}
26 changes: 16 additions & 10 deletions src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,22 @@ void flush_as_manifest(
term_state_update_t new_terms) {
auto domain_prefix = cloud_storage_clients::object_key{
domain_cloud_prefix(uuid)};
auto cloud_db = lsm::database::open(
{.database_epoch = db_epoch},
lsm::io::persistence{
.data = lsm::io::open_cloud_cache_data_persistence(
cache, remote, bucket, domain_prefix)
.get(),
.metadata = lsm::io::open_cloud_metadata_persistence(
remote, bucket, domain_prefix)
.get()})
.get();
auto cloud_db
= lsm::database::open(
{.database_epoch = db_epoch},
lsm::io::persistence{
.data = lsm::io::open_cloud_cache_data_persistence(
cache,
remote,
bucket,
domain_prefix,
config::shard_local_cfg()
.cloud_topics_metastore_sst_chunk_size.bind())
.get(),
.metadata = lsm::io::open_cloud_metadata_persistence(
remote, bucket, domain_prefix)
.get()})
.get();

// Pre-register the object IDs before building the add_objects rows.
preregister_objects_db_update prereg_update;
Expand Down
7 changes: 6 additions & 1 deletion src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ replicated_database::open(

auto data_persist_fut = co_await ss::coroutine::as_future(
lsm::io::open_cloud_cache_data_persistence(
cache, remote, bucket, domain_prefix));
cache,
remote,
bucket,
domain_prefix,
config::shard_local_cfg()
.cloud_topics_metastore_sst_chunk_size.bind()));
if (data_persist_fut.failed()) {
co_return std::unexpected(wrap_failed_future(
data_persist_fut.get_exception(), "Failed to open data persistence"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ class ReplicatedDatabaseTest
&test_cache.local(),
&sr->remote.local(),
bucket_name,
domain_prefix)
domain_prefix,
config::shard_local_cfg()
.cloud_topics_metastore_sst_chunk_size.bind())
.get(),
.metadata = lsm::io::open_cloud_metadata_persistence(
&sr->remote.local(), bucket_name, domain_prefix)
Expand Down
6 changes: 5 additions & 1 deletion src/v/cloud_topics/read_replica/snapshot_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ ss::future<> database_refresher::open_or_refresh() {
cloud_storage_clients::object_key domain_prefix{
domain_cloud_prefix(domain_uuid_)};
auto data_persist = co_await lsm::io::open_cloud_cache_data_persistence(
cache_, remote_, bucket_, domain_prefix);
cache_,
remote_,
bucket_,
domain_prefix,
config::shard_local_cfg().cloud_topics_metastore_sst_chunk_size.bind());
auto meta_persist = co_await lsm::io::open_cloud_metadata_persistence(
remote_, bucket_, domain_prefix);
lsm::io::persistence io{
Expand Down
7 changes: 6 additions & 1 deletion src/v/cloud_topics/read_replica/tests/db_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "cloud_topics/level_one/metastore/lsm/state_reader.h"
#include "cloud_topics/level_one/metastore/lsm/state_update.h"
#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h"
#include "config/configuration.h"
#include "container/chunked_hash_map.h"
#include "lsm/io/cloud_cache_persistence.h"
#include "lsm/lsm.h"
Expand Down Expand Up @@ -49,7 +50,11 @@ inline ss::future<lsm::database*> get_or_create_writer_db(
cloud_storage_clients::object_key domain_prefix{cloud_prefix};

auto data_persist = co_await lsm::io::open_cloud_cache_data_persistence(
cache, remote, bucket, domain_prefix);
cache,
remote,
bucket,
domain_prefix,
config::shard_local_cfg().cloud_topics_metastore_sst_chunk_size.bind());
auto meta_persist = co_await lsm::io::open_cloud_metadata_persistence(
remote, bucket, domain_prefix);

Expand Down
12 changes: 12 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4996,6 +4996,18 @@ configuration::configuration()
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
3,
{.min = 1})
, cloud_topics_metastore_sst_chunk_size(
*this,
"cloud_topics_metastore_sst_chunk_size",
"Size of the byte ranges used to read metastore LSM SST files from "
"object storage into the local cache. Reads hydrate only the chunks they "
"cover, and a read spanning several uncached chunks fetches them in "
"parallel. Smaller chunks reduce read and cache amplification for point "
"reads; larger chunks reduce request count for scans. Changing the value "
"invalidates previously cached chunks.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
24_MiB,
{.min = 1_MiB})
, cloud_topics_produce_write_inflight_limit(
*this,
"cloud_topics_produce_write_inflight_limit",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds>
cloud_topics_long_term_file_deletion_delay;
bounded_property<int32_t> cloud_topics_num_metastore_partitions;
bounded_property<size_t> cloud_topics_metastore_sst_chunk_size;

bounded_property<size_t> cloud_topics_produce_write_inflight_limit;
bounded_property<size_t> cloud_topics_produce_no_pid_concurrency;
Expand Down
6 changes: 4 additions & 2 deletions src/v/lsm/block/tests/contents_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ TEST_CORO(Contents, StringView) {
co_await file->append(b.share());
co_await file->close();
}
auto file = co_await persistence->open_random_access_reader({});
auto file = co_await persistence->open_random_access_reader(
{}, b.size_bytes());
ASSERT_TRUE_CORO(bool(file));
for (auto offset : std::to_array<size_t>({0, 1, 2, 3, 4, 5, 10, 64_KiB})) {
auto buf = b.share(offset, b.size_bytes() - offset);
Expand Down Expand Up @@ -79,7 +80,8 @@ TEST_CORO(Contents, IobufShare) {
co_await file->append(b.share());
co_await file->close();
}
auto file = co_await persistence->open_random_access_reader({});
auto file = co_await persistence->open_random_access_reader(
{}, b.size_bytes());
ASSERT_TRUE_CORO(bool(file));
for (auto offset : std::to_array<size_t>({0, 1, 2, 3, 4, 5, 10, 64_KiB})) {
auto buf = b.share(offset, b.size_bytes() - offset);
Expand Down
3 changes: 2 additions & 1 deletion src/v/lsm/db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ class table_cache::impl {

ss::future<ss::lw_shared_ptr<sst::reader>>
open_reader(internal::file_handle h, uint64_t file_size) {
auto file = co_await _persistence->open_random_access_reader(h);
auto file = co_await _persistence->open_random_access_reader(
h, file_size);
if (!file) {
throw invalid_argument_exception("file for ID {} is not found", h);
}
Expand Down
5 changes: 3 additions & 2 deletions src/v/lsm/db/tests/compaction_task_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ class fault_injecting_data_persistence final
}

ss::future<lsm::io::optional_pointer<lsm::io::random_access_file_reader>>
open_random_access_reader(lsm::internal::file_handle h) override {
return _inner->open_random_access_reader(h);
open_random_access_reader(
lsm::internal::file_handle h, uint64_t file_size) override {
return _inner->open_random_access_reader(h, file_size);
}

ss::future<std::unique_ptr<lsm::io::sequential_file_writer>>
Expand Down
8 changes: 4 additions & 4 deletions src/v/lsm/db/tests/impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class proxy_data_persistence : public io::data_persistence {
: _p(p) {}

ss::future<io::optional_pointer<io::random_access_file_reader>>
open_random_access_reader(file_handle h) override {
return _p->open_random_access_reader(h);
open_random_access_reader(file_handle h, uint64_t file_size) override {
return _p->open_random_access_reader(h, file_size);
}

ss::future<std::unique_ptr<io::sequential_file_writer>>
Expand Down Expand Up @@ -84,9 +84,9 @@ class tracking_data_persistence : public io::data_persistence {
: _underlying(underlying) {}

ss::future<io::optional_pointer<io::random_access_file_reader>>
open_random_access_reader(file_handle h) override {
open_random_access_reader(file_handle h, uint64_t file_size) override {
_opened_files.insert(h);
return _underlying->open_random_access_reader(h);
return _underlying->open_random_access_reader(h, file_size);
}

ss::future<std::unique_ptr<io::sequential_file_writer>>
Expand Down
27 changes: 26 additions & 1 deletion src/v/lsm/io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ redpanda_cc_library(
srcs = ["cloud_cache_persistence.cc"],
hdrs = ["cloud_cache_persistence.h"],
implementation_deps = [
":chunked_remote_file_reader",
":file_io",
"//src/v/cloud_io:io_result",
"//src/v/config",
"//src/v/lsm/core:exceptions",
"//src/v/lsm/core/internal:files",
"//src/v/ssx:future_util",
Expand All @@ -66,6 +66,31 @@ redpanda_cc_library(
"//src/v/cloud_io:cache",
"//src/v/cloud_io:remote",
"//src/v/cloud_storage_clients",
"//src/v/config",
"@seastar",
],
)

# Library target built from chunked_remote_file_reader.{cc,h}.
# NOTE(review): presumably the reader that fetches remote files in chunks
# through the cloud cache -- confirm against the header.
redpanda_cc_library(
    name = "chunked_remote_file_reader",
    srcs = ["chunked_remote_file_reader.cc"],
    hdrs = ["chunked_remote_file_reader.h"],
    # Dependencies listed for the implementation (.cc) only.
    implementation_deps = [
        ":file_io",
        "//src/v/cloud_io:io_result",
        "//src/v/container:chunked_vector",
        "//src/v/lsm/core:exceptions",
        "@fmt",
    ],
    # Dependencies listed as regular deps of the target.
    deps = [
        ":persistence",
        "//src/v/base",
        "//src/v/bytes:ioarray",
        "//src/v/cloud_io:cache",
        "//src/v/cloud_io:remote",
        "//src/v/cloud_storage_clients",
        "//src/v/container:chunked_hash_map",
        "//src/v/utils:retry_chain_node",
        "@seastar",
    ],
)
Expand Down
Loading
Loading