Skip to content

Commit 803d664

Browse files
thg2kNamoshek
andauthored
Added 'retained' parameter to subscriptions callbacks for published messages (#19)
* Added 'retained' parameter to subscriptions callbacks for published messages If the publisher set the retained flag, the broker will honor it and send the published messages to the subscriber as soon as they subscrib. But the subscribe callback might need to know whether the message was delivered live or if it was retained, which could mean the message is stale. * Make $retained false by default and add PhpDoc * Explain subscription callback in PhpDoc Co-authored-by: Marvin Mall <[email protected]>
1 parent 33099e5 commit 803d664

File tree

2 files changed

+49
-10
lines changed

2 files changed

+49
-10
lines changed

src/Contracts/MQTTClient.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,24 @@ public function publish(string $topic, string $message, int $qualityOfService =
5151
/**
5252
* Subscribe to the given topic with the given quality of service.
5353
*
54+
* The subscription callback is passed the topic as first and the message as second
55+
* parameter. A third parameter indicates whether the received message has been sent
56+
* because it was retained by the broker.
57+
*
58+
* Example:
59+
* ```php
60+
* $mqtt->subscribe(
61+
* '/foo/bar/+',
62+
* function (string $topic, string $message, bool $retained) use ($logger) {
63+
* $logger->info("Received {retained} message on topic [{topic}]: {message}", [
64+
* 'topic' => $topic,
65+
* 'message' => $message,
66+
* 'retained' => $retained ? 'retained' : 'live'
67+
* ]);
68+
* }
69+
* );
70+
* ```
71+
*
5472
* @param string $topic
5573
* @param callable $callback
5674
* @param int $qualityOfService

src/MQTTClient.php

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,10 @@ public function __construct(
113113
* If no custom settings are passed, the client will use the default settings.
114114
* See {@see ConnectionSettings} for more details about the defaults.
115115
*
116-
* @param string|null $username
117-
* @param string|null $password
118-
* @param ConnectionSettings $settings
119-
* @param bool $sendCleanSessionFlag
116+
* @param string|null $username
117+
* @param string|null $password
118+
* @param ConnectionSettings|null $settings
119+
* @param bool $sendCleanSessionFlag
120120
* @return void
121121
* @throws ConnectingToBrokerFailedException
122122
*/
@@ -469,6 +469,24 @@ protected function publishMessage(
469469
/**
470470
* Subscribe to the given topic with the given quality of service.
471471
*
472+
* The subscription callback is passed the topic as first and the message as second
473+
* parameter. A third parameter indicates whether the received message has been sent
474+
* because it was retained by the broker.
475+
*
476+
* Example:
477+
* ```php
478+
* $mqtt->subscribe(
479+
* '/foo/bar/+',
480+
* function (string $topic, string $message, bool $retained) use ($logger) {
481+
* $logger->info("Received {retained} message on topic [{topic}]: {message}", [
482+
* 'topic' => $topic,
483+
* 'message' => $message,
484+
* 'retained' => $retained ? 'retained' : 'live'
485+
* ]);
486+
* }
487+
* );
488+
* ```
489+
*
472490
* @param string $topic
473491
* @param callable $callback
474492
* @param int $qualityOfService
@@ -598,6 +616,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false,
598616
// Read the first byte of a message (command and flags).
599617
$command = (int)(ord($byte) / 16);
600618
$qualityOfService = (ord($byte) & 0x06) >> 1;
619+
$retained = (bool) (ord($byte) & 0x01);
601620

602621
// Read the second byte of a message (remaining length)
603622
// If the continuation bit (8) is set on the length byte,
@@ -621,7 +640,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false,
621640
case 2:
622641
throw new UnexpectedAcknowledgementException(self::EXCEPTION_ACK_CONNECT, 'We unexpectedly received a connection acknowledgement.');
623642
case 3:
624-
$this->handlePublishedMessage($buffer, $qualityOfService);
643+
$this->handlePublishedMessage($buffer, $qualityOfService, $retained);
625644
break;
626645
case 4:
627646
$this->handlePublishAcknowledgement($buffer);
@@ -722,10 +741,11 @@ protected function allQueuesAreEmpty(): bool
722741
*
723742
* @param string $buffer
724743
* @param int $qualityOfServiceLevel
744+
* @param bool $retained
725745
* @return void
726746
* @throws DataTransferException
727747
*/
728-
protected function handlePublishedMessage(string $buffer, int $qualityOfServiceLevel): void
748+
protected function handlePublishedMessage(string $buffer, int $qualityOfServiceLevel, bool $retained = false): void
729749
{
730750
$topicLength = (ord($buffer[0]) << 8) + ord($buffer[1]);
731751
$topic = substr($buffer, 2, $topicLength);
@@ -762,7 +782,7 @@ protected function handlePublishedMessage(string $buffer, int $qualityOfServiceL
762782
}
763783
}
764784

765-
$this->deliverPublishedMessage($topic, $message, $qualityOfServiceLevel);
785+
$this->deliverPublishedMessage($topic, $message, $qualityOfServiceLevel, $retained);
766786
}
767787

768788
/**
@@ -1061,9 +1081,10 @@ protected function handlePingAcknowledgement(): void
10611081
* @param string $topic
10621082
* @param string $message
10631083
* @param int $qualityOfServiceLevel
1084+
* @param bool $retained
10641085
* @return void
10651086
*/
1066-
protected function deliverPublishedMessage(string $topic, string $message, int $qualityOfServiceLevel): void
1087+
protected function deliverPublishedMessage(string $topic, string $message, int $qualityOfServiceLevel, bool $retained = false): void
10671088
{
10681089
$subscribers = $this->repository->getTopicSubscriptionsMatchingTopic($topic);
10691090

@@ -1082,7 +1103,7 @@ protected function deliverPublishedMessage(string $topic, string $message, int $
10821103
}
10831104

10841105
try {
1085-
call_user_func($subscriber->getCallback(), $topic, $message);
1106+
call_user_func($subscriber->getCallback(), $topic, $message, $retained);
10861107
} catch (\Throwable $e) {
10871108
// We ignore errors produced by custom callbacks.
10881109
}
@@ -1300,7 +1321,7 @@ protected function writeToSocket(string $data, int $length = null): void
13001321
]);
13011322
throw new DataTransferException(self::EXCEPTION_TX_DATA, 'Sending data over the socket failed. Has it been closed?');
13021323
}
1303-
1324+
13041325
// After writing successfully to the socket, the broker should have received a new message from us.
13051326
// Because we only need to send a ping if no other messages are delivered, we can safely reset the ping timer.
13061327
$this->lastPingAt = microtime(true);

0 commit comments

Comments
 (0)