Skip to content

Commit

Permalink
Allow writing additional info with the projector state. (#6)
Browse files Browse the repository at this point in the history
If the implementation of `IProjectorState` has additional custom properties, you can now use the `EnrichState` delegate to update the state object before it is written to the database.
  • Loading branch information
dennisdoomen authored Sep 27, 2017
1 parent d273ebb commit f74d646
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 42 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ Output/*
**/*.sln.ide/**
_NCrunch_LiquidProjections/
Artifacts/**
.vs/**/*
**/*.ide

# User-specific stuff:
.idea

# Cake related
Build/**
!Build/packages.config
Tools/**
Tools/**
Binary file modified .vs/LiquidProjections.NHibernate/v15/sqlite3/storage.ide
Binary file not shown.
8 changes: 5 additions & 3 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ This [package](https://www.nuget.org/packages/LiquidProjections.NHibernate/) pro
* Allows customizing how many LiquidProjections [Transaction](https://github.com/liquidprojections/LiquidProjections/blob/master/Src/LiquidProjections.Abstractions/Transaction.cs)s it should process in a single database transaction.
* Supports nesting one or more `NHibernateChildProjector`s that can be used for maintaining lookup tables and which are part of the same database transaction as the parent projector creates.
* Ensures autonomy by storing the `IProjectionState` (e.g. the last transaction checkpoint) in the same transaction as the actual projection data.
* Provides a simple caching mechanism in the form of the `IProjectionCache` and ships with the `LruProjectionCache` based on the [FluidCaching](https://www.nuget.org/packages/FluidCaching.Sources/) project.
* Exposes the `ShouldRetry` delegate so that you can handle projection exceptions and get full control on whether or not the projector should retry (again).
* If the implementation of `IProjectorStat`e has additional custom properties, you can use the `EnrichState` delegate of the `NHibernateProjector` to update the state object before it is written to the database. This is called as part of the same database transaction that encompasses the projector code.

## Are there any limitations?
* The `IProjectionCache` is meant for the most common scenarios and thus has some limitations. If you need more advanced caching, please refer to NHibernate's [Second Level Caching](http://nhibernate.info/doc/nhibernate-reference/caches.html) feature.
## Caching support
This package provides a simple caching mechanism in the form of the `IProjectionCache` and ships with the `LruProjectionCache` based on the [FluidCaching](https://www.nuget.org/packages/FluidCaching.Sources/) project. The `IProjectionCache` is meant for the most common scenarios and thus has some limitations:
* If the projector performs database modifications directly on the NHibernate `ISession`, that projector must make sure the cache is updated or cleared accordingly.
* The cache doesn't understand relationships where a projection refers to another projection maintained by the same projector. For instance, a projector that maintains a graph of parents and children, where a child is also a (direct or indirect) parent must use a more advanced type of caching.

If you need more advanced caching, please refer to NHibernate's [Second Level Caching](http://nhibernate.info/doc/nhibernate-reference/caches.html) feature.

## How to get started?
Check out the [unit tests](https://github.com/liquidprojections/LiquidProjections.NHibernate/blob/master/Tests/LiquidProjections.NHibernate.Specs/NHibernateProjectorSpecs.cs) to learn how the many features work together. You can also look at the non-NHibernate example [here](https://github.com/liquidprojections/LiquidProjections/blob/master/Samples/ExampleHost/ExampleProjector.cs#L25) and replace `Projector` with `NHibernateProjector`. A proper example project for NHibernate is [planned](https://github.com/liquidprojections/LiquidProjections.NHibernate/issues/5).
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
<Compile Include="EnumerableExtensions.cs" />
<Compile Include="LruProjectionCache.cs" />
<Compile Include="PassthroughCache.cs" />
<Compile Include="ShouldRetry.cs" />
<Compile Include="INHibernateChildProjector.cs" />
<Compile Include="IProjectorState.cs" />
<Compile Include="ITrackingState.cs" />
Expand Down
25 changes: 25 additions & 0 deletions Src/LiquidProjections.NHibernate/NHibernateProjector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public ShouldRetry ShouldRetry
set => shouldRetry = value ?? throw new ArgumentNullException(nameof(value), "Retry policy is missing.");
}

/// <summary>
/// Allows enriching the projector state with additional details before the updated state is written to the database.
/// </summary>
/// <remarks>
/// Is called before the transaction wrapping a batch of transactions is committed.
/// </remarks>
public EnrichState<TState> EnrichState { get; set; } = (state, transaction) => {};

/// <summary>
/// A cache that can be used to avoid loading projections from the database.
/// </summary>
Expand Down Expand Up @@ -225,6 +233,8 @@ private void StoreLastCheckpoint(ISession session, Transaction transaction)
{
session.Save(state);
}

EnrichState(state, transaction);
}
catch (Exception exception)
{
Expand All @@ -243,4 +253,19 @@ private void StoreLastCheckpoint(ISession session, Transaction transaction)
}
}
}

/// <summary>
/// A delegate that can be implemented to retry projecting a batch of transactions when it fails.
/// </summary>
/// <returns>Returns true if the projector should retry to project the batch of transactions, false if it shoud fail with the specified exception.</returns>
/// <param name="exception">The exception that occured that caused this batch to fail.</param>
/// <param name="attempts">The number of attempts that were made to project this batch of transactions.</param>
public delegate Task<bool> ShouldRetry(ProjectionException exception, int attempts);

/// <summary>
/// Defines the signature of a method that can be used to update the projection state as explained
/// in <see cref="NHibernateProjector{TProjection,TKey,TState}.EnrichState"/>.
/// </summary>
public delegate void EnrichState<in TState>(TState state, Transaction transaction)
where TState : IProjectorState;
}
12 changes: 0 additions & 12 deletions Src/LiquidProjections.NHibernate/ShouldRetry.cs

This file was deleted.

107 changes: 82 additions & 25 deletions Tests/LiquidProjections.NHibernate.Specs/NHibernateProjectorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class Given_a_sqlite_projector_with_an_in_memory_event_source :
protected EventMapBuilder<ProductCatalogEntry, string, NHibernateProjectionContext> Events;
protected LruProjectionCache<ProductCatalogEntry, string> Cache;
protected Exception ProjectionException = null;
private readonly List<INHibernateChildProjector> children = new List<INHibernateChildProjector>();

public Given_a_sqlite_projector_with_an_in_memory_event_source()
{
Expand All @@ -35,26 +36,26 @@ public Given_a_sqlite_projector_with_an_in_memory_event_source()
() => DateTime.UtcNow);

Events = new EventMapBuilder<ProductCatalogEntry, string, NHibernateProjectionContext>();
});
}

protected void StartProjecting(string stateKey = null, INHibernateChildProjector[] children = null)
{
WithSubject(_ =>
{
return new NHibernateProjector<ProductCatalogEntry, string, ProjectorState>(
The<ISessionFactory>().OpenSession, Events, (entry, id) => entry.Id = id, children)
WithSubject(_ =>
{
BatchSize = 10,
Cache = Cache,
};
return new NHibernateProjector<ProductCatalogEntry, string, ProjectorState>(
The<ISessionFactory>().OpenSession, Events, (entry, id) => entry.Id = id, children)
{
BatchSize = 10,
Cache = Cache,
};
});
});
}

if (!string.IsNullOrEmpty(stateKey))
{
Subject.StateKey = stateKey;
}
protected void AddChildProjector(INHibernateChildProjector childProjector)
{
children.Add(childProjector);
}

protected void StartProjecting()
{
The<MemoryEventSource>().Subscribe(0, new Subscriber
{
HandleTransactions = async (transactions, info) =>
Expand Down Expand Up @@ -88,12 +89,12 @@ public When_a_create_was_requested_but_the_database_already_contained_that_proje
Id = "c350E",
Category = "Gas"
});

StartProjecting();
});

When(async () =>
{
StartProjecting();

await The<MemoryEventSource>().Write(new ProductAddedToCatalogEvent
{
ProductKey = "c350E",
Expand Down Expand Up @@ -1118,7 +1119,9 @@ public When_a_custom_state_key_is_set()
.Using((productCatalogEntry, productAddedToCatalogEvent, context) =>
productCatalogEntry.Category = productAddedToCatalogEvent.Category);

StartProjecting(stateKey: "CatalogEntries");
Subject.StateKey = "CatalogEntries";

StartProjecting();
});

When(() => The<MemoryEventSource>().Write(new ProductAddedToCatalogEvent
Expand All @@ -1139,6 +1142,56 @@ public void Then_it_should_store_projector_state_with_that_key()
}
}

public class When_the_projector_state_is_enriched : Given_a_sqlite_projector_with_an_in_memory_event_source
{
public When_the_projector_state_is_enriched()
{
Given(() =>
{
Events.Map<ProductAddedToCatalogEvent>()
.AsCreateOf(@event => @event.ProductKey)
.Using((p, @event, context) => p.Category = @event.Category);

Subject.EnrichState = (state, transaction) =>
{
state.LastStreamId = transaction.StreamId;
};
});

When(() =>
{
StartProjecting();

return The<MemoryEventSource>().Write(new Transaction
{
StreamId = "Product1",
Events = new[]
{
new EventEnvelope
{
Body = new ProductAddedToCatalogEvent
{
ProductKey = "c350E",
Category = "Hybrid"
}
}
}
});
});
}

[Fact]
public void Then_it_should_store_the_custom_property_along_with_the_state()
{
using (var session = The<ISessionFactory>().OpenSession())
{
ProjectorState projectorState = session.Get<ProjectorState>(Subject.StateKey);
projectorState.LastStreamId.Should().Be("Product1");
}
}
}


public class When_an_event_has_a_header : Given_a_sqlite_projector_with_an_in_memory_event_source
{
public When_an_event_has_a_header()
Expand Down Expand Up @@ -1262,14 +1315,14 @@ public When_there_is_a_child_projector()
.AsCreateOf(anEvent => anEvent.ProductKey)
.Using((entry, anEvent) => entry.Category = anEvent.Category);

var childProjector = new NHibernateChildProjector<ProductCatalogChildEntry, string>(
childMapBuilder, (childEntry, id) => childEntry.Id = id);

StartProjecting(children: new INHibernateChildProjector[] {childProjector});
AddChildProjector(new NHibernateChildProjector<ProductCatalogChildEntry, string>(
childMapBuilder, (childEntry, id) => childEntry.Id = id));
});

When(async () =>
{
StartProjecting();

var transaction1 = new Transaction
{
Events = new[]
Expand Down Expand Up @@ -1374,13 +1427,13 @@ public When_a_child_projector_has_its_own_cache()
childCache = new LruProjectionCache<ProductCatalogChildEntry, string>(1000, 1.Hours(), 2.Hours(), e => e.Id,
() => DateTime.UtcNow);

var childProjector = new NHibernateChildProjector<ProductCatalogChildEntry, string>(
AddChildProjector(new NHibernateChildProjector<ProductCatalogChildEntry, string>(
childMapBuilder, (childEntry, id) => childEntry.Id = id)
{
Cache = childCache
};
});

StartProjecting(children: new INHibernateChildProjector[] {childProjector});
StartProjecting();
});

When(async () =>
Expand Down Expand Up @@ -1630,6 +1683,8 @@ public void Then_it_should_completely_clear_the_cache()
}
}

#region Supporting Types

public class ProductCatalogEntry
{
public virtual string Id { get; set; }
Expand Down Expand Up @@ -1686,4 +1741,6 @@ public class CategoryDiscontinuedEvent
{
public string Category { get; set; }
}

#endregion
}
3 changes: 3 additions & 0 deletions Tests/LiquidProjections.NHibernate.Specs/ProjectorState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class ProjectorState : IProjectorState
public virtual string Id { get; set; }
public virtual long Checkpoint { get; set; }
public virtual DateTime LastUpdateUtc { get; set; }

public virtual string LastStreamId { get; set; }
}

internal sealed class ProjectorStateClassMap : ClassMap<ProjectorState>
Expand All @@ -18,6 +20,7 @@ public ProjectorStateClassMap()
Id(x => x.Id).Not.Nullable().Length(150);
Map(x => x.Checkpoint);
Map(x => x.LastUpdateUtc);
Map(x => x.LastStreamId);
}
}
}

0 comments on commit f74d646

Please sign in to comment.