From 635610cc4881f5ab47975309bc193688808293a3 Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Tue, 1 Sep 2020 20:44:22 +0200 Subject: [PATCH 1/3] Add RedisRepository The repository is inspired by the previous implementation but uses hash maps over sets for pending messages (due to message identifiers as keys). --- src/Repositories/RedisConnectionSettings.php | 109 +++++ src/Repositories/RedisRepository.php | 434 +++++++++++++++++++ 2 files changed, 543 insertions(+) create mode 100644 src/Repositories/RedisConnectionSettings.php create mode 100644 src/Repositories/RedisRepository.php diff --git a/src/Repositories/RedisConnectionSettings.php b/src/Repositories/RedisConnectionSettings.php new file mode 100644 index 0000000..4943ae9 --- /dev/null +++ b/src/Repositories/RedisConnectionSettings.php @@ -0,0 +1,109 @@ +host; + } + + /** + * @param string $host + * @return RedisConnectionSettings + */ + public function setHost(string $host): RedisConnectionSettings + { + $clone = clone $this; + + $clone->host = $host; + + return $clone; + } + + /** + * @return int + */ + public function getPort(): int + { + return $this->port; + } + + /** + * @param int $port + * @return RedisConnectionSettings + */ + public function setPort(int $port): RedisConnectionSettings + { + $clone = clone $this; + + $clone->port = $port; + + return $clone; + } + + /** + * @return float + */ + public function getConnectTimeout(): float + { + return $this->connectTimeout; + } + + /** + * @param float $connectTimeout + * @return RedisConnectionSettings + */ + public function setConnectTimeout(float $connectTimeout): RedisConnectionSettings + { + $clone = clone $this; + + $clone->connectTimeout = $connectTimeout; + + return $clone; + } + + /** + * @return int + */ + public function getDatabase(): int + { + return $this->database; + } + + /** + * @param int $database + * @return RedisConnectionSettings + */ + public function setDatabase(int $database): RedisConnectionSettings + { + $clone = clone $this; + + $clone->database = $database; + + return $clone; + } +} diff --git a/src/Repositories/RedisRepository.php b/src/Repositories/RedisRepository.php new file mode 100644 index 0000000..16b6782 --- /dev/null +++ b/src/Repositories/RedisRepository.php @@ -0,0 +1,434 @@ +isConnected())) { + throw new ConfigurationInvalidException('Redis repository requires connection settings or connected Redis instance.'); + } + + if ($redis !== null && $redis->isConnected()) { + $this->redis = $redis; + } else { + $this->ensureConnectionSettingsAreValid($connectionSettings); + + $redis = new \Redis(); + $result = $redis->connect($connectionSettings->getHost(), $connectionSettings->getPort(), $connectionSettings->getConnectTimeout()); + + if ($result === false) { + throw new ConfigurationInvalidException('Connecting to the Redis server failed. Is the configuration correct?'); + } + + $redis->select($connectionSettings->getDatabase()); + + $this->redis = $redis; + } + + $this->identifier = $identifier; + + $this->redis->setOption(\Redis::OPT_PREFIX, $this->identifier . ':'); + $this->redis->setOption(\Redis::OPT_SCAN, \Redis::SCAN_RETRY); + $this->redis->setOption(\Redis::OPT_SERIALIZER, \Redis::SERIALIZER_PHP); + + $this->ensureRepositoryIsInitialized(); + } + + /** + * Ensures the given connection settings are valid (i.e. usable to connect to a Redis instance). + * This method does not validate whether connecting is actually possible. + * + * @param RedisConnectionSettings $connectionSettings + * @return void + * @throws ConfigurationInvalidException + */ + protected function ensureConnectionSettingsAreValid(RedisConnectionSettings $connectionSettings): void + { + if ($connectionSettings->getHost() === null) { + throw new ConfigurationInvalidException('No host has been configured for the Redis repository.'); + } + + if ($connectionSettings->getDatabase() < 0 || $connectionSettings->getDatabase() > 15) { + throw new ConfigurationInvalidException('The configured Redis database is invalid. Only databases 0 to 15 are supported.'); + } + } + + /** + * This method initializes the required keys for this repository using the established Redis connection. + * + * @return void + */ + protected function ensureRepositoryIsInitialized(): void + { + // Set the next to be used message id to one, since zero is an invalid message id. + $this->redis->setnx(self::KEY_NEXT_MESSAGE_ID, 1); + } + + /** + * Returns a new message id. The message id might have been used before, + * but it is currently not being used (i.e. in a resend queue). + * + * @return int + * @throws RepositoryException + */ + public function newMessageId(): int + { + if ($this->countPendingOutgoingMessages() >= 65535) { + // This should never happen, as the server receive queue is normally smaller than the actual + // number of message ids. Also, when using MQTT 5.0 the server can specify a smaller receive + // queue size (mosquitto for example has 20 by default), so the client has to implement the + // logic to honor this restriction and fallback to the protocol limit. + throw new RepositoryException('No more message identifiers available. The queue is full.'); + } + + $nextMessageId = $this->redis->get(static::KEY_NEXT_MESSAGE_ID); + while ($this->redis->hExists(static::KEY_PENDING_OUTGOING_MESSAGES, $nextMessageId)) { + $nextMessageId++; + if ($nextMessageId > 65535) { + $nextMessageId = 1; + } + } + + $this->redis->set(static::KEY_NEXT_MESSAGE_ID, $nextMessageId); + + return $nextMessageId; + } + + /** + * Returns the number of pending outgoing messages. + * + * @return int + */ + public function countPendingOutgoingMessages(): int + { + $result = $this->redis->hLen(static::KEY_PENDING_OUTGOING_MESSAGES); + + if ($result === false) { + $result = 0; + } + + return $result; + } + + /** + * Gets a pending outgoing message with the given message identifier, if found. + * + * @param int $messageId + * @return PendingMessage|null + */ + public function getPendingOutgoingMessage(int $messageId): ?PendingMessage + { + /** @var PendingMessage|false $pendingMessage */ + $pendingMessage = $this->redis->hGet(static::KEY_PENDING_OUTGOING_MESSAGES, $messageId); + + if ($pendingMessage === false) { + return null; + } + + return $pendingMessage; + } + + /** + * Gets a list of pending outgoing messages last sent before the given date time. + * + * If date time is `null`, all pending messages are returned. + * + * The messages are returned in the same order they were added to the repository. + * + * @param DateTime|null $dateTime + * @return PendingMessage[] + */ + public function getPendingOutgoingMessagesLastSentBefore(DateTime $dateTime = null): array + { + $result = []; + + $iterator = null; + while ($messages = $this->redis->hScan(self::KEY_PENDING_OUTGOING_MESSAGES, $iterator)) { + /** @var PendingMessage[] $messages */ + foreach ($messages as $message) { + if ($message->getLastSentAt() < $dateTime) { + $result[] = $message; + } + } + } + + return $result; + } + + /** + * Adds a pending outgoing message to the repository. + * + * @param PendingMessage $message + * @return void + * @throws PendingMessageAlreadyExistsException + */ + public function addPendingOutgoingMessage(PendingMessage $message): void + { + $added = $this->redis->hSetNx(static::KEY_PENDING_OUTGOING_MESSAGES, $message->getMessageId(), $message); + + if ($added === false) { + throw new PendingMessageAlreadyExistsException($message->getMessageId()); + } + } + + /** + * Marks an existing pending outgoing published message as received in the repository. + * + * If the message does not exists, an exception is thrown, + * otherwise `true` is returned if the message was marked as received, and `false` + * in case it was already marked as received. + * + * @param int $messageId + * @return bool + * @throws PendingMessageNotFoundException + */ + public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): bool + { + $message = $this->getPendingOutgoingMessage($messageId); + + if ($message === null || !($message instanceof PublishedMessage)) { + throw new PendingMessageNotFoundException($messageId); + } + + $result = $message->markAsReceived(); + + $this->redis->hSet(static::KEY_PENDING_OUTGOING_MESSAGES, $messageId, $message); + + return $result; + } + + /** + * Removes a pending outgoing message from the repository. + * + * If a pending message with the given identifier is found and + * successfully removed from the repository, `true` is returned. + * Otherwise `false` will be returned. + * + * @param int $messageId + * @return bool + */ + public function removePendingOutgoingMessage(int $messageId): bool + { + $result = $this->redis->hDel(static::KEY_PENDING_OUTGOING_MESSAGES, $messageId); + + if ($result === false) { + return false; + } + + return true; + } + + /** + * Returns the number of pending incoming messages. + * + * @return int + */ + public function countPendingIncomingMessages(): int + { + $result = $this->redis->hLen(static::KEY_PENDING_INCOMING_MESSAGES); + + if ($result === false) { + $result = 0; + } + + return $result; + } + + /** + * Gets a pending incoming message with the given message identifier, if found. + * + * @param int $messageId + * @return PendingMessage|null + */ + public function getPendingIncomingMessage(int $messageId): ?PendingMessage + { + /** @var PendingMessage|false $pendingMessage */ + $pendingMessage = $this->redis->hGet(static::KEY_PENDING_INCOMING_MESSAGES, $messageId); + + if ($pendingMessage === false) { + return null; + } + + return $pendingMessage; + } + + /** + * Adds a pending outgoing message to the repository. + * + * @param PendingMessage $message + * @return void + * @throws PendingMessageAlreadyExistsException + */ + public function addPendingIncomingMessage(PendingMessage $message): void + { + $added = $this->redis->hSetNx(static::KEY_PENDING_INCOMING_MESSAGES, $message->getMessageId(), $message); + + if ($added === false) { + throw new PendingMessageAlreadyExistsException($message->getMessageId()); + } + } + + /** + * Removes a pending incoming message from the repository. + * + * If a pending message with the given identifier is found and + * successfully removed from the repository, `true` is returned. + * Otherwise `false` will be returned. + * + * @param int $messageId + * @return bool + */ + public function removePendingIncomingMessage(int $messageId): bool + { + $result = $this->redis->hDel(static::KEY_PENDING_INCOMING_MESSAGES, $messageId); + + if ($result === false) { + return false; + } + + return true; + } + + /** + * Returns the number of registered subscriptions. + * + * @return int + */ + public function countSubscriptions(): int + { + return $this->redis->sCard(static::KEY_SUBSCRIPTIONS); + } + + /** + * Adds a subscription to the repository. + * + * @param Subscription $subscription + * @return void + */ + public function addSubscription(Subscription $subscription): void + { + $this->redis->sAdd(static::KEY_SUBSCRIPTIONS, $subscription); + } + + /** + * Gets all subscriptions matching the given criteria. + * + * @param string|null $topicName + * @param int|null $subscriptionId + * @return Subscription[] + */ + public function getMatchingSubscriptions(string $topicName = null, int $subscriptionId = null): array + { + $result = []; + + $iterator = null; + while ($subscriptions = $this->redis->sScan(self::KEY_SUBSCRIPTIONS, $iterator)) { + /** @var Subscription[] $subscriptions */ + foreach ($subscriptions as $subscription) { + if ($topicName !== null && !$subscription->matchTopicFilter($topicName)) { + continue; + } + + if ($subscriptionId !== null && $subscription->getSubscriptionId() !== $subscriptionId) { + continue; + } + + $result[] = $subscription; + } + } + + return $result; + } + + /** + * Removes the subscription with the given topic filter from the repository. + * + * Returns `true` if a topic subscription existed and has been removed. + * Otherwise, `false` is returned. + * + * @param string $topicFilter + * @return bool + */ + public function removeSubscription(string $topicFilter): bool + { + $subscription = $this->getTopicSubscriptionByTopicFilter($topicFilter); + + if ($subscription === null) { + return false; + } + + $result = $this->redis->sRem(self::KEY_SUBSCRIPTIONS, $subscription); + if ($result === false || $result === 0) { + return false; + } + + return true; + } + + /** + * Find a topic subscription with the given topic filter. + * + * @param string $topicFilter + * @return Subscription|null + */ + protected function getTopicSubscriptionByTopicFilter(string $topicFilter): ?Subscription + { + $iterator = null; + while ($subscriptions = $this->redis->sScan(self::KEY_SUBSCRIPTIONS, $iterator)) { + /** @var Subscription[] $subscriptions */ + foreach ($subscriptions as $subscription) { + if ($subscription->getTopicFilter() === $topicFilter) { + return $subscription; + } + } + } + + return null; + } +} From 81189e7d5a6e01d16a943879111c3d5f29dc43aa Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Tue, 1 Sep 2020 20:44:45 +0200 Subject: [PATCH 2/3] Fix minor issues with MemoryRepository --- src/Repositories/MemoryRepository.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Repositories/MemoryRepository.php b/src/Repositories/MemoryRepository.php index 02d17c3..2993b2e 100644 --- a/src/Repositories/MemoryRepository.php +++ b/src/Repositories/MemoryRepository.php @@ -36,6 +36,7 @@ class MemoryRepository implements Repository * but it is currently not being used (i.e. in a resend queue). * * @return int + * @throws RepositoryException */ public function newMessageId(): int { @@ -194,11 +195,11 @@ public function getMatchingSubscriptions(string $topicName = null, int $subscrip $result = []; foreach ($this->subscriptions as $subscription) { - if (($topicName !== null) && !$subscription->matchTopicFilter($topicName)) { + if ($topicName !== null && !$subscription->matchTopicFilter($topicName)) { continue; } - if (($subscriptionId !== null) && ($subscription->getSubscriptionId() !== $subscriptionId)) { + if ($subscriptionId !== null && $subscription->getSubscriptionId() !== $subscriptionId) { continue; } From 32c094b2c3a977fcc8b5092fbb9067e82785d820 Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Sat, 24 Oct 2020 10:40:14 +0200 Subject: [PATCH 3/3] Pass message id to Redis as string --- src/Repositories/RedisRepository.php | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/Repositories/RedisRepository.php b/src/Repositories/RedisRepository.php index 16b6782..86f8e64 100644 --- a/src/Repositories/RedisRepository.php +++ b/src/Repositories/RedisRepository.php @@ -127,15 +127,15 @@ public function newMessageId(): int throw new RepositoryException('No more message identifiers available. The queue is full.'); } - $nextMessageId = $this->redis->get(static::KEY_NEXT_MESSAGE_ID); - while ($this->redis->hExists(static::KEY_PENDING_OUTGOING_MESSAGES, $nextMessageId)) { + $nextMessageId = (int) $this->redis->get(static::KEY_NEXT_MESSAGE_ID); + while ($this->redis->hExists(static::KEY_PENDING_OUTGOING_MESSAGES, (string) $nextMessageId)) { $nextMessageId++; if ($nextMessageId > 65535) { $nextMessageId = 1; } } - $this->redis->set(static::KEY_NEXT_MESSAGE_ID, $nextMessageId); + $this->redis->set(static::KEY_NEXT_MESSAGE_ID, (string) $nextMessageId); return $nextMessageId; } @@ -165,7 +165,7 @@ public function countPendingOutgoingMessages(): int public function getPendingOutgoingMessage(int $messageId): ?PendingMessage { /** @var PendingMessage|false $pendingMessage */ - $pendingMessage = $this->redis->hGet(static::KEY_PENDING_OUTGOING_MESSAGES, $messageId); + $pendingMessage = $this->redis->hGet(static::KEY_PENDING_OUTGOING_MESSAGES, (string) $messageId); if ($pendingMessage === false) { return null; @@ -210,7 +210,7 @@ public function getPendingOutgoingMessagesLastSentBefore(DateTime $dateTime = nu */ public function addPendingOutgoingMessage(PendingMessage $message): void { - $added = $this->redis->hSetNx(static::KEY_PENDING_OUTGOING_MESSAGES, $message->getMessageId(), $message); + $added = $this->redis->hSetNx(static::KEY_PENDING_OUTGOING_MESSAGES, (string) $message->getMessageId(), $message); if ($added === false) { throw new PendingMessageAlreadyExistsException($message->getMessageId()); @@ -238,7 +238,7 @@ public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): b $result = $message->markAsReceived(); - $this->redis->hSet(static::KEY_PENDING_OUTGOING_MESSAGES, $messageId, $message); + $this->redis->hSet(static::KEY_PENDING_OUTGOING_MESSAGES, (string) $messageId, $message); return $result; } @@ -255,7 +255,7 @@ public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): b */ public function removePendingOutgoingMessage(int $messageId): bool { - $result = $this->redis->hDel(static::KEY_PENDING_OUTGOING_MESSAGES, $messageId); + $result = $this->redis->hDel(static::KEY_PENDING_OUTGOING_MESSAGES, (string) $messageId); if ($result === false) { return false; @@ -289,7 +289,7 @@ public function countPendingIncomingMessages(): int public function getPendingIncomingMessage(int $messageId): ?PendingMessage { /** @var PendingMessage|false $pendingMessage */ - $pendingMessage = $this->redis->hGet(static::KEY_PENDING_INCOMING_MESSAGES, $messageId); + $pendingMessage = $this->redis->hGet(static::KEY_PENDING_INCOMING_MESSAGES, (string) $messageId); if ($pendingMessage === false) { return null; @@ -307,7 +307,7 @@ public function getPendingIncomingMessage(int $messageId): ?PendingMessage */ public function addPendingIncomingMessage(PendingMessage $message): void { - $added = $this->redis->hSetNx(static::KEY_PENDING_INCOMING_MESSAGES, $message->getMessageId(), $message); + $added = $this->redis->hSetNx(static::KEY_PENDING_INCOMING_MESSAGES, (string) $message->getMessageId(), $message); if ($added === false) { throw new PendingMessageAlreadyExistsException($message->getMessageId()); @@ -326,7 +326,7 @@ public function addPendingIncomingMessage(PendingMessage $message): void */ public function removePendingIncomingMessage(int $messageId): bool { - $result = $this->redis->hDel(static::KEY_PENDING_INCOMING_MESSAGES, $messageId); + $result = $this->redis->hDel(static::KEY_PENDING_INCOMING_MESSAGES, (string) $messageId); if ($result === false) { return false;