diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index ab8e1e9a379..e2bfd5bef54 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -221,6 +221,7 @@ stop() -> mochiweb_http:stop(?MODULE). handle_request(MochiReq0) -> + couch_util:clear_pdict(), %% Make sure we start clean, everytime erlang:put(?REWRITE_COUNT, 0), MochiReq = couch_httpd_vhost:dispatch_host(MochiReq0), handle_request_int(MochiReq). @@ -323,6 +324,9 @@ handle_request_int(MochiReq) -> % Save client socket so that it can be monitored for disconnects chttpd_util:mochiweb_client_req_set(MochiReq), + %% This is probably better in before_request, but having Path is nice + couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path), + {HttpReq2, Response} = case before_request(HttpReq0) of {ok, HttpReq1} -> @@ -347,12 +351,14 @@ handle_request_int(MochiReq) -> #httpd_resp{status = ok, response = Resp} -> {ok, Resp}; #httpd_resp{status = aborted, reason = Reason} -> - couch_log:error("Response abnormally terminated: ~p", [Reason]), + couch_log:error("Response abnormally terminated: ~w", [Reason]), exit({shutdown, Reason}) end. before_request(HttpReq) -> try + %% TODO: re-enable this here once we have Path + %% couch_stats_resource_tracker:create_coordinator_context(HttpReq), chttpd_stats:init(), chttpd_plugin:before_request(HttpReq) catch @@ -372,6 +378,7 @@ after_request(HttpReq, HttpResp0) -> HttpResp2 = update_stats(HttpReq, HttpResp1), chttpd_stats:report(HttpReq, HttpResp2), maybe_log(HttpReq, HttpResp2), + %%couch_stats_resource_tracker:close_context(), HttpResp2. process_request(#httpd{mochi_req = MochiReq} = HttpReq) -> @@ -409,6 +416,7 @@ handle_req_after_auth(HandlerKey, HttpReq) -> HandlerKey, fun chttpd_db:handle_request/1 ), + couch_stats_resource_tracker:set_context_handler_fun(HandlerFun), AuthorizedReq = chttpd_auth:authorize( possibly_hack(HttpReq), fun chttpd_auth_request:authorize_request/1 diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 9b1aff54f24..8c35910a686 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -83,6 +83,7 @@ % Database request handlers handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) -> + couch_stats_resource_tracker:set_context_dbname(DbName), case {Method, RestParts} of {'PUT', []} -> create_db_req(Req, DbName); @@ -2248,7 +2249,7 @@ monitor_attachments(Att) -> monitor_attachments([Att]). demonitor_refs(Refs) when is_list(Refs) -> - [demonitor(Ref) || Ref <- Refs]. + [demonitor(Ref, [flush]) || Ref <- Refs]. 
% Return attachments which are not stubs non_stubbed_attachments(Atts) when is_list(Atts) -> diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl index 932b52e5f6e..e1b26022204 100644 --- a/src/chttpd/src/chttpd_httpd_handlers.erl +++ b/src/chttpd/src/chttpd_httpd_handlers.erl @@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1; url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1; url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1; url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1; +url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1; url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1; url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1; url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1; diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 9df3a881fae..08ce9841840 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -20,6 +20,7 @@ handle_replicate_req/1, handle_reload_query_servers_req/1, handle_task_status_req/1, + handle_resource_status_req/1, handle_up_req/1, handle_utils_dir_req/1, handle_utils_dir_req/2, @@ -36,8 +37,7 @@ [ send_json/2, send_json/3, send_method_not_allowed/2, - send_chunk/2, - start_chunked_response/3 + send_chunk/2 ] ). @@ -230,6 +230,110 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) -> handle_task_status_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). +handle_resource_status_req(#httpd{method = 'POST'} = Req) -> + ok = chttpd:verify_is_server_admin(Req), + chttpd:validate_ctype(Req, "application/json"), + {Props} = chttpd:json_body_obj(Req), + Action = proplists:get_value(<<"action">>, Props), + Key = proplists:get_value(<<"key">>, Props), + Val = proplists:get_value(<<"val">>, Props), + + CountBy = fun couch_stats_resource_tracker:count_by/1, + GroupBy = fun couch_stats_resource_tracker:group_by/2, + SortedBy1 = fun couch_stats_resource_tracker:sorted_by/1, + SortedBy2 = fun couch_stats_resource_tracker:sorted_by/2, + ConvertEle = fun(K) -> list_to_existing_atom(binary_to_list(K)) end, + ConvertList = fun(L) -> [ConvertEle(E) || E <- L] end, + ToJson = fun couch_stats_resource_tracker:term_to_flat_json/1, + JsonKeys = fun(PL) -> [[ToJson(K), V] || {K, V} <- PL] end, + + Fun = case {Action, Key, Val} of + {<<"count_by">>, Keys, undefined} when is_list(Keys) -> + Keys1 = [ConvertEle(K) || K <- Keys], + fun() -> CountBy(Keys1) end; + {<<"count_by">>, Key, undefined} -> + Key1 = ConvertEle(Key), + fun() -> CountBy(Key1) end; + {<<"group_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) -> + Keys1 = ConvertList(Keys), + Vals1 = ConvertList(Vals), + fun() -> GroupBy(Keys1, Vals1) end; + {<<"group_by">>, Key, Vals} when is_list(Vals) -> + Key1 = ConvertEle(Key), + Vals1 = ConvertList(Vals), + fun() -> GroupBy(Key1, Vals1) end; + {<<"group_by">>, Keys, Val} when is_list(Keys) -> + Keys1 = ConvertList(Keys), + Val1 = ConvertEle(Val), + fun() -> GroupBy(Keys1, Val1) end; + {<<"group_by">>, Key, Val} -> + Key1 = ConvertEle(Key), + Val1 = ConvertList(Val), + fun() -> GroupBy(Key1, Val1) end; + + {<<"sorted_by">>, Key, undefined} -> + Key1 = ConvertEle(Key), + fun() -> JsonKeys(SortedBy1(Key1)) end; + {<<"sorted_by">>, Keys, undefined} when is_list(Keys) -> + Keys1 = [ConvertEle(K) || K <- Keys], + fun() -> JsonKeys(SortedBy1(Keys1)) end; 
+ {<<"sorted_by">>, Keys, Vals} when is_list(Keys) andalso is_list(Vals) -> + Keys1 = ConvertList(Keys), + Vals1 = ConvertList(Vals), + fun() -> JsonKeys(SortedBy2(Keys1, Vals1)) end; + {<<"sorted_by">>, Key, Vals} when is_list(Vals) -> + Key1 = ConvertEle(Key), + Vals1 = ConvertList(Vals), + fun() -> JsonKeys(SortedBy2(Key1, Vals1)) end; + {<<"sorted_by">>, Keys, Val} when is_list(Keys) -> + Keys1 = ConvertList(Keys), + Val1 = ConvertEle(Val), + fun() -> JsonKeys(SortedBy2(Keys1, Val1)) end; + {<<"sorted_by">>, Key, Val} -> + Key1 = ConvertEle(Key), + Val1 = ConvertList(Val), + fun() -> JsonKeys(SortedBy2(Key1, Val1)) end; + _ -> + throw({badrequest, invalid_resource_request}) + end, + + Fun1 = fun() -> + case Fun() of + Map when is_map(Map) -> + {maps:fold( + fun + (_K,0,A) -> A; %% TODO: Skip 0 value entries? + (K,V,A) -> [{ToJson(K), V} | A] + end, + [], Map)}; + List when is_list(List) -> + List + end + end, + + {Resp, _Bad} = rpc:multicall(erlang, apply, [ + fun() -> + {node(), Fun1()} + end, + [] + ]), + %%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]), + send_json(Req, {Resp}); +handle_resource_status_req(#httpd{method = 'GET'} = Req) -> + ok = chttpd:verify_is_server_admin(Req), + {Resp, Bad} = rpc:multicall(erlang, apply, [ + fun() -> + {node(), couch_stats_resource_tracker:active()} + end, + [] + ]), + %% TODO: incorporate Bad responses + io:format("ACTIVE RESP: ~p~nBAD RESP: ~p~n", [Resp, Bad]), + send_json(Req, {Resp}); +handle_resource_status_req(Req) -> + ok = chttpd:verify_is_server_admin(Req), + send_method_not_allowed(Req, "GET,HEAD,POST"). + handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = PostBody} = Req) -> chttpd:validate_ctype(Req, "application/json"), %% see HACK in chttpd.erl about replication diff --git a/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl b/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl index 01ef16f23e8..91817473d1c 100644 --- a/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl +++ b/src/chttpd/test/eunit/chttpd_db_doc_size_tests.erl @@ -25,7 +25,7 @@ setup() -> Hashed = couch_passwords:hash_admin_password(?PASS), ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist = false), - ok = config:set("couchdb", "max_document_size", "50"), + ok = config:set("couchdb", "max_document_size", "50", false), TmpDb = ?tempdb(), Addr = config:get("chttpd", "bind_address", "127.0.0.1"), Port = mochiweb_socket_server:get(chttpd, port), diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index 9c1df21b690..d8dbf38ba8d 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -127,6 +127,7 @@ meta = [] }). +-define(LOG_UNEXPECTED_MSG(Msg), couch_log:warning("[~p:~p:~p/~p]{~p[~p]} Unexpected message: ~w", [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, self(), element(2, process_info(self(), message_queue_len)), Msg])). -record(user_ctx, { name=null, diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg index 2d40518e278..43794bff62b 100644 --- a/src/couch/priv/stats_descriptions.cfg +++ b/src/couch/priv/stats_descriptions.cfg @@ -298,6 +298,10 @@ {type, counter}, {desc, <<"number of couch_server LRU operations skipped">>} ]}. +{[couchdb, couch_server, open], [ + {type, counter}, + {desc, <<"number of couch_server open operations invoked">>} +]}. 
{[couchdb, query_server, vdu_rejects], [ {type, counter}, {desc, <<"number of rejections by validate_doc_update function">>} @@ -410,10 +414,39 @@ {type, counter}, {desc, <<"number of other requests">>} ]}. +{[couchdb, query_server, js_filter], [ + {type, counter}, + {desc, <<"number of JS filter invocations">>} +]}. +{[couchdb, query_server, js_filtered_docs], [ + {type, counter}, + {desc, <<"number of docs filtered through JS invocations">>} +]}. +{[couchdb, query_server, js_filter_error], [ + {type, counter}, + {desc, <<"number of JS filter invocation errors">>} +]}. {[couchdb, legacy_checksums], [ {type, counter}, {desc, <<"number of legacy checksums found in couch_file instances">>} ]}. +{[couchdb, btree, folds], [ + {type, counter}, + {desc, <<"number of couch btree kv fold callback invocations">>} +]}. +{[couchdb, btree, kp_node], [ + {type, counter}, + {desc, <<"number of couch btree kp_nodes read">>} +]}. +{[couchdb, btree, kv_node], [ + {type, counter}, + {desc, <<"number of couch btree kv_nodes read">>} +]}. +%% CSRT (couch_stats_resource_tracker) stats +{[couchdb, csrt, delta_missing_t0], [ + {type, counter}, + {desc, <<"number of csrt contexts without a proper startime">>} +]}. {[pread, exceed_eof], [ {type, counter}, {desc, <<"number of the attempts to read beyond end of db file">>} diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index b974a22eeca..27b5bc18b2e 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) -> get_node(#btree{fd = Fd}, NodePos) -> {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, btree, NodeType]), {NodeType, NodeList}. write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) -> @@ -1163,6 +1165,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) - false -> {stop, {PrevKVs, Reds}, Acc}; true -> + couch_stats:increment_counter([couchdb, btree, folds]), AssembledKV = assemble(Bt, K, V), case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of {ok, Acc2} -> diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 2ef89ced3a6..c7afaa4b39f 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -297,6 +297,7 @@ open_doc(Db, IdOrDocInfo) -> open_doc(Db, IdOrDocInfo, []). open_doc(Db, Id, Options) -> + %% TODO: wire in csrt tracking increment_stat(Db, [couchdb, database_reads]), case open_doc_int(Db, Id, Options) of {ok, #doc{deleted = true} = Doc} -> @@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when -> case lists:member(sys_db, Options) of true -> + %% TODO: we shouldn't leak resource usage just because it's a sys_db ok; false -> couch_stats:increment_counter(Stat, Count) diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl index 151bdc805ee..3d759594f79 100644 --- a/src/couch/src/couch_query_servers.erl +++ b/src/couch/src/couch_query_servers.erl @@ -542,6 +542,8 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)} catch throw:{os_process_error, {exit_status, 1}} -> + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, query_server, js_filter_error]), %% batch used too much memory, retry sequentially. 
Fun = fun(JsonDoc) -> filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc]) @@ -550,6 +552,12 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> end. filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) -> + %% Count usage in _int version as this can be repeated for OS error + %% Pros & cons... might not have actually processed `length(JsonDocs)` docs + %% but it certainly undercounts if we count in `filter_docs/5` above + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, query_server, js_filter]), + couch_stats:increment_counter([couchdb, query_server, js_filtered_docs], length(JsonDocs)), [true, Passes] = ddoc_prompt( Db, DDoc, diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 7dbbe4af11c..c6c244ad411 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -97,6 +97,8 @@ sup_start_link(N) -> gen_server:start_link({local, couch_server(N)}, couch_server, [N], []). open(DbName, Options) -> + %% TODO: wire in csrt tracking + couch_stats:increment_counter([couchdb, couch_server, open]), try validate_open_or_create(DbName, Options), open_int(DbName, Options) diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl index 739df28e59d..0bc9a94bae7 100644 --- a/src/couch/src/couch_util.erl +++ b/src/couch/src/couch_util.erl @@ -46,6 +46,7 @@ -export([verify_hash_names/2]). -export([get_config_hash_algorithms/0]). -export([remove_sensitive_data/1]). +-export([clear_pdict/0, clear_pdict/1]). -include_lib("couch/include/couch_db.hrl"). @@ -870,3 +871,31 @@ remove_sensitive_data(KVList) -> KVList1 = lists:keyreplace(<<"password">>, 1, KVList, {<<"password">>, <<"****">>}), % some KVList entries are atoms, so test fo this too lists:keyreplace(password, 1, KVList1, {password, <<"****">>}). + +-spec clear_pdict() -> ok. +clear_pdict() -> + clear_pdict(erlang:get()). + +%% Exclude mochiweb markers, otherwise just use erlang:erase/0 +-spec clear_pdict(list()) -> ok. +clear_pdict([]) -> + ok; +clear_pdict([{mochiweb_request_body, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_body_length, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_cookie, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_force_close, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_path, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_post, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_qs, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{mochiweb_request_recv, _V} | Rest]) -> + clear_pdict(Rest); +clear_pdict([{Key, _V} | Rest]) -> + erlang:erase(Key), + clear_pdict(Rest). diff --git a/src/couch_stats/src/couch_stats.app.src b/src/couch_stats/src/couch_stats.app.src index a54fac7349f..de9f00e4e70 100644 --- a/src/couch_stats/src/couch_stats.app.src +++ b/src/couch_stats/src/couch_stats.app.src @@ -13,8 +13,12 @@ {application, couch_stats, [ {description, "Simple statistics collection"}, {vsn, git}, - {registered, [couch_stats_aggregator, couch_stats_process_tracker]}, - {applications, [kernel, stdlib]}, + {registered, [ + couch_stats_aggregator, + couch_stats_process_tracker, + couch_stats_resource_tracker + ]}, + {applications, [kernel, stdlib, couch_log]}, {mod, {couch_stats_app, []}}, {env, []} ]}. 
diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl index 29a4024491f..29190e6b003 100644 --- a/src/couch_stats/src/couch_stats.erl +++ b/src/couch_stats/src/couch_stats.erl @@ -24,6 +24,12 @@ update_gauge/2 ]). +%% couch_stats_resource_tracker API +-export([ + create_context/3, + maybe_track_rexi_init_p/1 +]). + -type response() :: ok | {error, unknown_metric} | {error, invalid_metric}. -type stat() :: {any(), [{atom(), any()}]}. @@ -49,6 +55,11 @@ increment_counter(Name) -> -spec increment_counter(any(), pos_integer()) -> response(). increment_counter(Name, Value) -> + %% Should maybe_track_local happen before or after notify? + %% If after, only currently tracked metrics declared in the app's + %% stats_description.cfg will be trackable locally. Pros/cons. + %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]), + ok = maybe_track_local_counter(Name, Value), case couch_stats_util:get_counter(Name, stats()) of {ok, Ctx} -> couch_stats_counter:increment(Ctx, Value); {error, Error} -> {error, Error} @@ -100,6 +111,25 @@ stats() -> now_sec() -> erlang:monotonic_time(second). +%% Only potentially track positive increments to counters +-spec maybe_track_local_counter(any(), any()) -> ok. +maybe_track_local_counter(Name, Val) when is_integer(Val) andalso Val > 0 -> + %%io:format("maybe_track_local[~p]: ~p~n", [Val, Name]), + couch_stats_resource_tracker:maybe_inc(Name, Val), + ok; +maybe_track_local_counter(_, _) -> + ok. + +create_context(From, MFA, Nonce) -> + couch_stats_resource_tracker:create_context(From, MFA, Nonce). + +maybe_track_rexi_init_p({M, F, _A}) -> + Metric = [M, F, spawned], + case couch_stats_resource_tracker:should_track(Metric) of + true -> increment_counter(Metric); + false -> ok + end. + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl new file mode 100644 index 00000000000..0d8e4164edb --- /dev/null +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -0,0 +1,1027 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_stats_resource_tracker). + +-behaviour(gen_server). + +-export([ + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). + +-export([ + inc/1, inc/2, + maybe_inc/2, + get_pid_ref/0, + accumulate_delta/1 +]). + +-export([ + create_context/0, create_context/1, create_context/3, + create_coordinator_context/1, create_coordinator_context/2, + is_enabled/0, + get_resource/0, + get_resource/1, + set_context_dbname/1, + set_context_handler_fun/1, + set_context_username/1, + track/1, + should_track/1 +]). + +-export([ + active/0, + active_coordinators/0, + active_workers/0, + find_unmonitored/0 +]). + +-export([ + count_by/1, + group_by/2, + group_by/3, + group_by/4, + sorted/1, + sorted_by/1, + sorted_by/2, + sorted_by/3, + + find_by_pid/1, + + unsafe_foldl/3, + + term_to_flat_json/1 +]). 
+
+-export([
+    make_delta/0
+]).
+
+%% Singular increment operations
+-export([
+    db_opened/0,
+    doc_read/0,
+    row_read/0,
+    btree_fold/0,
+    ioq_called/0,
+    js_evaled/0,
+    js_filtered/0,
+    js_filtered_error/0,
+    js_filtered_doc/0,
+    mango_match_evaled/0,
+    get_kv_node/0,
+    get_kp_node/0
+]).
+
+%% Plural increment operations
+-export([
+    js_filtered_docs/1,
+    io_bytes_read/1,
+    io_bytes_written/1
+]).
+
+-export([
+    field/2,
+    curry_field/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Use these for record upgrades over the wire and in ETS tables
+%% TODO: alternatively, just delete these. Currently using a map
+%% for shipping deltas over the wire, avoiding much of the
+%% problem here. We'll likely still need to handle upgrades to
+%% map format over time, so let's decide a course of action here.
+-define(RCTX_V1, rctx_v1).
+-define(RCTX, ?RCTX_V1).
+
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+-define(DB_OPEN_DOC, docs_read).
+-define(DB_OPEN, db_open).
+-define(COUCH_SERVER_OPEN, db_open).
+-define(COUCH_BT_FOLDS, btree_folds).
+-define(COUCH_BT_GET_KP_NODE, get_kp_node).
+-define(COUCH_BT_GET_KV_NODE, get_kv_node).
+-define(COUCH_JS_FILTER, js_filter).
+-define(COUCH_JS_FILTER_ERROR, js_filter_error).
+-define(COUCH_JS_FILTERED_DOCS, js_filtered_docs).
+-define(ROWS_READ, rows_read).
+
+%% TODO: overlap between this and couch btree fold invocations
+%% TODO: need some way to distinguish folds on views vs find vs all_docs
+-define(FRPC_CHANGES_ROW, changes_processed).
+-define(FRPC_CHANGES_RETURNED, changes_returned).
+%%-define(FRPC_CHANGES_ROW, ?ROWS_READ).
+
+%% Module pdict markers
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+
+
+-record(st, {
+    eviction_delay = 10 * 1000, %% How many ms dead processes are visible
+    scan_interval = 2048, %% How regularly to perform scans
+    tracking = #{} %% track active processes for eventual eviction
+}).
+
+
+%% TODO: switch to:
+%% -record(?RCTX, {
+-record(rctx, {
+    %% Metadata
+    started_at = tnow(),
+    updated_at = tnow(),
+    exited_at, %% TODO: do we need a final exit time and additional update times afterwards?
+    pid_ref,
+    mon_ref,
+    mfa,
+    nonce,
+    from,
+    type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
+    state = alive,
+    dbname,
+    username,
+
+    %% Stats counters
+    db_open = 0,
+    docs_read = 0,
+    rows_read = 0,
+    btree_folds = 0,
+    changes_processed = 0,
+    changes_returned = 0,
+    ioq_calls = 0,
+    io_bytes_read = 0,
+    io_bytes_written = 0,
+    js_evals = 0,
+    js_filter = 0,
+    js_filter_error = 0,
+    js_filtered_docs = 0,
+    mango_eval_match = 0,
+    %% TODO: switch record definitions to be macro based, eg:
+    %% ?COUCH_BT_GET_KP_NODE = 0,
+    get_kv_node = 0,
+    get_kp_node = 0
+}).
+
+%% monotonic time now in milliseconds
+tnow() ->
+    erlang:monotonic_time(millisecond).
+
+is_enabled() ->
+    config:get_boolean(?MODULE_STRING, "enabled", true).
+
+db_opened() -> inc(db_opened).
+doc_read() -> inc(docs_read).
+row_read() -> inc(rows_read).
+btree_fold() -> inc(?COUCH_BT_FOLDS).
+%% TODO: do we need ioq_called and this access pattern?
+ioq_called() -> is_enabled() andalso inc(ioq_calls).
+js_evaled() -> inc(js_evals).
+js_filtered() -> inc(js_filter).
+js_filtered_error() -> inc(js_filter_error).
+js_filtered_doc() -> inc(js_filtered_docs).
+mango_match_evaled() -> inc(mango_eval_match).
+get_kv_node() -> inc(get_kv_node).
+get_kp_node() -> inc(get_kp_node).
+ +js_filtered_docs(N) -> inc(js_filtered_docs, N). +io_bytes_read(N) -> inc(io_bytes_read, N). +io_bytes_written(N) -> inc(io_bytes_written, N). + +inc(?DB_OPEN) -> + inc(?DB_OPEN, 1); +inc(docs_read) -> + inc(docs_read, 1); +inc(?ROWS_READ) -> + inc(?ROWS_READ, 1); +inc(?FRPC_CHANGES_RETURNED) -> + inc(?FRPC_CHANGES_RETURNED, 1); +inc(?COUCH_BT_FOLDS) -> + inc(?COUCH_BT_FOLDS, 1); +inc(ioq_calls) -> + inc(ioq_calls, 1); +inc(io_bytes_read) -> + inc(io_bytes_read, 1); +inc(io_bytes_written) -> + inc(io_bytes_written, 1); +inc(js_evals) -> + inc(js_evals, 1); +inc(?COUCH_JS_FILTER) -> + inc(?COUCH_JS_FILTER, 1); +inc(?COUCH_JS_FILTER_ERROR) -> + inc(?COUCH_JS_FILTER_ERROR, 1); +inc(?COUCH_JS_FILTERED_DOCS) -> + inc(?COUCH_JS_FILTERED_DOCS, 1); +inc(?MANGO_EVAL_MATCH) -> + inc(?MANGO_EVAL_MATCH, 1); +inc(?COUCH_BT_GET_KP_NODE) -> + inc(?COUCH_BT_GET_KP_NODE, 1); +inc(?COUCH_BT_GET_KV_NODE) -> + inc(?COUCH_BT_GET_KV_NODE, 1); +inc(_) -> + 0. + + +inc(?DB_OPEN, N) -> + update_counter(#rctx.?DB_OPEN, N); +inc(?ROWS_READ, N) -> + update_counter(#rctx.?ROWS_READ, N); +inc(?FRPC_CHANGES_RETURNED, N) -> + update_counter(#rctx.?FRPC_CHANGES_RETURNED, N); +inc(ioq_calls, N) -> + update_counter(#rctx.ioq_calls, N); +inc(io_bytes_read, N) -> + update_counter(#rctx.io_bytes_read, N); +inc(io_bytes_written, N) -> + update_counter(#rctx.io_bytes_written, N); +inc(js_evals, N) -> + update_counter(#rctx.js_evals, N); +inc(?COUCH_JS_FILTER, N) -> + update_counter(#rctx.?COUCH_JS_FILTER, N); +inc(?COUCH_JS_FILTER_ERROR, N) -> + update_counter(#rctx.?COUCH_JS_FILTER_ERROR, N); +inc(?COUCH_JS_FILTERED_DOCS, N) -> + update_counter(#rctx.?COUCH_JS_FILTERED_DOCS, N); +inc(?MANGO_EVAL_MATCH, N) -> + update_counter(#rctx.?MANGO_EVAL_MATCH, N); +inc(?DB_OPEN_DOC, N) -> + update_counter(#rctx.?DB_OPEN_DOC, N); +inc(?FRPC_CHANGES_ROW, N) -> + update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read +inc(?COUCH_BT_GET_KP_NODE, N) -> + update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N); +inc(?COUCH_BT_GET_KV_NODE, N) -> + update_counter(#rctx.?COUCH_BT_GET_KV_NODE, N); +inc(_, _) -> + %% inc needs to allow unknown types to pass for accumulate_update to handle + %% updates from nodes with newer data formats + 0. + +maybe_inc([mango, evaluate_selector], Val) -> + inc(?MANGO_EVAL_MATCH, Val); +maybe_inc([couchdb, database_reads], Val) -> + inc(?DB_OPEN_DOC, Val); +maybe_inc([fabric_rpc, changes, processed], Val) -> + inc(?FRPC_CHANGES_ROW, Val); +maybe_inc([fabric_rpc, changes, returned], Val) -> + inc(?FRPC_CHANGES_RETURNED, Val); +maybe_inc([fabric_rpc, view, rows_read], Val) -> + inc(?ROWS_READ, Val); +maybe_inc([couchdb, couch_server, open], Val) -> + inc(?DB_OPEN, Val); +maybe_inc([couchdb, btree, folds], Val) -> + inc(?COUCH_BT_FOLDS, Val); +maybe_inc([couchdb, btree, kp_node], Val) -> + inc(?COUCH_BT_GET_KP_NODE, Val); +maybe_inc([couchdb, btree, kv_node], Val) -> + inc(?COUCH_BT_GET_KV_NODE, Val); +maybe_inc([couchdb, query_server, js_filter_error], Val) -> + inc(?COUCH_JS_FILTER_ERROR, Val); +maybe_inc([couchdb, query_server, js_filter], Val) -> + inc(?COUCH_JS_FILTER, Val); +maybe_inc([couchdb, query_server, js_filtered_docs], Val) -> + inc(?COUCH_JS_FILTERED_DOCS, Val); +maybe_inc(_Metric, _Val) -> + %%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]), + 0. 
+ + +%% TODO: update stats_descriptions.cfg for relevant apps +should_track([fabric_rpc, all_docs, spawned]) -> + is_enabled(); +should_track([fabric_rpc, changes, spawned]) -> + is_enabled(); +should_track([fabric_rpc, changes, processed]) -> + is_enabled(); +should_track([fabric_rpc, changes, returned]) -> + is_enabled(); +should_track([fabric_rpc, map_view, spawned]) -> + is_enabled(); +should_track([fabric_rpc, reduce_view, spawned]) -> + is_enabled(); +should_track([fabric_rpc, get_all_security, spawned]) -> + is_enabled(); +should_track([fabric_rpc, open_doc, spawned]) -> + is_enabled(); +should_track([fabric_rpc, update_docs, spawned]) -> + is_enabled(); +should_track([fabric_rpc, open_shard, spawned]) -> + is_enabled(); +should_track([mango_cursor, view, all_docs]) -> + is_enabled(); +should_track([mango_cursor, view, idx]) -> + is_enabled(); +should_track(_Metric) -> + %%io:format("SKIPPING METRIC: ~p~n", [Metric]), + false. + +accumulate_delta(Delta) when is_map(Delta) -> + %% TODO: switch to creating a batch of updates to invoke a single + %% update_counter rather than sequentially invoking it for each field + is_enabled() andalso maps:foreach(fun inc/2, Delta); +accumulate_delta(undefined) -> + ok; +accumulate_delta(Other) -> + io:format("CSRT:ACC_DELTA UNKNOWN DELTA: ~p~n", [Other]). + + +update_counter(Field, Count) -> + is_enabled() andalso update_counter(get_pid_ref(), Field, Count). + + +update_counter({_Pid,_Ref}=PidRef, Field, Count) -> + %% TODO: mem3 crashes without catch, why do we lose the stats table? + is_enabled() andalso catch ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}). + + +active() -> active_int(all). +active_coordinators() -> active_int(coordinators). +active_workers() -> active_int(workers). + + +active_int(coordinators) -> + select_by_type(coordinators); +active_int(workers) -> + select_by_type(workers); +active_int(all) -> + lists:map(fun to_flat_json/1, ets:tab2list(?MODULE)). + + +select_by_type(coordinators) -> + ets:select(couch_stats_resource_tracker, + [{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]); +select_by_type(workers) -> + ets:select(couch_stats_resource_tracker, + [{#rctx{type = {worker,'_','_'}, _ = '_'}, [], ['$_']}]); +select_by_type(all) -> + lists:map(fun to_flat_json/1, ets:tab2list(?MODULE)). + + +field(#rctx{pid_ref=Val}, pid_ref) -> Val; +%% NOTE: Pros and cons to doing these convert functions here +%% Ideally, this would be done later so as to prefer the core data structures +%% as long as possible, but we currently need the output of this function to +%% be jiffy:encode'able. The tricky bit is dynamically encoding the group_by +%% structure provided by the caller of *_by aggregator functions below. +%% For now, we just always return jiffy:encode'able data types. 
+field(#rctx{mfa=Val}, mfa) -> convert_mfa(Val); +field(#rctx{nonce=Val}, nonce) -> Val; +field(#rctx{from=Val}, from) -> Val; +field(#rctx{type=Val}, type) -> convert_type(Val); +field(#rctx{state=Val}, state) -> Val; +field(#rctx{dbname=Val}, dbname) -> Val; +field(#rctx{username=Val}, username) -> Val; +field(#rctx{db_open=Val}, db_open) -> Val; +field(#rctx{docs_read=Val}, docs_read) -> Val; +field(#rctx{rows_read=Val}, rows_read) -> Val; +field(#rctx{btree_folds=Val}, btree_folds) -> Val; +field(#rctx{changes_processed=Val}, changes_processed) -> Val; +field(#rctx{changes_returned=Val}, changes_returned) -> Val; +field(#rctx{ioq_calls=Val}, ioq_calls) -> Val; +field(#rctx{io_bytes_read=Val}, io_bytes_read) -> Val; +field(#rctx{io_bytes_written=Val}, io_bytes_written) -> Val; +field(#rctx{js_evals=Val}, js_evals) -> Val; +field(#rctx{js_filter=Val}, js_filter) -> Val; +field(#rctx{js_filter_error=Val}, js_filter_error) -> Val; +field(#rctx{js_filtered_docs=Val}, js_filtered_docs) -> Val; +field(#rctx{mango_eval_match=Val}, mango_eval_match) -> Val; +field(#rctx{get_kv_node=Val}, get_kv_node) -> Val; +field(#rctx{get_kp_node=Val}, get_kp_node) -> Val. + + +curry_field(Field) -> + fun(Ele) -> field(Ele, Field) end. + + +count_by(KeyFun) -> + group_by(KeyFun, fun(_) -> 1 end). + + +group_by(KeyFun, ValFun) -> + group_by(KeyFun, ValFun, fun erlang:'+'/2). + + +group_by(KeyFun, ValFun, AggFun) -> + group_by(KeyFun, ValFun, AggFun, fun ets:foldl/3). + + +%% eg: group_by(mfa, docs_read). +%% eg: group_by(fun(#rctx{mfa=MFA,docs_read=DR}) -> {MFA, DR} end, ioq_calls). +%% eg: ^^ or: group_by([mfa, docs_read], ioq_calls). +%% eg: group_by([username, dbname, mfa], docs_read). +%% eg: group_by([username, dbname, mfa], ioq_calls). +%% eg: group_by([username, dbname, mfa], js_filters). +group_by(KeyL, ValFun, AggFun, Fold) when is_list(KeyL) -> + KeyFun = fun(Ele) -> list_to_tuple([field(Ele, Key) || Key <- KeyL]) end, + group_by(KeyFun, ValFun, AggFun, Fold); +group_by(Key, ValFun, AggFun, Fold) when is_atom(Key) -> + group_by(curry_field(Key), ValFun, AggFun, Fold); +group_by(KeyFun, Val, AggFun, Fold) when is_atom(Val) -> + group_by(KeyFun, curry_field(Val), AggFun, Fold); +group_by(KeyFun, ValFun, AggFun, Fold) -> + FoldFun = fun(Ele, Acc) -> + Key = KeyFun(Ele), + Val = ValFun(Ele), + CurrVal = maps:get(Key, Acc, 0), + NewVal = AggFun(CurrVal, Val), + %% TODO: should we skip here? how to make this optional? + case NewVal > 0 of + true -> + maps:put(Key, NewVal, Acc); + false -> + Acc + end + end, + Fold(FoldFun, #{}, ?MODULE). + + +%% Sorts largest first +sorted(Map) when is_map(Map) -> + lists:sort(fun({_K1, A}, {_K2, B}) -> B < A end, maps:to_list(Map)). + +shortened(L) -> + lists:sublist(L, 10). + + +%% eg: sorted_by([username, dbname, mfa], ioq_calls) +%% eg: sorted_by([dbname, mfa], doc_reads) +sorted_by(KeyFun) -> shortened(sorted(count_by(KeyFun))). +sorted_by(KeyFun, ValFun) -> shortened(sorted(group_by(KeyFun, ValFun))). +sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, AggFun))). 
+
+
+term_to_flat_json({shutdown, Reason0}) when is_atom(Reason0) ->
+    Reason = atom_to_binary(Reason0),
+    <<"shutdown: ", Reason/binary>>;
+term_to_flat_json({type, Atom}) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+term_to_flat_json({type, {coordinator, Verb0, Path0}}=_Type) ->
+    Verb = atom_to_binary(Verb0),
+    Path = list_to_binary(Path0),
+    <<"coordinator:", Verb/binary, ":", Path/binary>>;
+term_to_flat_json({type, {worker, M0, F0}}=_Type) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    <<"worker:", M/binary, ":", F/binary>>;
+term_to_flat_json(Tuple) when is_tuple(Tuple) ->
+    erlang:tuple_to_list(Tuple);
+term_to_flat_json(Pid) when is_pid(Pid) ->
+    ?l2b(pid_to_list(Pid));
+term_to_flat_json(Ref) when is_reference(Ref) ->
+    ?l2b(ref_to_list(Ref));
+term_to_flat_json(Atom) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+term_to_flat_json(undefined) ->
+    null;
+term_to_flat_json(null) ->
+    null;
+term_to_flat_json(T) ->
+    T.
+
+to_flat_json(#rctx{}=Rctx) ->
+    #rctx{
+        updated_at = TP,
+        started_at = TInit,
+        pid_ref = {Pid0, Ref0},
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        dbname = DbName,
+        username = UserName,
+        db_open = DbOpens,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        js_filter = JSFilters,
+        js_filter_error = JSFilterErrors,
+        js_filtered_docs = JSFilteredDocss,
+        state = State0,
+        type = Type,
+        get_kp_node = KpNodes,
+        get_kv_node = KvNodes,
+        btree_folds = ChangesProcessed,
+        changes_returned = ChangesReturned,
+        ioq_calls = IoqCalls
+    } = Rctx,
+    Pid = term_to_flat_json(Pid0),
+    Ref = term_to_flat_json(Ref0),
+    PidRef = <<Pid/binary, ":", Ref/binary>>,
+    MFA = case MFA0 of
+        {M0, F0, A0} ->
+            M = atom_to_binary(M0),
+            F = atom_to_binary(F0),
+            A = integer_to_binary(A0),
+            <<M/binary, ":", F/binary, "/", A/binary>>;
+        MFA0 when is_list(MFA0) ->
+            MFA0;
+        undefined ->
+            null;
+        OtherMFA ->
+            throw({error, {unexpected, OtherMFA}})
+    end,
+    From = case From0 of
+        {Parent0, ParentRef0} ->
+            Parent = term_to_flat_json(Parent0),
+            ParentRef = term_to_flat_json(ParentRef0),
+            <<Parent/binary, ":", ParentRef/binary>>;
+        undefined ->
+            null
+    end,
+    State = case State0 of
+        alive ->
+            alive;
+        {down, Reason0} ->
+            Reason = term_to_flat_json(Reason0),
+            <<"down: ", Reason/binary>>
+    end,
+    Nonce = case Nonce0 of
+        undefined ->
+            null;
+        Nonce0 ->
+            list_to_binary(Nonce0)
+    end,
+    #{
+        updated_at => TP,
+        started_at => TInit,
+        pid_ref => PidRef,
+        mfa => MFA,
+        nonce => Nonce,
+        from => From,
+        dbname => DbName,
+        username => UserName,
+        db_open => DbOpens,
+        docs_read => DocsRead,
+        js_filter => JSFilters,
+        js_filter_error => JSFilterErrors,
+        js_filtered_docs => JSFilteredDocss,
+        rows_read => RowsRead,
+        state => State,
+        type => term_to_flat_json({type, Type}),
+        kp_nodes => KpNodes,
+        kv_nodes => KvNodes,
+        btree_folds => ChangesProcessed,
+        changes_returned => ChangesReturned,
+        ioq_calls => IoqCalls
+    }.
+
+
+convert_mfa({M0, F0, A0}) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    A = integer_to_binary(A0),
+    <<M/binary, ":", F/binary, "/", A/binary>>;
+convert_mfa(undefined) ->
+    null.
+
+convert_type(Atom) when is_atom(Atom) ->
+    atom_to_binary(Atom);
+convert_type({coordinator, Verb0, Path0}) ->
+    Verb = atom_to_binary(Verb0),
+    Path = list_to_binary(Path0),
+    <<"coordinator:", Verb/binary, ":", Path/binary>>;
+convert_type({worker, M0, F0}) ->
+    M = atom_to_binary(M0),
+    F = atom_to_binary(F0),
+    <<"worker:", M/binary, ":", F/binary>>.
+
+get_pid_ref() ->
+    case get(?PID_REF) of
+        undefined ->
+            Ref = make_ref(),
+            set_pid_ref({self(), Ref});
+        PidRef ->
+            PidRef
+    end.
+
+
+create_context() ->
+    is_enabled() andalso create_context(self()).
+
+
+create_context(Pid) ->
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            Ref = make_ref(),
+            Rctx = make_record(Pid, Ref),
+            track(Rctx),
+            create_resource(Rctx),
+            Rctx
+    end.
+
+
+create_resource(#rctx{} = Rctx) ->
+    %% true = ets:insert(?MODULE, Rctx).
+    catch ets:insert(?MODULE, Rctx).
+
+%% add type to distinguish coordinator vs rpc_worker
+create_context(From, {M,F,_A} = MFA, Nonce) ->
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            PidRef = get_pid_ref(), %% this will instantiate a new PidRef
+            %% TODO: extract user_ctx and db/shard from
+            Rctx = #rctx{
+                pid_ref = PidRef,
+                from = From,
+                mfa = MFA,
+                type = {worker, M, F},
+                nonce = Nonce
+            },
+            track(Rctx),
+            erlang:put(?DELTA_TZ, Rctx),
+            create_resource(Rctx),
+            Rctx
+    end.
+
+create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
+    is_enabled() andalso create_coordinator_context(Req, io_lib:format("~p", [Parts])).
+
+create_coordinator_context(#httpd{} = Req, Path) ->
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            #httpd{
+                method = Verb,
+                nonce = Nonce
+            } = Req,
+            PidRef = get_pid_ref(), %% this will instantiate a new PidRef
+            Rctx = #rctx{
+                pid_ref = PidRef,
+                type = {coordinator, Verb, [$/ | Path]},
+                nonce = Nonce
+            },
+            track(Rctx),
+            erlang:put(?DELTA_TZ, Rctx),
+            create_resource(Rctx),
+            Rctx
+    end.
+
+set_context_dbname(DbName) ->
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
+                false ->
+                    Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
+                    io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
+                    timer:sleep(1000),
+                    erlang:halt(kaboomz);
+                true ->
+                    true
+            end
+    end.
+
+set_context_handler_fun(Fun) when is_function(Fun) ->
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            FunName = erlang:fun_to_list(Fun),
+            catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.mfa, FunName}]) of
+                false ->
+                    Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
+                    io:format("UPDATING HANDLER FUN[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [FunName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
+                    timer:sleep(1000),
+                    erlang:halt(kaboomz);
+                true ->
+                    true
+            end
+    end.
+
+set_context_username(null) ->
+    ok;
+set_context_username(UserName) ->
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
+                false ->
+                    Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
+                    io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
+                    timer:sleep(1000),
+                    erlang:halt(kaboomz);
+                true ->
+                    true
+            end
+    end.
+
+track(#rctx{}=Rctx) ->
+    %% TODO: should this block or not? If no, what cleans up zombies?
+    %% gen_server:call(?MODULE, {track, PR}).
+    gen_server:cast(?MODULE, {track, Rctx}).
+
+
+make_delta() ->
+    TA = case get(?DELTA_TA) of
+        undefined ->
+            %% Need to handle this better, can't just make a new T0 at T' as
+            %% the timestamps will be identical causing a divide by zero error.
+            %%
+            %% Realistically need to ensure that all invocations of database
+            %% operations set T0 appropriately. Perhaps it's possible to do
+            %% this in the couch_db:open chain, and then similarly, in
+            %% couch_server, and uhhhh... couch_file, and...
+            %%
+            %% I think we need some type of approach for establishing a T0 that
+            %% doesn't result in outrageous deltas. For now zero out the
+            %% microseconds field, or subtract a second on the off chance that
+            %% microseconds is zero. I'm not up to date on the latest Erlang time
+            %% libraries and don't remember how to easily get an
+            %% `os:timestamp()` out of now() - 100ms or some such.
+            %%
+            %% I think it's unavoidable that we'll have some codepaths that do
+            %% not properly instantiate the T0 at spawn resulting in needing to
+            %% do some type of "time warp" or ignoring the timing collection
+            %% entirely. Perhaps if we hoisted out the stats collection into
+            %% the primary flow of the database and funnel that through all the
+            %% function clauses we could then utilize Dialyzer to statically
+            %% analyze and assert all code paths that invoke database
+            %% operations have properly instantiated a T0 at the appropriate
+            %% start time such that we don't have to "fudge" deltas with a
+            %% missing start point, but we're a long ways from that happening
+            %% so I feel it necessary to address the NULL start time.
+
+            %% Track how often we fail to initiate T0 correctly
+            %% Perhaps somewhat naughty we're incrementing stats from within
+            %% couch_stats itself? Might need to handle this differently
+            %% TODO: determine appropriate course of action here
+            %% io:format("~n**********MISSING STARTING DELTA************~n~n", []),
+            couch_stats:increment_counter(
+                [couchdb, csrt, delta_missing_t0]),
+                %%[couch_stats_resource_tracker, delta_missing_t0]),
+
+            case erlang:get(?DELTA_TZ) of
+                undefined ->
+                    TA0 = make_delta_base(),
+                    %% TODO: handling missing deltas, otherwise divide by zero
+                    set_delta_a(TA0),
+                    TA0;
+                TA0 ->
+                    TA0
+            end;
+        #rctx{} = TA0 ->
+            TA0
+    end,
+    TB = get_resource(),
+    Delta = make_delta(TA, TB),
+    set_delta_a(TB),
+    Delta.
+
+
+make_delta(#rctx{}=TA, #rctx{}=TB) ->
+    Delta = #{
+        docs_read => TB#rctx.docs_read - TA#rctx.docs_read,
+        js_filter => TB#rctx.js_filter - TA#rctx.js_filter,
+        js_filter_error => TB#rctx.js_filter_error - TA#rctx.js_filter_error,
+        js_filtered_docs => TB#rctx.js_filtered_docs - TA#rctx.js_filtered_docs,
+        rows_read => TB#rctx.rows_read - TA#rctx.rows_read,
+        changes_returned => TB#rctx.changes_returned - TA#rctx.changes_returned,
+        btree_folds => TB#rctx.btree_folds - TA#rctx.btree_folds,
+        get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node,
+        get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node,
+        db_open => TB#rctx.db_open - TA#rctx.db_open,
+        ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls,
+        dt => TB#rctx.updated_at - TA#rctx.updated_at
+    },
+    %% TODO: reevaluate this decision
+    %% Only return non zero (and also positive) delta fields
+    maps:filter(fun(_K,V) -> V > 0 end, Delta);
+make_delta(_, #rctx{}) ->
+    #{error => missing_beg_rctx};
+make_delta(#rctx{}, _) ->
+    #{error => missing_fin_rctx}.
+
+make_delta_base() ->
+    Ref = make_ref(),
+    %% TODO: extract user_ctx and db/shard from request
+    Now = tnow(),
+    #rctx{
+        pid_ref = {self(), Ref},
+        started_at = Now - 100, %% give us 100ms rewind time for missing T0
+        updated_at = Now
+    }.
+
+set_delta_a(TA) ->
+    erlang:put(?DELTA_TA, TA).
+
+set_pid_ref(PidRef) ->
+    erlang:put(?PID_REF, PidRef),
+    PidRef.
+
+get_resource() ->
+    get_resource(get_pid_ref()).
+
+get_resource(PidRef) ->
+    catch case ets:lookup(?MODULE, PidRef) of
+        [#rctx{}=TP] ->
+            TP;
+        [] ->
+            undefined
+    end.
+
+make_record(Pid, Ref) ->
+    #rctx{pid_ref = {Pid, Ref}}.
+ + +find_unmonitored() -> + %% TODO: only need PidRef here, replace with a select that does that... + [PR || #rctx{pid_ref=PR} <- ets:match_object(?MODULE, #rctx{mon_ref=undefined, _ = '_'})]. + + +find_by_pid(Pid) -> + [R || #rctx{} = R <- ets:match_object(?MODULE, #rctx{pid_ref={Pid, '_'}, _ = '_'})]. + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(?MODULE, [ + named_table, + public, + {decentralized_counters, true}, %% TODO: test impact of this + {write_concurrency, true}, + {read_concurrency, true}, + {keypos, #rctx.pid_ref} + ]), + St = #st{}, + case is_enabled() of + false -> + ok; + true -> + _TimerRef = erlang:send_after(St#st.scan_interval, self(), scan) + end, + {ok, St}. + +handle_call(fetch, _from, #st{} = St) -> + {reply, {ok, St}, St}; +handle_call({track, _}=Msg, _From, St) -> + {noreply, St1} = handle_cast(Msg, St), + {reply, ok, St1}; +handle_call(Msg, _From, St) -> + {stop, {unknown_call, Msg}, error, St}. + +handle_cast({track, #rctx{pid_ref=PidRef}}, #st{tracking=AT0} = St0) -> + AT = maybe_track(PidRef, AT0), + {noreply, St0#st{tracking=AT}}; +handle_cast(Msg, St) -> + {stop, {unknown_cast, Msg}, St}. + +handle_info(scan, #st{tracking=AT0} = St0) -> + Unmonitored = find_unmonitored(), + AT = maybe_track(Unmonitored, AT0), + _TimerRef = erlang:send_after(St0#st.scan_interval, self(), scan), + {noreply, St0#st{tracking=AT}}; +handle_info({'DOWN', MonRef, _Type, DPid, Reason0}, #st{tracking=AT0} = St0) -> + %% io:format("CSRT:HI(~p)~n", [{'DOWN', MonRef, Type, DPid, Reason}]), + St = case maps:get(MonRef, AT0, undefined) of + undefined -> + io:format("ERROR: UNEXPECTED MISSING MONITOR IN TRACKING TABLE: {~p, ~p}~n", [MonRef, DPid]), + St0; + {RPid, _Ref} = PidRef -> + if + RPid =:= DPid -> ok; + true -> erlang:halt(io_lib:format("CSRT:HI PID MISMATCH ABORT: ~p =/= ~p~n", [DPid, RPid])) + end, + %% remove double bookkeeping + AT = maps:remove(MonRef, maps:remove(PidRef, AT0)), + %% TODO: Assert Pid matches Object + %% update process state in live table + %% TODO: decide whether we want the true match to crash this process on failure + %% true = ets:update_element(?MODULE, PidRef, + Reason = case Reason0 of + {shutdown, Shutdown0} -> + Shutdown = atom_to_binary(Shutdown0), + <<"shutdown: ", Shutdown/binary>>; + Reason0 -> + Reason0 + end, + ets:update_element(?MODULE, PidRef, + [{#rctx.state, {down, Reason}}, {#rctx.updated_at, tnow()}]), + log_process_lifetime_report(PidRef), + %% Delay eviction to allow human visibility on short lived pids + erlang:send_after(St0#st.eviction_delay, self(), {evict, PidRef}), + St0#st{tracking=AT} + end, + {noreply, St}; +handle_info({evict, {_Pid, _Ref}=PidRef}, #st{}=St) -> + ets:delete(?MODULE, PidRef), + {noreply, St}; +handle_info(Msg, St) -> + {stop, {unknown_info, Msg}, St}. + +terminate(_Reason, _St) -> + ok. + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. 
+ + +maybe_track([], AT) -> + AT; +maybe_track(PidRef, AT) when is_tuple(PidRef) -> + maybe_track([PidRef], AT); +maybe_track([{Pid,_Ref} = PidRef | PidRefs], AT) -> + AT1 = case maps:is_key(PidRef, AT) of + true -> %% noop, we're already tracking this PidRef + AT; + false -> %% setup new monitor and double bookkeep refs + Mon = erlang:monitor(process, Pid), + %% TODO: decide whether we want the true match to crash this process on failure + %%true = ets:update_element(?MODULE, PidRef, [{#rctx.mon_ref, Mon}]), + ets:update_element(?MODULE, PidRef, [{#rctx.mon_ref, Mon}]), + maps:put(Mon, PidRef, maps:put(PidRef, Mon, AT)) + end, + maybe_track(PidRefs, AT1). + +log_process_lifetime_report(PidRef) -> + %% More safely assert this can't ever be undefined + #rctx{} = Rctx = get_resource(PidRef), + %% TODO: catch error out of here, report crashes on depth>1 json + %%io:format("CSRT RCTX: ~p~n", [to_flat_json(Rctx)]), + case is_enabled() andalso should_log(Rctx) of + true -> + couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx)); + false -> + ok + end. + + +logging_enabled() -> + case conf_get("log_pid_usage_report", "coordinator") of + "coordinator" -> + coordinator; + "true" -> + true; + _ -> + false + end. + + +should_log(#rctx{}=Rctx) -> + should_log(Rctx, logging_enabled()). + + +should_log(#rctx{}, true) -> + true; +should_log(#rctx{}, false) -> + false; +should_log(#rctx{type = {coordinator, _, _}}, coordinator) -> + true; +should_log(#rctx{type = {worker, fabric_rpc, FName}}, _) -> + case conf_get("log_fabric_rpc") of + "true" -> + true; + undefined -> + false; + Name -> + Name =:= atom_to_list(FName) + end; +should_log(#rctx{}, _) -> + false. + + +conf_get(Key) -> + conf_get(Key, undefined). + + +conf_get(Key, Default) -> + config:get(?MODULE_STRING, Key, Default). + + +%% Reimplementation of: https://github.com/erlang/otp/blob/b2ee4fc9a0b81a139dad2033e9b2bfc178146886/lib/stdlib/src/ets.erl#L633-L658 +%% with wrapping of ets:safe_fixtable/2 removed +unsafe_foldl(F, Accu, T) -> + First = ets:first(T), + do_foldl(F, Accu, First, T). + +do_foldl(F, Accu0, Key, T) -> + case Key of + '$end_of_table' -> + Accu0; + _ -> + do_foldl(F, + lists:foldl(F, Accu0, ets:lookup(T, Key)), + ets:next(T, Key), T) + end. diff --git a/src/couch_stats/src/couch_stats_sup.erl b/src/couch_stats/src/couch_stats_sup.erl index 325372c3e4b..4b4df17e26a 100644 --- a/src/couch_stats/src/couch_stats_sup.erl +++ b/src/couch_stats/src/couch_stats_sup.erl @@ -29,6 +29,7 @@ init([]) -> { {one_for_one, 5, 10}, [ ?CHILD(couch_stats_server, worker), + ?CHILD(couch_stats_resource_tracker, worker), ?CHILD(couch_stats_process_tracker, worker) ] }}. diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg index d12aa0c8480..9ab054bf038 100644 --- a/src/fabric/priv/stats_descriptions.cfg +++ b/src/fabric/priv/stats_descriptions.cfg @@ -26,3 +26,53 @@ {type, counter}, {desc, <<"number of write quorum errors">>} ]}. + + +%% fabric_rpc worker stats +%% TODO: decide on which naming scheme: +%% {[fabric_rpc, get_all_security, spawned], [ +%% {[fabric_rpc, spawned, get_all_security], [ +{[fabric_rpc, get_all_security, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker get_all_security spawns">>} +]}. +{[fabric_rpc, open_doc, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker open_doc spawns">>} +]}. +{[fabric_rpc, all_docs, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker all_docs spawns">>} +]}. 
+{[fabric_rpc, update_docs, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker update_docs spawns">>} +]}. +{[fabric_rpc, map_view, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker map_view spawns">>} +]}. +{[fabric_rpc, reduce_view, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker reduce_view spawns">>} +]}. +{[fabric_rpc, open_shard, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker open_shard spawns">>} +]}. +{[fabric_rpc, changes, spawned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes spawns">>} +]}. +{[fabric_rpc, changes, processed], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes row invocations">>} +]}. +{[fabric_rpc, changes, returned], [ + {type, counter}, + {desc, <<"number of fabric_rpc worker changes rows returned">>} +]}. +{[fabric_rpc, view, rows_read], [ + {type, counter}, + {desc, <<"number of fabric_rpc view_cb row invocations">>} +]}. diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index d01f1f5a749..6fa990e711b 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -492,6 +492,11 @@ view_cb({meta, Meta}, Acc) -> ok = rexi:stream2({meta, Meta}), {ok, Acc}; view_cb({row, Row}, Acc) -> + %% TODO: distinguish between rows and docs + %% TODO: wire in csrt tracking + %% TODO: distinguish between all_docs vs view call + couch_stats:increment_counter([fabric_rpc, view, rows_read]), + %%couch_stats_resource_tracker:inc(rows_read), % Adding another row ViewRow = #view_row{ id = couch_util:get_value(id, Row), @@ -535,6 +540,7 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) -> changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) -> {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}}; changes_enumerator(DocInfo, Acc) -> + couch_stats:increment_counter([fabric_rpc, changes, processed]), #fabric_changes_acc{ db = Db, args = #changes_args{ @@ -555,6 +561,7 @@ changes_enumerator(DocInfo, Acc) -> {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} ]}; Results -> + couch_stats:increment_counter([fabric_rpc, changes, returned]), Opts = if Conflicts -> [conflicts | DocOptions]; @@ -660,6 +667,14 @@ clean_stack(S) -> ). set_io_priority(DbName, Options) -> + couch_stats_resource_tracker:set_context_dbname(DbName), + %% TODO: better approach here than using proplists? 
+ case proplists:get_value(user_ctx, Options) of + undefined -> + ok; + #user_ctx{name = UserName} -> + couch_stats_resource_tracker:set_context_username(UserName) + end, case lists:keyfind(io_priority, 1, Options) of {io_priority, Pri} -> erlang:put(io_priority, Pri); diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 4acb65c739a..e4f5d793f4b 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -139,6 +139,9 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) -> receive {Ref, {ok, Db}} -> {ok, Db}; + {Ref, {ok, Db}, {delta, Delta}} -> + couch_stats_resource_tracker:accumulate_delta(Delta), + {ok, Db}; {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} -> throw(Error); {Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} -> diff --git a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl index 07e6b1d4220..c7a36fbe342 100644 --- a/src/fabric/test/eunit/fabric_rpc_purge_tests.erl +++ b/src/fabric/test/eunit/fabric_rpc_purge_tests.erl @@ -263,6 +263,8 @@ rpc_update_doc(DbName, Doc, Opts) -> Reply = test_util:wait(fun() -> receive {Ref, Reply} -> + Reply; + {Ref, Reply, {delta, _}} -> Reply after 0 -> wait diff --git a/src/fabric/test/eunit/fabric_rpc_tests.erl b/src/fabric/test/eunit/fabric_rpc_tests.erl index 16bb66badac..c402affbab0 100644 --- a/src/fabric/test/eunit/fabric_rpc_tests.erl +++ b/src/fabric/test/eunit/fabric_rpc_tests.erl @@ -101,7 +101,16 @@ t_no_config_db_create_fails_for_shard_rpc(DbName) -> receive Resp0 -> Resp0 end, - ?assertMatch({Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, Resp). + case couch_stats_resource_tracker:is_enabled() of + true -> + ?assertMatch( %% allow for {Ref, {rexi_EXIT, error}, {delta, D}} + {Ref, {'rexi_EXIT', {{error, missing_target}, _}}, _}, + Resp); + false -> + ?assertMatch( + {Ref, {'rexi_EXIT', {{error, missing_target}, _}}}, + Resp) + end. t_db_create_with_config(DbName) -> MDbName = mem3:dbname(DbName), diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index 41006ce7794..a19d650b401 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -229,9 +229,11 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu Result = case mango_idx:def(Idx) of all_docs -> + couch_stats:increment_counter([mango_cursor, view, all_docs]), CB = fun ?MODULE:handle_all_docs_message/2, fabric:all_docs(Db, DbOpts, CB, Cursor, Args); _ -> + couch_stats:increment_counter([mango_cursor, view, idx]), CB = fun ?MODULE:handle_message/2, % Normal view DDoc = ddocid(Idx), diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl index 59be7a6ebaf..bde297a15ab 100644 --- a/src/mango/src/mango_selector.erl +++ b/src/mango/src/mango_selector.erl @@ -50,6 +50,7 @@ normalize(Selector) -> % This assumes that the Selector has been normalized. % Returns true or false. match(Selector, D) -> + %% TODO: wire in csrt tracking couch_stats:increment_counter([mango, evaluate_selector]), match_int(Selector, D). 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 60c24e1d40e..e4006c62cbe 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -378,20 +378,34 @@ rexi_call(Node, MFA, Timeout) -> Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), Ref = rexi:cast(Node, self(), MFA, [sync]), try - receive - {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error); - {rexi_DOWN, Mon, _, Reason} -> - erlang:error({rexi_DOWN, {Node, Reason}}) - after Timeout -> - erlang:error(timeout) - end + wait_message(Node, Ref, Mon, Timeout) after rexi_monitor:stop(Mon) end. +wait_message(Node, Ref, Mon, Timeout) -> + receive + Msg -> + process_raw_message(Msg, Node, Ref, Mon, Timeout) + after Timeout -> + erlang:error(timeout) + end. + +process_raw_message(Msg0, Node, Ref, Mon, Timeout) -> + {Msg, Delta} = rexi_utils:extract_delta(Msg0), + couch_stats_resource_tracker:accumulate_delta(Delta), + case Msg of + {Ref, {ok, Reply}} -> + Reply; + {Ref, Error} -> + erlang:error(Error); + {rexi_DOWN, Mon, _, Reason} -> + erlang:error({rexi_DOWN, {Node, Reason}}); + Other -> + ?LOG_UNEXPECTED_MSG(Other), + wait_message(Node, Ref, Mon, Timeout) + end. + get_or_create_db(DbName, Options) -> mem3_util:get_or_create_db_int(DbName, Options). diff --git a/src/rexi/include/rexi.hrl b/src/rexi/include/rexi.hrl index a2d86b2ab54..a962f306917 100644 --- a/src/rexi/include/rexi.hrl +++ b/src/rexi/include/rexi.hrl @@ -11,6 +11,7 @@ % the License. -record(error, { + delta, timestamp, reason, mfa, diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 77830996e4b..4bb5e9f6913 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -129,7 +129,8 @@ async_server_call(Server, Caller, Request) -> -spec reply(any()) -> any(). reply(Reply) -> {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, Reply}). + %%erlang:send(Caller, {Ref, Reply, get_delta()}). + erlang:send(Caller, maybe_add_delta({Ref, Reply})). %% @equiv sync_reply(Reply, 300000) sync_reply(Reply) -> @@ -214,7 +215,8 @@ stream(Msg, Limit, Timeout) -> {ok, Count} -> put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, self(), Msg}), + %%erlang:send(Caller, {Ref, self(), Msg, get_delta()}), + erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})), ok catch throw:timeout -> @@ -243,7 +245,7 @@ stream2(Msg, Limit, Timeout) -> {ok, Count} -> put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), - erlang:send(Caller, {Ref, self(), Msg}), + erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})), ok catch throw:timeout -> @@ -265,17 +267,23 @@ stream_last(Msg, Timeout) -> %% @equiv stream_ack(Client, 1) stream_ack(Client) -> + %% stream_ack is coordinator side only, no need to send worker deltas erlang:send(Client, {rexi_ack, 1}). %% @doc Ack streamed messages stream_ack(Client, N) -> + %% stream_ack is coordinator side only, no need to send worker deltas erlang:send(Client, {rexi_ack, N}). %% Sends a ping message to the coordinator. This is for long running %% operations on a node that could exceed the rexi timeout ping() -> {Caller, _} = get(rexi_from), - erlang:send(Caller, {rexi, '$rexi_ping'}). + %% It is essential ping/0 includes deltas as otherwise long running + %% filtered queries will be silent on usage until they finally return + %% a row or no results. This delay is proportional to the database size, + %% so instead we make sure ping/0 keeps live stats flowing. + erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})). 
%% internal functions %% @@ -328,3 +336,23 @@ drain_acks(Count) -> after 0 -> {ok, Count} end. + +get_delta() -> + {delta, couch_stats_resource_tracker:make_delta()}. + +maybe_add_delta(T) -> + case couch_stats_resource_tracker:is_enabled() of + false -> + T; + true -> + add_delta(T, get_delta()) + end. + +add_delta({A}, Delta) -> {A, Delta}; +add_delta({A, B}, Delta) -> {A, B, Delta}; +add_delta({A, B, C}, Delta) -> {A, B, C, Delta}; +add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta}; +add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta}; +add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta}; +add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta}; +add_delta(T, _Delta) -> T. diff --git a/src/rexi/src/rexi_monitor.erl b/src/rexi/src/rexi_monitor.erl index 7fe66db71d4..72f0985df80 100644 --- a/src/rexi/src/rexi_monitor.erl +++ b/src/rexi/src/rexi_monitor.erl @@ -35,6 +35,7 @@ start(Procs) -> %% messages from our mailbox. -spec stop(pid()) -> ok. stop(MonitoringPid) -> + unlink(MonitoringPid), MonitoringPid ! {self(), shutdown}, flush_down_messages(). diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl index 52489a9c5ef..b8aa92b5d82 100644 --- a/src/rexi/src/rexi_server.erl +++ b/src/rexi/src/rexi_server.erl @@ -101,12 +101,12 @@ handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers = Workers} = St) -> case find_worker(Ref, Workers) of #job{worker_pid = Pid, worker = Ref, client_pid = CPid, client = CRef} = Job -> case Error of - #error{reason = {_Class, Reason}, stack = Stack} -> - notify_caller({CPid, CRef}, {Reason, Stack}), + #error{reason = {_Class, Reason}, stack = Stack, delta = Delta} -> + notify_caller({CPid, CRef}, {Reason, Stack}, Delta), St1 = save_error(Error, St), {noreply, remove_job(Job, St1)}; _ -> - notify_caller({CPid, CRef}, Error), + notify_caller({CPid, CRef}, Error, edelta), {noreply, remove_job(Job, St)} end; false -> @@ -136,9 +136,12 @@ init_p(From, MFA) -> string() | undefined ) -> any(). init_p(From, {M, F, A}, Nonce) -> + MFA = {M, F, length(A)}, put(rexi_from, From), - put('$initial_call', {M, F, length(A)}), + put('$initial_call', MFA), put(nonce, Nonce), + couch_stats:create_context(From, MFA, Nonce), + couch_stats:maybe_track_rexi_init_p(MFA), try apply(M, F, A) catch @@ -202,8 +205,14 @@ find_worker(Ref, Tab) -> [Worker] -> Worker end. -notify_caller({Caller, Ref}, Reason) -> - rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}). +notify_caller({Caller, Ref}, Reason, Delta) -> + Msg = case couch_stats_resource_tracker:is_enabled() of + true -> + {Ref, {rexi_EXIT, Reason}, {delta, Delta}}; + false -> + {Ref, {rexi_EXIT, Reason}} + end, + rexi_utils:send(Caller, Msg). kill_worker(FromRef, #st{clients = Clients} = St) -> case find_worker(FromRef, Clients) of diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl index d59c5ea0f1d..56c5a69b87a 100644 --- a/src/rexi/src/rexi_utils.erl +++ b/src/rexi/src/rexi_utils.erl @@ -14,6 +14,8 @@ -export([server_id/1, server_pid/1, send/2, recv/6]). +-export([extract_delta/1]). + %% @doc Return a rexi_server id for the given node. server_id(Node) -> case config:get_boolean("rexi", "server_per_node", true) of @@ -65,6 +67,16 @@ process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> receive + Msg -> + process_raw_message(Msg, RefList, Keypos, Fun, Acc0, TimeoutRef) + after PerMsgTO -> + {timeout, Acc0} + end. 
+ +process_raw_message(Payload0, RefList, Keypos, Fun, Acc0, TimeoutRef) -> + {Payload, Delta} = extract_delta(Payload0), + couch_stats_resource_tracker:accumulate_delta(Delta), + case Payload of {timeout, TimeoutRef} -> {timeout, Acc0}; {rexi, Ref, Msg} -> @@ -100,6 +112,13 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> end; {rexi_DOWN, _, _, _} = Msg -> Fun(Msg, nil, Acc0) - after PerMsgTO -> - {timeout, Acc0} end. + +extract_delta({A, {delta, Delta}}) -> {{A}, Delta}; +extract_delta({A, B, {delta, Delta}}) -> {{A, B}, Delta}; +extract_delta({A, B, C, {delta, Delta}}) -> {{A, B, C}, Delta}; +extract_delta({A, B, C, D, {delta, Delta}}) -> {{A, B, C, D}, Delta}; +extract_delta({A, B, C, D, E, {delta, Delta}}) -> {{A, B, C, D, E}, Delta}; +extract_delta({A, B, C, D, E, F, {delta, Delta}}) -> {{A, B, C, D, E, F}, Delta}; +extract_delta({A, B, C, D, E, F, G, {delta, Delta}}) -> {{A, B, C, D, E, F, G}, Delta}; +extract_delta(T) -> {T, undefined}.
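
Reviewer note (not part of the patch): a minimal standalone sketch of the delta round-trip the diff introduces. Workers append a {delta, Map} element to their reply tuples via rexi's maybe_add_delta/add_delta, and the coordinator side (rexi_utils:process_raw_message/6, mem3_rpc:process_raw_message/5) strips it off with extract_delta/1 and feeds the map to couch_stats_resource_tracker:accumulate_delta/1. The module name and the sample delta values below are hypothetical; the add_delta/extract_delta clauses mirror the two-element cases from this patch.

%% Standalone illustration only; compile and run demo/0 to see the round-trip.
-module(csrt_delta_roundtrip).
-export([demo/0]).

%% Worker side: append an already-wrapped {delta, Map} to a reply tuple.
add_delta({A, B}, Delta) -> {A, B, Delta};
add_delta(T, _Delta) -> T.

%% Coordinator side: split the original message from the usage delta, if any.
extract_delta({A, B, {delta, Delta}}) -> {{A, B}, Delta};
extract_delta(T) -> {T, undefined}.

demo() ->
    Ref = make_ref(),
    Delta = #{docs_read => 3, ioq_calls => 7, dt => 12},
    %% Worker wraps the usual {Ref, Reply} message with its accumulated delta
    Wrapped = add_delta({Ref, {ok, done}}, {delta, Delta}),
    %% Coordinator recovers the original message plus the delta map
    {{Ref, {ok, done}}, Delta} = extract_delta(Wrapped),
    %% A message without a delta passes through with undefined
    {{Ref, {ok, done}}, undefined} = extract_delta({Ref, {ok, done}}),
    ok.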