diff --git a/lib/cache/cache.ex b/lib/cache/cache.ex index 201360e..fdc1a25 100644 --- a/lib/cache/cache.ex +++ b/lib/cache/cache.ex @@ -17,10 +17,22 @@ defmodule Cache.Registry do end end + def get_or_create_coalescer(key) do + GenServer.call(__MODULE__, {:get_or_create_coalescer, key}) + end + + def get_coalesce_pid(key) do + case GenServer.call(__MODULE__, {:get_coalesce, key}) do + {:ok, response} -> response + {:not_found} -> nil + end + end + + def remove_coalesce_key(key) do + GenServer.call(__MODULE__, {:remove_coalesce, key}) + end + def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do - # IO.puts "Going to store new content" - # IO.inspect( key, label: "Key to store under" ) - # IO.inspect( response, label: "Response to save" ) GenServer.call(__MODULE__, {:store, key, response}) end @@ -32,11 +44,42 @@ defmodule Cache.Registry do # GenServer API ### def start_link(_) do - GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}}], name: __MODULE__) + GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}], + name: __MODULE__ + ) end def init(_) do - {:ok, %{cache: %{}, caches_by_key: %{}}} + {:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}} + end + + def handle_call({:get_or_create_coalescer, key}, _from, state) do + case Map.get(state.coalesce_handlers, key, nil) do + nil -> + {:ok, pid} = Coalesce.Registry.start(%{}) + new_state = put_in(state[:coalesce_handlers][key], pid) + {:reply, {:created, pid}, new_state} + + pid -> + {:reply, {:attach, pid}, state} + end + end + + def handle_call({:get_coalesce, key}, _from, state) do + if Map.has_key?(state.coalesce_handlers, key) do + {:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state} + else + {:reply, {:not_found}, state} + end + end + + def handle_call({:remove_coalesce, key}, _from, state) do + if Map.has_key?(state.coalesce_handlers, key) do + {_, new_state} = pop_in(state[:coalesce_handlers][key]) + {:reply, {:ok}, new_state} + else + {:reply, {:not_found}, state} + end end def handle_call({:find_cache, key}, _from, state) do @@ -48,21 +91,13 @@ defmodule Cache.Registry do end def handle_call({:store, request_key, response}, _from, state) do - # IO.inspect( request_key, label: "Request key" ) - # IO.inspect( response, label: "Response" ) - %{cache_keys: cache_keys, clear_keys: clear_keys} = response - # IO.inspect { :cache_keys, cache_keys } - # IO.inspect { :clear_keys, clear_keys } - state = state # update state for clear_keys |> clear_keys!(clear_keys) - # IO.puts "Executed clear keys" - if cache_keys == [] do {:reply, :ok, state} else @@ -96,6 +131,7 @@ defmodule Cache.Registry do cache = Enum.reduce(clear_keys, cache, fn clear_key, cache -> keys_to_remove = Map.get(caches_by_key, clear_key, []) + cache = Map.drop(cache, keys_to_remove) cache end) diff --git a/lib/cache/coalesce.ex b/lib/cache/coalesce.ex new file mode 100644 index 0000000..102cb67 --- /dev/null +++ b/lib/cache/coalesce.ex @@ -0,0 +1,109 @@ +defmodule Coalesce.Registry do + @moduledoc """ + Maintains coalescing requests for specific request, dependant on key. + """ + + use GenServer + + def add_connection(pid, connection) do + {:ok, conn} = GenServer.call(pid, {:add_conn, connection}) + conn + end + + def assure_status_sent(state, status) do + if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do + # First time there was a status + conns = + state.connections + |> Enum.map(fn {conn, from} -> + conn = + Plug.Conn.merge_resp_headers(conn, state.headers) + |> Plug.Conn.send_chunked(status) + + {conn, from} + end) + |> Enum.flat_map(fn {conn, from} -> + case push_body_parts(conn, state.body) do + nil -> [] + conn -> [{conn, from}] + end + end) + + %{state | connections: conns, status: status} + else + state + end + end + + def push_body_parts(conn, body_parts) do + Enum.reduce_while(body_parts, conn, fn ch, conn -> + case Plug.Conn.chunk(conn, ch) do + {:ok, conn} -> {:cont, conn} + {:error, :closed} -> {:halt, nil} + end + end) + end + + ### + # GenServer API + ### + def start(_) do + GenServer.start(__MODULE__, [%{}]) + end + + @impl true + def init(_) do + {:ok, %{connections: [], headers: nil, body: [], status: nil}} + end + + @impl true + def handle_call({:add_conn, conn}, from, state) do + conn = if not is_nil(state.status) and not is_nil(state.headers) do + Plug.Conn.merge_resp_headers(conn, state.headers) + |> Plug.Conn.send_chunked(state.status) + |> push_body_parts(state.body) + else + conn + end + + conns = [{conn, from} | state.connections] + + new_state = %{state | connections: conns} + + {:noreply, new_state} + end + + @impl true + def handle_cast({:headers, headers, status}, state) do + state_with_headers = %{state | headers: headers} + new_state = assure_status_sent(state_with_headers, status) + + {:noreply, new_state} + end + + @impl true + def handle_cast({:chunk, data, status}, state) do + new_state = assure_status_sent(state, status) + + conns = + Enum.flat_map(new_state.connections, fn {conn, from} -> + case Plug.Conn.chunk(conn, data) do + {:ok, conn} -> [{conn, from}] + {:error, :closed} -> [] + end + end) + + new_state = %{new_state | connections: conns} + + {:noreply, new_state} + end + + @impl true + def handle_cast({:finished, status}, state) do + new_state = assure_status_sent(state, status) + + Enum.each(new_state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end) + + {:stop, :normal, new_state} + end +end diff --git a/lib/manipulators/cache_key_logger.ex b/lib/manipulators/cache_key_logger.ex index cdc5e54..b28799e 100644 --- a/lib/manipulators/cache_key_logger.ex +++ b/lib/manipulators/cache_key_logger.ex @@ -38,7 +38,9 @@ defmodule Manipulators.CacheKeyLogger do end defp header_value(headers, header_name) do - header = Enum.find(headers, header_name) + header = + headers + |> Enum.find(&match?({^header_name, _}, &1)) if header do elem(header, 1) diff --git a/lib/manipulators/coalesce_response.ex b/lib/manipulators/coalesce_response.ex new file mode 100644 index 0000000..1ae1f17 --- /dev/null +++ b/lib/manipulators/coalesce_response.ex @@ -0,0 +1,50 @@ +defmodule Manipulators.CoalesceResponse do + @moduledoc """ + Manipulates the response, notifying the Coalesce.Registry for this request. + """ + + alias Cache.Registry, as: Cache + + @behaviour ProxyManipulator + + @impl true + def headers(headers, {conn_in, conn_out}) do + all_response_headers = Mint.HTTP.get_private(conn_out, :mu_cache_original_headers) + + allowed_groups = + all_response_headers + |> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1)) + |> elem(1) + |> Poison.decode!() + + key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups} + + pid = Cache.get_coalesce_pid(key) + + conn_out = + conn_out + |> Mint.HTTP.put_private(:coalesce_pid, pid) + |> Mint.HTTP.put_private(:coalesce_key, key) + + GenServer.cast(pid, {:headers, headers, conn_in.status}) + + {headers, {conn_in, conn_out}} + end + + @impl true + def chunk(chunk, {conn_in, conn_out}) do + pid = Mint.HTTP.get_private(conn_out, :coalesce_pid) + GenServer.cast(pid, {:chunk, chunk, conn_in.status}) + + :skip + end + + @impl true + def finish(_, {conn_in, conn_out}) do + pid = Mint.HTTP.get_private(conn_out, :coalesce_pid) + key = Mint.HTTP.get_private(conn_out, :coalesce_key) + Cache.remove_coalesce_key(key) + GenServer.cast(pid, {:finished, conn_in.status}) + :skip + end +end diff --git a/lib/manipulators/remove_cache_related_keys.ex b/lib/manipulators/remove_cache_related_keys.ex index 06d070d..37f0ced 100644 --- a/lib/manipulators/remove_cache_related_keys.ex +++ b/lib/manipulators/remove_cache_related_keys.ex @@ -9,9 +9,9 @@ defmodule Manipulators.RemoveCacheRelatedKeys do @behaviour ProxyManipulator @impl true - def headers(headers, connection) do + def headers(headers_inp, connection) do new_headers = - headers + headers_inp |> Enum.reject(fn {"cache-keys", _} -> true {"clear-keys", _} -> true diff --git a/lib/manipulators/store_response.ex b/lib/manipulators/store_response.ex index 3d462e4..3dac912 100644 --- a/lib/manipulators/store_response.ex +++ b/lib/manipulators/store_response.ex @@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do all_response_headers |> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1)) |> elem(1) + |> Poison.decode!() cache_keys = all_response_headers @@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do |> elem(1) |> Poison.decode!() - # IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" ) - Cache.store( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, %{ @@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do |> elem(1) |> Poison.decode!() - IO.inspect(clear_keys, label: "Clear keys") - Cache.clear_keys(clear_keys) true -> diff --git a/lib/mu_cache_plug.ex b/lib/mu_cache_plug.ex index 9a60a20..1b60fc2 100644 --- a/lib/mu_cache_plug.ex +++ b/lib/mu_cache_plug.ex @@ -14,7 +14,10 @@ defmodule MuCachePlug do @response_manipulators [ Manipulators.CacheKeyLogger, Manipulators.StoreResponse, - Manipulators.RemoveCacheRelatedKeys + Manipulators.RemoveCacheRelatedKeys, + + # Make sure this is the last one, coalescing responses as generated at this point + Manipulators.CoalesceResponse ] @manipulators ProxyManipulatorSettings.make_settings( @request_manipulators, @@ -49,16 +52,16 @@ defmodule MuCachePlug do ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators) cached_value = - Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) -> + Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) -> # with allowed groups and a cache, we should use the cache respond_with_cache(conn, cached_value) true -> - # without a cache, we should consult the backend - # IO.inspect( - # {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature") - - ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators) + key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)} + case Cache.get_or_create_coalescer(key) do + {:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators) + {:attach, pid} -> Coalesce.Registry.add_connection(pid, conn) + end end end diff --git a/mix.lock b/mix.lock index 7715dc6..b1edd8b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,10 +1,10 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, - "castore": {:hex, :castore, "0.1.9", "eb08a94c12ebff92a92d844c6ccd90728dc7662aab9bdc8b3b785ba653c499d5", [:mix], [], "hexpm", "99c3a38ad9c0bab03fee1418c98390da1a31f3b85e317db5840d51a1443d26c8"}, - "cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"}, + "castore": {:hex, :castore, "0.1.11", "c0665858e0e1c3e8c27178e73dffea699a5b28eb72239a3b2642d208e8594914", [:mix], [], "hexpm", "91b009ba61973b532b84f7c09ce441cba7aa15cb8b006cf06c6f4bba18220081"}, + "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"}, - "cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"}, - "credo": {:hex, :credo, "1.5.5", "e8f422026f553bc3bebb81c8e8bf1932f498ca03339856c7fec63d3faac8424b", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd8623ab7091956a855dc9f3062486add9c52d310dfd62748779c4315d8247de"}, + "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, + "credo": {:hex, :credo, "1.5.6", "e04cc0fdc236fefbb578e0c04bd01a471081616e741d386909e527ac146016c6", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "4b52a3e558bd64e30de62a648518a5ea2b6e3e5d2b164ef5296244753fc7eb17"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, @@ -16,6 +16,6 @@ "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "plug_mint_proxy": {:git, "https://github.com/madnificent/plug-mint-proxy.git", "cb52954d260117a0b0e65baa8d3f313561bc2cf7", [branch: "feature/separate-example-runner"]}, "poison": {:hex, :poison, "2.2.0", "4763b69a8a77bd77d26f477d196428b741261a761257ff1cf92753a0d4d24a63", [:mix], [], "hexpm", "519bc209e4433961284174c497c8524c001e285b79bdf80212b47a1f898084cc"}, - "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, - "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"}, + "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, }