How to set the pull number for each message? #452

Open
cgeffect opened this issue Jul 21, 2023 · 0 comments
cgeffect commented Jul 21, 2023

When a large number of messages have accumulated, the consumer's callback receives several messages at once: between 1 and 32, never more than 32.
The msgs parameter of this function therefore satisfies 1 <= msgs.size() <= 32:
virtual rocketmq::ConsumeStatus consumeMessage(const std::vector<rocketmq::MQMessageExt> &msgs);

The messages in this batch cannot be given individual statuses such as CONSUME_SUCCESS.
I have to wait until the business logic has processed every message in the batch, and then return a single status (CONSUME_SUCCESS or another) for all of them.
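
A minimal sketch of the constraint described above, assuming nothing about the real client: `Message`, `ConsumeStatus`, `consumeBatch`, and `handleOne` are hypothetical stand-ins, not rocketmq-client-cpp classes or functions. It only illustrates that a callback returning one value has to report a single status for the whole batch.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Stand-in types for illustration only; these are NOT the rocketmq-client-cpp classes.
enum class ConsumeStatus { Success, ReconsumeLater };

struct Message {
    std::string body;
};

// Handle every message in the batch one at a time, then report one status
// for the whole batch, because the callback can only return a single value.
ConsumeStatus consumeBatch(const std::vector<Message>& msgs,
                           const std::function<bool(const Message&)>& handleOne) {
    assert(!msgs.empty());  // the callback is described as receiving 1..32 messages
    for (const Message& msg : msgs) {
        if (!handleOne(msg)) {
            // One failure marks the entire batch for redelivery, including
            // messages that were already handled successfully.
            return ConsumeStatus::ReconsumeLater;
        }
    }
    return ConsumeStatus::Success;
}
```

With this shape, a message handled successfully before a later failure would be delivered again, so the per-message handler probably needs to be idempotent.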

I called consumer->setConsumeMessageBatchMaxSize(1), but it has no effect.

For example, if msgs contains 32 messages, returning CONSUME_SUCCESS means that all 32 are treated as consumed and removed from the broker's message queue.
What I actually want is to consume messages one by one rather than batch by batch.

With the same logic in the Java version of RocketMQ, however, I can take one message out of msgs and return CONSUME_SUCCESS; the other messages continue to be delivered and are not removed from the broker.

Alternatively, in Java I can call setPullBatchSize, whose default value is 32:

public void setPullBatchSize(int pullBatchSize) {
    this.pullBatchSize = pullBatchSize;
}

This is my C++ consumer configuration:

// Consumer
std::string unique_group_name = group + "_" + tag;
consumer = std::make_shared<rocketmq::DefaultMQPushConsumer>(unique_group_name);
consumer->setNamesrvAddr(namesrv);
consumer->setGroupName(group);
consumer->setConsumeThreadCount(max_thread_count);
consumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET);
// Set the maximum number of messages pulled each time. Only takes effect when messages have accumulated in MQ
consumer->setConsumeMessageBatchMaxSize(1);
consumer->setTcpTransportConnectTimeout(30 * 1000);
consumer->setAsyncPull(false); // set sync pull
consumer->setMessageModel(rocketmq::CLUSTERING);
consumer->setInstanceName(group);
rocketmq::elogLevel inputLevel = rocketmq::eLOG_LEVEL_LEVEL_NUM;
consumer->setLogLevel(inputLevel);

What should I do?

cgeffect changed the title from "How to set the pull quantity for each message?" to "How to set the pull number for each message?" on Jul 21, 2023