Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 112 additions & 1 deletion SimpleEventStore.Tests/InMemory/InMemoryEventStoreAppending.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,126 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using SimpleEventStore.InMemory;
using SimpleEventStore.Tests.Events;

namespace SimpleEventStore.Tests.InMemory
{
[TestFixture]
public class InMemoryEventStoreAppending : EventStoreAppending
{
private InMemoryEventStreamChanged lastReceivedInMemoryEventStreamChangedEvent;
private readonly TestMetadata metadata = new TestMetadata { Value = "Hello" };

protected override Task<IStorageEngine> CreateStorageEngine()
{
return Task.FromResult((IStorageEngine)new InMemoryStorageEngine());
var storageEngine = new InMemoryStorageEngine();
storageEngine.OnStreamChanged += StorageEngineOnStreamChanged;
return Task.FromResult((IStorageEngine)storageEngine);
}

private void StorageEngineOnStreamChanged(object sender, InMemoryEventStreamChanged e)
{
lastReceivedInMemoryEventStreamChangedEvent = e;
}

[Test]
public async Task WhenAppendingToANewStreamAStreamChangedEventIsPublished()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();
var @event = new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata);

await subject.AppendToStream(streamId, 0, @event);

Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent);
Assert.AreEqual(1, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count);

var newEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Single();
Assert.AreEqual(@event.EventId, newEvent.EventId);
Assert.AreEqual(@event.Body, newEvent.EventBody);
Assert.AreEqual(@event.Metadata, newEvent.Metadata);
Assert.AreEqual(1, newEvent.EventNumber);
}

[Test]
public async Task WhenAppendingToAnExistingStreamAStreamChangedEventIsPublished()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();
await subject.AppendToStream(streamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata));
var @event = new EventData(Guid.NewGuid(), new OrderDispatched(streamId), metadata);

await subject.AppendToStream(streamId, 1, @event);

Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent);
Assert.AreEqual(1, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count);

var newEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Single();
Assert.AreEqual(@event.EventId, newEvent.EventId);
Assert.AreEqual(@event.Body, newEvent.EventBody);
Assert.AreEqual(@event.Metadata, newEvent.Metadata);
Assert.AreEqual(2, newEvent.EventNumber);
}

[Test]
public async Task WhenAppendingToANewStreamWithMultipleEventsAStreamChangedEventIsPublished()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();
var events = new[]
{
new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata),
new EventData(Guid.NewGuid(), new OrderDispatched(streamId), metadata)
};

await subject.AppendToStream(streamId, 0, events);

Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent);
Assert.AreEqual(2, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count);

var firstNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.First();
Assert.AreEqual(events[0].EventId, firstNewEvent.EventId);
Assert.AreEqual(events[0].Body, firstNewEvent.EventBody);
Assert.AreEqual(events[0].Metadata, firstNewEvent.Metadata);
Assert.AreEqual(1, firstNewEvent.EventNumber);

var lastNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Last();
Assert.AreEqual(events[1].EventId, lastNewEvent.EventId);
Assert.AreEqual(events[1].Body, lastNewEvent.EventBody);
Assert.AreEqual(events[1].Metadata, lastNewEvent.Metadata);
Assert.AreEqual(2, lastNewEvent.EventNumber);
}

[Test]
public async Task WhenAppendingToAnExistingStreamWithMultipleEventsAStreamChangedEventIsPublished()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();
await subject.AppendToStream(streamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata));
var events = new[]
{
new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata),
new EventData(Guid.NewGuid(), new OrderDispatched(streamId), metadata)
};

await subject.AppendToStream(streamId, 1, events);

Assert.IsNotNull(lastReceivedInMemoryEventStreamChangedEvent);
Assert.AreEqual(2, lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Count);

var firstNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.First();
Assert.AreEqual(events[0].EventId, firstNewEvent.EventId);
Assert.AreEqual(events[0].Body, firstNewEvent.EventBody);
Assert.AreEqual(events[0].Metadata, firstNewEvent.Metadata);
Assert.AreEqual(2, firstNewEvent.EventNumber);

var lastNewEvent = lastReceivedInMemoryEventStreamChangedEvent.NewEvents.Last();
Assert.AreEqual(events[1].EventId, lastNewEvent.EventId);
Assert.AreEqual(events[1].Body, lastNewEvent.EventBody);
Assert.AreEqual(events[1].Metadata, lastNewEvent.Metadata);
Assert.AreEqual(3, lastNewEvent.EventNumber);
}
}
}
15 changes: 15 additions & 0 deletions SimpleEventStore/InMemory/InMemoryEventStreamChanged.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;

namespace SimpleEventStore.InMemory
{
public class InMemoryEventStreamChanged : EventArgs
{
public InMemoryEventStreamChanged(IReadOnlyList<StorageEvent> newEvents)
{
NewEvents = newEvents;
}

public IReadOnlyList<StorageEvent> NewEvents { get; }
}
}
13 changes: 12 additions & 1 deletion SimpleEventStore/InMemory/InMemoryStorageEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class InMemoryStorageEngine : IStorageEngine
private readonly ConcurrentDictionary<string, List<StorageEvent>> streams = new ConcurrentDictionary<string, List<StorageEvent>>();
private readonly List<StorageEvent> allEvents = new List<StorageEvent>();

public event EventHandler<InMemoryEventStreamChanged> OnStreamChanged;

public Task AppendToStream(string streamId, IEnumerable<StorageEvent> events)
{
return Task.Run(() =>
Expand Down Expand Up @@ -46,7 +48,7 @@ public Task DeleteStream(string streamId)
allEvents.Remove(@event);
}

streams.TryRemove(streamId, out var removedStream);
streams.TryRemove(streamId, out _);

return Task.CompletedTask;
}
Expand All @@ -57,6 +59,15 @@ private void AddEventsToAllStream(IEnumerable<StorageEvent> events)
{
allEvents.Add(e);
}

var handler = OnStreamChanged;

if (handler is null)
{
return;
}

handler(this, new InMemoryEventStreamChanged(new List<StorageEvent>(events).AsReadOnly()));
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead)
Expand Down