diff --git a/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj b/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj
new file mode 100644
index 00000000..8019ffa3
--- /dev/null
+++ b/Akka.Linq2Db.Sandbox/Akka.Linq2Db.Sandbox.csproj
@@ -0,0 +1,15 @@
+
+
+
+ netcoreapp3.1
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj b/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj
new file mode 100644
index 00000000..caf07276
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.IndexHelperApp/Akka.Persistence.Linq2Db.IndexHelperApp.csproj
@@ -0,0 +1,25 @@
+
+
+
+ Exe
+ netcoreapp3.1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
+
diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs b/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs
new file mode 100644
index 00000000..b8ebaa58
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.IndexHelperApp/Options.cs
@@ -0,0 +1,22 @@
+using CommandLine;
+
+namespace Akka.Persistence.Linq2Db.IndexHelperApp
+{
+ public class Options
+ {
+ [Option('f',"file", Required=true, HelpText = "Specify the HOCON file to use")]
+ public string File { get; set; }
+
+ [Option('p',"path", Required = true, HelpText = "The Path to the Akka.Persistence.Linq2Db Config in the HOCON.")]
+ public string HoconPath { get; set; }
+
+ [Option("OrderingIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for an Ordering index")]
+ public bool GenerateOrdering { get; set; }
+
+ [Option("PidSeqNoIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for an index on PersistenceID and SeqNo")]
+ public bool GeneratePidSeqNo { get; set; }
+
+ [Option("TimeStampIdx", Required = true, Group = "IndexType", HelpText = "Generates the SQL Text for a Timestamp Index")]
+ public bool GenerateTimestamp { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs b/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs
new file mode 100644
index 00000000..14a386b5
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.IndexHelperApp/Program.cs
@@ -0,0 +1,141 @@
+using System;
+using System.IO;
+using Akka.Configuration;
+using Akka.Persistence.Linq2Db.IndexHelperLib;
+using Akka.Persistence.Sql.Linq2Db.Config;
+using Akka.Persistence.Sql.Linq2Db.Tests;
+using CommandLine;
+using FluentMigrator.Expressions;
+using FluentMigrator.Runner.Generators;
+using FluentMigrator.Runner.Generators.Generic;
+using FluentMigrator.Runner.Generators.MySql;
+using FluentMigrator.Runner.Generators.Oracle;
+using FluentMigrator.Runner.Generators.Postgres;
+using FluentMigrator.Runner.Generators.Postgres92;
+using FluentMigrator.Runner.Generators.SQLite;
+using FluentMigrator.Runner.Generators.SqlServer;
+using FluentMigrator.Runner.Processors.Postgres;
+using LinqToDB;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+
+namespace Akka.Persistence.Linq2Db.IndexHelperApp
+{
+ class Program
+ {
+
+ static void Main(string[] args)
+ {
+ Parser.Default.ParseArguments(args)
+ .WithParsed(opts =>
+ {
+ //var str = Linq2DbJournalDefaultSpecConfig.customConfig("testGen",
+ // "journalTbl", "metadataTbl", ProviderName.SqlServer,
+ // "connStr");
+ var conf =
+ ConfigurationFactory.ParseString(
+ File.ReadAllText(opts.File));
+
+ var journalConf =
+ new Akka.Persistence.Sql.Linq2Db.Config.JournalConfig(
+ conf.GetConfig(
+ opts.HoconPath
+ //"akka.persistence.journal.linq2db.testGen"
+ )
+ .WithFallback(Akka.Persistence.Sql.Linq2Db
+ .Journal
+ .Linq2DbWriteJournal.DefaultConfiguration));
+ var generator = getGenerator(journalConf.ProviderName);
+ var helper = new JournalIndexHelper();
+
+ GeneratePerOptions(opts, helper, journalConf, generator);
+ });
+ }
+
+ private static void GeneratePerOptions(Options opts, JournalIndexHelper helper,
+ JournalConfig journalConf, GenericGenerator generator)
+ {
+ CreateIndexExpression expr;
+            if (opts.GenerateOrdering)
+ {
+ expr = new CreateIndexExpression()
+ {
+ Index = helper.JournalOrdering(journalConf.TableConfig.TableName,
+ journalConf.TableConfig.ColumnNames.Ordering,
+ journalConf.TableConfig.SchemaName)
+ };
+ GenerateWithHeaderAndFooter(generator, expr, "Ordering");
+ }
+
+ if (opts.GeneratePidSeqNo)
+ {
+ expr = new CreateIndexExpression()
+ {
+ Index = helper.DefaultJournalIndex(
+ journalConf.TableConfig.TableName,
+ journalConf.TableConfig.ColumnNames.PersistenceId,
+ journalConf.TableConfig.ColumnNames.SequenceNumber,
+ journalConf.TableConfig.SchemaName)
+ };
+ GenerateWithHeaderAndFooter(generator, expr, "PidAndSequenceNo");
+ }
+
+ if (opts.GenerateTimestamp)
+ {
+ expr = new CreateIndexExpression()
+ {
+ Index = helper.JournalTimestamp(journalConf.TableConfig.TableName,
+ journalConf.TableConfig.ColumnNames.Created,
+ journalConf.TableConfig.SchemaName)
+ };
+ GenerateWithHeaderAndFooter(generator, expr, "Timestamp");
+ }
+ }
+
+ private static void GenerateWithHeaderAndFooter(GenericGenerator generator,
+ CreateIndexExpression expr, string indexType)
+ {
+ Console.WriteLine("-------");
+ Console.WriteLine($"----{indexType} Index Create Below");
+ Console.WriteLine(generator.Generate(expr));
+ Console.WriteLine($"----{indexType} Index Create Above");
+ Console.WriteLine("-------");
+ }
+
+ static GenericGenerator getGenerator(string dbArg)
+ {
+ if (dbArg.StartsWith("sqlserver",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ return new SqlServer2008Generator();
+ }
+ else if (dbArg.Contains("sqlite",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ return new SQLiteGenerator();
+ }
+ else if (dbArg.Contains("postgres",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ return new Postgres92Generator(
+ new PostgresQuoter(new PostgresOptions()),
+ new OptionsWrapper(
+ new GeneratorOptions()));
+ }
+ else if (dbArg.Contains("mysql",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ return new MySql5Generator();
+ }
+ else if (dbArg.Contains("oracle",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ return new OracleGenerator();
+ }
+ else
+ {
+                throw new NotSupportedException($"Unsupported provider name '{dbArg}'; no SQL index generator is available for it.");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon b/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon
new file mode 100644
index 00000000..a382c24c
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.IndexHelperApp/example.hocon
@@ -0,0 +1,15 @@
+akka.persistence.journal.linq2db{
+ testGen {
+ class = "Akka.Persistence.Sql.Linq2Db.Journal.Linq2DbWriteJournal, Akka.Persistence.Sql.Linq2Db"
+ provider-name = "SqlServer"
+ connection-string = "connStr"
+ tables{
+ journal{
+ auto-init = true
+ warn-on-auto-init-fail = false
+ table-name = "journalTbl"
+ metadata-table-name = "metadataTbl"
+ }
+ }
+}
+}
\ No newline at end of file
diff --git a/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj b/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj
new file mode 100644
index 00000000..49c19454
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.IndexHelperLib/Akka.Persistence.Linq2Db.IndexHelperLib.csproj
@@ -0,0 +1,15 @@
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs b/Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs
new file mode 100644
index 00000000..65f5b08e
--- /dev/null
+++ b/Akka.Persistence.Linq2Db.IndexHelperLib/Class1.cs
@@ -0,0 +1,50 @@
+using System;
+using FluentMigrator.Model;
+
+namespace Akka.Persistence.Linq2Db.IndexHelperLib
+{
+ public class JournalIndexHelper
+ {
+ public IndexDefinition DefaultJournalIndex(string tableName, string persistenceIdCol, string sequenceNoCol, string schemaName = null)
+ {
+ var idx = beginCreateIndex(tableName, schemaName, $"UX_{tableName}_PID_SEQNO");
+ //short name for easy compat with all dbs. (*cough* oracle *cough*)
+ idx.Columns.Add(new IndexColumnDefinition(){ Name = persistenceIdCol });
+ idx.Columns.Add(new IndexColumnDefinition(){Name = sequenceNoCol, Direction = Direction.Ascending});
+ idx.IsUnique = true;
+ return idx;
+ }
+
+ public IndexDefinition JournalOrdering(string tableName,
+ string orderingCol, string schemaName = null)
+ {
+ var idx = beginCreateIndex(tableName, schemaName,$"IX_{tableName}_Ordering");
+ idx.Columns.Add(new IndexColumnDefinition(){Name = orderingCol});
+ //Should it be?
+ //idx.IsUnique = true;
+ return idx;
+ }
+
+ public IndexDefinition JournalTimestamp(string tableName,
+ string timestampCol, string schemaName = null)
+ {
+ var idx = beginCreateIndex(tableName, schemaName,
+ $"IX_{tableName}_TimeStamp");
+ idx.Columns.Add(new IndexColumnDefinition(){Name = timestampCol});
+ //Not unique by any stretch.
+ return idx;
+ }
+
+ private static IndexDefinition beginCreateIndex(string tableName, string schemaName, string indexName)
+ {
+ var idx = new IndexDefinition();
+ if (string.IsNullOrWhiteSpace(schemaName) == false)
+ {
+ idx.SchemaName = schemaName;
+ }
+ idx.TableName = tableName;
+ idx.Name = indexName;
+ return idx;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Akka.Persistence.Linq2Db.sln b/Akka.Persistence.Linq2Db.sln
index f84ac72e..81a30e5c 100644
--- a/Akka.Persistence.Linq2Db.sln
+++ b/Akka.Persistence.Linq2Db.sln
@@ -36,6 +36,12 @@ EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests", "Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests\Akka.Persistence.Linq2Db.Benchmark.DockerComparisonTests.csproj", "{170698FA-DA1E-40BC-896D-AFA67976C0EB}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.IndexHelperLib", "Akka.Persistence.Linq2Db.IndexHelperLib\Akka.Persistence.Linq2Db.IndexHelperLib.csproj", "{AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Linq2Db.IndexHelperApp", "Akka.Persistence.Linq2Db.IndexHelperApp\Akka.Persistence.Linq2Db.IndexHelperApp.csproj", "{D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Linq2Db.Sandbox", "Akka.Linq2Db.Sandbox\Akka.Linq2Db.Sandbox.csproj", "{697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -78,6 +84,18 @@ Global
{170698FA-DA1E-40BC-896D-AFA67976C0EB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{170698FA-DA1E-40BC-896D-AFA67976C0EB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {AACE3FBC-51FE-4A9B-B6C4-4CCA750DB22E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D5C851AA-DB80-4E9F-BD2E-03E63DC3082E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {697B9FC8-29E2-4F7D-B63B-9E4B873F6AA1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
index 0410df3d..5e6499a1 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Config/BaseByteArrayJournalDaoConfig.cs
@@ -10,6 +10,7 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config)
BufferSize = config.GetInt("buffer-size", 5000);
BatchSize = config.GetInt("batch-size", 100);
DbRoundTripBatchSize = config.GetInt("db-round-trip-max-batch-size", 1000);
+ DbRoundTripTagBatchSize = config.GetInt("db-round-trip-max-tag-batch-size", 1000);
PreferParametersOnMultiRowInsert =
config.GetBoolean("prefer-parameters-on-multirow-insert",
false);
@@ -43,6 +44,6 @@ public BaseByteArrayJournalDaoConfig(Configuration.Config config)
public int BufferSize { get; protected set; }
public bool SqlCommonCompatibilityMode { get; protected set; }
-
+ public int DbRoundTripTagBatchSize { get; set; }
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs
index ac40d71c..52708066 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalConfig.cs
@@ -21,7 +21,6 @@ public JournalConfig(Configuration.Config config)
UseSharedDb = string.IsNullOrWhiteSpace(dbConf) ? null : dbConf;
UseCloneConnection =
config.GetBoolean("use-clone-connection", false);
-
}
public string MaterializerDispatcher { get; protected set; }
@@ -44,6 +43,7 @@ public IDaoConfig IDaoConfig
public string ProviderName { get; }
public string ConnectionString { get; }
public bool UseCloneConnection { get; set; }
+
}
public interface IProviderConfig
@@ -61,4 +61,6 @@ public interface IDaoConfig
bool SqlCommonCompatibilityMode { get; }
int Parallelism { get; }
}
+
+
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs
index 4d680076..bdd78d96 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Config/JournalTableConfig.cs
@@ -3,6 +3,19 @@
namespace Akka.Persistence.Sql.Linq2Db.Config
{
+ [Flags]
+ public enum TagWriteMode
+ {
+ CommaSeparatedArray = 1,
+ TagTable = 2,
+ CommaSeparatedArrayAndTagTable = 3,
+ }
+
+ public enum TagTableMode
+ {
+ OrderingId,
+ SequentialUUID
+ }
public class JournalTableConfig
{
@@ -13,6 +26,12 @@ public class JournalTableConfig
public string MetadataTableName { get; protected set; }
public MetadataTableColumnNames MetadataColumnNames { get; protected set; }
public bool WarnOnAutoInitializeFail { get; }
+
+ public TagWriteMode TagWriteMode { get; }
+ public TagTableMode TagTableMode { get; }
+ public string? TagTableName { get; }
+ public bool UseEventManifestColumn { get; }
+
public JournalTableConfig(Configuration.Config config)
{
@@ -28,6 +47,31 @@ public JournalTableConfig(Configuration.Config config)
AutoInitialize = localcfg.GetBoolean("auto-init", false);
WarnOnAutoInitializeFail =
localcfg.GetBoolean("warn-on-auto-init-fail", true);
+
+ var s = config.GetString("tag-write-mode", "default");
+ if (Enum.TryParse(s, true, out TagWriteMode res))
+ {
+
+ }
+ else if (s.Equals("default", StringComparison.InvariantCultureIgnoreCase))
+ {
+ res = TagWriteMode.CommaSeparatedArray;
+ }
+ else if (s.Equals("migration",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ res = TagWriteMode.CommaSeparatedArrayAndTagTable;
+ }
+ else if (s.Equals("tagtableonly",
+ StringComparison.InvariantCultureIgnoreCase))
+ {
+ res = TagWriteMode.TagTable;
+ }
+ else
+ {
+ res = TagWriteMode.CommaSeparatedArray;
+ }
+ TagWriteMode = res;
}
@@ -40,7 +84,8 @@ protected bool Equals(JournalTableConfig other)
AutoInitialize == other.AutoInitialize &&
MetadataTableName == other.MetadataTableName &&
WarnOnAutoInitializeFail == other.WarnOnAutoInitializeFail &&
- Equals(MetadataColumnNames, other.MetadataColumnNames);
+ Equals(MetadataColumnNames, other.MetadataColumnNames) &&
+ TagWriteMode== other.TagWriteMode;
}
public override bool Equals(object obj)
@@ -54,7 +99,7 @@ public override bool Equals(object obj)
public override int GetHashCode()
{
return HashCode.Combine(ColumnNames, TableName, SchemaName,
- AutoInitialize, MetadataTableName, MetadataColumnNames);
+ AutoInitialize, MetadataTableName, MetadataColumnNames, TagWriteMode);
}
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs
index 691d3e9d..c27d5806 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Config/ReadJournalPluginConfig.cs
@@ -1,4 +1,6 @@
-namespace Akka.Persistence.Sql.Linq2Db.Config
+using System;
+
+namespace Akka.Persistence.Sql.Linq2Db.Config
{
public class ReadJournalPluginConfig
{
@@ -7,11 +9,35 @@ public ReadJournalPluginConfig(Configuration.Config config)
TagSeparator = config.GetString("tag-separator", ",");
Dao = config.GetString("dao",
"akka.persistence.sql.linq2db.dao.bytea.readjournal.bytearrayreadjournaldao");
-
+ var tagReadStr = config.GetString("tag-read-mode", "default");
+ if (Enum.TryParse(tagReadStr,true,out TagReadMode tgr))
+ {
+
+ }
+ else if (tagReadStr.Equals("default", StringComparison.InvariantCultureIgnoreCase))
+ {
+ tgr = TagReadMode.CommaSeparatedArray;
+ }
+            else if (tagReadStr.Equals("migrate", StringComparison.InvariantCultureIgnoreCase))
+            { tgr = TagReadMode.CommaSeparatedArrayAndTagTable; }
+            //Fallback: otherwise an unrecognized value leaves tgr as 0, invalid for this [Flags] enum
+            else { tgr = TagReadMode.CommaSeparatedArray; }
+
+ TagReadMode = tgr;
}
public string Dao { get; set; }
public string TagSeparator { get; set; }
+ public TagReadMode TagReadMode { get; set; }
+ public TagTableMode TagTableMode { get; }
+ }
+
+ [Flags]
+ public enum TagReadMode
+ {
+ CommaSeparatedArray = 1,
+ TagTable = 2,
+ CommaSeparatedArrayAndTagTable = 3
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs
index 5a63fef1..589a8580 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Db/AkkaPersistenceDataConnectionFactory.cs
@@ -6,6 +6,7 @@
using Akka.Persistence.Sql.Linq2Db.Journal;
using Akka.Persistence.Sql.Linq2Db.Journal.Types;
using Akka.Persistence.Sql.Linq2Db.Snapshot;
+using Akka.Streams.Dsl;
using Akka.Util;
using LinqToDB;
using LinqToDB.Configuration;
@@ -41,7 +42,6 @@ public AkkaPersistenceDataConnectionFactory(IProviderConfig
opts = new LinqToDbConnectionOptionsBuilder()
.UseConnectionString(providerName, connString)
.UseMappingSchema(mappingSchema).Build();
-
if (providerName.ToLower().StartsWith("sqlserver"))
{
policy = new SqlServerRetryPolicy();
@@ -135,7 +135,13 @@ private static void MapJournalRow(IProviderConfig config,
.HasColumnName(tableConfig.ColumnNames.SequenceNumber)
.Member(r => r.Timestamp)
.HasColumnName(tableConfig.ColumnNames.Created);
-
+ //We can skip writing tags the old way by ignoring the column in mapping.
+ journalRowBuilder.Member(r => r.tagArr).IsNotColumn();
+ if ((tableConfig.TagWriteMode &
+ TagWriteMode.CommaSeparatedArray) == 0)
+ {
+ journalRowBuilder.Member(r => r.tags).IsNotColumn();
+ }
if (config.ProviderName.ToLower().Contains("sqlite"))
{
journalRowBuilder.Member(r => r.ordering).IsPrimaryKey().HasDbType("INTEGER")
@@ -148,6 +154,52 @@ private static void MapJournalRow(IProviderConfig config,
.Member(r=>r.sequenceNumber).IsPrimaryKey();
}
+ void SetJoinCol(PropertyMappingBuilder builder,
+ PropertyMappingBuilder propertyMappingBuilder)
+ {
+ if (config.TableConfig.TagTableMode ==
+ TagTableMode.SequentialUUID)
+ {
+ builder.Member(r => r.JournalOrderingId)
+ .IsNotColumn()
+ .Member(r => r.WriteUUID)
+ .IsColumn().IsPrimaryKey();
+ journalRowBuilder.Member(r => r.WriteUUID)
+ .IsColumn();
+ }
+ else
+ {
+ builder.Member(r => r.WriteUUID)
+ .IsNotColumn()
+ .Member(r => r.JournalOrderingId)
+ .IsColumn().IsPrimaryKey();
+ journalRowBuilder.Member(r => r.WriteUUID)
+ .IsNotColumn();
+ }
+ }
+
+ if (config.TableConfig.UseEventManifestColumn)
+ {
+ journalRowBuilder.Member(r => r.eventManifest)
+ .IsColumn().HasLength(64);
+ }
+ else
+ {
+ journalRowBuilder.Member(r => r.eventManifest)
+ .IsNotColumn();
+ }
+ if ((config.TableConfig.TagWriteMode & TagWriteMode.TagTable) != 0)
+ {
+ var tagTableBuilder = fmb.Entity()
+ .HasTableName(tableConfig.TagTableName)
+ .HasSchemaName(tableConfig.SchemaName)
+ .Member(r => r.TagValue)
+ .IsColumn().IsNullable(false)
+ .HasLength(64)
+ .IsPrimaryKey();
+ SetJoinCol(tagTableBuilder, journalRowBuilder);
+ }
+
//Probably overkill, but we only set Metadata Mapping if specified
//That we are in delete compatibility mode.
if (config.IDaoConfig.SqlCommonCompatibilityMode)
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
index 2c749fa4..ed0f14e7 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseByteArrayJournalDao.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Data;
@@ -23,6 +24,37 @@
namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO
{
+ public class SequentialUUIDGenerator
+ {
+ private long _counter = DateTime.UtcNow.Ticks;
+
+ ///
+ /// Gets a value to be assigned to a property.
+ ///
+ /// The change tracking entry of the entity for which the value is being generated.
+ /// The value to be assigned to a property.
+ public Guid Next()
+ {
+ var guidBytes = Guid.NewGuid().ToByteArray();
+ var counterBytes = BitConverter.GetBytes(Interlocked.Increment(ref _counter));
+
+ if (!BitConverter.IsLittleEndian)
+ {
+ System.Array.Reverse(counterBytes);
+ }
+
+ guidBytes[08] = counterBytes[1];
+ guidBytes[09] = counterBytes[0];
+ guidBytes[10] = counterBytes[7];
+ guidBytes[11] = counterBytes[6];
+ guidBytes[12] = counterBytes[5];
+ guidBytes[13] = counterBytes[4];
+ guidBytes[14] = counterBytes[3];
+ guidBytes[15] = counterBytes[2];
+
+ return new Guid(guidBytes);
+ }
+ }
public abstract class BaseByteArrayJournalDao :
BaseJournalDaoWithReadMessages,
IJournalDaoWithUpdates
@@ -53,6 +85,7 @@ protected BaseByteArrayJournalDao(IAdvancedScheduler sched,
Serializer = serializer;
deserializeFlow = Serializer.DeserializeFlow();
deserializeFlowMapped = Serializer.DeserializeFlow().Select(MessageWithBatchMapper());
+ _uuidGen = new SequentialUUIDGenerator();
//Due to C# rules we have to initialize WriteQueue here
//Keeping it here vs init function prevents accidental moving of init
//to where variables aren't set yet.
@@ -106,6 +139,7 @@ private async Task QueueWriteJournalRows(Seq xs)
new TaskCompletionSource(
TaskCreationOptions.RunContinuationsAsynchronously
);
+
//Send promise and rows into queue. If the Queue takes it,
//It will write the Promise state when finished writing (or failing)
var result =
@@ -137,6 +171,7 @@ private async Task QueueWriteJournalRows(Seq xs)
private async Task WriteJournalRows(Seq xs)
{
{
+
//hot path:
//If we only have one row, penalty for BulkCopy
//Isn't worth it due to insert caching/transaction/etc.
@@ -149,15 +184,137 @@ private async Task WriteJournalRows(Seq xs)
private async Task InsertSingle(Seq xs)
{
- //If we are writing a single row,
- //we don't need to worry about transactions.
- using (var db = _connectionFactory.GetConnection())
+ if ((_journalConfig.TableConfig.TagWriteMode & TagWriteMode.TagTable)!=0 && xs.Head.tagArr.Length>0)
+ {
+ //Lazy fallback; do the InsertMultiple call here and leave it at that.
+ await InsertMultiple(xs);
+ }
+ else
{
- await db.InsertAsync(xs.Head);
+ //If we are writing a single row,
+ //we don't need to worry about transactions.
+ using (var db = _connectionFactory.GetConnection())
+ {
+ await db.InsertAsync(xs.Head);
+ }
}
+
+
}
+ private async Task InsertWithOrderingAndBulkInsertTags(DataConnection dc,
+ Seq xs)
+ {
+ var tagsToInsert = new List(xs.Count);
+ foreach (var journalRow in xs)
+ {
+ var dbid = await dc.InsertWithInt64IdentityAsync(journalRow);
+ foreach (var s1 in journalRow.tagArr)
+ {
+ tagsToInsert.Add(new JournalTagRow(){JournalOrderingId = dbid, TagValue = s1});
+ }
+ }
+ await dc.GetTable()
+ .BulkCopyAsync(new BulkCopyOptions()
+ {
+ BulkCopyType = BulkCopyType.MultipleRows,
+ UseParameters = _journalConfig.DaoConfig
+ .PreferParametersOnMultiRowInsert,
+ MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripTagBatchSize
+ }, tagsToInsert);
+ }
+ private async Task BulkInsertNoTagTableTags(DataConnection dc, Seq xs)
+ {
+ await dc.GetTable()
+ .BulkCopyAsync(
+ new BulkCopyOptions()
+ {
+ BulkCopyType =
+ xs.Count > _journalConfig.DaoConfig
+ .MaxRowByRowSize
+ ? BulkCopyType.Default
+ : BulkCopyType.MultipleRows,
+ UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert,
+ MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize
+ }, xs);
+ }
private async Task InsertMultiple(Seq xs)
+ {
+ if ((_journalConfig.TableConfig.TagWriteMode & TagWriteMode.TagTable) !=0)
+ {
+ if (_journalConfig.TableConfig.TagTableMode ==
+ TagTableMode.OrderingId)
+ {
+ await HandleTagTableInsert(xs);
+ }
+ else
+ {
+ await HandleTagTableUUIDInsert(xs);
+ }
+ }
+ else
+ {
+ await HandleDefaultInsert(xs);
+ }
+
+ }
+
+ private async Task HandleTagTableUUIDInsert(Seq xs)
+ {
+ var tagWrites = new List();
+ foreach (var journalRow in xs)
+ {
+ if (journalRow.tagArr?.Length > 0)
+ {
+ var uid = NextUUID();
+ journalRow.WriteUUID = uid;
+ foreach (var s1 in journalRow.tagArr)
+ {
+ tagWrites.Add(new JournalTagRow(){WriteUUID=uid, TagValue = s1});
+ }
+ }
+ }
+
+ using (var ctx = _connectionFactory.GetConnection())
+ {
+ using (var tx = await ctx.BeginTransactionAsync())
+ {
+ try
+ {
+ await ctx.BulkCopyAsync(new BulkCopyOptions()
+ {
+ TableName = _journalConfig.TableConfig.TableName,
+ MaxBatchSize = _journalConfig.DaoConfig
+ .DbRoundTripBatchSize
+ },xs);
+ if (tagWrites.Count > 0)
+ {
+ await ctx.BulkCopyAsync(new BulkCopyOptions()
+ {
+ TableName = _journalConfig.TableConfig.TagTableName,
+ MaxBatchSize = _journalConfig.DaoConfig
+ .DbRoundTripTagBatchSize,
+ UseParameters = _journalConfig.DaoConfig
+ .PreferParametersOnMultiRowInsert
+ }, tagWrites);
+ }
+ await ctx.CommitTransactionAsync();
+ }
+ catch (Exception e)
+ {
+ await ctx.RollbackTransactionAsync();
+ throw;
+ }
+ }
+ }
+ }
+
+ private Guid NextUUID()
+ {
+ return _uuidGen.Next();
+ }
+
+ private async Task HandleDefaultInsert(Seq xs)
{
using (var db = _connectionFactory.GetConnection())
{
@@ -165,18 +322,7 @@ private async Task InsertMultiple(Seq xs)
{
await db.BeginTransactionAsync(IsolationLevel
.ReadCommitted);
- await db.GetTable()
- .BulkCopyAsync(
- new BulkCopyOptions()
- {
- BulkCopyType =
- xs.Count > _journalConfig.DaoConfig
- .MaxRowByRowSize
- ? BulkCopyType.Default
- : BulkCopyType.MultipleRows,
- UseParameters = _journalConfig.DaoConfig.PreferParametersOnMultiRowInsert,
- MaxBatchSize = _journalConfig.DaoConfig.DbRoundTripBatchSize
- }, xs);
+ await BulkInsertNoTagTableTags(db, xs);
await db.CommitTransactionAsync();
}
catch (Exception e)
@@ -195,9 +341,58 @@ await db.GetTable()
}
}
+ private async Task HandleTagTableInsert(Seq xs)
+ {
+ using (var db = _connectionFactory.GetConnection())
+ {
+ try
+ {
+ await db.BeginTransactionAsync(IsolationLevel
+ .ReadCommitted);
+ await consumeSequenceForTagInsert(xs, db);
+ await db.CommitTransactionAsync();
+ }
+ catch (Exception ex)
+ {
+ try
+ {
+ await db.RollbackTransactionAsync();
+ }
+ catch (Exception exception)
+ {
+ throw ex;
+ }
+
+ throw;
+ }
+ }
+ }
+
+ private async Task consumeSequenceForTagInsert(Seq xs, DataConnection db)
+ {
+ Seq tail = xs;
+ while (tail.Count > 0)
+ {
+ Seq noTags;
+ Seq hasTags;
+ (noTags, tail) =
+ tail.Span(r => r.tagArr.Length == 0);
+ if (noTags.Count > 0)
+ {
+ await BulkInsertNoTagTableTags(db, noTags);
+ }
+
+ (hasTags, tail) =
+ tail.Span(r => r.tagArr.Length > 0);
+ if (hasTags.Count > 0)
+ {
+ await InsertWithOrderingAndBulkInsertTags(db, hasTags);
+ }
+ }
+ }
//By using a custom flatten here, we avoid an Enumerable/LINQ allocation
//And are able to have a little more control over default capacity of array.
- static List FlattenListOfListsToList(List>> source) {
+ static List FlattenListOfListsToList(List> source) {
//List ResultSet(
// Akka.Util.Try> item)
@@ -236,7 +431,7 @@ public async Task> AsyncWriteMessages(
}
protected static ImmutableList BuildWriteRejections(
- List>> serializedTries)
+ List> serializedTries)
{
Exception[] builderEx =
new Exception[serializedTries.Count];
@@ -449,7 +644,9 @@ private static readonly
Expression>
sequenceNumberSelector =
r => r.SequenceNumber;
-
+
+ private readonly SequentialUUIDGenerator _uuidGen;
+
public async Task Update(string persistenceId, long sequenceNr,
object payload)
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
index b201b7ef..0f6f8310 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/BaseJournalDaoWithReadMessages.cs
@@ -40,21 +40,30 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
FlowControlEnum.Continue),
async opt =>
{
- async Task>)>>
- RetrieveNextBatch()
+ async Task>> BatchFromDb(string s, long l, int i,
+ long fromSeqNo)
{
- Seq<
- Util.Try> msg;
+ Seq> msg;
using (var conn =
_connectionFactory.GetConnection())
{
- msg = await Messages(conn, persistenceId,
- opt.Item1,
- toSequenceNr, batchSize)
+ msg = await Messages(conn, s,
+ fromSeqNo,
+ l, i)
.RunWith(
- ExtSeq.Seq>(), mat);
+ ExtSeq.Seq>(), mat);
}
+ return msg;
+ }
+
+ async Task>)>>
+ RetrieveNextBatch(long fromSeq)
+ {
+ Seq<
+ Util.Try> msg;
+ msg = await BatchFromDb(persistenceId, toSequenceNr, batchSize, fromSeq);
+
var hasMoreEvents = msg.Count == batchSize;
//var lastMsg = msg.IsEmpty.LastOrDefault();
Util.Option lastSeq = Util.Option.None;
@@ -99,9 +108,9 @@ protected BaseJournalDaoWithReadMessages(IAdvancedScheduler ec,
case FlowControlEnum.Stop:
return Util.Option<((long, FlowControlEnum), Seq>)>.None;
case FlowControlEnum.Continue:
- return await RetrieveNextBatch();
+ return await RetrieveNextBatch(opt.Item1);
case FlowControlEnum.ContinueDelayed when refreshInterval.HasValue:
- return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2, RetrieveNextBatch);
+ return await FutureTimeoutSupport.After(refreshInterval.Value.Item1,refreshInterval.Value.Item2,()=> RetrieveNextBatch(opt.Item1));
default:
return InvalidFlowThrowHelper(opt);
}
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
index 21ec4de9..b070bd9e 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/DAO/ByteArrayJournalSerializer.cs
@@ -20,6 +20,7 @@ public class ByteArrayJournalSerializer : FlowPersistentReprSerializer _journalConfig;
private readonly string[] _separatorArray;
+ private readonly TagWriteMode _tagWriteMode;
public ByteArrayJournalSerializer(IProviderConfig journalConfig, Akka.Serialization.Serialization serializer, string separator)
{
@@ -27,6 +28,7 @@ public ByteArrayJournalSerializer(IProviderConfig journalCon
_serializer = serializer;
_separator = separator;
_separatorArray = new[] {_separator};
+ _tagWriteMode = journalConfig.TableConfig.TagWriteMode;
}
///
@@ -47,6 +49,43 @@ private static string StringSep(IImmutableSet tags,
tl + separator + tr);
}
+ private JournalRow CreateJournalRow(
+ IImmutableSet tags, IPersistentRepresentation _persistentRepr, long ts)
+ {
+ switch (_tagWriteMode)
+ {
+ case TagWriteMode.CommaSeparatedArray:
+ return new JournalRow()
+ {
+ tags = StringSep(tags, _separator),
+ Timestamp = _persistentRepr.Timestamp == 0
+ ? ts
+ : _persistentRepr.Timestamp
+ };
+ case TagWriteMode.TagTable:
+ return new JournalRow()
+ {
+ tags = "",
+ tagArr = tags.ToArray(),
+ Timestamp = _persistentRepr.Timestamp == 0
+ ? ts
+ : _persistentRepr.Timestamp
+ };
+ case TagWriteMode.CommaSeparatedArrayAndTagTable:
+ return new JournalRow()
+ {
+ tags = StringSep(tags, _separator),
+ tagArr = tags.ToArray(),
+ Timestamp = _persistentRepr.Timestamp == 0
+ ? ts
+ : _persistentRepr.Timestamp
+ };
+ default:
+ throw new Exception("Invalid Tag Write Mode!");
+ }
+ }
+
+
protected override Try Serialize(IPersistentRepresentation persistentRepr, IImmutableSet tTags, long timeStamp = 0)
{
try
@@ -55,39 +94,39 @@ protected override Try Serialize(IPersistentRepresentation persisten
return Akka.Serialization.Serialization.WithTransport(
_serializer.System, (persistentRepr
, _serializer.FindSerializerForType(persistentRepr.Payload.GetType(),_journalConfig.DefaultSerializer),
- StringSep(tTags,_separator),
- timeStamp
+ CreateJournalRow(tTags,persistentRepr,timeStamp)
),
state =>
{
- var (_persistentRepr, serializer,tags,ts) = state;
- string thisManifest = "";
+ var (_persistentRepr, serializer,row) = state;
+
if (serializer is SerializerWithStringManifest withStringManifest)
{
- thisManifest =
+ row.manifest =
withStringManifest.Manifest(_persistentRepr.Payload);
}
- else
+ else if (serializer.IncludeManifest)
{
- if (serializer.IncludeManifest)
- {
- thisManifest = _persistentRepr.Payload
+ row.manifest = _persistentRepr.Payload
.GetType().TypeQualifiedName();
- }
}
- return new Try(new JournalRow()
+ else
+ {
+ row.manifest = "";
+ }
+
{
- message =
- serializer.ToBinary(_persistentRepr.Payload),
- manifest = thisManifest,
- persistenceId = _persistentRepr.PersistenceId,
- tags = tags,
- Identifier = serializer.Identifier,
- sequenceNumber = _persistentRepr.SequenceNr,
- Timestamp = _persistentRepr.Timestamp == 0
- ? ts
- : _persistentRepr.Timestamp
- });
+ row.message =
+ serializer.ToBinary(_persistentRepr
+ .Payload);
+ row.persistenceId =
+ _persistentRepr.PersistenceId;
+ row.Identifier = serializer.Identifier;
+ row.sequenceNumber = _persistentRepr.SequenceNr;
+ row.eventManifest = _persistentRepr.Manifest;
+ }
+ return new Try(row
+ );
});
}
catch (Exception e)
@@ -118,10 +157,10 @@ protected override Try Serialize(IPersistentRepresentation persisten
state.message, state.type);
}), t.sequenceNumber,
t.persistenceId,
- t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
+ t.eventManifest??t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
t.tags?.Split(_separatorArray,
StringSplitOptions.RemoveEmptyEntries)
- .ToImmutableHashSet() ?? ImmutableHashSet.Empty,
+ .ToImmutableHashSet() ?? t.tagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty,
t.ordering));
}
else
@@ -131,10 +170,10 @@ protected override Try Serialize(IPersistentRepresentation persisten
new Persistent(_serializer.Deserialize(t.message,
identifierMaybe.Value,t.manifest), t.sequenceNumber,
t.persistenceId,
- t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
+ t.eventManifest?? t.manifest, t.deleted, ActorRefs.NoSender, null, t.Timestamp),
t.tags?.Split(_separatorArray,
StringSplitOptions.RemoveEmptyEntries)
- .ToImmutableHashSet() ?? ImmutableHashSet.Empty,
+ .ToImmutableHashSet() ??t.tagArr?.ToImmutableHashSet()?? ImmutableHashSet.Empty,
t.ordering));
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
}
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
index 4574e9d6..9573af07 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Journal/Types/JournalRow.cs
@@ -1,8 +1,16 @@
-using LinqToDB;
+using System;
+using LinqToDB;
using LinqToDB.Mapping;
namespace Akka.Persistence.Sql.Linq2Db.Journal.Types
{
+
+ public sealed class JournalTagRow
+ {
+ public long JournalOrderingId { get; set; }
+ public string TagValue { get; set; }
+ public Guid WriteUUID { get; set; }
+ }
public sealed class JournalRow
{
public JournalRow()
@@ -22,5 +30,8 @@ public JournalRow()
public string tags { get; set; }
public string manifest { get; set; }
public int? Identifier { get; set; }
+ public string[] tagArr { get; set; }
+ public Guid? WriteUUID { get; set; }
+ public string eventManifest { get; set; }
}
}
\ No newline at end of file
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
index 9d1ff11f..eae422ab 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Query/Dao/BaseByteReadArrayJournalDAO.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
+using System.Linq.Expressions;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence.Sql.Linq2Db.Config;
@@ -15,6 +16,7 @@
using Akka.Util;
using LinqToDB;
using LinqToDB.Data;
+using LinqToDB.Tools;
namespace Akka.Persistence.Sql.Linq2Db.Query.Dao
{
@@ -44,6 +46,7 @@ protected BaseByteReadArrayJournalDAO(IAdvancedScheduler ec,
protected IQueryable baseQuery(DataConnection connection)
{
+
return connection.GetTable()
.Where(jr =>
includeDeleted == false || (jr.deleted == false));
@@ -100,23 +103,107 @@ public Source<
var maxTake = MaxTake(max);
- return AsyncSource.FromEnumerable(new {_connectionFactory,maxTake,maxOffset,offset},async(input)=>
+ return AsyncSource.FromEnumerable(new {t=this,maxTake,maxOffset,offset},async(input)=>
{
- using (var conn = input._connectionFactory.GetConnection())
+ using (var conn = input.t._connectionFactory.GetConnection())
{
- return await conn.GetTable()
+ var evts = await input.t.baseQuery(conn)
.OrderBy(r => r.ordering)
.Where(r =>
r.ordering > input.offset &&
r.ordering <= input.maxOffset)
.Take(input.maxTake).ToListAsync();
+ return await AddTagDataIfNeeded(evts, conn);
}
}
).Via(deserializeFlow);
}
-
+
+ public async Task> AddTagDataIfNeeded(List toAdd, DataConnection context)
+ {
+ if ((_readJournalConfig.PluginConfig.TagReadMode &
+ TagReadMode.TagTable) != 0)
+ {
+ await addTagDataFromTagTable(toAdd, context);
+ }
+ return toAdd;
+ }
+
+ private async Task addTagDataFromTagTable(List toAdd, DataConnection context)
+ {
+ var pred = TagCheckPredicate(toAdd);
+ var tagRows = pred.HasValue
+ ? await context.GetTable()
+ .Where(pred.Value)
+ .ToListAsync()
+ : new List();
+ if (_readJournalConfig.TableConfig.TagTableMode ==
+ TagTableMode.OrderingId)
+ {
+ foreach (var journalRow in toAdd)
+ {
+ journalRow.tagArr =
+ tagRows.Where(r =>
+ r.JournalOrderingId ==
+ journalRow.ordering)
+ .Select(r => r.TagValue)
+ .ToArray();
+ }
+ }
+ else
+ {
+ foreach (var journalRow in toAdd)
+ {
+ journalRow.tagArr =
+ tagRows.Where(r =>
+ r.WriteUUID ==
+ journalRow.WriteUUID)
+ .Select(r => r.TagValue)
+ .ToArray();
+ }
+ }
+ }
+
+ public Option>> TagCheckPredicate(
+ List toAdd)
+ {
+ if (_readJournalConfig.PluginConfig.TagTableMode ==
+ TagTableMode.SequentialUUID)
+ {
+ //Check whether we have anything to query for two reasons:
+ //1: Linq2Db may choke on an empty 'in' set.
+ //2: Don't wanna make a useless round trip to the DB,
+ // if we know nothing is tagged.
+ var set = toAdd.Where(r => r.WriteUUID.HasValue)
+ .Select(r => r.WriteUUID.Value).ToList();
+ if (set.Count == 0)
+ {
+ return Option>>.None;
+ }
+ else
+ {
+ return new Option>>(r =>
+ r.WriteUUID.In(set));
+ }
+ }
+ else
+ {
+ //We can just check the count here.
+ //Alas, we won't know if there are tags
+ //Until we actually query on this one.
+ if (toAdd.Count == 0)
+ {
+ return Option>>.None;
+ }
+ else
+ {
+ return new Option>>( r =>
+ r.JournalOrderingId.In(toAdd.Select(r => r.ordering)));
+ }
+ }
+ }
public Source<
Akka.Util.Try<(IPersistentRepresentation, IImmutableSet, long)>,
NotUsed> EventsByTag(string tag, long offset, long maxOffset,
@@ -124,29 +211,163 @@ public Source<
{
var separator = _readJournalConfig.PluginConfig.TagSeparator;
var maxTake = MaxTake(max);
- return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,_connectionFactory},
- async(input)=>
- {
- using (var conn = input._connectionFactory.GetConnection())
+ switch (_readJournalConfig.PluginConfig.TagReadMode)
+ {
+ case TagReadMode.CommaSeparatedArray:
+ return AsyncSource.FromEnumerable(new{separator,tag,offset,maxOffset,maxTake,t=this},
+ async(input)=>
+ {
+ using (var conn = input.t._connectionFactory.GetConnection())
+ {
+ return await input.t.baseQuery(conn)
+ .Where(r => r.tags.Contains(input.tag))
+ .OrderBy(r => r.ordering)
+ .Where(r =>
+ r.ordering > input.offset && r.ordering <= input.maxOffset)
+ .Take(input.maxTake).ToListAsync();
+ }
+ }).Via(perfectlyMatchTag(tag, separator))
+ .Via(deserializeFlow);
+ case TagReadMode.CommaSeparatedArrayAndTagTable:
+ return eventByTagMigration(tag, offset, maxOffset, separator, maxTake);
+ case TagReadMode.TagTable:
+ return eventByTagTableOnly(tag, offset, maxOffset, separator, maxTake);
+ default:
+ throw new ArgumentOutOfRangeException();
+ }
+
+
+
+ }
+
+ private Source<
+ Try<(IPersistentRepresentation, IImmutableSet, long)>,
+ NotUsed> eventByTagTableOnly(string tag, long offset,
+ long maxOffset,
+ string separator, int maxTake)
+ {
+ return AsyncSource.FromEnumerable(
+ new
{
- return await conn.GetTable()
- .Where(r => r.tags.Contains(input.tag))
- .OrderBy(r => r.ordering)
- .Where(r =>
- r.ordering > input.offset && r.ordering <= input.maxOffset)
- .Take(input.maxTake).ToListAsync();
- }
- }).Via(perfectlyMatchTag(tag, separator))
+ separator, tag, offset, maxOffset, maxTake, t=this
+ },
+ async (input) =>
+ {
+ //TODO: Optimize Flow
+ using (var conn = input.t._connectionFactory.GetConnection())
+ {
+ //First, Get eligible rows.
+ var mainRows = await
+ input.t.baseQuery(conn)
+ .LeftJoin(
+ conn.GetTable<
+ JournalTagRow>(),
+ EventsByTagOnlyJoinPredicate,
+ (jr, jtr) =>
+ new { jr, jtr })
+ .Where(r =>
+ r.jtr.TagValue == input.tag)
+ .Select(r => r.jr)
+ .Where(r =>
+ r.ordering > input.offset &&
+ r.ordering <= input.maxOffset)
+ .Take(input.maxTake).ToListAsync();
+ await addTagDataFromTagTable(mainRows, conn);
+ return mainRows;
+ }
+ })
+ //We still PerfectlyMatchTag here
+ //Because DB Collation :)
+ .Via(perfectlyMatchTag(tag, separator))
.Via(deserializeFlow);
+ }
+ private Expression> EventsByTagOnlyJoinPredicate
+ {
+ get
+ {
+ if (_readJournalConfig.TableConfig
+ .TagTableMode ==
+ TagTableMode.OrderingId)
+ return (jr, jtr) =>
+ jr.ordering ==
+ jtr.JournalOrderingId;
+ else
+ return (jr, jtr) =>
+ jr.WriteUUID == jtr.WriteUUID;
+ }
+ }
+
+ private Source, long)>, NotUsed> eventByTagMigration(string tag, long offset, long maxOffset,
+ string separator, int maxTake)
+ {
+ return AsyncSource.FromEnumerable(
+ new
+ {
+ separator, tag, offset, maxOffset, maxTake, t =this
+ },
+ async (input) =>
+ {
+ //NOTE: This flow is probably not performant,
+ //It is meant to allow for safe migration
+ //And is not necessarily intended for long term use
+ using (var conn = input.t._connectionFactory.GetConnection())
+ {
+ //First, find the rows.
+ //We use IN here instead of leftjoin
+ //because it's safer from a
+ //'avoid duplicate rows tripping things up later'
+ //standpoint.
+ var mainRows = await input.t.baseQuery(conn)
+ .Where(
+ eventsByTagMigrationPredicate(conn, input.tag)
+ )
+ .OrderBy(r => r.ordering)
+ .Where(r =>
+ r.ordering > input.offset &&
+ r.ordering <= input.maxOffset)
+ .Take(input.maxTake).ToListAsync();
+ await addTagDataFromTagTable(mainRows, conn);
+ return mainRows;
+ }
+ }).Via(perfectlyMatchTag(tag, separator))
+ .Via(deserializeFlow);
+ }
+
+ private Expression> eventsByTagMigrationPredicate(DataConnection conn, string tagVal)
+ {
+ if (_readJournalConfig.TableConfig.TagTableMode == TagTableMode.OrderingId)
+ {
+ return (JournalRow r) => r.ordering.In(
+ conn.GetTable<
+ JournalTagRow>().Where(r =>
+ r.TagValue ==
+ tagVal)
+ .Select(r =>
+ r.JournalOrderingId))
+ || r.tags.Contains(tagVal);
+ }
+ else
+ {
+ return (JournalRow r) => r.WriteUUID.Value.In(
+ conn.GetTable<
+ JournalTagRow>().Where(r =>
+ r.TagValue ==
+ tagVal)
+ .Select(r =>
+ r.WriteUUID))
+ || r.tags.Contains(tagVal);
+ }
}
private Flow perfectlyMatchTag(
string tag,
string separator)
{
-
+ //Do the tagArr check first here
+ //Since the logic is simpler.
return Flow.Create().Where(r =>
+ r.tagArr?.Contains(tag)??
(r.tags ?? "")
.Split(new[] {separator}, StringSplitOptions.RemoveEmptyEntries)
.Any(t => t.Contains(tag)));
@@ -200,6 +421,7 @@ public Source JournalSequence(long offset, long limit)
using (var conn = input._connectionFactory.GetConnection())
{
+ //persistence-jdbc does not filter deleted here.
return await conn.GetTable()
.Where(r => r.ordering > input.offset)
.Select(r => r.ordering)
@@ -212,8 +434,10 @@ public async Task MaxJournalSequenceAsync()
{
using (var db = _connectionFactory.GetConnection())
{
+ //persistence-jdbc does not filter deleted here.
return await db.GetTable()
.Select(r => r.ordering)
+ .OrderByDescending(r=>r)
.FirstOrDefaultAsync();
}
}
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs b/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs
index b71cd6f0..efab6533 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Query/JournalSequenceActor.cs
@@ -72,7 +72,7 @@ protected bool _receive(object message, long currentMaxOrdering,
.RunWith(Sink.Seq(), _mat)
.PipeTo(Self, sender: Self,
success: res =>
- new NewOrderingIds(currentMaxOrdering, res));
+ new NewOrderingIds(currentMaxOrdering, res),f=> new Status.Failure(f));
}
else if (message is NewOrderingIds nids)
{
@@ -186,9 +186,7 @@ protected override void PreStart()
self.Tell(new QueryOrderingIds());
try
{
-
-
- _readJournalDao.MaxJournalSequenceAsync().ContinueWith(t =>
+ _readJournalDao.MaxJournalSequenceAsync().ContinueWith(t =>
{
if (t.IsFaulted)
{
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD
index fe826cff..9a29c031 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Readme.MD
+++ b/src/Akka.Persistence.Sql.Linq2Db/Readme.MD
@@ -57,7 +57,26 @@ Please read the documentation carefully. Some features may be specific to use ca
- Classes used in place of ValueTuples in certain areas
- We don't have separate Query classes at this time. This can definitely be improved in future
- A couple of places around `WriteMessagesAsync` have had their logic moved to facilitate performance (i.e. use of `await` instead of `ContinueWith`)
- - Backwards Compatibility mode is implemented, to interoperate with existing journals and snapsho stores.
+ - Backwards Compatibility mode is implemented, to interoperate with existing journals and snapshot stores.
+
+ - Tag Table Support (Alpha):
+ - Allows the writing of tags to a separate table to allow for different performance strategies when working with tags.
+ - Supports Two Tag Table Modes:
+ - WriteUUID: The tag table and join uses a 'sequential-uuid' type field that causes fewer page splits while preserving good row locality on insert.
+ - This option is intended for those who want maximum write performance, at the expense of database storage and load.
+ - OrderingId: Uses the Journal Row's 'ordering' sequential Int64 for the tag table and join.
+ - This option is intended for those who want more efficient use of the DB's space
+ - This will result in slower writes, but faster/more efficient reads.
+ - Provides multiple modes of operation for reads and writes, note that there are separate switches for both read and write!
+ - CommaSeparatedOnly: The old behavior, where the comma separated tags are held in a column
+ - CommaSeparatedAndTagTable: Will Read/Write from both the Comma Separated column as well as the Tag Table
+ - TagTableOnly: will only use the tag table for Read/Write
+ - 'Live' Migration should be possible via the following flow:
+ 1. Run Migration scripts to create new columns/tables.
+ 2. Rolling deploy your system with Reads and Writes in 'CommaSeparatedAndTagTable' mode.
+ 3. Rolling deploy your system (again), with Writes now in 'TagTableOnly' mode.
+ 4. Run Migration App/Script to move existing tags into tag table.
+ 5. Rolling deploy your system (last one!) with Reads now in 'TagTableOnly' mode.
## Currently Implemented:
diff --git a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
index c2b1e347..fb3f9cac 100644
--- a/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
+++ b/src/Akka.Persistence.Sql.Linq2Db/Serialization/PersistentReprSerializer.cs
@@ -11,7 +11,7 @@ namespace Akka.Persistence.Sql.Linq2Db.Serialization
{
public abstract class PersistentReprSerializer
{
- public List>> Serialize(
+ public List> Serialize(
IEnumerable messages, long timeStamp = 0)
{
return messages.Select(aw =>
@@ -23,58 +23,106 @@ public abstract class PersistentReprSerializer
//Also, if we are only persisting a single event
//We will only enumerate if we have more than one element.
var payloads =
- (aw.Payload as IImmutableList
- );
+ (aw.Payload as IImmutableList
+ );
if (payloads is null)
{
- return new Util.Try>(
+ return new Util.Try(
new ArgumentNullException(
$"{aw.PersistenceId} received empty payload for sequenceNr range " +
$"{aw.LowestSequenceNr} - {aw.HighestSequenceNr}"));
}
+
//Preallocate our list; In the common case
//This saves a tiny bit of garbage
- var retList = new List(payloads.Count);
+ var retList = new T[payloads.Count];
if (payloads.Count == 1)
+ {
+ // If there's only one payload
+ // Don't allocate the enumerable.
+ var ser = Serialize(payloads[0], timeStamp);
+ var opt = ser.Success;
+ if (opt.HasValue)
{
- // If there's only one payload
- // Don't allocate the enumerable.
- var ser = Serialize(payloads[0], timeStamp);
- var opt = ser.Success;
- if (opt.HasValue)
- {
- retList.Add(opt.Value);
- return new Util.Try>(retList);
- }
- else
- {
- return new Util.Try>(ser.Failure.Value);
- }
+ retList[0] = opt.Value;
+ return new Util.Try(retList);
}
+ else
+ {
+ return new Util.Try(ser.Failure.Value);
+ }
+ }
else
{
+ int idx = 0;
foreach (var p in payloads)
{
var ser = Serialize(p, timeStamp);
var opt = ser.Success;
if (opt.HasValue)
{
- retList.Add(opt.Value);
+ retList[idx] = opt.Value;
+ idx = idx + 1;
}
else
{
- return new Util.Try>(ser.Failure.Value);
+ return new Util.Try(ser.Failure.Value);
}
}
- return new Util.Try>(retList);
+ return new Util.Try(retList);
}
//return new Util.Try>(retList);
-
}).ToList();
}
+ private List> HandleSerializeList(long timeStamp, AtomicWrite[] msgArr)
+ {
+ List> fullSet =
+ new List>(msgArr.Length);
+ for (int i = 0; i < msgArr.Length; i++)
+ {
+ var payloads =
+ (msgArr[i].Payload as
+ IImmutableList
+ );
+ if (payloads is null)
+ {
+ fullSet.Add(new Util.Try(
+ new ArgumentNullException(
+ $"{msgArr[i].PersistenceId} received empty payload for sequenceNr range " +
+ $"{msgArr[i].LowestSequenceNr} - {msgArr[i].HighestSequenceNr}")));
+ }
+ else
+ {
+ fullSet.Add(serializerItem(timeStamp, payloads));
+ }
+ }
+
+ return fullSet;
+ }
+
+ private Util.Try serializerItem(long timeStamp, IImmutableList payloads)
+ {
+ var retList = new T[payloads.Count];
+ for (int j = 0; j < payloads.Count; j++)
+ {
+ var ser = Serialize(payloads[j], timeStamp);
+ var opt = ser.Success;
+ if (opt.HasValue)
+ {
+ retList[j] = opt.Value;
+ }
+ else
+ {
+ return new Util.Try(ser.Failure.Value);
+ }
+ }
+
+ return new Util.Try(retList);
+ }
+
public Akka.Util.Try Serialize(IPersistentRepresentation persistentRepr, long timeStamp = 0)
{