Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to configure DetailType for EventBridge events #79

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,17 @@ services:

Now, anytime a message is dispatched to EventBridge for that source, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed.

EventBridge event dispatching:

By default `DetailType` is `Symfony Messenger message` but you can change it by:
```yaml
# config/services.yaml
services:
bref.messenger.eventbridge_detail_type_resolver:
class: Bref\Symfony\Messenger\Service\EventBridge\DefaultEventBridgeDetailTypeResolver
```
You can use `DefaultEventBridgeDetailTypeResolver` which is provided by this package (it will use message class name as DetailType) or create your own resolver by implementing `Bref\Symfony\Messenger\Service\EventBridge\EventBridgeDetailTypeResolverInterface`.

## Error handling

AWS Lambda has error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrates Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism.
Expand Down
2 changes: 2 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ services:
tags: ['messenger.transport_factory']
arguments:
- '@bref.messenger.eventbridge_client'
- '@?bref.messenger.eventbridge_detail_type_resolver'
bref.messenger.eventbridge_client:
class: AsyncAws\EventBridge\EventBridgeClient

15 changes: 15 additions & 0 deletions src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php declare(strict_types=1);

namespace Bref\Symfony\Messenger\Service\EventBridge;

use Symfony\Component\Messenger\Envelope;

class DefaultEventBridgeDetailTypeResolver implements EventBridgeDetailTypeResolver
{
public function resolveDetailType(Envelope $message): string
{
$explodedFQCN = explode('\\', get_class($message->getMessage()));

return end($explodedFQCN);
}
}
10 changes: 10 additions & 0 deletions src/Service/EventBridge/EventBridgeDetailTypeResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php declare(strict_types=1);

namespace Bref\Symfony\Messenger\Service\EventBridge;

use Symfony\Component\Messenger\Envelope;

interface EventBridgeDetailTypeResolver
{
public function resolveDetailType(Envelope $message): string;
}
35 changes: 21 additions & 14 deletions src/Service/EventBridge/EventBridgeTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@
use Exception;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Throwable;

