2 changes: 1 addition & 1 deletion bulkerapp/app/batch_consumer.go
@@ -259,7 +259,7 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition,
}

}()
-	err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.RetryTopicConfig())
+	err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.config.TopicConfig("retry"))
Review comment (Contributor): bc has a config property too.
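A sketch of what the reviewer seems to be suggesting, assuming bc.config is the same KafkaConfig that topicManager holds:

```go
// Hypothetical simplification per the review comment: read the topic config
// from the consumer's own config instead of reaching through topicManager.
err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.config.TopicConfig("retry"))
```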

if err != nil {
return counters, fmt.Errorf("failed to create retry topic %s: %v", bc.retryTopic, err)
}
2 changes: 1 addition & 1 deletion bulkerapp/app/stream_consumer.go
@@ -282,7 +282,7 @@ func (sc *StreamConsumerImpl) start() {
metrics.ConnectionMessageStatuses(sc.destination.Id(), sc.tableName, "deadLettered").Inc()
failedTopic = sc.config.KafkaDestinationsDeadLetterTopicName
} else {
-	err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.RetryTopicConfig())
+	err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.config.TopicConfig("retry"))
Review comment (Contributor): sc has a config property too.
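As above, a sketch of the suggested call using sc's own config, which the surrounding diff already reads (sc.config.KafkaDestinationsDeadLetterTopicName):

```go
// Hypothetical per the review comment: sc.config exposes the same Kafka
// settings, so the extra hop through topicManager can be dropped.
err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.config.TopicConfig("retry"))
```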

if err != nil {
sc.Errorf("failed to create retry topic %s: %v", sc.retryTopic, err)
}
40 changes: 5 additions & 35 deletions bulkerapp/app/topic_manager.go
@@ -339,11 +339,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics
}
tm.allTopics = allTopics
tm.staleTopics = staleTopics
-	err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions,
-		map[string]string{
-			"retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
-			"segment.ms":   fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-		})
+	err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, tm.config.TopicConfig("destination"))
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
@@ -352,22 +348,13 @@
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create multi-threaded destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
}
-	err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, map[string]string{
-		"cleanup.policy": "delete,compact",
-		"retention.ms":   fmt.Sprint(tm.config.KafkaDeadTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":     fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	})
+	err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, tm.config.TopicConfig("dead"))
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination dead letter topic [%s]: %v", tm.config.KafkaDestinationsDeadLetterTopicName, err)
}
destinationsRetryTopicName := tm.config.KafkaDestinationsRetryTopicName
-	err = tm.ensureTopic(destinationsRetryTopicName, 1, map[string]string{
-		"cleanup.policy": "delete,compact",
-		"segment.bytes":  fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes),
-		"retention.ms":   fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":     fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	})
+	err = tm.ensureTopic(destinationsRetryTopicName, 1, tm.config.TopicConfig("retry"))
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination retry topic [%s]: %v", destinationsRetryTopicName, err)
@@ -602,11 +589,7 @@ func (tm *TopicManager) createDestinationTopic(topic string, config map[string]s
errorType = "unknown stream mode"
return tm.NewError("Unknown stream mode: %s for topic: %s", mode, topic)
}
-	topicConfig := map[string]string{
-		"retention.ms":     fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":       fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-		"compression.type": tm.config.KafkaTopicCompression,
-	}
+	topicConfig := tm.config.TopicConfig("destination")
utils.MapPutAll(topicConfig, config)
topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{
{
@@ -645,11 +628,7 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str
metrics.TopicManagerCreate(topic, "", "", "", "success", "").Inc()
}
}()
-	topicConfig := map[string]string{
-		"compression.type": tm.config.KafkaTopicCompression,
-		"retention.ms":     fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":       fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	}
+	topicConfig := tm.config.TopicConfig("destination")
utils.MapPutAll(topicConfig, config)
topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{
{
@@ -679,15 +658,6 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str
return nil
}

-func (tm *TopicManager) RetryTopicConfig() map[string]string {
-	return map[string]string{
-		"cleanup.policy": "delete,compact",
-		"segment.bytes":  fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes),
-		"retention.ms":   fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000),
-		"segment.ms":     fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000),
-	}
-}

func (tm *TopicManager) Refresh() {
select {
case tm.refreshChan <- true:
29 changes: 29 additions & 0 deletions kafkabase/kafka_config.go
@@ -24,6 +24,7 @@ type KafkaConfig struct {
KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"`
KafkaTopicSegmentHours int `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"`
+	KafkaAllowSegmentConfig   bool   `mapstructure:"KAFKA_ALLOW_SEGMENT_CONFIG" default:"true"`
KafkaTopicPrefix string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""`
KafkaFetchMessageMaxBytes int `mapstructure:"KAFKA_FETCH_MESSAGE_MAX_BYTES" default:"1048576"`

@@ -91,6 +92,34 @@ func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap {
return kafkaConfig
}

+func (c *KafkaConfig) TopicConfig(mode string) map[string]string {
+	config := map[string]string{}
+
+	switch mode {
+	case "retry":
+		config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000)
Review comment (Contributor): this should use KafkaRetryTopicRetentionHours.
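A sketch of the one-line fix the comment points at; the retry branch should read the retry-specific retention setting rather than the general one:

```go
// Fix per the review comment: use the retry topic's own retention window.
config["retention.ms"] = fmt.Sprint(c.KafkaRetryTopicRetentionHours * 60 * 60 * 1000)
```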

config["cleanup.policy"] = "delete,compact"

if c.KafkaAllowSegmentConfig {
config["segment.bytes"] = fmt.Sprint(c.KafkaRetryTopicSegmentBytes)
config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
}
case "dead":
config["retention.ms"] = fmt.Sprint(c.KafkaDeadTopicRetentionHours * 60 * 60 * 1000)
config["cleanup.policy"] = "delete,compact"
if c.KafkaAllowSegmentConfig {
config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
}
default:
config["compression.type"] = c.KafkaTopicCompression
Review comment (Contributor): we should set "compression.type" for all modes, in the map initialization.
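A sketch of the refactor the comment suggests: initialize the map with the settings shared by every mode, then let the switch add only mode-specific keys. This is the reviewer's proposal, not the committed code:

```go
// Suggested shape: compression applies to all modes, so set it at map init.
config := map[string]string{
	"compression.type": c.KafkaTopicCompression,
}
switch mode {
case "retry":
	// retry-specific keys: cleanup.policy, segment.bytes, retention.ms, ...
case "dead":
	// dead-letter-specific keys: cleanup.policy, retention.ms, ...
default:
	config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000)
}
```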

config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000)
if c.KafkaAllowSegmentConfig {
config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)
}
}
return config
}

func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error {
return nil
}
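For reference, a minimal sketch of how the new helper behaves per mode. The field values below are illustrative placeholders, not the library's defaults:

```go
// Illustrative usage of KafkaConfig.TopicConfig; real values come from the
// mapstructure-bound environment settings.
cfg := &KafkaConfig{
	KafkaTopicCompression:         "snappy",
	KafkaTopicRetentionHours:      48,
	KafkaTopicSegmentHours:        24,
	KafkaAllowSegmentConfig:       true,
	KafkaRetryTopicSegmentBytes:   104857600,
	KafkaRetryTopicRetentionHours: 48,
	KafkaDeadTopicRetentionHours:  168,
}

// "retry" and "dead" get compaction; any other mode falls through to the
// default branch with compression + retention.
retry := cfg.TopicConfig("retry") // cleanup.policy, segment.bytes, segment.ms, retention.ms
dead := cfg.TopicConfig("dead")   // cleanup.policy, segment.ms, retention.ms
dest := cfg.TopicConfig("destination") // compression.type, retention.ms, segment.ms
```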