diff --git a/SimpleEventStore.Tests/InMemory/InMemoryEventStoreAppending.cs b/SimpleEventStore.Tests/InMemory/InMemoryEventStoreAppending.cs index 9258434..81cbf50 100644 --- a/SimpleEventStore.Tests/InMemory/InMemoryEventStoreAppending.cs +++ b/SimpleEventStore.Tests/InMemory/InMemoryEventStoreAppending.cs @@ -1,15 +1,126 @@ +using System; +using System.Linq; using System.Threading.Tasks; using NUnit.Framework; using SimpleEventStore.InMemory; +using SimpleEventStore.Tests.Events; namespace SimpleEventStore.Tests.InMemory { [TestFixture] public class InMemoryEventStoreAppending : EventStoreAppending { + private InMemoryEventStreamChanged lastReceivedInMemoryEventStreamChangedEvent; + private readonly TestMetadata metadata = new TestMetadata { Value = "Hello" }; + protected override Task CreateStorageEngine() { - return Task.FromResult((IStorageEngine)new InMemoryStorageEngine()); + var storageEngine = new InMemoryStorageEngine(); + storageEngine.OnStreamChanged += StorageEngineOnStreamChanged; + return Task.FromResult((IStorageEngine)storageEngine); + } + + private void StorageEngineOnStreamChanged(object sender, InMemoryEventStreamChanged e) + { + lastReceivedInMemoryEventStreamChangedEvent = e; + } + + [Test] + public async Task WhenAppendingToANewStreamAStreamChangedEventIsPublished() + { + var streamId = Guid.NewGuid().ToString(); + var subject = await GetEventStore(); + var @event = new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata); + + await subject.AppendToStream(streamId, 0, @event); + + Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent); + Assert.AreEqual(1, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count); + + var newEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Single(); + Assert.AreEqual(@event.EventId, newEvent.EventId); + Assert.AreEqual(@event.Body, newEvent.EventBody); + Assert.AreEqual(@event.Metadata, newEvent.Metadata); + Assert.AreEqual(1, newEvent.EventNumber); + } + + [Test] + public async Task WhenAppendingToAnExistingStreamAStreamChangedEventIsPublished() + { + var streamId = Guid.NewGuid().ToString(); + var subject = await GetEventStore(); + await subject.AppendToStream(streamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata)); + var @event = new EventData(Guid.NewGuid(), new OrderDispatched(streamId), metadata); + + await subject.AppendToStream(streamId, 1, @event); + + Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent); + Assert.AreEqual(1, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count); + + var newEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Single(); + Assert.AreEqual(@event.EventId, newEvent.EventId); + Assert.AreEqual(@event.Body, newEvent.EventBody); + Assert.AreEqual(@event.Metadata, newEvent.Metadata); + Assert.AreEqual(2, newEvent.EventNumber); + } + + [Test] + public async Task WhenAppendingToANewStreamWithMultipleEventsAStreamChangedEventIsPublished() + { + var streamId = Guid.NewGuid().ToString(); + var subject = await GetEventStore(); + var events = new[] + { + new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata), + new EventData(Guid.NewGuid(), new OrderDispatched(streamId), metadata) + }; + + await subject.AppendToStream(streamId, 0, events); + + Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent); + Assert.AreEqual(2, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count); + + var firstNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.First(); + Assert.AreEqual(events[0].EventId, firstNewEvent.EventId); + Assert.AreEqual(events[0].Body, firstNewEvent.EventBody); + Assert.AreEqual(events[0].Metadata, firstNewEvent.Metadata); + Assert.AreEqual(1, firstNewEvent.EventNumber); + + var lastNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Last(); + Assert.AreEqual(events[1].EventId, lastNewEvent.EventId); + Assert.AreEqual(events[1].Body, lastNewEvent.EventBody); + Assert.AreEqual(events[1].Metadata, lastNewEvent.Metadata); + Assert.AreEqual(2, lastNewEvent.EventNumber); + } + + [Test] + public async Task WhenAppendingToAnExistingStreamWithMultipleEventsAStreamChangedEventIsPublished() + { + var streamId = Guid.NewGuid().ToString(); + var subject = await GetEventStore(); + await subject.AppendToStream(streamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata)); + var events = new[] + { + new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata), + new EventData(Guid.NewGuid(), new OrderDispatched(streamId), metadata) + }; + + await subject.AppendToStream(streamId, 1, events); + + Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent); + Assert.AreEqual(2, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count); + + var firstNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.First(); + Assert.AreEqual(events[0].EventId, firstNewEvent.EventId); + Assert.AreEqual(events[0].Body, firstNewEvent.EventBody); + Assert.AreEqual(events[0].Metadata, firstNewEvent.Metadata); + Assert.AreEqual(2, firstNewEvent.EventNumber); + + var lastNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Last(); + Assert.AreEqual(events[1].EventId, lastNewEvent.EventId); + Assert.AreEqual(events[1].Body, lastNewEvent.EventBody); + Assert.AreEqual(events[1].Metadata, lastNewEvent.Metadata); + Assert.AreEqual(3, lastNewEvent.EventNumber); } } } \ No newline at end of file diff --git a/SimpleEventStore/InMemory/InMemoryEventStreamChanged.cs b/SimpleEventStore/InMemory/InMemoryEventStreamChanged.cs new file mode 100644 index 0000000..2f3317b --- /dev/null +++ b/SimpleEventStore/InMemory/InMemoryEventStreamChanged.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; + +namespace SimpleEventStore.InMemory +{ + public class InMemoryEventStreamChanged : EventArgs + { + public InMemoryEventStreamChanged(IReadOnlyList newEvents) + { + NewEvents = newEvents; + } + + public IReadOnlyList NewEvents { get; } + } +} diff --git a/SimpleEventStore/InMemory/InMemoryStorageEngine.cs b/SimpleEventStore/InMemory/InMemoryStorageEngine.cs index 2accf2a..286e4d1 100644 --- a/SimpleEventStore/InMemory/InMemoryStorageEngine.cs +++ b/SimpleEventStore/InMemory/InMemoryStorageEngine.cs @@ -13,6 +13,8 @@ public class InMemoryStorageEngine : IStorageEngine private readonly ConcurrentDictionary> streams = new ConcurrentDictionary>(); private readonly List allEvents = new List(); + public event EventHandler OnStreamChanged; + public Task AppendToStream(string streamId, IEnumerable events) { return Task.Run(() => @@ -46,7 +48,7 @@ public Task DeleteStream(string streamId) allEvents.Remove(@event); } - streams.TryRemove(streamId, out var removedStream); + streams.TryRemove(streamId, out _); return Task.CompletedTask; } @@ -57,6 +59,15 @@ private void AddEventsToAllStream(IEnumerable events) { allEvents.Add(e); } + + var handler = OnStreamChanged; + + if (handler is null) + { + return; + } + + handler(this, new InMemoryEventStreamChanged(new List(events).AsReadOnly())); } public Task> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead)