diff --git a/lib/data_layer.ex b/lib/data_layer.ex index d527bce6..3603a5c2 100644 --- a/lib/data_layer.ex +++ b/lib/data_layer.ex @@ -252,6 +252,31 @@ defmodule AshPostgres.DataLayer do ] } + @partitioning %Spark.Dsl.Section{ + name: :partitioning, + describe: """ + A section for configuring the initial partitioning of the table + """, + examples: [ + """ + partitioning do + method :list + attribute :post + end + """ + ], + schema: [ + method: [ + type: {:one_of, [:range, :list, :hash]}, + doc: "Specifying what partitioning method to use" + ], + attribute: [ + type: :atom, + doc: "The attribute to partition on" + ] + ] + } + @postgres %Spark.Dsl.Section{ name: :postgres, describe: """ @@ -262,7 +287,8 @@ defmodule AshPostgres.DataLayer do @custom_statements, @manage_tenant, @references, - @check_constraints + @check_constraints, + @partitioning ], modules: [ :repo diff --git a/lib/data_layer/info.ex b/lib/data_layer/info.ex index 696439ae..9476b4e0 100644 --- a/lib/data_layer/info.ex +++ b/lib/data_layer/info.ex @@ -222,4 +222,14 @@ defmodule AshPostgres.DataLayer.Info do def manage_tenant_update?(resource) do Extension.get_opt(resource, [:postgres, :manage_tenant], :update?, false) end + + @doc "Partitioning method" + def partitioning_method(resource) do + Extension.get_opt(resource, [:postgres, :partitioning], :method, nil) + end + + @doc "Partitioning attribute" + def partitioning_attribute(resource) do + Extension.get_opt(resource, [:postgres, :partitioning], :attribute, nil) + end end diff --git a/lib/migration_generator/migration_generator.ex b/lib/migration_generator/migration_generator.ex index 84630f7d..d65a0af1 100644 --- a/lib/migration_generator/migration_generator.ex +++ b/lib/migration_generator/migration_generator.ex @@ -1367,7 +1367,8 @@ defmodule AshPostgres.MigrationGenerator do table: table, schema: schema, multitenancy: multitenancy, - repo: repo + repo: repo, + partitioning: partitioning } | rest ], @@ -1376,7 +1377,12 @@ defmodule AshPostgres.MigrationGenerator do ) do group_into_phases( rest, - %Phase.Create{table: table, schema: schema, multitenancy: multitenancy, repo: repo}, + %Phase.Create{ + table: table, + schema: schema, + multitenancy: multitenancy, repo: repo, + partitioning: partitioning + }, acc ) end @@ -2020,7 +2026,8 @@ defmodule AshPostgres.MigrationGenerator do schema: snapshot.schema, repo: snapshot.repo, multitenancy: snapshot.multitenancy, - old_multitenancy: empty_snapshot.multitenancy + old_multitenancy: empty_snapshot.multitenancy, + partitioning: snapshot.partitioning } | acc ]) @@ -3080,7 +3087,8 @@ defmodule AshPostgres.MigrationGenerator do repo: AshPostgres.DataLayer.Info.repo(resource, :mutate), multitenancy: multitenancy(resource), base_filter: AshPostgres.DataLayer.Info.base_filter_sql(resource), - has_create_action: has_create_action?(resource) + has_create_action: has_create_action?(resource), + partitioning: partitioning(resource) } hash = @@ -3155,6 +3163,20 @@ defmodule AshPostgres.MigrationGenerator do end) end + defp partitioning(resource) do + method = AshPostgres.DataLayer.Info.partitioning_method(resource) + attribute = AshPostgres.DataLayer.Info.partitioning_attribute(resource) + + if not is_nil(method) and not is_nil(attribute) do + %{ + method: method, + attribute: attribute + } + else + nil + end + end + defp multitenancy(resource) do strategy = Ash.Resource.Info.multitenancy_strategy(resource) attribute = Ash.Resource.Info.multitenancy_attribute(resource) diff --git a/lib/migration_generator/operation.ex b/lib/migration_generator/operation.ex index 3b266494..aefe3764 100644 --- a/lib/migration_generator/operation.ex +++ b/lib/migration_generator/operation.ex @@ -145,7 +145,7 @@ defmodule AshPostgres.MigrationGenerator.Operation do defmodule CreateTable do @moduledoc false - defstruct [:table, :schema, :multitenancy, :old_multitenancy, :repo] + defstruct [:table, :schema, :multitenancy, :old_multitenancy, :repo, :partitioning] end defmodule AddAttribute do diff --git a/lib/migration_generator/phase.ex b/lib/migration_generator/phase.ex index a4e5861b..8511898d 100644 --- a/lib/migration_generator/phase.ex +++ b/lib/migration_generator/phase.ex @@ -3,7 +3,15 @@ defmodule AshPostgres.MigrationGenerator.Phase do defmodule Create do @moduledoc false - defstruct [:table, :schema, :multitenancy, :repo, operations: [], commented?: false] + defstruct [ + :table, + :schema, + :multitenancy, + :repo, + partitioning: nil, + operations: [], + commented?: false + ] import AshPostgres.MigrationGenerator.Operation.Helper, only: [as_atom: 1] @@ -12,10 +20,13 @@ defmodule AshPostgres.MigrationGenerator.Phase do table: table, operations: operations, multitenancy: multitenancy, - repo: repo + repo: repo, + partitioning: partitioning }) do if multitenancy.strategy == :context do - "create table(:#{as_atom(table)}, primary_key: false, prefix: prefix()) do\n" <> + arguments = arguments([prefix(true), options(partitioning: partitioning)]) + + "create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <> Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <> "\nend" else @@ -26,15 +37,10 @@ defmodule AshPostgres.MigrationGenerator.Phase do "" end - opts = - if schema do - ", prefix: \"#{schema}\"" - else - "" - end + arguments = arguments([prefix(schema), options(partitioning: partitioning)]) pre_create <> - "create table(:#{as_atom(table)}, primary_key: false#{opts}) do\n" <> + "create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <> Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <> "\nend" end @@ -54,6 +60,28 @@ defmodule AshPostgres.MigrationGenerator.Phase do "drop table(:#{as_atom(table)}#{opts})" end end + + def arguments([nil, nil]), do: "" + def arguments(arguments), do: ", " <> Enum.join(Enum.reject(arguments, &is_nil(&1)), ",") + + def prefix(true), do: "prefix: prefix()" + def prefix(schema) when is_binary(schema) and schema != "", do: "prefix: \"#{schema}\"" + def prefix(_), do: nil + + def options(_options, _acc \\ []) + def options([], []), do: nil + def options([], acc), do: "options: \"#{Enum.join(acc, " ")}\"" + + def options([{:partitioning, %{method: method, attribute: attribute}} | rest], acc) do + option = "PARTITION BY #{String.upcase(Atom.to_string(method))} (#{attribute})" + + rest + |> options(acc ++ [option]) + end + + def options([_ | rest], acc) do + options(rest, acc) + end end defmodule Alter do diff --git a/lib/partitioning.ex b/lib/partitioning.ex new file mode 100644 index 00000000..5067c989 --- /dev/null +++ b/lib/partitioning.ex @@ -0,0 +1,114 @@ +defmodule AshPostgres.Partitioning do + @moduledoc false + + @doc """ + Create a new partition for a resource + """ + def create_partition(resource, opts) do + repo = AshPostgres.DataLayer.Info.repo(resource) + + resource + |> AshPostgres.DataLayer.Info.partitioning_method() + |> case do + :range -> + create_range_partition(repo, resource, opts) + + :list -> + create_list_partition(repo, resource, opts) + + :hash -> + create_hash_partition(repo, resource, opts) + + unsupported_method -> + raise "Invalid partition method, got: #{unsupported_method}" + end + end + + @doc """ + Check if partition exists + """ + def existing_partition?(resource, opts) do + repo = AshPostgres.DataLayer.Info.repo(resource) + + resource + |> AshPostgres.DataLayer.Info.partitioning_method() + |> case do + :range -> + false + + :list -> + partition_name = partition_name(resource, opts) + schema_exists?(repo, resource, partition_name, opts) + + :hash -> + false + + unsupported_method -> + raise "Invalid partition method, got: #{unsupported_method}" + end + end + + # TBI + defp create_range_partition(_repo, _resource, _opts) do + end + + defp create_list_partition(repo, resource, opts) do + key = Keyword.fetch!(opts, :key) + table = AshPostgres.DataLayer.Info.table(resource) + partition_name = partition_name(resource, opts) + + schema = + Keyword.get(opts, :tenant) + |> tenant_schema(resource) + + if schema_exists?(repo, resource, partition_name, opts) do + {:error, :already_exists} + else + Ecto.Adapters.SQL.query( + repo, + "CREATE TABLE \"#{schema}\".\"#{partition_name}\" PARTITION OF \"#{schema}\".\"#{table}\" FOR VALUES IN ('#{key}')" + ) + + if schema_exists?(repo, resource, partition_name, opts) do + :ok + else + {:error, "Unable to create partition"} + end + end + end + + # TBI + defp create_hash_partition(_repo, _resource, _opts) do + end + + defp schema_exists?(repo, resource, parition_name, opts) do + schema = + Keyword.get(opts, :tenant) + |> tenant_schema(resource) + + %Postgrex.Result{} = + result = + repo + |> Ecto.Adapters.SQL.query!( + "select table_name from information_schema.tables t where t.table_schema = $1 and t.table_name = $2", + [schema, parition_name] + ) + + result.num_rows > 0 + end + + defp partition_name(resource, opts) do + key = Keyword.fetch!(opts, :key) + table = AshPostgres.DataLayer.Info.table(resource) + "#{table}_#{key}" + end + + defp tenant_schema(tenant, resource) do + tenant + |> Ash.ToTenant.to_tenant(resource) + |> case do + nil -> "public" + tenant -> tenant + end + end +end diff --git a/priv/resource_snapshots/test_repo/partitioned_posts/20250214114101.json b/priv/resource_snapshots/test_repo/partitioned_posts/20250214114101.json new file mode 100644 index 00000000..e88c485e --- /dev/null +++ b/priv/resource_snapshots/test_repo/partitioned_posts/20250214114101.json @@ -0,0 +1,43 @@ +{ + "attributes": [ + { + "allow_nil?": false, + "default": "fragment(\"gen_random_uuid()\")", + "generated?": false, + "primary_key?": true, + "references": null, + "size": null, + "source": "id", + "type": "uuid" + }, + { + "allow_nil?": false, + "default": "1", + "generated?": false, + "primary_key?": true, + "references": null, + "size": null, + "source": "key", + "type": "bigint" + } + ], + "base_filter": null, + "check_constraints": [], + "custom_indexes": [], + "custom_statements": [], + "has_create_action": false, + "hash": "7FE5D9659135887A47FAE2729CEB0281FA8FF392EDB3B43426EAFD89A1518FEB", + "identities": [], + "multitenancy": { + "attribute": null, + "global": null, + "strategy": null + }, + "partitioning": { + "attribute": "key", + "method": "list" + }, + "repo": "Elixir.AshPostgres.TestRepo", + "schema": null, + "table": "partitioned_posts" +} \ No newline at end of file diff --git a/priv/resource_snapshots/test_repo/tenants/composite_key/20250217095820.json b/priv/resource_snapshots/test_repo/tenants/composite_key/20250217095820.json new file mode 100644 index 00000000..e78ef415 --- /dev/null +++ b/priv/resource_snapshots/test_repo/tenants/composite_key/20250217095820.json @@ -0,0 +1,40 @@ +{ + "attributes": [ + { + "allow_nil?": false, + "default": "nil", + "generated?": true, + "primary_key?": true, + "references": null, + "size": null, + "source": "id", + "type": "bigint" + }, + { + "allow_nil?": false, + "default": "nil", + "generated?": false, + "primary_key?": false, + "references": null, + "size": null, + "source": "title", + "type": "text" + } + ], + "base_filter": null, + "check_constraints": [], + "custom_indexes": [], + "custom_statements": [], + "has_create_action": true, + "hash": "F547F05D353FC4B04CC604B8F2215A512BFB9FAD20B3C1DD2BCBF2455072D958", + "identities": [], + "multitenancy": { + "attribute": null, + "global": false, + "strategy": "context" + }, + "partitioning": null, + "repo": "Elixir.AshPostgres.TestRepo", + "schema": null, + "table": "composite_key" +} \ No newline at end of file diff --git a/priv/resource_snapshots/test_repo/tenants/composite_key/20250217095828.json b/priv/resource_snapshots/test_repo/tenants/composite_key/20250217095828.json new file mode 100644 index 00000000..0e511ce5 --- /dev/null +++ b/priv/resource_snapshots/test_repo/tenants/composite_key/20250217095828.json @@ -0,0 +1,40 @@ +{ + "attributes": [ + { + "allow_nil?": false, + "default": "nil", + "generated?": true, + "primary_key?": true, + "references": null, + "size": null, + "source": "id", + "type": "bigint" + }, + { + "allow_nil?": false, + "default": "nil", + "generated?": false, + "primary_key?": true, + "references": null, + "size": null, + "source": "title", + "type": "text" + } + ], + "base_filter": null, + "check_constraints": [], + "custom_indexes": [], + "custom_statements": [], + "has_create_action": true, + "hash": "0EA09E46F197BAF8034CBFC7CCEFE46D2CCE9927ACD0991B5E90D5463B9B4AEC", + "identities": [], + "multitenancy": { + "attribute": null, + "global": false, + "strategy": "context" + }, + "partitioning": null, + "repo": "Elixir.AshPostgres.TestRepo", + "schema": null, + "table": "composite_key" +} \ No newline at end of file diff --git a/priv/test_repo/migrations/20250214114101_partitioned_post.exs b/priv/test_repo/migrations/20250214114101_partitioned_post.exs new file mode 100644 index 00000000..28fd2300 --- /dev/null +++ b/priv/test_repo/migrations/20250214114101_partitioned_post.exs @@ -0,0 +1,20 @@ +defmodule AshPostgres.TestRepo.Migrations.PartitionedPost do + @moduledoc """ + Updates resources based on their most recent snapshots. + + This file was autogenerated with `mix ash_postgres.generate_migrations` + """ + + use Ecto.Migration + + def up do + create table(:partitioned_posts, primary_key: false, options: "PARTITION BY LIST (key)") do + add(:id, :uuid, null: false, default: fragment("gen_random_uuid()"), primary_key: true) + add(:key, :bigint, null: false, default: 1, primary_key: true) + end + end + + def down do + drop(table(:partitioned_posts)) + end +end diff --git a/priv/test_repo/tenant_migrations/20250217095820_migrate_resources5.exs b/priv/test_repo/tenant_migrations/20250217095820_migrate_resources5.exs new file mode 100644 index 00000000..5aa2decb --- /dev/null +++ b/priv/test_repo/tenant_migrations/20250217095820_migrate_resources5.exs @@ -0,0 +1,20 @@ +defmodule AshPostgres.TestRepo.TenantMigrations.MigrateResources5 do + @moduledoc """ + Updates resources based on their most recent snapshots. + + This file was autogenerated with `mix ash_postgres.generate_migrations` + """ + + use Ecto.Migration + + def up do + create table(:composite_key, primary_key: false, prefix: prefix()) do + add(:id, :bigserial, null: false, primary_key: true) + add(:title, :text, null: false) + end + end + + def down do + drop(table(:composite_key, prefix: prefix())) + end +end diff --git a/priv/test_repo/tenant_migrations/20250217095828_migrate_resources6.exs b/priv/test_repo/tenant_migrations/20250217095828_migrate_resources6.exs new file mode 100644 index 00000000..f5018509 --- /dev/null +++ b/priv/test_repo/tenant_migrations/20250217095828_migrate_resources6.exs @@ -0,0 +1,29 @@ +defmodule AshPostgres.TestRepo.TenantMigrations.MigrateResources6 do + @moduledoc """ + Updates resources based on their most recent snapshots. + + This file was autogenerated with `mix ash_postgres.generate_migrations` + """ + + use Ecto.Migration + + def up do + drop(constraint("composite_key", "composite_key_pkey", prefix: prefix())) + + alter table(:composite_key, prefix: prefix()) do + modify(:title, :text) + end + + execute("ALTER TABLE \"#{prefix()}\".\"composite_key\" ADD PRIMARY KEY (id, title)") + end + + def down do + drop(constraint("composite_key", "composite_key_pkey", prefix: prefix())) + + alter table(:composite_key, prefix: prefix()) do + modify(:title, :text) + end + + execute("ALTER TABLE \"#{prefix()}\".\"composite_key\" ADD PRIMARY KEY (id)") + end +end diff --git a/test/migration_generator_test.exs b/test/migration_generator_test.exs index e284e847..20e6e00f 100644 --- a/test/migration_generator_test.exs +++ b/test/migration_generator_test.exs @@ -301,6 +301,56 @@ defmodule AshPostgres.MigrationGeneratorTest do end end + describe "creating initial snapshots for resources with partitioning" do + setup do + on_exit(fn -> + File.rm_rf!("test_snapshots_path") + File.rm_rf!("test_migration_path") + end) + + defposts do + postgres do + partitioning do + method(:list) + attribute(:title) + end + end + + attributes do + uuid_primary_key(:id) + attribute(:title, :string, public?: true) + end + end + + defdomain([Post]) + + AshPostgres.MigrationGenerator.generate(Domain, + snapshot_path: "test_snapshots_path", + migration_path: "test_migration_path", + quiet: false, + format: false + ) + + :ok + end + + test "the migration sets up resources correctly" do + # the snapshot exists and contains valid json + assert File.read!(Path.wildcard("test_snapshots_path/test_repo/posts/*.json")) + |> Jason.decode!(keys: :atoms!) + + assert [file] = + Path.wildcard("test_migration_path/**/*_migrate_resources*.exs") + |> Enum.reject(&String.contains?(&1, "extensions")) + + file_contents = File.read!(file) + + # the migration creates the table with options specifing how to partition the table + assert file_contents =~ + ~S{create table(:posts, primary_key: false, options: "PARTITION BY LIST (title)") do} + end + end + describe "custom_indexes with `concurrently: true`" do setup do on_exit(fn -> diff --git a/test/multitenancy_test.exs b/test/multitenancy_test.exs index 0dd99520..91b443f7 100644 --- a/test/multitenancy_test.exs +++ b/test/multitenancy_test.exs @@ -2,7 +2,7 @@ defmodule AshPostgres.Test.MultitenancyTest do use AshPostgres.RepoCase, async: false require Ash.Query - alias AshPostgres.MultitenancyTest.{CompositeKeyPost, NamedOrg, Org, Post, User} + alias AshPostgres.MultitenancyTest.{CompositeKeyPost, NamedOrg, Org, Post, User, CompositeKeyPost} alias AshPostgres.Test.Post, as: GlobalPost setup do @@ -135,6 +135,15 @@ defmodule AshPostgres.Test.MultitenancyTest do assert [_] = CompositeKeyPost |> Ash.Query.set_tenant(org1) |> Ash.read!() end + test "composite key multitenancy works", %{org1: org1} do + CompositeKeyPost + |> Ash.Changeset.for_create(:create, %{title: "foo"}) + |> Ash.Changeset.set_tenant(org1) + |> Ash.create!() + + assert [_] = CompositeKeyPost |> Ash.Query.set_tenant(org1) |> Ash.read!() + end + test "loading attribute multitenant resources from context multitenant resources works" do org = Org diff --git a/test/partition_test.exs b/test/partition_test.exs new file mode 100644 index 00000000..564e36f9 --- /dev/null +++ b/test/partition_test.exs @@ -0,0 +1,15 @@ +defmodule AshPostgres.PartitionTest do + use AshPostgres.RepoCase, async: false + alias AshPostgres.Test.PartitionedPost + + test "seeding data works" do + assert false == AshPostgres.Partitioning.existing_partition?(PartitionedPost, key: 1) + assert :ok == AshPostgres.Partitioning.create_partition(PartitionedPost, key: 1) + assert true == AshPostgres.Partitioning.existing_partition?(PartitionedPost, key: 1) + + Ash.Seed.seed!(%PartitionedPost{key: 1}) + + assert :ok == AshPostgres.Partitioning.create_partition(PartitionedPost, key: 2) + Ash.Seed.seed!(%PartitionedPost{key: 2}) + end +end diff --git a/test/support/domain.ex b/test/support/domain.ex index c8c596d5..c060e46e 100644 --- a/test/support/domain.ex +++ b/test/support/domain.ex @@ -37,6 +37,7 @@ defmodule AshPostgres.Test.Domain do resource(AshPostgres.Test.PostFollower) resource(AshPostgres.Test.StatefulPostFollower) resource(AshPostgres.Test.PostWithEmptyUpdate) + resource(AshPostgres.Test.PartitionedPost) resource(AshPostgres.Test.DbPoint) resource(AshPostgres.Test.DbStringPoint) resource(AshPostgres.Test.CSV) diff --git a/test/support/resources/partitioned_post.ex b/test/support/resources/partitioned_post.ex new file mode 100644 index 00000000..c17df602 --- /dev/null +++ b/test/support/resources/partitioned_post.ex @@ -0,0 +1,28 @@ +defmodule AshPostgres.Test.PartitionedPost do + @moduledoc false + use Ash.Resource, + domain: AshPostgres.Test.Domain, + data_layer: AshPostgres.DataLayer + + postgres do + table "partitioned_posts" + repo AshPostgres.TestRepo + + partitioning do + method(:list) + attribute(:key) + end + end + + actions do + default_accept(:*) + + defaults([:read, :destroy]) + end + + attributes do + uuid_primary_key(:id, writable?: true) + + attribute(:key, :integer, allow_nil?: false, primary_key?: true, default: 1) + end +end