This component had officially released in the Vert.x stack, just following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>${maven.version}</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile "io.vertx:vertx-mqtt:${maven.version}"
This component provides a server which is able to handle connections, communication and messages exchange with remote MQTT clients. Its API provides a bunch of events related to raw protocol messages received by clients and exposes some features in order to send messages to them.
At the moment of writing, it supports all MQTT version 5.0 features except AUTH message which is yet to be implemented.
It’s not a fully featured MQTT broker but can be used for building something like that or for protocol translation.
Warning
|
this module has the tech preview status, this means the API can change between versions. |
This example shows how it’s possible to handle the connection request from a remote MQTT client. First, an {@link io.vertx.mqtt.MqttServer} instance is created and the {@link io.vertx.mqtt.MqttServer#endpointHandler(io.vertx.core.Handler)} method is used to specify the handler called when a remote client sends a CONNECT message for connecting to the server itself. The {@link io.vertx.mqtt.MqttEndpoint} instance, provided as parameter to the handler, brings all main information related to the CONNECT message like client identifier, username/password, "will" information, clean session flag, protocol version and, "keep alive" timeout and CONNECT message properties (for MQTT version 5.0). Inside that handler, the endpoint instance provides the {@link io.vertx.mqtt.MqttEndpoint#accept(boolean)} method for replying to the remote client with the corresponding CONNACK message : in this way, the connection is established. Finally, the server is started using the {@link io.vertx.mqtt.MqttServer#listen()} method with the default behavior (on localhost and default MQTT port 1883). The same method allows to specify an handler in order to check if the server is started properly or not.
{@link examples.VertxMqttServerExamples#example1}
The same endpoint instance provides the {@link io.vertx.mqtt.MqttEndpoint#disconnectMessageHandler(io.vertx.core.Handler)} for specifying the handler called when the remote client sends a DISCONNECT message in order to disconnect from the server; this handler takes {@link io.vertx.mqtt.messages.MqttDisconnectMessage} as a parameter.
{@link examples.VertxMqttServerExamples#example2}
If MQTT version 5.0 or newer is used server can send DISCONNECT message to client with the reason code and properties using {@link io.vertx.mqtt.MqttEndpoint#disconnect(MqttDisconnectReasonCode, MqttProperties)}.
The server has the support for accepting connection requests through the SSL/TLS protocol for authentication and encryption. In order to do that, the {@link io.vertx.mqtt.MqttServerOptions} class provides the {@link io.vertx.mqtt.MqttServerOptions#setSsl(boolean)} method for setting the usage of SSL/TLS (passing 'true' as value) and some other useful methods for providing server certificate and related private key (as Java key store reference, PEM or PFX format). In the following example, the {@link io.vertx.mqtt.MqttServerOptions#setKeyCertOptions(io.vertx.core.net.KeyCertOptions)} method is used in order to pass the certificates in PEM format. This method requires an instance of the possible implementations of the {@link io.vertx.core.net.KeyCertOptions} interface and in this case the {@link io.vertx.core.net.PemKeyCertOptions} class is used in order to provide the path for the server certificate and the private key with the correspondent {@link io.vertx.core.net.PemKeyCertOptions#setCertPath(java.lang.String)} and {@link io.vertx.core.net.PemKeyCertOptions#setKeyPath(java.lang.String)} methods. The MQTT server is started passing the Vert.x instance as usual and the above MQTT options instance to the creation method.
{@link examples.VertxMqttServerExamples#example3}
If you want to support connections via WebSockets, you can enable this via {@link io.vertx.mqtt.MqttServerOptions},
too. By passing true
to {@link io.vertx.mqtt.MqttServerOptions#setUseWebSocket(boolean)}, it will listen for
websocket connections on the path /mqtt
.
As with other setup configurations, the resulting endpoint connections and related disconnection are managed the same way as regular connections.
{@link examples.VertxMqttServerExamples#example11}
After a connection is established between client and server, the client can send a subscription request for a topic
using the SUBSCRIBE message. The {@link io.vertx.mqtt.MqttEndpoint} interface allows to specify a handler for the
incoming subscription request using the {@link io.vertx.mqtt.MqttEndpoint#subscribeHandler(io.vertx.core.Handler)} method.
Such handler receives an instance of the {@link io.vertx.mqtt.messages.MqttSubscribeMessage} interface which brings
the list of topics with the related subscription options as desired by the client.
Subscription options include QoS level and related flags and for MQTT version 5.0 also additional flags,
such as noLocal
and retainAsPublished
.
Finally, the endpoint instance provides the {@link io.vertx.mqtt.MqttEndpoint#subscribeAcknowledge(int, java.util.List, MqttProperties)} method
for replying to the client with the related SUBACK message containing the reason code
(which is either QoS level or error code - separate per each topic or pattern) and message properties.
{@link examples.VertxMqttServerExamples#example4}
In the same way, it’s possible to use the {@link io.vertx.mqtt.MqttEndpoint#unsubscribeHandler(io.vertx.core.Handler)} method on the endpoint in order to specify the handler called when the client sends an UNSUBSCRIBE message. This handler receives an instance of the {@link io.vertx.mqtt.messages.MqttUnsubscribeMessage} interface as parameter with the list of topics to unsubscribe. Finally, the endpoint instance provides the {@link io.vertx.mqtt.MqttEndpoint#unsubscribeAcknowledge(int)} and {@link io.vertx.mqtt.MqttEndpoint#unsubscribeAcknowledge(int, java.util.List, MqttProperties)} methods for replying to the client with the related UNSUBACK message - either simply acknowledging all unsubscriptions, or specifying the reasons per each topic and the properties in the UNSUBSCRIBE request (supported in MQTT v 5.0 or later).
{@link examples.VertxMqttServerExamples#example5}
In order to handle incoming messages published by the remote client, the {@link io.vertx.mqtt.MqttEndpoint} interface provides the {@link io.vertx.mqtt.MqttEndpoint#publishHandler(io.vertx.core.Handler)} method for specifying the handler called when the client sends a PUBLISH message. This handler receives an instance of the {@link io.vertx.mqtt.messages.MqttPublishMessage} interface as parameter with the payload, the QoS level, the duplicate and retain flags, message properties.
If the QoS level is 0 (AT_MOST_ONCE), there is no need from the endpoint to reply the client.
If the QoS level is 1 (AT_LEAST_ONCE), the endpoind needs to reply with a PUBACK message using the available {@link io.vertx.mqtt.MqttEndpoint#publishAcknowledge(int)} or {@link io.vertx.mqtt.MqttEndpoint#publishAcknowledge(int, MqttPubAckReasonCode, MqttProperties)} method.
If the QoS level is 2 (EXACTLY_ONCE), the endpoint needs to reply with a PUBREC message using the available {@link io.vertx.mqtt.MqttEndpoint#publishReceived(int)} or {@link io.vertx.mqtt.MqttEndpoint#publishReceived(int, MqttPubRecReasonCode, MqttProperties)} method; in this case the same endpoint should handle the PUBREL message received from the client as well (the remote client sends it after receiving the PUBREC from the endpoint) and it can do that specifying the handler through the {@link io.vertx.mqtt.MqttEndpoint#publishReleaseHandler(io.vertx.core.Handler)} or {@link io.vertx.mqtt.MqttEndpoint#publishReleaseMessageHandler(io.vertx.core.Handler)} method - depending on whether the server needs access to MQTT version 5.0 extended capabilities (reason code, message properties). In order to close the QoS level 2 delivery, the endpoint can use the {@link io.vertx.mqtt.MqttEndpoint#publishComplete(int)} or {@link io.vertx.mqtt.MqttEndpoint#publishComplete(int, MqttPubCompReasonCode, MqttProperties)} method for sending the PUBCOMP message to the client.
{@link examples.VertxMqttServerExamples#example6}
The endpoint can publish a message to the remote client (sending a PUBLISH message) using the {@link io.vertx.mqtt.MqttEndpoint#publish(java.lang.String, io.vertx.core.buffer.Buffer, io.netty.handler.codec.mqtt.MqttQoS, boolean, boolean)} method which takes the following input parameters : the topic to publish, the payload, the QoS level, the duplicate and retain flags. If you’re using MQTT version 5.0 or newer and you’d like to specify message properties you can use {@link io.vertx.mqtt.MqttEndpoint#publish(java.lang.String, io.vertx.core.buffer.Buffer, io.netty.handler.codec.mqtt.MqttQoS, boolean, boolean, int, MqttProperties)} method instead which takes message ID and message properties in addition to the previously described method.
If the QoS level is 0 (AT_MOST_ONCE), the endpoint won’t be receiving any feedback from the client.
If the QoS level is 1 (AT_LEAST_ONCE), the endpoint needs to handle the PUBACK message received from the client in order to receive final acknowledge of delivery. It’s possible using the {@link io.vertx.mqtt.MqttEndpoint#publishAcknowledgeHandler(io.vertx.core.Handler)} or {@link io.vertx.mqtt.MqttEndpoint#publishAcknowledgeMessageHandler(io.vertx.core.Handler)} method specifying such a handler.
If the QoS level is 2 (EXACTLY_ONCE), the endpoint needs to handle the PUBREC message received from the client. The {@link io.vertx.mqtt.MqttEndpoint#publishReceivedHandler(io.vertx.core.Handler)} and {@link io.vertx.mqtt.MqttEndpoint#publishReceivedMessageHandler(io.vertx.core.Handler)} methods allow to specify the handler for that. Inside that handler, the endpoint can use the {@link io.vertx.mqtt.MqttEndpoint#publishRelease(int)} or {@link io.vertx.mqtt.MqttEndpoint#publishRelease(int, MqttPubRelReasonCode, MqttProperties)} method for replying to the client with the PUBREL message. The last step is to handle the PUBCOMP message received from the client as final acknowledge for the published message; it’s possible using the {@link io.vertx.mqtt.MqttEndpoint#publishCompletionHandler(io.vertx.core.Handler)} or {@link io.vertx.mqtt.MqttEndpoint#publishCompletionMessageHandler(io.vertx.core.Handler)} for specifying the handler called when the final PUBCOMP message is received.
{@link examples.VertxMqttServerExamples#example7}
The underlying MQTT keep alive mechanism is handled by the server internally. When the CONNECT message is received, the server takes care of the keep alive timeout specified inside that message in order to check if the client doesn’t send messages in such timeout. At same time, for every PINGREQ received, the server replies with the related PINGRESP.
Even if there is no need for the high level application to handle that, the {@link io.vertx.mqtt.MqttEndpoint} interface provides the {@link io.vertx.mqtt.MqttEndpoint#pingHandler(io.vertx.core.Handler)} method for specifying an handler called when a PINGREQ message is received from the client. It’s just a notification to the application that the client isn’t sending meaningful messages but only pings for keeping alive; in any case the PINGRESP is automatically sent by the server internally as described above.
{@link examples.VertxMqttServerExamples#example8}
The {@link io.vertx.mqtt.MqttServer} interface provides the {@link io.vertx.mqtt.MqttServer#close()} method that can be used for closing the server; it stops to listen for incoming connections and closes all the active connections with remote clients. This method is asynchronous and one overload provides the possibility to specify a complention handler that will be called when the server is really closed.
{@link examples.VertxMqttServerExamples#example9}
After a connection is established between client and server, the client can send an auth packet to server using the AUTH message. The {@link io.vertx.mqtt.MqttEndpoint} interface allows to specify a handler for the incoming auth packet using the {@link io.vertx.mqtt.MqttEndpoint#authenticationExchangeHandler(io.vertx.core.Handler)} method. Such handler receives an instance of the {@link io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage} interface which brings the reason code, the authentication method and data. The server could continue to send AUTH packet using the {@link io.vertx.mqtt.MqttEndpoint#authenticationExchange(io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage)} for authentication or just passed it.
{@link examples.VertxMqttServerExamples#example14}
If you’re creating MQTT servers from inside verticles, those servers will be automatically closed when the verticle is undeployed.
The handlers related to the MQTT server are always executed in the same event loop thread. It means that on a system with more cores, only one instance is deployed so only one core is used. In order to use more cores, it’s possible to deploy more instances of the MQTT server.
It’s possible to do that programmatically:
{@link examples.VertxMqttServerExamples#example10}
or using a verticle specifying the number of instances:
{@link examples.VertxMqttServerExamples#example11}
What’s really happen is that even only MQTT server is deployed but as incoming connections arrive, Vert.x distributes them in a round-robin fashion to any of the connect handlers executed on different cores.
This component provides an MQTT client which is compliant with the 3.1.1 spec. Its API provides a bunch of methods for connecting/disconnecting to a broker, publishing messages (with all three different levels of QoS) and subscribing to topics.
Warning
|
This module has the tech preview status, this means the API can change between versions. |
The client gives you opportunity to connect to a server and disconnect from it. Also, you could specify things like the host and port of a server you would like to connect to passing instance of {@link io.vertx.mqtt.MqttClientOptions} as a param through constructor.
This example shows how you could connect to a server and disconnect from it using Vert.x MQTT client and calling {@link io.vertx.mqtt.MqttClient#connect(int, java.lang.String)} and {@link io.vertx.mqtt.MqttClient#disconnect()} methods.
{@link examples.VertxMqttClientExamples#example1}
Note
|
The default address of the server provided by {@link io.vertx.mqtt.MqttClientOptions} is localhost:1883 and localhost:8883 if you are using SSL/TSL. |
Now, lest go deeper and take look at this example:
{@link examples.VertxMqttClientExamples#example2}
Here we have the example of usage of {@link io.vertx.mqtt.MqttClient#subscribe(java.lang.String, int)} method. In order to receive messages from rpi2/temp topic we call {@link io.vertx.mqtt.MqttClient#subscribe(java.lang.String, int)} method. Although, to handle received messages from server you need to provide a handler, which will be called each time you have a new messages in the topics you subscribe on. As this example shows, handler could be provided via {@link io.vertx.mqtt.MqttClient#publishHandler(io.vertx.core.Handler)} method.
If you would like to publish some message into topic then {@link io.vertx.mqtt.MqttClient#publish(java.lang.String, io.vertx.core.buffer.Buffer, io.netty.handler.codec.mqtt.MqttQoS, boolean, boolean)} should be called. Let’s take a look at the example:
{@link examples.VertxMqttClientExamples#example3}
In the example, we send message to topic with name "temperature".
After a connection is established between client and server, the client can send an auth request to server using the {@link io.vertx.mqtt.MqttClient#authenticationExchange(io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage)} for authentication. The Server may return an AUTH packet. The {@link io.vertx.mqtt.MqttClient} interface allows to specify a handler for the incoming auth packet using the {@link io.vertx.mqtt.MqttClient#authenticationExchangeHandler(io.vertx.core.Handler)} method. Such handler receives an instance of the {@link io.vertx.mqtt.messages.MqttAuthenticationExchangeMessage} interface which brings the reason code, the authentication method and data.
{@link examples.VertxMqttClientExamples#example10}
In order to keep connection with server you should time to time send something to server otherwise server will close the connection. The right way to keep connection alive is a {@link io.vertx.mqtt.MqttClient#ping()} method.
Important
|
By default, your client keep connections with server automatically. That means that you don’t need to call {@link io.vertx.mqtt.MqttClient#ping()} in order to keep connections with server. The {@link io.vertx.mqtt.MqttClient} will do it for you. |
If you want to disable this feature then you should call {@link io.vertx.mqtt.MqttClientOptions#setAutoKeepAlive(boolean)} with false
as argument:
{@link examples.VertxMqttClientExamples#example4}
-
publish is completed
You could provide handler by calling {@link io.vertx.mqtt.MqttClient#publishCompletionHandler(io.vertx.core.Handler)}. The handler will be called each time publish is completed. This one is pretty useful because you could see the packetId of just received PUBACK or PUBCOMP packet.
{@link examples.VertxMqttClientExamples#example5}
WarningThe handler WILL NOT BE CALLED if sent publish packet with QoS=0. -
subscribe completed
{@link examples.VertxMqttClientExamples#example6}
-
unsubscribe completed
{@link examples.VertxMqttClientExamples#example7}
-
unsubscribe sent
{@link examples.VertxMqttClientExamples#example8}
-
PINGRESP received
{@link examples.VertxMqttClientExamples#example9}
You can connect to an MQTT server using TLS by configuring the client TCP options, make sure to set:
-
the SSL flag
-
the server certificate or the trust all flag
-
the hostname verification algorithm to
"HTTPS"
if you want to verify the server identity otherwise""
{@link examples.VertxMqttClientExamples#tls}
Note
|
More details on the TLS client config can be found here |
{@link examples.VertxMqttServerExamples#example13}
If your servers are behind haproxy or nginx and you want to get the client’s original ip and port, then you need to set setUseProxyProtocol
to true
Important
|
To enable this feature, you need to add dependency netty-codec-haproxy , but it is not introduced by default, so you need to manually add it
|
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<version>${maven.version}</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile "io.netty:netty-codec-haproxy:${maven.version}"