Skip to content

Commit

Permalink
fabric: switch to maps for view rows
Browse files Browse the repository at this point in the history
The `#view_row{}` record that is used for capturing messages with
row data is not flexible enough to have it extended easily.  If
one wanted to introduce a fresh field, the change would have to be
propagated through many functions and modules.  Especially, if
support for mixed-version clusters is a concern, this would come
with some degree of duplication.

Leverage Erlang/OTP's built-in maps for mitigating this issue and
offer the view callbacks the `view_row_map` Boolean key in
`#mrargs.extra` to request this communication format.  This way the
old record-based format would be still in use unless requested
otherwise.  This facilitates the smooth interoperability of old
coordinators and new workers.  In parallel to that, the new
coordinator could still receive view rows from old workers.
  • Loading branch information
pgj committed Feb 20, 2024
1 parent 2064f21 commit 57f1b4b
Show file tree
Hide file tree
Showing 16 changed files with 538 additions and 219 deletions.
7 changes: 4 additions & 3 deletions src/couch_mrview/src/couch_mrview.erl
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ query_all_docs(Db, Args0, Callback, Acc) ->
all_docs_fold(Db, Args2, Callback, Acc1).

query_view(Db, DDoc, VName) ->
query_view(Db, DDoc, VName, #mrargs{}).
Args = #mrargs{extra = [{view_row_map, true}]},
query_view(Db, DDoc, VName, Args).

query_view(Db, DDoc, VName, Args) when is_list(Args) ->
query_view(Db, DDoc, VName, to_mrargs(Args), fun default_cb/2, []);
Expand Down Expand Up @@ -325,7 +326,7 @@ get_view_info(Db, DDoc, VName) ->
Db,
DDoc,
VName,
#mrargs{}
#mrargs{extra = [{view_row_map, true}]}
),

%% get the total number of rows
Expand Down Expand Up @@ -763,7 +764,7 @@ to_mrargs(KeyList) ->
Index = lookup_index(couch_util:to_existing_atom(Key)),
setelement(Index, Acc, Value)
end,
#mrargs{},
#mrargs{extra = [{view_row_map, true}]},
KeyList
).

Expand Down
9 changes: 6 additions & 3 deletions src/couch_mrview/src/couch_mrview_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ row_to_json(Id0, Row) ->
parse_params(#httpd{} = Req, Keys) ->
parse_params(chttpd:qs(Req), Keys);
parse_params(Props, Keys) ->
Args = #mrargs{},
Args = #mrargs{extra = [{view_row_map, true}]},
parse_params(Props, Keys, Args).

parse_params(Props, Keys, Args) ->
Expand Down Expand Up @@ -511,13 +511,16 @@ parse_body_and_query(Req, Keys) ->
#mrargs{
keys = Keys,
group = undefined,
group_level = undefined
group_level = undefined,
extra = [{view_row_map, true}]
},
[keep_group_level]
).

parse_body_and_query(Req, {Props}, Keys) ->
Args = #mrargs{keys = Keys, group = undefined, group_level = undefined},
Args = #mrargs{
keys = Keys, group = undefined, group_level = undefined, extra = [{view_row_map, true}]
},
BodyArgs0 = parse_params(Props, Keys, Args, [decoded]),
BodyArgs1 =
case is_view(Req) of
Expand Down
45 changes: 24 additions & 21 deletions src/couch_replicator/src/couch_replicator_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
{ok, Resp}
end.

handle_row(Row0, {Worker, From} = Source, State) ->
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
Id = fabric_view_row:get_id(Row0),
Doc = fabric_view_row:get_doc(Row0),
case maybe_fetch_and_filter_doc(Id, Doc, State) of
{[_ | _]} = NewDoc ->
Row1 = fabric_view_row:set_doc(Row0, NewDoc),
Row = fabric_view_row:set_worker(Row1, Source),
Dir = Args#mrargs.direction,
Rows = fabric_view_row:merge(Dir, Row, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows = Rows, counters = Counters1},
fabric_view:maybe_send_row(State1);
skip ->
rexi:stream_ack(From),
{ok, State}
end.

handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
Expand Down Expand Up @@ -120,32 +138,17 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
user_acc = Acc
}}
end;
handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
case maybe_fetch_and_filter_doc(Id, Doc, State) of
{[_ | _]} = NewDoc ->
Row = Row0#view_row{doc = NewDoc},
Dir = Args#mrargs.direction,
Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows = Rows, counters = Counters1},
fabric_view:maybe_send_row(State1);
skip ->
rexi:stream_ack(From),
{ok, State}
end;
handle_message(#view_row{} = Row, {_, _} = Source, State) ->
handle_row(Row, Source, State);
handle_message({view_row, #{}} = Row, {_, _} = Source, State) ->
handle_row(Row, Source, State);
handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
fabric_view:maybe_send_row(State#collector{counters = Counters}).

merge_row(fwd, Row, Rows) ->
lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, Row, Rows) ->
lists:rkeymerge(#view_row.id, [Row], Rows).

maybe_fetch_and_filter_doc(Id, undecided, State) ->
#collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
FilterStates = proplists:get_value(filter_states, Extra),
#collector{db_name = DbName, query_args = #mrargs{extra = Options}} = State,
FilterStates = couch_util:get_value(filter_states, Options),
case couch_replicator:active_doc(DbName, Id) of
{ok, {Props} = DocInfo} ->
DocState = couch_util:get_value(state, Props),
Expand Down
19 changes: 8 additions & 11 deletions src/couch_replicator/src/couch_replicator_fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,26 @@
docs(DbName, Options, Args0) ->
set_io_priority(DbName, Options),
#mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
FilterStates = proplists:get_value(filter_states, Extra),
Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
HealthThreshold = couch_replicator_scheduler:health_threshold(),
{ok, Db} = couch_db:open_int(DbName, Options),
Acc = {DbName, FilterStates, HealthThreshold},
Acc = {DbName, HealthThreshold, Extra},
couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).

docs_cb({meta, Meta}, Acc) ->
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
Id = couch_util:get_value(id, Row),
Doc = couch_util:get_value(doc, Row),
ViewRow = #view_row{
id = Id,
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row)
},
docs_cb({row, Props}, {DbName, HealthThreshold, Options} = Acc) ->
States = couch_util:get_value(filter_states, Options),
Id = couch_util:get_value(id, Props),
Doc = couch_util:get_value(doc, Props),
ViewRow0 = fabric_view_row:from_props(Props, Options),
case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
skip ->
ok;
Other ->
ok = rexi:stream2(ViewRow#view_row{doc = Other})
ViewRow = fabric_view_row:set_doc(ViewRow0, Other),
ok = rexi:stream2(ViewRow)
end,
{ok, Acc};
docs_cb(complete, Acc) ->
Expand Down
3 changes: 2 additions & 1 deletion src/couch_replicator/src/couch_replicator_httpd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ handle_scheduler_docs(Db, Req) when is_binary(Db) ->
VArgs0 = couch_mrview_http:parse_params(Req, undefined),
StatesQs = chttpd:qs_value(Req, "states"),
States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
Extra = VArgs0#mrargs.extra,
VArgs1 = VArgs0#mrargs{
view_type = map,
include_docs = true,
reduce = false,
extra = [{filter_states, States}]
extra = [{filter_states, States} | Extra]
},
VArgs2 = couch_mrview_util:validate_args(VArgs1),
Opts = [{user_ctx, Req#httpd.user_ctx}],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,22 @@ t_scheduler_docs_total_rows({_Ctx, {RepDb, Source, Target}}) ->
Body = test_util:wait(
fun() ->
case req(get, SchedulerDocsUrl) of
{200, #{<<"docs">> := [_ | _]} = Decoded} -> Decoded;
{_, #{}} -> wait
{200, #{<<"docs">> := [_ | _]} = Decoded} ->
Decoded;
{_, #{<<"error">> := Error, <<"reason">> := Reason}} ->
?debugVal(Error, 100),
?debugVal(binary_to_list(Reason), 100);
{_, #{}} ->
wait
end
end,
14000,
1000
),
?assertNotEqual(Body, timeout),
Docs = maps:get(<<"docs">>, Body),
TotalRows = maps:get(<<"total_rows">>, Body),
?assertEqual(TotalRows, length(Docs)),
ok.
?assertEqual(TotalRows, length(Docs)).

t_local_docs_can_be_written({_Ctx, {RepDb, _, _}}) ->
DocUrl1 = rep_doc_url(RepDb, <<"_local/doc1">>),
Expand Down
14 changes: 14 additions & 0 deletions src/fabric/include/fabric.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@

-record(view_row, {key, id, value, doc, worker}).
-record(change, {key, id, value, deleted=false, doc, worker}).

-type row_property_key() :: id | key | value | doc | worker.
-type row_properties() :: [{row_property_key(), any()}].

-type view_row_map() ::
#{
id => term(),
key => term(),
value => term(),
doc => term(),
worker => term()
}.

-type view_row() :: #view_row{} | {view_row, view_row_map()}.
8 changes: 5 additions & 3 deletions src/fabric/src/fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ changes(DbName, Callback, Acc0, Options) ->

%% @equiv query_view(DbName, DesignName, ViewName, #mrargs{})
query_view(DbName, DesignName, ViewName) ->
query_view(DbName, DesignName, ViewName, #mrargs{}).
QueryArgs = #mrargs{extra = [{view_row_map, true}]},
query_view(DbName, DesignName, ViewName, QueryArgs).

%% @equiv query_view(DbName, DesignName,
%% ViewName, fun default_callback/2, [], QueryArgs)
Expand Down Expand Up @@ -545,10 +546,11 @@ end_changes() ->
%% @doc retrieve all the design docs from a database
-spec design_docs(dbname()) -> {ok, [json_obj()]} | {error, Reason :: term()}.
design_docs(DbName) ->
Extra0 = [{view_row_map, true}],
Extra =
case get(io_priority) of
undefined -> [];
Else -> [{io_priority, Else}]
undefined -> Extra0;
Else -> [{io_priority, Else} | Extra0]
end,
QueryArgs0 = #mrargs{
include_docs = true,
Expand Down
28 changes: 11 additions & 17 deletions src/fabric/src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db = Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
Callback = fun(Msg, Acc) -> reduce_cb(Msg, Acc, Args#mrargs.extra) end,
couch_mrview:query_view(Db, DDoc, ViewName, Args, Callback, VAcc0).

create_db(DbName) ->
create_db(DbName, []).
Expand Down Expand Up @@ -491,14 +492,9 @@ view_cb({meta, Meta}, Acc) ->
% Map function starting
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, Acc) ->
view_cb({row, Props}, #mrargs{extra = Options} = Acc) ->
% Adding another row
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row),
doc = couch_util:get_value(doc, Row)
},
ViewRow = fabric_view_row:from_props(Props, Options),
ok = rexi:stream2(ViewRow),
{ok, Acc};
view_cb(complete, Acc) ->
Expand All @@ -510,24 +506,22 @@ view_cb(ok, ddoc_updated) ->
view_cb(ok, insufficient_storage) ->
rexi:reply({ok, insufficient_storage}).

reduce_cb({meta, Meta}, Acc) ->
reduce_cb({meta, Meta}, Acc, _Options) ->
% Map function starting
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
reduce_cb({row, Row}, Acc) ->
reduce_cb({row, Props}, Acc, Options) ->
% Adding another row
ok = rexi:stream2(#view_row{
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row)
}),
ViewRow = fabric_view_row:from_props(Props, Options),
ok = rexi:stream2(ViewRow),
{ok, Acc};
reduce_cb(complete, Acc) ->
reduce_cb(complete, Acc, _Options) ->
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
reduce_cb(ok, ddoc_updated) ->
reduce_cb(ok, ddoc_updated, _Options) ->
rexi:reply({ok, ddoc_updated});
reduce_cb(ok, insufficient_storage) ->
reduce_cb(ok, insufficient_storage, _Options) ->
rexi:reply({ok, insufficient_storage}).

changes_enumerator(#full_doc_info{} = FDI, Acc) ->
Expand Down
Loading

0 comments on commit 57f1b4b

Please sign in to comment.