Skip to content

Commit c225e75

Browse files
committed
fabric: switch to maps for view rows
The `#view_row{}` record that is used for capturing messages with row data is not flexible enough to be extended easily. If one wanted to introduce a new field, the change would have to be propagated through many functions and modules. In particular, if support for mixed-version clusters is a concern, this would come with some degree of duplication. Leverage Erlang/OTP's built-in maps to mitigate this issue and offer the view callbacks the `view_row_map` Boolean key in `#mrargs.extra` to request this communication format. This way, the old record-based format remains in use unless requested otherwise. This facilitates smooth interoperability between old coordinators and new workers. At the same time, a new coordinator can still receive view rows from old workers.
1 parent 8f6797b commit c225e75

9 files changed

+458
-151
lines changed

src/couch_replicator/src/couch_replicator_fabric.erl

+30-19
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,32 @@ docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
8181
{ok, Resp}
8282
end.
8383

84+
%% Processes a single view row received from a worker.
%%
%% Row0 is either a #view_row{} record or a map carrying the `id` and `doc`
%% keys; any other shape fails the first `case` below.
%% Source is the {Worker, From} pair; it is stored as the row's worker.
%% State is the #collector{} holding the query args, counters and rows.
%%
%% Returns whatever fabric_view:maybe_send_row/1 returns when the row is
%% kept, or {ok, State} (unchanged state) when the row is skipped.
handle_row(Row0, {Worker, From} = Source, State) ->
85+
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
86+
%% Extract the id and doc regardless of the row representation.
{Id, Doc} =
87+
case Row0 of
88+
#view_row{id = I, doc = D} -> {I, D};
89+
#{id := I, doc := D} -> {I, D}
90+
end,
91+
%% Only two results are matched here: a non-empty EJSON object ({[_ | _]})
%% or the atom `skip`.
case maybe_fetch_and_filter_doc(Id, Doc, State) of
92+
{[_ | _]} = NewDoc ->
93+
%% Keep the representation of the incoming row (record or map) while
%% replacing the doc and recording the source of the row.
Row =
94+
case Row0 of
95+
#view_row{} ->
96+
Row0#view_row{doc = NewDoc, worker = Source};
97+
#{} ->
98+
Row0#{doc => NewDoc, worker => Source}
99+
end,
100+
Dir = Args#mrargs.direction,
101+
Rows = fabric_util:merge_row(Dir, Row, Rows0),
102+
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
103+
State1 = State#collector{rows = Rows, counters = Counters1},
104+
fabric_view:maybe_send_row(State1);
105+
skip ->
106+
%% The row is dropped: acknowledge it to the sender and leave the
%% collector state untouched.
rexi:stream_ack(From),
107+
{ok, State}
108+
end.
109+
84110
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
85111
fabric_view:check_down_shards(State, NodeRef);
86112
handle_message({rexi_EXIT, Reason}, Worker, State) ->
@@ -120,29 +146,14 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
120146
user_acc = Acc
121147
}}
122148
end;
123-
handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
124-
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
125-
case maybe_fetch_and_filter_doc(Id, Doc, State) of
126-
{[_ | _]} = NewDoc ->
127-
Row = Row0#view_row{doc = NewDoc},
128-
Dir = Args#mrargs.direction,
129-
Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0),
130-
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
131-
State1 = State#collector{rows = Rows, counters = Counters1},
132-
fabric_view:maybe_send_row(State1);
133-
skip ->
134-
rexi:stream_ack(From),
135-
{ok, State}
136-
end;
149+
handle_message(#view_row{id = _, doc = _} = Row, {_, _} = Source, State) ->
150+
handle_row(Row, Source, State);
151+
handle_message(#{id := _, doc := _} = Row, {_, _} = Source, State) ->
152+
handle_row(Row, Source, State);
137153
handle_message(complete, Worker, State) ->
138154
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
139155
fabric_view:maybe_send_row(State#collector{counters = Counters}).
140156

141-
merge_row(fwd, Row, Rows) ->
142-
lists:keymerge(#view_row.id, [Row], Rows);
143-
merge_row(rev, Row, Rows) ->
144-
lists:rkeymerge(#view_row.id, [Row], Rows).
145-
146157
maybe_fetch_and_filter_doc(Id, undecided, State) ->
147158
#collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
148159
FilterStates = proplists:get_value(filter_states, Extra),

src/couch_replicator/src/couch_replicator_fabric_rpc.erl

+18-4
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,22 @@ docs(DbName, Options, Args0) ->
2727
Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
2828
HealthThreshold = couch_replicator_scheduler:health_threshold(),
2929
{ok, Db} = couch_db:open_int(DbName, Options),
30-
Acc = {DbName, FilterStates, HealthThreshold},
30+
Acc =
31+
case couch_util:get_value(view_row_map, Extra, false) of
32+
true ->
33+
{DbName, FilterStates, HealthThreshold, view_row_map};
34+
false ->
35+
{DbName, FilterStates, HealthThreshold, view_row_record}
36+
end,
3137
couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).
3238

3339
docs_cb({meta, Meta}, Acc) ->
3440
ok = rexi:stream2({meta, Meta}),
3541
{ok, Acc};
36-
docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
42+
docs_cb({row, Row}, {DbName, States, HealthThreshold, Kind} = Acc) ->
3743
Id = couch_util:get_value(id, Row),
3844
Doc = couch_util:get_value(doc, Row),
39-
ViewRow = #view_row{
45+
ViewRow0 = #view_row{
4046
id = Id,
4147
key = couch_util:get_value(key, Row),
4248
value = couch_util:get_value(value, Row)
@@ -45,7 +51,15 @@ docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
4551
skip ->
4652
ok;
4753
Other ->
48-
ok = rexi:stream2(ViewRow#view_row{doc = Other})
54+
ViewRow1 =
55+
case Kind of
56+
view_row_map ->
57+
fabric_util:to_view_row_map(ViewRow0);
58+
view_row_record ->
59+
ViewRow0
60+
end,
61+
ViewRow = fabric_util:row_embed_doc(ViewRow1, Other),
62+
ok = rexi:stream2(ViewRow)
4963
end,
5064
{ok, Acc};
5165
docs_cb(complete, Acc) ->

src/fabric/src/fabric_rpc.erl

+9-2
Original file line numberDiff line numberDiff line change
@@ -491,14 +491,21 @@ view_cb({meta, Meta}, Acc) ->
491491
% Map function starting
492492
ok = rexi:stream2({meta, Meta}),
493493
{ok, Acc};
494-
view_cb({row, Row}, Acc) ->
494+
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
495495
% Adding another row
496-
ViewRow = #view_row{
496+
ViewRow0 = #view_row{
497497
id = couch_util:get_value(id, Row),
498498
key = couch_util:get_value(key, Row),
499499
value = couch_util:get_value(value, Row),
500500
doc = couch_util:get_value(doc, Row)
501501
},
502+
ViewRow =
503+
case couch_util:get_value(view_row_map, Options, false) of
504+
true ->
505+
fabric_util:to_view_row_map(ViewRow0);
506+
false ->
507+
ViewRow0
508+
end,
502509
ok = rexi:stream2(ViewRow),
503510
{ok, Acc};
504511
view_cb(complete, Acc) ->

src/fabric/src/fabric_util.erl

+179-33
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@
3535
-export([worker_ranges/1]).
3636
-export([get_uuid_prefix_len/0]).
3737
-export([isolate/1, isolate/2]).
38+
-export([row_embed_doc/2]).
39+
-export([to_view_row_map/1, to_view_row_record/1]).
40+
-export([row_get_worker/1, row_get_value/1]).
41+
-export([merge_row/3]).
3842

3943
-compile({inline, [{doc_id_and_rev, 1}]}).
4044

45+
-include_lib("fabric/include/fabric.hrl").
4146
-include_lib("mem3/include/mem3.hrl").
4247
-include_lib("couch/include/couch_db.hrl").
4348
-include_lib("couch_mrview/include/couch_mrview.hrl").
44-
-include_lib("eunit/include/eunit.hrl").
4549

4650
remove_down_workers(Workers, BadNode) ->
4751
remove_down_workers(Workers, BadNode, []).
@@ -250,38 +254,6 @@ create_monitors(Shards) ->
250254
MonRefs = lists:usort([rexi_utils:server_pid(N) || #shard{node = N} <- Shards]),
251255
rexi_monitor:start(MonRefs).
252256

253-
%% verify only id and rev are used in key.
254-
update_counter_test() ->
255-
Reply =
256-
{ok, #doc{
257-
id = <<"id">>,
258-
revs = <<"rev">>,
259-
body = <<"body">>,
260-
atts = <<"atts">>
261-
}},
262-
?assertEqual(
263-
[{{<<"id">>, <<"rev">>}, {Reply, 1}}],
264-
update_counter(Reply, 1, [])
265-
).
266-
267-
remove_ancestors_test() ->
268-
Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
269-
Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
270-
Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
271-
Bar2 = {not_found, {1, <<"bar">>}},
272-
?assertEqual(
273-
[kv(Bar1, 1), kv(Foo1, 1)],
274-
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], [])
275-
),
276-
?assertEqual(
277-
[kv(Bar1, 1), kv(Foo2, 2)],
278-
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], [])
279-
),
280-
?assertEqual(
281-
[kv(Bar1, 2)],
282-
remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], [])
283-
).
284-
285257
is_replicator_db(DbName) ->
286258
path_ends_with(DbName, <<"_replicator">>).
287259

