Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mango): rolling execution statistics #4958

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/fabric/src/fabric_view_row.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
get_value/1,
get_doc/1,
get_worker/1,
get_stats/1,
set_key/2,
set_doc/2,
set_worker/2,
set_stats/2,
transform/1
]).

Expand Down Expand Up @@ -91,6 +93,14 @@ set_worker(#view_row{} = Row, Worker) ->
set_worker({view_row, #{} = Row}, Worker) ->
{view_row, Row#{worker => Worker}}.

get_stats({view_row, #{stats := Stats}}) ->
Stats;
get_stats({view_row, #{}}) ->
undefined.

set_stats({view_row, #{} = Row}, Stats) ->
{view_row, Row#{stats => Stats}}.

transform(#view_row{value = {[{reduce_overflow_error, Msg}]}}) ->
{row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, Msg}]};
transform(#view_row{key = Key, id = reduced, value = Value}) ->
Expand All @@ -109,8 +119,13 @@ transform({view_row, #{} = Row0}) ->
Value = maps:get(value, Row0, undefined),
Doc = maps:get(doc, Row0, undefined),
Worker = maps:get(worker, Row0, undefined),
Stats = maps:get(stats, Row0, undefined),
Row = #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker},
transform(Row).
{row, Props} = RowProps = transform(Row),
case Stats of
undefined -> RowProps;
#{} -> {row, [{stats, Stats} | Props]}
end.

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
Expand Down
146 changes: 109 additions & 37 deletions src/mango/src/mango_cursor_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
covering_index => 'maybe'(#idx{})
}.

-type mrargs_extra_item() ::
{callback, {atom(), atom()}}
| {selector, any()}
| {callback_args, viewcbargs()}
| {ignore_partition_query_limit, boolean()}
| {execution_stats_map, boolean()}
| {execution_stats_rolling, boolean()}.
-type mrargs_extra() :: [mrargs_extra_item()].

-spec viewcbargs_new(Selector, Fields, CoveringIndex) -> ViewCBArgs when
Selector :: selector(),
Fields :: fields(),
Expand Down Expand Up @@ -207,7 +216,9 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) -
% - Return execution statistics in a map
{execution_stats_map, true},
% - Return view rows in a map
{view_row_map, true}
{view_row_map, true},
% - Stream execution statistics
{execution_stats_rolling, true}
]
}.

Expand Down Expand Up @@ -341,6 +352,43 @@ choose_best_index(IndexRanges) ->
{SelectedIndex, SelectedIndexRanges, _} = hd(SortedIndexRanges),
{{SelectedIndex, SelectedIndexRanges}, SortedIndexRanges}.

-spec format_stats(RawStats, Options) -> FormattedStats when
RawStats :: shard_stats_v2(),
Options :: mrargs_extra(),
FormattedStats :: shard_stats_v1() | shard_stats_v2().
format_stats(Stats, Options) when is_list(Options) ->
case couch_util:get_value(execution_stats_map, Options, false) of
true ->
Stats;
false ->
#{docs_examined := DocsExamined} = Stats,
{docs_examined, DocsExamined}
end.

-spec submit_stats(Options) -> ok when
Options :: mrargs_extra().
submit_stats(Options) when is_list(Options) ->
ShardStats = mango_execution_stats:shard_get_stats(),
Stats = format_stats(ShardStats, Options),
% Send execution stats in batch (shard-level)
ok = rexi:stream2({execution_stats, Stats}).

-spec roll_stats(ViewRow, Options) -> ViewRow when
ViewRow :: view_row(),
Options :: mrargs_extra().
roll_stats(ViewRow, Options) when is_list(Options) ->
ViewRowMap = couch_util:get_value(view_row_map, Options, false),
RollingStats = couch_util:get_value(execution_stats_rolling, Options, false),
case ViewRowMap andalso RollingStats of
true ->
ShardStats = mango_execution_stats:shard_get_stats(),
mango_execution_stats:shard_init(),
Stats = format_stats(ShardStats, Options),
fabric_view_row:set_stats(ViewRow, Stats);
false ->
ViewRow
end.

-spec view_cb
(Message, #mrargs{}) -> Response when
Message :: {meta, any()} | {row, row_properties()} | complete,
Expand Down Expand Up @@ -382,7 +430,8 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
case match_and_extract_doc(Doc, Selector, Fields) of
{match, FinalDoc} ->
ViewRow1 = fabric_view_row:set_doc(ViewRow, FinalDoc),
ok = rexi:stream2(ViewRow1),
ViewRow2 = roll_stats(ViewRow1, Options),
ok = rexi:stream2(ViewRow2),
set_mango_msg_timestamp();
{no_match, undefined} ->
maybe_send_mango_ping()
Expand All @@ -397,7 +446,8 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
Process(Doc);
{undefined, _} ->
% include_docs=false. Use quorum fetch at coordinator
ok = rexi:stream2(ViewRow),
ViewRow1 = roll_stats(ViewRow, Options),
ok = rexi:stream2(ViewRow1),
set_mango_msg_timestamp();
{Doc, _} ->
mango_execution_stats:shard_incr_docs_examined(),
Expand All @@ -406,17 +456,7 @@ view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
end,
{ok, Acc};
view_cb(complete, #mrargs{extra = Options} = Acc) ->
ShardStats = mango_execution_stats:shard_get_stats(),
Stats =
case couch_util:get_value(execution_stats_map, Options, false) of
true ->
ShardStats;
false ->
DocsExamined = maps:get(docs_examined, ShardStats),
{docs_examined, DocsExamined}
end,
% Send shard-level execution stats
ok = rexi:stream2({execution_stats, Stats}),
submit_stats(Options),
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
Expand Down Expand Up @@ -472,6 +512,21 @@ maybe_send_mango_ping() ->
set_mango_msg_timestamp() ->
put(mango_last_msg_timestamp, os:timestamp()).

-spec add_shard_stats(#execution_stats{}, shard_stats()) -> #execution_stats{}.
add_shard_stats(Stats0, {docs_examined, DocsExamined}) ->
mango_execution_stats:incr_docs_examined(Stats0, DocsExamined);
add_shard_stats(Stats0, #{} = ShardStats) ->
DocsExamined = shard_stats_get(docs_examined, ShardStats),
KeysExamined = shard_stats_get(keys_examined, ShardStats),
Stats = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
mango_execution_stats:incr_keys_examined(Stats, KeysExamined).

-spec handle_execution_stats(#cursor{}, shard_stats()) -> {ok, #cursor{}}.
handle_execution_stats(Cursor0, ShardStats) ->
#cursor{execution_stats = Stats} = Cursor0,
Cursor = Cursor0#cursor{execution_stats = add_shard_stats(Stats, ShardStats)},
{ok, Cursor}.

-spec handle_message(message(), #cursor{}) -> Response when
Response ::
{ok, #cursor{}}
Expand All @@ -495,20 +550,10 @@ handle_message({row, Props}, Cursor) ->
couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
{ok, Cursor}
end;
handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) ->
#cursor{execution_stats = Stats} = Cursor0,
Cursor = Cursor0#cursor{
execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
},
{ok, Cursor};
handle_message({execution_stats, {docs_examined, _} = ShardStats}, Cursor0) ->
handle_execution_stats(Cursor0, ShardStats);
handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
DocsExamined = shard_stats_get(docs_examined, ShardStats),
KeysExamined = shard_stats_get(keys_examined, ShardStats),
#cursor{execution_stats = Stats0} = Cursor0,
Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
Cursor = Cursor0#cursor{execution_stats = Stats},
{ok, Cursor};
handle_execution_stats(Cursor0, ShardStats);
handle_message(complete, Cursor) ->
{ok, Cursor};
handle_message({error, Reason}, _Cursor) ->
Expand Down Expand Up @@ -648,9 +693,14 @@ consider_index_coverage(Index, Fields, #mrargs{include_docs = IncludeDocs0} = Ar
| {no_match, null, {execution_stats, shard_stats()}}
| any().
doc_member_and_extract(Cursor, RowProps) ->
Db = Cursor#cursor.db,
Opts = Cursor#cursor.opts,
ExecutionStats = Cursor#cursor.execution_stats,
#cursor{db = Db, opts = Opts, execution_stats = ExecutionStats0} = Cursor,
ExecutionStats =
case couch_util:get_value(stats, RowProps) of
undefined ->
ExecutionStats0;
ShardStats ->
add_shard_stats(ExecutionStats0, ShardStats)
end,
Selector = Cursor#cursor.selector,
case couch_util:get_value(doc, RowProps) of
{DocProps} ->
Expand Down Expand Up @@ -748,7 +798,8 @@ base_opts_test() ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
MRArgs =
#mrargs{
Expand Down Expand Up @@ -1093,7 +1144,8 @@ t_execute_ok_all_docs(_) ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
Args =
#mrargs{
Expand Down Expand Up @@ -1180,7 +1232,8 @@ t_execute_ok_query_view(_) ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
Args =
#mrargs{
Expand Down Expand Up @@ -1279,7 +1332,8 @@ t_execute_ok_all_docs_with_execution_stats(_) ->
}},
{ignore_partition_query_limit, true},
{execution_stats_map, true},
{view_row_map, true}
{view_row_map, true},
{execution_stats_rolling, true}
],
Args =
#mrargs{
Expand Down Expand Up @@ -1394,6 +1448,8 @@ t_view_cb_row_matching_regular_doc(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1413,6 +1469,8 @@ t_view_cb_row_non_matching_regular_doc(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1432,6 +1490,8 @@ t_view_cb_row_null_doc(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1452,6 +1512,8 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
fields => all_fields,
covering_index => undefined
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand Down Expand Up @@ -1479,6 +1541,8 @@ t_view_cb_row_matching_covered_doc(_) ->
fields => Fields,
covering_index => Index
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand All @@ -1503,6 +1567,8 @@ t_view_cb_row_non_matching_covered_doc(_) ->
fields => Fields,
covering_index => Index
}},
{execution_stats_map, true},
{execution_stats_rolling, true},
{view_row_map, true}
]
},
Expand Down Expand Up @@ -1638,10 +1704,13 @@ t_handle_message_row_ok_above_limit(_) ->
user_acc = accumulator,
user_fun = fun foo:bar/2
},
Row = [{id, id}, {key, key}, {doc, Doc}],
ShardStats = #{keys_examined => 2, docs_examined => 3},
Row = [{id, id}, {key, key}, {doc, Doc}, {stats, ShardStats}],
Cursor1 =
Cursor#cursor{
execution_stats = #execution_stats{resultsReturned = 1},
execution_stats = #execution_stats{
resultsReturned = 1, totalKeysExamined = 2, totalDocsExamined = 3
},
limit = 8,
user_acc = updated_accumulator,
bookmark_docid = id,
Expand Down Expand Up @@ -1689,12 +1758,15 @@ t_handle_message_row_ok_triggers_quorum_fetch_match(_) ->
user_acc = accumulator,
limit = 1
},
Row = [{id, id}, {doc, undefined}],
ShardStats = #{keys_examined => 2, docs_examined => 3},
Row = [{id, id}, {doc, undefined}, {stats, ShardStats}],
Cursor1 =
Cursor#cursor{
execution_stats =
#execution_stats{
totalQuorumDocsExamined = 1,
totalKeysExamined = 2,
totalDocsExamined = 3,
resultsReturned = 1
},
user_acc = updated_accumulator,
Expand Down
Loading