Skip to content

Commit

Permalink
ARTEMIS-4744 Fully support multple host broker connections URIs
Browse files Browse the repository at this point in the history
Create a new NettyConnector for each connection attempt that is configured from
distinct broker connection URIs which allows for differing TLS configuration
per remote connection configuration.
  • Loading branch information
tabish121 authored and gemmellr committed Apr 25, 2024
1 parent 934fe24 commit ee7a2c0
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
Expand All @@ -65,10 +68,13 @@
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionManager.ClientProtocolManagerWithAMQP;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
Expand Down Expand Up @@ -120,10 +126,13 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
*/
public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;

private static final NettyConnectorFactory CONNECTOR_FACTORY = new NettyConnectorFactory().setServerConnector(true);

private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
private final ReferenceIDSupplier referenceIdSupplier;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
private final ProtonProtocolManager protonProtocolManager;
private final ActiveMQServer server;
private final NettyConnector bridgesConnector;
private final List<TransportConfiguration> configurations;
private NettyConnection connection;
private Session session;
private AMQPSessionContext sessionContext;
Expand All @@ -134,6 +143,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
private AMQPFederationSource brokerFederation;
private int retryCounter = 0;
private int lastRetryCounter;
private int connectionTimeout;
private boolean connecting = false;
private volatile ScheduledFuture<?> reconnectFuture;
private final Set<Queue> senders = new HashSet<>();
Expand All @@ -153,16 +163,16 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,

public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
AMQPBrokerConnectConfiguration brokerConnectConfiguration,
ProtonProtocolManager protonProtocolManager,
ActiveMQServer server,
NettyConnector bridgesConnector) {
ProtonProtocolManagerFactory protonProtocolManagerFactory,
ActiveMQServer server) throws Exception {
this.bridgeManager = bridgeManager;
this.brokerConnectConfiguration = brokerConnectConfiguration;
this.protonProtocolManager = protonProtocolManager;
this.server = server;
this.bridgesConnector = bridgesConnector;
connectExecutor = server.getExecutorFactory().getExecutor();
scheduledExecutorService = server.getScheduledPool();
this.configurations = brokerConnectConfiguration.getTransportConfigurations();
this.connectExecutor = server.getExecutorFactory().getExecutor();
this.scheduledExecutorService = server.getScheduledPool();
this.protonProtocolManagerFactory = protonProtocolManagerFactory;
this.referenceIdSupplier = new ReferenceIDSupplier(server);
}

@Override
Expand Down Expand Up @@ -190,7 +200,7 @@ public boolean isConnecting() {
}

public int getConnectionTimeout() {
return bridgesConnector.getConnectTimeoutMillis();
return connectionTimeout;
}

@Override
Expand Down Expand Up @@ -340,19 +350,32 @@ private void doConnect() {
try {
connecting = true;

List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
TransportConfiguration configuration = configurations.get(retryCounter % configurations.size());
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration.getParams());
port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration.getParams());

TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size());
ProtonProtocolManager protonProtocolManager =
(ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getExtraParams(), null, null);
NettyConnector connector = (NettyConnector)CONNECTOR_FACTORY.createConnector(
configuration.getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
connector.start();

String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
this.host = hostOnParameter;
this.port = portOnParameter;
connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
logger.debug("Connecting {}", configuration);

if (connection == null) {
retryConnection();
return;
connectionTimeout = connector.getConnectTimeoutMillis();
try {
connection = (NettyConnection) connector.createConnection();
if (connection == null) {
retryConnection();
return;
}
} finally {
if (connection == null) {
try {
connector.close();
} catch (Exception ex) {
}
}
}

lastRetryCounter = retryCounter;
Expand All @@ -368,12 +391,15 @@ private void doConnect() {

ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);

NettyConnectorCloseHandler connectorCloseHandler = new NettyConnectorCloseHandler(connector, connectExecutor);
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(), this::linkClosed);
protonRemotingConnection.addCloseListener(connectorCloseHandler);
protonRemotingConnection.addFailureListener(connectorCloseHandler);

connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(connector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));

session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
Expand Down Expand Up @@ -531,7 +557,7 @@ private Queue installMirrorController(AMQPMirrorBrokerConnectionElement replicaC
throw new IllegalAccessException("Cannot start replica");
}

AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this);
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(referenceIdSupplier, snfQueue, server, replicaConfig, this);

this.mirrorControllerSource = newPartition;

Expand Down Expand Up @@ -702,11 +728,11 @@ private void connectSender(Queue queue,

AtomicBoolean cancelled = new AtomicBoolean(false);

if (bridgesConnector.getConnectTimeoutMillis() > 0) {
if (getConnectionTimeout() > 0) {
futureTimeout = server.getScheduledPool().schedule(() -> {
cancelled.set(true);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
}, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
}, getConnectionTimeout(), TimeUnit.MILLISECONDS);
} else {
futureTimeout = null;
}
Expand Down Expand Up @@ -1059,4 +1085,39 @@ public static boolean isCoreMessageTunnelingEnabled(AMQPMirrorBrokerConnectionEl
return DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED;
}
}

public static class NettyConnectorCloseHandler implements FailureListener, CloseListener {

private final NettyConnector connector;
private final Executor connectionExecutor;

public NettyConnectorCloseHandler(NettyConnector connector, Executor connectionExecutor) {
this.connector = connector;
this.connectionExecutor = connectionExecutor;
}

@Override
public void connectionClosed() {
doCloseConnector();
}

@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
doCloseConnector();
}

@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
doCloseConnector();
}

private void doCloseConnector() {
connectionExecutor.execute(() -> {
try {
connector.close();
} catch (Exception ex) {
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
Expand All @@ -60,8 +58,6 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
private final Map<String, AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
private final Map<String, AMQPBrokerConnection> amqpBrokerConnections = new HashMap<>();

private ProtonProtocolManager protonProtocolManager;

public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
this.amqpConnectionsConfig =
amqpConnectionsConfig.stream()
Expand All @@ -71,10 +67,6 @@ public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AM
this.protonProtocolManagerFactory = factory;
}

public ProtonProtocolManagerFactory getProtocolManagerFactory() {
return protonProtocolManagerFactory;
}

@Override
public void start() throws Exception {
if (!started) {
Expand All @@ -94,14 +86,7 @@ public int getConfiguredConnectionsCount() {
}

private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration, boolean start) throws Exception {
NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getTransportConfigurations().get(0).getExtraParams(), null, null);
NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(configuration.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
bridgesConnector.start();

logger.debug("Connecting {}", configuration);

AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManager, server, bridgesConnector);
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
server.registerBrokerConnection(amqpBrokerConnection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand Down Expand Up @@ -164,7 +163,7 @@ public boolean isStarted() {
return started;
}

public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
public AMQPMirrorControllerSource(ReferenceIDSupplier referenceIdSupplier, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
AMQPBrokerConnection brokerConnection) {
super(server);
assert snfQueue != null;
Expand All @@ -175,7 +174,7 @@ public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Q
snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
}
this.server = server;
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.idSupplier = referenceIdSupplier;
this.addQueues = replicaConfig.isQueueCreation();
this.deleteQueues = replicaConfig.isQueueRemoval();
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
Expand Down
Loading

0 comments on commit ee7a2c0

Please sign in to comment.