Skip to content

Commit 99967b4

Browse files
Add connected event (handler) (#152)
* Added an connected event (handler list) * Prevent adding duplicated subscriptions * Fix phpcs * Pass info to connected event handlers if is auto-reconnect * Prevent double subscriptions in MemoryRepository * Add tests for the new connected event handlers * Revert "Fix phpcs" This reverts commit 10a9d42. * Remove trailing whitespace --------- Co-authored-by: Marvin Mall <[email protected]>
1 parent 57b554d commit 99967b4

6 files changed

+206
-10
lines changed

src/Concerns/OffersHooks.php

+77
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ trait OffersHooks
2323
/** @var \SplObjectStorage|array<\Closure> */
2424
private $messageReceivedEventHandlers;
2525

26+
/** @var \SplObjectStorage|array<\Closure> */
27+
private $connectedEventHandlers;
28+
2629
/**
2730
* Needs to be called in order to initialize the trait.
2831
*
@@ -33,6 +36,7 @@ protected function initializeEventHandlers(): void
3336
$this->loopEventHandlers = new \SplObjectStorage();
3437
$this->publishEventHandlers = new \SplObjectStorage();
3538
$this->messageReceivedEventHandlers = new \SplObjectStorage();
39+
$this->connectedEventHandlers = new \SplObjectStorage();
3640
}
3741

3842
/**
@@ -266,4 +270,77 @@ private function runMessageReceivedEventHandlers(string $topic, string $message,
266270
}
267271
}
268272
}
273+
274+
/**
275+
* Registers an event handler which is called when the client established a connection to the broker.
276+
* This also includes manual reconnects as well as auto-reconnects by the client itself.
277+
*
278+
* The event handler is passed the MQTT client as first argument,
279+
* followed by a flag which indicates whether an auto-reconnect occurred as second argument.
280+
*
281+
* Example:
282+
* ```php
283+
* $mqtt->registerConnectedEventHandler(function (
284+
* MqttClient $mqtt,
285+
* bool $isAutoReconnect
286+
* ) use ($logger) {
287+
* if ($isAutoReconnect) {
288+
* $logger->info("Client successfully auto-reconnected to the broker.);
289+
* } else {
290+
* $logger->info("Client successfully connected to the broker.");
291+
* }
292+
* });
293+
* ```
294+
*
295+
* Multiple event handlers can be registered at the same time.
296+
*
297+
* @param \Closure $callback
298+
* @return MqttClient
299+
*/
300+
public function registerConnectedEventHandler(\Closure $callback): MqttClient
301+
{
302+
$this->connectedEventHandlers->attach($callback);
303+
304+
/** @var MqttClient $this */
305+
return $this;
306+
}
307+
308+
/**
309+
* Unregisters a connected event handler which prevents it from being called in the future.
310+
*
311+
* This does not affect other registered event handlers. It is possible
312+
* to unregister all registered event handlers by passing null as callback.
313+
*
314+
* @param \Closure|null $callback
315+
* @return MqttClient
316+
*/
317+
public function unregisterConnectedEventHandler(\Closure $callback = null): MqttClient
318+
{
319+
if ($callback === null) {
320+
$this->connectedEventHandlers->removeAll($this->connectedEventHandlers);
321+
} else {
322+
$this->connectedEventHandlers->detach($callback);
323+
}
324+
325+
/** @var MqttClient $this */
326+
return $this;
327+
}
328+
329+
/**
330+
* Runs all the registered connected event handlers.
331+
* Each event handler is executed in a try-catch block to avoid spilling exceptions.
332+
*
333+
* @param bool $isAutoReconnect
334+
* @return void
335+
*/
336+
private function runConnectedEventHandlers(bool $isAutoReconnect): void
337+
{
338+
foreach ($this->connectedEventHandlers as $handler) {
339+
try {
340+
call_user_func($handler, $this, $isAutoReconnect);
341+
} catch (\Throwable $e) {
342+
$this->logger->error('Connected hook callback threw exception.', ['exception' => $e]);
343+
}
344+
}
345+
}
269346
}

src/ConnectionSettings.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
/**
88
* The settings used during connection to a broker.
9-
*
10-
* This class is immutable and all setters return a clone of the original class because
9+
*
10+
* This class is immutable and all setters return a clone of the original class because
1111
* connection settings must not change once passed to MqttClient.
1212
*
1313
* @package PhpMqtt\Client

src/MqttClient.php

+5-2
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,11 @@ public function connect(
148148
* Connect to the MQTT broker using the configured settings.
149149
*
150150
* @param bool $useCleanSession
151+
* @param bool $isAutoReconnect
151152
* @return void
152153
* @throws ConnectingToBrokerFailedException
153154
*/
154-
protected function connectInternal(bool $useCleanSession = false): void
155+
protected function connectInternal(bool $useCleanSession = false, bool $isAutoReconnect = false): void
155156
{
156157
try {
157158
$this->establishSocketConnection();
@@ -163,6 +164,8 @@ protected function connectInternal(bool $useCleanSession = false): void
163164
}
164165

165166
$this->connected = true;
167+
168+
$this->runConnectedEventHandlers($isAutoReconnect);
166169
}
167170

