diff --git a/README.md b/README.md index eda26ad..ef5b1fe 100644 --- a/README.md +++ b/README.md @@ -398,6 +398,17 @@ services: Now, anytime a message is dispatched to EventBridge for that source, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed. +EventBridge event dispatching: + +By default `DetailType` is `Symfony Messenger message` but you can change it by: +```yaml +# config/services.yaml +services: + bref.messenger.eventbridge_detail_type_resolver: + class: Bref\Symfony\Messenger\Service\EventBridge\DefaultEventBridgeDetailTypeResolver +``` +You can use `DefaultEventBridgeDetailTypeResolver` which is provided by this package (it will use message class name as DetailType) or create your own resolver by implementing `Bref\Symfony\Messenger\Service\EventBridge\EventBridgeDetailTypeResolverInterface`. + ## Error handling AWS Lambda has error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrates Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism. diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 0bf098a..98fd54f 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -17,5 +17,7 @@ services: tags: ['messenger.transport_factory'] arguments: - '@bref.messenger.eventbridge_client' + - '@?bref.messenger.eventbridge_detail_type_resolver' bref.messenger.eventbridge_client: class: AsyncAws\EventBridge\EventBridgeClient + diff --git a/src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php b/src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php new file mode 100644 index 0000000..32bf2a9 --- /dev/null +++ b/src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php @@ -0,0 +1,15 @@ +getMessage())); + + return end($explodedFQCN); + } +} diff --git a/src/Service/EventBridge/EventBridgeDetailTypeResolver.php b/src/Service/EventBridge/EventBridgeDetailTypeResolver.php new file mode 100644 index 0000000..9db6407 --- /dev/null +++ b/src/Service/EventBridge/EventBridgeDetailTypeResolver.php @@ -0,0 +1,10 @@ +eventBridge = $eventBridge; $this->serializer = $serializer; $this->source = $source; $this->eventBusName = $eventBusName; + $this->detailTypeResolver = $detailTypeResolver; } public function send(Envelope $envelope): Envelope @@ -37,8 +43,9 @@ public function send(Envelope $envelope): Envelope 'Entries' => [ [ 'Detail' => json_encode($encodedMessage, JSON_THROW_ON_ERROR), - // Ideally here we could put the class name of the message, but how to retrieve it? - 'DetailType' => 'Symfony Messenger message', + 'DetailType' => $this->detailTypeResolver !== null ? + $this->detailTypeResolver->resolveDetailType($envelope) : + 'Symfony Messenger message', 'Source' => $this->source, ], ], diff --git a/src/Service/EventBridge/EventBridgeTransportFactory.php b/src/Service/EventBridge/EventBridgeTransportFactory.php index 9d7f80c..d9e26db 100644 --- a/src/Service/EventBridge/EventBridgeTransportFactory.php +++ b/src/Service/EventBridge/EventBridgeTransportFactory.php @@ -13,9 +13,12 @@ final class EventBridgeTransportFactory implements TransportFactoryInterface /** @var EventBridgeClient */ private $eventBridge; - public function __construct(EventBridgeClient $eventBridge) + private ?EventBridgeDetailTypeResolver $detailTypeResolver; + + public function __construct(EventBridgeClient $eventBridge, ?EventBridgeDetailTypeResolver $detailTypeResolver = null) { $this->eventBridge = $eventBridge; + $this->detailTypeResolver = $detailTypeResolver; } public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface @@ -29,7 +32,13 @@ public function createTransport(string $dsn, array $options, SerializerInterface parse_str($parsedUrl['query'], $query); } - return new EventBridgeTransport($this->eventBridge, $serializer, $parsedUrl['host'], $query['event_bus_name'] ?? null); + return new EventBridgeTransport( + $this->eventBridge, + $serializer, + $parsedUrl['host'], + $query['event_bus_name'] ?? null, + $this->detailTypeResolver + ); } public function supports(string $dsn, array $options): bool diff --git a/tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php b/tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php new file mode 100644 index 0000000..170fb32 --- /dev/null +++ b/tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php @@ -0,0 +1,24 @@ +resolver = new DefaultEventBridgeDetailTypeResolver(); + } + + public function testResolver(): void + { + $envelope = new Envelope(new \stdClass()); + + $this->assertEquals('stdClass', $this->resolver->resolveDetailType($envelope)); + } +} diff --git a/tests/Unit/Service/EventBridge/EventBridgeTransportTest.php b/tests/Unit/Service/EventBridge/EventBridgeTransportTest.php new file mode 100644 index 0000000..8dd2ec5 --- /dev/null +++ b/tests/Unit/Service/EventBridge/EventBridgeTransportTest.php @@ -0,0 +1,211 @@ +eventBusName = null; + $this->detailTypeResolver = null; + $this->serializer = $this->getMockForAbstractClass(SerializerInterface::class); + $this->eventBridge = $this->createMock(EventBridgeClient::class); + } + + public function testSendSuccess() + { + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->with( + [ + 'Entries' => [ + [ + 'Detail' => '{"body":"test"}', + 'DetailType' => 'Symfony Messenger message', + 'Source' => $this->source, + ], + ], + ] + ) + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(0); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + public function testSendSuccessWithCustomBusName() + { + $this->eventBusName = 'custom'; + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->with( + [ + 'Entries' => [ + [ + 'Detail' => '{"body":"test"}', + 'DetailType' => 'Symfony Messenger message', + 'Source' => $this->source, + 'EventBusName' => 'custom', + ], + ], + ] + ) + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(0); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + public function testSendSuccessWithDetailTypeResolver() + { + $this->detailTypeResolver = $this->getMockForAbstractClass(EventBridgeDetailTypeResolver::class); + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->detailTypeResolver + ->expects($this->once()) + ->method('resolveDetailType') + ->with($envelope) + ->willReturn('stdClass'); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->with( + [ + 'Entries' => [ + [ + 'Detail' => '{"body":"test"}', + 'DetailType' => 'stdClass', + 'Source' => $this->source, + ], + ], + ] + ) + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(0); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + public function testSendFailed() + { + $envelope = new Envelope(new \stdClass()); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->willThrowException(new \Exception('event bridge exception')); + $this->expectException(TransportException::class); + + $this->createTransport()->send($envelope); + } + + public function testSendResultFailed() + { + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + $resultErrorEntry = new PutEventsResultEntry(['ErrorMessage' => 'Error message']); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(1); + $result->expects($this->once()) + ->method('getEntries') + ->willReturn([$resultErrorEntry]); + + $this->expectException(TransportException::class); + $this->expectExceptionMessage("1 message(s) could not be published to EventBridge: Error message."); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + private function createTransport(): EventBridgeTransport + { + return new EventBridgeTransport( + $this->eventBridge, + $this->serializer, + $this->source, + $this->eventBusName, + $this->detailTypeResolver + ); + } +}