diff --git a/bulkerapp/app/batch_consumer.go b/bulkerapp/app/batch_consumer.go
index 4afe446..ac290b9 100644
--- a/bulkerapp/app/batch_consumer.go
+++ b/bulkerapp/app/batch_consumer.go
@@ -294,7 +294,7 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition,
 		}
 	}()
 
-	err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.RetryTopicConfig())
+	err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.config.TopicConfig("retry"))
 	if err != nil {
 		return counters, fmt.Errorf("failed to create retry topic %s: %v", bc.retryTopic, err)
 	}
diff --git a/bulkerapp/app/stream_consumer.go b/bulkerapp/app/stream_consumer.go
index 33e1760..8ca37d3 100644
--- a/bulkerapp/app/stream_consumer.go
+++ b/bulkerapp/app/stream_consumer.go
@@ -283,7 +283,7 @@ func (sc *StreamConsumerImpl) start() {
 				metrics.ConnectionMessageStatuses(sc.destination.Id(), sc.tableName, "deadLettered").Inc()
 				failedTopic = sc.config.KafkaDestinationsDeadLetterTopicName
 			} else {
-				err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.RetryTopicConfig())
+				err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.config.TopicConfig("retry"))
 				if err != nil {
 					sc.Errorf("failed to create retry topic %s: %v", sc.retryTopic, err)
 				}
diff --git a/bulkerapp/app/topic_manager.go b/bulkerapp/app/topic_manager.go
index e63f074..b3e5d22 100644
--- a/bulkerapp/app/topic_manager.go
+++ b/bulkerapp/app/topic_manager.go
@@ -339,11 +339,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
 	}
 	tm.allTopics = allTopics
 	tm.staleTopics = staleTopics
-	err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions,
-		map[string]string{
-			"retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
-			"segment.ms":   fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-		})
+	err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, tm.config.TopicConfig("destination"))
 	if err != nil {
 		metrics.TopicManagerError("destination-topic_error").Inc()
 		tm.SystemErrorf("Failed to create destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
@@ -352,22 +348,13 @@
 		metrics.TopicManagerError("destination-topic_error").Inc()
 		tm.SystemErrorf("Failed to create multi-threaded destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
 	}
-	err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, map[string]string{
-		"cleanup.policy": "delete,compact",
-		"retention.ms":   fmt.Sprint(tm.config.KafkaDeadTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":     fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	})
+	err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, tm.config.TopicConfig("dead"))
 	if err != nil {
 		metrics.TopicManagerError("destination-topic_error").Inc()
 		tm.SystemErrorf("Failed to create destination dead letter topic [%s]: %v", tm.config.KafkaDestinationsDeadLetterTopicName, err)
 	}
 	destinationsRetryTopicName := tm.config.KafkaDestinationsRetryTopicName
-	err = tm.ensureTopic(destinationsRetryTopicName, 1, map[string]string{
-		"cleanup.policy": "delete,compact",
-		"segment.bytes":  fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes),
-		"retention.ms":   fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":     fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	})
+	err = tm.ensureTopic(destinationsRetryTopicName, 1, tm.config.TopicConfig("retry"))
 	if err != nil {
 		metrics.TopicManagerError("destination-topic_error").Inc()
 		tm.SystemErrorf("Failed to create destination retry topic [%s]: %v", destinationsRetryTopicName, err)
@@ -602,11 +589,7 @@ func (tm *TopicManager) createDestinationTopic(topic string, config map[string]s
 		errorType = "unknown stream mode"
 		return tm.NewError("Unknown stream mode: %s for topic: %s", mode, topic)
 	}
-	topicConfig := map[string]string{
-		"retention.ms":     fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":       fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-		"compression.type": tm.config.KafkaTopicCompression,
-	}
+	topicConfig := tm.config.TopicConfig("destination")
 	utils.MapPutAll(topicConfig, config)
 	topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{
 		{
@@ -645,11 +628,7 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str
 			metrics.TopicManagerCreate(topic, "", "", "", "success", "").Inc()
 		}
 	}()
-	topicConfig := map[string]string{
-		"compression.type": tm.config.KafkaTopicCompression,
-		"retention.ms":     fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":       fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	}
+	topicConfig := tm.config.TopicConfig("destination")
 	utils.MapPutAll(topicConfig, config)
 	topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{
 		{
@@ -679,15 +658,6 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str
 	return nil
 }
 
-func (tm *TopicManager) RetryTopicConfig() map[string]string {
-	return map[string]string{
-		"cleanup.policy": "delete,compact",
-		"segment.bytes":  fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes),
-		"retention.ms":   fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":     fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	}
-}
-
 func (tm *TopicManager) Refresh() {
 	select {
 	case tm.refreshChan <- true:
diff --git a/kafkabase/kafka_config.go b/kafkabase/kafka_config.go
index b793dfd..b10194d 100644
--- a/kafkabase/kafka_config.go
+++ b/kafkabase/kafka_config.go
@@ -24,6 +24,7 @@ type KafkaConfig struct {
 	KafkaTopicCompression    string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
 	KafkaTopicRetentionHours int    `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
 	KafkaTopicSegmentHours   int    `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`
+	KafkaAllowSegmentConfig  bool   `mapstructure:"KAFKA_ALLOW_SEGMENT_CONFIG" default:"true"`
 	KafkaTopicPrefix         string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""`
 
 	KafkaFetchMessageMaxBytes int `mapstructure:"KAFKA_FETCH_MESSAGE_MAX_BYTES" default:"1048576"`
@@ -94,6 +95,34 @@ func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap {
 	return kafkaConfig
 }
 
+func (c *KafkaConfig) TopicConfig(mode string) map[string]string {
+	config := map[string]string{}
+	config["compression.type"] = c.KafkaTopicCompression
+
+	switch mode {
+	case "retry":
+		config["retention.ms"] = fmt.Sprint(c.KafkaRetryTopicRetentionHours * 60 * 60 * 1000)
+		config["cleanup.policy"] = "delete,compact"
+
+		if c.KafkaAllowSegmentConfig {
+			config["segment.bytes"] = fmt.Sprint(c.KafkaRetryTopicSegmentBytes)
+			config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
+		}
+	case "dead":
+		config["retention.ms"] = fmt.Sprint(c.KafkaDeadTopicRetentionHours * 60 * 60 * 1000)
+		config["cleanup.policy"] = "delete,compact"
+		if c.KafkaAllowSegmentConfig {
+			config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
+		}
+	default:
+		config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000)
+		if c.KafkaAllowSegmentConfig {
+			config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
+		}
+	}
+	return config
+}
+
 func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error {
 	return nil
 }