diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl index 1da3a9f..19badec 100644 --- a/src/eredis_cluster.erl +++ b/src/eredis_cluster.erl @@ -9,7 +9,7 @@ -export([start/0, stop/0, connect/1]). % Application Management. % Generic redis call --export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]). +-export([q/1, qp/1, qw/2, qk/2, qa/1, qa2/1, qmn/1, transaction/1, transaction/2]). % Specific redis command implementation -export([flushdb/0]). @@ -89,7 +89,7 @@ transaction(Transaction, Slot, ExpectedValue, Counter) -> -spec qmn(redis_pipeline_command()) -> redis_pipeline_result(). qmn(Commands) -> qmn(Commands, 0). -qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> +qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> {error, no_connection}; qmn(Commands, Counter) -> %% Throttle retries @@ -106,7 +106,7 @@ qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) -> Result = eredis_cluster_pool:transaction(Pool, Transaction), case handle_transaction_result(Result, Version, check_pipeline_result) of retry -> retry; - Res -> + Res -> MappedRes = lists:zip(Mapping,Res), qmn2(T1, T2, MappedRes ++ Acc, Version) end; @@ -190,9 +190,9 @@ query(Transaction, Slot, Counter) -> end. handle_transaction_result(Result, Version) -> - case Result of + case Result of % If we detect a node went down, we should probably refresh the slot - % mapping. + % mapping. {error, no_connection} -> eredis_cluster_monitor:refresh_mapping(Version), retry; @@ -298,7 +298,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) -> RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]), {lists:last(RedisResult), Result} end, - case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of + case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of {{ok, undefined}, _} -> {error, resource_busy}; {{ok, TransactionResult}, UpdateResult} -> @@ -342,6 +342,61 @@ qa(Command) -> Transaction = fun(Worker) -> qw(Worker, Command) end, [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools]. +%% ============================================================================= +-spec qa2(redis_command()) -> [{atom(), redis_result()}] | {error, no_connection}. +%% @doc Perform a given query on all master nodes of a redis cluster and +%% return result with master node reference in result. +%% When connection to master failed then do refresh mapping and try again to +%% query. +%% @end +%% ============================================================================= +qa2(Command) -> + qa2(Command, 0, []). + +qa2(_, ?REDIS_CLUSTER_REQUEST_TTL, Res) -> + case Res of + [] -> + {error, no_connection}; + _ -> + %% Return result per Pool: + Res + end; + +qa2(Command, Counter, Res) -> + Pools = eredis_cluster_monitor:get_all_pools(), + case Pools of + [] -> + Version = eredis_cluster_monitor:get_state_version(), + eredis_cluster_monitor:refresh_mapping(Version), + qa2(Command, Counter + 1, Res); + _ -> + Transaction = fun(Worker) -> qw(Worker, Command) end, + %% Throttle retries + throttle_retries(Counter), + Result = + [{Pool, + eredis_cluster_pool:transaction(Pool, + Transaction)} || Pool <- Pools], + State = eredis_cluster_monitor:get_state(), + Tmp = lists:foldl( + fun({_P, TR}, Acc) -> + case handle_transaction_result(TR, + eredis_cluster_monitor:get_state_version(State)) + of + retry -> + [retry|Acc]; + _ -> + Acc + end + end, [], Result), + case lists:member(retry, Tmp) of + true -> + qa2(Command, Counter + 1, Result); + false -> + Result + end + end. + %% ============================================================================= %% @doc Wrapper function to be used for direct call to a pool worker in the %% function passed to the transaction/2 method diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 7b8a7ce..012377c 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -163,6 +163,16 @@ basic_test_() -> eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]), ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])) end + }, + + { "get cluster info with pool specified in response", + fun () -> + QA2SR = eredis_cluster:qa2(["cluster", "slots"]), + ?assertMatch({_Pool, {ok, _}}, lists:last(QA2SR)), + QA2Res = eredis_cluster:qa2(["get", "qrs"]), + ?assertMatch({error,_}, + proplists:lookup(error, [PR || {_P, PR} <- QA2Res])) + end } ]