Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bpaquet committed Jul 24, 2024
1 parent a984e5a commit 5246ab9
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
25 changes: 16 additions & 9 deletions neurow/lib/neurow/internal_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@ defmodule Neurow.InternalApi.Endpoint do

get "/history/:topic" do
history = Neurow.ReceiverShardManager.get_history(topic)
history = Enum.map(history, fn {_, message} -> %{
timestamp: message.timestamp,
payload: message.payload,
type: message.type,
} end)

history =
Enum.map(history, fn {_, message} ->
%{
timestamp: message.timestamp,
payload: message.payload,
type: message.type
}
end)

conn
|> put_resp_header("content-type", "application/json")
Expand Down Expand Up @@ -90,10 +94,13 @@ defmodule Neurow.InternalApi.Endpoint do

conn
|> put_resp_header("content-type", "application/json")
|> send_resp(200, Jason.encode!(%{
nb_published: nb_publish,
publish_timestamp: publish_timestamp
}))
|> send_resp(
200,
Jason.encode!(%{
nb_published: nb_publish,
publish_timestamp: publish_timestamp
})
)

{:error, reason} ->
conn |> send_bad_request(:invalid_payload, reason)
Expand Down
4 changes: 3 additions & 1 deletion neurow/lib/neurow/receiver_shard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ defmodule Neurow.ReceiverShard do

result = :ets.lookup(table_0, topic) ++ :ets.lookup(table_1, topic)

Enum.sort(result, fn {_, message0}, {_, message1} -> message0.timestamp < message1.timestamp end)
Enum.sort(result, fn {_, message0}, {_, message1} ->
message0.timestamp < message1.timestamp
end)
end

@impl true
Expand Down
14 changes: 8 additions & 6 deletions neurow/test/neurow/history_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ defmodule Neurow.HistoryIntegrationTest do

drop_one_message()
drop_one_message()
[[_, id]] = Regex.scan(~r/id=(\d+)/, call.resp_body)

# Avoid to publish message in the same millisecond
Process.sleep(2)
id
result = Jason.decode!(call.resp_body)
result["publish_timestamp"]
end

defp history(topic) do
Expand Down Expand Up @@ -111,7 +113,7 @@ defmodule Neurow.HistoryIntegrationTest do
assert length(expected_history) == length(actual_history)

Enum.each(0..(length(expected_history) - 1), fn index ->
assert Enum.at(expected_history, index) == Enum.at(actual_history, index)["message"]
assert Enum.at(expected_history, index) == Enum.at(actual_history, index)["payload"]
end)

drop_one_message()
Expand Down Expand Up @@ -172,7 +174,7 @@ defmodule Neurow.HistoryIntegrationTest do

:ok = :httpc.cancel_request(request_id)

request_id = subscribe("bar", [{["Last-Event-ID"], first_id}])
request_id = subscribe("bar", [{["Last-Event-ID"], to_string(first_id)}])
{:stream, msg} = next_message()
assert msg == "id: #{second_id}\ndata: foo57\n\n"

Expand All @@ -183,7 +185,7 @@ defmodule Neurow.HistoryIntegrationTest do
:ok = :httpc.cancel_request(request_id)

# End of is history
request_id = subscribe("bar", [{["Last-Event-ID"], second_id}])
request_id = subscribe("bar", [{["Last-Event-ID"], to_string(second_id)}])

assert_raise RuntimeError, ~r/^Timeout waiting for message$/, fn ->
next_message()
Expand All @@ -207,7 +209,7 @@ defmodule Neurow.HistoryIntegrationTest do
end)

start = 11
request_id = subscribe("bar", [{["Last-Event-ID"], Enum.at(ids, start)}])
request_id = subscribe("bar", [{["Last-Event-ID"], to_string(Enum.at(ids, start))}])
output = all_messages()

expected =
Expand Down

0 comments on commit 5246ab9

Please sign in to comment.