Skip to content

Commit

Permalink
Merge branch 'main' into rework-publish
Browse files Browse the repository at this point in the history
  • Loading branch information
achouippe committed Jul 24, 2024
2 parents fdd7dc7 + bae3da9 commit 2660a32
Show file tree
Hide file tree
Showing 11 changed files with 526 additions and 47 deletions.
5 changes: 4 additions & 1 deletion neurow/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ config :neurow,

config :neurow,
internal_api_jwt_max_lifetime:
String.to_integer(System.get_env("INTERNAL_API_JWT_MAX_LIFETIME") || "120")
String.to_integer(System.get_env("INTERNAL_API_JWT_MAX_LIFETIME") || "1500")

config :neurow, sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000")
config :neurow, sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000")

config :neurow, ssl_keyfile: System.get_env("SSL_KEYFILE")
config :neurow, ssl_certfile: System.get_env("SSL_CERTFILE")

# Message history rotation period, in seconds (env HISTORY_MIN_DURATION, default 30).
# Neurow.ReceiverShardManager multiplies this value by 1_000 to schedule the periodic
# history rotation timer.
config :neurow,
history_min_duration: String.to_integer(System.get_env("HISTORY_MIN_DURATION") || "30")

case config_env() do
:prod ->
{:ok, interservice_json_config} =
Expand Down
26 changes: 15 additions & 11 deletions neurow/lib/neurow/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ defmodule Neurow.Application do
{:ok, ssl_keyfile} = Application.fetch_env(:neurow, :ssl_keyfile)
{:ok, ssl_certfile} = Application.fetch_env(:neurow, :ssl_certfile)

{:ok, history_min_duration} = Application.fetch_env(:neurow, :history_min_duration)

base_public_api_http_config = [
port: public_api_port,
protocol_options: [idle_timeout: :infinity],
Expand All @@ -42,17 +44,19 @@ defmodule Neurow.Application do
{:http, base_public_api_http_config}
end

children = [
Neurow.Configuration,
{Phoenix.PubSub,
name: Neurow.PubSub, options: [adapter: Phoenix.PubSub.PG2, pool_size: 10]},
{Plug.Cowboy,
scheme: :http, plug: Neurow.InternalApi.Endpoint, options: [port: internal_api_port]},
{Plug.Cowboy,
scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config},
{Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.HTTP], shutdown: 20_000},
{StopListener, []}
]
children =
[
Neurow.Configuration,
{Phoenix.PubSub,
name: Neurow.PubSub, options: [adapter: Phoenix.PubSub.PG2, pool_size: 10]},
{Plug.Cowboy,
scheme: :http, plug: Neurow.InternalApi.Endpoint, options: [port: internal_api_port]},
{Plug.Cowboy,
scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config},
{Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.HTTP], shutdown: 20_000},
{StopListener, []},
{Neurow.ReceiverShardManager, [history_min_duration]}
] ++ Neurow.ReceiverShardManager.create_receivers()

MetricsPlugExporter.setup()
Stats.setup()
Expand Down
25 changes: 17 additions & 8 deletions neurow/lib/neurow/internal_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Neurow.InternalApi.Endpoint do
&Neurow.Configuration.internal_api_verbose_authentication_errors/0,
max_lifetime: &Neurow.Configuration.internal_api_jwt_max_lifetime/0,
inc_error_callback: &Stats.inc_jwt_errors_internal/0,
exclude_path_prefixes: ["/ping", "/nodes", "/cluster_size_above"]
exclude_path_prefixes: ["/ping", "/nodes", "/cluster_size_above", "/history"]
)

plug(:match)
Expand Down Expand Up @@ -54,21 +54,30 @@ defmodule Neurow.InternalApi.Endpoint do
|> send_resp((cluster_size >= size && 200) || 404, "Cluster size: #{cluster_size}\n")
end

