Skip to content

Commit

Permalink
[fixup] Fuse view_row and execution_stats messages in a map
Browse files Browse the repository at this point in the history
  • Loading branch information
pgj committed Feb 4, 2024
1 parent 99bb9a9 commit 0b0834c
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 76 deletions.
71 changes: 68 additions & 3 deletions src/fabric/src/fabric_view.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ possibly_embed_doc(
#view_row{value = undefined} = Row
) ->
Row;
%% Map-format row whose emitted value is undefined: there is no value to
%% inspect for a document reference, so the row passes through unchanged.
%% (Mirrors the #view_row{value = undefined} record clause above.)
possibly_embed_doc(
    _State,
    #{value := undefined} = Row
) ->
    Row;
possibly_embed_doc(
#collector{db_name = DbName, query_args = Args},
#view_row{key = _Key, id = _Id, value = Value, doc = _Doc} = Row
Expand Down Expand Up @@ -241,11 +246,63 @@ possibly_embed_doc(
end;
_ ->
Row
end;
%% Map-format row variant: when the query asked for include_docs and the
%% row's value is a JSON object ({Props}), try to interpret the value as a
%% document reference (_id, optionally _rev) and embed the fetched document
%% under the row's doc key. Rows whose value carries no _id are returned
%% unchanged; an explicit null _id yields doc => null.
possibly_embed_doc(
    #collector{db_name = DbName, query_args = Args},
    #{key := _Key, id := _Id, value := Value, doc := _Doc} = Row
) ->
    #mrargs{include_docs = IncludeDocs} = Args,
    case IncludeDocs andalso is_tuple(Value) of
        true ->
            {Props} = Value,
            Rev0 = couch_util:get_value(<<"_rev">>, Props),
            case couch_util:get_value(<<"_id">>, Props) of
                null ->
                    % Explicit null _id: embed a null doc.
                    Row#{doc => null};
                undefined ->
                    % No _id in the value: nothing to embed.
                    Row;
                IncId ->
                    % use separate process to call fabric:open_doc
                    % to not interfere with current call
                    {Pid, Ref} = spawn_monitor(fun() ->
                        % The fetched row is passed back as the exit reason
                        % and harvested from the 'DOWN' message below.
                        exit(
                            case Rev0 of
                                undefined ->
                                    % No revision requested: open latest.
                                    case fabric:open_doc(DbName, IncId, []) of
                                        {ok, NewDoc} ->
                                            Row#{doc => couch_doc:to_json_obj(NewDoc, [])};
                                        {not_found, _} ->
                                            Row#{doc => null};
                                        Else ->
                                            Row#{doc => {error, Else}}
                                    end;
                                Rev0 ->
                                    Rev = couch_doc:parse_rev(Rev0),
                                    case fabric:open_revs(DbName, IncId, [Rev], []) of
                                        {ok, [{ok, NewDoc}]} ->
                                            Row#{doc => couch_doc:to_json_obj(NewDoc, [])};
                                        % Rev is already bound, so this only
                                        % matches not_found for the requested
                                        % revision; other shapes hit Else.
                                        {ok, [{{not_found, _}, Rev}]} ->
                                            Row#{doc => null};
                                        Else ->
                                            Row#{doc => {error, Else}}
                                    end
                            end
                        )
                    end),
                    receive
                        {'DOWN', Ref, process, Pid, Resp} ->
                            Resp
                    end
            end;
        _ ->
            % include_docs not requested, or value is not a JSON object.
            Row
    end.

%% Strip the partition tag from a row's key. Partitioned rows carry keys of
%% the form {p, Partition, Key}; the bare Key is what callers should see.
%% Handles both legacy #view_row{} records and map-format rows; any row
%% without a partition-tagged key is returned unchanged.
detach_partition(#view_row{key = {p, _Partition, Key}} = Row) ->
    Row#view_row{key = Key};
detach_partition(#{key := {p, _Partition, Key}} = Row) ->
    Row#{key => Key};
detach_partition(Row) ->
    Row.

keydict(undefined) ->
Expand Down Expand Up @@ -297,7 +354,11 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
end;
%% Pop the next buffered row from the collector state. The row may be a
%% legacy #view_row{} record or a map, so the {Worker, From} source is
%% extracted by matching on either shape. The sending worker is acked so
%% it can stream its next row, and its outstanding-row counter is
%% decremented. Returns {Row, UpdatedState}.
get_next_row(State) ->
    #collector{rows = [Row | Rest], counters = Counters0} = State,
    {Worker, From} =
        case Row of
            #view_row{worker = W} -> W;
            #{worker := W} -> W
        end,
    rexi:stream_ack(From),
    Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
    {Row, State#collector{rows = Rest, counters = Counters1}}.
Expand Down Expand Up @@ -349,7 +410,11 @@ transform_row(#view_row{key = Key, id = Id, value = Value, doc = undefined}) ->
%% Turn an internal row into the external {row, Props} shape emitted to the
%% response callback. A doc of {error, Reason} is reported as an error row.
%% Map-format rows additionally carry execution stats, which are appended as
%% a third tuple element after delegating to the record clauses.
transform_row(#view_row{key = Key, id = _Id, value = _Value, doc = {error, Reason}}) ->
    {row, [{id, error}, {key, Key}, {value, Reason}]};
transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}) ->
    {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]};
transform_row(#{} = Row) ->
    #{key := Key, id := Id, value := Value, doc := Doc, stats := Stats} = Row,
    {row, Props} = transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}),
    {row, Props, Stats}.

compare(fwd, <<"raw">>, A, B) -> A < B;
compare(rev, <<"raw">>, A, B) -> B < A;
Expand Down
154 changes: 144 additions & 10 deletions src/fabric/src/fabric_view_all_docs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,49 @@ shards(Db, Args) ->
end,
fabric_view:get_shards(Db, NewArgs).

%% Account for one row streamed by Worker: tag it with its source, normalize
%% its format, merge it into the buffered rows and try to emit output.
%%
%% Rows may arrive as legacy #view_row{} records or as maps; in a
%% mixed-version cluster both formats can show up, so every row is coerced
%% to one format before merging. The execution_stats_rolling option decides
%% which: when enabled rows are kept as maps (with a stats field), otherwise
%% as records.
handle_row(Row0, {Worker, _} = Source, State) ->
    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
    #mrargs{extra = Options, direction = Dir} = Args,
    %% Remember which worker produced the row so it can be acked later.
    Tagged =
        case Row0 of
            #view_row{} -> Row0#view_row{worker = Source};
            #{} -> Row0#{worker => Source}
        end,
    Rolling = couch_util:get_value(execution_stats_rolling, Options, false),
    Row =
        case {Rolling, Tagged} of
            {true, #view_row{key = K, id = I, value = V, doc = D, worker = W}} ->
                %% Promote the legacy record to map format with empty stats.
                #{key => K, id => I, value => V, doc => D, worker => W, stats => #{}};
            {true, #{}} ->
                Tagged;
            {false, #view_row{}} ->
                Tagged;
            {false, #{}} ->
                %% Demote the map row to the legacy record format.
                #{id := I, key := K} = Tagged,
                #view_row{
                    id = I,
                    key = K,
                    value = maps:get(value, Tagged, null),
                    doc = maps:get(doc, Tagged, null),
                    worker = maps:get(worker, Tagged, null)
                }
        end,
    Rows = merge_row(Dir, Row, Rows0),
    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
    State1 = State#collector{rows = Rows, counters = Counters1},
    fabric_view:maybe_send_row(State1).

handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
Expand Down Expand Up @@ -257,13 +300,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
update_seq = UpdateSeq0
}}
end;
%% A worker streamed a row: both the legacy #view_row{} record form and the
%% newer {view_row, Map} envelope are accepted, and both delegate to
%% handle_row/3 which normalizes, merges and possibly emits the row.
handle_message(#view_row{} = Row, {_, _} = Source, State) ->
    handle_row(Row, Source, State);
handle_message({view_row, #{} = Row}, {_, _} = Source, State) ->
    handle_row(Row, Source, State);
%% A worker finished streaming: bump its counter and attempt to flush any
%% buffered rows to the client via fabric_view:maybe_send_row/1.
handle_message(complete, Worker, State) ->
    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
    fabric_view:maybe_send_row(State#collector{counters = Counters});
Expand All @@ -273,10 +313,24 @@ handle_message({execution_stats, _} = Msg, {_, From}, St) ->
rexi:stream_ack(From),
{Go, St#collector{user_acc = Acc}}.

%% Insert map-row R into List, which is sorted on map key Key. For fwd the
%% list ascends and R lands before the first element whose Key is >= R's
%% (stable: a tie goes before the existing element). For rev the list
%% descends, handled by reversing, inserting fwd, and reversing back.
insert_row(_Dir, _Key, R, []) ->
    [R];
insert_row(fwd, Key, R, [H | T] = List) ->
    V1 = maps:get(Key, R),
    V2 = maps:get(Key, H),
    if
        V1 =< V2 -> [R | List];
        true -> [H | insert_row(fwd, Key, R, T)]
    end;
insert_row(rev, Key, R, List) ->
    lists:reverse(insert_row(fwd, Key, R, lists:reverse(List))).

%% Merge one row into the id-sorted buffer, keeping the sort direction.
%% Legacy #view_row{} records use lists:keymerge/rkeymerge on the id field;
%% map-format rows use insert_row/4 keyed on the id map key.
merge_row(fwd, #view_row{} = Row, Rows) ->
    lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, #view_row{} = Row, Rows) ->
    lists:rkeymerge(#view_row.id, [Row], Rows);
merge_row(Dir, #{} = Row, Rows) ->
    insert_row(Dir, id, Row, Rows).

all_docs_concurrency() ->
Value = config:get("fabric", "all_docs_concurrency", "10"),
Expand Down Expand Up @@ -385,3 +439,83 @@ filter_keys_by_namespace(Keys, Namespace) when Namespace =:= <<"_local">> ->
);
filter_keys_by_namespace(Keys, _Namespace) ->
Keys.

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").

%% Forward merge of record rows: each new row must land at its ascending
%% position by id (ties go before the existing element).
merge_row_record_fwd_test() ->
    R = fun(Id) -> #view_row{id = Id} end,
    Existing = [R(1), R(3), R(5), R(7)],
    Cases = [
        {R(4), [R(1), R(3), R(4), R(5), R(7)]},
        {R(0), [R(0), R(1), R(3), R(5), R(7)]},
        {R(8), [R(1), R(3), R(5), R(7), R(8)]},
        {R(5), [R(1), R(3), R(5), R(5), R(7)]}
    ],
    lists:foreach(
        fun({New, Expected}) ->
            ?assertEqual(Expected, merge_row(fwd, New, Existing))
        end,
        Cases
    ).

%% Reverse merge of record rows: each new row must land at its descending
%% position by id.
merge_row_record_rev_test() ->
    R = fun(Id) -> #view_row{id = Id} end,
    Existing = [R(8), R(6), R(4), R(2)],
    Cases = [
        {R(5), [R(8), R(6), R(5), R(4), R(2)]},
        {R(1), [R(8), R(6), R(4), R(2), R(1)]},
        {R(9), [R(9), R(8), R(6), R(4), R(2)]},
        {R(6), [R(8), R(6), R(6), R(4), R(2)]}
    ],
    lists:foreach(
        fun({New, Expected}) ->
            ?assertEqual(Expected, merge_row(rev, New, Existing))
        end,
        Cases
    ).

%% Forward merge of map rows: same ordering contract as the record variant,
%% exercised through the insert_row/4 path.
merge_row_map_fwd_test() ->
    M = fun(Id) -> #{id => Id} end,
    Existing = [M(1), M(3), M(5), M(7)],
    Cases = [
        {M(4), [M(1), M(3), M(4), M(5), M(7)]},
        {M(0), [M(0), M(1), M(3), M(5), M(7)]},
        {M(8), [M(1), M(3), M(5), M(7), M(8)]},
        {M(5), [M(1), M(3), M(5), M(5), M(7)]}
    ],
    lists:foreach(
        fun({New, Expected}) ->
            ?assertEqual(Expected, merge_row(fwd, New, Existing))
        end,
        Cases
    ).

%% Reverse merge of map rows: descending order by id, through the
%% insert_row/4 path.
merge_row_map_rev_test() ->
    M = fun(Id) -> #{id => Id} end,
    Existing = [M(8), M(6), M(4), M(2)],
    Cases = [
        {M(5), [M(8), M(6), M(5), M(4), M(2)]},
        {M(1), [M(8), M(6), M(4), M(2), M(1)]},
        {M(9), [M(9), M(8), M(6), M(4), M(2)]},
        {M(6), [M(8), M(6), M(6), M(4), M(2)]}
    ],
    lists:foreach(
        fun({New, Expected}) ->
            ?assertEqual(Expected, merge_row(rev, New, Existing))
        end,
        Cases
    ).
-endif.
Loading

0 comments on commit 0b0834c

Please sign in to comment.