Skip to content

Commit

Permalink
Add write timeout configuration for Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Feb 12, 2025
1 parent 4ab985d commit e18d764
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 18 deletions.
13 changes: 10 additions & 3 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ var (
authWebhookCacheTTL time.Duration
projectCacheTTL time.Duration

kafkaAddresses string
kafkaTopic string
kafkaAddresses string
kafkaTopic string
kafkaWriteTimeout time.Duration

conf = server.NewConfig()
)
Expand Down Expand Up @@ -379,9 +380,15 @@ func init() {
cmd.Flags().StringVar(
&kafkaTopic,
"kafka-topic",
"",
server.DefaultKafkaTopic,
"Kafka topic name to publish events",
)
cmd.Flags().DurationVar(
&kafkaWriteTimeout,
"kafka-write-timeout",
server.DefaultKafkaWriteTimeout,
"Timeout for writing messages to Kafka",
)

rootCmd.AddCommand(cmd)
}
28 changes: 26 additions & 2 deletions server/backend/messagebroker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/url"
"strings"
"time"
)

var (
Expand All @@ -29,12 +30,31 @@ var (

// ErrEmptyTopic is returned when the topic is empty.
ErrEmptyTopic = errors.New("topic cannot be empty")

// ErrInvalidDuration is returned when the duration is invalid.
ErrInvalidDuration = errors.New("invalid duration")
)

// Config is the configuration for creating a message broker instance.
type Config struct {
Addresses string `yaml:"Addresses"`
Topic string `yaml:"Topic"`
Addresses string `yaml:"Addresses"`
Topic string `yaml:"Topic"`
WriteTimeout string `yaml:"WriteTimeout"`
}

// SplitAddresses splits the addresses by comma.
func (c *Config) SplitAddresses() []string {
return strings.Split(c.Addresses, ",")
}

// MustParseWriteTimeout parses the write timeout and returns the duration.
func (c *Config) MustParseWriteTimeout() time.Duration {
d, err := time.ParseDuration(c.WriteTimeout)
if err != nil {
panic(ErrInvalidDuration)
}

return d
}

// Validate validates this config.
Expand All @@ -58,5 +78,9 @@ func (c *Config) Validate() error {
return ErrEmptyTopic
}

if _, err := time.ParseDuration(c.WriteTimeout); err != nil {
return fmt.Errorf(`parse write timeout "%s": %w`, c.WriteTimeout, ErrInvalidDuration)
}

return nil
}
51 changes: 51 additions & 0 deletions server/backend/messagebroker/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package messagebroker_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/yorkie-team/yorkie/server/backend/messagebroker"
)

func TestConfig(t *testing.T) {
t.Run("test split addresses", func(t *testing.T) {
c := &messagebroker.Config{
Addresses: "localhost:8080,localhost:8081",
}
addrs := c.SplitAddresses()
assert.Equal(t, []string{"localhost:8080", "localhost:8081"}, addrs)
})

t.Run("test must parse write timeout", func(t *testing.T) {
c := &messagebroker.Config{
WriteTimeout: "1s",
}
assert.Equal(t, time.Second, c.MustParseWriteTimeout())
})

t.Run("test must parse write timeout with invalid duration", func(t *testing.T) {
c := &messagebroker.Config{
WriteTimeout: "1",
}
assert.PanicsWithError(t, messagebroker.ErrInvalidDuration.Error(), func() {
c.MustParseWriteTimeout()
})
})
}
20 changes: 11 additions & 9 deletions server/backend/messagebroker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ import (

// KafkaBroker is a producer for Kafka.
type KafkaBroker struct {
conf *Config
writer *kafka.Writer
}

// newKafkaBroker creates a new instance of KafkaProducer.
func newKafkaBroker(addresses []string, topic string) *KafkaBroker {
func newKafkaBroker(conf *Config) *KafkaBroker {
return &KafkaBroker{
conf: conf,
writer: &kafka.Writer{
Addr: kafka.TCP(addresses...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Async: true,
Addr: kafka.TCP(conf.SplitAddresses()...),
Topic: conf.Topic,
WriteTimeout: conf.MustParseWriteTimeout(),
Balancer: &kafka.LeastBytes{},
Async: true,
},
}
}
Expand All @@ -47,12 +50,11 @@ func (mb *KafkaBroker) Produce(
) error {
value, err := msg.Marshal()
if err != nil {
return fmt.Errorf("marshal message: %v", err)
return fmt.Errorf("marshal message: %w", err)
}

// TODO(hackerwins): Consider using message batching.
if err := mb.writer.WriteMessages(ctx, kafka.Message{Value: value}); err != nil {
return fmt.Errorf("write message to kafka: %v", err)
return fmt.Errorf("write message to kafka: %w", err)
}

return nil
Expand All @@ -61,7 +63,7 @@ func (mb *KafkaBroker) Produce(
// Close closes the KafkaProducer.
func (mb *KafkaBroker) Close() error {
if err := mb.writer.Close(); err != nil {
return fmt.Errorf("close kafka writer: %v", err)
return fmt.Errorf("close kafka writer: %w", err)
}

return nil
Expand Down
7 changes: 3 additions & 4 deletions server/backend/messagebroker/messagebroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/yorkie-team/yorkie/api/types/events"
Expand All @@ -35,10 +34,10 @@ type Message interface {

// UserEventMessage represents a message for user events
type UserEventMessage struct {
ProjectID string `json:"project_id"`
EventType events.ClientEventType `json:"event_type"`
UserID string `json:"user_id"`
Timestamp time.Time `json:"timestamp"`
EventType events.ClientEventType `json:"event_type"`
ProjectID string `json:"project_id"`
UserAgent string `json:"user_agent"`
}

Expand Down Expand Up @@ -74,5 +73,5 @@ func Ensure(kafkaConf *Config) Broker {
kafkaConf.Topic,
)

return newKafkaBroker(strings.Split(kafkaConf.Addresses, ","), kafkaConf.Topic)
return newKafkaBroker(kafkaConf)
}
3 changes: 3 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const (
DefaultMongoPingTimeout = 5 * time.Second
DefaultMongoYorkieDatabase = "yorkie-meta"

DefaultKafkaTopic = "user-events"
DefaultKafkaWriteTimeout = 5 * time.Second

DefaultAdminUser = "admin"
DefaultAdminPassword = "admin"
DefaultSecretKey = "yorkie-secret"
Expand Down
3 changes: 3 additions & 0 deletions server/config.sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,6 @@ Kafka:

# Topic is the message broker topic to use.
Topic: "user-events"

# WriteTimeout is the timeout for writing to the message broker.
WriteTimeout: "5s"

0 comments on commit e18d764

Please sign in to comment.