5
5
6
6
package mqtt .x509pubsub ;
7
7
8
- import software .amazon .awssdk .crt .CRT ;
9
- import software .amazon .awssdk .crt .CrtResource ;
10
- import software .amazon .awssdk .crt .CrtRuntimeException ;
11
- import software .amazon .awssdk .crt .io .ClientBootstrap ;
12
- import software .amazon .awssdk .crt .mqtt5 .*;
13
- import software .amazon .awssdk .crt .mqtt5 .Mqtt5ClientOptions .LifecycleEvents ;
14
- import software .amazon .awssdk .crt .mqtt5 .packets .*;
15
- import software .amazon .awssdk .iot .AwsIotMqtt5ClientBuilder ;
16
-
17
- import java .util .List ;
18
- import java .util .concurrent .CompletableFuture ;
19
-
20
- // CHECK THESE
21
8
import java .nio .charset .StandardCharsets ;
22
9
import java .time .Duration ;
23
10
import java .util .Arrays ;
24
11
import java .util .UUID ;
25
12
import java .util .concurrent .CountDownLatch ;
26
13
import java .util .concurrent .TimeUnit ;
27
14
15
+ import software .amazon .awssdk .crt .CrtResource ;
16
+ import software .amazon .awssdk .crt .CrtRuntimeException ;
17
+ import software .amazon .awssdk .crt .mqtt5 .*;
18
+ import software .amazon .awssdk .crt .mqtt5 .packets .*;
19
+ import software .amazon .awssdk .iot .AwsIotMqtt5ClientBuilder ;
20
+
28
21
/**
29
22
* MQTT5 X509 Sample (mTLS)
30
- *
31
- * Usage (macOS/Linux):
32
- * java -cp target/classes:target/deps/* samples.Mqtt5X509Sample \
33
- * --endpoint <endpoint> \
34
- * --cert </path/certificate.pem> \
35
- * --key </path/private.key> \
36
- * [--ca_file </path/AmazonRootCA1.pem>] \
37
- * [--client-id my-client] \
38
- * [--topic test/topic] \
39
- * [--message "Hello from mqtt5 sample"] \
40
- * [--count 5]
41
23
*/
42
24
public class X509PubSub {
43
25
@@ -110,11 +92,16 @@ public static void main(String[] argv) {
110
92
System .out .println ("\n Starting MQTT5 X509 PubSub Sample\n " );
111
93
final int TIMEOUT_SECONDS = 100 ;
112
94
113
- // Latches for flow control of Sample
95
+ /*
96
+ * Latches for flow control of Sample
97
+ */
114
98
CountDownLatch connected = new CountDownLatch (1 );
115
99
CountDownLatch stopped = new CountDownLatch (1 );
116
- CountDownLatch receivedAll = new CountDownLatch (args .count > 0 ? args .count : 1 ); // if infinite, we won't await fully
100
+ CountDownLatch receivedAll = new CountDownLatch (args .count > 0 ? args .count : 1 );
117
101
102
+ /*
103
+ * Handle MQTT5 Client lifecycle events
104
+ */
118
105
Mqtt5ClientOptions .LifecycleEvents lifecycleEvents = new Mqtt5ClientOptions .LifecycleEvents () {
119
106
@ Override
120
107
public void onAttemptingConnect (Mqtt5Client client , OnAttemptingConnectReturn onAttemptingConnectReturn ) {
@@ -152,6 +139,9 @@ public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
152
139
}
153
140
};
154
141
142
+ /*
143
+ * Handle Publishes received by the MQTT5 Client
144
+ */
155
145
Mqtt5ClientOptions .PublishEvents publishEvents = new Mqtt5ClientOptions .PublishEvents () {
156
146
@ Override
157
147
public void onMessageReceived (Mqtt5Client client , PublishReturn publishReturn ) {
@@ -167,102 +157,105 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
167
157
}
168
158
};
169
159
160
+ Mqtt5Client client ;
161
+
162
+ /**
163
+ * Create MQTT5 client using mutual TLS via X509 Certificate and Private Key
164
+ */
165
+ System .out .println ("==== Creating MQTT5 Client ====\n " );
166
+ AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder .newDirectMqttBuilderWithMtlsFromPath (
167
+ args .endpoint ,args .certPath , args .keyPath );
168
+ ConnectPacket .ConnectPacketBuilder connectProperties = new ConnectPacket .ConnectPacketBuilder ();
169
+ connectProperties .withClientId (args .clientId );
170
+ builder .withConnectProperties (connectProperties );
171
+ builder .withLifeCycleEvents (lifecycleEvents );
172
+ builder .withPublishEvents (publishEvents );
173
+ client = builder .build ();
174
+ builder .close ();
175
+
176
+ System .out .println ("==== Starting client ====" );
177
+ client .start ();
170
178
try {
171
- Mqtt5Client client ;
172
-
173
- /**
174
- * Create MQTT5 client using mutual TLS via X509 Certificate and Private Key
175
- */
176
- System .out .println ("==== Creating MQTT5 Client ====\n " );
177
- AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder .newDirectMqttBuilderWithMtlsFromPath (
178
- args .endpoint ,args .certPath , args .keyPath );
179
- ConnectPacket .ConnectPacketBuilder connectProperties = new ConnectPacket .ConnectPacketBuilder ();
180
- connectProperties .withClientId (args .clientId );
181
- builder .withConnectProperties (connectProperties );
182
- builder .withLifeCycleEvents (lifecycleEvents );
183
- builder .withPublishEvents (publishEvents );
184
- client = builder .build ();
185
- builder .close ();
186
-
187
-
188
- System .out .println ("==== Starting client ====" );
189
- client .start ();
190
179
if (!connected .await (TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
191
180
throw new RuntimeException ("Connection timeout" );
192
181
}
182
+ } catch (InterruptedException ex ) {
183
+ onApplicationFailure (ex );
184
+ }
193
185
186
+ /* Subscribe */
187
+ System .out .printf ("==== Subscribing to topic '%s' ====%n" , args .topic );
188
+ SubscribePacket subscribePacket = SubscribePacket .of (args .topic , QOS .AT_LEAST_ONCE );
189
+ try {
190
+ SubAckPacket subAckPacket = client .subscribe (subscribePacket ).get (TIMEOUT_SECONDS , TimeUnit .SECONDS );
191
+ System .out .println ("SubAck received with reason code:" + subAckPacket .getReasonCodes () + "\n " );
192
+ } catch (Exception ex ) {
193
+ onApplicationFailure (ex );
194
+ }
194
195
195
- /* Subscribe */
196
- System .out .printf ("==== Subscribing to topic '%s' ====%n" , args .topic );
197
- SubscribePacket subscribePacket = SubscribePacket .of (args .topic , QOS .AT_LEAST_ONCE );
196
+ /* Publish */
197
+ if (args .count == 0 ) {
198
+ System .out .println ("==== Sending messages until program killed ====\n " );
199
+ } else {
200
+ System .out .printf ("==== Sending %d message(s) ====%n%n" , args .count );
201
+ }
202
+ int publishCount = 1 ;
203
+ while (args .count == 0 || publishCount <= args .count ) {
204
+ String payload = args .message + " [" + publishCount + "]" ;
205
+ System .out .printf ("Publishing message to topic '%s': %s%n" , args .topic , payload );
206
+ PublishPacket publishPacket = PublishPacket .of (
207
+ args .topic ,
208
+ QOS .AT_LEAST_ONCE ,
209
+ payload .getBytes (StandardCharsets .UTF_8 ));
198
210
try {
199
- SubAckPacket subAckPacket = client .subscribe ( subscribePacket ).get (TIMEOUT_SECONDS , TimeUnit .SECONDS );
200
- System .out .println ("SubAck received with reason code: " + subAckPacket . getReasonCodes () + "\n " );
211
+ PubAckPacket pubAck = client .publish ( publishPacket ).get (TIMEOUT_SECONDS , TimeUnit .SECONDS ). getResultPubAck ( );
212
+ System .out .println ("PubAck received with reason: " + pubAck . getReasonCode () + "\n " );
201
213
} catch (Exception ex ) {
202
214
onApplicationFailure (ex );
203
215
}
204
-
205
- /* Publish */
206
- if (args .count == 0 ) {
207
- System .out .println ("==== Sending messages until program killed ====\n " );
208
- } else {
209
- System .out .printf ("==== Sending %d message(s) ====%n%n" , args .count );
210
- }
211
-
212
- int publishCount = 1 ;
213
- while (args .count == 0 || publishCount <= args .count ) {
214
- String payload = args .message + " [" + publishCount + "]" ;
215
- System .out .printf ("Publishing message to topic '%s': %s%n" , args .topic , payload );
216
-
217
- PublishPacket publishPacket = PublishPacket .of (
218
- args .topic ,
219
- QOS .AT_LEAST_ONCE ,
220
- payload .getBytes (StandardCharsets .UTF_8 ));
221
-
222
- try {
223
- PubAckPacket pubAck = client .publish (publishPacket ).get (TIMEOUT_SECONDS , TimeUnit .SECONDS ).getResultPubAck ();
224
- System .out .println ("PubAck received with reason: " + pubAck .getReasonCode () + "\n " );
225
- } catch (Exception ex ) {
226
- onApplicationFailure (ex );
227
- }
228
-
216
+ try {
229
217
Thread .sleep (Duration .ofMillis (1500 ).toMillis ());
230
- publishCount ++;
218
+ } catch (InterruptedException ex ) {
219
+ onApplicationFailure (ex );
231
220
}
232
-
233
- if (args .count > 0 ) {
234
- long remaining = receivedAll .getCount ();
235
- if (remaining > 0 ) {
221
+ publishCount ++;
222
+ }
223
+ if (args .count > 0 ) {
224
+ long remaining = receivedAll .getCount ();
225
+ if (remaining > 0 ) {
226
+ try {
236
227
receivedAll .await (TIMEOUT_SECONDS , TimeUnit .SECONDS );
228
+ } catch (InterruptedException ex ) {
229
+ onApplicationFailure (ex );
237
230
}
238
- long received = (args .count - receivedAll .getCount ());
239
- System .out .printf ("%d message(s) received.%n%n" , received );
240
231
}
232
+ long received = (args .count - receivedAll .getCount ());
233
+ System .out .printf ("%d message(s) received.%n%n" , received );
234
+ }
241
235
242
- // ---------- Unsubscribe ----------
243
- System .out .printf ("==== Unsubscribing from topic '%s' ====%n" , args .topic );
244
- UnsubscribePacket unsubscribePacket = UnsubscribePacket .of (args .topic );
245
- try {
246
- UnsubAckPacket unsubAckPacket = client .unsubscribe (unsubscribePacket ).get (TIMEOUT_SECONDS , TimeUnit .SECONDS );
247
- System .out .println ("UnsubAck received with reason code:" + unsubAckPacket .getReasonCodes () + "\n " );
248
- } catch (Exception ex ) {
249
- onApplicationFailure (ex );
250
- }
236
+ // ---------- Unsubscribe ----------
237
+ System .out .printf ("==== Unsubscribing from topic '%s' ====%n" , args .topic );
238
+ UnsubscribePacket unsubscribePacket = UnsubscribePacket .of (args .topic );
239
+ try {
240
+ UnsubAckPacket unsubAckPacket = client .unsubscribe (unsubscribePacket ).get (TIMEOUT_SECONDS , TimeUnit .SECONDS );
241
+ System .out .println ("UnsubAck received with reason code:" + unsubAckPacket .getReasonCodes () + "\n " );
242
+ } catch (Exception ex ) {
243
+ onApplicationFailure (ex );
244
+ }
251
245
252
-
253
- System . out . println ( "==== Stopping Client ====" );
254
- client . stop ();
246
+ System . out . println ( "==== Stopping Client ====" );
247
+ client . stop ( );
248
+ try {
255
249
if (!stopped .await (TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
256
250
throw new RuntimeException ("Stop timeout" );
257
251
}
258
- System .out .println ("==== Client Stopped! ====" );
259
-
260
- /* Close the client to free memory */
261
- client .close ();
262
-
263
- } catch (CrtRuntimeException | InterruptedException ex ) {
252
+ } catch (InterruptedException ex ) {
264
253
onApplicationFailure (ex );
265
254
}
255
+ System .out .println ("==== Client Stopped! ====" );
256
+
257
+ /* Close the client to free memory */
258
+ client .close ();
266
259
267
260
CrtResource .waitForNoResources ();
268
261
System .out .println ("Complete!" );
0 commit comments