Skip to content

Feature/parallel migrations #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions PlainSql.Migrations.Tests/AbstractMigratorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,33 @@ public void Does_not_execute_the_same_script_twice()
});
}

public static async Task ExecuteMigrationsInParallel(Func<IDbConnection> connection)
{
connection().CleanDbConnection();
Migrator.CreateMigrationsTable(connection(), null);

var result =
await Task.WhenAll(
Task.Factory.StartNew(ExecuteMigration),
Task.Factory.StartNew(ExecuteMigration),
Task.Factory.StartNew(ExecuteMigration));

// Migrations table + bla
Assert.Equal(2, connection().Query<string>("SELECT Filename FROM Migrations").ToList().Count);

bool ExecuteMigration()
{
var migration = new MigrationScript
{
Name = "create bla table",
Script = "CREATE TABLE bla (Id varchar(1) NOT NULL)"
};
using (var c = connection())
Migrator.ExecuteMigrations(c, new[] {migration}, false);
return true;
}
}

private string RemoveEmptyLines(string x)
{
return String.Join("\r\n",
Expand Down
19 changes: 12 additions & 7 deletions PlainSql.Migrations.Tests/IDbConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@ namespace PlainSql.Migrations.Tests
{
public static class IDbConnectionExtensions
{
public static void CleanDbConnection(this IDbConnection connection)
{
using (var tx = connection.BeginTransaction())
{
connection.Execute("DROP TABLE IF EXISTS bla", transaction: tx);
connection.Execute("DROP TABLE IF EXISTS Migrations", transaction: tx);

tx.Commit();
}
}

public static void WithCleanDbConnection(this IDbConnection connection, Action<IDbConnection> action)
{
using (var c = connection)
{
using (var tx = c.BeginTransaction())
{
c.Execute("DROP TABLE IF EXISTS bla", transaction: tx);
c.Execute("DROP TABLE IF EXISTS Migrations", transaction: tx);

tx.Commit();
}
c.CleanDbConnection();

action(c);
}
Expand Down
8 changes: 8 additions & 0 deletions PlainSql.Migrations.Tests/MsSqlMigratorTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Xunit;

namespace PlainSql.Migrations.Tests
{
Expand Down Expand Up @@ -37,5 +39,11 @@ protected string GetConnectionString()

return "Data Source=localhost;Initial Catalog=PlainSqlMigrations;User id=SA;Password=test123!;";
}

[Fact]
public Task Can_execute_scripts_in_parallel()
{
return ExecuteMigrationsInParallel(() => Connection);
}
}
}
8 changes: 8 additions & 0 deletions PlainSql.Migrations.Tests/PostgreMigratorTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Npgsql;
using Xunit;

namespace PlainSql.Migrations.Tests
{
Expand Down Expand Up @@ -37,5 +39,11 @@ protected string GetConnectionString()

return "Server=127.0.0.1;User Id=postgres;Password=postgres";
}

[Fact]
public Task Can_execute_scripts_in_parallel()
{
return ExecuteMigrationsInParallel(() => Connection);
}
}
}
42 changes: 34 additions & 8 deletions PlainSql.Migrations/Migrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using System.Data;
using System.Linq;
using System.Text;

using Dapper;
using System.Collections;
using System.Data.SqlClient;
using Serilog;

namespace PlainSql.Migrations
Expand All @@ -31,24 +31,50 @@ public static void ExecuteMigrations(this IDbConnection connection, IEnumerable<

public static void ExecuteMigrations(this IDbConnection connection, IEnumerable<MigrationScript> migrationScripts, MigrationOptions options)
{
using (var transaction = connection.BeginTransaction(IsolationLevel.Serializable))
var retryCount = 3;
while (retryCount > 0)
{
try
{
TryExecute(connection, migrationScripts, options);
return;
}
// a sql exception that is a deadlock
catch (SqlException e) when (e.Number == 1205 && retryCount != 0)
{
Log.Information(e,"{RetryCount} remaining retries for the execution of migrations", retryCount);
retryCount--;
}
}
}

private static void TryExecute(IDbConnection connection, IEnumerable<MigrationScript> migrationScripts, MigrationOptions options)
{
using (var transaction = connection.BeginTransaction())
{
var containsMigrationTable = false;

var selectMigrationsExecuted = "SELECT Filename FROM Migrations";

if (connection.IsSqlite())
{
containsMigrationTable = connection.ExecuteScalar<int>("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='Migrations'",
transaction: transaction) == 1;
containsMigrationTable = connection.ExecuteScalar<int>(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='Migrations'",
transaction: transaction) == 1;
}
else if (connection.IsPostgre())
{
containsMigrationTable = connection.ExecuteScalar<int>("SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE schemaname='public' AND tablename='migrations'",
connection.Execute("SELECT pg_advisory_xact_lock(1)", transaction: transaction);
containsMigrationTable = connection.ExecuteScalar<int>(
"SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE schemaname='public' AND tablename='migrations'",
transaction: transaction) == 1;
}
else
{
containsMigrationTable = connection.ExecuteScalar<int>("SELECT COUNT(*) FROM INFORMATION_SCHEMA.tables WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Migrations'",
transaction: transaction) == 1;
containsMigrationTable = connection.ExecuteScalar<int>(
"SELECT COUNT(*) FROM INFORMATION_SCHEMA.tables with (SERIALIZABLE) WHERE TABLE_SCHEMA='dbo' AND TABLE_NAME='Migrations' ",
transaction: transaction) == 1;
selectMigrationsExecuted = "SELECT Filename FROM Migrations with(SERIALIZABLE)";
}

if (options.CreateMigrationsTable && !containsMigrationTable)
Expand All @@ -57,7 +83,7 @@ public static void ExecuteMigrations(this IDbConnection connection, IEnumerable<
CreateMigrationsTable(connection, transaction);
}

var migrationsExecuted = connection.Query<string>("SELECT Filename FROM Migrations", transaction: transaction).ToList();
var migrationsExecuted = connection.Query<string>(selectMigrationsExecuted, transaction: transaction).ToList();

var migrationScriptsToExecute = migrationScripts
.Where(migrationScript => !migrationsExecuted.Contains(migrationScript.Name, StringComparer.OrdinalIgnoreCase))
Expand Down