Skip to content

Commit

Permalink
Add more unit test (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai authored Aug 9, 2022
1 parent da6eb7b commit 7830691
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public void onFailure(Throwable t) {
return future;
}

ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
try {
Resource topicResource = Resource.newBuilder().setName(topic).build();
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ class ProducerImpl extends ClientImpl implements Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerImpl.class);

protected final ProducerSettings producerSettings;

final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataCache;
private final TransactionChecker checker;
private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataCache;

/**
* The caller is supposed to have validated the arguments and handled throwing exception or
Expand Down Expand Up @@ -197,7 +196,7 @@ public HeartbeatRequest wrapHeartbeatRequest() {
*/
@Override
public SendReceipt send(Message message) throws ClientException {
final ListenableFuture<SendReceipt> future = Futures.transform(send0(Collections.singletonList(message), false),
final ListenableFuture<SendReceipt> future = Futures.transform(send(Collections.singletonList(message), false),
sendReceipts -> sendReceipts.iterator().next(), MoreExecutors.directExecutor());
return handleClientFuture(future);
}
Expand All @@ -217,7 +216,7 @@ public SendReceipt send(Message message, Transaction transaction) throws ClientE
} catch (Throwable t) {
throw new ClientException(t);
}
final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage),
final ListenableFuture<List<SendReceiptImpl>> future = send(Collections.singletonList(publishingMessage),
true);
final List<SendReceiptImpl> receipts = handleClientFuture(future);
final SendReceiptImpl sendReceipt = receipts.iterator().next();
Expand All @@ -230,7 +229,7 @@ public SendReceipt send(Message message, Transaction transaction) throws ClientE
*/
@Override
public CompletableFuture<SendReceipt> sendAsync(Message message) {
final ListenableFuture<SendReceipt> future = Futures.transform(send0(Collections.singletonList(message), false),
final ListenableFuture<SendReceipt> future = Futures.transform(send(Collections.singletonList(message), false),
sendReceipts -> sendReceipts.iterator().next(), MoreExecutors.directExecutor());
return FutureConverter.toCompletableFuture(future);
}
Expand Down Expand Up @@ -329,7 +328,7 @@ private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result)
return result.takeMessageQueues(isolated, this.getRetryPolicy().getMaxAttempts());
}

private ListenableFuture<List<SendReceiptImpl>> send0(List<Message> messages, boolean txEnabled) {
private ListenableFuture<List<SendReceiptImpl>> send(List<Message> messages, boolean txEnabled) {
SettableFuture<List<SendReceiptImpl>> future = SettableFuture.create();

// Check producer state before message publishing.
Expand Down Expand Up @@ -419,52 +418,52 @@ private ListenableFuture<List<SendReceiptImpl>> send0(List<Message> messages, bo
/**
* The caller is supposed to make sure different messages have the same message type and same topic.
*/
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> messages) {
return SendMessageRequest.newBuilder()
.addAllMessages(messages.stream().map(PublishingMessageImpl::toProtobuf).collect(Collectors.toList()))
.build();
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
.map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
return SendMessageRequest.newBuilder().addAllMessages(messages).build();
}

private void send0(SettableFuture<List<SendReceiptImpl>> future, String topic, MessageType messageType,
ListenableFuture<List<SendReceiptImpl>> send0(Metadata metadata, Endpoints endpoints,
List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final SendMessageRequest request = wrapSendMessageRequest(pubMessages, mq);
final ListenableFuture<RpcInvocation<SendMessageResponse>> future0 =
clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
return Futures.transformAsync(future0,
invocation -> Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, invocation)),
MoreExecutors.directExecutor());
}

private void send0(SettableFuture<List<SendReceiptImpl>> future0, String topic, MessageType messageType,
final List<MessageQueueImpl> candidates, final List<PublishingMessageImpl> messages, final int attempt) {
Metadata metadata;
try {
metadata = sign();
} catch (Throwable t) {
// Failed to sign, no need to proceed.
future.setException(t);
future0.setException(t);
return;
}
// Calculate the current message queue.
final MessageQueueImpl messageQueue = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
final List<MessageType> acceptMessageTypes = messageQueue.getAcceptMessageTypes();
final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
final List<MessageType> acceptMessageTypes = mq.getAcceptMessageTypes();
if (producerSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
+ "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
+ "acceptMessageTypes=" + acceptMessageTypes);
future.setException(e);
future0.setException(e);
return;
}
final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
final SendMessageRequest request = wrapSendMessageRequest(messages);

final ListenableFuture<RpcInvocation<SendMessageResponse>> responseFuture =
clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());

final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
invocation -> Futures.immediateFuture(SendReceiptImpl.processSendMessageResponseInvocation(messageQueue,
invocation)),
MoreExecutors.directExecutor());

final Endpoints endpoints = mq.getBroker().getEndpoints();
final ListenableFuture<List<SendReceiptImpl>> future = send0(metadata, endpoints, messages, mq);
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();

// Intercept before message publishing.
final Stopwatch stopwatch = Stopwatch.createStarted();
final List<MessageCommon> messageCommons =
messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
doBefore(MessageHookPoints.SEND, messageCommons);

