From b793b72f5561fe83df00b64ea3e65c6f0ccf6a7a Mon Sep 17 00:00:00 2001 From: Michael Shapiro Date: Wed, 19 Feb 2020 17:32:31 -0800 Subject: [PATCH] Reset mnesia queue's run_at when queue restarts, issue #99. --- lib/honeydew/queue/mnesia.ex | 39 ++++++++++++++-- lib/honeydew/queue/mnesia/wrapped_job.ex | 14 +++--- .../queue/mnesia_queue_integration_test.exs | 45 ++++++++++++++++++- 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/lib/honeydew/queue/mnesia.ex b/lib/honeydew/queue/mnesia.ex index be890f9..2d879b0 100644 --- a/lib/honeydew/queue/mnesia.ex +++ b/lib/honeydew/queue/mnesia.ex @@ -67,7 +67,7 @@ defmodule Honeydew.Queue.Mnesia do ]) # inspect/1 here becase queue_name can be of the form {:global, poolname} - table = ["honeydew", inspect(queue_name)] |> Enum.join("_") |> String.to_atom + table = table_name(queue_name) in_progress_table = ["honeydew", inspect(queue_name), "in_progress"] |> Enum.join("_") |> String.to_atom tables = %{table => [type: :ordered_set], @@ -94,7 +94,7 @@ defmodule Honeydew.Queue.Mnesia do in_progress_table: in_progress_table, access_context: access_context(table_opts)} - :ok = reset_after_crash(state) + :ok = reset_after_shutdown(state) time_warp_mode_warning() poll() @@ -232,7 +232,14 @@ defmodule Honeydew.Queue.Mnesia do {reply, state} end - defp reset_after_crash(%PState{in_progress_table: in_progress_table} = state) do + defp reset_after_shutdown(state) do + reset_all_in_progress_jobs(state) + reset_all_pending_jobs(state) + + :ok + end + + defp reset_all_in_progress_jobs(%PState{in_progress_table: in_progress_table} = state) do in_progress_table |> :mnesia.dirty_first() |> case do @@ -244,8 +251,26 @@ defmodule Honeydew.Queue.Mnesia do |> WrappedJob.id_from_key |> move_to_pending_table(%{}, state) - reset_after_crash(state) + reset_all_in_progress_jobs(state) end + + :ok + end + + defp reset_all_pending_jobs(%PState{access_context: access_context, table: table}) do + :mnesia.activity(access_context, fn -> + :mnesia.foldl(fn wrapped_job_record, _ -> + new_wrapped_job_record = + wrapped_job_record + |> WrappedJob.from_record() + |> WrappedJob.reset_run_at() + |> WrappedJob.to_record() + + :ok = :mnesia.delete_object(table, wrapped_job_record, :write) + :ok = :mnesia.write(table, new_wrapped_job_record, :write) + end, [], table) + end) + :ok end @@ -321,6 +346,12 @@ defmodule Honeydew.Queue.Mnesia do {:noreply, Queue.dispatch(queue_state)} end + def table_name(queue_name) do + ["honeydew", inspect(queue_name)] + |> Enum.join("_") + |> String.to_atom() + end + defp poll do {:ok, _} = :timer.send_after(@poll_interval, :__poll__) end diff --git a/lib/honeydew/queue/mnesia/wrapped_job.ex b/lib/honeydew/queue/mnesia/wrapped_job.ex index 8ec71d6..3d071b7 100644 --- a/lib/honeydew/queue/mnesia/wrapped_job.ex +++ b/lib/honeydew/queue/mnesia/wrapped_job.ex @@ -20,15 +20,13 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do def record_name, do: @record_name def record_fields, do: @record_fields - def new(%Job{delay_secs: delay_secs} = job) do + def new(%Job{} = job) do id = :erlang.unique_integer() - run_at = now() + delay_secs job = %{job | private: id} - %__MODULE__{run_at: run_at, - id: id, - job: job} + %__MODULE__{id: id, job: job} + |> reset_run_at() end def from_record({@record_name, {run_at, id}, job}) do @@ -55,6 +53,12 @@ defmodule Honeydew.Queue.Mnesia.WrappedJob do id end + def reset_run_at(%__MODULE__{job: %Job{delay_secs: delay_secs}} = wrapped_job) do + run_at = now() + delay_secs + + %__MODULE__{wrapped_job | run_at: run_at} + end + def id_pattern(id) do %__MODULE__{ id: id, diff --git a/test/honeydew/queue/mnesia_queue_integration_test.exs b/test/honeydew/queue/mnesia_queue_integration_test.exs index c2dbf58..1e74dca 100644 --- a/test/honeydew/queue/mnesia_queue_integration_test.exs +++ b/test/honeydew/queue/mnesia_queue_integration_test.exs @@ -1,6 +1,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do use ExUnit.Case, async: false # shares doctest queue name with ErlangQueue test alias Honeydew.Job + alias Honeydew.Queue.Mnesia.WrappedJob @moduletag :capture_log @@ -222,7 +223,7 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do assert Enum.count(monitors) < 20 end - test "resets in-progress jobs after crashing", %{queue: queue} do + test "resets in-progress jobs after restart", %{queue: queue} do Enum.each(1..10, fn _ -> Honeydew.async(fn -> Process.sleep(20_000) end, queue) end) @@ -243,6 +244,34 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do assert in_progress == 0 end + test "resets job run_at after restart", %{queue: queue} do + :ok = Honeydew.stop_workers(queue) + + num_jobs = 10 + delay_secs = 15 + + Enum.each(1..num_jobs, fn _ -> + Honeydew.async(fn -> Process.sleep(20_000) end, queue, delay_secs: delay_secs) + end) + + %{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue) + + :ok = Honeydew.stop_queue(queue) + jobs = wrapped_jobs(queue) + + Process.sleep(2_000) # let the monotonic clock tick + :ok = start_queue(queue) + %{queue: %{count: ^num_jobs, in_progress: 0}} = Honeydew.status(queue) + + reset_jobs = wrapped_jobs(queue) + + Enum.zip(jobs, reset_jobs) + |> Enum.each(fn {%WrappedJob{run_at: old_run_at}, %WrappedJob{run_at: new_run_at}} -> + refute_in_delta old_run_at, new_run_at, 1 + assert_in_delta old_run_at, new_run_at, 3 + end) + end + @tag :skip_worker_pool test "when workers join a queue with existing jobs", %{queue: queue} do %Job{} = {:send_msg, [self(), :hi]} |> Honeydew.async(queue) @@ -417,4 +446,18 @@ defmodule Honeydew.MnesiaQueueIntegrationTest do :ok end + + defp wrapped_jobs(queue_name) do + alias Honeydew.Queue.Mnesia, as: MnesiaQueue + + table = MnesiaQueue.table_name(queue_name) + + :mnesia.activity(:async_dirty, fn -> + :mnesia.foldl(fn wrapped_job_record, list -> + [wrapped_job_record | list] + end, [], table) + end) + |> Enum.map(&WrappedJob.from_record/1) + |> Enum.reverse() + end end