Skip to content

Partitioning: Initial suggestion #485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
28 changes: 27 additions & 1 deletion lib/data_layer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,31 @@ defmodule AshPostgres.DataLayer do
]
}

@partitioning %Spark.Dsl.Section{
name: :partitioning,
describe: """
A section for configuring the initial partitioning of the table
""",
examples: [
"""
partitioning do
method :list
attribute :post
end
"""
],
schema: [
method: [
type: {:one_of, [:range, :list, :hash]},
doc: "Specifying what partitioning method to use"
],
attribute: [
type: :atom,
doc: "The attribute to partition on"
]
]
}

@postgres %Spark.Dsl.Section{
name: :postgres,
describe: """
Expand All @@ -262,7 +287,8 @@ defmodule AshPostgres.DataLayer do
@custom_statements,
@manage_tenant,
@references,
@check_constraints
@check_constraints,
@partitioning
],
modules: [
:repo
Expand Down
10 changes: 10 additions & 0 deletions lib/data_layer/info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,14 @@ defmodule AshPostgres.DataLayer.Info do
def manage_tenant_update?(resource) do
Extension.get_opt(resource, [:postgres, :manage_tenant], :update?, false)
end

@doc "Partitioning method"
def partitioning_method(resource) do
Extension.get_opt(resource, [:postgres, :partitioning], :method, nil)
end

@doc "Partitioning attribute"
def partitioning_attribute(resource) do
Extension.get_opt(resource, [:postgres, :partitioning], :attribute, nil)
end
end
30 changes: 26 additions & 4 deletions lib/migration_generator/migration_generator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1367,7 +1367,8 @@ defmodule AshPostgres.MigrationGenerator do
table: table,
schema: schema,
multitenancy: multitenancy,
repo: repo
repo: repo,
partitioning: partitioning
}
| rest
],
Expand All @@ -1376,7 +1377,12 @@ defmodule AshPostgres.MigrationGenerator do
) do
group_into_phases(
rest,
%Phase.Create{table: table, schema: schema, multitenancy: multitenancy, repo: repo},
%Phase.Create{
table: table,
schema: schema,
multitenancy: multitenancy, repo: repo,
partitioning: partitioning
},
acc
)
end
Expand Down Expand Up @@ -2020,7 +2026,8 @@ defmodule AshPostgres.MigrationGenerator do
schema: snapshot.schema,
repo: snapshot.repo,
multitenancy: snapshot.multitenancy,
old_multitenancy: empty_snapshot.multitenancy
old_multitenancy: empty_snapshot.multitenancy,
partitioning: snapshot.partitioning
}
| acc
])
Expand Down Expand Up @@ -3080,7 +3087,8 @@ defmodule AshPostgres.MigrationGenerator do
repo: AshPostgres.DataLayer.Info.repo(resource, :mutate),
multitenancy: multitenancy(resource),
base_filter: AshPostgres.DataLayer.Info.base_filter_sql(resource),
has_create_action: has_create_action?(resource)
has_create_action: has_create_action?(resource),
partitioning: partitioning(resource)
}

hash =
Expand Down Expand Up @@ -3155,6 +3163,20 @@ defmodule AshPostgres.MigrationGenerator do
end)
end

defp partitioning(resource) do
method = AshPostgres.DataLayer.Info.partitioning_method(resource)
attribute = AshPostgres.DataLayer.Info.partitioning_attribute(resource)

if not is_nil(method) and not is_nil(attribute) do
%{
method: method,
attribute: attribute
}
else
nil
end
end

defp multitenancy(resource) do
strategy = Ash.Resource.Info.multitenancy_strategy(resource)
attribute = Ash.Resource.Info.multitenancy_attribute(resource)
Expand Down
2 changes: 1 addition & 1 deletion lib/migration_generator/operation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ defmodule AshPostgres.MigrationGenerator.Operation do

defmodule CreateTable do
@moduledoc false
defstruct [:table, :schema, :multitenancy, :old_multitenancy, :repo]
defstruct [:table, :schema, :multitenancy, :old_multitenancy, :repo, :partitioning]
end

defmodule AddAttribute do
Expand Down
48 changes: 38 additions & 10 deletions lib/migration_generator/phase.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ defmodule AshPostgres.MigrationGenerator.Phase do

defmodule Create do
@moduledoc false
defstruct [:table, :schema, :multitenancy, :repo, operations: [], commented?: false]
defstruct [
:table,
:schema,
:multitenancy,
:repo,
partitioning: nil,
operations: [],
commented?: false
]

import AshPostgres.MigrationGenerator.Operation.Helper, only: [as_atom: 1]

Expand All @@ -12,10 +20,13 @@ defmodule AshPostgres.MigrationGenerator.Phase do
table: table,
operations: operations,
multitenancy: multitenancy,
repo: repo
repo: repo,
partitioning: partitioning
}) do
if multitenancy.strategy == :context do
"create table(:#{as_atom(table)}, primary_key: false, prefix: prefix()) do\n" <>
arguments = arguments([prefix(true), options(partitioning: partitioning)])

"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
"\nend"
else
Expand All @@ -26,15 +37,10 @@ defmodule AshPostgres.MigrationGenerator.Phase do
""
end

opts =
if schema do
", prefix: \"#{schema}\""
else
""
end
arguments = arguments([prefix(schema), options(partitioning: partitioning)])

pre_create <>
"create table(:#{as_atom(table)}, primary_key: false#{opts}) do\n" <>
"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
"\nend"
end
Expand All @@ -54,6 +60,28 @@ defmodule AshPostgres.MigrationGenerator.Phase do
"drop table(:#{as_atom(table)}#{opts})"
end
end

def arguments([nil, nil]), do: ""
def arguments(arguments), do: ", " <> Enum.join(Enum.reject(arguments, &is_nil(&1)), ",")

def prefix(true), do: "prefix: prefix()"
def prefix(schema) when is_binary(schema) and schema != "", do: "prefix: \"#{schema}\""
def prefix(_), do: nil

def options(_options, _acc \\ [])
def options([], []), do: nil
def options([], acc), do: "options: \"#{Enum.join(acc, " ")}\""

def options([{:partitioning, %{method: method, attribute: attribute}} | rest], acc) do
option = "PARTITION BY #{String.upcase(Atom.to_string(method))} (#{attribute})"

rest
|> options(acc ++ [option])
end

def options([_ | rest], acc) do
options(rest, acc)
end
end

defmodule Alter do
Expand Down
114 changes: 114 additions & 0 deletions lib/partitioning.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
defmodule AshPostgres.Partitioning do
@moduledoc false

@doc """
Create a new partition for a resource
"""
def create_partition(resource, opts) do
repo = AshPostgres.DataLayer.Info.repo(resource)

resource
|> AshPostgres.DataLayer.Info.partitioning_method()
|> case do
:range ->
create_range_partition(repo, resource, opts)

:list ->
create_list_partition(repo, resource, opts)

:hash ->
create_hash_partition(repo, resource, opts)

unsupported_method ->
raise "Invalid partition method, got: #{unsupported_method}"
end
end

@doc """
Check if partition exists
"""
def existing_partition?(resource, opts) do
repo = AshPostgres.DataLayer.Info.repo(resource)

resource
|> AshPostgres.DataLayer.Info.partitioning_method()
|> case do
:range ->
false

:list ->
partition_name = partition_name(resource, opts)
schema_exists?(repo, resource, partition_name, opts)

:hash ->
false

unsupported_method ->
raise "Invalid partition method, got: #{unsupported_method}"
end
end

# TBI
defp create_range_partition(_repo, _resource, _opts) do
end

defp create_list_partition(repo, resource, opts) do
key = Keyword.fetch!(opts, :key)
table = AshPostgres.DataLayer.Info.table(resource)
partition_name = partition_name(resource, opts)

schema =
Keyword.get(opts, :tenant)
|> tenant_schema(resource)

if schema_exists?(repo, resource, partition_name, opts) do
{:error, :already_exists}
else
Ecto.Adapters.SQL.query(
repo,
"CREATE TABLE \"#{schema}\".\"#{partition_name}\" PARTITION OF \"#{schema}\".\"#{table}\" FOR VALUES IN ('#{key}')"
)

if schema_exists?(repo, resource, partition_name, opts) do
:ok
else
{:error, "Unable to create partition"}
end
end
end

# TBI
defp create_hash_partition(_repo, _resource, _opts) do
end

defp schema_exists?(repo, resource, parition_name, opts) do
schema =
Keyword.get(opts, :tenant)
|> tenant_schema(resource)

%Postgrex.Result{} =
result =
repo
|> Ecto.Adapters.SQL.query!(
"select table_name from information_schema.tables t where t.table_schema = $1 and t.table_name = $2",
[schema, parition_name]
)

result.num_rows > 0
end

defp partition_name(resource, opts) do
key = Keyword.fetch!(opts, :key)
table = AshPostgres.DataLayer.Info.table(resource)
"#{table}_#{key}"
end

defp tenant_schema(tenant, resource) do
tenant
|> Ash.ToTenant.to_tenant(resource)
|> case do
nil -> "public"
tenant -> tenant
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"attributes": [
{
"allow_nil?": false,
"default": "fragment(\"gen_random_uuid()\")",
"generated?": false,
"primary_key?": true,
"references": null,
"size": null,
"source": "id",
"type": "uuid"
},
{
"allow_nil?": false,
"default": "1",
"generated?": false,
"primary_key?": true,
"references": null,
"size": null,
"source": "key",
"type": "bigint"
}
],
"base_filter": null,
"check_constraints": [],
"custom_indexes": [],
"custom_statements": [],
"has_create_action": false,
"hash": "7FE5D9659135887A47FAE2729CEB0281FA8FF392EDB3B43426EAFD89A1518FEB",
"identities": [],
"multitenancy": {
"attribute": null,
"global": null,
"strategy": null
},
"partitioning": {
"attribute": "key",
"method": "list"
},
"repo": "Elixir.AshPostgres.TestRepo",
"schema": null,
"table": "partitioned_posts"
}
Loading
Loading