Skip to content

Commit

Permalink
ARTEMIS-3622 MQTT can deadlock on client cxn/discxn
Browse files Browse the repository at this point in the history
This commit fixes the deadlock described on ARTEMIS-3622 by moving the
synchronization "up" a level from the MQTTSession to the
MQTTConnectionManager. It also eliminates the synchronization on the
MQTTSessionState in the MQTTConnectionManager because it's no longer
needed. This change should not only eliminate the deadlock, but improve
performance relatively as well.

There is no test associated with this commit as I wasn't able to
reproduce the deadlock with any kind of straight-forward test. There was
a test linked on the Jira, but it involved intrusive and fragile
scaffolding and wasn't ultimately tenable. That said, I did test this
fix with that test and it was successful. In any case, I think static
analysis should be sufficient here as the changes are pretty
straight-forward.
  • Loading branch information
jbertram authored and gemmellr committed May 21, 2024
1 parent 3474a39 commit 3c058e9
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public MQTTConnectionManager(MQTTSession session) {
session.getConnection().addFailureListener(failureListener);
}

void connect(MqttConnectMessage connect, String validatedUser, String username, String password) throws Exception {
synchronized void connect(MqttConnectMessage connect, String validatedUser, String username, String password) throws Exception {
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);
Expand All @@ -68,67 +68,65 @@ void connect(MqttConnectMessage connect, String validatedUser, String username,

String clientId = session.getConnection().getClientID();
boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);

if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean(true);
session.setClean(true);
}
MQTTSessionState sessionState = session.getStateManager().getSessionState(clientId);
session.setSessionState(sessionState);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);

if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean(true);
session.setClean(true);
}

if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());

if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());

if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
}
}
}

MqttProperties connackProperties;
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));

sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));
MqttProperties connackProperties;
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));

connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));

session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}

session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
}

private MqttProperties getConnackProperties() {
Expand Down Expand Up @@ -176,33 +174,27 @@ ServerSessionImpl createServerSession(String username, String password, String v
return (ServerSessionImpl) serverSession;
}

void disconnect(boolean failure) {
synchronized void disconnect(boolean failure) {
if (session == null || session.getStopped()) {
return;
}

synchronized (session.getState()) {
try {
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
try {
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
}
}
}

private synchronized MQTTSessionState getSessionState(String clientId) throws Exception {
return session.getStateManager().getSessionState(clientId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,21 @@ public MQTTSession(MQTTProtocolHandler protocolHandler,
logger.debug("MQTT session created: {}", id);
}

// Called after the client has Connected.
synchronized void start() throws Exception {
/*
* This method is only called by MQTTConnectionManager.connect
* which is synchronized with MQTTConnectionManager.disconnect
*/
void start() throws Exception {
mqttPublishManager.start();
subscriptionManager.start();
stopped = false;
}

synchronized void stop(boolean failure) throws Exception {
/*
* This method is only called by MQTTConnectionManager.disconnect
* which is synchronized with MQTTConnectionManager.connect
*/
void stop(boolean failure) throws Exception {
state.setFailed(failure);

if (!stopped) {
Expand Down

0 comments on commit 3c058e9

Please sign in to comment.