Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3e4ac4f

Browse files
committedMar 23, 2025·
sessions transfer
1 parent 104f584 commit 3e4ac4f

File tree

4 files changed

+390
-3
lines changed

4 files changed

+390
-3
lines changed
 

‎lib/plausible/application.ex

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ defmodule Plausible.Application do
4040
global_ttl: :timer.minutes(30),
4141
ets_options: [read_concurrency: true, write_concurrency: true]
4242
),
43+
if data_dir = Application.get_env(:plausible, :data_dir) do
44+
{Plausible.Session.Persistence, base_path: Path.join(data_dir, "sessions")}
45+
end,
4346
warmed_cache(Plausible.Site.Cache,
4447
adapter_opts: [
4548
n_lock_partitions: 1,

‎lib/plausible/session/persistence.ex

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
defmodule Plausible.Session.Persistence do
2+
@moduledoc """
3+
Inter-process persistence and sharing for `:sessions` cache during deployments.
4+
5+
It works by establishing a client-server architecture where:
6+
- The "taker" one-time task retrieves ETS data from other processes via Unix domain sockets
7+
- The "giver" server process responds to requests for ETS data via Unix domain sockets
8+
"""
9+
10+
alias Plausible.ClickhouseSessionV2
11+
alias Plausible.Session.Persistence.TinySock
12+
13+
@spec took? :: boolean
14+
def took? do
15+
Application.get_env(:plausible, :took_sessions, false)
16+
end
17+
18+
@doc false
19+
def child_spec(opts) do
20+
%{
21+
id: __MODULE__,
22+
start: {__MODULE__, :start_link, [opts]},
23+
type: :supervisor
24+
}
25+
end
26+
27+
@doc false
28+
def start_link(opts) do
29+
base_path = Keyword.fetch!(opts, :base_path)
30+
31+
taker = {Task, fn -> take_ets(base_path) end}
32+
giver = {TinySock, base_path: base_path, handler: &give_ets/1}
33+
34+
children = [
35+
# Supervisor.child_spec(DumpRestore, restart: :transient),
36+
Supervisor.child_spec(taker, restart: :temporary),
37+
Supervisor.child_spec(giver, restart: :transient)
38+
]
39+
40+
Supervisor.start_link(children, strategy: :one_for_one)
41+
end
42+
43+
@doc false
44+
def give_ets({"GIVE-ETS", session_version, dump_path}) do
45+
if session_version == ClickhouseSessionV2.module_info()[:md5] do
46+
cache_names = Plausible.Cache.Adapter.get_names(:sessions)
47+
48+
dumps =
49+
Enum.map(cache_names, fn cache_name ->
50+
tab = ConCache.ets(cache_name)
51+
path = Path.join(dump_path, cache_name)
52+
{path, Task.async(fn -> dumpscan(tab, path) end)}
53+
end)
54+
55+
Enum.reduce(dumps, [], fn {path, task}, paths ->
56+
:ok = Task.await(task)
57+
[path | paths]
58+
end)
59+
else
60+
[]
61+
end
62+
end
63+
64+
@doc false
65+
def take_ets(base_path) do
66+
socks = TinySock.list(base_path)
67+
session_version = ClickhouseSessionV2.module_info()[:md5]
68+
69+
Enum.each(socks, fn sock ->
70+
dump_path = Path.join(base_path, Base.url_encode64(:crypto.strong_rand_bytes(6)))
71+
File.mkdir_p!(dump_path)
72+
73+
try do
74+
dumps = TinySock.call(sock, {"GIVE-ETS", session_version, dump_path})
75+
76+
tasks =
77+
Enum.map(dumps, fn path ->
78+
Task.async(fn -> scansave(File.read!(path)) end)
79+
end)
80+
81+
Task.await_many(tasks)
82+
after
83+
File.rm_rf!(dump_path)
84+
end
85+
end)
86+
after
87+
Application.put_env(:plausible, :took_sessions, true)
88+
end
89+
90+
defp dumpscan(tab, file) do
91+
tab = :ets.whereis(tab)
92+
:ets.safe_fixtable(tab, true)
93+
94+
File.rm(file)
95+
fd = File.open!(file, [:raw, :binary, :append, :exclusive])
96+
97+
try do
98+
dumpscan(:ets.first_lookup(tab), [], 0, tab, fd)
99+
after
100+
:ok = File.close(fd)
101+
:ets.safe_fixtable(tab, false)
102+
end
103+
end
104+
105+
defp dumpscan({k, [{_user_id, %ClickhouseSessionV2{}} = record]}, cache, cache_len, tab, fd) do
106+
bin = :erlang.term_to_binary(record)
107+
bin_len = byte_size(bin)
108+
true = bin_len < 4_294_967_296
109+
new_cache = append_cache(cache, <<bin_len::32, bin::bytes>>)
110+
new_cache_len = cache_len + bin_len + 4
111+
112+
if new_cache_len > 500_000 do
113+
:ok = :file.write(fd, new_cache)
114+
dumpscan(:ets.next_lookup(tab, k), [], 0, tab, fd)
115+
else
116+
dumpscan(:ets.next_lookup(tab, k), new_cache, new_cache_len, tab, fd)
117+
end
118+
end
119+
120+
defp dumpscan(:"$end_of_table", cache, cache_len, _tab, fd) do
121+
if cache_len > 0 do
122+
:ok = :file.write(fd, cache)
123+
end
124+
125+
:ok
126+
end
127+
128+
@compile {:inline, append_cache: 2}
129+
defp append_cache([], bin), do: bin
130+
defp append_cache(cache, bin), do: [cache | bin]
131+
132+
defp scansave(<<bin_len::32, bin::size(bin_len)-bytes, rest::bytes>>) do
133+
{user_id, session} = :erlang.binary_to_term(bin, [:safe])
134+
Plausible.Cache.Adapter.put(:sessions, user_id, session)
135+
scansave(rest)
136+
end
137+
138+
defp scansave(<<>>), do: :ok
139+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
defmodule Plausible.Session.Persistence.TinySock do
2+
@moduledoc ~S"""
3+
Communication over Unix domain sockets.
4+
5+
## Usage
6+
7+
```elixir
8+
TinySock.server(
9+
base_path: "/tmp",
10+
handler: fn
11+
{"DUMP-ETS", requested_version, path} ->
12+
if requested_version == SessionV2.module_info[:md5] do
13+
for tab <- [:sessions1, :sessions2, :sessions3] do
14+
:ok = :ets.tab2file(tab, Path.join(path, "ets#{tab}"))
15+
end
16+
17+
:ok
18+
else
19+
{:error, :invalid_version}
20+
end
21+
end
22+
)
23+
24+
dump_path = "/tmp/ysSEjw"
25+
File.mkdir_p!(dump_path)
26+
[sock_path] = TinySock.list("/tmp")
27+
28+
with :ok <- TinySock.call(sock_path, {"DUMP-ETS", SessionV2.module_info[:md5], dump_path}) do
29+
for "ets" <> tab <- File.ls!(dump_path) do
30+
:ets.file2tab(Path.join(dump_path, tab))
31+
end
32+
end
33+
```
34+
"""
35+
36+
use GenServer, restart: :transient
37+
require Logger
38+
39+
@listen_opts [:binary, packet: :raw, nodelay: true, backlog: 128, active: false]
40+
@connect_opts [:binary, packet: :raw, nodelay: true, active: false]
41+
42+
@tag_data "tinysock"
43+
@tag_data_size byte_size(@tag_data)
44+
45+
def server(opts), do: start_link(opts)
46+
def socket(server), do: GenServer.call(server, :socket)
47+
48+
def acceptors(server) do
49+
:ets.tab2list(GenServer.call(server, :acceptors))
50+
end
51+
52+
def stop(server), do: GenServer.stop(server)
53+
54+
@doc "TODO"
55+
def list(base_path) do
56+
with {:ok, names} <- File.ls(base_path) do
57+
sock_paths =
58+
for @tag_data <> _rand = name <- names do
59+
Path.join(base_path, name)
60+
end
61+
62+
{:ok, sock_paths}
63+
end
64+
end
65+
66+
@doc "TODO"
67+
def call(sock_path, message, timeout \\ :timer.seconds(5)) do
68+
with {:ok, socket} <- sock_connect_or_rm(sock_path, timeout) do
69+
try do
70+
with :ok <- sock_send(socket, :erlang.term_to_binary(message)) do
71+
sock_recv(socket, timeout)
72+
end
73+
after
74+
sock_shut_and_close(socket)
75+
end
76+
end
77+
end
78+
79+
@doc false
80+
def start_link(opts) do
81+
{gen_opts, opts} = Keyword.split(opts, [:debug, :name, :spawn_opt, :hibernate_after])
82+
base_path = Keyword.fetch!(opts, :base_path)
83+
handler = Keyword.fetch!(opts, :handler)
84+
85+
case File.mkdir_p(base_path) do
86+
:ok ->
87+
GenServer.start_link(__MODULE__, {base_path, handler}, gen_opts)
88+
89+
{:error, reason} ->
90+
Logger.warning(
91+
"tinysock failed to create directory at #{inspect(base_path)}, reason: #{inspect(reason)}"
92+
)
93+
94+
:ignore
95+
end
96+
end
97+
98+
@impl true
99+
def init({base_path, handler}) do
100+
case sock_listen_or_retry(base_path) do
101+
{:ok, socket} ->
102+
acceptors = :ets.new(:acceptors, [:protected])
103+
state = {socket, acceptors, handler}
104+
for _ <- 1..10, do: spawn_acceptor(state)
105+
{:ok, state}
106+
107+
{:error, reason} ->
108+
Logger.warning(
109+
"tinysock failed to open a listen socket in #{inspect(base_path)}, reason: #{inspect(reason)}"
110+
)
111+
112+
:ignore
113+
end
114+
end
115+
116+
@impl true
117+
def handle_call(:acceptors, _from, {_socket, acceptors, _handler} = state) do
118+
{:reply, acceptors, state}
119+
end
120+
121+
def handle_call(:socket, _from, {socket, _acceptors, _handler} = state) do
122+
{:reply, socket, state}
123+
end
124+
125+
@impl true
126+
def handle_cast(:accepted, {socket, _acceptors, _handler} = state) do
127+
if socket, do: spawn_acceptor(state)
128+
{:noreply, state}
129+
end
130+
131+
@impl true
132+
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
133+
case reason do
134+
:normal ->
135+
remove_acceptor(state, pid)
136+
{:noreply, state}
137+
138+
:emfile ->
139+
raise File.Error, reason: reason, action: "accept socket", path: "tinysock lol"
140+
141+
reason ->
142+
# :telemetry.execute([:reuse, :acceptor, :crash], reason)
143+
Logger.error("tinysock acceptor crashed, reason: #{inspect(reason)}")
144+
{:noreply, state}
145+
end
146+
end
147+
148+
defp remove_acceptor({_socket, acceptors, _handler}, pid) do
149+
:ets.delete(acceptors, pid)
150+
end
151+
152+
defp spawn_acceptor({socket, acceptors, handler}) do
153+
{pid, _ref} =
154+
:proc_lib.spawn_opt(
155+
__MODULE__,
156+
:accept_loop,
157+
[_parent = self(), socket, handler],
158+
[:monitor]
159+
)
160+
161+
:ets.insert(acceptors, {pid})
162+
end
163+
164+
@doc false
165+
def accept_loop(parent, listen_socket, handler) do
166+
case :gen_tcp.accept(listen_socket, :timer.seconds(5)) do
167+
{:ok, socket} ->
168+
GenServer.cast(parent, :accepted)
169+
handle_message(socket, handler)
170+
171+
{:error, :timeout} ->
172+
accept_loop(parent, listen_socket, handler)
173+
174+
{:error, :closed} ->
175+
:ok
176+
177+
{:error, reason} ->
178+
exit(reason)
179+
end
180+
end
181+
182+
defp handle_message(socket, handler) do
183+
{:ok, message} = sock_recv(socket, _timeout = :timer.seconds(5))
184+
sock_send(socket, :erlang.term_to_binary(handler.(message)))
185+
after
186+
sock_shut_and_close(socket)
187+
end
188+
189+
defp sock_listen_or_retry(base_path) do
190+
sock_name = @tag_data <> Base.url_encode64(:crypto.strong_rand_bytes(4), padding: false)
191+
sock_path = Path.join(base_path, sock_name)
192+
193+
case :gen_tcp.listen(0, [{:ifaddr, {:local, sock_path}} | @listen_opts]) do
194+
{:ok, socket} -> {:ok, socket}
195+
{:error, :eaddrinuse} -> sock_listen_or_retry(base_path)
196+
{:error, reason} -> {:error, reason}
197+
end
198+
end
199+
200+
defp sock_connect_or_rm(sock_path, timeout) do
201+
case :gen_tcp.connect({:local, sock_path}, 0, @connect_opts, timeout) do
202+
{:ok, socket} ->
203+
{:ok, socket}
204+
205+
{:error, :timeout} = error ->
206+
error
207+
208+
{:error, _reason} = error ->
209+
Logger.notice(
210+
"tinysock failed to connect to #{inspect(sock_path)}, reason: #{inspect(error)}"
211+
)
212+
213+
_ = File.rm(sock_path)
214+
error
215+
end
216+
end
217+
218+
defp sock_send(socket, binary) do
219+
:gen_tcp.send(socket, <<@tag_data, byte_size(binary)::64, binary::bytes>>)
220+
end
221+
222+
defp sock_recv(socket, timeout) do
223+
with {:ok, <<@tag_data, size::64>>} <- :gen_tcp.recv(socket, @tag_data_size + 8, timeout),
224+
{:ok, binary} <- :gen_tcp.recv(socket, size, timeout) do
225+
try do
226+
{:ok, :erlang.binary_to_term(binary, [:safe])}
227+
rescue
228+
e -> {:error, e}
229+
end
230+
end
231+
end
232+
233+
defp sock_shut_and_close(socket) do
234+
:gen_tcp.shutdown(socket, :read_write)
235+
:gen_tcp.close(socket)
236+
end
237+
end

‎lib/plausible_web/controllers/api/system_controller.ex

+11-3
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,25 @@ defmodule PlausibleWeb.Api.SystemController do
5959
"ok"
6060
end
6161

62+
sessions_health =
63+
if Plausible.Session.Persistence.took?() do
64+
"ok"
65+
else
66+
"waiting"
67+
end
68+
6269
status =
63-
case {postgres_health, clickhouse_health, cache_health} do
64-
{"ok", "ok", "ok"} -> 200
70+
case {postgres_health, clickhouse_health, cache_health, sessions_health} do
71+
{"ok", "ok", "ok", "ok"} -> 200
6572
_ -> 500
6673
end
6774

6875
put_status(conn, status)
6976
|> json(%{
7077
postgres: postgres_health,
7178
clickhouse: clickhouse_health,
72-
sites_cache: cache_health
79+
sites_cache: cache_health,
80+
sessions: sessions_health
7381
})
7482
end
7583
end

0 commit comments

Comments
 (0)
Please sign in to comment.