Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bpaquet committed Jul 22, 2024
1 parent 0bb9209 commit 32d00c3
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 45 deletions.
92 changes: 52 additions & 40 deletions neurow/lib/neurow/public_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,75 +46,82 @@ defmodule Neurow.PublicApi do

:ok = Phoenix.PubSub.subscribe(Neurow.PubSub, topic)

conn = send_chunked(conn, 200)
last_event_id = extract_last_event_id(conn)

conn =
case conn.req_headers |> List.keyfind("last-event-id", 0) do
nil ->
conn

{"last-event-id", last_event_id} ->
{last_event_id, ""} = Integer.parse(last_event_id)
{conn, sent} = import_history(conn, topic, last_event_id)

Logger.debug(fn ->
"Imported history for #{topic}, last_event_id: #{last_event_id}, imported size: #{sent}"
end)
case last_event_id do
:error ->
conn |> resp(:bad_request, "Wrong value for last-event-id")

conn
end
_ ->
conn = send_chunked(conn, 200)
conn = import_history(conn, topic, last_event_id)

Logger.debug("Client subscribed to #{topic}")
Logger.debug("Client subscribed to #{topic}")

last_message = :os.system_time(:millisecond)
conn |> loop(timeout, keep_alive, last_message, last_message)
Logger.debug("Client disconnected from #{topic}")
conn
last_message = :os.system_time(:millisecond)
conn |> loop(timeout, keep_alive, last_message, last_message)
Logger.debug("Client disconnected from #{topic}")
conn
end

_ ->
conn |> resp(:bad_request, "Expected JWT claims are missing")
end
end

defp extract_last_event_id(conn) do
case conn.req_headers |> List.keyfind("last-event-id", 0) do
nil ->
nil

{"last-event-id", last_event_id} ->
case Integer.parse(last_event_id) do
{last_event_id, ""} -> last_event_id
_ -> :error
end
end
end

defp import_history(conn, _, nil) do
conn
end

defp import_history(conn, topic, last_event_id) do
history = GenServer.call(Neurow.TopicManager, {:get_history, topic})

process_history(conn, last_event_id, false, 0, history)
end
{conn, sent} = process_history(conn, last_event_id, 0, history)

defp process_history(conn, last_event_id, send, sent, [first | rest]) do
{_, {msg_id, msg}} = first
Logger.debug(fn ->
"Imported history for #{topic}, last_event_id: #{last_event_id}, imported size: #{sent}"
end)

new_send =
cond do
send == true ->
true
conn
end

msg_id == last_event_id ->
# Workaround: avoid to loose messages in tests
Process.sleep(1)
true
defp process_history(conn, last_event_id, sent, [first | rest]) do
{_, {msg_id, msg}} = first

true ->
false
if msg_id > last_event_id do
if sent == 0 do
# Workaround: avoid to loose messages in tests
Process.sleep(1)
end

if send do
{:ok, conn} = chunk(conn, "id: #{msg_id}\ndata: #{msg}\n\n")
process_history(conn, last_event_id, new_send, sent + 1, rest)
conn = write_chunk(conn, msg_id, msg)
process_history(conn, last_event_id, sent + 1, rest)
else
process_history(conn, last_event_id, new_send, sent, rest)
process_history(conn, last_event_id, sent, rest)
end
end

defp process_history(conn, _, _, sent, []) do
defp process_history(conn, _, sent, []) do
{conn, sent}
end

defp loop(conn, sse_timeout, keep_alive, last_message, last_ping) do
receive do
{:pubsub_message, msg_id, msg} ->
{:ok, conn} = chunk(conn, "id: #{msg_id}\ndata: #{msg}\n\n")
conn = write_chunk(conn, msg_id, msg)
Stats.inc_msg_published()
new_last_message = :os.system_time(:millisecond)
loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message)
Expand Down Expand Up @@ -145,6 +152,11 @@ defmodule Neurow.PublicApi do
end
end

defp write_chunk(conn, msg_id, msg) do
{:ok, conn} = chunk(conn, "id: #{msg_id}\ndata: #{msg}\n\n")
conn
end

match _ do
send_resp(conn, 404, "")
end
Expand Down
25 changes: 20 additions & 5 deletions neurow/test/neurow/history_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule Neurow.HistoryIntegrationTest do
end
end

defp subscribe(topic, headers \\ []) do
defp send_subscribe(topic, headers) do
url = "http://localhost:4000/v1/subscribe"

headers =
Expand All @@ -81,7 +81,12 @@ defmodule Neurow.HistoryIntegrationTest do
{:ok, request_id} =
:httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}])

{:start, headers} = next_message()
request_id
end

defp subscribe(topic, headers \\ [], timeout \\ 100) do
request_id = send_subscribe(topic, headers)
{:start, headers} = next_message(timeout)
assert_headers(headers, {"content-type", "text/event-stream"})
assert_headers(headers, {"cache-control", "no-cache"})
assert_headers(headers, {"connection", "close"})
Expand Down Expand Up @@ -123,6 +128,17 @@ defmodule Neurow.HistoryIntegrationTest do
:ok = :httpc.cancel_request(request_id)
end

test "last event id is a string" do
request_id = send_subscribe("bar", [{["Last-Event-ID"], "xxx"}])

receive do
{:http, {_, {{_, 400, _}, _, "Wrong value for last-event-id"}}} -> :ok
msg -> raise("Unexpected message: #{msg}")
end

:ok = :httpc.cancel_request(request_id)
end

test "last-event-id" do
first_id = publish_message("foo56", "bar")
Process.sleep(2)
Expand Down Expand Up @@ -165,10 +181,9 @@ defmodule Neurow.HistoryIntegrationTest do

# Unknown id
request_id = subscribe("bar", [{["Last-Event-ID"], "12"}])
output = all_messages()

assert_raise RuntimeError, ~r/^Timeout waiting for message$/, fn ->
next_message()
end
assert output == "id: #{first_id}\ndata: foo56\n\nid: #{second_id}\ndata: foo57\n\n"

:ok = :httpc.cancel_request(request_id)
end
Expand Down

0 comments on commit 32d00c3

Please sign in to comment.