diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 87f464b365d..4e61b9e83e4 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -192,6 +192,11 @@ possibly_embed_doc(
     #view_row{value = undefined} = Row
 ) ->
     Row;
+possibly_embed_doc(
+    _State,
+    #{value := undefined} = Row
+) ->
+    Row;
 possibly_embed_doc(
     #collector{db_name = DbName, query_args = Args},
     #view_row{key = _Key, id = _Id, value = Value, doc = _Doc} = Row
@@ -241,11 +246,63 @@ possibly_embed_doc(
             end;
         _ ->
             Row
+    end;
+possibly_embed_doc(
+    #collector{db_name = DbName, query_args = Args},
+    #{key := _Key, id := _Id, value := Value, doc := _Doc} = Row
+) ->
+    #mrargs{include_docs = IncludeDocs} = Args,
+    case IncludeDocs andalso is_tuple(Value) of
+        true ->
+            {Props} = Value,
+            Rev0 = couch_util:get_value(<<"_rev">>, Props),
+            case couch_util:get_value(<<"_id">>, Props) of
+                null ->
+                    Row#{doc => null};
+                undefined ->
+                    Row;
+                IncId ->
+                    % use separate process to call fabric:open_doc
+                    % to not interfere with current call
+                    {Pid, Ref} = spawn_monitor(fun() ->
+                        exit(
+                            case Rev0 of
+                                undefined ->
+                                    case fabric:open_doc(DbName, IncId, []) of
+                                        {ok, NewDoc} ->
+                                            Row#{doc => couch_doc:to_json_obj(NewDoc, [])};
+                                        {not_found, _} ->
+                                            Row#{doc => null};
+                                        Else ->
+                                            Row#{doc => {error, Else}}
+                                    end;
+                                Rev0 ->
+                                    Rev = couch_doc:parse_rev(Rev0),
+                                    case fabric:open_revs(DbName, IncId, [Rev], []) of
+                                        {ok, [{ok, NewDoc}]} ->
+                                            Row#{doc => couch_doc:to_json_obj(NewDoc, [])};
+                                        {ok, [{{not_found, _}, Rev}]} ->
+                                            Row#{doc => null};
+                                        Else ->
+                                            Row#{doc => {error, Else}}
+                                    end
+                            end
+                        )
+                    end),
+                    receive
+                        {'DOWN', Ref, process, Pid, Resp} ->
+                            Resp
+                    end
+            end;
+        _ ->
+            Row
     end.
 
 detach_partition(#view_row{key = {p, _Partition, Key}} = Row) ->
     Row#view_row{key = Key};
-detach_partition(#view_row{} = Row) ->
+detach_partition(#{key := {p, _Partition, Key}} = Row) ->
+    Row#{key => Key};
+detach_partition(Row) ->
     Row.
 
 keydict(undefined) ->
@@ -297,7 +354,11 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
     end;
 get_next_row(State) ->
     #collector{rows = [Row | Rest], counters = Counters0} = State,
-    {Worker, From} = Row#view_row.worker,
+    {Worker, From} =
+        case Row of
+            #view_row{worker = W} -> W;
+            #{worker := W} -> W
+        end,
     rexi:stream_ack(From),
     Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
     {Row, State#collector{rows = Rest, counters = Counters1}}.
@@ -349,7 +410,11 @@ transform_row(#view_row{key = Key, id = Id, value = Value, doc = undefined}) ->
 transform_row(#view_row{key = Key, id = _Id, value = _Value, doc = {error, Reason}}) ->
     {row, [{id, error}, {key, Key}, {value, Reason}]};
 transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}) ->
-    {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}.
+    {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]};
+transform_row(#{} = Row) ->
+    #{key := Key, id := Id, value := Value, doc := Doc, stats := Stats} = Row,
+    {row, Props} = transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}),
+    {row, Props, Stats}.
 
 compare(fwd, <<"raw">>, A, B) -> A < B;
 compare(rev, <<"raw">>, A, B) -> B < A;
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index a6786bff788..304143ee12c 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -187,6 +187,49 @@ shards(Db, Args) ->
         end,
     fabric_view:get_shards(Db, NewArgs).
 
+handle_row(Row0, {Worker, _} = Source, State) ->
+    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+    #mrargs{extra = Options, direction = Dir} = Args,
+    Row1 =
+        case Row0 of
+            #view_row{} -> Row0#view_row{worker = Source};
+            #{} -> Row0#{worker => Source}
+        end,
+    % It has to be ensured that rows of the same format are merged in case of
+    % mixed-version cluster scenarios.
+    Row =
+        case couch_util:get_value(execution_stats_rolling, Options, false) of
+            true ->
+                case Row1 of
+                    #view_row{} ->
+                        #{
+                            key => Row1#view_row.key,
+                            id => Row1#view_row.id,
+                            value => Row1#view_row.value,
+                            doc => Row1#view_row.doc,
+                            worker => Row1#view_row.worker,
+                            stats => #{}
+                        };
+                    #{} ->
+                        Row1
+                end;
+            false ->
+                case Row1 of
+                    #view_row{} ->
+                        Row1;
+                    #{} ->
+                        #{id := Id, key := Key} = Row1,
+                        Value = maps:get(value, Row1, null),
+                        Doc = maps:get(doc, Row1, null),
+                        Worker = maps:get(worker, Row1, null),
+                        #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker}
+                end
+        end,
+    Rows = merge_row(Dir, Row, Rows0),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    State1 = State#collector{rows = Rows, counters = Counters1},
+    fabric_view:maybe_send_row(State1).
+
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
     fabric_view:check_down_shards(State, NodeRef);
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
@@ -257,13 +300,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
                 update_seq = UpdateSeq0
             }}
     end;
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
-    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
-    Dir = Args#mrargs.direction,
-    Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0),
-    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-    State1 = State#collector{rows = Rows, counters = Counters1},
-    fabric_view:maybe_send_row(State1);
+handle_message(#view_row{} = Row, {_, _} = Source, State) ->
+    handle_row(Row, Source, State);
+handle_message({view_row, #{} = Row}, {_, _} = Source, State) ->
+    handle_row(Row, Source, State);
 handle_message(complete, Worker, State) ->
     Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
     fabric_view:maybe_send_row(State#collector{counters = Counters});
@@ -273,10 +313,24 @@ handle_message({execution_stats, _} = Msg, {_, From}, St) ->
     rexi:stream_ack(From),
     {Go, St#collector{user_acc = Acc}}.
 
-merge_row(fwd, Row, Rows) ->
+insert_row(_Dir, _Key, R, []) ->
+    [R];
+insert_row(fwd, Key, R, [H | T] = List) ->
+    V1 = maps:get(Key, R),
+    V2 = maps:get(Key, H),
+    if
+        V1 =< V2 -> [R | List];
+        true -> [H | insert_row(fwd, Key, R, T)]
+    end;
+insert_row(rev, Key, R, List) ->
+    lists:reverse(insert_row(fwd, Key, R, lists:reverse(List))).
+
+merge_row(fwd, #view_row{} = Row, Rows) ->
     lists:keymerge(#view_row.id, [Row], Rows);
-merge_row(rev, Row, Rows) ->
-    lists:rkeymerge(#view_row.id, [Row], Rows).
+merge_row(rev, #view_row{} = Row, Rows) ->
+    lists:rkeymerge(#view_row.id, [Row], Rows);
+merge_row(Dir, #{} = Row, Rows) ->
+    insert_row(Dir, id, Row, Rows).
 
 all_docs_concurrency() ->
     Value = config:get("fabric", "all_docs_concurrency", "10"),
@@ -385,3 +439,83 @@ filter_keys_by_namespace(Keys, Namespace) when Namespace =:= <<"_local">> ->
     );
 filter_keys_by_namespace(Keys, _Namespace) ->
     Keys.
+
+-ifdef(TEST).
+-include_lib("couch/include/couch_eunit.hrl").
+
+merge_row_record_fwd_test() ->
+    RowX1 = #view_row{id = 4},
+    Row1 = #view_row{id = 1},
+    Row2 = #view_row{id = 3},
+    Row3 = #view_row{id = 5},
+    Row4 = #view_row{id = 7},
+    Rows = [Row1, Row2, Row3, Row4],
+    Expected1 = [Row1, Row2, RowX1, Row3, Row4],
+    ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)),
+    RowX2 = #view_row{id = 0},
+    Expected2 = [RowX2, Row1, Row2, Row3, Row4],
+    ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)),
+    RowX3 = #view_row{id = 8},
+    Expected3 = [Row1, Row2, Row3, Row4, RowX3],
+    ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)),
+    RowX4 = #view_row{id = 5},
+    Expected4 = [Row1, Row2, RowX4, Row3, Row4],
+    ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)).
+
+merge_row_record_rev_test() ->
+    RowX1 = #view_row{id = 5},
+    Row1 = #view_row{id = 2},
+    Row2 = #view_row{id = 4},
+    Row3 = #view_row{id = 6},
+    Row4 = #view_row{id = 8},
+    Rows = [Row4, Row3, Row2, Row1],
+    Expected1 = [Row4, Row3, RowX1, Row2, Row1],
+    ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)),
+    RowX2 = #view_row{id = 1},
+    Expected2 = [Row4, Row3, Row2, Row1, RowX2],
+    ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)),
+    RowX3 = #view_row{id = 9},
+    Expected3 = [RowX3, Row4, Row3, Row2, Row1],
+    ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)),
+    RowX4 = #view_row{id = 6},
+    Expected4 = [Row4, Row3, RowX4, Row2, Row1],
+    ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)).
+
+merge_row_map_fwd_test() ->
+    RowX1 = #{id => 4},
+    Row1 = #{id => 1},
+    Row2 = #{id => 3},
+    Row3 = #{id => 5},
+    Row4 = #{id => 7},
+    Rows = [Row1, Row2, Row3, Row4],
+    Expected1 = [Row1, Row2, RowX1, Row3, Row4],
+    ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)),
+    RowX2 = #{id => 0},
+    Expected2 = [RowX2, Row1, Row2, Row3, Row4],
+    ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)),
+    RowX3 = #{id => 8},
+    Expected3 = [Row1, Row2, Row3, Row4, RowX3],
+    ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)),
+    RowX4 = #{id => 5},
+    Expected4 = [Row1, Row2, RowX4, Row3, Row4],
+    ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)).
+
+merge_row_map_rev_test() ->
+    RowX1 = #{id => 5},
+    Row1 = #{id => 2},
+    Row2 = #{id => 4},
+    Row3 = #{id => 6},
+    Row4 = #{id => 8},
+    Rows = [Row4, Row3, Row2, Row1],
+    Expected1 = [Row4, Row3, RowX1, Row2, Row1],
+    ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)),
+    RowX2 = #{id => 1},
+    Expected2 = [Row4, Row3, Row2, Row1, RowX2],
+    ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)),
+    RowX3 = #{id => 9},
+    Expected3 = [RowX3, Row4, Row3, Row2, Row1],
+    ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)),
+    RowX4 = #{id => 6},
+    Expected4 = [Row4, Row3, RowX4, Row2, Row1],
+    ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)).
+-endif.
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 43922d5d59e..73a31fcfbe9 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -121,6 +121,41 @@ go(DbName, Workers, {map, View, _}, Args, Callback, Acc0) ->
         {ok, Resp}
     end.
 
+handle_stop(State) ->
+    #collector{callback = Callback} = State,
+    {_, Acc} = Callback(complete, State#collector.user_acc),
+    {stop, State#collector{user_acc = Acc}}.
+
+handle_non_sorted(Row, {_, From}, State) ->
+    #collector{callback = Callback, user_acc = AccIn, limit = Limit} = State,
+    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
+    rexi:stream_ack(From),
+    {Go, State#collector{user_acc = Acc, limit = Limit - 1}}.
+
+handle_sorted(Row0, {Worker, _} = Source, State) ->
+    #collector{
+        query_args = #mrargs{direction = Dir},
+        counters = Counters0,
+        rows = Rows0,
+        keys = KeyDict0,
+        collation = Collation
+    } = State,
+    Row =
+        case Row0 of
+            #view_row{} -> Row0#view_row{worker = Source};
+            #{} -> Row0#{worker => Source}
+        end,
+    {Rows, KeyDict} = merge_row(
+        Dir,
+        Collation,
+        KeyDict0,
+        Row,
+        Rows0
+    ),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict},
+    fabric_view:maybe_send_row(State1).
+
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
     fabric_view:check_down_shards(State, NodeRef);
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
@@ -176,32 +211,17 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
             }}
     end;
 handle_message(#view_row{}, {_, _}, #collector{sorted = false, limit = 0} = State) ->
-    #collector{callback = Callback} = State,
-    {_, Acc} = Callback(complete, State#collector.user_acc),
-    {stop, State#collector{user_acc = Acc}};
-handle_message(#view_row{} = Row, {_, From}, #collector{sorted = false} = St) ->
-    #collector{callback = Callback, user_acc = AccIn, limit = Limit} = St,
-    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
-    rexi:stream_ack(From),
-    {Go, St#collector{user_acc = Acc, limit = Limit - 1}};
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
-    #collector{
-        query_args = #mrargs{direction = Dir},
-        counters = Counters0,
-        rows = Rows0,
-        keys = KeyDict0,
-        collation = Collation
-    } = State,
-    {Rows, KeyDict} = merge_row(
-        Dir,
-        Collation,
-        KeyDict0,
-        Row#view_row{worker = {Worker, From}},
-        Rows0
-    ),
-    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-    State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict},
-    fabric_view:maybe_send_row(State1);
+    handle_stop(State);
+handle_message(#view_row{} = Row, {_, From}, #collector{sorted = false} = State) ->
+    handle_non_sorted(Row, From, State);
+handle_message(#view_row{} = Row, Source, State) ->
+    handle_sorted(Row, Source, State);
+handle_message({view_row, #{}}, {_, _}, #collector{sorted = false, limit = 0} = State) ->
+    handle_stop(State);
+handle_message({view_row, #{} = Row}, {_, _} = Source, #collector{sorted = false} = State) ->
+    handle_non_sorted(Row, Source, State);
+handle_message({view_row, #{} = Row}, {_, _} = Source, State) ->
+    handle_sorted(Row, Source, State);
 handle_message(complete, Worker, State) ->
     Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
     fabric_view:maybe_send_row(State#collector{counters = Counters});
@@ -215,10 +235,15 @@ handle_message(ddoc_updated, _Worker, State) ->
 handle_message(insufficient_storage, _Worker, State) ->
     {stop, State}.
 
+key_id_from_row(#view_row{key = Key, id = Id}) ->
+    {Key, Id};
+key_id_from_row(#{key := Key, id := Id}) ->
+    {Key, Id}.
+
 merge_row(Dir, Collation, undefined, Row, Rows0) ->
     Rows1 = lists:merge(
-        fun(#view_row{key = KeyA, id = IdA}, #view_row{key = KeyB, id = IdB}) ->
-            compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB})
+        fun(RowA, RowB) ->
+            compare(Dir, Collation, key_id_from_row(RowA), key_id_from_row(RowB))
         end,
         [Row],
         Rows0
@@ -240,12 +265,15 @@ merge_row(Dir, Collation, KeyDict0, Row, Rows0) ->
             _ ->
                 fun couch_ejson_compare:less/2
         end,
-    case maybe_update_keydict(Row#view_row.key, KeyDict0, CmpFun) of
+    {Key, _Id} = key_id_from_row(Row),
+    case maybe_update_keydict(Key, KeyDict0, CmpFun) of
         undefined ->
             {Rows0, KeyDict0};
         KeyDict1 ->
             Rows1 = lists:merge(
-                fun(#view_row{key = A, id = IdA}, #view_row{key = B, id = IdB}) ->
+                fun(RowA, RowB) ->
+                    {A, IdA} = key_id_from_row(RowA),
+                    {B, IdB} = key_id_from_row(RowB),
                     case {Dir, CmpFun(A, B)} of
                         {fwd, 0} ->
                             IdA < IdB;
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index 470cfba483e..7e682782c4f 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -383,7 +383,7 @@ view_cb({meta, Meta}, Acc) ->
 view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
     mango_execution_stats:shard_incr_keys_examined(),
     couch_stats:increment_counter([mango, keys_examined]),
-    ViewRow = #view_row{
+    ViewRow0 = #view_row{
         id = couch_util:get_value(id, Row),
         key = couch_util:get_value(key, Row),
         doc = couch_util:get_value(doc, Row)
@@ -413,28 +413,52 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
         % However, this oddness is confined to being visible in this module.
         case match_and_extract_doc(Doc, Selector, Fields) of
             {match, FinalDoc} ->
-                FinalViewRow = ViewRow#view_row{doc = FinalDoc},
-                ok = rexi:stream2(FinalViewRow),
-                case couch_util:get_value(execution_stats_rolling, Options, false) of
-                    true ->
-                        submit_stats(Options),
-                        mango_execution_stats:shard_init();
-                    false ->
-                        ok
-                end,
+                ViewRow =
+                    case couch_util:get_value(execution_stats_rolling, Options, false) of
+                        true ->
+                            ShardStats = mango_execution_stats:shard_get_stats(),
+                            mango_execution_stats:shard_init(),
+                            Stats = format_stats(ShardStats, Options),
+                            {view_row, #{
+                                id => ViewRow0#view_row.id,
+                                key => ViewRow0#view_row.key,
+                                value => ViewRow0#view_row.value,
+                                doc => FinalDoc,
+                                stats => Stats
+                            }};
+                        false ->
+                            ViewRow0#view_row{doc = FinalDoc}
+                    end,
+                ok = rexi:stream2(ViewRow),
                 set_mango_msg_timestamp();
             {no_match, undefined} ->
                 maybe_send_mango_ping()
         end
     end,
-    case {ViewRow#view_row.doc, CoveringIndex} of
+    case {ViewRow0#view_row.doc, CoveringIndex} of
         {null, _} ->
             maybe_send_mango_ping();
         {undefined, Index = #idx{}} ->
-            Doc = derive_doc_from_index(Index, ViewRow),
+            Doc = derive_doc_from_index(Index, ViewRow0),
             Process(Doc);
         {undefined, _} ->
             % include_docs=false. Use quorum fetch at coordinator
+            ViewRow =
+                case couch_util:get_value(execution_stats_rolling, Options, false) of
+                    true ->
+                        ShardStats = mango_execution_stats:shard_get_stats(),
+                        mango_execution_stats:shard_init(),
+                        Stats = format_stats(ShardStats, Options),
+                        {view_row, #{
+                            id => ViewRow0#view_row.id,
+                            key => ViewRow0#view_row.key,
+                            value => ViewRow0#view_row.value,
+                            doc => ViewRow0#view_row.doc,
+                            stats => Stats
+                        }};
+                    false ->
+                        ViewRow0
+                end,
             ok = rexi:stream2(ViewRow),
             set_mango_msg_timestamp();
         {Doc, _} ->
@@ -536,6 +560,10 @@ handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
     Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
     Cursor = Cursor0#cursor{execution_stats = Stats},
     {ok, Cursor};
+handle_message({row, Props, Stats}, Cursor0) ->
+    {Status, Cursor1} = handle_message({row, Props}, Cursor0),
+    {_, Cursor} = handle_message({execution_stats, Stats}, Cursor1),
+    {Status, Cursor};
 handle_message(complete, Cursor) ->
     {ok, Cursor};
 handle_message({error, Reason}, _Cursor) ->
@@ -550,6 +578,10 @@ handle_all_docs_message({row, Props}, Cursor) ->
         true -> {ok, Cursor};
         false -> handle_message({row, Props}, Cursor)
     end;
+handle_all_docs_message({row, Props, Stats}, Cursor0) ->
+    {Status, Cursor1} = handle_all_docs_message({row, Props}, Cursor0),
+    {_, Cursor} = handle_all_docs_message({execution_stats, Stats}, Cursor1),
+    {Status, Cursor};
 handle_all_docs_message(Message, Cursor) ->
     handle_message(Message, Cursor).
 
@@ -1383,8 +1415,8 @@ t_view_cb_meta_rolling(_) ->
 
 t_view_cb_row_matching_regular_doc(_) ->
     Row = [{id, id}, {key, key}, {doc, doc}],
-    Result = #view_row{id = id, key = key, doc = doc},
-    meck:expect(rexi, stream2, [Result], meck:val(ok)),
+    ViewRow = #view_row{id = id, key = key, doc = doc},
+    meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
 
     Accumulator = #mrargs{
         extra = [
@@ -1401,9 +1433,8 @@ t_view_cb_row_matching_regular_doc(_) ->
 
 t_view_cb_row_matching_regular_doc_rolling(_) ->
     Row = [{id, id}, {key, key}, {doc, doc}],
-    Result = #view_row{id = id, key = key, doc = doc},
-    ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 1}},
-    meck:expect(rexi, stream2, [{[Result], ok}, {[ExecutionStats], ok}]),
+    Stats = #{keys_examined => 1, docs_examined => 1},
+    ViewRow = {view_row, #{id => id, key => key, doc => doc, stats => Stats}},
 
     Accumulator = #mrargs{
         extra = [
@@ -1416,9 +1447,10 @@ t_view_cb_row_matching_regular_doc_rolling(_) ->
             {execution_stats_rolling, true}
         ]
     },
+    meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
    mango_execution_stats:shard_init(),
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
-    ?assert(meck:num_calls(rexi, stream2, '_') == 2).
+    ?assert(meck:num_calls(rexi, stream2, '_') == 1).
 
 t_view_cb_row_non_matching_regular_doc(_) ->
     Doc = {[]},
     Row = [{id, id}, {key, key}, {doc, Doc}],
@@ -1442,8 +1474,7 @@ t_view_cb_row_non_matching_regular_doc(_) ->
 t_view_cb_row_non_matching_regular_doc_rolling(_) ->
     Doc = {[]},
     Row = [{id, id}, {key, key}, {doc, Doc}],
-    ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 1}},
-    meck:expect(rexi, stream2, [{[ExecutionStats], ok}]),
+    Stats = #{keys_examined => 100, docs_examined => 90},
 
     Accumulator = #mrargs{
         extra = [
@@ -1456,6 +1487,7 @@ t_view_cb_row_non_matching_regular_doc_rolling(_) ->
             {execution_stats_rolling, true}
         ]
     },
+    meck:expect(rexi, stream2, ['_'], undefined),
     mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
@@ -1481,8 +1513,6 @@ t_view_cb_row_null_doc(_) ->
 
 t_view_cb_row_null_doc_rolling(_) ->
     Row = [{id, id}, {key, key}, {doc, null}],
-    ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}},
-    meck:expect(rexi, stream2, [{[ExecutionStats], ok}]),
 
     Accumulator = #mrargs{
         extra = [
@@ -1495,6 +1525,7 @@ t_view_cb_row_null_doc_rolling(_) ->
             {execution_stats_rolling, true}
         ]
     },
+    meck:expect(rexi, stream2, ['_'], undefined),
     mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
@@ -1520,9 +1551,10 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
 
 t_view_cb_row_missing_doc_triggers_quorum_fetch_rolling(_) ->
     Row = [{id, id}, {key, key}, {doc, undefined}],
-    ViewRow = #view_row{id = id, key = key, doc = undefined},
-    ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}},
-    meck:expect(rexi, stream2, [{[ViewRow], ok}, {[ExecutionStats], ok}]),
+    Stats = #{keys_examined => 1, docs_examined => 0},
+    ViewRow =
+        {view_row, #{id => id, key => key, doc => undefined, value => undefined, stats => Stats}},
+    meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
 
     Accumulator = #mrargs{
         extra = [
@@ -1569,15 +1601,14 @@ t_view_cb_row_matching_covered_doc_rolling(_) ->
     Keys = [key1, key2],
     Row = [{id, id}, {key, Keys}, {doc, undefined}],
     Doc = {[{<<"field1">>, key1}, {<<"field2">>, key2}]},
-    Result = #view_row{id = id, key = Keys, doc = Doc},
+    Stats = #{keys_examined => 1, docs_examined => 0},
+    ViewRow = {view_row, #{id => id, key => Keys, doc => Doc, stats => Stats}},
     Fields = [<<"field1">>, <<"field2">>],
     Index = #idx{
         type = <<"json">>,
         def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]}
     },
-    ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}},
-    meck:expect(rexi, stream2, [{[Result], ok}, {[ExecutionStats], ok}]),
 
     Accumulator = #mrargs{
         extra = [
@@ -1590,9 +1621,10 @@ t_view_cb_row_matching_covered_doc_rolling(_) ->
             {execution_stats_rolling, true}
         ]
     },
+    meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
     mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
-    ?assert(meck:num_calls(rexi, stream2, '_') == 2).
+    ?assert(meck:num_calls(rexi, stream2, '_') == 1).
 
 t_view_cb_row_non_matching_covered_doc(_) ->
     Row = [{id, id}, {key, [key1, key2]}, {doc, undefined}],
@@ -1626,8 +1658,6 @@ t_view_cb_row_non_matching_covered_doc(_) ->
         type = <<"json">>,
         def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]}
     },
-    ExecutionStats = {execution_stats, #{keys_examined => 1, docs_examined => 0}},
-    meck:expect(rexi, stream2, [{[ExecutionStats], ok}]),
 
     Accumulator = #mrargs{
         extra = [
@@ -1640,6 +1670,7 @@ t_view_cb_row_non_matching_covered_doc_rolling(_) ->
             {execution_stats_rolling, true}
         ]
     },
+    meck:expect(rexi, stream2, ['_'], undefined),
     mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
diff --git a/src/mango/test/15-execution-stats-test.py b/src/mango/test/15-execution-stats-test.py
index 2aa3b9bb8e4..f812c2c5cc6 100644
--- a/src/mango/test/15-execution-stats-test.py
+++ b/src/mango/test/15-execution-stats-test.py
@@ -80,8 +80,8 @@ def test_reporting_consistency(self):
                 "selector": {"age": {"$lte": 42}},
                 "fields": ["name", "email", "age"],
                 "limit": 3,
-                "total_keys_examined": 3,
-                "total_docs_examined": 3,
+                "total_keys_examined": 4,
+                "total_docs_examined": 4,
                 "results_returned": 3,
             },
             {
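
Note (illustration, not part of the patch): the change above makes the coordinators accept two row encodings side by side, the classic #view_row{} record and the new {view_row, #{...}} map that carries per-shard execution stats, so rolling upgrades keep working while pre-upgrade shards still stream records. The sketch below is a minimal, standalone restatement of the map-row insertion logic from fabric_view_all_docs.erl, showing the id ordering it preserves; the module name, demo/0 function, and row values are invented for the example.

%% Illustrative sketch only; mirrors insert_row/4 from the patch above.
%% Hypothetical module and values, not part of the CouchDB source tree.
-module(view_row_merge_demo).
-export([demo/0]).

insert_row(_Dir, _Key, R, []) ->
    [R];
insert_row(fwd, Key, R, [H | T] = List) ->
    V1 = maps:get(Key, R),
    V2 = maps:get(Key, H),
    if
        V1 =< V2 -> [R | List];
        true -> [H | insert_row(fwd, Key, R, T)]
    end;
insert_row(rev, Key, R, List) ->
    lists:reverse(insert_row(fwd, Key, R, lists:reverse(List))).

demo() ->
    %% Rows already merged at the coordinator, sorted by id (fwd direction).
    Rows = [#{id => <<"a">>}, #{id => <<"c">>}],
    %% A new-style map row (with a stats payload) lands between "a" and "c",
    %% which is what merge_row(Dir, #{} = Row, Rows) does in the patch.
    New = #{id => <<"b">>, stats => #{docs_examined => 1, keys_examined => 1}},
    [#{id := <<"a">>}, #{id := <<"b">>}, #{id := <<"c">>}] =
        insert_row(fwd, id, New, Rows),
    ok.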