Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SendPartialBatch Setting #1339

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 51 additions & 18 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type Writer struct {
BatchBytes int64

// Time limit on how often incomplete message batches will be flushed to
// kafka.
// kafka. This is ignored if SendPartialBatch is set to true
//
// The default is to flush at least every second.
BatchTimeout time.Duration
Expand Down Expand Up @@ -158,6 +158,13 @@ type Writer struct {
// Defaults to false.
Async bool

// SendPartialBatch forces WriteMessages to send a partial batch instead of
// blocking until a full batch is made. This is useful if you are already batching
// messages before the producer, and want to flush everything sent to WriteMessages
// without blocking while also still being synchronous. When set BatchTimeout does
// nothing.
SendPartialBatch bool

// An optional function called when the writer succeeds or fails the
// delivery of messages to a kafka partition. When writing the messages
// fails, the `err` parameter will be non-nil.
Expand Down Expand Up @@ -276,6 +283,13 @@ type WriterConfig struct {
// The default is to use a kafka default value of 1048576.
BatchBytes int

// SendPartialBatch forces WriteMessages to send a partial batch instead of
// blocking until a full batch is made. This is useful if you are already batching
// messages before the producer, and want to flush everything sent to WriteMessages
// without blocking while also still being synchronous. When set BatchTimeout does
// nothing.
SendPartialBatch bool

// Time limit on how often incomplete message batches will be flushed to
// kafka.
//
Expand Down Expand Up @@ -487,22 +501,23 @@ func NewWriter(config WriterConfig) *Writer {
}

w := &Writer{
Addr: TCP(config.Brokers...),
Topic: config.Topic,
MaxAttempts: config.MaxAttempts,
BatchSize: config.BatchSize,
Balancer: config.Balancer,
BatchBytes: int64(config.BatchBytes),
BatchTimeout: config.BatchTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
RequiredAcks: RequiredAcks(config.RequiredAcks),
Async: config.Async,
Logger: config.Logger,
ErrorLogger: config.ErrorLogger,
Transport: transport,
transport: transport,
writerStats: stats,
Addr: TCP(config.Brokers...),
Topic: config.Topic,
MaxAttempts: config.MaxAttempts,
BatchSize: config.BatchSize,
Balancer: config.Balancer,
BatchBytes: int64(config.BatchBytes),
BatchTimeout: config.BatchTimeout,
SendPartialBatch: config.SendPartialBatch,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
RequiredAcks: RequiredAcks(config.RequiredAcks),
Async: config.Async,
Logger: config.Logger,
ErrorLogger: config.ErrorLogger,
Transport: transport,
transport: transport,
writerStats: stats,
}

if config.RequiredAcks == 0 {
Expand Down Expand Up @@ -1059,13 +1074,26 @@ func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*
batches[batch] = append(batches[batch], i)
}
}

// if we are sending partial batches and the current batch is not empty send
// the batch right away instead of lagging.
if ptw.w.SendPartialBatch && !ptw.currBatch.empty() {
ptw.currBatch.trigger()
ptw.queue.Put(ptw.currBatch)
ptw.currBatch = nil
}

return batches
}

// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
batch := newWriteBatch(time.Now(), ptw.w.batchTimeout())
ptw.w.spawn(func() { ptw.awaitBatch(batch) })
// if we are sending partial batches we don't need to wait for a timeout
if !ptw.w.SendPartialBatch {
ptw.w.spawn(func() { ptw.awaitBatch(batch) })
}

return batch
}

Expand Down Expand Up @@ -1239,6 +1267,11 @@ func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
return b.size >= maxSize || b.bytes >= maxBytes
}

// empty returns if the batch has no data in it at all
func (b *writeBatch) empty() bool {
return b == nil || b.size == 0
}

func (b *writeBatch) trigger() {
close(b.ready)
}
Expand Down
55 changes: 55 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,61 @@ func readPartition(topic string, partition int, offset int64) (msgs []Message, e
}
}

func testWriterPartailBatch(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
defer deleteTopic(t, topic)

offset, err := readOffset(topic, 0)
if err != nil {
t.Fatal(err)
}

w := newTestWriter(WriterConfig{
Topic: topic,
BatchBytes: 75,
SendPartialBatch: true,
Balancer: &RoundRobin{},
})
defer w.Close()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := w.WriteMessages(ctx, []Message{
// first batch
{Value: []byte("M0")}, // 25 Bytes
{Value: []byte("M1")}, // 25 Bytes
{Value: []byte("M2")}, // 25 Bytes
// second batch
{Value: []byte("M3")}, // 25 Bytes
}...); err != nil {
t.Error(err)
return
}

if w.Stats().Writes != 2 {
t.Error("didn't create expected batches")
return
}
msgs, err := readPartition(topic, 0, offset)
if err != nil {
t.Error("error reading partition", err)
return
}

if len(msgs) != 4 {
t.Error("bad messages in partition", msgs)
return
}

for i, m := range msgs {
if string(m.Value) == "M"+strconv.Itoa(i) {
continue
}
t.Error("bad messages in partition", string(m.Value))
}
}

func testWriterBatchBytes(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
Expand Down