Skip to content

Commit 96d1b27

Browse files
committed
sessions transfer
1 parent 104f584 commit 96d1b27

File tree

4 files changed

+411
-3
lines changed

4 files changed

+411
-3
lines changed

lib/plausible/application.ex

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ defmodule Plausible.Application do
4040
global_ttl: :timer.minutes(30),
4141
ets_options: [read_concurrency: true, write_concurrency: true]
4242
),
43+
on_ee do
44+
if data_dir = Application.get_env(:plausible, :data_dir) do
45+
{Plausible.Session.Persistence, base_path: Path.join(data_dir, "sessions")}
46+
end
47+
end,
4348
warmed_cache(Plausible.Site.Cache,
4449
adapter_opts: [
4550
n_lock_partitions: 1,

lib/plausible/session/persistence.ex

+154
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
defmodule Plausible.Session.Persistence do
2+
@moduledoc """
3+
Inter-process persistence and sharing for `:sessions` cache during deployments.
4+
5+
It works by establishing a client-server architecture where:
6+
- The "taker" one-time task retrieves ETS data from other processes via Unix domain sockets
7+
- The "giver" server process responds to requests for ETS data via Unix domain sockets
8+
"""
9+
10+
alias Plausible.ClickhouseSessionV2
11+
alias Plausible.Session.Persistence.TinySock
12+
13+
@took_sessions_key :took_sessions
14+
def took?, do: Application.get_env(:plausible, @took_sessions_key, false)
15+
defp took, do: Application.put_env(:plausible, @took_sessions_key, true)
16+
17+
@doc false
18+
def child_spec(opts) do
19+
%{
20+
id: __MODULE__,
21+
start: {__MODULE__, :start_link, [opts]},
22+
type: :supervisor
23+
}
24+
end
25+
26+
@doc false
27+
def start_link(opts) do
28+
base_path = Keyword.fetch!(opts, :base_path)
29+
30+
taker = {Task, fn -> take_ets(base_path) end}
31+
giver = {TinySock, base_path: base_path, handler: &giver_handler/1}
32+
33+
children = [
34+
# Supervisor.child_spec(DumpRestore, restart: :transient),
35+
Supervisor.child_spec(taker, restart: :temporary),
36+
Supervisor.child_spec(giver, restart: :transient)
37+
]
38+
39+
Supervisor.start_link(children, strategy: :one_for_one)
40+
end
41+
42+
defp session_version do
43+
ClickhouseSessionV2.module_info()[:md5]
44+
end
45+
46+
@give_tag "GIVE-ETS"
47+
48+
@doc false
49+
def take_ets(base_path) do
50+
socks = TinySock.list(base_path)
51+
session_version = session_version()
52+
53+
Enum.each(socks, fn sock ->
54+
dump_path = Path.join(base_path, "dump" <> Base.url_encode64(:crypto.strong_rand_bytes(6)))
55+
File.mkdir_p!(dump_path)
56+
57+
try do
58+
dumps = TinySock.call(sock, {@give_tag, session_version, dump_path})
59+
60+
tasks =
61+
Enum.map(dumps, fn path ->
62+
Task.async(fn -> scansave(File.read!(path)) end)
63+
end)
64+
65+
Task.await_many(tasks)
66+
after
67+
File.rm_rf!(dump_path)
68+
end
69+
end)
70+
after
71+
took()
72+
end
73+
74+
@doc false
75+
def giver_handler({@give_tag, session_version, dump_path}) do
76+
if session_version == session_version() do
77+
give_ets(dump_path)
78+
else
79+
[]
80+
end
81+
end
82+
83+
@doc false
84+
def give_ets(dump_path) do
85+
cache_names = Plausible.Cache.Adapter.get_names(:sessions)
86+
87+
dumps =
88+
Enum.map(cache_names, fn cache_name ->
89+
tab = ConCache.ets(cache_name)
90+
path = Path.join(dump_path, to_string(cache_name))
91+
{path, Task.async(fn -> dumpscan(tab, path) end)}
92+
end)
93+
94+
Enum.reduce(dumps, [], fn {path, task}, paths ->
95+
:ok = Task.await(task)
96+
[path | paths]
97+
end)
98+
end
99+
100+
defp dumpscan(tab, file) do
101+
tab = :ets.whereis(tab)
102+
:ets.safe_fixtable(tab, true)
103+
104+
File.rm(file)
105+
fd = File.open!(file, [:raw, :binary, :append, :exclusive])
106+
107+
try do
108+
dumpscan(:ets.first_lookup(tab), [], 0, tab, fd)
109+
after
110+
:ok = File.close(fd)
111+
:ets.safe_fixtable(tab, false)
112+
end
113+
end
114+
115+
defp dumpscan({k, [record]}, cache, cache_len, tab, fd) do
116+
{_user_id, %ClickhouseSessionV2{}} = record
117+
118+
bin = :erlang.term_to_binary(record)
119+
bin_len = byte_size(bin)
120+
121+
true = bin_len < 4_294_967_296
122+
123+
new_cache = append_cache(cache, <<bin_len::32, bin::bytes>>)
124+
new_cache_len = cache_len + bin_len + 4
125+
126+
if new_cache_len > 500_000 do
127+
:ok = :file.write(fd, new_cache)
128+
dumpscan(:ets.next_lookup(tab, k), [], 0, tab, fd)
129+
else
130+
dumpscan(:ets.next_lookup(tab, k), new_cache, new_cache_len, tab, fd)
131+
end
132+
end
133+
134+
defp dumpscan(:"$end_of_table", cache, cache_len, _tab, fd) do
135+
if cache_len > 0 do
136+
:ok = :file.write(fd, cache)
137+
end
138+
139+
:ok
140+
end
141+
142+
@dialyzer :no_improper_lists
143+
@compile {:inline, append_cache: 2}
144+
defp append_cache([], bin), do: bin
145+
defp append_cache(cache, bin), do: [cache | bin]
146+
147+
defp scansave(<<bin_len::32, bin::size(bin_len)-bytes, rest::bytes>>) do
148+
{user_id, %ClickhouseSessionV2{} = session} = :erlang.binary_to_term(bin, [:safe])
149+
Plausible.Cache.Adapter.put(:sessions, user_id, session)
150+
scansave(rest)
151+
end
152+
153+
defp scansave(<<>>), do: :ok
154+
end

0 commit comments

Comments
 (0)