Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/coalesce handler #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions lib/cache/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@ defmodule Cache.Registry do
end
end

def get_or_create_coalescer(key) do
GenServer.call(__MODULE__, {:get_or_create_coalescer, key})
end

def get_coalesce_pid(key) do
case GenServer.call(__MODULE__, {:get_coalesce, key}) do
{:ok, response} -> response
{:not_found} -> nil
end
end

def remove_coalesce_key(key) do
GenServer.call(__MODULE__, {:remove_coalesce, key})
end

def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do
# IO.puts "Going to store new content"
# IO.inspect( key, label: "Key to store under" )
# IO.inspect( response, label: "Response to save" )
GenServer.call(__MODULE__, {:store, key, response})
end

Expand All @@ -32,11 +44,42 @@ defmodule Cache.Registry do
# GenServer API
###
def start_link(_) do
GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}}], name: __MODULE__)
GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}],
name: __MODULE__
)
end

def init(_) do
{:ok, %{cache: %{}, caches_by_key: %{}}}
{:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}}
end

def handle_call({:get_or_create_coalescer, key}, _from, state) do
case Map.get(state.coalesce_handlers, key, nil) do
nil ->
{:ok, pid} = Coalesce.Registry.start(%{})
new_state = put_in(state[:coalesce_handlers][key], pid)
{:reply, {:created, pid}, new_state}

pid ->
{:reply, {:attach, pid}, state}
end
end

