From 1195465fd73b3ac1c589bd71ca7d99a5c74fb90d Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Tue, 30 Mar 2021 20:38:20 +0200 Subject: [PATCH] Snabbkaffe-fy brod --- include/brod_int.hrl | 8 +--- rebar.config | 3 ++ rebar.config.script | 3 +- rebar.lock | 4 ++ src/brod_cg_commits.erl | 42 +++++++++++---------- src/brod_client.erl | 49 +++++++++++++------------ src/brod_consumer.erl | 34 +++++++++-------- src/brod_group_coordinator.erl | 9 +++-- src/brod_group_subscriber.erl | 14 ++----- src/brod_group_subscriber_v2.erl | 29 +++++++++++---- src/brod_group_subscriber_worker.erl | 14 +++++-- src/brod_kafka_apis.erl | 5 ++- src/brod_producer.erl | 18 +++++---- src/brod_topic_subscriber.erl | 3 ++ src/brod_utils.erl | 11 ------ test/brod_demo_group_subscriber_koc.erl | 4 +- test/brod_demo_group_subscriber_loc.erl | 4 +- test/brod_demo_topic_subscriber.erl | 8 ++-- test/brod_group_subscriber_SUITE.erl | 4 +- test/brod_test_group_subscriber.erl | 7 ++-- test/brod_test_macros.hrl | 2 +- test/kafka_test_helper.erl | 16 ++++---- 22 files changed, 156 insertions(+), 135 deletions(-) diff --git a/include/brod_int.hrl b/include/brod_int.hrl index 02f211ff..78d9f2c0 100644 --- a/include/brod_int.hrl +++ b/include/brod_int.hrl @@ -19,6 +19,7 @@ -include("brod.hrl"). -include_lib("kafka_protocol/include/kpro.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("kernel/include/logger.hrl"). -define(undef, undefined). @@ -73,13 +74,6 @@ -define(GET_STACKTRACE(Var), Var = erlang:get_stacktrace()). -endif. -%% Brod logging wrappers around Logger API calls. Insert 'brod' domain -%% to allow applications to filter Brod logs as they wish. --define(BROD_LOG_WARNING(Fmt, Args), ?LOG_WARNING(Fmt, Args, #{domain => [brod]})). --define(BROD_LOG_ERROR(Fmt, Args), ?LOG_ERROR( Fmt, Args, #{domain => [brod]})). --define(BROD_LOG_INFO(Fmt, Args), ?LOG_INFO( Fmt, Args, #{domain => [brod]})). --define(BROD_LOG(Level, Fmt, Args), ?LOG(Level, Fmt, Args, #{domain => [brod]})). - -endif. % include brod_int.hrl %%%_* Emacs ==================================================================== diff --git a/rebar.config b/rebar.config index 18c333cc..cdbc1a9c 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,9 @@ +%% -*- mode:erlang -*- {deps, [ {supervisor3, "1.1.11"} , {kafka_protocol, "2.3.6"} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "0.10.0"}}} ]}. + {edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}. {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_obsolete_guard,debug_info]}. {xref_checks, [undefined_function_calls, undefined_functions, diff --git a/rebar.config.script b/rebar.config.script index 4ee6fa35..0eb342aa 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -2,7 +2,6 @@ IsRebar3 = erlang:function_exported(rebar3, main, 1), DocoptUrl = "https://github.com/zmstone/docopt-erl.git", DocOptTag = "0.1.3", DocoptDep = {docopt, {git, DocoptUrl, {branch, DocOptTag}}}, -Snabbkaffe = {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "0.6.0"}}}, Profiles = {profiles, [ {brod_cli, [ @@ -18,7 +17,7 @@ Profiles = ]} ]}]}, {test, [ - {deps, [meck, proper, jsone, DocoptDep, Snabbkaffe]}, + {deps, [meck, proper, jsone, bear, DocoptDep]}, {erl_opts, [{d, build_brod_cli}]} ]} ]}, diff --git a/rebar.lock b/rebar.lock index 02973134..40500f62 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,6 +1,10 @@ {"1.2.0", [{<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.4">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"2.3.6">>},0}, + {<<"snabbkaffe">>, + {git,"https://github.com/kafka4beam/snabbkaffe", + {ref,"74a0d6cd6bbe32581616aee96e245ff2065f4a48"}}, + 0}, {<<"snappyer">>,{pkg,<<"snappyer">>,<<"1.2.5">>},1}, {<<"supervisor3">>,{pkg,<<"supervisor3">>,<<"1.1.11">>},0}]}. [ diff --git a/src/brod_cg_commits.erl b/src/brod_cg_commits.erl index c3db8962..4516df3f 100644 --- a/src/brod_cg_commits.erl +++ b/src/brod_cg_commits.erl @@ -149,6 +149,10 @@ init({Client, GroupInput}) -> ProtocolName = proplists:get_value(protocol, GroupInput), Retention = proplists:get_value(retention, GroupInput), Offsets = proplists:get_value(offsets, GroupInput), + logger:update_process_metadata(#{ group_id => GroupId + , coor => self() + , domain => [brod, cg_commits] + }), %% use callback_implemented strategy so I know I am elected leader %% when `assign_partitions' callback is called. Config = [ {partition_assignment_strategy, callback_implemented} @@ -167,7 +171,7 @@ init({Client, GroupInput}) -> {ok, State}. handle_info(Info, State) -> - log(State, info, "Info discarded:~p", [Info]), + ?tp(warning, "unknown message", #{info => Info}), {noreply, State}. handle_call(sync, From, State0) -> @@ -177,7 +181,7 @@ handle_call(sync, From, State0) -> handle_call({assign_partitions, Members, TopicPartitions}, _From, #state{topic = MyTopic, offsets = Offsets} = State) -> - log(State, info, "Assigning all topic partitions to self", []), + ?LOG_INFO("Assingning all topic-partitions to self", []), MyTP = [{MyTopic, P} || {P, _} <- Offsets], %% Assert that my topic partitions are included in %% subscriptions collected from ALL members @@ -188,8 +192,8 @@ handle_call({assign_partitions, Members, TopicPartitions}, _From, ok; BadPartitions -> PartitionNumbers = [P || {_T, P} <- BadPartitions], - log(State, error, "Nonexisting partitions in input: ~p", - [PartitionNumbers]), + ?LOG_ERROR("Nonexisting partitions in input: ~p", + [PartitionNumbers]), erlang:exit({non_existing_partitions, PartitionNumbers}) end, %% To keep it simple, assign all my topic-partitions to self @@ -209,9 +213,6 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments}, , coordinator = Pid , topic = MyTopic } = State) -> - %% Write a log if I am not a leader, - %% hope the desired partitions are all assigned to me - IsLeader orelse log(State, info, "Not elected", []), Groupped0 = brod_utils:group_per_key( fun(#brod_received_assignment{ topic = Topic @@ -223,11 +224,18 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments}, %% Discard other topics if for whatever reason the group leader assigns %% irrelevant topic-partitions to me Groupped = lists:filter(fun({Topic, _}) -> Topic =:= MyTopic end, Groupped0), - log(State, info, "current offsets:\n~p", [Groupped]), + %% Write leader election results to the log, + %% hope the desired partitions are all assigned to me + ?tp(info, election_result, + #{ generation_id => GenerationId + , topic => MyTopic + , elected => IsLeader + , current_offsets => Groupped + }), %% Assert all desired partitions are in assignment case Groupped of [] -> - log(State, error, "Topic ~s is not received in assignment", [MyTopic]), + ?LOG_ERROR("Topic ~s is not received in assignment", [MyTopic]), erlang:exit({bad_topic_assignment, Groupped0}); [{MyTopic, PartitionOffsetList}] -> MyPartitions = [P || {P, _O} <- OffsetsToCommit], @@ -236,10 +244,9 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments}, [] -> ok; Left -> - log(State, error, - "Partitions ~p are not received in assignment, " - "There is probably another active group member subscribing " - "to topic ~s, stop it and retry\n", [MyTopic, Left]), + ?LOG_ERROR("Partitions ~p are not received in assignment, " + "There is probably another active group member subscribing " + "to topic ~s, stop it and retry\n", [MyTopic, Left]), erlang:exit({unexpected_assignments, Left}) end end, @@ -255,7 +262,7 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments}, case brod_group_coordinator:commit_offsets(Pid) of ok -> ok; {error, Reason} -> - log(State, error, "Failed to commit, reason:\n~p", [Reason]), + ?LOG_ERROR("Failed to commit, reason:\n~p", [Reason]), erlang:exit(commit_failed) end, {noreply, set_done(State)}; @@ -281,7 +288,7 @@ maybe_reply_sync(#state{pending_sync = ?undef} = State) -> State; maybe_reply_sync(#state{pending_sync = From} = State) -> gen_server:reply(From, ok), - log(State, info, "done\n", []), + ?LOG_INFO("done\n", []), State#state{pending_sync = ?undef}. %% I am the current leader because I am assigning partitions. @@ -294,11 +301,6 @@ assign_all_to_self([{MyMemberId, _} | Members], TopicPartitions) -> | [{Id, []} || {Id, _MemberMeta} <- Members] ]. -log(#state{groupId = GroupId}, Level, Fmt, Args) -> - ?BROD_LOG(Level, - "Group member (~s,coor=~p):\n" ++ Fmt, - [GroupId, self() | Args]). - %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/src/brod_client.erl b/src/brod_client.erl index 8171d1bd..b41ba136 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -285,6 +285,9 @@ deregister_consumer(Client, Topic, Partition) -> %% @private init({BootstrapEndpoints, ClientId, Config}) -> erlang:process_flag(trap_exit, true), + logger:update_process_metadata(#{ domain => [brod, client] + , id => ClientId + }), Tab = ets:new(?ETS(ClientId), [named_table, protected, {read_concurrency, true}]), self() ! init, @@ -308,21 +311,21 @@ handle_info(init, State0) -> handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId , producers_sup = Pid } = State) -> - ?BROD_LOG_ERROR("client ~p producers supervisor down~nReason: ~p", - [ClientId, Pid, Reason]), + ?LOG_ERROR("client ~p producers supervisor down~nReason: ~p", + [ClientId, Pid, Reason]), {stop, {producers_sup_down, Reason}, State}; handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId , consumers_sup = Pid } = State) -> - ?BROD_LOG_ERROR("client ~p consumers supervisor down~nReason: ~p", - [ClientId, Pid, Reason]), + ?LOG_ERROR("client ~p consumers supervisor down~nReason: ~p", + [ClientId, Pid, Reason]), {stop, {consumers_sup_down, Reason}, State}; handle_info({'EXIT', Pid, Reason}, State) -> NewState = handle_connection_down(State, Pid, Reason), {noreply, NewState}; handle_info(Info, State) -> - ?BROD_LOG_WARNING("~p [~p] ~p got unexpected info: ~p", - [?MODULE, self(), State#state.client_id, Info]), + ?LOG_WARNING("~p [~p] ~p got unexpected info: ~p", + [?MODULE, self(), State#state.client_id, Info]), {noreply, State}. %% @private @@ -379,8 +382,8 @@ handle_cast({deregister, Key}, #state{workers_tab = Tab} = State) -> ets:delete(Tab, Key), {noreply, State}; handle_cast(Cast, State) -> - ?BROD_LOG_WARNING("~p [~p] ~p got unexpected cast: ~p", - [?MODULE, self(), State#state.client_id, Cast]), + ?LOG_WARNING("~p [~p] ~p got unexpected cast: ~p", + [?MODULE, self(), State#state.client_id, Cast]), {noreply, State}. %% @private @@ -398,8 +401,8 @@ terminate(Reason, #state{ client_id = ClientId true -> ok; false -> - ?BROD_LOG_WARNING("~p [~p] ~p is terminating\nreason: ~p~n", - [?MODULE, self(), ClientId, Reason]) + ?LOG_WARNING("~p [~p] ~p is terminating\nreason: ~p~n", + [?MODULE, self(), ClientId, Reason]) end, %% stop producers and consumers first because they are monitoring connections shutdown_pid(ProducersSup), @@ -538,9 +541,9 @@ do_get_metadata(Topic, #state{ client_id = ClientId ok = update_partitions_count_cache(Ets, TopicMetadataArray), {{ok, Metadata}, State}; {error, Reason} -> - ?BROD_LOG_ERROR("~p failed to fetch metadata for topics: ~p\n" - "reason=~p", - [ClientId, Topics, Reason]), + ?LOG_ERROR("~p failed to fetch metadata for topics: ~p\n" + "reason=~p", + [ClientId, Topics, Reason]), {{error, Reason}, State} end. @@ -634,9 +637,9 @@ maybe_connect(#state{client_id = ClientId} = State, true -> connect(State, Endpoint); false -> - ?BROD_LOG_ERROR("~p (re)connect to ~s:~p aborted.\n" - "last failure: ~p", - [ClientId, Host, Port, Reason]), + ?LOG_ERROR("~p (re)connect to ~s:~p aborted.\n" + "last failure: ~p", + [ClientId, Host, Port, Reason]), {{error, Reason}, State} end. @@ -648,15 +651,15 @@ connect(#state{ client_id = ClientId Conn = case do_connect(Endpoint, State) of {ok, Pid} -> - ?BROD_LOG_INFO("client ~p connected to ~s:~p~n", - [ClientId, Host, Port]), + ?LOG_INFO("client ~p connected to ~s:~p~n", + [ClientId, Host, Port]), #conn{ endpoint = Endpoint , pid = Pid }; {error, Reason} -> - ?BROD_LOG_ERROR("client ~p failed to connect to ~s:~p~n" - "reason:~p", - [ClientId, Host, Port, Reason]), + ?LOG_ERROR("client ~p failed to connect to ~s:~p~n" + "reason:~p", + [ClientId, Host, Port, Reason]), #conn{ endpoint = Endpoint , pid = mark_dead(Reason) } @@ -684,8 +687,8 @@ handle_connection_down(#state{ payload_conns = Conns } = State, Pid, Reason) -> case lists:keytake(Pid, #conn.pid, Conns) of {value, #conn{endpoint = {Host, Port}} = Conn, Rest} -> - ?BROD_LOG_INFO("client ~p: payload connection down ~s:~p~n" - "reason:~p", [ClientId, Host, Port, Reason]), + ?LOG_INFO("client ~p: payload connection down ~s:~p~n" + "reason:~p", [ClientId, Host, Port, Reason]), NewConn = Conn#conn{pid = mark_dead(Reason)}, State#state{payload_conns = [NewConn | Rest]}; false -> diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 21a9cebd..407b03aa 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -260,6 +260,10 @@ get_connection(Pid) -> init({Bootstrap, Topic, Partition, Config}) -> erlang:process_flag(trap_exit, true), + logger:update_process_metadata(#{ topic => Topic + , partition => Partition + , domain => [brod, consumer] + }), Cfg = fun(Name, Default) -> proplists:get_value(Name, Config, Default) end, @@ -333,8 +337,8 @@ handle_info({'EXIT', Pid, _Reason}, #state{connection = Pid} = State) -> %% standalone connection spawn-linked to self() {noreply, handle_conn_down(State)}; handle_info(Info, State) -> - ?BROD_LOG_WARNING("~p ~p got unexpected info: ~p", - [?MODULE, self(), Info]), + ?LOG_WARNING("~p ~p got unexpected info: ~p", + [?MODULE, self(), Info]), {noreply, State}. %% @private @@ -382,8 +386,8 @@ handle_cast({ack, Offset}, #state{pending_acks = PendingAcks} = State0) -> State = maybe_send_fetch_request(State1), {noreply, State}; handle_cast(Cast, State) -> - ?BROD_LOG_WARNING("~p ~p got unexpected cast: ~p", - [?MODULE, self(), Cast]), + ?LOG_WARNING("~p ~p got unexpected cast: ~p", + [?MODULE, self(), Cast]), {noreply, State}. %% @private @@ -403,8 +407,8 @@ terminate(Reason, #state{ bootstrap = Bootstrap false -> ok end, %% write a log if it's not a normal reason - IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p", - [Topic, Partition, Reason]), + IsNormal orelse ?LOG_ERROR("Consumer ~s-~w terminate reason: ~p", + [Topic, Partition, Reason]), ok. %% @private @@ -579,8 +583,8 @@ handle_fetch_error(#kafka_fetch_error{error_code = ErrorCode} = Error, } = State) -> case err_op(ErrorCode) of reset_connection -> - ?BROD_LOG_INFO("Fetch error ~s-~p: ~p", - [Topic, Partition, ErrorCode]), + ?LOG_INFO("Fetch error ~s-~p: ~p", + [Topic, Partition, ErrorCode]), %% The current connection in use is not connected to the partition leader, %% so we dereference and demonitor the connection pid, but leave it alive, %% Can not kill it because it might be shared with other partition workers @@ -596,8 +600,8 @@ handle_fetch_error(#kafka_fetch_error{error_code = ErrorCode} = Error, {noreply, maybe_send_fetch_request(State)}; stop -> ok = cast_to_subscriber(Subscriber, Error), - ?BROD_LOG_ERROR("Consumer ~s-~p shutdown\nReason: ~p", - [Topic, Partition, ErrorCode]), + ?LOG_ERROR("Consumer ~s-~p shutdown\nReason: ~p", + [Topic, Partition, ErrorCode]), {stop, normal, State}; reset_offset -> handle_reset_offset(State, Error); @@ -611,13 +615,13 @@ handle_reset_offset(#state{ subscriber = Subscriber } = State, Error) -> ok = cast_to_subscriber(Subscriber, Error), %% Suspend, no more fetch request until the subscriber re-subscribes - ?BROD_LOG_INFO("~p ~p consumer is suspended, " - "waiting for subscriber ~p to resubscribe with " - "new begin_offset", [?MODULE, self(), Subscriber]), + ?LOG_INFO("~p ~p consumer is suspended, " + "waiting for subscriber ~p to resubscribe with " + "new begin_offset", [?MODULE, self(), Subscriber]), {noreply, State#state{is_suspended = true}}; handle_reset_offset(#state{offset_reset_policy = Policy} = State, _Error) -> - ?BROD_LOG_INFO("~p ~p offset out of range, applying reset policy ~p", - [?MODULE, self(), Policy]), + ?LOG_INFO("~p ~p offset out of range, applying reset policy ~p", + [?MODULE, self(), Policy]), BeginOffset = case Policy of reset_to_earliest -> ?OFFSET_EARLIEST; reset_to_latest -> ?OFFSET_LATEST diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index 1c415d61..229d3241 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -308,6 +308,9 @@ stop(Pid) -> init({Client, GroupId, Topics, Config, CbModule, MemberPid}) -> erlang:process_flag(trap_exit, true), + logger:update_process_metadata(#{ group_id => GroupId + , domain => [brod, group_coordinator] + }), GetCfg = fun(Name, Default) -> proplists:get_value(Name, Config, Default) end, @@ -1068,9 +1071,9 @@ log(#state{ groupId = GroupId , generationId = GenerationId , member_pid = MemberPid }, Level, Fmt, Args) -> - ?BROD_LOG(Level, - "Group member (~s,coor=~p,cb=~p,generation=~p):\n" ++ Fmt, - [GroupId, self(), MemberPid, GenerationId | Args]). + ?LOG(Level, + "Group member (~s,coor=~p,cb=~p,generation=~p):\n" ++ Fmt, + [GroupId, self(), MemberPid, GenerationId | Args]). %% Make metata to be committed together with offsets. -spec make_offset_commit_metadata() -> binary(). diff --git a/src/brod_group_subscriber.erl b/src/brod_group_subscriber.erl index 8da2f29c..f1b84bde 100644 --- a/src/brod_group_subscriber.erl +++ b/src/brod_group_subscriber.erl @@ -302,6 +302,9 @@ get_committed_offsets(Pid, TopicPartitions) -> init({Client, GroupId, Topics, GroupConfig, ConsumerConfig, MessageType, CbModule, CbInitArg}) -> + logger:update_process_metadata(#{ group_id => GroupId + , domain => [brod, group_subscriber_v1] + }), ok = brod_utils:assert_client(Client), ok = brod_utils:assert_group_id(GroupId), ok = brod_utils:assert_topics(Topics), @@ -353,7 +356,7 @@ handle_info(?LO_CMD_SUBSCRIBE_PARTITIONS, State) -> Tref = start_subscribe_timer(?undef, ?RESUBSCRIBE_DELAY), {noreply, NewState#state{subscribe_tref = Tref}}; handle_info(Info, State) -> - log(State, info, "discarded message:~p", [Info]), + ?tp(info, "unknown message", #{info => Info}), {noreply, State}. handle_call({get_committed_offsets, TopicPartitions}, _From, @@ -624,15 +627,6 @@ subscribe_partition(Client, Consumer) -> end end. -log(#state{ groupId = GroupId - , memberId = MemberId - , generationId = GenerationId - }, Level, Fmt, Args) -> - ?BROD_LOG( - Level, - "group subscriber (groupId=~s,memberId=~s,generation=~p,pid=~p):\n" ++ Fmt, - [GroupId, MemberId, GenerationId, self() | Args]). - get_consumer(Pid, Consumers) when is_pid(Pid) -> lists:keyfind(Pid, #consumer.consumer_pid, Consumers); get_consumer({_, _} = TP, Consumers) -> diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index 5fd47463..6d517b90 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -249,6 +249,9 @@ init(Config) -> , cb_module := CbModule } = Config, process_flag(trap_exit, true), + logger:update_process_metadata(#{ group_id => GroupId + , domain => [brod, group_subscriber_v2] + }), MessageType = maps:get(message_type, Config, message_set), DefaultGroupConfig = [], GroupConfig = maps:get(group_config, Config, DefaultGroupConfig), @@ -365,7 +368,10 @@ handle_info({'EXIT', Pid, Reason}, State) -> [TopicPartition|_] -> handle_worker_failure(TopicPartition, Pid, Reason, State); _ -> % Other process wants to kill us, supervisor? - ?BROD_LOG_INFO("Received EXIT:~p from ~p, shutting down", [Reason, Pid]), + ?tp(info, "received EXIT, shutting down", + #{ from => Pid + , reason => Reason + }), {stop, shutdown, State} end; handle_info(_Info, State) -> @@ -394,19 +400,26 @@ handle_worker_failure({Topic, Partition}, Pid, Reason, State) -> , group_id = GroupId , coordinator = Coordinator } = State, - ?BROD_LOG_ERROR("group_subscriber_v2 worker crashed.~n" - " group_id = ~s~n topic = ~s~n paritition = ~p~n" - " pid = ~p~n reason = ~p", - [GroupId, Topic, Partition, Pid, Reason]), + ?tp(error, "brod_group_subscriber_v2 worker crashed", + #{ group_id => GroupId + , topic => Topic + , partition => Partition + , pid => Pid + , reason => Reason + }), terminate_all_workers(Workers), brod_group_coordinator:commit_offsets(Coordinator), exit(worker_crash). -spec terminate_all_workers(workers()) -> ok. terminate_all_workers(Workers) -> - maps:map( fun(_, Worker) -> - ?BROD_LOG_INFO("Terminating worker pid=~p", [Worker]), - terminate_worker(Worker) + maps:map( fun({Topic, Partition}, Pid) -> + ?tp(info, "Terminating brod_group_subscriber_v2 worker", + #{ pid => Pid + , topic => Topic + , partition => Partition + }), + terminate_worker(Pid) end , Workers ), diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 75b9ebbe..00600f1b 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -53,14 +53,20 @@ init(_Topic, StartOpts) -> , partition := Partition , begin_offset := BeginOffset , commit_fun := CommitFun + , topic := Topic + , group_id := GroupId } = StartOpts, + logger:update_process_metadata(#{ partition => Partition + , topic => Topic + , group_id => GroupId + , domain => [brod, group_subscriber_worker] + }), InitInfo = maps:with( [topic, partition, group_id, commit_fun] , StartOpts ), - ?BROD_LOG_INFO("Starting group_subscriber_worker: ~p~n" - "Offset: ~p~nPid: ~p~n" - , [InitInfo, BeginOffset, self()] - ), + ?tp(info, group_subscriber_worker_start, + InitInfo #{ offset => BeginOffset + }), {ok, CbState} = CbModule:init(InitInfo, CbConfig), State = #state{ start_options = StartOpts , cb_module = CbModule diff --git a/src/brod_kafka_apis.erl b/src/brod_kafka_apis.erl index e2bba199..c6d06a4c 100644 --- a/src/brod_kafka_apis.erl +++ b/src/brod_kafka_apis.erl @@ -72,6 +72,7 @@ pick_version(Conn, API) -> %%%_* gen_server callbacks ===================================================== init([]) -> + logger:update_process_metadata(#{domain => [brod, kafka_apis]}), ?ETS = ets:new(?ETS, [named_table, public]), {ok, #state{}}. @@ -79,14 +80,14 @@ handle_info({'DOWN', _Mref, process, Conn, _Reason}, State) -> _ = ets:delete(?ETS, Conn), {noreply, State}; handle_info(Info, State) -> - ?BROD_LOG_ERROR("unknown info ~p", [Info]), + ?tp(error, "unknown message", #{info => Info}), {noreply, State}. handle_cast({monitor_connection, Conn}, State) -> erlang:monitor(process, Conn), {noreply, State}; handle_cast(Cast, State) -> - ?BROD_LOG_ERROR("unknown cast ~p", [Cast]), + ?tp(error, "unknown message", #{cast => Cast}), {noreply, State}. handle_call(stop, From, State) -> diff --git a/src/brod_producer.erl b/src/brod_producer.erl index 130395fe..85bc5b7e 100644 --- a/src/brod_producer.erl +++ b/src/brod_producer.erl @@ -270,6 +270,10 @@ stop(Pid) -> ok = gen_server:call(Pid, stop). %% @private init({ClientPid, Topic, Partition, Config}) -> erlang:process_flag(trap_exit, true), + logger:update_process_metadata(#{ domain => [brod, producer] + , topic => Topic + , partition => Partition + }), BufferLimit = ?config(partition_buffer_limit, ?DEFAULT_PARITION_BUFFER_LIMIT), OnWireLimit = ?config(partition_onwire_limit, ?DEFAULT_PARITION_ONWIRE_LIMIT), MaxBatchSize = ?config(max_batch_size, ?DEFAULT_MAX_BATCH_SIZE), @@ -384,7 +388,10 @@ handle_info({msg, Pid, #kpro_rsp{ api = produce {ok, NewState} = case ?IS_ERROR(ErrorCode) of true -> - _ = log_error_code(Topic, Partition, Offset, ErrorCode), + ?tp(error, "Produce error", + #{ error_code => ErrorCode + , offset => Offset + }), Error = {produce_response_error, Topic, Partition, Offset, ErrorCode}, is_retriable(ErrorCode) orelse exit({not_retriable, Error}), @@ -435,11 +442,6 @@ format_status(terminate, [_PDict, State=#state{buffer = Buffer}]) -> %%%_* Internal Functions ======================================================= --spec log_error_code(topic(), partition(), offset(), brod:error_code()) -> _. -log_error_code(Topic, Partition, Offset, ErrorCode) -> - ?BROD_LOG_ERROR("Produce error ~s-~B Offset: ~B Error: ~p", - [Topic, Partition, Offset, ErrorCode]). - handle_produce(BufCb, Batch, #state{retry_tref = Ref} = State) when is_reference(Ref) -> %% pending on retry, add to buffer regardless of connection state @@ -487,8 +489,8 @@ maybe_reinit_connection(#state{ client_pid = ClientPid ok = maybe_demonitor(OldConnMref), %% Make sure the sent but not acked ones are put back to buffer Buffer = brod_producer_buffer:nack_all(Buffer0, no_leader_connection), - ?BROD_LOG_WARNING("Failed to (re)init connection, reason:\n~p", - [Reason]), + ?LOG_WARNING("Failed to (re)init connection, reason:\n~p", + [Reason]), {ok, State#state{ connection = ?undef , conn_mref = ?undef , buffer = Buffer diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl index d42482be..06ee2e43 100644 --- a/src/brod_topic_subscriber.erl +++ b/src/brod_topic_subscriber.erl @@ -280,6 +280,9 @@ init(Config) -> , consumer_config := ConsumerConfig , partitions := Partitions } = maps:merge(Defaults, Config), + logger:update_process_metadata(#{ domain => [brod, topic_subscriber] + , topic => Topic + }), {ok, CommittedOffsets, CbState} = CbModule:init(Topic, InitData), ok = brod_utils:assert_client(Client), ok = brod_utils:assert_topic(Topic), diff --git a/src/brod_utils.erl b/src/brod_utils.erl index d1929f4a..fb1dc6f9 100644 --- a/src/brod_utils.erl +++ b/src/brod_utils.erl @@ -46,7 +46,6 @@ , is_pid_alive/1 , list_all_groups/2 , list_groups/2 - , log/3 , make_batch_input/2 , make_fetch_fun/4 , make_part_fun/1 @@ -75,9 +74,6 @@ -type offset_time() :: brod:offset_time(). -type group_id() :: brod:group_id(). -%% log/3 is deprecated, use ?BROD_LOG* macros from brod_int.hrl instead. --deprecated([{log, 3, eventually}]). - %%%_* APIs ===================================================================== %% @equiv create_topics(Hosts, TopicsConfigs, RequestConfigs, []) @@ -214,13 +210,6 @@ epoch_ms() -> {Mega, S, Micro} = os:timestamp(), (((Mega * 1000000) + S) * 1000) + Micro div 1000. -%% @doc Wrapper around logger API. -%% @deprecated Use ?BROD_LOG* macros from brod_int.hrl instead. --spec log(info | warning | error, string(), [any()]) -> ok. -log(info, Fmt, Args) -> ?BROD_LOG_ERROR(Fmt, Args); -log(warning, Fmt, Args) -> ?BROD_LOG_WARNING(Fmt, Args); -log(error, Fmt, Args) -> ?BROD_LOG_ERROR(Fmt, Args). - %% @doc Assert client_id is an atom(). -spec assert_client(brod:client_id() | pid()) -> ok | no_return(). assert_client(Client) -> diff --git a/test/brod_demo_group_subscriber_koc.erl b/test/brod_demo_group_subscriber_koc.erl index 6f5832fc..84f2d285 100644 --- a/test/brod_demo_group_subscriber_koc.erl +++ b/test/brod_demo_group_subscriber_koc.erl @@ -200,8 +200,8 @@ message_handler_loop(Topic, Partition, SubscriberPid) -> } -> Seqno = list_to_integer(binary_to_list(Value)), Now = os_time_utc_str(), - ?BROD_LOG_INFO("~p ~s-~p ~s: offset:~w seqno:~w\n", - [self(), Topic, Partition, Now, Offset, Seqno]), + ?LOG_INFO("~p ~s-~p ~s: offset:~w seqno:~w\n", + [self(), Topic, Partition, Now, Offset, Seqno]), brod_group_subscriber:ack(SubscriberPid, Topic, Partition, Offset), ?MODULE:message_handler_loop(Topic, Partition, SubscriberPid) after 1000 -> diff --git a/test/brod_demo_group_subscriber_loc.erl b/test/brod_demo_group_subscriber_loc.erl index c5c2fb36..777ee0bb 100644 --- a/test/brod_demo_group_subscriber_loc.erl +++ b/test/brod_demo_group_subscriber_loc.erl @@ -150,8 +150,8 @@ process_message(Topic, Partition, Dir, GroupId, Message) -> } = Message, Seqno = list_to_integer(binary_to_list(Value)), Now = os_time_utc_str(), - ?BROD_LOG_INFO("~p ~p ~s: offset:~w seqno:~w\n", - [self(), Partition, Now, Offset, Seqno]), + ?LOG_INFO("~p ~p ~s: offset:~w seqno:~w\n", + [self(), Partition, Now, Offset, Seqno]), ok = commit_offset(Dir, GroupId, Topic, Partition, Offset). %% @doc This callback is called whenever there is a new assignment received. diff --git a/test/brod_demo_topic_subscriber.erl b/test/brod_demo_topic_subscriber.erl index 315ec1e7..fa7f2f3d 100644 --- a/test/brod_demo_topic_subscriber.erl +++ b/test/brod_demo_topic_subscriber.erl @@ -113,9 +113,11 @@ process_message(Dir, Partition, Message) -> , value = Value } = Message, Seqno = binary_to_integer(Value), - Now = os_time_utc_str(), - ?BROD_LOG_INFO("~p ~p ~s: offset:~w seqno:~w\n", - [self(), Partition, Now, Offset, Seqno]), + ?tp(info, demo_process_message, + #{ partition => Partition + , seqno => Seqno + , offset => Offset + }), ok = commit_offset(Dir, Partition, Offset). -spec read_offsets(string()) -> [{brod:partition(), brod:offset()}]. diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index b5b84d28..f167f8c4 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -123,7 +123,7 @@ common_end_per_testcase(Case, Config) -> kafka_test_helper:common_end_per_testcase(Case, Config), receive {'EXIT', From, Reason} -> - ?log(warning, "Refusing to become collateral damage." + ?LOG(warning, "Refusing to become collateral damage." " Offender: ~p Reason: ~p", [From, Reason]) after 0 -> @@ -673,7 +673,7 @@ start_subscriber(GroupId, Config, Topics, GroupConfig, ConsumerConfig, InitArgs) GroupConfig, ConsumerConfig, ?MODULE, InitArgs) end, - ?log(notice, "Started subscriber with pid=~p", [SubscriberPid]), + ?LOG(notice, "Started subscriber with pid=~p", [SubscriberPid]), {ok, SubscriberPid}. stop_subscriber(Config, Pid) -> diff --git a/test/brod_test_group_subscriber.erl b/test/brod_test_group_subscriber.erl index 7a677135..7f4c671a 100644 --- a/test/brod_test_group_subscriber.erl +++ b/test/brod_test_group_subscriber.erl @@ -36,9 +36,10 @@ init(InitInfo, Config) -> IsAsyncAck = maps:get(async_ack, Config, false), IsAsyncCommit = maps:get(async_commit, Config, false), IsAssignPartitions = maps:get(assign_partitions, Config, false), - ?BROD_LOG_INFO("Started a test group subscriber.~n" - "Config: ~p~nInitInfo: ~p~n" - , [Config, InitInfo]), + ?tp(brod_test_subscriber_start, + #{ config => Config + , init_info => InitInfo + }), {ok, #state{ is_async_ack = IsAsyncAck , is_async_commit = IsAsyncCommit , is_assign_partitions = IsAssignPartitions diff --git a/test/brod_test_macros.hrl b/test/brod_test_macros.hrl index b6fe406b..f0f3e022 100644 --- a/test/brod_test_macros.hrl +++ b/test/brod_test_macros.hrl @@ -2,9 +2,9 @@ -define(BROD_TEST_MACROS_HRL, true). -include_lib("kafka_protocol/include/kpro.hrl"). --include_lib("hut/include/hut.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("kernel/include/logger.hrl"). %%==================================================================== %% Macros diff --git a/test/kafka_test_helper.erl b/test/kafka_test_helper.erl index 99fcefa0..e0c12c8e 100644 --- a/test/kafka_test_helper.erl +++ b/test/kafka_test_helper.erl @@ -66,7 +66,6 @@ produce({Topic, Partition}, Key, Value, Headers) -> , headers => Headers }] ), - ?log(notice, "Produced at ~p ~p, offset: ~p", [Topic, Partition, Offset]), Offset. payloads(Config) -> @@ -76,7 +75,7 @@ payloads(Config) -> %% Produce binaries to the topic and return offset of the last message: produce_payloads(Topic, Partition, Config) -> Payloads = payloads(Config), - ?log(notice, "Producing payloads to ~p", [{Topic, Partition}]), + ?LOG(notice, "Producing payloads to ~p", [{Topic, Partition}]), L = [produce({Topic, Partition}, I) + 1 || I <- payloads(Config)], LastOffset = lists:last(L), {LastOffset, Payloads}. @@ -111,13 +110,13 @@ exec_in_kafka_container(FMT, Args) -> CMD0 = lists:flatten(io_lib:format(FMT, Args)), CMD = "docker exec kafka-1 bash -c '" ++ CMD0 ++ "'", Port = open_port({spawn, CMD}, [exit_status, stderr_to_stdout]), - ?log(notice, "Running ~s~nin kafka container", [CMD0]), + ?LOG_NOTICE("Running ~s~nin kafka container", [CMD0]), collect_port_output(Port, CMD). collect_port_output(Port, CMD) -> receive {Port, {data, Str}} -> - ?log(notice, "~s", [Str]), + ?LOG_NOTICE("~s", [Str]), collect_port_output(Port, CMD); {Port, {exit_status, ExitStatus}} -> ExitStatus @@ -158,11 +157,10 @@ wait_n_messages(TestGroupId, Expected, NRetries) -> begin Offsets = get_acked_offsets(TestGroupId), NMessages = lists:sum(maps:values(Offsets)), - ?log( notice - , "Number of messages processed by consumer group: ~p; " - "total: ~p/~p" - , [Offsets, NMessages, Expected] - ), + ?LOG_NOTICE("Number of messages processed by consumer group: ~p; " + "total: ~p/~p" + , [Offsets, NMessages, Expected] + ), ?assert(NMessages >= Expected) end).