Skip to content

Commit

Permalink
[fixup] Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pgj committed Feb 29, 2024
1 parent b9630ea commit 15f38bf
Show file tree
Hide file tree
Showing 9 changed files with 1,731 additions and 7 deletions.
146 changes: 146 additions & 0 deletions src/couch_replicator/src/couch_replicator_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,149 @@ maybe_fetch_and_filter_doc(Id, undecided, State) ->
end;
maybe_fetch_and_filter_doc(_Id, Doc, _State) ->
Doc.

-ifdef(TEST).

-include_lib("couch/include/couch_eunit.hrl").

handle_message_test_() ->
{
foreach,
fun() ->
meck:new(foo, [non_strict]),
meck:new(fabric_view)
end,
fun(_) -> meck:unload() end,
[
?TDEF_FE(t_handle_message_rexi_down),
?TDEF_FE(t_handle_message_rexi_exit),
?TDEF_FE(t_handle_message_meta_zero),
?TDEF_FE(t_handle_message_meta),
?TDEF_FE(t_handle_message_row_skip),
?TDEF_FE(t_handle_message_row),
?TDEF_FE(t_handle_message_complete)
]
}.

t_handle_message_rexi_down(_) ->
Message = {rexi_DOWN, undefined, {undefined, node}, undefined},
meck:expect(fabric_view, check_down_shards, [state, node], meck:val(fabric_view_result)),
?assertEqual(fabric_view_result, handle_message(Message, source, state)).

t_handle_message_rexi_exit(_) ->
Message = {rexi_EXIT, reason},
meck:expect(
fabric_view, handle_worker_exit, [state, source, reason], meck:val(fabric_view_result)
),
?assertEqual(fabric_view_result, handle_message(Message, source, state)).

t_handle_message_meta_zero(_) ->
Meta = [{total, 3}, {offset, 2}, {update_seq, 1}],
Worker = {worker1, from},
Counters1 = [{worker1, 0}, {worker2, 0}],
Counters2 = [{worker1, 1}, {worker2, 0}],
State1 = #collector{counters = Counters1, total_rows = 0, update_seq = nil, offset = 0},
State2 = #collector{counters = Counters2, total_rows = 3, update_seq = nil, offset = 2},
meck:expect(rexi, stream_ack, [from], meck:val(ok)),
?assertEqual({ok, State2}, handle_message({meta, Meta}, Worker, State1)).

t_handle_message_meta(_) ->
Meta1 = [{total, 10}, {offset, 2}],
Meta2 = [{total, 10}, {offset, 5}],
Meta3 = [{total, 10}, {offset, 5}],
Worker = {worker1, from},
Counters1 = [{worker1, 0}, {worker2, 3}, {worker3, 5}],
Counters2 = [{worker1, 0}, {worker2, 2}, {worker3, 4}],
State1 = #collector{
counters = Counters1,
total_rows = 0,
update_seq = [],
offset = 0,
skip = 3,
callback = fun foo:bar/2,
user_acc = accumulator1
},
State2 = #collector{
counters = Counters1,
total_rows = 0,
update_seq = nil,
offset = 0,
skip = 3,
callback = fun foo:bar/2,
user_acc = accumulator2
},
State3 = #collector{
counters = Counters2,
total_rows = 10,
update_seq = [],
offset = 5,
skip = 3,
callback = fun foo:bar/2,
user_acc = updated_accumulator1
},
State4 = #collector{
counters = Counters2,
total_rows = 10,
update_seq = nil,
offset = 5,
skip = 3,
callback = fun foo:bar/2,
user_acc = updated_accumulator2
},
meck:expect(
foo,
bar,
[
{[{meta, Meta2}, accumulator1], meck:val({go1, updated_accumulator1})},
{[{meta, Meta3}, accumulator2], meck:val({go2, updated_accumulator2})}
]
),
meck:expect(rexi, stream_ack, [from], meck:val(ok)),
?assertEqual({go1, State3}, handle_message({meta, Meta1}, Worker, State1)),
?assertEqual({go2, State4}, handle_message({meta, Meta1}, Worker, State2)).

