Retry producing on next partition if possible when a partition is not… #887

Open · wants to merge 1 commit into base: master
4 changes: 2 additions & 2 deletions pulsar/default_router.go
@@ -41,7 +41,7 @@ func NewDefaultRouter(
maxBatchingMessages uint,
maxBatchingSize uint,
maxBatchingDelay time.Duration,
disableBatching bool) func(*ProducerMessage, uint32) int {
shouldBatch func() bool) func(*ProducerMessage, uint32) int {
state := &defaultRouter{
currentPartitionCursor: rand.Uint32(),
lastBatchTimestamp: time.Now().UnixNano(),
@@ -65,7 +65,7 @@
}

// If there's no key, we do round-robin across partitions. If no batching, go to the next partition.
if disableBatching {
if !shouldBatch() {
p := int(state.currentPartitionCursor % numPartitions)
atomic.AddUint32(&state.currentPartitionCursor, 1)
return p
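For context, here is a minimal sketch of how a caller would supply the new shouldBatch callback in place of the old disableBatching flag. It is written as if it lived inside the pulsar package (like the tests below), so it can reuse internal.JavaStringHash and the time import; newExampleRouter and batchingEnabled are illustrative names, not part of this change.

// Illustrative only: constructing the router with the new shouldBatch callback.
// The closure is evaluated on every routing decision, so the caller can change the
// answer at runtime (for example, when the current partition's producer is not connected).
func newExampleRouter(batchingEnabled *bool) func(*ProducerMessage, uint32) int {
	return NewDefaultRouter(
		internal.JavaStringHash, // same hash function the tests use
		100,                     // maxBatchingMessages
		512*1024,                // maxBatchingSize
		10*time.Millisecond,     // maxBatchingDelay
		func() bool {
			// Returning false makes the router advance to the next partition for
			// every message, which is what DisableBatching used to trigger.
			return *batchingEnabled
		})
}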
6 changes: 5 additions & 1 deletion pulsar/default_router_bench_test.go
@@ -60,5 +60,9 @@ func newBenchDefaultRouter() func(*ProducerMessage, uint32) int {
maxBatchingSize = 524288
maxBatchingDelay = 100 * time.Millisecond
)
return NewDefaultRouter(internal.JavaStringHash, maxBatchingMessages, maxBatchingSize, maxBatchingDelay, false)
return NewDefaultRouter(internal.JavaStringHash,
maxBatchingMessages, maxBatchingSize,
maxBatchingDelay, func() bool {
return true
})
}
29 changes: 23 additions & 6 deletions pulsar/default_router_test.go
@@ -28,7 +28,9 @@ import (
const oneHourPublishMaxDelay = time.Hour

func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, true)
router := NewDefaultRouter(internal.JavaStringHash, 20, 100, oneHourPublishMaxDelay, func() bool {
return false
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
@@ -47,7 +49,10 @@ func TestDefaultRouterRoutingBecauseBatchingDisabled(t *testing.T) {

func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) {
maxPublishDelay := time.Nanosecond * 10
router := NewDefaultRouter(internal.JavaStringHash, 10, 100, maxPublishDelay, false)
router := NewDefaultRouter(internal.JavaStringHash, 10, 100,
maxPublishDelay, func() bool {
return true
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
@@ -67,7 +72,9 @@ func TestDefaultRouterRoutingBecauseMaxPublishDelayReached(t *testing.T) {
}

func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, false)
router := NewDefaultRouter(internal.JavaStringHash, 2, 100, oneHourPublishMaxDelay, func() bool {
return true
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
@@ -90,7 +97,11 @@ func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
}

func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 10, 10, oneHourPublishMaxDelay, false)
router := NewDefaultRouter(internal.JavaStringHash,
10, 10,
oneHourPublishMaxDelay, func() bool {
return true
})
const numPartitions = uint32(3)
p1 := router(&ProducerMessage{
Payload: []byte("message 1"),
@@ -108,7 +119,10 @@ func TestDefaultRouterRoutingBecauseMaxVolumeReached(t *testing.T) {
}

func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) {
router := NewDefaultRouter(internal.JavaStringHash, 1, 1, 0, false)
router := NewDefaultRouter(internal.JavaStringHash, 1, 1,
0, func() bool {
return true
})
p1 := router(&ProducerMessage{
Key: "my-key",
Payload: []byte("message 1"),
@@ -124,7 +138,10 @@ func TestDefaultRouterNoRoutingBecausePartitionKeyIsSpecified(t *testing.T) {

func TestDefaultRouterNoRoutingBecauseOnlyOnePartition(t *testing.T) {

router := NewDefaultRouter(internal.JavaStringHash, 1, 10, oneHourPublishMaxDelay, false)
router := NewDefaultRouter(internal.JavaStringHash, 1, 10,
oneHourPublishMaxDelay, func() bool {
return true
})

// partition index should not change regardless of the batching settings
p1 := router(&ProducerMessage{
7 changes: 7 additions & 0 deletions pulsar/producer.go
@@ -132,6 +132,13 @@ type ProducerOptions struct {
// Setting `DisableBatching: true` will make the producer send messages individually
DisableBatching bool

// MaxRetryOtherPartitions controls how many other partitions the partitioned producer will try
// when the currently selected partition is unavailable. Each retry picks the next partition using
// the same routing policy the client is configured with. Once the limit is reached, the producer
// falls back to the old behaviour and uses the originally selected partition.
MaxRetryOtherPartitions int

// BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
// if batch messages are enabled. If set to a non zero value, messages will be queued until this time
// interval or until
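As a usage sketch only, assuming this PR is merged: the new field is set alongside the existing producer options through the public client API (NewClient and CreateProducer already exist). The URL and topic below are placeholders, and MaxRetryOtherPartitions is the field added above.

package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:           "persistent://public/default/my-partitioned-topic",
		DisableBatching: false,
		// Try up to 5 other partitions (picked by the configured routing policy)
		// before falling back to the originally routed partition.
		MaxRetryOtherPartitions: 5,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}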
47 changes: 45 additions & 2 deletions pulsar/producer_impl.go
@@ -116,7 +116,16 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
options.BatchingMaxMessages,
options.BatchingMaxSize,
options.BatchingMaxPublishDelay,
options.DisableBatching)
func() bool {
if options.DisableBatching {
return false
}

if options.MaxRetryOtherPartitions > 0 && !p.isProducerConnected() {
return false
}
return true
})
p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
return internalRouter(message, metadata.NumPartitions())
}
@@ -160,6 +169,15 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
return p, nil
}

func (p *producer) isProducerConnected() bool {
for _, partitionedP := range p.producers {
if partitionedP.(*partitionProducer).getProducerState() != producerReady {
return false
}
}
return true
}

func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
@@ -306,6 +324,23 @@ func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
p.getPartition(msg).SendAsync(ctx, msg, callback)
}

func getNextConnectedPartition(p *producer, msg *ProducerMessage, startPartition int, maxRetry int) int {
if maxRetry == 0 {
return startPartition
}
partition := p.messageRouter(msg, p)
if partition == startPartition {
return partition
}
producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
producerForPartition := producers[partition].(*partitionProducer)
if producerForPartition.getProducerState() == producerReady {
return partition
}
maxRetry--
return getNextConnectedPartition(p, msg, partition, maxRetry)
}

func (p *producer) getPartition(msg *ProducerMessage) Producer {
// Since partitions can only increase, it's ok if the producers list
// is updated in between. The numPartition is updated only after the list.
@@ -316,7 +351,15 @@ func (p *producer) getPartition(msg *ProducerMessage) Producer {
// updated
partition %= len(producers)
}
return producers[partition]
producerForPartition := producers[partition].(*partitionProducer)
if producerForPartition.getProducerState() != producerReady {
nextPartition := getNextConnectedPartition(p, msg, partition, 5)
Review comment (Member), suggested change:
replace: nextPartition := getNextConnectedPartition(p, msg, partition, 5)
with: nextPartition := getNextConnectedPartition(p, msg, partition, p.options.MaxRetryOtherPartitions)

if nextPartition != partition {
return producers[nextPartition].(*partitionProducer)
}
}

return producerForPartition
}

func (p *producer) LastSequenceID() int64 {
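To make the retry path above easier to follow, here is a simplified, self-contained model of the intended selection behaviour in getPartition and getNextConnectedPartition: try further routed partitions up to a retry budget, otherwise keep the original one (the documented "old behaviour"). pickPartition, ready, and nextPartition are illustrative stand-ins for the producer state and the message router, not the PR's actual types.

package main

import "fmt"

// pickPartition models the intended behaviour: starting from the routed partition,
// ask the router for further partitions until a ready one is found or the retry
// budget runs out, then keep the original choice.
func pickPartition(ready []bool, start, maxRetry int, nextPartition func() int) int {
	if ready[start] {
		return start
	}
	for i := 0; i < maxRetry; i++ {
		candidate := nextPartition()
		if candidate == start {
			// The router cycled back to where we started; stop early.
			return start
		}
		if ready[candidate] {
			return candidate
		}
	}
	// Retry budget exhausted: keep the originally routed partition.
	return start
}

func main() {
	ready := []bool{false, true, true} // partition 0 is disconnected
	cursor := 0
	next := func() int { cursor++; return cursor % len(ready) }
	fmt.Println(pickPartition(ready, 0, 5, next)) // prints 1
}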
62 changes: 62 additions & 0 deletions pulsar/producer_test.go
@@ -443,6 +443,68 @@ func TestFlushInPartitionedProducer(t *testing.T) {
assert.Equal(t, msgCount, numOfMessages/2)
}

func TestRoundRobinRouterPartitionedProducerProduceOnNextTopicOnFailure(t *testing.T) {
topicName := "public/default/partition-ProduceOnNextTopicOnFailure" + strconv.FormatInt(time.Now().Unix(), 10)
numberOfPartitions := 10

// call admin api to make it partitioned
url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions"
makeHTTPCall(t, http.MethodPut, url, strconv.Itoa(numberOfPartitions))

numOfMessages := 100
ctx := context.Background()

// create client connection
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
pro, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBatching: false,
BatchingMaxMessages: uint(5),
MaxRetryOtherPartitions: 5,
})
p := pro.(*producer).producers[0]
firstProducer := p.(*partitionProducer)
firstProducer.Close()
assert.NotEqual(t, firstProducer.getProducerState(), producerReady)
defer pro.Close()

prefix := "msg-"
for i := 0; i < numOfMessages; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
_, err = pro.Send(ctx, &ProducerMessage{
Payload: []byte(messageContent),
})
assert.Nil(t, err)
}
mChan := make(chan Message, numOfMessages)
for i := 0; i < numOfMessages; i++ {
go func() {
msg, _ := consumer.Receive(ctx)
assert.NotEqual(t, msg.Topic(), "persistent://"+topicName+"-partition-0")
mChan <- msg
}()
}

time.Sleep(1 * time.Second)
l := len(mChan)
assert.Equal(t, numOfMessages, l, "Number of messages received")
}

func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
numberOfPartitions := 5