Skip to content

Commit

Permalink
Reset mnesia queue's run_at when queue restarts, issue #99.
Browse files Browse the repository at this point in the history
  • Loading branch information
koudelka committed Feb 20, 2020
1 parent 3a35093 commit b793b72
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 10 deletions.
39 changes: 35 additions & 4 deletions lib/honeydew/queue/mnesia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Honeydew.Queue.Mnesia do
])

# inspect/1 here becase queue_name can be of the form {:global, poolname}
table = ["honeydew", inspect(queue_name)] |> Enum.join("_") |> String.to_atom
table = table_name(queue_name)
in_progress_table = ["honeydew", inspect(queue_name), "in_progress"] |> Enum.join("_") |> String.to_atom

tables = %{table => [type: :ordered_set],
Expand All @@ -94,7 +94,7 @@ defmodule Honeydew.Queue.Mnesia do
in_progress_table: in_progress_table,
access_context: access_context(table_opts)}

:ok = reset_after_crash(state)
:ok = reset_after_shutdown(state)

time_warp_mode_warning()
poll()
Expand Down Expand Up @@ -232,7 +232,14 @@ defmodule Honeydew.Queue.Mnesia do
{reply, state}
end

defp reset_after_crash(%PState{in_progress_table: in_progress_table} = state) do
defp reset_after_shutdown(state) do
reset_all_in_progress_jobs(state)
reset_all_pending_jobs(state)

:ok
end

defp reset_all_in_progress_jobs(%PState{in_progress_table: in_progress_table} = state) do
in_progress_table
|> :mnesia.dirty_first()
|> case do
Expand All @@ -244,8 +251,26 @@ defmodule Honeydew.Queue.Mnesia do
|> WrappedJob.id_from_key
|> move_to_pending_table(%{}, state)

reset_after_crash(state)
reset_all_in_progress_jobs(state)
end

:ok
end

defp reset_all_pending_jobs(%PState{access_context: access_context, table: table}) do
:mnesia.activity(access_context, fn ->
:mnesia.foldl(fn wrapped_job_record, _ ->
new_wrapped_job_record =
wrapped_job_record
|> WrappedJob.from_record()
|> WrappedJob.reset_run_at()
|> WrappedJob.to_record()

:ok = :mnesia.delete_object(table, wrapped_job_record, :write)
:ok = :mnesia.write(table, new_wrapped_job_record, :write)
end, [], table)
end)

:ok
end

Expand Down Expand Up @@ -321,6 +346,12 @@ defmodule Honeydew.Queue.Mnesia do
{:noreply, Queue.dispatch(queue_state)}
end

def table_name(queue_name) do
["honeydew", inspect(queue_name)]
|> Enum.join("_")
|> String.to_atom()
end

defp poll do
{:ok, _} = :timer.send_after(@poll_interval, :__poll__)
end
Expand Down
14 changes: 9 additions & 5 deletions lib/honeydew/queue/mnesia/wrapped_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
def record_name, do: @record_name
def record_fields, do: @record_fields

def new(%Job{delay_secs: delay_secs} = job) do
def new(%Job{} = job) do
id = :erlang.unique_integer()
run_at = now() + delay_secs

job = %{job | private: id}

%__MODULE__{run_at: run_at,
id: id,
job: job}
%__MODULE__{id: id, job: job}
|> reset_run_at()
end

def from_record({@record_name, {run_at, id}, job}) do
Expand All @@ -55,6 +53,12 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do
id
end

def reset_run_at(%__MODULE__{job: %Job{delay_secs: delay_secs}} = wrapped_job) do
run_at = now() + delay_secs

%__MODULE__{wrapped_job | run_at: run_at}
end

def id_pattern(id) do
%__MODULE__{
id: id,
Expand Down
45 changes: 44 additions & 1 deletion test/honeydew/queue/mnesia_queue_integration_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Honeydew.MnesiaQueueIntegrationTest do
use ExUnit.Case, async: false # shares doctest queue name with ErlangQueue test
alias Honeydew.Job
alias Honeydew.Queue.Mnesia.WrappedJob

@moduletag :capture_log

Expand Down Expand Up @@ -222,7 +223,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do
assert Enum.count(monitors) < 20
end

test "resets in-progress jobs after crashing", %{queue: queue} do
test "resets in-progress jobs after restart", %{queue: queue} do
Enum.each(1..10, fn _ ->
Honeydew.async(fn -> Process.sleep(20_000) end, queue)
end)
Expand All @@ -243,6 +244,34 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do
assert in_progress == 0
end

test "resets job run_at after restart", %{queue: queue} do
:ok = Honeydew.stop_workers(queue)

num_jobs = 10
delay_secs = 15

Enum.each(1..num_jobs, fn _ ->
Honeydew.async(fn -> Process.sleep(20_000) end, queue, delay_secs: delay_secs)
end)

%{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue)

:ok = Honeydew.stop_queue(queue)
jobs = wrapped_jobs(queue)

Process.sleep(2_000) # let the monotonic clock tick
:ok = start_queue(queue)
%{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue)

reset_jobs = wrapped_jobs(queue)

Enum.zip(jobs, reset_jobs)
|> Enum.each(fn {%WrappedJob{run_at: old_run_at}, %WrappedJob{run_at: new_run_at}} ->
refute_in_delta old_run_at, new_run_at, 1
assert_in_delta old_run_at, new_run_at, 3
end)
end

@tag :skip_worker_pool
test "when workers join a queue with existing jobs", %{queue: queue} do
%Job{} = {:send_msg, [self(), :hi]} |> Honeydew.async(queue)
Expand Down Expand Up @@ -417,4 +446,18 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do

:ok
end

defp wrapped_jobs(queue_name) do
alias Honeydew.Queue.Mnesia, as: MnesiaQueue

table = MnesiaQueue.table_name(queue_name)

:mnesia.activity(:async_dirty, fn ->
:mnesia.foldl(fn wrapped_job_record, list ->
[wrapped_job_record | list]
end, [], table)
end)
|> Enum.map(&WrappedJob.from_record/1)
|> Enum.reverse()
end
end

0 comments on commit b793b72

Please sign in to comment.