Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, f
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
Expand All @@ -229,6 +227,7 @@ protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, f

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

boolean succeeded = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
Expand Down Expand Up @@ -266,9 +265,10 @@ private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
return msgInner;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,8 @@ private MessageExtBrokerInner messageTimeUp(MessageExt msgExt) {
MessageAccessor.setProperties(msgInner, msgExt.getProperties());

TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
Expand All @@ -353,6 +351,7 @@ private MessageExtBrokerInner messageTimeUp(MessageExt msgExt) {
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TIMER_DELIVER_MS);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TIMER_DELAY_SEC);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,6 @@ private SendResult sendKernelImpl(final Message msg,
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
Expand All @@ -1015,6 +1014,7 @@ private SendResult sendKernelImpl(final Message msg,
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

SendResult sendResult = null;
switch (communicationMode) {
Expand Down
Loading