Skip to content

Commit

Permalink
ARTEMIS-4745 Allow configuration of the pull consumer batch size
Browse files Browse the repository at this point in the history
Allow for configuration of the batch size granted to the remote when an
AMQP federation queue receiver is pulling messages only when there is
local capacity to handle them. Some code housekeeping is done here to
make adding future properties a bit simpler and require fewer changes.
  • Loading branch information
tabish121 authored and gemmellr committed Apr 26, 2024
1 parent ee7a2c0 commit 659b17c
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -132,39 +131,9 @@ public synchronized boolean isStarted() {
public abstract AMQPSessionContext getSessionContext();

/**
* @return the timeout before signaling an error when creating remote link (0 mean disable).
* @return the federation configuration that is in effect.
*/
public abstract int getLinkAttachTimeout();

/**
* @return the configured {@link Receiver} link credit batch size.
*/
public abstract int getReceiverCredits();

/**
* @return the configured {@link Receiver} link credit low value.
*/
public abstract int getReceiverCreditsLow();

/**
* @return the size in bytes before a message is considered large.
*/
public abstract int getLargeMessageThreshold();

/**
* @return the true if the federation should ignore filters on queue consumers.
*/
public abstract boolean isIgnoreQueueConsumerFilters();

/**
* @return the true if the federation should ignore priorities on queue consumers.
*/
public abstract boolean isIgnoreQueueConsumerPriorities();

/**
* @return the true if the federation should support core message tunneling.
*/
public abstract boolean isCoreMessageTunnelingEnabled();
public abstract AMQPFederationConfiguration getConfiguration();

@Override
public final synchronized void start() throws ActiveMQException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,11 @@ private void asyncCreateReceiver() {
final ScheduledFuture<?> openTimeoutTask;
final AtomicBoolean openTimedOut = new AtomicBoolean(false);

if (federation.getLinkAttachTimeout() > 0) {
if (configuration.getLinkAttachTimeout() > 0) {
openTimeoutTask = federation.getServer().getScheduledPool().schedule(() -> {
openTimedOut.set(true);
federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
}, federation.getLinkAttachTimeout(), TimeUnit.SECONDS);
}, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS);
} else {
openTimeoutTask = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected boolean testIfAddressMatchesPolicy(AddressInfo addressInfo) {

// Address consumers can't pull as we have no real metric to indicate when / how much
// we should pull so instead we refuse to match if credit set to zero.
if (federation.getReceiverCredits() <= 0) {
if (federation.getConfiguration().getReceiverCredits() <= 0) {
logger.debug("Federation address policy rejecting match on {} because credit is set to zero:", addressInfo.getName());
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
Expand Down Expand Up @@ -47,6 +48,11 @@ public final class AMQPFederationConfiguration {
*/
public static final int DEFAULT_LINK_ATTACH_TIMEOUT = 30;

/**
* Default credits granted to a receiver that is in pull mode.
*/
public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;

/**
* Default value for the core message tunneling feature that indicates if core protocol messages
* should be streamed as binary blobs as the payload of an custom AMQP message which avoids any
Expand Down Expand Up @@ -112,6 +118,20 @@ public int getReceiverCreditsLow() {
}
}

/**
* @return the credit batch size offered to a {@link Receiver} link that is in pull mode.
*/
public int getPullReceiverBatchSize() {
final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE);
if (property instanceof Number) {
return ((Number) property).intValue();
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return DEFAULT_PULL_CREDIT_BATCH_SIZE;
}
}

/**
* @return the size in bytes of an incoming message after which the {@link Receiver} treats it as large.
*/
Expand Down Expand Up @@ -193,6 +213,7 @@ public Map<String, Object> toConfigurationMap() {

configMap.put(RECEIVER_CREDITS, getReceiverCredits());
configMap.put(RECEIVER_CREDITS_LOW, getReceiverCreditsLow());
configMap.put(PULL_RECEIVER_BATCH_SIZE, getPullReceiverBatchSize());
configMap.put(LARGE_MESSAGE_THRESHOLD, getLargeMessageThreshold());
configMap.put(LINK_ATTACH_TIMEOUT, getLinkAttachTimeout());
configMap.put(IGNORE_QUEUE_CONSUMER_FILTERS, isIgnoreSubscriptionFilters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public final class AMQPFederationConstants {
*/
public static final String RECEIVER_CREDITS_LOW = "amqpLowCredits";

/**
* Configuration property that defines the amount of credits to batch to an AMQP receiver link
* and the top up value when sending more credit once the broker has capacity available for
* them. this can be sent to the peer so that dual federation configurations share the same
* configuration on both sides of the connection.
*/
public static final String PULL_RECEIVER_BATCH_SIZE = "amqpPullConsumerCredits";

/**
* Configuration property used to convey the local side value to use when considering if a message
* is a large message, this can be sent to the peer so that dual federation configurations share
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;

Expand All @@ -29,6 +30,7 @@
import java.util.Map;

import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.qpid.proton.engine.Receiver;

/**
* Configuration options applied to a consumer created from federation policies
Expand Down Expand Up @@ -59,7 +61,7 @@ public int getReceiverCredits() {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getReceiverCredits();
return federation.getConfiguration().getReceiverCredits();
}
}

Expand All @@ -70,7 +72,21 @@ public int getReceiverCreditsLow() {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getReceiverCreditsLow();
return federation.getConfiguration().getReceiverCreditsLow();
}
}

/**
* @return the credit batch size offered to a {@link Receiver} link that is in pull mode.
*/
public int getPullReceiverBatchSize() {
final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE);
if (property instanceof Number) {
return ((Number) property).intValue();
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getConfiguration().getPullReceiverBatchSize();
}
}

Expand All @@ -81,7 +97,7 @@ public int getLargeMessageThreshold() {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getLargeMessageThreshold();
return federation.getConfiguration().getLargeMessageThreshold();
}
}

Expand All @@ -92,7 +108,7 @@ public int getLinkAttachTimeout() {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getLinkAttachTimeout();
return federation.getConfiguration().getLinkAttachTimeout();
}
}

Expand All @@ -103,7 +119,7 @@ public boolean isCoreMessageTunnelingEnabled() {
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isCoreMessageTunnelingEnabled();
return federation.getConfiguration().isCoreMessageTunnelingEnabled();
}
}

Expand All @@ -114,7 +130,7 @@ public boolean isIgnoreSubscriptionFilters() {
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isIgnoreQueueConsumerFilters();
return federation.getConfiguration().isIgnoreSubscriptionFilters();
}
}

Expand All @@ -125,7 +141,7 @@ public boolean isIgnoreSubscriptionPriorities() {
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isIgnoreQueueConsumerPriorities();
return federation.getConfiguration().isIgnoreSubscriptionPriorities();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;

public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2;
public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30;

Expand Down Expand Up @@ -315,11 +313,11 @@ private void asyncCreateReceiver() {
final ScheduledFuture<?> openTimeoutTask;
final AtomicBoolean openTimedOut = new AtomicBoolean(false);

if (federation.getLinkAttachTimeout() > 0) {
if (configuration.getLinkAttachTimeout() > 0) {
openTimeoutTask = federation.getServer().getScheduledPool().schedule(() -> {
openTimedOut.set(true);
federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
}, federation.getLinkAttachTimeout(), TimeUnit.SECONDS);
}, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS);
} else {
openTimeoutTask = null;
}
Expand Down Expand Up @@ -516,7 +514,7 @@ protected Runnable createCreditRunnable(AMQPConnectionContext connection) {
// credit. This also allows consumers created on the remote side of a federation connection
// to read from properties sent from the federation source that indicate the values that are
// configured on the local side.
if (federation.getReceiverCredits() > 0) {
if (configuration.getReceiverCredits() > 0) {
return createCreditRunnable(configuration.getReceiverCredits(), configuration.getReceiverCreditsLow(), receiver, connection, this);
} else {
return this::checkIfCreditTopUpNeeded;
Expand Down Expand Up @@ -579,7 +577,7 @@ private void performCreditTopUp() {
return; // Closed before this was triggered.
}

receiver.flow(DEFAULT_PULL_CREDIT_BATCH_SIZE);
receiver.flow(configuration.getPullReceiverBatchSize());
connection.instantFlush();
lastBacklogCheckDelay = 0;
creditTopUpInProgress.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ public AMQPBrokerConnection getBrokerConnection() {
return brokerConnection;
}

@Override
public int getLinkAttachTimeout() {
return configuration.getLinkAttachTimeout();
}

@Override
public synchronized AMQPSessionContext getSessionContext() {
if (!connected) {
Expand All @@ -148,58 +143,12 @@ public synchronized AMQPConnectionContext getConnectionContext() {
}

@Override
public synchronized int getReceiverCredits() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.getReceiverCredits();
}

@Override
public synchronized int getReceiverCreditsLow() {
public synchronized AMQPFederationConfiguration getConfiguration() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.getReceiverCreditsLow();
}

@Override
public synchronized int getLargeMessageThreshold() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.getLargeMessageThreshold();
}

@Override
public boolean isCoreMessageTunnelingEnabled() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.isCoreMessageTunnelingEnabled();
}


@Override
public boolean isIgnoreQueueConsumerFilters() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}

return configuration.isIgnoreSubscriptionFilters();
}

@Override
public boolean isIgnoreQueueConsumerPriorities() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
throw new IllegalStateException("Cannot access connection while federation is not connected");
}

return configuration.isIgnoreSubscriptionPriorities();
return configuration;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,8 @@ public AMQPSessionContext getSessionContext() {
}

@Override
public int getReceiverCredits() {
return configuration.getReceiverCredits();
}

@Override
public int getReceiverCreditsLow() {
return configuration.getReceiverCreditsLow();
}

@Override
public int getLargeMessageThreshold() {
return configuration.getLargeMessageThreshold();
}

@Override
public int getLinkAttachTimeout() {
return configuration.getLinkAttachTimeout();
}

@Override
public boolean isCoreMessageTunnelingEnabled() {
return configuration.isCoreMessageTunnelingEnabled();
}

@Override
public boolean isIgnoreQueueConsumerFilters() {
return configuration.isIgnoreSubscriptionFilters();
}

@Override
public boolean isIgnoreQueueConsumerPriorities() {
return configuration.isIgnoreSubscriptionPriorities();
public synchronized AMQPFederationConfiguration getConfiguration() {
return configuration;
}

@Override
Expand Down
Loading

0 comments on commit 659b17c

Please sign in to comment.