4 changes: 2 additions & 2 deletions tests/ducktape/consumer_benchmark_metrics.py
@@ -228,10 +228,10 @@ def __init__(self,
                  min_consumption_rate: float = 1.0,
                  max_avg_latency_ms: float = 5000.0,
                  max_p95_latency_ms: float = 10000.0,
-                 min_success_rate: float = 0.95,
+                 min_success_rate: float = 0.90,
                  max_error_rate: float = 0.05,
                  max_memory_growth_mb: float = 600.0,
-                 min_messages_per_consume: float = 1.0,
+                 min_messages_per_consume: float = 0.5,
                  max_empty_consume_rate: float = 0.5):
         self.min_consumption_rate = min_consumption_rate
         self.max_avg_latency_ms = max_avg_latency_ms
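Thresholds like the ones above are checked against a metrics summary to produce the (is_valid, violations) pair that the test changes below act on. A minimal sketch of such a check, assuming a standalone helper and metric key names that may not match the repo's actual implementation:

def validate_consumer_bounds(summary, bounds):
    """Return (is_valid, violations) for a consumer metrics summary (sketch only)."""
    violations = []
    rate = summary.get("consumption_rate_msg_per_sec", 0.0)
    if rate < bounds.min_consumption_rate:
        violations.append(f"consumption rate {rate:.2f} msg/s < min {bounds.min_consumption_rate}")
    success = summary.get("success_rate", 1.0)
    if success < bounds.min_success_rate:
        violations.append(f"success rate {success:.2%} < min {bounds.min_success_rate:.2%}")
    per_consume = summary.get("messages_per_consume", 0.0)
    if per_consume < bounds.min_messages_per_consume:
        violations.append(f"messages per consume {per_consume:.2f} < min {bounds.min_messages_per_consume}")
    return len(violations) == 0, violations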
14 changes: 7 additions & 7 deletions tests/ducktape/producer_benchmark_bounds.json
@@ -3,24 +3,24 @@
   "local": {
     "_comment": "Default bounds for local development - more relaxed thresholds",
     "min_throughput_msg_per_sec": 1000.0,
-    "max_p95_latency_ms": 2000.0,
+    "max_p95_latency_ms": 6000.0,
     "max_error_rate": 0.02,
     "min_success_rate": 0.98,
-    "max_p99_latency_ms": 3000.0,
+    "max_p99_latency_ms": 7000.0,
     "max_memory_growth_mb": 800.0,
     "max_buffer_full_rate": 0.05,
-    "min_messages_per_poll": 10.0
+    "min_messages_per_poll": 5.0
Reviewer comment (Member): Do you think we should just remove min_messages_per_poll, since messages_per_poll is essentially equal to the batch_size according to our code?

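On that point: if poll() is invoked once per fixed number of sends, messages_per_poll simply restates that configured number, so the bound mostly re-checks configuration. A rough illustration with assumed counter names and values:

messages_sent = 10_000
poll_interval = 50                              # assume poll() runs once every 50 sends
poll_count = messages_sent // poll_interval     # 200 polls
messages_per_poll = messages_sent / poll_count  # 50.0 -- equal to the configured interval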
   },
   "ci": {
     "_comment": "Stricter bounds for CI environment - production-like requirements",
     "min_throughput_msg_per_sec": 1500.0,
-    "max_p95_latency_ms": 1500.0,
+    "max_p95_latency_ms": 11000.0,
     "max_error_rate": 0.01,
     "min_success_rate": 0.99,
-    "max_p99_latency_ms": 2500.0,
-    "max_memory_growth_mb": 600.0,
+    "max_p99_latency_ms": 12000.0,
+    "max_memory_growth_mb": 800.0,
     "max_buffer_full_rate": 0.03,
-    "min_messages_per_poll": 10.0
+    "min_messages_per_poll": 5.0
   },
   "_default_environment": "local"
 }
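A bounds file shaped like this is normally resolved per environment via its "_default_environment" key. A minimal sketch of such a loader; the function name and the BENCHMARK_ENV variable are assumptions, not the repo's actual code:

import json
import os

def load_producer_bounds(path="tests/ducktape/producer_benchmark_bounds.json"):
    # Resolve the environment from an (assumed) env var, else the file's default.
    with open(path) as f:
        all_bounds = json.load(f)
    env = os.environ.get("BENCHMARK_ENV", all_bounds.get("_default_environment", "local"))
    return all_bounds[env]

bounds = load_producer_bounds()
print(bounds["max_p95_latency_ms"])  # 6000.0 for "local" with the values above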
10 changes: 10 additions & 0 deletions tests/ducktape/producer_strategy.py
@@ -425,7 +425,7 @@
         }
         return self.create_producer(config_overrides=overrides)

     def produce_messages(self, topic_name, test_duration, start_time, message_formatter,

Check failure on line 428 in tests/ducktape/producer_strategy.py (SonarQube-Confluent / confluent-kafka-python Sonarqube Results):
Refactor this function to reduce its Cognitive Complexity from 78 to the 15 allowed.
                          delivered_container, failed_container=None, serialization_type=None,
                          use_transaction=False):

@@ -509,6 +509,16 @@
                 pending_futures.append((delivery_future, message_key)) # Store delivery future
                 messages_sent += 1

+                # Use configured polling interval (default to 50 if not set)
+                poll_interval = getattr(self, 'poll_interval', 50)
+
+                if messages_sent % poll_interval == 0:
+                    poll_start = time.time()
+                    await producer.poll(0)
+                    poll_times.append(time.time() - poll_start)
+                    if self.metrics:
+                        self.metrics.record_poll()
+
             except Exception as e:
                 if failed_container is not None:
                     failed_container.append(e)
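The periodic poll(0) added above exists so queued delivery reports are processed while the benchmark produces, instead of accumulating until a final flush. For reference, a minimal synchronous sketch of the same pattern with confluent_kafka.Producer; the broker address, topic, and interval below are placeholders:

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker
delivered = []

def on_delivery(err, msg):
    # Delivery callbacks only fire from poll()/flush(), so they must be called regularly.
    if err is None:
        delivered.append(msg)

POLL_INTERVAL = 50
for i in range(1000):
    producer.produce("benchmark-topic", value=f"msg-{i}".encode(), callback=on_delivery)
    if (i + 1) % POLL_INTERVAL == 0:
        producer.poll(0)  # serve pending delivery callbacks without blocking

producer.flush()  # drain remaining messages and callbacks before exiting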
3 changes: 2 additions & 1 deletion tests/ducktape/test_consumer.py
@@ -570,9 +570,10 @@ def _run_consumer_performance_benchmark(

         # Validate against performance bounds
         if not is_valid:
-            self.logger.warning(
+            self.logger.error(
                 "Performance bounds validation failed: %s", "; ".join(violations)
             )
+            assert False, f"Performance bounds validation failed: {'; '.join(violations)}"

         self.logger.info(
             f"Successfully completed basic {operation_type} test with comprehensive metrics"
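The same warning-to-hard-failure change recurs in several tests in test_producer.py below. If that repetition ever becomes a maintenance issue, a small shared helper could centralize it; a sketch only, with a hypothetical name, not part of this change:

def assert_bounds_valid(logger, is_valid, violations, context=""):
    # Log the violations, then fail the test by raising AssertionError.
    if not is_valid:
        suffix = f" for {context}" if context else ""
        message = f"Performance bounds validation failed{suffix}: {'; '.join(violations)}"
        logger.error(message)
        raise AssertionError(message)

# Example usage (hypothetical): assert_bounds_valid(self.logger, is_valid, violations,
#                                                   context=f"{compression_type} compression")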
18 changes: 13 additions & 5 deletions tests/ducktape/test_producer.py
@@ -127,9 +127,10 @@ def message_formatter(msg_num):

         # Validate against performance bounds
         if not is_valid:
-            self.logger.warning(
+            self.logger.error(
                 "Performance bounds validation failed: %s", "; ".join(violations)
             )
+            assert False, f"Performance bounds validation failed: {'; '.join(violations)}"

         self.logger.info(
             "Successfully completed basic production test with comprehensive metrics"
@@ -219,9 +220,10 @@ def message_formatter(msg_num):

         # Validate against performance bounds
         if not is_valid:
-            self.logger.warning(
+            self.logger.error(
                 "Performance bounds validation failed: %s", "; ".join(violations)
             )
+            assert False, f"Performance bounds validation failed: {'; '.join(violations)}"

         self.logger.info(
             "Successfully completed basic production test with comprehensive metrics with transaction"
@@ -325,11 +327,12 @@ def message_formatter(msg_num):

         # Validate against performance bounds
         if not is_valid:
-            self.logger.warning(
+            self.logger.error(
                 "Performance bounds validation failed for %ds test: %s",
                 test_duration,
                 "; ".join(violations),
             )
+            assert False, f"Performance bounds validation failed for {test_duration}s test: {'; '.join(violations)}"

         self.logger.info(
             "Successfully completed %ds batch production test with comprehensive metrics",
@@ -466,11 +469,15 @@ def message_formatter(msg_num):

         # Validate against performance bounds
         if not is_valid:
-            self.logger.warning(
+            self.logger.error(
                 "Performance bounds validation failed for %s compression: %s",
                 compression_type,
                 "; ".join(violations),
             )
+            assert False, (
+                f"Performance bounds validation failed for {compression_type} compression: "
+                f"{'; '.join(violations)}"
+            )

         self.logger.info(
             "Successfully completed %s compression test with comprehensive metrics",
@@ -607,9 +614,10 @@ def message_formatter(i):
         ), f"Send throughput too low: {metrics_summary['send_throughput_msg_per_sec']:.2f} msg/s"

         if not is_valid:
-            self.logger.warning(
+            self.logger.error(
                 "Performance bounds validation failed: %s", "; ".join(violations)
             )
+            assert False, f"Performance bounds validation failed: {'; '.join(violations)}"

         self.logger.info(
             "Successfully completed SR production test with comprehensive metrics"