# Returns the stored history of `topic` as a JSON array of `%{id, message}` objects.
#
# History entries are `{topic, message}` tuples as stored by the receiver shards.
# The message part is either a map/struct exposing `:timestamp` and `:payload`
# (what the publish endpoint broadcasts) or a legacy `{id, message}` tuple; both
# shapes are accepted so the endpoint does not crash on struct messages.
get "/history/:topic" do
  history =
    topic
    |> Neurow.ReceiverShardManager.get_history()
    |> Enum.map(fn
      {_topic, %{timestamp: timestamp, payload: payload}} ->
        %{id: timestamp, message: payload}

      {_topic, {id, message}} ->
        %{id: id, message: message}
    end)

  conn
  |> put_resp_header("content-type", "application/json")
  |> send_resp(200, Jason.encode!(history))
end

post "/v1/publish" do
case extract_params(conn) do
{:ok, messages, topics} ->
publish_timestamp = :os.system_time(:millisecond)

nb_publish = length(messages) * length(topics)
nb_publish =
length(messages) * length(topics)

Enum.each(topics, fn topic ->
Enum.each(messages, fn message ->
Phoenix.PubSub.broadcast!(
Neurow.PubSub,
topic,
{:pubsub_message,
%Message{message | timestamp: message.timestamp || publish_timestamp}}
)
:ok =
Neurow.ReceiverShardManager.broadcast(topic, %Message{
message
| timestamp: message.timestamp || publish_timestamp
})

Stats.inc_msg_received()
Logger.debug("Message published on topic: #{topic}")
Expand Down
80 changes: 73 additions & 7 deletions neurow/lib/neurow/public_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,88 @@ defmodule Neurow.PublicApi do

:ok = Phoenix.PubSub.subscribe(Neurow.PubSub, topic)

conn = send_chunked(conn, 200)
last_event_id = extract_last_event_id(conn)

Logger.debug("Client subscribed to #{topic}")
case last_event_id do
:error ->
conn |> resp(:bad_request, "Wrong value for last-event-id")

last_message = :os.system_time(:millisecond)
conn |> loop(timeout, keep_alive, last_message, last_message)
Logger.debug("Client disconnected from #{topic}")
conn
_ ->
conn = send_chunked(conn, 200)
conn = import_history(conn, topic, last_event_id)

Logger.debug("Client subscribed to #{topic}")

last_message = :os.system_time(:millisecond)
conn |> loop(timeout, keep_alive, last_message, last_message)
Logger.debug("Client disconnected from #{topic}")
conn
end

_ ->
conn |> resp(:bad_request, "Expected JWT claims are missing")
end
end

# Reads the `last-event-id` request header.
#
# Returns `nil` when the header is absent, the parsed integer when the whole
# header value is an integer, and `:error` for any other value.
defp extract_last_event_id(conn) do
  with {_header, raw_value} <- List.keyfind(conn.req_headers, "last-event-id", 0),
       {event_id, ""} <- Integer.parse(raw_value) do
    event_id
  else
    # Header not present in the request
    nil -> nil
    # Not an integer, or an integer followed by trailing characters
    _ -> :error
  end
end

# Replays the stored history of `topic` on the connection.
#
# When no last event id was provided (`nil`), nothing is replayed and the
# connection is returned untouched. Otherwise the history is fetched and handed
# to `process_history/4`, and the number of replayed messages is logged.
defp import_history(conn, _topic, nil), do: conn

defp import_history(conn, topic, last_event_id) do
  entries = Neurow.ReceiverShardManager.get_history(topic)
  {replayed_conn, replayed_count} = process_history(conn, last_event_id, 0, entries)

  Logger.debug(fn ->
    "Imported history for #{topic}, last_event_id: #{last_event_id}, imported size: #{replayed_count}"
  end)

  replayed_conn
end

# Walks the history entries and writes every message whose timestamp is strictly
# greater than `last_event_id` to the connection.
#
# Returns `{conn, sent}` where `sent` is the number of messages written so far.
defp process_history(conn, _last_event_id, sent, []), do: {conn, sent}

defp process_history(conn, last_event_id, sent, [entry | remaining]) do
  {_topic, message} = entry
  newer? = message.timestamp > last_event_id

  # Workaround: pause briefly before the first replayed chunk to avoid losing
  # messages in tests.
  if newer? and sent == 0 do
    Process.sleep(1)
  end

  if newer? do
    conn
    |> write_chunk(message.timestamp, message.payload)
    |> process_history(last_event_id, sent + 1, remaining)
  else
    process_history(conn, last_event_id, sent, remaining)
  end
end

defp loop(conn, sse_timeout, keep_alive, last_message, last_ping) do
receive do
{:pubsub_message, message} ->
{:ok, conn} = chunk(conn, "id: #{message.timestamp}\ndata: #{message.payload}\n\n")
{:ok, conn} = write_chunk(conn, message.timestamp, message.payload)
Stats.inc_msg_published()
new_last_message = :os.system_time(:millisecond)
loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message)

other ->
dbg(other)
after
1000 ->
now = :os.system_time(:millisecond)
Expand Down Expand Up @@ -94,6 +155,11 @@ defmodule Neurow.PublicApi do
end
end

# Writes one event (an `id:` line followed by a `data:` line) as a chunk.
#
# Returns the updated connection itself, not an `{:ok, conn}` tuple. Raises a
# `MatchError` when `chunk/2` does not return `{:ok, conn}`.
defp write_chunk(conn, msg_id, msg) do
  event = "id: #{msg_id}\ndata: #{msg}\n\n"
  {:ok, updated_conn} = chunk(conn, event)
  updated_conn
end

match _ do
send_resp(conn, 404, "")
end
Expand Down
58 changes: 58 additions & 0 deletions neurow/lib/neurow/receiver_shard.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Neurow.ReceiverShard do
  @moduledoc """
  GenServer owning one shard of the message history.

  On start the shard subscribes to its shard topic on `Neurow.PubSub` and creates
  two named ETS tables. Every `{:pubsub_message, user_topic, message}` it receives
  is re-broadcast locally on `user_topic` and recorded in the current table.

  The state is `{older_table, current_table}`. On `{:rotate}` the older table is
  emptied and becomes the current one, so a stored message survives one rotation
  and is dropped by the second rotation following its insertion.
  """

  use GenServer

  @doc """
  Starts the shard process, registered under `build_name(shard)`.
  """
  def start_link(shard) do
    GenServer.start_link(__MODULE__, shard, name: build_name(shard))
  end

  @doc """
  Returns the registered process name of a shard.

  The name is a dynamically created atom; callers are expected to pass a small,
  fixed set of shard identifiers because atoms are never garbage collected.
  """
  def build_name(shard) do
    String.to_atom("receiver_#{shard}")
  end

  @doc """
  Returns the name of one of the two ETS history tables of a shard.
  """
  def table_name(shard, sub_shard) do
    String.to_atom("history_#{shard}_#{sub_shard}")
  end

  @doc """
  Returns the history entries stored for `topic` in the given shard, oldest first.

  Runs in the calling process (direct ETS reads), not in the GenServer process.
  Entries are the `{topic, message}` tuples as stored. They are ordered by the
  message `:timestamp` (map/struct messages) or by `id` (legacy `{id, message}`
  tuples). The sort is stable, so entries sharing the same key keep the order in
  which they were read from the tables.
  """
  def get_history(shard, topic) do
    entries =
      :ets.lookup(table_name(shard, 0), topic) ++ :ets.lookup(table_name(shard, 1), topic)

    Enum.sort_by(entries, &history_sort_key/1)
  end

  @impl true
  def init(shard) do
    :ok = Phoenix.PubSub.subscribe(Neurow.PubSub, Neurow.ReceiverShardManager.build_topic(shard))

    table_0 = table_name(shard, 0)
    table_1 = table_name(shard, 1)
    create_table(table_0)
    create_table(table_1)

    {:ok, {table_0, table_1}}
  end

  @impl true
  def handle_info({:pubsub_message, user_topic, message}, {table_0, table_1}) do
    # Forward to the subscribers of this node first, then record in the current table.
    :ok =
      Phoenix.PubSub.local_broadcast(
        Neurow.PubSub,
        user_topic,
        {:pubsub_message, message}
      )

    true = :ets.insert(table_1, {user_topic, message})
    {:noreply, {table_0, table_1}}
  end

  def handle_info({:rotate}, {table_0, table_1}) do
    # Empty the older table and make it the current one.
    :ets.delete_all_objects(table_0)
    {:noreply, {table_1, table_0}}
  end

  # :duplicate_bag keeps every message published on the same topic;
  # :named_table lets get_history/2 read the table by name from any process.
  defp create_table(table_name) do
    :ets.new(table_name, [:duplicate_bag, :protected, :named_table])
  end

  # Ordering key of a history entry, for both supported message shapes.
  defp history_sort_key({_topic, {id, _message}}), do: id
  defp history_sort_key({_topic, %{timestamp: timestamp}}), do: timestamp
end
86 changes: 86 additions & 0 deletions neurow/lib/neurow/receiver_shard_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Neurow.ReceiverShardManager do
  @moduledoc """
  Entry point of the sharded message receivers.

  Topics are mapped onto a fixed number of shards with `:erlang.phash2/2`.
  Publishing goes through `broadcast/2`, which sends the message on the PubSub
  topic of the shard owning the user topic. The manager process itself only
  drives the periodic rotation of the shard histories.
  """

  require Logger
  use GenServer

  @shards 8

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # --- GenServer callbacks ---------------------------------------------------

  # `history_min_duration` is expressed in seconds.
  @impl true
  def init([history_min_duration]) do
    schedule_rotation(history_min_duration)
    {:ok, nil}
  end

  # The next trigger is armed before rotating.
  @impl true
  def handle_info({:rotate_trigger, history_min_duration}, state) do
    schedule_rotation(history_min_duration)
    rotate()
    {:noreply, state}
  end

  # Synchronous rotation request.
  @impl true
  def handle_call({:rotate}, _from, opts) do
    rotate()
    {:reply, :ok, opts}
  end

  # --- Public API ------------------------------------------------------------

  # Calls `fun` with `{shard, registered_name}` for every shard, in shard order,
  # and returns the list of results.
  def all_pids(fun) do
    for shard <- 0..(@shards - 1) do
      fun.({shard, Neurow.ReceiverShard.build_name(shard)})
    end
  end

  # Bumps the rotation metric and sends `{:rotate}` to every shard.
  def rotate do
    Stats.inc_history_rotate()
    all_pids(fn {_shard, receiver} -> send(receiver, {:rotate}) end)
  end

  # PubSub topic a given shard subscribes to.
  def build_topic(shard), do: "__topic#{shard}"

  # Read from the current process, not from GenServer process
  def get_history(topic) do
    topic
    |> shard_from_topic()
    |> Neurow.ReceiverShard.get_history(topic)
  end

  # Child specifications of all the receiver shards, one per shard, each using
  # the shard registered name as child id.
  def create_receivers() do
    all_pids(fn {shard, receiver} ->
      Supervisor.child_spec({Neurow.ReceiverShard, shard}, id: receiver)
    end)
  end

  # Publishes `message` for `topic` on the PubSub topic of the owning shard.
  def broadcast(topic, message) do
    Phoenix.PubSub.broadcast!(
      Neurow.PubSub,
      broadcast_topic(topic),
      {:pubsub_message, topic, message}
    )
  end

  # --- Helpers ---------------------------------------------------------------

  defp schedule_rotation(history_min_duration) do
    trigger = {:rotate_trigger, history_min_duration}
    Process.send_after(self(), trigger, history_min_duration * 1_000)
  end

  defp broadcast_topic(topic) do
    topic |> shard_from_topic() |> build_topic()
  end

  defp shard_from_topic(topic), do: :erlang.phash2(topic, @shards)
end
10 changes: 10 additions & 0 deletions neurow/lib/stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ defmodule Stats do
help: "JWT Errors"
)

Gauge.declare(
name: :history_rotate,
help: "History rotate counter"
)

Gauge.set([name: :current_connections], 0)
Gauge.set([name: :connections], 0)
Gauge.set([name: :jwt_errors, labels: [:public]], 0)
Gauge.set([name: :jwt_errors, labels: [:internal]], 0)
Gauge.set([name: :messages, labels: [:received]], 0)
Gauge.set([name: :messages, labels: [:published]], 0)
Gauge.set([name: :history_rotate], 0)
end

def inc_connections() do
Expand All @@ -56,4 +62,8 @@ defmodule Stats do
def inc_jwt_errors_internal() do
Gauge.inc(name: :jwt_errors, labels: [:internal])
end

# Increments the `:history_rotate` gauge.
def inc_history_rotate(), do: Gauge.inc(name: :history_rotate)
end
Loading

0 comments on commit 2660a32

Please sign in to comment.