def handle_call({:get_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
{:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state}
else
{:reply, {:not_found}, state}
end
end

def handle_call({:remove_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
{_, new_state} = pop_in(state[:coalesce_handlers][key])
{:reply, {:ok}, new_state}
else
{:reply, {:not_found}, state}
end
end

def handle_call({:find_cache, key}, _from, state) do
Expand All @@ -48,21 +91,13 @@ defmodule Cache.Registry do
end

def handle_call({:store, request_key, response}, _from, state) do
# IO.inspect( request_key, label: "Request key" )
# IO.inspect( response, label: "Response" )

%{cache_keys: cache_keys, clear_keys: clear_keys} = response

# IO.inspect { :cache_keys, cache_keys }
# IO.inspect { :clear_keys, clear_keys }

state =
state
# update state for clear_keys
|> clear_keys!(clear_keys)

# IO.puts "Executed clear keys"

if cache_keys == [] do
{:reply, :ok, state}
else
Expand Down Expand Up @@ -96,6 +131,7 @@ defmodule Cache.Registry do
cache =
Enum.reduce(clear_keys, cache, fn clear_key, cache ->
keys_to_remove = Map.get(caches_by_key, clear_key, [])

cache = Map.drop(cache, keys_to_remove)
cache
end)
Expand Down
109 changes: 109 additions & 0 deletions lib/cache/coalesce.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
defmodule Coalesce.Registry do
@moduledoc """
Maintains coalescing requests for specific request, dependant on key.
"""

use GenServer

def add_connection(pid, connection) do
{:ok, conn} = GenServer.call(pid, {:add_conn, connection})
conn
end

def assure_status_sent(state, status) do
if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do
# First time there was a status
conns =
state.connections
|> Enum.map(fn {conn, from} ->
conn =
Plug.Conn.merge_resp_headers(conn, state.headers)
|> Plug.Conn.send_chunked(status)

{conn, from}
end)
|> Enum.flat_map(fn {conn, from} ->
case push_body_parts(conn, state.body) do
nil -> []
conn -> [{conn, from}]
end
end)

%{state | connections: conns, status: status}
else
state
end
end

def push_body_parts(conn, body_parts) do
Enum.reduce_while(body_parts, conn, fn ch, conn ->
case Plug.Conn.chunk(conn, ch) do
{:ok, conn} -> {:cont, conn}
{:error, :closed} -> {:halt, nil}
end
end)
end

###
# GenServer API
###
def start(_) do
GenServer.start(__MODULE__, [%{}])
end

@impl true
def init(_) do
{:ok, %{connections: [], headers: nil, body: [], status: nil}}
end

@impl true
def handle_call({:add_conn, conn}, from, state) do
conn = if not is_nil(state.status) and not is_nil(state.headers) do
Plug.Conn.merge_resp_headers(conn, state.headers)
|> Plug.Conn.send_chunked(state.status)
|> push_body_parts(state.body)
else
conn
end

conns = [{conn, from} | state.connections]

new_state = %{state | connections: conns}

{:noreply, new_state}
end

@impl true
def handle_cast({:headers, headers, status}, state) do
state_with_headers = %{state | headers: headers}
new_state = assure_status_sent(state_with_headers, status)

{:noreply, new_state}
end

@impl true
def handle_cast({:chunk, data, status}, state) do
new_state = assure_status_sent(state, status)

conns =
Enum.flat_map(new_state.connections, fn {conn, from} ->
case Plug.Conn.chunk(conn, data) do
{:ok, conn} -> [{conn, from}]
{:error, :closed} -> []
end
end)

new_state = %{new_state | connections: conns}

{:noreply, new_state}
end

@impl true
def handle_cast({:finished, status}, state) do
new_state = assure_status_sent(state, status)

Enum.each(new_state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end)

{:stop, :normal, new_state}
end
end
4 changes: 3 additions & 1 deletion lib/manipulators/cache_key_logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ defmodule Manipulators.CacheKeyLogger do
end

defp header_value(headers, header_name) do
header = Enum.find(headers, header_name)
header =
headers
|> Enum.find(&match?({^header_name, _}, &1))

if header do
elem(header, 1)
Expand Down
50 changes: 50 additions & 0 deletions lib/manipulators/coalesce_response.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
defmodule Manipulators.CoalesceResponse do
@moduledoc """
Manipulates the response, notifying the Coalesce.Registry for this request.
"""

alias Cache.Registry, as: Cache

@behaviour ProxyManipulator

@impl true
def headers(headers, {conn_in, conn_out}) do
all_response_headers = Mint.HTTP.get_private(conn_out, :mu_cache_original_headers)

allowed_groups =
all_response_headers
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
|> elem(1)
|> Poison.decode!()

key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}

pid = Cache.get_coalesce_pid(key)

conn_out =
conn_out
|> Mint.HTTP.put_private(:coalesce_pid, pid)
|> Mint.HTTP.put_private(:coalesce_key, key)

GenServer.cast(pid, {:headers, headers, conn_in.status})

{headers, {conn_in, conn_out}}
end

@impl true
def chunk(chunk, {conn_in, conn_out}) do
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
GenServer.cast(pid, {:chunk, chunk, conn_in.status})

:skip
end

@impl true
def finish(_, {conn_in, conn_out}) do
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
key = Mint.HTTP.get_private(conn_out, :coalesce_key)
Cache.remove_coalesce_key(key)
GenServer.cast(pid, {:finished, conn_in.status})
:skip
end
end
4 changes: 2 additions & 2 deletions lib/manipulators/remove_cache_related_keys.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ defmodule Manipulators.RemoveCacheRelatedKeys do
@behaviour ProxyManipulator

@impl true
def headers(headers, connection) do
def headers(headers_inp, connection) do
new_headers =
headers
headers_inp
|> Enum.reject(fn
{"cache-keys", _} -> true
{"clear-keys", _} -> true
Expand Down
5 changes: 1 addition & 4 deletions lib/manipulators/store_response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do
all_response_headers
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
|> elem(1)
|> Poison.decode!()

cache_keys =
all_response_headers
Expand All @@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do
|> elem(1)
|> Poison.decode!()

# IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" )

Cache.store(
{conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups},
%{
Expand Down Expand Up @@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do
|> elem(1)
|> Poison.decode!()

IO.inspect(clear_keys, label: "Clear keys")

Cache.clear_keys(clear_keys)

true ->
Expand Down
17 changes: 10 additions & 7 deletions lib/mu_cache_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ defmodule MuCachePlug do
@response_manipulators [
Manipulators.CacheKeyLogger,
Manipulators.StoreResponse,
Manipulators.RemoveCacheRelatedKeys
Manipulators.RemoveCacheRelatedKeys,

# Make sure this is the last one, coalescing responses as generated at this point
Manipulators.CoalesceResponse
]
@manipulators ProxyManipulatorSettings.make_settings(
@request_manipulators,
Expand Down Expand Up @@ -49,16 +52,16 @@ defmodule MuCachePlug do
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)

cached_value =
Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) ->
Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) ->
# with allowed groups and a cache, we should use the cache
respond_with_cache(conn, cached_value)

true ->
# without a cache, we should consult the backend
# IO.inspect(
# {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature")

ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}
case Cache.get_or_create_coalescer(key) do
{:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
{:attach, pid} -> Coalesce.Registry.add_connection(pid, conn)
end
end
end

Expand Down
12 changes: 6 additions & 6 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"castore": {:hex, :castore, "0.1.9", "eb08a94c12ebff92a92d844c6ccd90728dc7662aab9bdc8b3b785ba653c499d5", [:mix], [], "hexpm", "99c3a38ad9c0bab03fee1418c98390da1a31f3b85e317db5840d51a1443d26c8"},
"cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"},
"castore": {:hex, :castore, "0.1.11", "c0665858e0e1c3e8c27178e73dffea699a5b28eb72239a3b2642d208e8594914", [:mix], [], "hexpm", "91b009ba61973b532b84f7c09ce441cba7aa15cb8b006cf06c6f4bba18220081"},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
"cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
"credo": {:hex, :credo, "1.5.5", "e8f422026f553bc3bebb81c8e8bf1932f498ca03339856c7fec63d3faac8424b", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd8623ab7091956a855dc9f3062486add9c52d310dfd62748779c4315d8247de"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
"credo": {:hex, :credo, "1.5.6", "e04cc0fdc236fefbb578e0c04bd01a471081616e741d386909e527ac146016c6", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "4b52a3e558bd64e30de62a648518a5ea2b6e3e5d2b164ef5296244753fc7eb17"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
Expand All @@ -16,6 +16,6 @@
"plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"},
"plug_mint_proxy": {:git, "https://github.com/madnificent/plug-mint-proxy.git", "cb52954d260117a0b0e65baa8d3f313561bc2cf7", [branch: "feature/separate-example-runner"]},
"poison": {:hex, :poison, "2.2.0", "4763b69a8a77bd77d26f477d196428b741261a761257ff1cf92753a0d4d24a63", [:mix], [], "hexpm", "519bc209e4433961284174c497c8524c001e285b79bdf80212b47a1f898084cc"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}