Events and the Message Bus

So far we’ve spent a lot of time and energy on a simple problem that we could easily have solved with Django. You might be asking if the increased testability and expressiveness are really worth all the effort.

In practice, though, we find that it’s not the obvious features that make a mess of our codebases: it’s the goop around the edge. It’s reporting, permissions, and workflows that touch a zillion objects.

Our example will be a typical notification requirement: when we can’t allocate an order because we’re out of stock, we should alert the buying team. They’ll go and fix the problem by buying more stock, and all will be well.

For a first version, our product owner says we can just send the alert by email.

Let’s see how our architecture holds up once we need to plug in some of the mundane stuff that makes up so much of our systems.

We’ll start by doing the simplest, most expeditious thing, and talk about why it’s exactly this kind of decision that leads us to a Big Ball of Mud.

Then we’ll show how to use Domain Events to separate side-effects from our use cases, and how to build a simple Message Bus for triggering behaviour based on those events. We’ll show a few different options for creating those events and how to pass them to the message bus, and finally we’ll show how the Unit of Work can be modified to connect the two together elegantly.

Figure 1. Events flowing through the system

+----------+            +--------+
|          |  raises    |  Unit  |
|  Domain  |--events--> |   of   |
|          |            |  Work  |
+----------+            +--------+
                            |
                        publishes
                          events
                            |
                            V
        +----------------------+
        |     Message Bus      |
        +----------------------+
           |
        handles
        events
           |     +-----------------+
           +---> |     Handlers    |-+
                 +-----------------+ |-+
                   +-----------------+ |
                     +-----------------+

Avoiding Making a Mess

First, Avoid Making a Mess of our Web Controllers

So. Email alerts when we run out of stock. When we have a new requirement like this, one that’s not really to do with the core domain, it’s all too easy to start dumping these things into our web controllers:

Example 1. Just whack it in the endpoint, what could go wrong? (src/allocation/flask_app.py)
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
    line = model.OrderLine(
        request.json['orderid'],
        request.json['sku'],
        request.json['qty'],
    )
    try:
        batchref = services.allocate(line, unit_of_work.start)
    except (model.OutOfStock, services.InvalidSku) as e:
        send_mail(
            'out of stock',
            '[email protected]',
            f'{line.orderid} - {line.sku}'
        )
        return jsonify({'message': str(e)}), 400

    return jsonify({'batchref': batchref}), 201

As a one-off hack, this might be okay, but it’s easy to see how we can quickly end up in a mess by patching things in this way. Sending emails isn’t the job of our HTTP layer, and we’d like to be able to unit test this new feature.

… And Let’s Not Make a Mess of our Model Either

Assuming we don’t want to put this code into our web controllers, because we want them to be as thin as possible, we may look at putting it right at the source, in the model:

Example 2. Email-sending code in our model isn’t lovely either (src/allocation/model.py)
    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(
                b for b in sorted(self.batches) if b.can_allocate(line)
            )
            #...
        except StopIteration:
            email.send_mail('[email protected]', f'Out of stock for {line.sku}')
            raise exceptions.OutOfStock(f'Out of stock for sku {line.sku}')

But that’s even worse! We don’t want our model to have any dependencies on infrastructure concerns like email.send_mail.

This email sending thing is unwelcome goop messing up the nice clean flow of our system. What we’d like is to keep our domain model focused on the rule "You can’t allocate more stuff than is actually available."

The domain model’s job is to know that we’re out of stock, but the responsibility of sending an alert belongs elsewhere. We should be able to turn this feature on or off, or to switch to SMS notifications instead, without needing to change the rules of our domain model.

… Or the Service Layer!

The requirement "Try to allocate some stock, and send an email if it fails" is an example of workflow orchestration: it’s a set of steps that the system has to follow to achieve a goal.

We’ve written a service layer to manage orchestration for us, but even here the feature feels out of place:

Example 3. And in the services layer it’s out of place (src/allocation/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise exceptions.InvalidSku(f'Invalid sku {line.sku}')
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except exceptions.OutOfStock:
            email.send_mail('[email protected]', f'Out of stock for {line.sku}')
            raise

Catching an exception and re-raising it? I mean, it could be worse, but it’s definitely making us unhappy. Why is it so hard to find a suitable home for this code?

Single Responsibility Principle

Really, this is a violation of the single responsibility principle (SRP).[1] Our use case is allocation. Our endpoint, service function, and domain methods are all called allocate, not allocate_and_send_mail_if_out_of_stock.

Tip
Rule of thumb: if you can’t describe what your function does without using words like "then" or "and," you might be violating the SRP.

One formulation of the SRP is that each class should only have a single reason to change. When we switch from email to SMS, we shouldn’t have to update our "allocate" function, because that’s clearly a separate responsibility.

To solve the problem, we’re going to split the orchestration into separate steps, so that the different concerns don’t get tangled up: the domain model records that we’re out of stock, and something else takes on the job of sending the alert.

We’d also like to keep the service layer free of implementation details. We want to apply the Dependency Inversion Principle to notifications, so that our service layer depends on an abstraction, in the same way as we avoid depending on the database by using a UnitOfWork.

All Aboard the Message Bus!

The patterns we’re going to introduce here are Domain Events and the Message Bus. There are a few different ways to implement them, so we’ll show a couple of different ones before settling on the one we like most.

The Model Records Events

First, rather than being concerned with emails, our model will be in charge of recording "events": facts about things that have happened. We’ll use a Message Bus to respond to events and invoke some new operation.

Events Are Simple Dataclasses

An Event is a kind of value object. Events don’t have any behaviour, because they’re pure data structures. We always name events in the language of the domain, and we think of them as part of our domain model.

We could store them in model.py, but we may as well keep them in their own file. (This might be a good time to consider refactoring out a directory called "domain," so that we have domain/model.py and domain/events.py.)

Example 4. Event classes (src/allocation/events.py)
from dataclasses import dataclass

class Event:  #(1)
    pass

@dataclass
class OutOfStock(Event):  #(2)
    sku: str
  1. Once we have a number of events we’ll find it useful to have a parent class that can store common attributes. It’s also useful for type hints in our message bus, as we’ll see shortly.

  2. dataclasses are great for domain events too.
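Since an event is a value object, it may help to see what dataclasses give us for free: equality based on the data rather than on identity, and a readable repr. This is a minimal sketch. The `frozen=True` flag is our own assumption (the listing above uses a plain `@dataclass`), and the SKU is made up. Freezing an event stops anyone from editing a fact after it has been recorded, and it also makes events hashable.

```python
from dataclasses import FrozenInstanceError, dataclass


class Event:
    """Parent class for all domain events."""


@dataclass(frozen=True)  # frozen=True is our addition; the listing uses plain @dataclass
class OutOfStock(Event):
    sku: str


first = OutOfStock("RED-CHAIR")
second = OutOfStock("RED-CHAIR")

# Value-object semantics: two events with the same data compare equal...
print(first == second)  # True
# ...even though they are distinct objects
print(first is second)  # False
# dataclasses also generate a readable repr, handy in logs and test failures
print(first)  # OutOfStock(sku='RED-CHAIR')

# With frozen=True, a recorded fact can't be edited afterwards
try:
    first.sku = "BLUE-CHAIR"
except FrozenInstanceError:
    print("events are immutable")
```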

The Model Raises Events

When our domain model records a fact that happened, we say it "raises" an event.

Example 5. The model raises a domain event (src/allocation/model.py)
class Product:

    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # type: List[events.Event]  #(1)

    def allocate(self, line: OrderLine) -> Optional[str]:
        try:
            ...  # as before: find a batch that can take the line and allocate to it
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))  #(2)
            # raise exceptions.OutOfStock(f'Out of stock for sku {line.sku}')  #(3)
            return None
  1. Our Aggregate grows a .events attribute, where it will store facts about what has happened.

  2. Rather than invoking some email-sending code directly, we record those events at the place they occur, using only the language of the domain.

  3. We’re also going to stop raising an exception for the out-of-stock case. The event will do the job the exception was doing.

Note
We’re actually addressing a code smell we had until now, which is that we were using exceptions for control flow. In general, if you’re implementing domain events, don’t raise exceptions to describe the same domain concept. As we’ll see later when we handle events in the Unit of Work, it’s confusing to have to reason about events and exceptions together.
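To see the recording behaviour end to end, here is a self-contained sketch of a Product that records an OutOfStock event instead of sending an email or raising an exception. It is a simplification built on our own assumptions: the Batch class is a hypothetical stand-in with just a reference, a SKU and an available quantity, and we skip the batch sorting that the real model does.

```python
from dataclasses import dataclass
from typing import List, Optional


class Event:
    pass


@dataclass(frozen=True)
class OutOfStock(Event):
    sku: str


@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int


class Batch:
    """Hypothetical, simplified stand-in for the real Batch."""

    def __init__(self, ref: str, sku: str, qty: int):
        self.reference = ref
        self.sku = sku
        self.available_quantity = qty

    def can_allocate(self, line: OrderLine) -> bool:
        return self.sku == line.sku and self.available_quantity >= line.qty

    def allocate(self, line: OrderLine) -> None:
        self.available_quantity -= line.qty


class Product:
    def __init__(self, sku: str, batches: List[Batch]):
        self.sku = sku
        self.batches = batches
        self.events: List[Event] = []  # facts about what has happened

    def allocate(self, line: OrderLine) -> Optional[str]:
        try:
            batch = next(b for b in self.batches if b.can_allocate(line))
        except StopIteration:
            # Record the fact in domain language: no email, no exception
            self.events.append(OutOfStock(line.sku))
            return None
        batch.allocate(line)
        return batch.reference


product = Product("LAMP", [Batch("b1", "LAMP", 10)])
print(product.allocate(OrderLine("o1", "LAMP", 10)))  # b1
print(product.allocate(OrderLine("o2", "LAMP", 1)))   # None
print(product.events)  # [OutOfStock(sku='LAMP')]
```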

The Message Bus Maps Events to Handlers

A message bus basically says "when I see this event, I should invoke the following handler function". In other words, it’s a simple publish-subscribe system. Handlers are subscribed to receive events, which we publish to the bus. It sounds harder than it is, and we usually implement it with a dict:

Example 6. Simple message bus (src/allocation/messagebus.py)
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)


def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        '[email protected]',
        f'Out of stock for {event.sku}',
    )


HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],

}  # type: Dict[Type[events.Event], List[Callable]]
Note
Note that the message bus as implemented is entirely synchronous: our objective isn’t to create an async/non-blocking task queue to defer execution in time; we’re only aiming to separate tasks conceptually.
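The listing above depends on the email module and the rest of the codebase. Below is a self-contained version of the same dict-based publish-subscribe idea. It assumes a hypothetical handler that appends to an in-memory list instead of calling email.send_mail. We have also swapped the plain dict for a collections.defaultdict. That is our choice, not the listing’s, and it means publishing an event with no subscribers is a no-op instead of a KeyError.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Type


class Event:
    pass


@dataclass(frozen=True)
class OutOfStock(Event):
    sku: str


outbox: List[str] = []  # collects "sent" messages so the sketch sends no real email


def send_out_of_stock_notification(event: OutOfStock) -> None:
    # Hypothetical stand-in for email.send_mail(...)
    outbox.append(f"Out of stock for {event.sku}")


# Maps each event type to the handlers subscribed to it
HANDLERS: Dict[Type[Event], List[Callable]] = defaultdict(list)
HANDLERS[OutOfStock].append(send_out_of_stock_notification)


def handle(event: Event) -> None:
    # Synchronously call every handler registered for this exact event type
    for handler in HANDLERS[type(event)]:
        handler(event)


handle(OutOfStock("SMALL-TABLE"))
print(outbox)  # ['Out of stock for SMALL-TABLE']
```

With this arrangement, switching from email to SMS means registering a different handler in HANDLERS, and allocate itself doesn’t change. Dispatch is on the exact type(event), so a subclass of OutOfStock would need its own entry.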

Option 1: The Service Layer Takes Events from the Model and Puts Them on the Message Bus

Our domain model raises events, and our message bus will call the right handlers whenever an event happens. Now all we need is to connect the two. We need something to catch events from the model and pass them to the message bus: the "publishing" step.

The simplest way to do this is by adding some code into our service layer.

Example 7. The service layer with an explicit message bus (src/allocation/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise exceptions.InvalidSku(f'Invalid sku {line.sku}')
        try:  #(1)
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        finally:  #(1)
            for event in product.events:  #(2)
                messagebus.handle(event)
  1. We keep the try block from our ugly earlier implementation, now as a try/finally so that events are published whatever happens (we haven’t got rid of all exceptions yet, just OutOfStock).

  2. But now instead of depending directly on some email infrastructure, the service layer is just in charge of passing events from the model up to the message bus.

That already avoids some of the ugliness of our naive implementation. We have several systems that work like this, in which the service layer explicitly collects events from aggregates and passes them to the message bus.
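It is worth seeing one consequence of the try/finally concretely: events are published even if something in the try block fails, including the commit. Here is a self-contained sketch built on our own assumptions. It uses a deliberately failing fake unit of work (hypothetical, not one of the book’s fakes), a simplified Product that is always out of stock, and a list standing in for the message bus.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class OutOfStock:
    sku: str


published: List[OutOfStock] = []


def handle(event: OutOfStock) -> None:
    # Stand-in for messagebus.handle: record the event instead of emailing
    published.append(event)


class Product:
    def __init__(self, sku: str):
        self.sku = sku
        self.events: list = []

    def allocate(self, qty: int) -> Optional[str]:
        # Simplified: this product never has stock, so it always records an event
        self.events.append(OutOfStock(self.sku))
        return None


class FailingCommitUoW:
    """Hypothetical fake whose commit always fails."""

    def commit(self) -> None:
        raise RuntimeError("database went away")


def allocate(product: Product, qty: int, uow: FailingCommitUoW) -> Optional[str]:
    try:
        batchref = product.allocate(qty)
        uow.commit()
        return batchref
    finally:
        # handle() takes one event at a time, so loop over the list
        for event in product.events:
            handle(event)


try:
    allocate(Product("RUG"), 1, FailingCommitUoW())
except RuntimeError:
    pass  # the commit error still reaches the caller

print(published)  # [OutOfStock(sku='RUG')], despite the failed commit
```

Whether that matters depends on the event. An alert about missing stock is probably still worth sending. In general, though, you may prefer to publish only after a successful commit, which is what the Unit of Work version later in this chapter does.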

Option 2: The Service Layer Raises Its Own Events

Another variant we’ve used is to put the service layer in charge of creating and raising events directly, rather than having them raised by the domain model.

Example 8. The service layer raises its own events (src/allocation/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise exceptions.InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        if batchref is None:
            messagebus.handle(events.OutOfStock(line.sku))
        uow.commit()
        return batchref

Again, we have applications in production that implement the pattern in this way. What works for you will depend on the particular trade-offs you face, but we’d like to show you what we think is the most elegant solution, in which we put the unit of work in charge of collecting and raising events.

The Unit of Work Can Publish Events to the Message Bus

The UoW already has a try/finally, and it knows about all the aggregates currently in play because it provides access to the Repository. So it’s a good place to spot events and pass them to the message bus:

Example 9. The UoW meets the message bus (src/allocation/unit_of_work.py)
class AbstractUnitOfWork(abc.ABC):
    ...

    def commit(self):
        self._commit()  #(1)
        self.publish_events()  #(2)

    def publish_events(self):  #(2)
        for product in self.products.seen:  #(3)
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

    @abc.abstractmethod
    def _commit(self):
        raise NotImplementedError

...

class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    ...

    def _commit(self):  #(1)
        self.session.commit()
  1. We’ll change our commit method to require a private ._commit() method from subclasses.

  2. After committing, we run through all the objects that our repository has seen and pass their events to the message bus.

  3. That relies on the repository keeping track of aggregates that have been loaded using a new attribute, .seen, as we’ll see in the next listing.
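Here is a self-contained sketch of the same commit-then-publish flow, so you can run it without a database. The following pieces are our own assumptions: a hypothetical FakeRepository that only has the .seen set, a minimal Product, and a list standing in for messagebus.handle. Note the ordering inside commit(). If _commit() raises, publish_events() never runs, so we don’t announce changes that weren’t saved.

```python
import abc
from dataclasses import dataclass
from typing import List, Set


@dataclass(frozen=True)
class OutOfStock:
    sku: str


published: List[OutOfStock] = []


def handle(event: OutOfStock) -> None:
    # Stand-in for messagebus.handle
    published.append(event)


class Product:
    def __init__(self, sku: str):
        self.sku = sku
        self.events: list = []


class FakeRepository:
    """Hypothetical: only tracks which products have been seen."""

    def __init__(self):
        self.seen: Set[Product] = set()


class AbstractUnitOfWork(abc.ABC):
    products: FakeRepository

    def commit(self) -> None:
        self._commit()         # persist first...
        self.publish_events()  # ...then tell the rest of the system

    def publish_events(self) -> None:
        for product in self.products.seen:
            while product.events:
                # pop(0) empties the list, so each event is published only once
                handle(product.events.pop(0))

    @abc.abstractmethod
    def _commit(self) -> None:
        raise NotImplementedError


class FakeUnitOfWork(AbstractUnitOfWork):
    def __init__(self):
        self.products = FakeRepository()
        self.committed = False

    def _commit(self) -> None:
        self.committed = True


uow = FakeUnitOfWork()
sofa = Product("SOFA")
uow.products.seen.add(sofa)
sofa.events.append(OutOfStock("SOFA"))

uow.commit()
uow.commit()  # nothing left to publish the second time
print(uow.committed, published)  # True [OutOfStock(sku='SOFA')]
```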

Note
Are you wondering about error handling, and what happens if one of the handlers fails? We’ll discuss that in detail in [chapter_09_commands].

Example 10. Repository tracks aggregates seen (src/allocation/repository.py)
class AbstractRepository(abc.ABC):

    def __init__(self):
        self.seen = set()  # type: Set[model.Product]  #(1)

    def add(self, product):  #(2)
        self._add(product)
        self.seen.add(product)

    def get(self, sku):  #(3)
        p = self._get(sku)
        if p:
            self.seen.add(p)
        return p

    @abc.abstractmethod
    def _add(self, product):  #(2)
        raise NotImplementedError

    @abc.abstractmethod  #(3)
    def _get(self, sku):
        raise NotImplementedError



class SqlAlchemyRepository(AbstractRepository):

    def __init__(self, session):
        super().__init__()
        self.session = session

    def _add(self, product):  #(2)
        self.session.add(product)

    def _get(self, sku):  #(3)
        return self.session.query(model.Product).filter_by(sku=sku).first()
  1. We initialise a set to store objects seen. That means our implementations need to call super().__init__().

  2. The parent add() method adds things to .seen, and now requires subclasses to implement ._add().

  3. Similarly, .get() delegates to a ._get() function, to be implemented by subclasses, in order to capture objects seen.
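Here is a runnable sketch of the seen-tracking repository, with a hypothetical dict-backed InMemoryRepository in place of SQLAlchemy. One caveat is worth spelling out: .seen is a set, so aggregates must be hashable. A plain class like the Product below hashes by identity. If a class defines __eq__ without __hash__, Python sets its __hash__ to None, and adding an instance to the set raises TypeError.

```python
import abc
from typing import Optional, Set


class Product:
    # A plain class keeps default identity-based hashing, so it can live in a set
    def __init__(self, sku: str):
        self.sku = sku
        self.events: list = []


class AbstractRepository(abc.ABC):
    def __init__(self):
        self.seen: Set[Product] = set()

    def add(self, product: Product) -> None:
        self._add(product)
        self.seen.add(product)

    def get(self, sku: str) -> Optional[Product]:
        product = self._get(sku)
        if product:
            self.seen.add(product)  # only real products are tracked, never None
        return product

    @abc.abstractmethod
    def _add(self, product: Product) -> None:
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku: str) -> Optional[Product]:
        raise NotImplementedError


class InMemoryRepository(AbstractRepository):
    """Hypothetical dict-backed repository for the sketch."""

    def __init__(self, products):
        super().__init__()  # without this, self.seen never exists and add()/get() fail
        self._products = {p.sku: p for p in products}

    def _add(self, product: Product) -> None:
        self._products[product.sku] = product

    def _get(self, sku: str) -> Optional[Product]:
        return self._products.get(sku)


repo = InMemoryRepository([Product("CHAIR"), Product("TABLE")])
chair = repo.get("CHAIR")
repo.get("NO-SUCH-SKU")  # returns None, which is not tracked
print([p.sku for p in repo.seen])  # ['CHAIR']
```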

Once the UoW and repository collaborate in this way to automatically keep track of live objects and process their events, the service layer can now be totally free of event-handling concerns:

Example 11. Service layer is clean again (src/allocation/services.py)
def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise exceptions.InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()
        return batchref

We do also have to remember to change the fakes in the service-layer tests, so that they call super() in the right places and implement the underscorey methods, but the changes are minimal:

Example 12. Service-layer fakes need tweaking. (tests/unit/test_services.py)
class FakeRepository(repository.AbstractRepository):

    def __init__(self, products):
        super().__init__()
        self._products = set(products)

    def _add(self, product):
        self._products.add(product)

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

...

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
    ...

    def _commit(self):
        self.committed = True

You may be starting to worry that maintaining these fakes is going to be a burden. There’s no doubt that it is work, but in our experience it’s not a lot of work. Once your project is up and running, the interfaces of your repository and UoW abstractions really don’t change much. And if you’re using ABCs, they’ll help remind you when things get out of sync.
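To illustrate how ABCs help here, suppose a fake still implements the old public get() method but was never updated to provide the new _get(). In this minimal sketch (the StaleFakeRepository name is made up), Python refuses to instantiate the fake. The first test that builds it therefore fails loudly instead of misbehaving quietly.

```python
import abc


class AbstractRepository(abc.ABC):
    @abc.abstractmethod
    def _add(self, product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku):
        raise NotImplementedError


class StaleFakeRepository(AbstractRepository):
    # Implements _add, but still has the old public get() instead of _get()
    def _add(self, product):
        pass

    def get(self, sku):
        return None


try:
    StaleFakeRepository()
except TypeError as error:
    # The message names the missing abstract method, _get
    print(error)
```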

Unit Testing with a Fake Message Bus

TODO: discuss replacing @mock test with FakeMessageBus, and the idea that once you have a messagebus and handlers, you can test each handler separately. pros and cons of each. maybe not in this chapter tho?

Wrap-Up

Domain events give us a way to handle workflows in our system. We often find, listening to our domain experts, that they express requirements in a causal or temporal way, for example "When we try to allocate stock, but there’s none available, then we should send an email to the buying team".

The magic words "When X then Y" often tell us about an event that we can make concrete in our system. Treating events as first-class things in our model helps us to make our code more testable and observable, and helps to isolate concerns.

Events are useful for more than just sending emails, though. In Chapter 5 we spent a lot of time convincing you that you should define aggregates, or boundaries where we guarantee consistency. People often ask "what should I do if I need to change multiple aggregates as part of a request?" Now we have the tools we need to answer the question.

If we have two things that can be transactionally isolated (e.g., an Order and a Product), then we can make them eventually consistent by using events. For example, when an Order is cancelled, we should find the products that were allocated to it and remove the allocations.

In Chapter 8, we’ll look at this idea in more detail as we build a more complex workflow with our new message bus.

Recap: Domain Events and the Message Bus
Events can help with SRP

Code gets tangled up when we mix multiple concerns in one place. Events can help us to keep things tidy by separating primary use cases from secondary ones. We also use events for communicating between aggregates, so that we don’t need to run long-running transactions that lock against multiple tables.

A Message Bus routes messages to handlers

You can think of a message bus as a dict that maps from events to their consumers. It doesn’t "know" anything about the meaning of events, it’s just a piece of dumb infrastructure for getting messages around the system.

Option 1: Domain Model raises events, Service Layer passes them to Message Bus

The logic about when to raise an event really should live with the model, so we can improve our system’s design and testability by raising events from the domain model. It’s easy for our service-layer functions to collect events off the model objects after commit and pass them to the bus.

Option 2: Service Layer raises events and passes them to Message Bus

The simplest way to start using events in your system is to raise them from your service-layer functions, by calling messagebus.handle(some_new_event) directly.

Option 3: Unit of Work collects events from Aggregates and passes them to Message Bus

Adding a loop that passes each of aggregate.events to messagebus.handle() in every service-layer function is annoying, so we can tidy up by making our unit of work responsible for publishing the events raised by the objects it has loaded. This is the most complex design and might rely on ORM magic, but it’s clean and easy to use once it’s set up.

Table 1. Domain Events: The Trade-Offs

Pros

  • They help with the SRP.

  • Events are quite a nice way to model real life.

Cons

  • The message bus is an extra, slightly unusual piece of infrastructure to get your head around.

  • The one we’ve presented is not async, so it can still, e.g., hang your web responses, which probably isn’t what you’re expecting.

  • If you need asynchronous handlers, Celery is fine.

TODO: finish up pros + cons


1. The S in SOLID.