WIP: improve kafka replay speed
Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov committed Sep 19, 2024
1 parent bbd2a24 commit e28dbab
Showing 38 changed files with 6,921 additions and 411 deletions.
50 changes: 50 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -6647,6 +6647,56 @@
"fieldDefaultValue": 20000000000,
"fieldFlag": "ingest-storage.kafka.wait-strong-read-consistency-timeout",
"fieldType": "duration"
},
{
"kind": "field",
"name": "replay_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup.",
"fieldValue": null,
"fieldDefaultValue": 1,
"fieldFlag": "ingest-storage.kafka.replay-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "replay_shards",
"required": false,
"desc": "The number of concurrent appends to the TSDB head. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.replay-shards",
"fieldType": "int"
},
{
"kind": "field",
"name": "batch_size",
"required": false,
"desc": "The number of timeseries to batch together before ingesting into TSDB.",
"fieldValue": null,
"fieldDefaultValue": 128,
"fieldFlag": "ingest-storage.kafka.batch-size",
"fieldType": "int"
},
{
"kind": "field",
"name": "records_per_fetch",
"required": false,
"desc": "The number of records to fetch from Kafka in a single request.",
"fieldValue": null,
"fieldDefaultValue": 128,
"fieldFlag": "ingest-storage.kafka.records-per-fetch",
"fieldType": "int"
},
{
"kind": "field",
"name": "use_compressed_bytes_as_fetch_max_bytes",
"required": false,
"desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes",
"fieldType": "boolean"
}
],
"fieldValue": null,
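The five new fields above are surfaced under the ingest_storage.kafka block of the Mimir YAML configuration. Below is a minimal sketch of an ingester tuned for faster replay, assuming the JSON field names above map directly to YAML keys; the replay_concurrency and replay_shards values mirror the development mimir.yaml changed later in this commit, and the remaining values are simply the documented defaults:

ingest_storage:
  enabled: true
  kafka:
    address: kafka_1:9092
    topic: mimir-ingest
    # Issue 3 concurrent fetch requests to Kafka while catching up at startup.
    replay_concurrency: 3
    # Spread appends to the TSDB head across 8 concurrent shards; 0 disables sharding.
    replay_shards: 8
    # Batch 128 timeseries together before ingesting into the TSDB.
    batch_size: 128
    # Fetch 128 records from Kafka per request.
    records_per_fetch: 128
    # Compute the fetch request MaxBytes from the compressed size of previous records.
    use_compressed_bytes_as_fetch_max_bytes: true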
10 changes: 10 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1351,6 +1351,8 @@ Usage of ./cmd/mimir/mimir:
When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.
-ingest-storage.kafka.auto-create-topic-enabled
Enable auto-creation of Kafka topic if it doesn't exist. (default true)
-ingest-storage.kafka.batch-size int
The number of timeseries to batch together before ingesting into TSDB. (default 128)
-ingest-storage.kafka.client-id string
The Kafka client ID.
-ingest-storage.kafka.consume-from-position-at-startup string
@@ -1373,10 +1375,18 @@
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.records-per-fetch int
The number of records to fetch from Kafka in a single request. (default 128)
-ingest-storage.kafka.replay-concurrency int
The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup. (default 1)
-ingest-storage.kafka.replay-shards int
The number of concurrent appends to the TSDB head. 0 to disable.
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true)
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
The maximum allowed time for a read request processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s)
-ingest-storage.kafka.write-clients int
10 changes: 10 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -421,6 +421,8 @@ Usage of ./cmd/mimir/mimir:
When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.
-ingest-storage.kafka.auto-create-topic-enabled
Enable auto-creation of Kafka topic if it doesn't exist. (default true)
-ingest-storage.kafka.batch-size int
The number of timeseries to batch together before ingesting into TSDB. (default 128)
-ingest-storage.kafka.client-id string
The Kafka client ID.
-ingest-storage.kafka.consume-from-position-at-startup string
@@ -443,10 +445,18 @@
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.records-per-fetch int
The number of records to fetch from Kafka in a single request. (default 128)
-ingest-storage.kafka.replay-concurrency int
The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup. (default 1)
-ingest-storage.kafka.replay-shards int
The number of concurrent appends to the TSDB head. 0 to disable.
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true)
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
The maximum allowed time for a read request processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s)
-ingest-storage.kafka.write-clients int
7 changes: 7 additions & 0 deletions development/mimir-ingest-storage/config/datasource-mimir.yaml
@@ -9,3 +9,10 @@ datasources:
isDefault: true
jsonData:
prometheusType: Mimir
timeInterval: 5s
- name: Jaeger
type: jaeger
access: proxy
uid: jaeger
orgID: 1
url: http://jaeger:16686/
9 changes: 8 additions & 1 deletion development/mimir-ingest-storage/config/grafana-agent.flow
@@ -36,6 +36,13 @@ prometheus.scrape "metrics_local_mimir_read_write_mode_mimir_write" {
container = "mimir-write",
namespace = "mimir-read-write-mode",
}],
[{
__address__ = "mimir-write-zone-c-61:8080",
cluster = "docker-compose",
container = "mimir-write",
namespace = "mimir-read-write-mode",
job = "mimir-write-zone-c",
}],
)
forward_to = [prometheus.remote_write.metrics_local.receiver]
job_name = "mimir-read-write-mode/mimir-write"
@@ -97,7 +104,7 @@
prometheus.remote_write "metrics_local" {
endpoint {
name = "local"
url = "http://mimir-write-zone-a-1:8080/api/v1/push"
url = "http://mimir-write-zone-a-2:8080/api/v1/push"
send_native_histograms = true

queue_config { }
11 changes: 8 additions & 3 deletions development/mimir-ingest-storage/config/mimir.yaml
@@ -12,15 +12,17 @@ common:
ingest_storage:
enabled: true
kafka:
address: kafka:9092
address: kafka_1:9092
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
replay_concurrency: 3
replay_shards: 8

ingester:
track_ingester_owned_series: true
track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing

partition_ring:
min_partition_owners_count: 2
min_partition_owners_count: 1
min_partition_owners_duration: 10s
delete_inactive_partition_after: 1m

@@ -99,3 +101,6 @@ limits:

runtime_config:
file: ./config/runtime.yaml

server:
log_level: debug
98 changes: 97 additions & 1 deletion development/mimir-ingest-storage/config/runtime.yaml
@@ -1 +1,97 @@
# This file can be used to set overrides or other runtime config.
distributor_limits:
max_inflight_push_requests: 2000
max_inflight_push_requests_bytes: 2.147483648e+09
ingester_limits:
max_inflight_push_requests: 30000
max_inflight_push_requests_bytes: 5.36870912e+08
max_ingestion_rate: 0
max_series: 3e+06
max_tenants: 500
overrides:
"17065":
compactor_block_upload_enabled: true
compactor_blocks_retention_period: 10000d
compactor_split_and_merge_shards: 4
compactor_split_groups: 8
compactor_tenant_shard_size: 0
ha_cluster_label: __cluster__
ingestion_burst_size: 2.25e+07
ingestion_partitions_tenant_shard_size: 174
ingestion_rate: 2.25e+06
ingestion_tenant_shard_size: 522
max_global_exemplars_per_user: 1e+06
max_global_metadata_per_metric: 10
max_global_metadata_per_user: 1e+07
max_global_series_per_user: 5e+07
max_labels_query_length: 32d
native_histograms_ingestion_enabled: true
query_sharding_total_shards: 16
ruler_max_rule_groups_per_tenant: 600
ruler_max_rules_per_rule_group: 150
ruler_tenant_shard_size: 8
store_gateway_tenant_shard_size: 24
"17923":
compactor_block_upload_enabled: true
compactor_blocks_retention_period: 10000d
compactor_split_and_merge_shards: 4
compactor_split_groups: 8
compactor_tenant_shard_size: 0
ha_cluster_label: __cluster__
ingestion_burst_size: 2.25e+07
ingestion_partitions_tenant_shard_size: 174
ingestion_rate: 2.25e+06
ingestion_tenant_shard_size: 522
max_global_exemplars_per_user: 1e+06
max_global_metadata_per_metric: 10
max_global_metadata_per_user: 1e+07
max_global_series_per_user: 5e+07
max_labels_query_length: 32d
native_histograms_ingestion_enabled: true
otel_metric_suffixes_enabled: true
query_sharding_total_shards: 16
ruler_max_rule_groups_per_tenant: 600
ruler_max_rules_per_rule_group: 150
ruler_tenant_shard_size: 8
store_gateway_tenant_shard_size: 24
"230448":
ingestion_burst_size: 1e+06
ingestion_partitions_tenant_shard_size: 0
ingestion_rate: 100000
ingestion_tenant_shard_size: 0
max_global_metadata_per_metric: 10
max_global_metadata_per_user: 200000
max_global_series_per_user: 1e+06
ruler_max_rule_groups_per_tenant: 140
ruler_max_rules_per_rule_group: 20
ruler_tenant_shard_size: 2
store_gateway_tenant_shard_size: 0
"295486":
ingestion_burst_size: 1e+06
ingestion_partitions_tenant_shard_size: 9
ingestion_rate: 100000
ingestion_tenant_shard_size: 27
max_global_exemplars_per_user: 1e+06
max_global_metadata_per_metric: 10
max_global_metadata_per_user: 200000
max_global_series_per_user: 1e+06
ruler_max_rule_groups_per_tenant: 140
ruler_max_rules_per_rule_group: 20
ruler_tenant_shard_size: 2
store_gateway_tenant_shard_size: 6
load-generator-1:
ingestion_burst_size: 1e+06
ingestion_partitions_tenant_shard_size: 9
ingestion_rate: 100000
ingestion_tenant_shard_size: 27
max_global_metadata_per_metric: 10
max_global_metadata_per_user: 200000
max_global_series_per_user: 1e+06
ruler_max_rule_groups_per_tenant: 140
ruler_max_rules_per_rule_group: 20
ruler_tenant_shard_size: 2
store_gateway_tenant_shard_size: 6
ruler_limits:
ruler_max_rule_groups_per_tenant_by_namespace:
asserts: 300
ruler_max_rules_per_rule_group_by_namespace:
asserts: 50