@@ -423,6 +395,103 @@ do_isolate(Fun) ->
423395
{'$isolerr', Tag, Reason, Stack}
424396
end.
425397

398+
%% Returns Row with Doc stored as its document. Works for both row
%% representations: the `doc` field of a #view_row{} record, or the `doc`
%% key of a map (added if absent, overwritten if present).
row_embed_doc(#view_row{} = Row, Doc) ->
399+
Row#view_row{doc = Doc};
400+
row_embed_doc(#{} = Row, Doc) ->
401+
Row#{doc => Doc}.
402+
403+
%% Returns Map extended with Key => Value, unless Value is the atom
%% `undefined`, in which case Map is returned unchanged.
add_if_defined(#{} = Map, Key, Value) ->
404+
case Value of
405+
undefined -> Map;
406+
V -> Map#{Key => V}
407+
end.
408+
409+
%% Converts a view row to its map representation.
%%
%% For a #view_row{} record, the `key`, `id`, `value`, `doc` and `worker`
%% fields are copied into a fresh map under keys of the same name; fields
%% whose value is `undefined` are left out of the map (see add_if_defined/3).
%% A row that is already a map is returned as is.
to_view_row_map(#view_row{key = Key, id = Id, value = Value, doc = Doc, worker = Worker}) ->
410+
Row0 = #{},
411+
Row1 = add_if_defined(Row0, key, Key),
412+
Row2 = add_if_defined(Row1, id, Id),
413+
Row3 = add_if_defined(Row2, value, Value),
414+
Row4 = add_if_defined(Row3, doc, Doc),
415+
add_if_defined(Row4, worker, Worker);
416+
to_view_row_map(#{} = Row) ->
417+
Row.
418+
419+
%% Converts a view row to its #view_row{} record representation.
%%
%% A row that is already a record is returned as is. For a map, the `id`,
%% `key`, `value`, `doc` and `worker` keys are read into the record fields
%% of the same name; a missing key yields `undefined` for that field. Any
%% other keys in the map are not carried over.
to_view_row_record(#view_row{} = Row) ->
420+
Row;
421+
to_view_row_record(#{} = Row) ->
422+
Id = maps:get(id, Row, undefined),
423+
Key = maps:get(key, Row, undefined),
424+
Value = maps:get(value, Row, undefined),
425+
Doc = maps:get(doc, Row, undefined),
426+
Worker = maps:get(worker, Row, undefined),
427+
#view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker}.
428+
429+
%% Returns the worker associated with a view row, for either representation.
%% For a map without the `worker` key the result is `undefined`.
row_get_worker(#view_row{worker = Worker}) ->
430+
Worker;
431+
row_get_worker(#{worker := Worker}) ->
432+
Worker;
433+
row_get_worker(#{}) ->
434+
undefined.
435+
436+
%% Returns the value of a view row, for either representation.
%% For a map without the `value` key the result is `undefined`.
row_get_value(#view_row{value = Value}) ->
437+
Value;
438+
row_get_value(#{value := Value}) ->
439+
Value;
440+
row_get_value(#{}) ->
441+
undefined.
442+
443+
%% Inserts a map row into a list of map rows, comparing on the `id` key.
%%
%% - Empty list: the result is [Row], whatever the direction.
%% - fwd: Row is placed in front of the first element whose id is greater
%%   than or equal to Row's id (`=<` comparison); otherwise it ends up last.
%%   Both Row and the list elements must be maps with an `id` key, or no
%%   clause matches.
%% - rev: the list is reversed, the fwd insertion is applied, and the result
%%   is reversed back.
%%
%% NOTE(review): presumably the list is already ordered by id in the given
%% direction (as in the tests of merge_row/3) -- confirm against callers.
insert_row(_Dir, Row, []) ->
444+
[Row];
445+
insert_row(fwd, #{id := RowId} = Row, [#{id := HeadId} = Head | Tail] = List) ->
446+
case RowId =< HeadId of
447+
true -> [Row | List];
448+
false -> [Head | insert_row(fwd, Row, Tail)]
449+
end;
450+
insert_row(rev, Row, List) ->
451+
lists:reverse(insert_row(fwd, Row, lists:reverse(List))).
452+
453+
%% Merges a single view row into the list Rows according to direction.
%%
%% - #view_row{} record: merged on the record's `id` field with
%%   lists:keymerge/3 (fwd) or lists:rkeymerge/3 (rev).
%% - map: delegated to insert_row/3, which compares on the `id` key.
%%
%% The dispatch is made on the shape of Row only; the elements of Rows are
%% not checked here.
merge_row(fwd, #view_row{} = Row, Rows) ->
454+
lists:keymerge(#view_row.id, [Row], Rows);
455+
merge_row(rev, #view_row{} = Row, Rows) ->
456+
lists:rkeymerge(#view_row.id, [Row], Rows);
457+
merge_row(Dir, #{} = Row, Rows) ->
458+
insert_row(Dir, Row, Rows).
459+
460+
-ifdef(TEST).
461+
-include_lib("couch/include/couch_eunit.hrl").
462+
463+
%% verify only id and rev are used in key.
464+
%% Calls update_counter/3 with an {ok, #doc{}} reply, an increment of 1 and
%% an empty accumulator. The expected entry is keyed by {<<"id">>, <<"rev">>}
%% although `body` and `atts` are set on the #doc{} as well.
update_counter_test() ->
465+
Reply =
466+
{ok, #doc{
467+
id = <<"id">>,
468+
revs = <<"rev">>,
469+
body = <<"body">>,
470+
atts = <<"atts">>
471+
}},
472+
?assertEqual(
473+
[{{<<"id">>, <<"rev">>}, {Reply, 1}}],
474+
update_counter(Reply, 1, [])
475+
).
476+
477+
%% Exercises remove_ancestors/2 (with an empty second argument) on three
%% inputs built with kv/2, which is defined elsewhere in this module:
%%   1. Bar1 and Foo1 only: the input is expected back unchanged.
%%   2. Foo1 together with Foo2, whose revision path contains <<"foo">>:
%%      only Foo2 is expected to remain, with a count of 2.
%%   3. The not_found reply Bar2 together with Bar1: only Bar1 is expected
%%      to remain, with a count of 2.
remove_ancestors_test() ->
478+
Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
479+
Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
480+
Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
481+
Bar2 = {not_found, {1, <<"bar">>}},
482+
?assertEqual(
483+
[kv(Bar1, 1), kv(Foo1, 1)],
484+
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], [])
485+
),
486+
?assertEqual(
487+
[kv(Bar1, 1), kv(Foo2, 2)],
488+
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], [])
489+
),
490+
?assertEqual(
491+
[kv(Bar1, 2)],
492+
remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], [])
493+
).
494+
426495
get_db_timeout_test() ->
427496
% Q=1, N=1
428497
?assertEqual(20000, get_db_timeout(1, 2, 100, 60000)),
@@ -466,3 +535,80 @@ get_db_timeout_test() ->
466535
% request_timeout was set to infinity, with enough shards it still gets to
467536
% 100 min timeout at the start from the exponential logic
468537
?assertEqual(100, get_db_timeout(64, 2, 100, infinity)).
538+
539+
%% Checks merge_row/3 in the `fwd` direction with #view_row{} records.
%% The base list holds ids 1, 3, 5, 7 in ascending order; a row is merged
%% in the middle (id 4), at the head (id 0), at the tail (id 8), and with an
%% id that already exists (id 5).
merge_row_record_fwd_test() ->
540+
RowX1 = #view_row{id = 4},
541+
Row1 = #view_row{id = 1},
542+
Row2 = #view_row{id = 3},
543+
Row3 = #view_row{id = 5},
544+
Row4 = #view_row{id = 7},
545+
Rows = [Row1, Row2, Row3, Row4],
546+
Expected1 = [Row1, Row2, RowX1, Row3, Row4],
547+
?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)),
548+
RowX2 = #view_row{id = 0},
549+
Expected2 = [RowX2, Row1, Row2, Row3, Row4],
550+
?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)),
551+
RowX3 = #view_row{id = 8},
552+
Expected3 = [Row1, Row2, Row3, Row4, RowX3],
553+
?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)),
554+
%% NOTE(review): RowX4 and Row3 are built from the same expression, so they
%% are equal terms and this assertion cannot tell which of the two is first.
RowX4 = #view_row{id = 5},
555+
Expected4 = [Row1, Row2, RowX4, Row3, Row4],
556+
?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)).
557+
558+
%% Checks merge_row/3 in the `rev` direction with #view_row{} records.
%% The base list holds ids 8, 6, 4, 2 in descending order; a row is merged
%% in the middle (id 5), at the tail (id 1), at the head (id 9), and with an
%% id that already exists (id 6).
merge_row_record_rev_test() ->
559+
RowX1 = #view_row{id = 5},
560+
Row1 = #view_row{id = 2},
561+
Row2 = #view_row{id = 4},
562+
Row3 = #view_row{id = 6},
563+
Row4 = #view_row{id = 8},
564+
Rows = [Row4, Row3, Row2, Row1],
565+
Expected1 = [Row4, Row3, RowX1, Row2, Row1],
566+
?assertEqual(Expected1, merge_row(rev, RowX1, Rows)),
567+
RowX2 = #view_row{id = 1},
568+
Expected2 = [Row4, Row3, Row2, Row1, RowX2],
569+
?assertEqual(Expected2, merge_row(rev, RowX2, Rows)),
570+
RowX3 = #view_row{id = 9},
571+
Expected3 = [RowX3, Row4, Row3, Row2, Row1],
572+
?assertEqual(Expected3, merge_row(rev, RowX3, Rows)),
573+
%% NOTE(review): RowX4 and Row3 are built from the same expression, so they
%% are equal terms and this assertion cannot tell which of the two is first.
RowX4 = #view_row{id = 6},
574+
Expected4 = [Row4, Row3, RowX4, Row2, Row1],
575+
?assertEqual(Expected4, merge_row(rev, RowX4, Rows)).
576+
577+
%% Checks merge_row/3 in the `fwd` direction with map rows.
%% The base list holds ids 1, 3, 5, 7 in ascending order; a row is merged
%% in the middle (id 4), at the head (id 0), at the tail (id 8), and with an
%% id that already exists (id 5).
merge_row_map_fwd_test() ->
578+
RowX1 = #{id => 4},
579+
Row1 = #{id => 1},
580+
Row2 = #{id => 3},
581+
Row3 = #{id => 5},
582+
Row4 = #{id => 7},
583+
Rows = [Row1, Row2, Row3, Row4],
584+
Expected1 = [Row1, Row2, RowX1, Row3, Row4],
585+
?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)),
586+
RowX2 = #{id => 0},
587+
Expected2 = [RowX2, Row1, Row2, Row3, Row4],
588+
?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)),
589+
RowX3 = #{id => 8},
590+
Expected3 = [Row1, Row2, Row3, Row4, RowX3],
591+
?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)),
592+
%% NOTE(review): RowX4 and Row3 are equal maps, so this assertion cannot
%% tell which of the two is first.
RowX4 = #{id => 5},
593+
Expected4 = [Row1, Row2, RowX4, Row3, Row4],
594+
?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)).
595+
596+
%% Checks merge_row/3 in the `rev` direction with map rows.
%% The base list holds ids 8, 6, 4, 2 in descending order; a row is merged
%% in the middle (id 5), at the tail (id 1), at the head (id 9), and with an
%% id that already exists (id 6).
merge_row_map_rev_test() ->
597+
RowX1 = #{id => 5},
598+
Row1 = #{id => 2},
599+
Row2 = #{id => 4},
600+
Row3 = #{id => 6},
601+
Row4 = #{id => 8},
602+
Rows = [Row4, Row3, Row2, Row1],
603+
Expected1 = [Row4, Row3, RowX1, Row2, Row1],
604+
?assertEqual(Expected1, merge_row(rev, RowX1, Rows)),
605+
RowX2 = #{id => 1},
606+
Expected2 = [Row4, Row3, Row2, Row1, RowX2],
607+
?assertEqual(Expected2, merge_row(rev, RowX2, Rows)),
608+
RowX3 = #{id => 9},
609+
Expected3 = [RowX3, Row4, Row3, Row2, Row1],
610+
?assertEqual(Expected3, merge_row(rev, RowX3, Rows)),
611+
%% NOTE(review): RowX4 and Row3 are equal maps, so this assertion cannot
%% tell which of the two is first.
RowX4 = #{id => 6},
612+
Expected4 = [Row4, Row3, RowX4, Row2, Row1],
613+
?assertEqual(Expected4, merge_row(rev, RowX4, Rows)).
614+
-endif.

0 commit comments

Comments
 (0)