t_handle_message_row_skip(_) ->
State = #collector{db_name = db, query_args = #mrargs{}},
Worker = {worker, from},
Row1 = #view_row{id = id, doc = undecided},
Row2 = {view_row, #{id => id, doc => undecided}},
meck:expect(couch_replicator, active_doc, [db, id], meck:val({error, not_found})),
meck:expect(rexi, stream_ack, [from], meck:val(ok)),
?assertEqual({ok, State}, handle_message(Row1, Worker, State)),
?assertEqual({ok, State}, handle_message(Row2, Worker, State)).

t_handle_message_row(_) ->
QueryArgs = #mrargs{direction = fwd, extra = [{filter_states, []}]},
Counters1 = [{worker, 4}],
Counters2 = [{worker, 5}],
Worker = {worker, from},
Doc = {[{<<"_id">>, doc_id}, {<<"_rev">>, doc_rev}]},
Row1 = #view_row{id = id, doc = Doc},
Row11 = #view_row{id = id, doc = Doc, worker = Worker},
Row2 = {view_row, #{id => id, doc => undecided}},
Row21 = {view_row, #{id => id, doc => Doc, worker => Worker}},
Rows1 = [],
Rows2 = [],
Rows3 = [Row11],
Rows4 = [Row21],
State1 = #collector{db_name = db, query_args = QueryArgs, counters = Counters1, rows = Rows1},
State2 = #collector{db_name = db, query_args = QueryArgs, counters = Counters1, rows = Rows2},
State3 = #collector{db_name = db, query_args = QueryArgs, counters = Counters2, rows = Rows3},
State4 = #collector{db_name = db, query_args = QueryArgs, counters = Counters2, rows = Rows4},
meck:expect(couch_replicator, active_doc, [db, id], meck:val({ok, Doc})),
meck:expect(fabric_view, maybe_send_row, [
{[State3], meck:val(next_row1)}, {[State4], meck:val(next_row2)}
]),
?assertEqual(next_row1, handle_message(Row1, Worker, State1)),
?assertEqual(next_row2, handle_message(Row2, Worker, State2)).

t_handle_message_complete(_) ->
Worker = worker,
Counters1 = [{Worker, 6}],
Counters2 = [{Worker, 7}],
State1 = #collector{counters = Counters1},
State2 = #collector{counters = Counters2},
meck:expect(fabric_view, maybe_send_row, [State2], meck:val(maybe_row)),
?assertEqual(maybe_row, handle_message(complete, Worker, State1)).

-endif.
106 changes: 106 additions & 0 deletions src/couch_replicator/src/couch_replicator_fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,109 @@ rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->

get_doc_state({Props}) ->
couch_util:get_value(state, Props).

-ifdef(TEST).

-include_lib("couch/include/couch_eunit.hrl").

docs_test_() ->
{
foreach,
fun() -> ok end,
fun(_) -> ok end,
[
?TDEF_FE(t_docs)
]
}.

t_docs(_) ->
Options1 = [],
Options2 = [{io_priority, priority}],
QueryArgs1 = #mrargs{skip = 3, limit = 7, extra = extra},
QueryArgs2 = #mrargs{skip = 0, limit = 10, extra = extra},
Accumulator = {db_name, health_threshold, extra},
meck:expect(couch_replicator_scheduler, health_threshold, [], meck:val(health_threshold)),
meck:expect(couch_db, open_int, [db_name, '_'], meck:val({ok, db})),
meck:expect(
couch_mrview,
query_all_docs,
[db, QueryArgs2, '_', Accumulator],
meck:val(all_docs)
),
?assertEqual(all_docs, docs(db_name, Options1, QueryArgs1)),
IoPrio1 = get(io_priority),
?assertEqual({interactive, db_name}, IoPrio1),
?assertEqual(all_docs, docs(db_name, Options2, QueryArgs1)),
IoPrio2 = get(io_priority),
?assertEqual(priority, IoPrio2).

docs_cb_test_() ->
{
foreach,
fun() -> ok end,
fun(_) -> ok end,
[
?TDEF_FE(t_docs_cb_meta),
?TDEF_FE(t_docs_cb_row_skip),
?TDEF_FE(t_docs_cb_row),
?TDEF_FE(t_docs_cb_complete)
]
}.

t_docs_cb_meta(_) ->
Meta = {meta, meta},
meck:expect(rexi, stream2, [Meta], meck:val(ok)),
?assertEqual({ok, accumulator}, docs_cb(Meta, accumulator)).

t_docs_cb_row_skip(_) ->
Accumulator1 = {db_name, health_threshold, []},
Accumulator2 = {db_name, health_threshold, [{view_row_map, true}]},
Row = {row, [{id, <<"_design/ddoc">>}, {doc, doc}]},
meck:reset(rexi),
meck:expect(rexi, stream2, ['_'], undefined),
?assertEqual({ok, Accumulator1}, docs_cb(Row, Accumulator1)),
?assertNot(meck:called(rexi, stream2, '_')),
meck:reset(rexi),
meck:expect(rexi, stream2, ['_'], undefined),
?assertEqual({ok, Accumulator2}, docs_cb(Row, Accumulator2)),
?assertNot(meck:called(rexi, stream2, '_')).

t_docs_cb_row(_) ->
Accumulator1 = {db_name, health_threshold, [{filter_states, []}]},
Accumulator2 = {db_name, health_threshold, [{filter_states, []}, {view_row_map, true}]},
Doc = {[{<<"_id">>, id}, {<<"_rev">>, rev}]},
DocInfo1 = {[{state, other}]},
DocInfo2 = {[{state, null}]},
EtsInfo = {[{state, other}]},
Row = {row, [{id, id}, {doc, Doc}]},
ViewRow1 = #view_row{id = id, doc = DocInfo1},
ViewRow2 = {view_row, #{id => id, doc => EtsInfo}},
ViewRow3 = {view_row, #{id => id, doc => undecided}},
meck:expect(mem3, dbname, [db_name], meck:val(mem3_db_name)),
meck:expect(couch_replicator, info_from_doc, [mem3_db_name, Doc], meck:val(DocInfo1)),
meck:expect(rexi, stream2, [ViewRow1], meck:val(ok)),
?assertEqual({ok, Accumulator1}, docs_cb(Row, Accumulator1)),
meck:expect(couch_replicator, info_from_doc, [mem3_db_name, Doc], meck:val(DocInfo2)),
meck:expect(
couch_replicator_doc_processor,
doc_lookup,
[db_name, id, health_threshold],
meck:val({ok, EtsInfo})
),
meck:expect(rexi, stream2, [ViewRow2], meck:val(ok)),
?assertEqual({ok, Accumulator2}, docs_cb(Row, Accumulator2)),
meck:expect(couch_replicator, info_from_doc, [mem3_db_name, Doc], meck:val(DocInfo2)),
meck:expect(
couch_replicator_doc_processor,
doc_lookup,
[db_name, id, health_threshold],
meck:val({error, not_found})
),
meck:expect(rexi, stream2, [ViewRow3], meck:val(ok)),
?assertEqual({ok, Accumulator2}, docs_cb(Row, Accumulator2)).

t_docs_cb_complete(_) ->
meck:expect(rexi, stream_last, [complete], meck:val(ok)),
?assertEqual({ok, accumulator}, docs_cb(complete, accumulator)).

-endif.
92 changes: 91 additions & 1 deletion src/fabric/src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ uuid(Db) ->
binary:part(Uuid, {0, Prefix}).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include_lib("couch/include/couch_eunit.hrl").

maybe_filtered_json_doc_no_filter_test() ->
Body = {[{<<"a">>, 1}]},
Expand All @@ -710,4 +710,94 @@ maybe_filtered_json_doc_with_filter_test() ->
{JDocProps} = maybe_filtered_json_doc(Doc, [], Filter),
?assertEqual(JDocProps, [{<<"a">>, 1}]).

view_cb_test_() ->
{
foreach,
fun() -> meck:new(rexi) end,
fun(_) -> meck:unload() end,
[
?TDEF_FE(t_view_cb_meta),
?TDEF_FE(t_view_cb_row_record),
?TDEF_FE(t_view_cb_row_map),
?TDEF_FE(t_view_cb_complete),
?TDEF_FE(t_view_cb_ddoc_updated),
?TDEF_FE(t_view_cb_insufficient_storage)
]
}.

t_view_cb_meta(_) ->
meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)),
?assertEqual({ok, acc}, view_cb({meta, meta}, acc)).

t_view_cb_row_record(_) ->
Acc = #mrargs{},
Props = [{id, id}, {key, key}, {value, value}],
ViewRow = #view_row{id = id, key = key, value = value},
meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
?assertEqual({ok, Acc}, view_cb({row, Props}, Acc)).

t_view_cb_row_map(_) ->
Acc = #mrargs{extra = [{view_row_map, true}]},
Props = [{id, id}, {key, key}, {value, value}],
ViewRow = {view_row, #{id => id, key => key, value => value}},
meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
?assertEqual({ok, Acc}, view_cb({row, Props}, Acc)).

t_view_cb_complete(_) ->
meck:expect(rexi, stream_last, [complete], meck:val(ok)),
?assertEqual({ok, acc}, view_cb(complete, acc)).

t_view_cb_ddoc_updated(_) ->
meck:expect(rexi, reply, [{ok, ddoc_updated}], meck:val(ok)),
?assertEqual(ok, view_cb(ok, ddoc_updated)).

t_view_cb_insufficient_storage(_) ->
meck:expect(rexi, reply, [{ok, insufficient_storage}], meck:val(ok)),
?assertEqual(ok, view_cb(ok, insufficient_storage)).

reduce_cb_test_() ->
{
foreach,
fun() -> meck:new(rexi) end,
fun(_) -> meck:unload() end,
[
?TDEF_FE(t_reduce_cb_meta),
?TDEF_FE(t_reduce_cb_row_record),
?TDEF_FE(t_reduce_cb_row_map),
?TDEF_FE(t_reduce_cb_complete),
?TDEF_FE(t_reduce_cb_ddoc_updated),
?TDEF_FE(t_reduce_cb_insufficient_storage)
]
}.

t_reduce_cb_meta(_) ->
meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)),
?assertEqual({ok, acc}, reduce_cb({meta, meta}, acc, options)).

t_reduce_cb_row_record(_) ->
Options = [],
Props = [{id, id}, {key, key}, {value, value}],
ViewRow = #view_row{id = id, key = key, value = value},
meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
?assertEqual({ok, acc}, reduce_cb({row, Props}, acc, Options)).

t_reduce_cb_row_map(_) ->
Options = [{view_row_map, true}],
Props = [{id, id}, {key, key}, {value, value}],
ViewRow = {view_row, #{id => id, key => key, value => value}},
meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
?assertEqual({ok, acc}, reduce_cb({row, Props}, acc, Options)).

t_reduce_cb_complete(_) ->
meck:expect(rexi, stream_last, [complete], meck:val(ok)),
?assertEqual({ok, acc}, reduce_cb(complete, acc, options)).

t_reduce_cb_ddoc_updated(_) ->
meck:expect(rexi, reply, [{ok, ddoc_updated}], meck:val(ok)),
?assertEqual(ok, reduce_cb(ok, ddoc_updated, options)).

t_reduce_cb_insufficient_storage(_) ->
meck:expect(rexi, reply, [{ok, insufficient_storage}], meck:val(ok)),
?assertEqual(ok, reduce_cb(ok, insufficient_storage, options)).

-endif.
Loading

0 comments on commit 15f38bf

Please sign in to comment.