Futures.addCallback(attemptFuture, new FutureCallback<List<SendReceiptImpl>>() {
Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() {
@Override
public void onSuccess(List<SendReceiptImpl> sendReceipts) {
// Intercept after message publishing.
Expand All @@ -475,11 +474,11 @@ public void onSuccess(List<SendReceiptImpl> sendReceipts) {
final InternalErrorException e = new InternalErrorException("[Bug] due to an"
+ " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
+ " is not equal to sent message's quantity " + messages.size());
future.setException(e);
future0.setException(e);
return;
}
// No need more attempts.
future.set(sendReceipts);
future0.set(sendReceipts);
// Resend message(s) successfully.
if (1 < attempt) {
// Collect messageId(s) for logging.
Expand Down Expand Up @@ -509,15 +508,15 @@ public void onFailure(Throwable t) {
isolate(endpoints);
if (attempt >= maxAttempts) {
// No need more attempts.
future.setException(t);
future0.setException(t);
LOGGER.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " +
"attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}",
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
return;
}
// No need more attempts for transactional message.
if (MessageType.TRANSACTION.equals(messageType)) {
future.setException(t);
future0.setException(t);
LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
"topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds,
endpoints, clientId, t);
Expand All @@ -530,14 +529,14 @@ public void onFailure(Throwable t) {
LOGGER.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, "
+ "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt,
topic, messageIds, endpoints, clientId, t);
send0(future, topic, messageType, candidates, messages, nextAttempt);
send0(future0, topic, messageType, candidates, messages, nextAttempt);
return;
}
final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
LOGGER.warn("Failed to send message due to too many requests, would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages,
clientManager.getScheduler().schedule(() -> send0(future0, topic, messageType, candidates, messages,
nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void applySettingsCommand(Settings settings) {
final apache.rocketmq.v2.RetryPolicy backoffPolicy = settings.getBackoffPolicy();
final Publishing publishing = settings.getPublishing();
RetryPolicy exist = retryPolicy;
this.retryPolicy = exist.updateBackoff(backoffPolicy);
this.retryPolicy = exist.inheritBackoff(backoffPolicy);
this.validateMessageType = settings.getPublishing().getValidateMessageType();
this.maxBodySizeBytes = publishing.getMaxBodySize();
this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public long getOffset() {
return offset;
}

public static List<SendReceiptImpl> processSendMessageResponseInvocation(MessageQueueImpl mq,
public static List<SendReceiptImpl> processResponseInvocation(MessageQueueImpl mq,
RpcInvocation<SendMessageResponse> invocation) throws ClientException {
final SendMessageResponse response = invocation.getResponse();
Status status = response.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
import org.apache.rocketmq.client.java.message.protocol.Encoding;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;

/**
* This class is a publishing view for message, which could be considered as an extension of {@link MessageImpl}.
Expand Down Expand Up @@ -97,7 +98,7 @@ public Optional<String> getTraceContext() {
* <p>This method should be invoked before each message sending, because the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
public apache.rocketmq.v2.Message toProtobuf() {
public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
final apache.rocketmq.v2.SystemProperties.Builder systemPropertiesBuilder =
apache.rocketmq.v2.SystemProperties.newBuilder()
// Message keys
Expand All @@ -110,6 +111,8 @@ public apache.rocketmq.v2.Message toProtobuf() {
.setBornHost(Utilities.hostName())
// Body encoding
.setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
// Queue id
.setQueueId(mq.getQueueId())
// Message type
.setMessageType(MessageType.toProtobuf(messageType));
// Message tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public int getMaxAttempts() {
return maxAttempts;
}

List<Duration> getDurations() {
return durations;
}

@Override
public Duration getNextAttemptDelay(int attempt) {
checkArgument(attempt > 0, "attempt must be positive");
Expand All @@ -68,14 +72,14 @@ public static CustomizedBackoffRetryPolicy fromProtobuf(apache.rocketmq.v2.Retry
}

@Override
public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
public RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
if (!CUSTOMIZED_BACKOFF.equals(retryPolicy.getStrategyCase())) {
throw new IllegalArgumentException("strategy must be customized backoff");
}
return updateBackoff(retryPolicy.getCustomizedBackoff());
return inheritBackoff(retryPolicy.getCustomizedBackoff());
}

private RetryPolicy updateBackoff(CustomizedBackoff backoff) {
private RetryPolicy inheritBackoff(CustomizedBackoff backoff) {
final List<Duration> durations = backoff.getNextList().stream()
.map(duration -> Duration.ofNanos(Durations.toNanos(duration)))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ public int getMaxAttempts() {
return maxAttempts;
}

Duration getInitialBackoff() {
return initialBackoff;
}

Duration getMaxBackoff() {
return maxBackoff;
}

double getBackoffMultiplier() {
return backoffMultiplier;
}

@Override
public Duration getNextAttemptDelay(int attempt) {
checkArgument(attempt > 0, "attempt must be positive");
Expand Down Expand Up @@ -88,14 +100,14 @@ public static ExponentialBackoffRetryPolicy fromProtobuf(apache.rocketmq.v2.Retr
}

@Override
public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
public RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
if (!EXPONENTIAL_BACKOFF.equals(retryPolicy.getStrategyCase())) {
throw new IllegalArgumentException("strategy must be exponential backoff");
}
return updateBackoff(retryPolicy.getExponentialBackoff());
return inheritBackoff(retryPolicy.getExponentialBackoff());
}

private RetryPolicy updateBackoff(ExponentialBackoff backoff) {
private RetryPolicy inheritBackoff(ExponentialBackoff backoff) {
return new ExponentialBackoffRetryPolicy(maxAttempts,
Duration.ofNanos(Durations.toNanos(backoff.getInitial())),
Duration.ofNanos(Durations.toNanos(backoff.getMax())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public interface RetryPolicy {
* @param retryPolicy retry policy which contains the backoff strategy.
* @return the new retry policy.
*/
RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);

/**
* Convert to {@link apache.rocketmq.v2.RetryPolicy}.
*/
apache.rocketmq.v2.RetryPolicy toProtobuf();
}
Loading

0 comments on commit 7830691

Please sign in to comment.