Kafka source offset policy
Allow specifying an offset policy so that a topic is read either from the start or from the end of its data; this simplifies initial setup and makes it possible to skip bad data from the past.

closes #127

---

Pull Request resolved: #128

Co-authored-by: tserakhau <[email protected]>
commit_hash:fa95aafd98a46c99c8a7127b770109fa32150d78
laskoviymishka authored and robot-piglet committed Dec 4, 2024
1 parent a4219e2 commit e18c046
Showing 3 changed files with 71 additions and 0 deletions.
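
A minimal usage sketch of the new field (only OffsetPolicy and its constants come from this commit; the topic name and the remaining configuration are illustrative):

cfg := &KafkaSource{
	Topic:        "events",            // illustrative topic name
	OffsetPolicy: AtStartOffsetPolicy, // replay the topic from its beginning
	// connection, auth, and parser configuration as usual (omitted here)
}

Leaving OffsetPolicy unset (NoOffsetPolicy) keeps the previous behavior, since the consumer options below are only appended for at_start and at_end.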
10 changes: 10 additions & 0 deletions pkg/providers/kafka/model_source.go
@@ -23,8 +23,18 @@ type KafkaSource struct {
	ParserConfig map[string]interface{}
	IsHomo bool // enables the Kafka mirror protocol, which works only with a Kafka target
	SynchronizeIsNeeded bool // true if we need to send synchronize events when releasing partitions

	OffsetPolicy OffsetPolicy // specifies from which position in the topic message consumption starts
}

type OffsetPolicy string

const (
	NoOffsetPolicy      = OffsetPolicy("") // not specified
	AtStartOffsetPolicy = OffsetPolicy("at_start")
	AtEndOffsetPolicy   = OffsetPolicy("at_end")
)

var _ model.Source = (*KafkaSource)(nil)

func (s *KafkaSource) MDBClusterID() string {
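A hypothetical validation helper, not part of this commit, showing how the three allowed values could be checked when a source is configured:

func (p OffsetPolicy) Valid() bool {
	switch p {
	case NoOffsetPolicy, AtStartOffsetPolicy, AtEndOffsetPolicy:
		return true
	default:
		return false
	}
}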
5 changes: 5 additions & 0 deletions pkg/providers/kafka/source.go
@@ -525,6 +525,11 @@ func newSourceWithCallbacks(cfg *KafkaSource, logger log.Logger, registry metric
		}),
		kgo.ConsumeTopics(topics...),
	)
	// apply the configured offset policy via the consumer's reset-offset option
	if cfg.OffsetPolicy == AtStartOffsetPolicy {
		opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()))
	} else if cfg.OffsetPolicy == AtEndOffsetPolicy {
		opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
	}

kfClient, err := kgo.NewClient(opts...)
if err != nil {
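The same branching, sketched as a standalone helper to make the mapping from policy to consumer options explicit (assuming kgo is franz-go's client package, consistent with the calls above; the helper itself is not part of this commit):

func offsetPolicyOpts(policy OffsetPolicy) []kgo.Opt {
	switch policy {
	case AtStartOffsetPolicy:
		return []kgo.Opt{kgo.ConsumeResetOffset(kgo.NewOffset().AtStart())}
	case AtEndOffsetPolicy:
		return []kgo.Opt{kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())}
	default:
		return nil // NoOffsetPolicy: keep the client's default reset behavior
	}
}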
56 changes: 56 additions & 0 deletions pkg/providers/kafka/source_test.go
@@ -160,3 +160,59 @@ func TestNonExistsTopic(t *testing.T) {
	_, err = NewSource("asd", kafkaSource, logger.Log, solomon.NewRegistry(solomon.NewRegistryOpts()))
	require.Error(t, err)
}

func TestOffsetPolicy(t *testing.T) {
	parserConfigMap, err := parsers.ParserConfigStructToMap(&jsonparser.ParserConfigJSONCommon{
		Fields:        []abstract.ColSchema{{ColumnName: "ts", DataType: "DateTime"}, {ColumnName: "msg", DataType: "string"}},
		AddRest:       false,
		AddDedupeKeys: true,
	})
	require.NoError(t, err)
	kafkaSource, err := SourceRecipe()
	require.NoError(t, err)
	kafkaSource.Topic = "topic2"
	kafkaSource.ParserConfig = parserConfigMap

	kafkaClient, err := client.NewClient(kafkaSource.Connection.Brokers, nil, nil)
	require.NoError(t, err)
	require.NoError(t, kafkaClient.CreateTopicIfNotExist(logger.Log, kafkaSource.Topic, nil))

	lgr, closer, err := logger.NewKafkaLogger(&logger.KafkaConfig{
		Broker:   kafkaSource.Connection.Brokers[0],
		Topic:    kafkaSource.Topic,
		User:     kafkaSource.Auth.User,
		Password: kafkaSource.Auth.Password,
	})
	require.NoError(t, err)

	defer closer.Close()
	for i := 0; i < 3; i++ {
		lgr.Infof("log item: %v", i)
	}
	time.Sleep(time.Second) // just in case: give the produced records time to land

	kafkaSource.OffsetPolicy = AtStartOffsetPolicy // will read the old items (0, 1 and 2)
	src, err := NewSource("asd", kafkaSource, logger.Log, solomon.NewRegistry(solomon.NewRegistryOpts()))
	require.NoError(t, err)
	items, err := src.Fetch()
	require.NoError(t, err)
	src.Stop()
	require.True(t, len(items) >= 3) // at least 3 old items
	abstract.Dump(items)

	go func() {
		time.Sleep(time.Second)
		for i := 3; i < 5; i++ {
			lgr.Infof("log item: %v", i)
		}
	}()

	kafkaSource.OffsetPolicy = AtEndOffsetPolicy // will read only the new items (3 and 4)
	src, err = NewSource("asd", kafkaSource, logger.Log, solomon.NewRegistry(solomon.NewRegistryOpts()))
	require.NoError(t, err)
	items, err = src.Fetch()
	require.NoError(t, err)
	src.Stop()
	abstract.Dump(items)
	require.Len(t, items, 2)
}
