Skip to content

Commit

Permalink
[fixup] Send execution stats together with actual results only
Browse files Browse the repository at this point in the history
  • Loading branch information
pgj committed Feb 1, 2024
1 parent fa05850 commit 1c1a48c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 60 deletions.
81 changes: 34 additions & 47 deletions src/mango/src/mango_cursor_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -348,27 +348,6 @@ choose_best_index(IndexRanges) ->
{SelectedIndex, SelectedIndexRanges, _} = hd(SortedIndexRanges),
{{SelectedIndex, SelectedIndexRanges}, SortedIndexRanges}.

-spec maybe_init_stats(Options) -> ok when
Options :: mrargs_extra().
maybe_init_stats(Options) when is_list(Options) ->
case couch_util:get_value(execution_stats_rolling, Options, false) of
true -> ok;
false -> mango_execution_stats:shard_init()
end.

-spec roll_stats(Stats, Options) -> ok when
Stats :: shard_stats_v2(),
Options :: mrargs_extra().
roll_stats(Stats, Options) when is_list(Options) ->
case couch_util:get_value(execution_stats_rolling, Options, false) of
true ->
ok = rexi:stream2({execution_stats, format_stats(Stats, Options)});
false ->
#{keys_examined := KeysExamined, docs_examined := DocsExamined} = Stats,
mango_execution_stats:shard_incr_keys_examined(KeysExamined),
mango_execution_stats:shard_incr_docs_examined(DocsExamined)
end.

-spec format_stats(RawStats, Options) -> FormattedStats when
RawStats :: shard_stats_v2(),
Options :: mrargs_extra(),
Expand All @@ -382,31 +361,27 @@ format_stats(Stats, Options) when is_list(Options) ->
{docs_examined, DocsExamined}
end.

-spec maybe_submit_stats(Options) -> ok when
-spec submit_stats(Options) -> ok when
Options :: mrargs_extra().
maybe_submit_stats(Options) when is_list(Options) ->
case couch_util:get_value(execution_stats_rolling, Options, false) of
true ->
ok;
false ->
ShardStats = mango_execution_stats:shard_get_stats(),
Stats = format_stats(ShardStats, Options),
% Send execution stats in batch (shard-level)
ok = rexi:stream2({execution_stats, Stats})
end.
submit_stats(Options) when is_list(Options) ->
ShardStats = mango_execution_stats:shard_get_stats(),
Stats = format_stats(ShardStats, Options),
% Send execution stats in batch (shard-level)
ok = rexi:stream2({execution_stats, Stats}).

-spec view_cb
(Message, #mrargs{}) -> Response when
Message :: {meta, any()} | {row, row_properties()} | complete,
Response :: {ok, #mrargs{}};
(ok, ddoc_updated) -> any().
view_cb({meta, Meta}, #mrargs{extra = Options} = Acc) ->
view_cb({meta, Meta}, Acc) ->
% Map function starting
maybe_init_stats(Options),
mango_execution_stats:shard_init(),
set_mango_msg_timestamp(),
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
mango_execution_stats:shard_incr_keys_examined(),
couch_stats:increment_counter([mango, keys_examined]),
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
Expand Down Expand Up @@ -440,32 +415,36 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
{match, FinalDoc} ->
FinalViewRow = ViewRow#view_row{doc = FinalDoc},
ok = rexi:stream2(FinalViewRow),
case couch_util:get_value(execution_stats_rolling, Options, false) of
true ->
submit_stats(Options),
mango_execution_stats:shard_init();
false ->
ok
end,
set_mango_msg_timestamp();
{no_match, undefined} ->
maybe_send_mango_ping()
end
end,
case {ViewRow#view_row.doc, CoveringIndex} of
{null, _} ->
roll_stats(#{keys_examined => 1, docs_examined => 0}, Options),
maybe_send_mango_ping();
{undefined, Index = #idx{}} ->
Doc = derive_doc_from_index(Index, ViewRow),
Process(Doc),
roll_stats(#{keys_examined => 1, docs_examined => 0}, Options);
Process(Doc);
{undefined, _} ->
% include_docs=false. Use quorum fetch at coordinator
ok = rexi:stream2(ViewRow),
roll_stats(#{keys_examined => 1, docs_examined => 0}, Options),
set_mango_msg_timestamp();
{Doc, _} ->
mango_execution_stats:shard_incr_docs_examined(),
couch_stats:increment_counter([mango, docs_examined]),
Process(Doc),
roll_stats(#{keys_examined => 1, docs_examined => 1}, Options)
Process(Doc)
end,
{ok, Acc};
view_cb(complete, #mrargs{extra = Options} = Acc) ->
maybe_submit_stats(Options),
submit_stats(Options),
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
Expand Down Expand Up @@ -1437,6 +1416,7 @@ t_view_cb_row_matching_regular_doc_rolling(_) ->
{execution_stats_rolling, true}
]
},
mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:num_calls(rexi, stream2, '_') == 2).

Expand Down Expand Up @@ -1476,9 +1456,10 @@ t_view_cb_row_non_matching_regular_doc_rolling(_) ->
{execution_stats_rolling, true}
]
},
mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:num_calls(rexi, stream2, '_') == 1).
?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_row_null_doc(_) ->
Row = [{id, id}, {key, key}, {doc, null}],
Expand Down Expand Up @@ -1514,9 +1495,10 @@ t_view_cb_row_null_doc_rolling(_) ->
{execution_stats_rolling, true}
]
},
mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:num_calls(rexi, stream2, '_') == 1).
?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
Row = [{id, id}, {key, key}, {doc, undefined}],
Expand Down Expand Up @@ -1553,8 +1535,9 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch_rolling(_) ->
{execution_stats_rolling, true}
]
},
mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:num_calls(rexi, stream2, '_') == 2).
?assert(meck:num_calls(rexi, stream2, '_') == 1).

