Skip to content

Commit

Permalink
feat: allow using symfony/uid to generate ids
Browse files Browse the repository at this point in the history
  • Loading branch information
alekitto committed Apr 17, 2024
1 parent aa874ac commit 6e45b84
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 22 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"symfony/framework-bundle": "^4.2 || ^5.0 || ^6.0 || ^7.0",
"symfony/process": "^4.2 || ^5.0 || ^6.0 || ^7.0",
"symfony/property-access": "^4.2 || ^5.0 || ^6.0 || ^7.0",
"symfony/serializer": "^4.2 || ^5.0 || ^6.0 || ^7.0"
"symfony/serializer": "^4.2 || ^5.0 || ^6.0 || ^7.0",
"symfony/uid": "^5.1 || ^6.0 || ^7.0"
},
"suggest": {
"ramsey/uuid": "Required to use mongodb and dbal transport",
Expand Down
11 changes: 6 additions & 5 deletions lib/DependencyInjection/Compiler/CheckDependencyPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

use Kcs\MessengerExtra\Transport\Dbal\DbalTransportFactory;
use LogicException;
use Ramsey\Uuid\Doctrine\UuidBinaryType;
use Ramsey\Uuid\UuidInterface;
use Safe\Exceptions\UrlException;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\Uid\Uuid as SymfonyUuid;

use function class_exists;
use function in_array;
Expand Down Expand Up @@ -38,13 +38,14 @@ public function process(ContainerBuilder $container): void

if (
($scheme === 'doctrine' || in_array($scheme, DbalTransportFactory::DBAL_SUPPORTED_SCHEMES, true))
&& ! class_exists(UuidBinaryType::class)
&& ! class_exists(UuidInterface::class)
&& ! class_exists(SymfonyUuid::class)
) {
throw new LogicException('Please install ramsey/uuid-doctrine package to use dbal transport');
throw new LogicException('Please install symfony/uid or ramsey/uuid package to use dbal transport');
}

if ($scheme === 'mongodb' && ! interface_exists(UuidInterface::class)) {
throw new LogicException('Please install ramsey/uuid package to use mongodb transport');
if ($scheme === 'mongodb' && ! class_exists(SymfonyUuid::class) && ! interface_exists(UuidInterface::class)) {
throw new LogicException('Please install symfony/uid or ramsey/uuid package to use mongodb transport');
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions lib/Transport/Dbal/DbalReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Types\Types;
use Ramsey\Uuid\Codec\StringCodec;
use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\UuidFactory;
use Safe\DateTimeImmutable;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
Expand All @@ -21,10 +19,12 @@
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Uid\Uuid as SymfonyUuid;
use Throwable;

use function assert;
use function bin2hex;
use function class_exists;
use function is_resource;
use function json_decode;
use function microtime;
Expand All @@ -44,7 +44,6 @@ class DbalReceiver implements ReceiverInterface, MessageCountAwareInterface, Lis
private Connection $connection;
private float $redeliverMessagesLastExecutedAt;
private float $removeExpiredMessagesLastExecutedAt;
private StringCodec $codec;
private QueryBuilder $select;
private QueryBuilder $update;
private int $retryingSafetyCounter = 0;
Expand All @@ -54,7 +53,6 @@ public function __construct(Connection $connection, string $tableName, ?Serializ
$this->connection = $connection;
$this->tableName = $tableName;
$this->serializer = $serializer ?? Serializer::create();
$this->codec = new StringCodec((new UuidFactory())->getUuidBuilder());

$this->select = $this->connection->createQueryBuilder()
->select('id')
Expand Down Expand Up @@ -194,7 +192,7 @@ private function hydrate(array $row): Envelope
*/
private function fetchMessage(): ?Envelope
{
$deliveryId = $this->codec->encodeBinary(Uuid::uuid4());
$deliveryId = class_exists(SymfonyUuid::class) ? SymfonyUuid::v4()->toRfc4122() : Uuid::uuid4()->toString();
$result = $this->select
->setParameter('delayedUntil', new DateTimeImmutable(), Types::DATETIMETZ_IMMUTABLE)
->executeQuery()
Expand Down
8 changes: 1 addition & 7 deletions lib/Transport/Dbal/DbalSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
use Kcs\MessengerExtra\Message\PriorityAwareMessageInterface;
use Kcs\MessengerExtra\Message\TTLAwareMessageInterface;
use Kcs\MessengerExtra\Message\UniqueMessageInterface;
use Ramsey\Uuid\Codec\OrderedTimeCodec;
use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\UuidFactory;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
Expand All @@ -38,15 +35,12 @@ class DbalSender implements SenderInterface
private SerializerInterface $serializer;
private string $tableName;
private Connection $connection;
private OrderedTimeCodec $codec;

public function __construct(Connection $connection, string $tableName, ?SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->tableName = $tableName;
$this->serializer = $serializer ?? Serializer::create();

$this->codec = new OrderedTimeCodec((new UuidFactory())->getUuidBuilder());
}

public function send(Envelope $envelope): Envelope
Expand All @@ -65,7 +59,7 @@ public function send(Envelope $envelope): Envelope
->withoutStampsOfType(TransportMessageIdStamp::class)
->withoutStampsOfType(DelayStamp::class));

$messageId = $this->codec->encodeBinary(Uuid::uuid1());
$messageId = MessageId::generate();
$values = [
'id' => $messageId,
'published_at' => new DateTimeImmutable(),
Expand Down
37 changes: 37 additions & 0 deletions lib/Transport/Dbal/MessageId.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Kcs\MessengerExtra\Transport\Dbal;

use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\UuidInterface;
use RuntimeException;
use Symfony\Component\Uid\Uuid as SymfonyUuid;

use function class_exists;
use function substr;

final class MessageId
{
public static function generate(): string
{
if (class_exists(SymfonyUuid::class)) {
$uuid = SymfonyUuid::v1();

return $uuid->toBinary();
}

if (class_exists(UuidInterface::class)) {
$uuid = Uuid::uuid1();
$bytes = $uuid->getFields()->getBytes();

return $bytes[6] . $bytes[7]
. $bytes[4] . $bytes[5]
. $bytes[0] . $bytes[1] . $bytes[2] . $bytes[3]
. substr($bytes, 8);
}

throw new RuntimeException('No Uuid class found. Please install ramsey/uuid package or symfony/uid package.');
}
}
10 changes: 6 additions & 4 deletions lib/Transport/Mongo/MongoReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Uid\Uuid as SymfonyUuid;
use Throwable;

use function assert;
use function class_exists;
use function microtime;
use function time;

Expand Down Expand Up @@ -121,17 +123,17 @@ public function getMessageCount(): int
*/
private function hydrate(array $row): Envelope
{
$envelope = $this->serializer->decode($row);

return $envelope->with(new TransportMessageIdStamp($row['_id']));
return $this->serializer
->decode($row)
->with(new TransportMessageIdStamp($row['_id']));
}

/**
* Fetches a message if it is any.
*/
private function fetchMessage(): ?Envelope
{
$deliveryId = Uuid::uuid4()->toString();
$deliveryId = class_exists(SymfonyUuid::class) ? SymfonyUuid::v4()->toRfc4122() : Uuid::uuid4()->toString();
$now = time();

/** @phpstan-var array{_id: string, body: string, headers: string, id: (resource|string), time_to_live: ?int}|null $message */
Expand Down

0 comments on commit 6e45b84

Please sign in to comment.