diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index d35eafd549..a7c611a19e 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -88,6 +88,7 @@ import io.micrometer.core.instrument.Timer.Sample; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -106,6 +107,9 @@ import io.vertx.mqtt.messages.MqttPublishMessage; import io.vertx.mqtt.messages.MqttSubscribeMessage; import io.vertx.mqtt.messages.MqttUnsubscribeMessage; +import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode; +import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; +import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode; /** * A base class for implementing Vert.x based Hono protocol adapters for publishing events & telemetry data using @@ -885,7 +889,7 @@ public final Future uploadCommandResponseMessage(final MqttContext ctx) { // check that the remote MQTT client is still connected before sending PUBACK if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) { currentSpan.log(EVENT_SENDING_PUBACK); - ctx.acknowledge(); + ctx.acknowledge(MqttPubAckReasonCode.SUCCESS); } currentSpan.finish(); return Future. succeededFuture(); @@ -968,7 +972,7 @@ private Future uploadMessage( // check that the remote MQTT client is still connected before sending PUBACK if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) { currentSpan.log(EVENT_SENDING_PUBACK); - ctx.acknowledge(); + ctx.acknowledge(MqttPubAckReasonCode.SUCCESS); } currentSpan.finish(); return ok; @@ -1266,7 +1270,7 @@ private void handlePublishedMessageError(final MqttContext context, final Throwa span.log("skipped sending PUBACK"); } else if (context.deviceEndpoint().isConnected()) { span.log(EVENT_SENDING_PUBACK); - context.acknowledge(); + context.acknowledge(MqttPubAckReasonCode.UNSPECIFIED_ERROR); } } span.finish(); @@ -1415,11 +1419,11 @@ protected final void onSubscribe(final MqttSubscribeMessage subscribeMsg) { Future.join(new ArrayList<>(subscriptionOutcomes)).onComplete(v -> { if (endpoint.isConnected()) { - // return a status code for each topic filter contained in the SUBSCRIBE packet - final List grantedQosLevels = subscriptionOutcomes.stream() - .map(future -> future.failed() ? MqttQoS.FAILURE : future.result().getQos()) + // return a reason code for each topic filter contained in the SUBSCRIBE packet + final List reasonCodes = subscriptionOutcomes.stream() + .map(future -> future.failed() ? MqttSubAckReasonCode.UNSPECIFIED_ERROR : MqttSubAckReasonCode.qosGranted(future.result().getQos())) .collect(Collectors.toList()); - endpoint.subscribeAcknowledge(subscribeMsg.messageId(), grantedQosLevels); + endpoint.subscribeAcknowledge(subscribeMsg.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); } else { TracingHelper.logError(span, "skipped sending command subscription notification - endpoint not connected anymore"); log.debug("skipped sending command subscription notification - endpoint not connected anymore [tenant-id: {}, device-id: {}]", @@ -1785,6 +1789,7 @@ protected final void onUnsubscribe(final MqttUnsubscribeMessage unsubscribeMsg) } final Span span = newSpan("UNSUBSCRIBE"); + final List reasonCodes = new ArrayList<>(); final List> removalDoneFutures = new ArrayList<>(unsubscribeMsg.topics().size()); unsubscribeMsg.topics().forEach(topic -> { @@ -1810,13 +1815,15 @@ protected final void onUnsubscribe(final MqttUnsubscribeMessage unsubscribeMsg) if (removedSubscription.get() != null) { log.debug("removed subscription with topic [{}] for device [tenant-id: {}, device-id: {}]", topic, removedSubscription.get().getTenant(), removedSubscription.get().getDeviceId()); + reasonCodes.add(MqttUnsubAckReasonCode.SUCCESS); } else { TracingHelper.logError(span, String.format("no subscription found for topic filter [%s]", topic)); log.debug("cannot unsubscribe - no subscription found for topic filter [{}]", topic); + reasonCodes.add(MqttUnsubAckReasonCode.UNSPECIFIED_ERROR); } }); if (endpoint.isConnected()) { - endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId()); + endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); } Future.join(removalDoneFutures).onComplete(r -> span.finish()); } diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/MqttContext.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/MqttContext.java index 6a73f08bf7..a5ccfbbcbd 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/MqttContext.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/MqttContext.java @@ -28,11 +28,13 @@ import org.eclipse.hono.util.Strings; import io.micrometer.core.instrument.Timer.Sample; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.opentracing.Span; import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.messages.MqttPublishMessage; +import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode; /** * A dictionary of relevant information required during the @@ -162,8 +164,14 @@ public static MqttContext fromPublishPacket( result.propertyBag = bag; result.topic = bag.topicWithoutPropertyBag(); result.endpoint = MetricsTags.EndpointType.fromString(result.topic.getEndpoint()); - // set the content-type using the corresponding value from the property bag - result.contentType = bag.getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE); + // 1. set the content-type using the corresponding value from the property bag + // 2. set the content-type using the corresponding value from the mqtt message properties (MQTT5) + result.contentType = Optional.ofNullable(bag.getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE)) + .orElse(Optional.ofNullable(publishedMessage.properties()) + .map(properties -> properties.getProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value())) + .map(MqttProperties.MqttProperty::value) + .map(Object::toString) + .orElse(null)); if (result.endpoint == EndpointType.EVENT) { result.timeToLive = determineTimeToLive(bag); } @@ -423,10 +431,12 @@ public boolean isAtLeastOnce() { /** * Sends a PUBACK for the message to the device. + * + * @param reasonCode Mqtt Publish Acknowledge reason code. */ - public void acknowledge() { + public void acknowledge(final MqttPubAckReasonCode reasonCode) { if (message != null && deviceEndpoint != null && isAtLeastOnce()) { - deviceEndpoint.publishAcknowledge(message.messageId()); + deviceEndpoint.publishAcknowledge(message.messageId(), reasonCode, MqttProperties.NO_PROPERTIES); } } diff --git a/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java b/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java index 5e0eda7a2d..934b6a65d1 100644 --- a/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java +++ b/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapterTest.java @@ -78,6 +78,7 @@ import org.mockito.ArgumentCaptor; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -97,6 +98,8 @@ import io.vertx.mqtt.MqttTopicSubscription; import io.vertx.mqtt.messages.MqttPublishMessage; import io.vertx.mqtt.messages.MqttSubscribeMessage; +import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode; +import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; /** * Verifies behavior of {@link AbstractVertxBasedMqttProtocolAdapter}. @@ -737,7 +740,7 @@ private void testUploadQoS1MessageSendsPubAckOnSuccess( upload.accept(adapter, context); // THEN the device does not receive a PUBACK - verify(endpoint, never()).publishAcknowledge(anyInt()); + verify(endpoint, never()).publishAcknowledge(anyInt(), any(MqttPubAckReasonCode.class), any(MqttProperties.class)); // and the message has not been reported as forwarded verify(metrics, never()).reportTelemetry( any(MetricsTags.EndpointType.class), @@ -750,7 +753,7 @@ private void testUploadQoS1MessageSendsPubAckOnSuccess( // until the message has been settled and accepted outcome.complete(); - verify(endpoint).publishAcknowledge(5555555); + verify(endpoint).publishAcknowledge(5555555, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES); verify(metrics).reportTelemetry( eq(type), eq("my-tenant"), @@ -883,7 +886,7 @@ public void testUploadTelemetryMessageIncludesRetainAnnotation(final VertxTestCo ctx.verify(() -> { // THEN the device has received a PUBACK - verify(endpoint).publishAcknowledge(5555555); + verify(endpoint).publishAcknowledge(5555555, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES); // and the message has been sent downstream // including the "retain" annotation verify(telemetrySender).sendTelemetry( @@ -1148,16 +1151,16 @@ public void testOnSubscribeIncludesStatusCodeForEachFilter() { // THEN the adapter sends a SUBACK packet to the device // which contains a failure status code for each unsupported filter - final ArgumentCaptor> codeCaptor = ArgumentCaptor.forClass(List.class); - verify(endpoint).subscribeAcknowledge(eq(15), codeCaptor.capture()); + final ArgumentCaptor> codeCaptor = ArgumentCaptor.forClass(List.class); + verify(endpoint).subscribeAcknowledge(eq(15), codeCaptor.capture(), eq(MqttProperties.NO_PROPERTIES)); assertThat(codeCaptor.getValue()).hasSize(subscriptions.size()); - assertThat(codeCaptor.getValue().get(0)).isEqualTo(MqttQoS.FAILURE); - assertThat(codeCaptor.getValue().get(1)).isEqualTo(MqttQoS.FAILURE); - assertThat(codeCaptor.getValue().get(2)).isEqualTo(MqttQoS.FAILURE); - assertThat(codeCaptor.getValue().get(3)).isEqualTo(MqttQoS.AT_MOST_ONCE); - assertThat(codeCaptor.getValue().get(4)).isEqualTo(MqttQoS.FAILURE); - assertThat(codeCaptor.getValue().get(5)).isEqualTo(MqttQoS.FAILURE); - assertThat(codeCaptor.getValue().get(6)).isEqualTo(MqttQoS.AT_MOST_ONCE); + assertThat(codeCaptor.getValue().get(0)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR); + assertThat(codeCaptor.getValue().get(1)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR); + assertThat(codeCaptor.getValue().get(2)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR); + assertThat(codeCaptor.getValue().get(3)).isEqualTo(MqttSubAckReasonCode.qosGranted(MqttQoS.AT_MOST_ONCE)); + assertThat(codeCaptor.getValue().get(4)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR); + assertThat(codeCaptor.getValue().get(5)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR); + assertThat(codeCaptor.getValue().get(6)).isEqualTo(MqttSubAckReasonCode.qosGranted(MqttQoS.AT_MOST_ONCE)); } private static MqttTopicSubscription newMockTopicSubscription(final String filter, final MqttQoS qos) { diff --git a/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/MqttContextTest.java b/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/MqttContextTest.java index 5df9270325..522d54594b 100644 --- a/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/MqttContextTest.java +++ b/adapters/mqtt-base/src/test/java/org/eclipse/hono/adapter/mqtt/MqttContextTest.java @@ -11,11 +11,11 @@ * SPDX-License-Identifier: EPL-2.0 */ - package org.eclipse.hono.adapter.mqtt; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.netty.handler.codec.mqtt.MqttProperties; import io.opentracing.Span; import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.messages.MqttPublishMessage; @@ -95,17 +96,43 @@ public void verifyPropertyBagRetrievedFromTopic() { * Verifies the content-type value retrieved from the property-bag in a message's topic. */ @Test - public void verifyContentType() { + public void verifyContentTypeFromPropertyBag() { final String contentType = "application/vnd.eclipse.ditto+json"; final String encodedContentType = URLEncoder.encode(contentType, StandardCharsets.UTF_8); final var device = new DeviceUser("tenant", "device"); final MqttPublishMessage msg = mock(MqttPublishMessage.class); + final MqttProperties msgProperties = new MqttProperties(); + msgProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), "offending/value+json")); when(msg.topicName()).thenReturn( String.format("event/tenant/device/?Content-Type=%s¶m2=value2¶m3=value3", encodedContentType)); + when(msg.properties()).thenReturn( + msgProperties); final MqttContext context = MqttContext.fromPublishPacket(msg, mock(MqttEndpoint.class), span, device); assertNotNull(context.propertyBag()); assertEquals(contentType, context.propertyBag().getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE)); + assertEquals(contentType, context.contentType()); + } + + /** + * Verifies the content-type value retrieved from the properties in a message. + */ + @Test + public void verifyContentTypeFromProperties() { + final String contentType = "application/vnd.eclipse.ditto+json"; + final var device = new DeviceUser("tenant", "device"); + final MqttPublishMessage msg = mock(MqttPublishMessage.class); + final MqttProperties msgProperties = new MqttProperties(); + msgProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), contentType)); + when(msg.topicName()).thenReturn( + "event/tenant/device/?param2=value2¶m3=value3"); + when(msg.properties()).thenReturn( + msgProperties); + final MqttContext context = MqttContext.fromPublishPacket(msg, mock(MqttEndpoint.class), span, device); + + assertNotNull(context.propertyBag()); + assertNull(context.propertyBag().getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE)); + assertEquals(contentType, context.contentType()); } /**