168171
/**
@@ -414,7 +417,7 @@ protected function reconnect(): void
414417

415418
for ($i = 1; $i <= $maxReconnectAttempts; $i++) {
416419
try {
417-
$this->connectInternal();
420+
$this->connectInternal(false, true);
418421

419422
return;
420423
} catch (ConnectingToBrokerFailedException $e) {

src/Repositories/MemoryRepository.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ public function countSubscriptions(): int
191191
*/
192192
public function addSubscription(Subscription $subscription): void
193193
{
194+
// Remove a potentially existing subscription for this topic filter.
195+
$this->removeSubscription($subscription->getTopicFilter());
196+
194197
$this->subscriptions[] = $subscription;
195198
}
196199

@@ -217,16 +220,13 @@ public function getSubscriptionsMatchingTopic(string $topicName): array
217220
*/
218221
public function removeSubscription(string $topicFilter): bool
219222
{
220-
$result = false;
221-
222223
foreach ($this->subscriptions as $index => $subscription) {
223224
if ($subscription->getTopicFilter() === $topicFilter) {
224225
unset($this->subscriptions[$index]);
225-
$result = true;
226-
break;
226+
return true;
227227
}
228228
}
229229

230-
return $result;
230+
return false;
231231
}
232232
}
+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<?php
2+
3+
/** @noinspection PhpUnhandledExceptionInspection */
4+
5+
declare(strict_types=1);
6+
7+
namespace Tests\Feature;
8+
9+
use PhpMqtt\Client\MqttClient;
10+
use Tests\TestCase;
11+
12+
/**
13+
* Tests that the connected event handler work as intended.
14+
*
15+
* @package Tests\Feature
16+
*/
17+
class ConnectedEventHandlerTest extends TestCase
18+
{
19+
public function test_connected_event_handlers_are_called_every_time_the_client_connects_successfully(): void
20+
{
21+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-connected-event-handler');
22+
23+
$handlerCallCount = 0;
24+
$handler = function () use (&$handlerCallCount) {
25+
$handlerCallCount++;
26+
};
27+
28+
$client->registerConnectedEventHandler($handler);
29+
$client->connect();
30+
31+
$this->assertSame(1, $handlerCallCount);
32+
33+
$client->disconnect();
34+
$client->connect();
35+
36+
$this->assertSame(2, $handlerCallCount);
37+
38+
$client->disconnect();
39+
$client->connect();
40+
41+
$this->assertSame(3, $handlerCallCount);
42+
43+
$client->disconnect();
44+
}
45+
46+
public function test_connected_event_handlers_can_be_unregistered_and_will_not_be_called_anymore(): void
47+
{
48+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-connected-event-handler');
49+
50+
$handlerCallCount = 0;
51+
$handler = function () use (&$handlerCallCount) {
52+
$handlerCallCount++;
53+
};
54+
55+
$client->registerConnectedEventHandler($handler);
56+
$client->connect();
57+
58+
$this->assertSame(1, $handlerCallCount);
59+
60+
$client->unregisterConnectedEventHandler($handler);
61+
$client->disconnect();
62+
$client->connect();
63+
64+
$this->assertSame(1, $handlerCallCount);
65+
66+
$client->registerConnectedEventHandler($handler);
67+
$client->disconnect();
68+
$client->connect();
69+
70+
$this->assertSame(2, $handlerCallCount);
71+
72+
$client->unregisterConnectedEventHandler($handler);
73+
$client->disconnect();
74+
$client->connect();
75+
76+
$this->assertSame(2, $handlerCallCount);
77+
78+
$client->disconnect();
79+
}
80+
81+
public function test_connected_event_handlers_can_throw_exceptions_which_does_not_affect_other_handlers_or_the_application(): void
82+
{
83+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-connected-event-handler');
84+
85+
$handlerCallCount = 0;
86+
$handler1 = function () use (&$handlerCallCount) {
87+
$handlerCallCount++;
88+
};
89+
$handler2 = function () {
90+
throw new \Exception('Something went wrong!');
91+
};
92+
93+
$client->registerConnectedEventHandler($handler1);
94+
$client->registerConnectedEventHandler($handler2);
95+
96+
$client->connect();
97+
98+
$this->assertSame(1, $handlerCallCount);
99+
100+
$client->disconnect();
101+
}
102+
103+
public function test_connected_event_handler_is_passed_the_mqtt_client_and_the_auto_reconnect_flag_as_arguments(): void
104+
{
105+
$client = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'test-connected-event-handler');
106+
107+
$client->registerConnectedEventHandler(function ($mqttClient, $isAutoReconnect) {
108+
$this->assertInstanceOf(MqttClient::class, $mqttClient);
109+
$this->assertIsBool($isAutoReconnect);
110+
$this->assertFalse($isAutoReconnect);
111+
});
112+
113+
$client->connect();
114+
$client->disconnect();
115+
}
116+
}

tests/Feature/PublishEventHandlerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use Tests\TestCase;
1111

1212
/**
13-
* Tests that the loop event handler work as intended.
13+
* Tests that the publish event handler work as intended.
1414
*
1515
* @package Tests\Feature
1616
*/

0 commit comments

Comments
 (0)