Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 61 additions & 6 deletions src/eredis_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
-export([start/0, stop/0, connect/1]). % Application Management.

% Generic redis call
-export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]).
-export([q/1, qp/1, qw/2, qk/2, qa/1, qa2/1, qmn/1, transaction/1, transaction/2]).

% Specific redis command implementation
-export([flushdb/0]).
Expand Down Expand Up @@ -89,7 +89,7 @@ transaction(Transaction, Slot, ExpectedValue, Counter) ->
-spec qmn(redis_pipeline_command()) -> redis_pipeline_result().
qmn(Commands) -> qmn(Commands, 0).

qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) ->
qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) ->
{error, no_connection};
qmn(Commands, Counter) ->
%% Throttle retries
Expand All @@ -106,7 +106,7 @@ qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) ->
Result = eredis_cluster_pool:transaction(Pool, Transaction),
case handle_transaction_result(Result, Version, check_pipeline_result) of
retry -> retry;
Res ->
Res ->
MappedRes = lists:zip(Mapping,Res),
qmn2(T1, T2, MappedRes ++ Acc, Version)
end;
Expand Down Expand Up @@ -190,9 +190,9 @@ query(Transaction, Slot, Counter) ->
end.

handle_transaction_result(Result, Version) ->
case Result of
case Result of
% If we detect a node went down, we should probably refresh the slot
% mapping.
% mapping.
{error, no_connection} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
Expand Down Expand Up @@ -298,7 +298,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) ->
RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]),
{lists:last(RedisResult), Result}
end,
case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
{{ok, undefined}, _} ->
{error, resource_busy};
{{ok, TransactionResult}, UpdateResult} ->
Expand Down Expand Up @@ -342,6 +342,61 @@ qa(Command) ->
Transaction = fun(Worker) -> qw(Worker, Command) end,
[eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools].

%% =============================================================================
-spec qa2(redis_command()) -> [{atom(), redis_result()}] | {error, no_connection}.
%% @doc Perform a given query on all master nodes of a redis cluster and
%% return result with master node reference in result.
%% When connection to master failed then do refresh mapping and try again to
%% query.
%% @end
%% =============================================================================
qa2(Command) ->
qa2(Command, 0, []).

qa2(_, ?REDIS_CLUSTER_REQUEST_TTL, Res) ->
case Res of
[] ->
{error, no_connection};
_ ->
%% Return result per Pool:
Res
end;

qa2(Command, Counter, Res) ->
Pools = eredis_cluster_monitor:get_all_pools(),
case Pools of
[] ->
Version = eredis_cluster_monitor:get_state_version(),
eredis_cluster_monitor:refresh_mapping(Version),
qa2(Command, Counter + 1, Res);
_ ->
Transaction = fun(Worker) -> qw(Worker, Command) end,
%% Throttle retries
throttle_retries(Counter),
Result =
[{Pool,
eredis_cluster_pool:transaction(Pool,
Transaction)} || Pool <- Pools],
State = eredis_cluster_monitor:get_state(),
Tmp = lists:foldl(
fun({_P, TR}, Acc) ->
case handle_transaction_result(TR,
eredis_cluster_monitor:get_state_version(State))
of
retry ->
[retry|Acc];
_ ->
Acc
end
end, [], Result),
case lists:member(retry, Tmp) of
true ->
qa2(Command, Counter + 1, Result);
false ->
Result
end
end.

%% =============================================================================
%% @doc Wrapper function to be used for direct call to a pool worker in the
%% function passed to the transaction/2 method
Expand Down
10 changes: 10 additions & 0 deletions test/eredis_cluster_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ basic_test_() ->
eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]),
?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"]))
end
},

{ "get cluster info with pool specified in response",
fun () ->
QA2SR = eredis_cluster:qa2(["cluster", "slots"]),
?assertMatch({_Pool, {ok, _}}, lists:last(QA2SR)),
QA2Res = eredis_cluster:qa2(["get", "qrs"]),
?assertMatch({error,_},
proplists:lookup(error, [PR || {_P, PR} <- QA2Res]))
end
}

]
Expand Down