100 changes: 100 additions & 0 deletions .semaphore/semaphore.yml
@@ -275,6 +275,106 @@ blocks:
- python3 -m venv _venv && source _venv/bin/activate
- chmod u+r+x tools/source-package-verification.sh
- tools/source-package-verification.sh
- name: "Ducktape Performance Tests (Linux x64)"
dependencies: []
task:
agent:
machine:
type: s1-prod-ubuntu24-04-amd64-3
Member: Out of curiosity, how is the machine type chosen?

Member Author: The "s1-prod-" prefix means this is a production-grade machine type that is already used by other Semaphore jobs in this repository. IMO it's already time-tested here, so we should keep using it.

env_vars:
- name: OS_NAME
value: linux
- name: ARCH
value: x64
- name: BENCHMARK_BOUNDS_CONFIG
value: tests/ducktape/benchmark_bounds.json
- name: BENCHMARK_ENVIRONMENT
value: ci
prologue:
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
jobs:
- name: Build and Tests
commands:
# Setup Python environment
- sem-version python 3.9
Contributor: I guess using an older Python version will catch more perf issues, but I worry newer versions might have behavioral differences we'd miss; for example, some of the C bindings we use are deprecated in 3.13.

Contributor: I'd be OK with leaving a TODO to matrix this across a couple of Python versions down the line. What do you think?

Member Author: Makes sense! We could add a couple of Python versions, e.g. 3.9, 3.11, and 3.13.
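A hypothetical sketch of such a matrix in Semaphore syntax (illustrative only, not part of this change; it assumes the job's existing commands stay the same and only the interpreter version is parameterized):

```yaml
jobs:
  - name: Build and Tests
    # Run the same steps once per Python version
    matrix:
      - env_var: PYTHON_VERSION
        values: ["3.9", "3.11", "3.13"]
    commands:
      - sem-version python $PYTHON_VERSION
      - python3 -m venv _venv && source _venv/bin/activate
      # ... remaining build, cluster setup, and test steps unchanged
```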

- python3 -m venv _venv && source _venv/bin/activate

# Install ducktape framework and additional dependencies
- pip install ducktape psutil

# Install existing test requirements
- pip install -r requirements/requirements-tests.txt

# Build and install confluent-kafka from source
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"
- python3 -m pip install -e .

# Store project root for reliable navigation
- PROJECT_ROOT="${PWD}"

# Start Kafka cluster and Schema Registry using dedicated ducktape compose file (KRaft mode)
- cd "${PROJECT_ROOT}/tests/docker"
- docker-compose -f docker-compose.ducktape.yml up -d kafka schema-registry

# Debug: Check container status and logs
- echo "=== Container Status ==="
- docker-compose -f docker-compose.ducktape.yml ps
- echo "=== Kafka Logs ==="
- docker-compose -f docker-compose.ducktape.yml logs kafka | tail -50

# Wait for Kafka to be ready (using PLAINTEXT listener for external access)
- |
timeout 1800 bash -c '
counter=0
until docker-compose -f docker-compose.ducktape.yml exec -T kafka kafka-topics --bootstrap-server localhost:9092 --list >/dev/null 2>&1; do
echo "Waiting for Kafka... (attempt $((counter+1)))"

# Show logs every 4th attempt (every 20 seconds)
if [ $((counter % 4)) -eq 0 ] && [ $counter -gt 0 ]; then
echo "=== Recent Kafka Logs ==="
docker-compose -f docker-compose.ducktape.yml logs --tail=10 kafka
echo "=== Container Status ==="
docker-compose -f docker-compose.ducktape.yml ps kafka
fi

counter=$((counter+1))
sleep 5
done
'
- echo "Kafka cluster is ready!"

# Wait for Schema Registry to be ready
- echo "=== Waiting for Schema Registry ==="
- |
timeout 300 bash -c '
counter=0
until curl -f http://localhost:8081/subjects >/dev/null 2>&1; do
echo "Waiting for Schema Registry... (attempt $((counter+1)))"

# Show logs every 3rd attempt (every 15 seconds)
if [ $((counter % 3)) -eq 0 ] && [ $counter -gt 0 ]; then
echo "=== Recent Schema Registry Logs ==="
docker-compose -f docker-compose.ducktape.yml logs --tail=10 schema-registry
echo "=== Schema Registry Container Status ==="
docker-compose -f docker-compose.ducktape.yml ps schema-registry
fi

counter=$((counter+1))
sleep 5
done
'
- echo "Schema Registry is ready!"

# Run standard ducktape tests with CI bounds
- cd "${PROJECT_ROOT}" && PYTHONPATH="${PROJECT_ROOT}" python tests/ducktape/run_ducktape_test.py

# Cleanup
- cd "${PROJECT_ROOT}/tests/docker" && docker-compose -f docker-compose.ducktape.yml down -v || true
- name: "Packaging"
run:
when: "tag =~ '.*'"
36 changes: 36 additions & 0 deletions tests/docker/docker-compose.ducktape.yml
@@ -0,0 +1,36 @@
services:
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka-ducktape
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://dockerhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
CLUSTER_ID: 4L6g3nShT-eMCtK--X86sw
Member: For my knowledge, is this the cluster ID dedicated to Kafka testing? https://github.com/search?q=org%3Aconfluentinc%204L6g3nShT-eMCtK--X86sw&type=code

Member Author: Yes. Each CI job spins up its own isolated Kafka container with this ID.
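For reference, the ID is just a URL-safe base64-encoded UUID; a fresh one can be generated with the kafka-storage tool bundled in the cp-kafka image (illustrative command, assuming that image's tooling):

```bash
# Print a new random KRaft cluster ID
docker run --rm confluentinc/cp-kafka:latest kafka-storage random-uuid
```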


schema-registry:
image: confluentinc/cp-schema-registry:latest
container_name: schema-registry-ducktape
depends_on:
- kafka
ports:
- "8081:8081"
extra_hosts:
- "dockerhost:172.17.0.1"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: dockerhost:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1
SCHEMA_REGISTRY_DEBUG: 'true'
60 changes: 46 additions & 14 deletions tests/ducktape/README.md
@@ -5,7 +5,8 @@ Ducktape-based producer tests for the Confluent Kafka Python client with compreh
## Prerequisites

- `pip install ducktape confluent-kafka psutil`
- Kafka running on `localhost:9092`
- Kafka running on `localhost:9092` (PLAINTEXT listener; the ducktape tests connect to this port directly)
- Schema Registry running on `localhost:8081` (reaches Kafka via the host-mapped `dockerhost:29092` listener)
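For local runs, both services can be brought up from the repository root with the compose file introduced in this change, e.g.:

```bash
# Start Kafka (KRaft mode) and Schema Registry for local ducktape runs
cd tests/docker
docker-compose -f docker-compose.ducktape.yml up -d kafka schema-registry
```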

## Running Tests

@@ -37,29 +38,60 @@ Every test automatically includes:

## Configuration

Performance bounds are loaded from a JSON config file. By default, it loads `benchmark_bounds.json`, but you can override this with the `BENCHMARK_BOUNDS_CONFIG` environment variable:
Performance bounds are loaded from an environment-based JSON config file. By default, it loads `benchmark_bounds.json`, but you can override this with the `BENCHMARK_BOUNDS_CONFIG` environment variable.

### Environment-Based Configuration

The bounds configuration supports different environments with different performance thresholds:

```json
{
"min_throughput_msg_per_sec": 1500.0,
"max_p95_latency_ms": 1500.0,
"max_error_rate": 0.01,
"min_success_rate": 0.99,
"max_p99_latency_ms": 2500.0,
"max_memory_growth_mb": 600.0,
"max_buffer_full_rate": 0.03,
"min_messages_per_poll": 15.0
"_comment": "Performance bounds for benchmark tests by environment",
"local": {
"_comment": "Default bounds for local development - more relaxed thresholds",
"min_throughput_msg_per_sec": 1000.0,
"max_p95_latency_ms": 2000.0,
"max_error_rate": 0.02,
"min_success_rate": 0.98,
"max_p99_latency_ms": 3000.0,
"max_memory_growth_mb": 800.0,
"max_buffer_full_rate": 0.05,
"min_messages_per_poll": 10.0
},
"ci": {
"_comment": "Stricter bounds for CI environment - production-like requirements",
"min_throughput_msg_per_sec": 1500.0,
"max_p95_latency_ms": 1500.0,
"max_error_rate": 0.01,
"min_success_rate": 0.99,
"max_p99_latency_ms": 2500.0,
"max_memory_growth_mb": 600.0,
"max_buffer_full_rate": 0.03,
"min_messages_per_poll": 15.0
},
"_default_environment": "local"
}
```

### Environment Selection

- **BENCHMARK_ENVIRONMENT**: selects which environment's bounds to use (`local`, `ci`, etc.)
- **Default**: falls back to the config file's `_default_environment` (currently `local`) when the variable is not set
- **CI**: the Semaphore pipeline sets `BENCHMARK_ENVIRONMENT=ci`, so CI runs use the stricter `ci` bounds

Usage:
```bash
# Use default config file
# Use default environment (local)
./run_ducktape_test.py

# Use different configs for different environments
BENCHMARK_BOUNDS_CONFIG=ci_bounds.json ./run_ducktape_test.py
BENCHMARK_BOUNDS_CONFIG=production_bounds.json ./run_ducktape_test.py
# Explicitly use local environment
BENCHMARK_ENVIRONMENT=local ./run_ducktape_test.py

# Use CI environment with stricter bounds
BENCHMARK_ENVIRONMENT=ci ./run_ducktape_test.py

# Use different config file entirely
BENCHMARK_BOUNDS_CONFIG=custom_bounds.json ./run_ducktape_test.py
```
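
A minimal Python sketch of how the environment is resolved, mirroring the loader in `benchmark_metrics.py` (the helper name below is hypothetical):

```python
import json
import os


def load_bounds(config_path="benchmark_bounds.json", environment=None):
    """Resolve performance bounds for one environment (illustrative helper)."""
    with open(config_path) as f:
        config = json.load(f)

    # BENCHMARK_ENVIRONMENT wins, then the file's _default_environment, then "local"
    environment = environment or os.getenv(
        "BENCHMARK_ENVIRONMENT", config.get("_default_environment", "local"))

    if environment not in config:
        available = [k for k in config if not k.startswith("_")]
        raise ValueError(
            f"Environment '{environment}' not found. Available: {available}")

    # Drop comment fields such as "_comment"
    return {k: v for k, v in config[environment].items() if not k.startswith("_")}


# Example: load_bounds(environment="ci")["min_throughput_msg_per_sec"] -> 1500.0
```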

33 changes: 24 additions & 9 deletions tests/ducktape/benchmark_bounds.json
@@ -1,11 +1,26 @@
{
"_comment": "Default performance bounds for benchmark tests",
"min_throughput_msg_per_sec": 1500.0,
"max_p95_latency_ms": 1500.0,
"max_error_rate": 0.01,
"min_success_rate": 0.99,
"max_p99_latency_ms": 2500.0,
"max_memory_growth_mb": 600.0,
"max_buffer_full_rate": 0.03,
"min_messages_per_poll": 15.0
"_comment": "Performance bounds for benchmark tests by environment",
"local": {
"_comment": "Default bounds for local development - more relaxed thresholds",
"min_throughput_msg_per_sec": 1000.0,
"max_p95_latency_ms": 2000.0,
"max_error_rate": 0.02,
"min_success_rate": 0.98,
"max_p99_latency_ms": 3000.0,
"max_memory_growth_mb": 800.0,
"max_buffer_full_rate": 0.05,
"min_messages_per_poll": 10.0
},
"ci": {
"_comment": "Stricter bounds for CI environment - production-like requirements",
"min_throughput_msg_per_sec": 1500.0,
"max_p95_latency_ms": 1500.0,
"max_error_rate": 0.01,
"min_success_rate": 0.99,
"max_p99_latency_ms": 2500.0,
"max_memory_growth_mb": 600.0,
"max_buffer_full_rate": 0.03,
"min_messages_per_poll": 10.0
},
"_default_environment": "local"
}
18 changes: 16 additions & 2 deletions tests/ducktape/benchmark_metrics.py
@@ -267,8 +267,22 @@ def _load_from_config_file(self, config_path: str):
with open(config_path, 'r') as f:
config = json.load(f)

# Set values from config file
for key, value in config.items():
# Always use environment-based format
environment = os.getenv('BENCHMARK_ENVIRONMENT',
config.get('_default_environment', 'local'))

if environment not in config:
available_envs = [k for k in config.keys() if not k.startswith('_')]
raise ValueError(
f"Environment '{environment}' not found in config. "
f"Available environments: {available_envs}"
)

bounds_config = config[environment]
print(f"Loading benchmark bounds for environment: {environment}")

# Set values from the selected configuration
for key, value in bounds_config.items():
if not key.startswith('_'): # Skip comment fields like "_comment"
setattr(self, key, value)

15 changes: 13 additions & 2 deletions tests/ducktape/run_ducktape_test.py
@@ -6,6 +6,7 @@
import os
import subprocess
import tempfile
import time

import ducktape

@@ -21,9 +22,10 @@ def create_cluster_config():

def create_test_config():
"""Create test configuration file"""
timestamp = int(time.time())
config = {
"ducktape_dir": os.path.dirname(os.path.abspath(__file__)),
"results_dir": os.path.join(tempfile.gettempdir(), "ducktape_results")
"results_dir": os.path.join(tempfile.gettempdir(), f"ducktape_results_{timestamp}")
}
return config

@@ -33,7 +35,16 @@ def main():
print("Confluent Kafka Python - Ducktape Producer Test Runner")
print("=" * 60)

print(f"Using ducktape version: {ducktape.__version__}")
try:
print(f"Using ducktape version: {ducktape.__version__}")
except AttributeError:
# Some ducktape versions don't have __version__, try alternative methods
try:
import pkg_resources
version = pkg_resources.get_distribution('ducktape').version
print(f"Using ducktape version: {version}")
except:
print("Using ducktape version: unknown")

# Check if confluent_kafka is available
try:
13 changes: 8 additions & 5 deletions tests/ducktape/test_producer.py
@@ -140,7 +140,8 @@ def delivery_callback(err, msg):

# Validate against performance bounds
if not is_valid:
self.logger.warning("Performance bounds validation failed: %s", "; ".join(violations))
self.logger.error("Performance bounds validation failed: %s", "; ".join(violations))
assert False, f"Performance bounds validation failed: {'; '.join(violations)}"

self.logger.info("Successfully completed basic production test with comprehensive metrics")

@@ -255,8 +256,9 @@ def delivery_callback(err, msg):

# Validate against performance bounds
if not is_valid:
self.logger.warning("Performance bounds validation failed for %ds test: %s",
test_duration, "; ".join(violations))
self.logger.error("Performance bounds validation failed for %ds test: %s",
test_duration, "; ".join(violations))
assert False, f"Performance bounds validation failed for {test_duration}s test: {'; '.join(violations)}"

self.logger.info("Successfully completed %ds batch production test with comprehensive metrics", test_duration)

@@ -369,8 +371,9 @@ def delivery_callback(err, msg):

# Validate against performance bounds
if not is_valid:
self.logger.warning("Performance bounds validation failed for %s compression: %s",
compression_type, "; ".join(violations))
self.logger.error("Performance bounds validation failed for %s compression: %s",
compression_type, "; ".join(violations))
assert False, f"Performance bounds validation failed for {compression_type} compression: {'; '.join(violations)}"

self.logger.info("Successfully completed %s compression test with comprehensive metrics", compression_type)