t_view_cb_row_matching_covered_doc(_) ->
Keys = [key1, key2],
Expand Down Expand Up @@ -1607,6 +1590,7 @@ t_view_cb_row_matching_covered_doc_rolling(_) ->
{execution_stats_rolling, true}
]
},
mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:num_calls(rexi, stream2, '_') == 2).

Expand Down Expand Up @@ -1656,9 +1640,10 @@ t_view_cb_row_non_matching_covered_doc_rolling(_) ->
{execution_stats_rolling, true}
]
},
mango_execution_stats:shard_init(),
put(mango_last_msg_timestamp, os:timestamp()),
?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
?assert(meck:num_calls(rexi, stream2, '_') == 1).
?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_row_backwards_compatible(_) ->
Row = [{id, id}, {key, key}, {doc, null}],
Expand Down Expand Up @@ -1689,11 +1674,13 @@ t_view_cb_complete_shard_stats_v2(_) ->
?assert(meck:called(rexi, stream_last, '_')).

t_view_cb_complete_shard_stats_v2_rolling(_) ->
meck:expect(rexi, stream2, ['_'], undefined),
ShardStats = #{docs_examined => '_', keys_examined => '_'},
meck:expect(rexi, stream2, [{execution_stats, ShardStats}], meck:val(ok)),
meck:expect(rexi, stream_last, [complete], meck:val(ok)),
Accumulator = #mrargs{extra = [{execution_stats_map, true}, {execution_stats_rolling, true}]},
mango_execution_stats:shard_init(),
?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
?assertNot(meck:called(rexi, stream2, '_')),
?assert(meck:called(rexi, stream2, '_')),
?assert(meck:called(rexi, stream_last, '_')).

t_view_cb_ok(_) ->
Expand Down
24 changes: 11 additions & 13 deletions src/mango/src/mango_execution_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
log_stats/1,
maybe_add_stats/4,
shard_init/0,
shard_incr_keys_examined/1,
shard_incr_docs_examined/1,
shard_incr_keys_examined/0,
shard_incr_docs_examined/0,
shard_get_stats/0
]).

Expand Down Expand Up @@ -123,21 +123,19 @@ shard_init() ->
InitialState = #{docs_examined => 0, keys_examined => 0},
put(?SHARD_STATS_KEY, InitialState).

-spec shard_incr_keys_examined(integer()) -> any().
shard_incr_keys_examined(N) ->
incr(keys_examined, N).
-spec shard_incr_keys_examined() -> any().
shard_incr_keys_examined() ->
incr(keys_examined).

-spec shard_incr_docs_examined(integer()) -> any().
shard_incr_docs_examined(N) ->
incr(docs_examined, N).
-spec shard_incr_docs_examined() -> any().
shard_incr_docs_examined() ->
incr(docs_examined).

-spec incr(atom(), integer()) -> any().
incr(_Key, 0) ->
ok;
incr(Key, N) when is_integer(N), N > 0 ->
-spec incr(atom()) -> any().
incr(Key) ->
case get(?SHARD_STATS_KEY) of
#{} = Stats0 ->
Stats = maps:update_with(Key, fun(X) -> X + N end, Stats0),
Stats = maps:update_with(Key, fun(X) -> X + 1 end, Stats0),
put(?SHARD_STATS_KEY, Stats);
_ ->
ok
Expand Down

0 comments on commit 1c1a48c

Please sign in to comment.