Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-4744 Fully support multple host broker connections URIs #4906

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
Expand All @@ -65,10 +68,13 @@
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionManager.ClientProtocolManagerWithAMQP;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
Expand Down Expand Up @@ -120,10 +126,13 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
*/
public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;

private static final NettyConnectorFactory CONNECTOR_FACTORY = new NettyConnectorFactory().setServerConnector(true);

private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
private final ReferenceIDSupplier referenceIdSupplier;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
private final ProtonProtocolManager protonProtocolManager;
private final ActiveMQServer server;
private final NettyConnector bridgesConnector;
private final List<TransportConfiguration> configurations;
private NettyConnection connection;
private Session session;
private AMQPSessionContext sessionContext;
Expand All @@ -134,6 +143,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
private AMQPFederationSource brokerFederation;
private int retryCounter = 0;
private int lastRetryCounter;
private int connectionTimeout;
private boolean connecting = false;
private volatile ScheduledFuture<?> reconnectFuture;
private final Set<Queue> senders = new HashSet<>();
Expand All @@ -153,16 +163,16 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,

public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
AMQPBrokerConnectConfiguration brokerConnectConfiguration,
ProtonProtocolManager protonProtocolManager,
ActiveMQServer server,
NettyConnector bridgesConnector) {
ProtonProtocolManagerFactory protonProtocolManagerFactory,
ActiveMQServer server) throws Exception {
this.bridgeManager = bridgeManager;
this.brokerConnectConfiguration = brokerConnectConfiguration;
this.protonProtocolManager = protonProtocolManager;
this.server = server;
this.bridgesConnector = bridgesConnector;
connectExecutor = server.getExecutorFactory().getExecutor();
scheduledExecutorService = server.getScheduledPool();
this.configurations = brokerConnectConfiguration.getTransportConfigurations();
this.connectExecutor = server.getExecutorFactory().getExecutor();
this.scheduledExecutorService = server.getScheduledPool();
this.protonProtocolManagerFactory = protonProtocolManagerFactory;
this.referenceIdSupplier = new ReferenceIDSupplier(server);
}

@Override
Expand Down Expand Up @@ -190,7 +200,7 @@ public boolean isConnecting() {
}

public int getConnectionTimeout() {
return bridgesConnector.getConnectTimeoutMillis();
return connectionTimeout;
}

@Override
Expand Down Expand Up @@ -340,19 +350,32 @@ private void doConnect() {
try {
connecting = true;

List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
TransportConfiguration configuration = configurations.get(retryCounter % configurations.size());
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration.getParams());
port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration.getParams());

TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size());
ProtonProtocolManager protonProtocolManager =
(ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getExtraParams(), null, null);
NettyConnector connector = (NettyConnector)CONNECTOR_FACTORY.createConnector(
configuration.getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
connector.start();

String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
this.host = hostOnParameter;
this.port = portOnParameter;
connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
logger.debug("Connecting {}", configuration);

if (connection == null) {
retryConnection();
return;
connectionTimeout = connector.getConnectTimeoutMillis();
try {
connection = (NettyConnection) connector.createConnection();
if (connection == null) {
retryConnection();
return;
}
} finally {
if (connection == null) {
try {
connector.close();
} catch (Exception ex) {
}
}
}

lastRetryCounter = retryCounter;
Expand All @@ -368,12 +391,15 @@ private void doConnect() {

ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);

NettyConnectorCloseHandler connectorCloseHandler = new NettyConnectorCloseHandler(connector, connectExecutor);
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(), this::linkClosed);
protonRemotingConnection.addCloseListener(connectorCloseHandler);
protonRemotingConnection.addFailureListener(connectorCloseHandler);

connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(connector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));

session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
Expand Down Expand Up @@ -531,7 +557,7 @@ private Queue installMirrorController(AMQPMirrorBrokerConnectionElement replicaC
throw new IllegalAccessException("Cannot start replica");
}

AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this);
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(referenceIdSupplier, snfQueue, server, replicaConfig, this);

this.mirrorControllerSource = newPartition;

Expand Down Expand Up @@ -702,11 +728,11 @@ private void connectSender(Queue queue,

AtomicBoolean cancelled = new AtomicBoolean(false);

if (bridgesConnector.getConnectTimeoutMillis() > 0) {
if (getConnectionTimeout() > 0) {
futureTimeout = server.getScheduledPool().schedule(() -> {
cancelled.set(true);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
}, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
}, getConnectionTimeout(), TimeUnit.MILLISECONDS);
} else {
futureTimeout = null;
}
Expand Down Expand Up @@ -1059,4 +1085,39 @@ public static boolean isCoreMessageTunnelingEnabled(AMQPMirrorBrokerConnectionEl
return DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED;
}
}

public static class NettyConnectorCloseHandler implements FailureListener, CloseListener {

private final NettyConnector connector;
private final Executor connectionExecutor;

public NettyConnectorCloseHandler(NettyConnector connector, Executor connectionExecutor) {
this.connector = connector;
this.connectionExecutor = connectionExecutor;
}

@Override
public void connectionClosed() {
doCloseConnector();
}

@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
doCloseConnector();
}

@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
doCloseConnector();
}

private void doCloseConnector() {
connectionExecutor.execute(() -> {
try {
connector.close();
} catch (Exception ex) {
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
Expand All @@ -60,8 +58,6 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
private final Map<String, AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
private final Map<String, AMQPBrokerConnection> amqpBrokerConnections = new HashMap<>();

private ProtonProtocolManager protonProtocolManager;

public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
this.amqpConnectionsConfig =
amqpConnectionsConfig.stream()
Expand All @@ -71,10 +67,6 @@ public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AM
this.protonProtocolManagerFactory = factory;
}

public ProtonProtocolManagerFactory getProtocolManagerFactory() {
return protonProtocolManagerFactory;
}

@Override
public void start() throws Exception {
if (!started) {
Expand All @@ -94,14 +86,7 @@ public int getConfiguredConnectionsCount() {
}

private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration, boolean start) throws Exception {
NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getTransportConfigurations().get(0).getExtraParams(), null, null);
NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(configuration.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
bridgesConnector.start();

logger.debug("Connecting {}", configuration);

AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManager, server, bridgesConnector);
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
server.registerBrokerConnection(amqpBrokerConnection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand Down Expand Up @@ -164,7 +163,7 @@ public boolean isStarted() {
return started;
}

public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
public AMQPMirrorControllerSource(ReferenceIDSupplier referenceIdSupplier, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
AMQPBrokerConnection brokerConnection) {
super(server);
assert snfQueue != null;
Expand All @@ -175,7 +174,7 @@ public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Q
snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
}
this.server = server;
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.idSupplier = referenceIdSupplier;
this.addQueues = replicaConfig.isQueueCreation();
this.deleteQueues = replicaConfig.isQueueRemoval();
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
Expand Down
Loading
Loading