关于消费者负载均衡的问题 (顺序消息负载机制) #812
Ssummer520
started this conversation in
General
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance
顺序消息负载机制
当我启动两个client来消费顺序消息
Topic := "FIFOTopic"
os.Setenv("mq.consoleAppender.enabled", "true")
golang.ResetLogger()
// new simpleConsumer instance
simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{},
ConsumerGroup: "groupFIFO",
},
这块配置都一致
唯一区别 一个在ack前进行time.sleep模拟 第一条消息没有消费完阻塞,没有返回ack。按照顺序消息负载机制第一个consumer
应该消费成功之后 第二个consumer再轮训到第二个consumer.结果是阻塞的consumer未返回ack 我的另一个consumer消费了剩下全部的顺序消息.第一个consumer重试机制 消息重试发送到第二个consumer
for {
fmt.Println("start recevie message")
mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
if err != nil {
fmt.Println(err)
continue
}
for _, mv := range mvs {
time.Sleep(time.Second *5)
err := simpleConsumer.Ack(context.TODO(), mv)
if err != nil {
return
}
fmt.Println(string(mv.GetBody()) + " " + time.Now().Format(time.StampMilli))
}
}
生产者
func SendFIFO() {
topic := "FIFOTopic"
producer, err := golang.NewProducer(&golang.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{},
},
golang.WithTopics(topic),
)
if err != nil {
log.Fatal(err)
}
// start producer
err = producer.Start()
if err != nil {
log.Fatal(err)
}
// gracefule stop producer
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
msg := &golang.Message{
Topic: topic,
Body: []byte("this is a message groupFIFO: " + strconv.Itoa(i) + " " + time.Now().Format(time.DateTime)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
msg.SetMessageGroup("fifo")
// send message in sync
resp, err := producer.Send(context.TODO(), msg)
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
// wait a moment
}
结果:
Beta Was this translation helpful? Give feedback.
All reactions