Skip to content

Commit f9eaee9

Browse files
author
morten.lund@maskon.no
committed
Initial suggestion
1 parent 23cc10a commit f9eaee9

File tree

12 files changed

+325
-9
lines changed

12 files changed

+325
-9
lines changed

lib/data_layer.ex

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,31 @@ defmodule AshPostgres.DataLayer do
256256
]
257257
}
258258

259+
@partitioning %Spark.Dsl.Section{
260+
name: :partitioning,
261+
describe: """
262+
A section for configuring the initial partitioning of the table
263+
""",
264+
examples: [
265+
"""
266+
partitioning do
267+
method :list
268+
attribute :post
269+
end
270+
"""
271+
],
272+
schema: [
273+
method: [
274+
type: {:one_of, [:range, :list, :hash]},
275+
doc: "Specifying what partitioning method to use"
276+
],
277+
attribute: [
278+
type: :atom,
279+
doc: "The attribute to partition on"
280+
]
281+
]
282+
}
283+
259284
@postgres %Spark.Dsl.Section{
260285
name: :postgres,
261286
describe: """
@@ -266,7 +291,8 @@ defmodule AshPostgres.DataLayer do
266291
@custom_statements,
267292
@manage_tenant,
268293
@references,
269-
@check_constraints
294+
@check_constraints,
295+
@partitioning
270296
],
271297
modules: [
272298
:repo

lib/data_layer/info.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,14 @@ defmodule AshPostgres.DataLayer.Info do
226226
def manage_tenant_update?(resource) do
227227
Extension.get_opt(resource, [:postgres, :manage_tenant], :update?, false)
228228
end
229+
230+
@doc "Partitioning method"
231+
def partitioning_method(resource) do
232+
Extension.get_opt(resource, [:postgres, :partitioning], :method, nil)
233+
end
234+
235+
@doc "Partitioning attribute"
236+
def partitioning_attribute(resource) do
237+
Extension.get_opt(resource, [:postgres, :partitioning], :attribute, nil)
238+
end
229239
end

lib/migration_generator/migration_generator.ex

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,7 +1366,8 @@ defmodule AshPostgres.MigrationGenerator do
13661366
table: table,
13671367
schema: schema,
13681368
multitenancy: multitenancy,
1369-
repo: repo
1369+
repo: repo,
1370+
partitioning: partitioning
13701371
}
13711372
| rest
13721373
],
@@ -1375,7 +1376,8 @@ defmodule AshPostgres.MigrationGenerator do
13751376
) do
13761377
group_into_phases(
13771378
rest,
1378-
%Phase.Create{table: table, schema: schema, multitenancy: multitenancy, repo: repo},
1379+
%Phase.Create{table: table, schema: schema, multitenancy: multitenancy, repo: repo,
1380+
partitioning: partitioning},
13791381
acc
13801382
)
13811383
end
@@ -2022,7 +2024,8 @@ defmodule AshPostgres.MigrationGenerator do
20222024
attribute: nil,
20232025
strategy: nil,
20242026
global: nil
2025-
}
2027+
},
2028+
partitioning: snapshot.partitioning
20262029
}
20272030

20282031
do_fetch_operations(snapshot, empty_snapshot, opts, [
@@ -3103,7 +3106,8 @@ defmodule AshPostgres.MigrationGenerator do
31033106
repo: AshPostgres.DataLayer.Info.repo(resource, :mutate),
31043107
multitenancy: multitenancy(resource),
31053108
base_filter: AshPostgres.DataLayer.Info.base_filter_sql(resource),
3106-
has_create_action: has_create_action?(resource)
3109+
has_create_action: has_create_action?(resource),
3110+
partitioning: partitioning(resource)
31073111
}
31083112

31093113
hash =
@@ -3178,6 +3182,20 @@ defmodule AshPostgres.MigrationGenerator do
31783182
end)
31793183
end
31803184

3185+
defp partitioning(resource) do
3186+
method = AshPostgres.DataLayer.Info.partitioning_method(resource)
3187+
attribute = AshPostgres.DataLayer.Info.partitioning_attribute(resource)
3188+
3189+
if not is_nil(method) and not is_nil(attribute) do
3190+
%{
3191+
method: method,
3192+
attribute: attribute
3193+
}
3194+
else
3195+
nil
3196+
end
3197+
end
3198+
31813199
defp multitenancy(resource) do
31823200
strategy = Ash.Resource.Info.multitenancy_strategy(resource)
31833201
attribute = Ash.Resource.Info.multitenancy_attribute(resource)

lib/migration_generator/operation.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ defmodule AshPostgres.MigrationGenerator.Operation do
149149

150150
defmodule CreateTable do
151151
@moduledoc false
152-
defstruct [:table, :schema, :multitenancy, :old_multitenancy, :repo]
152+
defstruct [:table, :schema, :multitenancy, :old_multitenancy, :repo, :partitioning]
153153
end
154154

155155
defmodule AddAttribute do

lib/migration_generator/phase.ex

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule AshPostgres.MigrationGenerator.Phase do
77

88
defmodule Create do
99
@moduledoc false
10-
defstruct [:table, :schema, :multitenancy, :repo, operations: [], commented?: false]
10+
defstruct [:table, :schema, :multitenancy, partitioning: nil, :repo, operations: [], commented?: false]
1111

1212
import AshPostgres.MigrationGenerator.Operation.Helper, only: [as_atom: 1]
1313

@@ -16,10 +16,13 @@ defmodule AshPostgres.MigrationGenerator.Phase do
1616
table: table,
1717
operations: operations,
1818
multitenancy: multitenancy,
19+
partitioning: partitioning,
1920
repo: repo
2021
}) do
2122
if multitenancy.strategy == :context do
22-
"create table(:#{as_atom(table)}, primary_key: false, prefix: prefix()) do\n" <>
23+
arguments = arguments([prefix("prefix()"), options(partitioning: partitioning)])
24+
25+
"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
2326
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
2427
"\nend"
2528
else
@@ -36,9 +39,11 @@ defmodule AshPostgres.MigrationGenerator.Phase do
3639
else
3740
""
3841
end
42+
43+
arguments = arguments([prefix(schema), options(partitioning: partitioning)])
3944

4045
pre_create <>
41-
"create table(:#{as_atom(table)}, primary_key: false#{opts}) do\n" <>
46+
"create table(:#{as_atom(table)}, primary_key: false#{opts}#{arguments}) do\n" <>
4247
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
4348
"\nend"
4449
end
@@ -58,6 +63,27 @@ defmodule AshPostgres.MigrationGenerator.Phase do
5863
"drop table(:#{as_atom(table)}#{opts})"
5964
end
6065
end
66+
67+
def arguments(["",""]), do: ""
68+
def arguments(arguments), do: ", " <> Enum.join(Enum.reject(arguments, &is_nil(&1)), ",")
69+
70+
def prefix(nil), do: nil
71+
def prefix(schema), do: "prefix: #{schema}"
72+
73+
def options(_options, _acc \\ [])
74+
def options([], []), do: ""
75+
def options([], acc), do: "options: \"#{Enum.join(acc, " ")}\""
76+
77+
def options([{:partitioning, %{method: method, attribute: attribute}} | rest], acc) do
78+
option = "PARTITION BY #{String.upcase(Atom.to_string(method))} (#{attribute})"
79+
80+
rest
81+
|> options(acc ++ [option])
82+
end
83+
84+
def options([_| rest], acc) do
85+
options(rest, acc)
86+
end
6187
end
6288

6389
defmodule Alter do

lib/partitioning.ex

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
defmodule AshPostgres.Partitioning do
2+
@moduledoc false
3+
4+
@doc """
5+
Create a new partition for a resource
6+
"""
7+
def create_partition(resource, opts) do
8+
repo = AshPostgres.DataLayer.Info.repo(resource)
9+
10+
resource
11+
|> AshPostgres.DataLayer.Info.partitioning_method()
12+
|> case do
13+
:range ->
14+
create_range_partition(repo, resource, opts)
15+
16+
:list ->
17+
create_list_partition(repo, resource, opts)
18+
19+
:hash ->
20+
create_hash_partition(repo, resource, opts)
21+
22+
unsupported_method ->
23+
raise "Invalid partition method, got: #{unsupported_method}"
24+
end
25+
end
26+
27+
@doc """
28+
Check if partition exists
29+
"""
30+
def exists?(resource, opts) do
31+
repo = AshPostgres.DataLayer.Info.repo(resource)
32+
key = Keyword.fetch!(opts, :key)
33+
table = AshPostgres.DataLayer.Info.table(resource)
34+
partition_name = table <> "_" <> "#{key}"
35+
36+
partition_exists?(repo, resource, partition_name)
37+
end
38+
39+
# TBI
40+
defp create_range_partition(repo, resource, opts) do
41+
end
42+
43+
defp create_list_partition(repo, resource, opts) do
44+
key = Keyword.fetch!(opts, :key)
45+
table = AshPostgres.DataLayer.Info.table(resource)
46+
partition_name = table <> "_" <> "#{key}"
47+
48+
if partition_exists?(repo, resource, partition_name) do
49+
{:error, :allready_exists}
50+
else
51+
Ecto.Adapters.SQL.query(
52+
repo,
53+
"CREATE TABLE #{partition_name} PARTITION OF public.#{table} FOR VALUES IN (#{key})"
54+
)
55+
56+
if partition_exists?(repo, resource, partition_name) do
57+
:ok
58+
else
59+
{:error, "Unable to create partition"}
60+
end
61+
end
62+
end
63+
64+
# TBI
65+
defp create_hash_partition(repo, resource, opts) do
66+
end
67+
68+
defp partition_exists?(repo, resource, parition_name) do
69+
%Postgrex.Result{} =
70+
result =
71+
repo
72+
|> Ecto.Adapters.SQL.query!(
73+
"select table_name from information_schema.tables t where t.table_schema = 'public' and t.table_name = $1",
74+
[parition_name]
75+
)
76+
77+
result.num_rows > 0
78+
end
79+
end
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"attributes": [
3+
{
4+
"allow_nil?": false,
5+
"default": "fragment(\"gen_random_uuid()\")",
6+
"generated?": false,
7+
"primary_key?": true,
8+
"references": null,
9+
"size": null,
10+
"source": "id",
11+
"type": "uuid"
12+
},
13+
{
14+
"allow_nil?": false,
15+
"default": "1",
16+
"generated?": false,
17+
"primary_key?": true,
18+
"references": null,
19+
"size": null,
20+
"source": "key",
21+
"type": "bigint"
22+
}
23+
],
24+
"base_filter": null,
25+
"check_constraints": [],
26+
"custom_indexes": [],
27+
"custom_statements": [],
28+
"has_create_action": false,
29+
"hash": "7FE5D9659135887A47FAE2729CEB0281FA8FF392EDB3B43426EAFD89A1518FEB",
30+
"identities": [],
31+
"multitenancy": {
32+
"attribute": null,
33+
"global": null,
34+
"strategy": null
35+
},
36+
"partitioning": {
37+
"attribute": "key",
38+
"method": "list"
39+
},
40+
"repo": "Elixir.AshPostgres.TestRepo",
41+
"schema": null,
42+
"table": "partitioned_posts"
43+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
defmodule AshPostgres.TestRepo.Migrations.PartitionedPost do
2+
@moduledoc """
3+
Updates resources based on their most recent snapshots.
4+
5+
This file was autogenerated with `mix ash_postgres.generate_migrations`
6+
"""
7+
8+
use Ecto.Migration
9+
10+
def up do
11+
create table(:partitioned_posts, primary_key: false, options: "PARTITION BY LIST (key)") do
12+
add(:id, :uuid, null: false, default: fragment("gen_random_uuid()"), primary_key: true)
13+
add(:key, :bigint, null: false, default: 1, primary_key: true)
14+
end
15+
end
16+
17+
def down do
18+
drop(table(:partitioned_posts))
19+
end
20+
end

test/migration_generator_test.exs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,56 @@ defmodule AshPostgres.MigrationGeneratorTest do
378378
end
379379
end
380380

381+
describe "creating initial snapshots for resources with partitioning" do
382+
setup do
383+
on_exit(fn ->
384+
File.rm_rf!("test_snapshots_path")
385+
File.rm_rf!("test_migration_path")
386+
end)
387+
388+
defposts do
389+
postgres do
390+
partitioning do
391+
method(:list)
392+
attribute(:title)
393+
end
394+
end
395+
396+
attributes do
397+
uuid_primary_key(:id)
398+
attribute(:title, :string, public?: true)
399+
end
400+
end
401+
402+
defdomain([Post])
403+
404+
AshPostgres.MigrationGenerator.generate(Domain,
405+
snapshot_path: "test_snapshots_path",
406+
migration_path: "test_migration_path",
407+
quiet: false,
408+
format: false
409+
)
410+
411+
:ok
412+
end
413+
414+
test "the migration sets up resources correctly" do
415+
# the snapshot exists and contains valid json
416+
assert File.read!(Path.wildcard("test_snapshots_path/test_repo/posts/*.json"))
417+
|> Jason.decode!(keys: :atoms!)
418+
419+
assert [file] =
420+
Path.wildcard("test_migration_path/**/*_migrate_resources*.exs")
421+
|> Enum.reject(&String.contains?(&1, "extensions"))
422+
423+
file_contents = File.read!(file)
424+
425+
# the migration creates the table with options specifing how to partition the table
426+
assert file_contents =~
427+
~S{create table(:posts, primary_key: false, options: "PARTITION BY LIST (title)") do}
428+
end
429+
end
430+
381431
describe "custom_indexes with `concurrently: true`" do
382432
setup do
383433
on_exit(fn ->

0 commit comments

Comments
 (0)