Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-3622 MQTT can deadlock on client cxn/discxn #4909

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public MQTTConnectionManager(MQTTSession session) {
session.getConnection().addFailureListener(failureListener);
}

void connect(MqttConnectMessage connect, String validatedUser, String username, String password) throws Exception {
synchronized void connect(MqttConnectMessage connect, String validatedUser, String username, String password) throws Exception {
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);
Expand All @@ -68,67 +68,65 @@ void connect(MqttConnectMessage connect, String validatedUser, String username,

String clientId = session.getConnection().getClientID();
boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);

if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean(true);
session.setClean(true);
}
MQTTSessionState sessionState = session.getStateManager().getSessionState(clientId);
session.setSessionState(sessionState);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);

if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean(true);
session.setClean(true);
}

if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());

if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());

if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
}
}
}

MqttProperties connackProperties;
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));

sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));
MqttProperties connackProperties;
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));

connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}
sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));

session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}

session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
}

private MqttProperties getConnackProperties() {
Expand Down Expand Up @@ -176,33 +174,27 @@ ServerSessionImpl createServerSession(String username, String password, String v
return (ServerSessionImpl) serverSession;
}

void disconnect(boolean failure) {
synchronized void disconnect(boolean failure) {
if (session == null || session.getStopped()) {
return;
}

synchronized (session.getState()) {
try {
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
try {
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
}
}
}

private synchronized MQTTSessionState getSessionState(String clientId) throws Exception {
return session.getStateManager().getSessionState(clientId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,21 @@ public MQTTSession(MQTTProtocolHandler protocolHandler,
logger.debug("MQTT session created: {}", id);
}

// Called after the client has Connected.
synchronized void start() throws Exception {
/*
* This method is only called by MQTTConnectionManager.connect
* which is synchronized with MQTTConnectionManager.disconnect
*/
void start() throws Exception {
mqttPublishManager.start();
subscriptionManager.start();
stopped = false;
}

synchronized void stop(boolean failure) throws Exception {
/*
* This method is only called by MQTTConnectionManager.disconnect
* which is synchronized with MQTTConnectionManager.connect
*/
void stop(boolean failure) throws Exception {
state.setFailed(failure);

if (!stopped) {
Expand Down
Loading