Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions tests/rptest/redpanda_cloud_tests/high_throughput_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,95 @@ def stage_tiered_storage_consuming(self):
benchmark.wait(timeout_sec=benchmark_time_min * 60)
benchmark.check_succeed()

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_cloud_topics_cold_read(self):
"""Cloud-topics analog of test_ts_resource_utilization: steady produce
on a storage.mode=cloud topic while a large backlog is read cold from
object storage. Requires cloud topics enabled on the cluster's tier.
The backlog volume and drain timeout are calibration knobs: confirm on
first runs that object-storage GETs climb during the drain and that it
completes within the timeout for the target tier."""
self._create_topic_spec()
self.stage_cloud_topics_cold_read()
self.redpanda.assert_cluster_is_reusable()

def stage_cloud_topics_cold_read(self):
# Cloud-topics version of stage_tiered_storage_consuming. Cloud topics
# are cloud-first (~zero local retention), so a backlog larger than the
# batch cache is read cold from object storage. A steady producer keeps
# writing while a consumer drains that backlog cold, so the produce path
# (producer_upload) and the cold fetches (consumer_fetch) share the
# per-shard S3 connection pool. On a production-sized pool this is a
# throughput/functionality check -- cold reads drain at tier scale while
# produce keeps flowing -- not the pool-saturation floor gate (that is
# the cloud_topics_cold_read scale test).
self.logger.info("Starting stage_cloud_topics_cold_read")

# Recreate self.topic in cloud storage mode (overrides the spec from
# _create_topic_spec, which only supplies the name).
self.rpk.create_topic(
self.topic,
partitions=self._partitions_upper_limit,
replicas=3,
config={
TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
"cleanup.policy": "delete",
},
)

producer = KgoVerifierProducer(
self.test_context,
self.redpanda,
self.topic,
msg_size=self.msg_size,
msg_count=5_000_000_000_000,
rate_limit_bps=self._advertised_max_ingress,
)
producer.start()

# Build a backlog far larger than the in-memory batch cache so the reads
# miss it and fetch L1 cold: ~4 minutes of max-rate ingress. The volume
# (via the fill time) is a per-tier calibration knob.
backlog_fill_s = 4 * 60
backlog_bytes = backlog_fill_s * self._advertised_max_ingress
messages_to_produce = int(backlog_bytes / self.msg_size)
produce_timeout_s = 1.5 * backlog_bytes / self._advertised_max_ingress
wait_until(
lambda: producer.produce_status.acked >= messages_to_produce,
timeout_sec=produce_timeout_s,
backoff_sec=5,
err_msg=f"Could not ack production of {messages_to_produce} messages "
f"in {produce_timeout_s:.0f} s",
)

# Drain the backlog cold from offset 0 while produce continues. Allow
# generous headroom: cold L1 fetches are slower than the warm path.
backlog = producer.produce_status.acked
consumer = RpkConsumer(
self._ctx,
self.redpanda,
self.topic,
offset="oldest",
num_msgs=messages_to_produce,
)
consumer.start()
drain_timeout_s = 5 * backlog_bytes / self._advertised_max_ingress
wait_until(
lambda: consumer.message_count >= messages_to_produce,
timeout_sec=drain_timeout_s,
backoff_sec=5,
err_msg=f"Cold consumer could not drain {messages_to_produce} msgs "
f"in {drain_timeout_s:.0f} s",
)
consumer.stop()
consumer.free()

# Produce kept advancing throughout the cold drain (no stall).
assert producer.produce_status.acked > backlog, (
"produce did not advance during the cold drain"
)
producer.stop()

def _prepare_omb_workload(
self, ramp_time, duration, partitions, rate, msg_size, producers, consumers
):
Expand Down
Loading
Loading