final class EventBridgeTransport implements TransportInterface
{
/** @var SerializerInterface */
private $serializer;
/** @var EventBridgeClient */
private $eventBridge;
/** @var string */
private $source;
/** @var string */
private $eventBusName;

public function __construct(EventBridgeClient $eventBridge, SerializerInterface $serializer, string $source, ?string $eventBusName = null)
{
private SerializerInterface $serializer;

private EventBridgeClient $eventBridge;

private string $source;

private ?string $eventBusName;

private ?EventBridgeDetailTypeResolver $detailTypeResolver;

public function __construct(
EventBridgeClient $eventBridge,
SerializerInterface $serializer,
string $source,
?string $eventBusName = null,
?EventBridgeDetailTypeResolver $detailTypeResolver = null
) {
$this->eventBridge = $eventBridge;
$this->serializer = $serializer;
$this->source = $source;
$this->eventBusName = $eventBusName;
$this->detailTypeResolver = $detailTypeResolver;
}

public function send(Envelope $envelope): Envelope
Expand All @@ -37,8 +43,9 @@ public function send(Envelope $envelope): Envelope
'Entries' => [
[
'Detail' => json_encode($encodedMessage, JSON_THROW_ON_ERROR),
// Ideally here we could put the class name of the message, but how to retrieve it?
'DetailType' => 'Symfony Messenger message',
'DetailType' => $this->detailTypeResolver !== null ?
$this->detailTypeResolver->resolveDetailType($envelope) :
'Symfony Messenger message',
'Source' => $this->source,
],
],
Expand Down
13 changes: 11 additions & 2 deletions src/Service/EventBridge/EventBridgeTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ final class EventBridgeTransportFactory implements TransportFactoryInterface
/** @var EventBridgeClient */
private $eventBridge;

public function __construct(EventBridgeClient $eventBridge)
private ?EventBridgeDetailTypeResolver $detailTypeResolver;

public function __construct(EventBridgeClient $eventBridge, ?EventBridgeDetailTypeResolver $detailTypeResolver = null)
{
$this->eventBridge = $eventBridge;
$this->detailTypeResolver = $detailTypeResolver;
}

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
Expand All @@ -29,7 +32,13 @@ public function createTransport(string $dsn, array $options, SerializerInterface
parse_str($parsedUrl['query'], $query);
}

return new EventBridgeTransport($this->eventBridge, $serializer, $parsedUrl['host'], $query['event_bus_name'] ?? null);
return new EventBridgeTransport(
$this->eventBridge,
$serializer,
$parsedUrl['host'],
$query['event_bus_name'] ?? null,
$this->detailTypeResolver
);
}

public function supports(string $dsn, array $options): bool
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace Bref\Symfony\Messenger\Test\Unit\Service\EventBridge;

use Bref\Symfony\Messenger\Service\EventBridge\DefaultEventBridgeDetailTypeResolver;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;

class DefaultEventBridgeDetailTypeResolverTest extends TestCase
{
private DefaultEventBridgeDetailTypeResolver $resolver;

public function setUp(): void
{
$this->resolver = new DefaultEventBridgeDetailTypeResolver();
}

public function testResolver(): void
{
$envelope = new Envelope(new \stdClass());

$this->assertEquals('stdClass', $this->resolver->resolveDetailType($envelope));
}
}
211 changes: 211 additions & 0 deletions tests/Unit/Service/EventBridge/EventBridgeTransportTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
<?php

namespace Bref\Symfony\Messenger\Test\Unit\Service\EventBridge;

use AsyncAws\EventBridge\EventBridgeClient;
use AsyncAws\EventBridge\Result\PutEventsResponse;
use AsyncAws\EventBridge\ValueObject\PutEventsResultEntry;
use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeDetailTypeResolver;
use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeTransport;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class EventBridgeTransportTest extends TestCase
{
/** @var SerializerInterface|MockObject */
private $serializer;

/** @var EventBridgeClient|MockObject */
private $eventBridge;

/** @var EventBridgeDetailTypeResolver|MockObject|null */
private $detailTypeResolver;

private string $source = 'myapp';

private ?string $eventBusName = null;

public function setUp(): void
{
$this->eventBusName = null;
$this->detailTypeResolver = null;
$this->serializer = $this->getMockForAbstractClass(SerializerInterface::class);
$this->eventBridge = $this->createMock(EventBridgeClient::class);
}

public function testSendSuccess()
{
$envelope = new Envelope(new \stdClass());
$result = $this->createMock(PutEventsResponse::class);

$this->serializer
->expects($this->once())
->method('encode')
->with($envelope)
->willReturn(['body' => 'test']);
$this->eventBridge
->expects($this->once())
->method('putEvents')
->with(
[
'Entries' => [
[
'Detail' => '{"body":"test"}',
'DetailType' => 'Symfony Messenger message',
'Source' => $this->source,
],
],
]
)
->willReturn($result);
$result->expects($this->once())
->method('getFailedEntryCount')
->willReturn(0);

$this->assertSame(
$envelope,
$this->createTransport()->send($envelope)
);
}

public function testSendSuccessWithCustomBusName()
{
$this->eventBusName = 'custom';
$envelope = new Envelope(new \stdClass());
$result = $this->createMock(PutEventsResponse::class);

$this->serializer
->expects($this->once())
->method('encode')
->with($envelope)
->willReturn(['body' => 'test']);
$this->eventBridge
->expects($this->once())
->method('putEvents')
->with(
[
'Entries' => [
[
'Detail' => '{"body":"test"}',
'DetailType' => 'Symfony Messenger message',
'Source' => $this->source,
'EventBusName' => 'custom',
],
],
]
)
->willReturn($result);
$result->expects($this->once())
->method('getFailedEntryCount')
->willReturn(0);

$this->assertSame(
$envelope,
$this->createTransport()->send($envelope)
);
}

public function testSendSuccessWithDetailTypeResolver()
{
$this->detailTypeResolver = $this->getMockForAbstractClass(EventBridgeDetailTypeResolver::class);
$envelope = new Envelope(new \stdClass());
$result = $this->createMock(PutEventsResponse::class);

$this->serializer
->expects($this->once())
->method('encode')
->with($envelope)
->willReturn(['body' => 'test']);
$this->detailTypeResolver
->expects($this->once())
->method('resolveDetailType')
->with($envelope)
->willReturn('stdClass');
$this->eventBridge
->expects($this->once())
->method('putEvents')
->with(
[
'Entries' => [
[
'Detail' => '{"body":"test"}',
'DetailType' => 'stdClass',
'Source' => $this->source,
],
],
]
)
->willReturn($result);
$result->expects($this->once())
->method('getFailedEntryCount')
->willReturn(0);

$this->assertSame(
$envelope,
$this->createTransport()->send($envelope)
);
}

public function testSendFailed()
{
$envelope = new Envelope(new \stdClass());

$this->serializer
->expects($this->once())
->method('encode')
->with($envelope)
->willReturn(['body' => 'test']);
$this->eventBridge
->expects($this->once())
->method('putEvents')
->willThrowException(new \Exception('event bridge exception'));
$this->expectException(TransportException::class);

$this->createTransport()->send($envelope);
}

public function testSendResultFailed()
{
$envelope = new Envelope(new \stdClass());
$result = $this->createMock(PutEventsResponse::class);
$resultErrorEntry = new PutEventsResultEntry(['ErrorMessage' => 'Error message']);

$this->serializer
->expects($this->once())
->method('encode')
->with($envelope)
->willReturn(['body' => 'test']);
$this->eventBridge
->expects($this->once())
->method('putEvents')
->willReturn($result);
$result->expects($this->once())
->method('getFailedEntryCount')
->willReturn(1);
$result->expects($this->once())
->method('getEntries')
->willReturn([$resultErrorEntry]);

$this->expectException(TransportException::class);
$this->expectExceptionMessage("1 message(s) could not be published to EventBridge: Error message.");

$this->assertSame(
$envelope,
$this->createTransport()->send($envelope)
);
}

private function createTransport(): EventBridgeTransport
{
return new EventBridgeTransport(
$this->eventBridge,
$this->serializer,
$this->source,
$this->eventBusName,
$this->detailTypeResolver
);
}
}