From ecda8f3270223223fe9ad8d6e80c1e95bad08657 Mon Sep 17 00:00:00 2001 From: Russell Branca Date: Mon, 30 Oct 2023 15:35:45 -0700 Subject: [PATCH] Add more usage tracking and core functionality --- src/chttpd/src/chttpd.erl | 4 + src/chttpd/src/chttpd_db.erl | 1 + .../src/couch_stats_resource_tracker.erl | 150 ++++++++++++++++-- src/fabric/src/fabric_rpc.erl | 9 ++ src/rexi/src/rexi.erl | 9 +- src/rexi/src/rexi_utils.erl | 4 + 6 files changed, 158 insertions(+), 19 deletions(-) diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index ab8e1e9a379..fdd7855f0e9 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -323,6 +323,8 @@ handle_request_int(MochiReq) -> % Save client socket so that it can be monitored for disconnects chttpd_util:mochiweb_client_req_set(MochiReq), + %% This is probably better in before_request, but having Path is nice + couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path), {HttpReq2, Response} = case before_request(HttpReq0) of {ok, HttpReq1} -> @@ -353,6 +355,8 @@ handle_request_int(MochiReq) -> before_request(HttpReq) -> try + %% TODO: re-enable this here once we have Path + %% couch_stats_resource_tracker:create_coordinator_context(HttpReq), chttpd_stats:init(), chttpd_plugin:before_request(HttpReq) catch diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 9b1aff54f24..7538c452196 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -83,6 +83,7 @@ % Database request handlers handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) -> + couch_stats_resource_tracker:set_context_dbname(DbName), case {Method, RestParts} of {'PUT', []} -> create_db_req(Req, DbName); diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl index 8e9feeca1b0..b74eb5a10d2 100644 --- a/src/couch_stats/src/couch_stats_resource_tracker.erl +++ b/src/couch_stats/src/couch_stats_resource_tracker.erl @@ -33,12 +33,17 @@ -export([ create_context/0, create_context/1, create_context/3, + create_coordinator_context/1, create_coordinator_context/2, + set_context_dbname/1, + set_context_username/1, track/1, should_track/1 ]). -export([ - active/0 + active/0, + active_coordinators/0, + active_workers/0 ]). -export([ @@ -93,6 +98,7 @@ %% TODO: overlap between this and couch btree fold invocations %% TODO: need some way to distinguish fols on views vs find vs all_docs -define(FRPC_CHANGES_ROW, changes_processed). +%%-define(FRPC_CHANGES_ROW, ?ROWS_READ). %% Module pdict markers -define(DELTA_TA, csrt_delta_ta). @@ -109,6 +115,7 @@ %% TODO: switch to: %% -record(?RCTX, { -record(rctx, { + %% Metadata updated_at = os:timestamp(), exited_at, pid_ref, @@ -116,6 +123,11 @@ nonce, from, type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc + state = alive, + dbname, + username, + + %% Stats counters db_open = 0, docs_read = 0, rows_read = 0, @@ -132,8 +144,7 @@ %% TODO: switch record definitions to be macro based, eg: %% ?COUCH_BT_GET_KP_NODE = 0, get_kv_node = 0, - get_kp_node = 0, - state = alive + get_kp_node = 0 }). db_opened() -> inc(db_opened). @@ -208,7 +219,7 @@ inc(?MANGO_EVAL_MATCH, N) -> inc(?DB_OPEN_DOC, N) -> update_counter(#rctx.?DB_OPEN_DOC, N); inc(?FRPC_CHANGES_ROW, N) -> - update_counter(#rctx.?FRPC_CHANGES_ROW, N); + update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read inc(?COUCH_BT_GET_KP_NODE, N) -> update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N); inc(?COUCH_BT_GET_KV_NODE, N) -> @@ -238,8 +249,8 @@ maybe_inc([couchdb, query_server, js_filter], Val) -> inc(?COUCH_JS_FILTER, Val); maybe_inc([couchdb, query_server, js_filtered_docs], Val) -> inc(?COUCH_JS_FILTERED_DOCS, Val); -maybe_inc(Metric, Val) -> - io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]), +maybe_inc(_Metric, _Val) -> + %%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]), 0. @@ -248,6 +259,8 @@ should_track([fabric_rpc, all_docs, spawned]) -> true; should_track([fabric_rpc, changes, spawned]) -> true; +should_track([fabric_rpc, changes, processed]) -> + true; should_track([fabric_rpc, map_view, spawned]) -> true; should_track([fabric_rpc, reduce_view, spawned]) -> @@ -283,7 +296,26 @@ update_counter({_Pid,_Ref}=Key, Field, Count) -> ets:update_counter(?MODULE, Key, {Field, Count}, #rctx{pid_ref=Key}). -active() -> +active() -> active_int(all). +active_coordinators() -> active_int(coordinators). +active_workers() -> active_int(workers). + + +active_int(coordinators) -> + select_by_type(coordinators); +active_int(workers) -> + select_by_type(workers); +active_int(all) -> + lists:map(fun to_json/1, ets:tab2list(?MODULE)). + + +select_by_type(coordinators) -> + ets:select(couch_stats_resource_tracker, + [{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]); +select_by_type(workers) -> + ets:select(couch_stats_resource_tracker, + [{#rctx{type = {worker,'_','_'}, _ = '_'}, [], ['$_']}]); +select_by_type(all) -> lists:map(fun to_json/1, ets:tab2list(?MODULE)). @@ -294,11 +326,17 @@ to_json(#rctx{}=Rctx) -> mfa = MFA0, nonce = Nonce0, from = From0, + dbname = DbName, + username = UserName, + db_open = DbOpens, docs_read = DocsRead, rows_read = RowsRead, state = State0, type = Type, btree_folds = BtFolds, + get_kp_node = KpNodes, + get_kv_node = KvNodes, + ioq_calls = IoqCalls, changes_processed = ChangesProcessed } = Rctx, %%io:format("TO_JSON_MFA: ~p~n", [MFA0]), @@ -338,27 +376,43 @@ to_json(#rctx{}=Rctx) -> nonce => term_to_json(Nonce), %%from => From, from => term_to_json(From), + dbname => DbName, + username => UserName, + db_open => DbOpens, docs_read => DocsRead, rows_read => RowsRead, state => State, - type => term_to_json(Type), + type => term_to_json({type, Type}), btree_folds => BtFolds, + kp_nodes => KpNodes, + kv_nodes => KvNodes, + ioq_calls => IoqCalls, changes_processed => ChangesProcessed }. term_to_json({Pid, Ref}) when is_pid(Pid), is_reference(Ref) -> [?l2b(pid_to_list(Pid)), ?l2b(ref_to_list(Ref))]; +term_to_json({type, {coordinator, _, _} = Type}) -> + %%io:format("SETTING JSON TYPE: ~p~n", [Type]), + ?l2b(io_lib:format("~p", [Type])); term_to_json({A, B, C}) -> [A, B, C]; term_to_json(undefined) -> null; +term_to_json(null) -> + null; term_to_json(T) -> T. +term_to_flat_json({type, {coordinator, _, _} = Type}) -> + %%io:format("SETTING FLAT JSON TYPE: ~p~n", [Type]), + ?l2b(io_lib:format("~p", [Type])); term_to_flat_json(Tuple) when is_tuple(Tuple) -> ?l2b(io_lib:format("~w", [Tuple])); term_to_flat_json(undefined) -> null; +term_to_flat_json(null) -> + null; term_to_flat_json(T) -> T. @@ -369,11 +423,17 @@ to_flat_json(#rctx{}=Rctx) -> mfa = MFA0, nonce = Nonce0, from = From0, + dbname = DbName, + username = UserName, + db_open = DbOpens, docs_read = DocsRead, rows_read = RowsRead, state = State0, type = Type, - btree_folds = ChangesProcessed + get_kp_node = KpNodes, + get_kv_node = KvNodes, + btree_folds = ChangesProcessed, + ioq_calls = IoqCalls } = Rctx, io:format("TO_JSON_MFA: ~p~n", [MFA0]), MFA = case MFA0 of @@ -402,6 +462,7 @@ to_flat_json(#rctx{}=Rctx) -> Nonce0 -> list_to_binary(Nonce0) end, + io:format("NONCE IS: ~p||~p~n", [Nonce0, Nonce]), #{ %%updated_at => ?l2b(io_lib:format("~w", [TP])), updated_at => term_to_flat_json(TP), @@ -410,11 +471,17 @@ to_flat_json(#rctx{}=Rctx) -> mfa => MFA, nonce => Nonce, from => From, + dbname => DbName, + username => UserName, + db_open => DbOpens, docs_read => DocsRead, rows_read => RowsRead, state => State, - type => term_to_flat_json(Type), - btree_folds => ChangesProcessed + type => term_to_flat_json({type, Type}), + kp_nodes => KpNodes, + kv_nodes => KvNodes, + btree_folds => ChangesProcessed, + ioq_calls => IoqCalls }. get_pid_ref() -> @@ -440,12 +507,12 @@ create_context(Pid) -> %% add type to disnguish coordinator vs rpc_worker create_context(From, {M,F,_A} = MFA, Nonce) -> - io:format("CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [From, MFA, Nonce]), - Ref = make_ref(), + io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, Nonce]), + PidRef = get_pid_ref(), %% this will instantiate a new PidRef %%Rctx = make_record(self(), Ref), %% TODO: extract user_ctx and db/shard from Rctx = #rctx{ - pid_ref = {self(), Ref}, + pid_ref = PidRef, from = From, mfa = MFA, type = {worker, M, F}, @@ -453,9 +520,58 @@ create_context(From, {M,F,_A} = MFA, Nonce) -> }, track(Rctx), erlang:put(?DELTA_TZ, Rctx), - ets:insert(?MODULE, Rctx), + true = ets:insert(?MODULE, Rctx), Rctx. +create_coordinator_context(#httpd{path_parts=Parts} = Req) -> + create_coordinator_context(Req, io_lib:format("~p", [Parts])). + +create_coordinator_context(#httpd{} = Req, Path) -> + io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]), + #httpd{ + method = Verb, + %%path_parts = Parts, + nonce = Nonce + } = Req, + PidRef = get_pid_ref(), %% this will instantiate a new PidRef + %%Rctx = make_record(self(), Ref), + %% TODO: extract user_ctx and db/shard from Req + Rctx = #rctx{ + pid_ref = PidRef, + %%type = {cooridantor, Verb, Parts}, + type = {coordinator, Verb, [$/ | Path]}, + nonce = Nonce + }, + track(Rctx), + erlang:put(?DELTA_TZ, Rctx), + true = ets:insert(?MODULE, Rctx), + Rctx. + +set_context_dbname(DbName) -> + case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of + false -> + Stk = try throw(42) catch _:_:Stk0 -> Stk0 end, + io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]), + timer:sleep(1000), + erlang:halt(kaboomz); + true -> + true + end. + +set_context_username(null) -> + ok; +set_context_username(UserName) -> + io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]), + case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of + false -> + Stk = try throw(42) catch _:_:Stk0 -> Stk0 end, + io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]), + timer:sleep(1000), + erlang:halt(kaboomz); + true -> + true + end. + track(#rctx{}=Rctx) -> %% TODO: should this block or not? If no, what cleans up zombies? %% gen_server:call(?MODULE, {track, PR}). @@ -522,6 +638,10 @@ make_delta(#rctx{}=TA, #rctx{}=TB) -> docs_read => TB#rctx.docs_read - TA#rctx.docs_read, rows_read => TB#rctx.rows_read - TA#rctx.rows_read, btree_folds => TB#rctx.btree_folds - TA#rctx.btree_folds, + get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node, + get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node, + db_open => TB#rctx.db_open - TA#rctx.db_open, + ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls, dt => timer:now_diff(TB#rctx.updated_at, TA#rctx.updated_at) }, %% TODO: reevaluate this decision diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 49f7d2c4fe5..da0710c9630 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -262,6 +262,7 @@ set_purge_infos_limit(DbName, Limit, Options) -> with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}). open_doc(DbName, DocId, Options) -> + io:format("frpc:open_doc(~p, ~p, ~p)~n", [DbName, DocId, Options]), with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). open_revs(DbName, IdRevsOpts, Options) -> @@ -352,6 +353,14 @@ get_uuid(DbName) -> with_db(DbName, Options, {M, F, A}) -> set_io_priority(DbName, Options), + couch_stats_resource_tracker:set_context_dbname(DbName), + %% TODO: better approach here than using proplists? + case proplists:get_value(user_ctx, Options) of + undefined -> + ok; + #user_ctx{name = UserName} -> + couch_stats_resource_tracker:set_context_username(UserName) + end, case get_or_create_db(DbName, Options) of {ok, Db} -> rexi:reply( diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl index 67287108975..754dee44411 100644 --- a/src/rexi/src/rexi.erl +++ b/src/rexi/src/rexi.erl @@ -129,8 +129,7 @@ async_server_call(Server, Caller, Request) -> -spec reply(any()) -> any(). reply(Reply) -> {Caller, Ref} = get(rexi_from), - Delta = couch_stats_resource_tracker:make_delta(), - erlang:send(Caller, {Ref, Reply, {delta, Delta}}). + erlang:send(Caller, {Ref, Reply, get_delta()}). %% @equiv sync_reply(Reply, 300000) sync_reply(Reply) -> @@ -215,8 +214,7 @@ stream(Msg, Limit, Timeout) -> {ok, Count} -> put(rexi_unacked, Count + 1), {Caller, Ref} = get(rexi_from), - Delta = couch_stats_resource_tracker:make_delta(), - erlang:send(Caller, {Ref, self(), Msg, {delta, Delta}}), + erlang:send(Caller, {Ref, self(), Msg, get_delta()}), ok catch throw:timeout -> @@ -330,3 +328,6 @@ drain_acks(Count) -> after 0 -> {ok, Count} end. + +get_delta() -> + {delta, couch_stats_resource_tracker:make_delta()}. diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl index 4f4ca5576c3..f7127ecd1f9 100644 --- a/src/rexi/src/rexi_utils.erl +++ b/src/rexi/src/rexi_utils.erl @@ -103,6 +103,8 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> Fun(Msg, {Worker, From}, Acc0) end; {Ref, Msg} -> + %% TODO: add stack trace to log entry + couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]), %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]), case lists:keyfind(Ref, Keypos, RefList) of false -> @@ -113,6 +115,8 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> end; {Ref, From, Msg} -> %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]), + %% TODO: add stack trace to log entry + couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]), case lists:keyfind(Ref, Keypos, RefList) of false -> {ok, Acc0};