Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2955] MQTT 5 support #3654

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@

import io.micrometer.core.instrument.Timer.Sample;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand All @@ -106,6 +107,9 @@
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode;

/**
* A base class for implementing Vert.x based Hono protocol adapters for publishing events & telemetry data using
Expand Down Expand Up @@ -885,7 +889,7 @@ public final Future<Void> uploadCommandResponseMessage(final MqttContext ctx) {
// check that the remote MQTT client is still connected before sending PUBACK
if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) {
currentSpan.log(EVENT_SENDING_PUBACK);
ctx.acknowledge();
ctx.acknowledge(MqttPubAckReasonCode.SUCCESS);
}
currentSpan.finish();
return Future.<Void> succeededFuture();
Expand Down Expand Up @@ -968,7 +972,7 @@ private Future<Void> uploadMessage(
// check that the remote MQTT client is still connected before sending PUBACK
if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) {
currentSpan.log(EVENT_SENDING_PUBACK);
ctx.acknowledge();
ctx.acknowledge(MqttPubAckReasonCode.SUCCESS);
}
currentSpan.finish();
return ok;
Expand Down Expand Up @@ -1266,7 +1270,7 @@ private void handlePublishedMessageError(final MqttContext context, final Throwa
span.log("skipped sending PUBACK");
} else if (context.deviceEndpoint().isConnected()) {
span.log(EVENT_SENDING_PUBACK);
context.acknowledge();
context.acknowledge(MqttPubAckReasonCode.UNSPECIFIED_ERROR);
}
}
span.finish();
Expand Down Expand Up @@ -1415,11 +1419,11 @@ protected final void onSubscribe(final MqttSubscribeMessage subscribeMsg) {
Future.join(new ArrayList<>(subscriptionOutcomes)).onComplete(v -> {

if (endpoint.isConnected()) {
// return a status code for each topic filter contained in the SUBSCRIBE packet
final List<MqttQoS> grantedQosLevels = subscriptionOutcomes.stream()
.map(future -> future.failed() ? MqttQoS.FAILURE : future.result().getQos())
// return a reason code for each topic filter contained in the SUBSCRIBE packet
final List<MqttSubAckReasonCode> reasonCodes = subscriptionOutcomes.stream()
.map(future -> future.failed() ? MqttSubAckReasonCode.UNSPECIFIED_ERROR : MqttSubAckReasonCode.qosGranted(future.result().getQos()))
.collect(Collectors.toList());
endpoint.subscribeAcknowledge(subscribeMsg.messageId(), grantedQosLevels);
endpoint.subscribeAcknowledge(subscribeMsg.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
} else {
TracingHelper.logError(span, "skipped sending command subscription notification - endpoint not connected anymore");
log.debug("skipped sending command subscription notification - endpoint not connected anymore [tenant-id: {}, device-id: {}]",
Expand Down Expand Up @@ -1785,6 +1789,7 @@ protected final void onUnsubscribe(final MqttUnsubscribeMessage unsubscribeMsg)
}
final Span span = newSpan("UNSUBSCRIBE");

final List<MqttUnsubAckReasonCode> reasonCodes = new ArrayList<>();
final List<Future<Void>> removalDoneFutures = new ArrayList<>(unsubscribeMsg.topics().size());
unsubscribeMsg.topics().forEach(topic -> {

Expand All @@ -1810,13 +1815,15 @@ protected final void onUnsubscribe(final MqttUnsubscribeMessage unsubscribeMsg)
if (removedSubscription.get() != null) {
log.debug("removed subscription with topic [{}] for device [tenant-id: {}, device-id: {}]",
topic, removedSubscription.get().getTenant(), removedSubscription.get().getDeviceId());
reasonCodes.add(MqttUnsubAckReasonCode.SUCCESS);
} else {
TracingHelper.logError(span, String.format("no subscription found for topic filter [%s]", topic));
log.debug("cannot unsubscribe - no subscription found for topic filter [{}]", topic);
reasonCodes.add(MqttUnsubAckReasonCode.UNSPECIFIED_ERROR);
}
});
if (endpoint.isConnected()) {
endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId());
endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
}
Future.join(removalDoneFutures).onComplete(r -> span.finish());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.eclipse.hono.util.Strings;

import io.micrometer.core.instrument.Timer.Sample;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;

/**
* A dictionary of relevant information required during the
Expand Down Expand Up @@ -162,8 +164,14 @@ public static MqttContext fromPublishPacket(
result.propertyBag = bag;
result.topic = bag.topicWithoutPropertyBag();
result.endpoint = MetricsTags.EndpointType.fromString(result.topic.getEndpoint());
// set the content-type using the corresponding value from the property bag
result.contentType = bag.getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE);
// 1. set the content-type using the corresponding value from the property bag
// 2. set the content-type using the corresponding value from the mqtt message properties (MQTT5)
result.contentType = Optional.ofNullable(bag.getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE))
.orElse(Optional.ofNullable(publishedMessage.properties())
.map(properties -> properties.getProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value()))
.map(MqttProperties.MqttProperty::value)
.map(Object::toString)
.orElse(null));
if (result.endpoint == EndpointType.EVENT) {
result.timeToLive = determineTimeToLive(bag);
}
Expand Down Expand Up @@ -423,10 +431,12 @@ public boolean isAtLeastOnce() {

/**
* Sends a PUBACK for the message to the device.
*
* @param reasonCode Mqtt Publish Acknowledge reason code.
*/
public void acknowledge() {
public void acknowledge(final MqttPubAckReasonCode reasonCode) {
if (message != null && deviceEndpoint != null && isAtLeastOnce()) {
deviceEndpoint.publishAcknowledge(message.messageId());
deviceEndpoint.publishAcknowledge(message.messageId(), reasonCode, MqttProperties.NO_PROPERTIES);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.mockito.ArgumentCaptor;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand All @@ -97,6 +98,8 @@
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;

/**
* Verifies behavior of {@link AbstractVertxBasedMqttProtocolAdapter}.
Expand Down Expand Up @@ -737,7 +740,7 @@ private void testUploadQoS1MessageSendsPubAckOnSuccess(
upload.accept(adapter, context);

// THEN the device does not receive a PUBACK
verify(endpoint, never()).publishAcknowledge(anyInt());
verify(endpoint, never()).publishAcknowledge(anyInt(), any(MqttPubAckReasonCode.class), any(MqttProperties.class));
// and the message has not been reported as forwarded
verify(metrics, never()).reportTelemetry(
any(MetricsTags.EndpointType.class),
Expand All @@ -750,7 +753,7 @@ private void testUploadQoS1MessageSendsPubAckOnSuccess(

// until the message has been settled and accepted
outcome.complete();
verify(endpoint).publishAcknowledge(5555555);
verify(endpoint).publishAcknowledge(5555555, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
verify(metrics).reportTelemetry(
eq(type),
eq("my-tenant"),
Expand Down Expand Up @@ -883,7 +886,7 @@ public void testUploadTelemetryMessageIncludesRetainAnnotation(final VertxTestCo

ctx.verify(() -> {
// THEN the device has received a PUBACK
verify(endpoint).publishAcknowledge(5555555);
verify(endpoint).publishAcknowledge(5555555, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
// and the message has been sent downstream
// including the "retain" annotation
verify(telemetrySender).sendTelemetry(
Expand Down Expand Up @@ -1148,16 +1151,16 @@ public void testOnSubscribeIncludesStatusCodeForEachFilter() {

// THEN the adapter sends a SUBACK packet to the device
// which contains a failure status code for each unsupported filter
final ArgumentCaptor<List<MqttQoS>> codeCaptor = ArgumentCaptor.forClass(List.class);
verify(endpoint).subscribeAcknowledge(eq(15), codeCaptor.capture());
final ArgumentCaptor<List<MqttSubAckReasonCode>> codeCaptor = ArgumentCaptor.forClass(List.class);
verify(endpoint).subscribeAcknowledge(eq(15), codeCaptor.capture(), eq(MqttProperties.NO_PROPERTIES));
assertThat(codeCaptor.getValue()).hasSize(subscriptions.size());
assertThat(codeCaptor.getValue().get(0)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(1)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(2)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(3)).isEqualTo(MqttQoS.AT_MOST_ONCE);
assertThat(codeCaptor.getValue().get(4)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(5)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(6)).isEqualTo(MqttQoS.AT_MOST_ONCE);
assertThat(codeCaptor.getValue().get(0)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(1)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(2)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(3)).isEqualTo(MqttSubAckReasonCode.qosGranted(MqttQoS.AT_MOST_ONCE));
assertThat(codeCaptor.getValue().get(4)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(5)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(6)).isEqualTo(MqttSubAckReasonCode.qosGranted(MqttQoS.AT_MOST_ONCE));
}

private static MqttTopicSubscription newMockTopicSubscription(final String filter, final MqttQoS qos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
* SPDX-License-Identifier: EPL-2.0
*/


package org.eclipse.hono.adapter.mqtt;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -32,6 +32,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.opentracing.Span;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.messages.MqttPublishMessage;
Expand Down Expand Up @@ -95,17 +96,43 @@ public void verifyPropertyBagRetrievedFromTopic() {
* Verifies the <em>content-type</em> value retrieved from the <em>property-bag</em> in a message's topic.
*/
@Test
public void verifyContentType() {
public void verifyContentTypeFromPropertyBag() {
final String contentType = "application/vnd.eclipse.ditto+json";
final String encodedContentType = URLEncoder.encode(contentType, StandardCharsets.UTF_8);
final var device = new DeviceUser("tenant", "device");
final MqttPublishMessage msg = mock(MqttPublishMessage.class);
final MqttProperties msgProperties = new MqttProperties();
msgProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), "offending/value+json"));
when(msg.topicName()).thenReturn(
String.format("event/tenant/device/?Content-Type=%s&param2=value2&param3=value3", encodedContentType));
when(msg.properties()).thenReturn(
msgProperties);
final MqttContext context = MqttContext.fromPublishPacket(msg, mock(MqttEndpoint.class), span, device);

assertNotNull(context.propertyBag());
assertEquals(contentType, context.propertyBag().getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE));
assertEquals(contentType, context.contentType());
}

/**
* Verifies the <em>content-type</em> value retrieved from the <em>properties</em> in a message.
*/
@Test
public void verifyContentTypeFromProperties() {
final String contentType = "application/vnd.eclipse.ditto+json";
final var device = new DeviceUser("tenant", "device");
final MqttPublishMessage msg = mock(MqttPublishMessage.class);
final MqttProperties msgProperties = new MqttProperties();
msgProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), contentType));
when(msg.topicName()).thenReturn(
"event/tenant/device/?param2=value2&param3=value3");
when(msg.properties()).thenReturn(
msgProperties);
final MqttContext context = MqttContext.fromPublishPacket(msg, mock(MqttEndpoint.class), span, device);

assertNotNull(context.propertyBag());
assertNull(context.propertyBag().getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE));
assertEquals(contentType, context.contentType());
}

/**
Expand Down