diff --git a/writer.go b/writer.go index 3817bf53..ff006c9b 100644 --- a/writer.go +++ b/writer.go @@ -125,7 +125,7 @@ type Writer struct { BatchBytes int64 // Time limit on how often incomplete message batches will be flushed to - // kafka. + // kafka. This is ignored if SendPartialBatch is set to true // // The default is to flush at least every second. BatchTimeout time.Duration @@ -158,6 +158,13 @@ type Writer struct { // Defaults to false. Async bool + // SendPartialBatch forces WriteMessages to send a partial batch instead of + // blocking until a full batch is made. This is useful if you are already batching + // messages before the producer, and want to flush everything sent to WriteMessages + // without blocking while also still being synchronous. When set BatchTimeout does + // nothing. + SendPartialBatch bool + // An optional function called when the writer succeeds or fails the // delivery of messages to a kafka partition. When writing the messages // fails, the `err` parameter will be non-nil. @@ -276,6 +283,13 @@ type WriterConfig struct { // The default is to use a kafka default value of 1048576. BatchBytes int + // SendPartialBatch forces WriteMessages to send a partial batch instead of + // blocking until a full batch is made. This is useful if you are already batching + // messages before the producer, and want to flush everything sent to WriteMessages + // without blocking while also still being synchronous. When set BatchTimeout does + // nothing. + SendPartialBatch bool + // Time limit on how often incomplete message batches will be flushed to // kafka. // @@ -487,22 +501,23 @@ func NewWriter(config WriterConfig) *Writer { } w := &Writer{ - Addr: TCP(config.Brokers...), - Topic: config.Topic, - MaxAttempts: config.MaxAttempts, - BatchSize: config.BatchSize, - Balancer: config.Balancer, - BatchBytes: int64(config.BatchBytes), - BatchTimeout: config.BatchTimeout, - ReadTimeout: config.ReadTimeout, - WriteTimeout: config.WriteTimeout, - RequiredAcks: RequiredAcks(config.RequiredAcks), - Async: config.Async, - Logger: config.Logger, - ErrorLogger: config.ErrorLogger, - Transport: transport, - transport: transport, - writerStats: stats, + Addr: TCP(config.Brokers...), + Topic: config.Topic, + MaxAttempts: config.MaxAttempts, + BatchSize: config.BatchSize, + Balancer: config.Balancer, + BatchBytes: int64(config.BatchBytes), + BatchTimeout: config.BatchTimeout, + SendPartialBatch: config.SendPartialBatch, + ReadTimeout: config.ReadTimeout, + WriteTimeout: config.WriteTimeout, + RequiredAcks: RequiredAcks(config.RequiredAcks), + Async: config.Async, + Logger: config.Logger, + ErrorLogger: config.ErrorLogger, + Transport: transport, + transport: transport, + writerStats: stats, } if config.RequiredAcks == 0 { @@ -1059,13 +1074,26 @@ func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[* batches[batch] = append(batches[batch], i) } } + + // if we are sending partial batches and the current batch is not empty send + // the batch right away instead of lagging. + if ptw.w.SendPartialBatch && !ptw.currBatch.empty() { + ptw.currBatch.trigger() + ptw.queue.Put(ptw.currBatch) + ptw.currBatch = nil + } + return batches } // ptw.w can be accessed here because this is called with the lock ptw.mutex already held. func (ptw *partitionWriter) newWriteBatch() *writeBatch { batch := newWriteBatch(time.Now(), ptw.w.batchTimeout()) - ptw.w.spawn(func() { ptw.awaitBatch(batch) }) + // if we are sending partial batches we don't need to wait for a timeout + if !ptw.w.SendPartialBatch { + ptw.w.spawn(func() { ptw.awaitBatch(batch) }) + } + return batch } @@ -1239,6 +1267,11 @@ func (b *writeBatch) full(maxSize int, maxBytes int64) bool { return b.size >= maxSize || b.bytes >= maxBytes } +// empty returns if the batch has no data in it at all +func (b *writeBatch) empty() bool { + return b == nil || b.size == 0 +} + func (b *writeBatch) trigger() { close(b.ready) } diff --git a/writer_test.go b/writer_test.go index 6f894ecd..1b89dde7 100644 --- a/writer_test.go +++ b/writer_test.go @@ -450,6 +450,61 @@ func readPartition(topic string, partition int, offset int64) (msgs []Message, e } } +func testWriterPartailBatch(t *testing.T) { + topic := makeTopic() + createTopic(t, topic, 1) + defer deleteTopic(t, topic) + + offset, err := readOffset(topic, 0) + if err != nil { + t.Fatal(err) + } + + w := newTestWriter(WriterConfig{ + Topic: topic, + BatchBytes: 75, + SendPartialBatch: true, + Balancer: &RoundRobin{}, + }) + defer w.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := w.WriteMessages(ctx, []Message{ + // first batch + {Value: []byte("M0")}, // 25 Bytes + {Value: []byte("M1")}, // 25 Bytes + {Value: []byte("M2")}, // 25 Bytes + // second batch + {Value: []byte("M3")}, // 25 Bytes + }...); err != nil { + t.Error(err) + return + } + + if w.Stats().Writes != 2 { + t.Error("didn't create expected batches") + return + } + msgs, err := readPartition(topic, 0, offset) + if err != nil { + t.Error("error reading partition", err) + return + } + + if len(msgs) != 4 { + t.Error("bad messages in partition", msgs) + return + } + + for i, m := range msgs { + if string(m.Value) == "M"+strconv.Itoa(i) { + continue + } + t.Error("bad messages in partition", string(m.Value)) + } +} + func testWriterBatchBytes(t *testing.T) { topic := makeTopic() createTopic(t, topic, 1)