Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snabbkaffe-fy brod #448

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions include/brod_int.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

-include("brod.hrl").
-include_lib("kafka_protocol/include/kpro.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("kernel/include/logger.hrl").

-define(undef, undefined).
Expand Down Expand Up @@ -73,13 +74,6 @@
-define(GET_STACKTRACE(Var), Var = erlang:get_stacktrace()).
-endif.

%% Brod logging wrappers around Logger API calls. Insert 'brod' domain
%% to allow applications to filter Brod logs as they wish.
-define(BROD_LOG_WARNING(Fmt, Args), ?LOG_WARNING(Fmt, Args, #{domain => [brod]})).
-define(BROD_LOG_ERROR(Fmt, Args), ?LOG_ERROR( Fmt, Args, #{domain => [brod]})).
-define(BROD_LOG_INFO(Fmt, Args), ?LOG_INFO( Fmt, Args, #{domain => [brod]})).
-define(BROD_LOG(Level, Fmt, Args), ?LOG(Level, Fmt, Args, #{domain => [brod]})).

-endif. % include brod_int.hrl

%%%_* Emacs ====================================================================
Expand Down
3 changes: 3 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
%% -*- mode:erlang -*-
{deps, [ {supervisor3, "1.1.11"}
, {kafka_protocol, "2.3.6"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "0.10.0"}}}
]}.

{edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}.
{erl_opts, [warn_unused_vars,warn_shadow_vars,warn_obsolete_guard,debug_info]}.
{xref_checks, [undefined_function_calls, undefined_functions,
Expand Down
3 changes: 1 addition & 2 deletions rebar.config.script
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ IsRebar3 = erlang:function_exported(rebar3, main, 1),
DocoptUrl = "https://github.com/zmstone/docopt-erl.git",
DocOptTag = "0.1.3",
DocoptDep = {docopt, {git, DocoptUrl, {branch, DocOptTag}}},
Snabbkaffe = {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "0.6.0"}}},
Profiles =
{profiles, [
{brod_cli, [
Expand All @@ -18,7 +17,7 @@ Profiles =
]}
]}]},
{test, [
{deps, [meck, proper, jsone, DocoptDep, Snabbkaffe]},
{deps, [meck, proper, jsone, bear, DocoptDep]},
{erl_opts, [{d, build_brod_cli}]}
]}
]},
Expand Down
4 changes: 4 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
{"1.2.0",
[{<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.4">>},1},
{<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"2.3.6">>},0},
{<<"snabbkaffe">>,
{git,"https://github.com/kafka4beam/snabbkaffe",
{ref,"74a0d6cd6bbe32581616aee96e245ff2065f4a48"}},
0},
{<<"snappyer">>,{pkg,<<"snappyer">>,<<"1.2.5">>},1},
{<<"supervisor3">>,{pkg,<<"supervisor3">>,<<"1.1.11">>},0}]}.
[
Expand Down
42 changes: 22 additions & 20 deletions src/brod_cg_commits.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ init({Client, GroupInput}) ->
ProtocolName = proplists:get_value(protocol, GroupInput),
Retention = proplists:get_value(retention, GroupInput),
Offsets = proplists:get_value(offsets, GroupInput),
logger:update_process_metadata(#{ group_id => GroupId
, coor => self()
, domain => [brod, cg_commits]
}),
%% use callback_implemented strategy so I know I am elected leader
%% when `assign_partitions' callback is called.
Config = [ {partition_assignment_strategy, callback_implemented}
Expand All @@ -167,7 +171,7 @@ init({Client, GroupInput}) ->
{ok, State}.

handle_info(Info, State) ->
log(State, info, "Info discarded:~p", [Info]),
?tp(warning, "unknown message", #{info => Info}),
{noreply, State}.

handle_call(sync, From, State0) ->
Expand All @@ -177,7 +181,7 @@ handle_call(sync, From, State0) ->
handle_call({assign_partitions, Members, TopicPartitions}, _From,
#state{topic = MyTopic,
offsets = Offsets} = State) ->
log(State, info, "Assigning all topic partitions to self", []),
?LOG_INFO("Assigning all topic-partitions to self", []),
MyTP = [{MyTopic, P} || {P, _} <- Offsets],
%% Assert that my topic partitions are included in
%% subscriptions collected from ALL members
Expand All @@ -188,8 +192,8 @@ handle_call({assign_partitions, Members, TopicPartitions}, _From,
ok;
BadPartitions ->
PartitionNumbers = [P || {_T, P} <- BadPartitions],
log(State, error, "Nonexisting partitions in input: ~p",
[PartitionNumbers]),
?LOG_ERROR("Nonexisting partitions in input: ~p",
[PartitionNumbers]),
erlang:exit({non_existing_partitions, PartitionNumbers})
end,
%% To keep it simple, assign all my topic-partitions to self
Expand All @@ -209,9 +213,6 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments},
, coordinator = Pid
, topic = MyTopic
} = State) ->
%% Write a log if I am not a leader,
%% hope the desired partitions are all assigned to me
IsLeader orelse log(State, info, "Not elected", []),
Groupped0 =
brod_utils:group_per_key(
fun(#brod_received_assignment{ topic = Topic
Expand All @@ -223,11 +224,18 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments},
%% Discard other topics if for whatever reason the group leader assigns
%% irrelevant topic-partitions to me
Groupped = lists:filter(fun({Topic, _}) -> Topic =:= MyTopic end, Groupped0),
log(State, info, "current offsets:\n~p", [Groupped]),
%% Write leader election results to the log,
%% hope the desired partitions are all assigned to me
?tp(info, election_result,
#{ generation_id => GenerationId
, topic => MyTopic
, elected => IsLeader
, current_offsets => Groupped
}),
%% Assert all desired partitions are in assignment
case Groupped of
[] ->
log(State, error, "Topic ~s is not received in assignment", [MyTopic]),
?LOG_ERROR("Topic ~s is not received in assignment", [MyTopic]),
erlang:exit({bad_topic_assignment, Groupped0});
[{MyTopic, PartitionOffsetList}] ->
MyPartitions = [P || {P, _O} <- OffsetsToCommit],
Expand All @@ -236,10 +244,9 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments},
[] ->
ok;
Left ->
log(State, error,
"Partitions ~p are not received in assignment, "
"There is probably another active group member subscribing "
"to topic ~s, stop it and retry\n", [MyTopic, Left]),
?LOG_ERROR("Partitions ~p are not received in assignment, "
           "There is probably another active group member subscribing "
           "to topic ~s, stop it and retry\n", [Left, MyTopic]),
erlang:exit({unexpected_assignments, Left})
end
end,
Expand All @@ -255,7 +262,7 @@ handle_cast({new_assignments, _MemberId, GenerationId, Assignments},
case brod_group_coordinator:commit_offsets(Pid) of
ok -> ok;
{error, Reason} ->
log(State, error, "Failed to commit, reason:\n~p", [Reason]),
?LOG_ERROR("Failed to commit, reason:\n~p", [Reason]),
erlang:exit(commit_failed)
end,
{noreply, set_done(State)};
Expand All @@ -281,7 +288,7 @@ maybe_reply_sync(#state{pending_sync = ?undef} = State) ->
State;
maybe_reply_sync(#state{pending_sync = From} = State) ->
gen_server:reply(From, ok),
log(State, info, "done\n", []),
?LOG_INFO("done\n", []),
State#state{pending_sync = ?undef}.

%% I am the current leader because I am assigning partitions.
Expand All @@ -294,11 +301,6 @@ assign_all_to_self([{MyMemberId, _} | Members], TopicPartitions) ->
| [{Id, []} || {Id, _MemberMeta} <- Members]
].

log(#state{groupId = GroupId}, Level, Fmt, Args) ->
?BROD_LOG(Level,
"Group member (~s,coor=~p):\n" ++ Fmt,
[GroupId, self() | Args]).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
49 changes: 26 additions & 23 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ deregister_consumer(Client, Topic, Partition) ->
%% @private
init({BootstrapEndpoints, ClientId, Config}) ->
erlang:process_flag(trap_exit, true),
logger:update_process_metadata(#{ domain => [brod, client]
, id => ClientId
}),
Tab = ets:new(?ETS(ClientId),
[named_table, protected, {read_concurrency, true}]),
self() ! init,
Expand All @@ -308,21 +311,21 @@ handle_info(init, State0) ->
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
, producers_sup = Pid
} = State) ->
?BROD_LOG_ERROR("client ~p producers supervisor down~nReason: ~p",
[ClientId, Pid, Reason]),
?LOG_ERROR("client ~p producers supervisor down~nReason: ~p",
[ClientId, Pid, Reason]),
{stop, {producers_sup_down, Reason}, State};
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
, consumers_sup = Pid
} = State) ->
?BROD_LOG_ERROR("client ~p consumers supervisor down~nReason: ~p",
[ClientId, Pid, Reason]),
?LOG_ERROR("client ~p consumers supervisor down~nReason: ~p",
[ClientId, Pid, Reason]),
{stop, {consumers_sup_down, Reason}, State};
handle_info({'EXIT', Pid, Reason}, State) ->
NewState = handle_connection_down(State, Pid, Reason),
{noreply, NewState};
handle_info(Info, State) ->
?BROD_LOG_WARNING("~p [~p] ~p got unexpected info: ~p",
[?MODULE, self(), State#state.client_id, Info]),
?LOG_WARNING("~p [~p] ~p got unexpected info: ~p",
[?MODULE, self(), State#state.client_id, Info]),
{noreply, State}.

%% @private
Expand Down Expand Up @@ -379,8 +382,8 @@ handle_cast({deregister, Key}, #state{workers_tab = Tab} = State) ->
ets:delete(Tab, Key),
{noreply, State};
handle_cast(Cast, State) ->
?BROD_LOG_WARNING("~p [~p] ~p got unexpected cast: ~p",
[?MODULE, self(), State#state.client_id, Cast]),
?LOG_WARNING("~p [~p] ~p got unexpected cast: ~p",
[?MODULE, self(), State#state.client_id, Cast]),
{noreply, State}.

%% @private
Expand All @@ -398,8 +401,8 @@ terminate(Reason, #state{ client_id = ClientId
true ->
ok;
false ->
?BROD_LOG_WARNING("~p [~p] ~p is terminating\nreason: ~p~n",
[?MODULE, self(), ClientId, Reason])
?LOG_WARNING("~p [~p] ~p is terminating\nreason: ~p~n",
[?MODULE, self(), ClientId, Reason])
end,
%% stop producers and consumers first because they are monitoring connections
shutdown_pid(ProducersSup),
Expand Down Expand Up @@ -538,9 +541,9 @@ do_get_metadata(Topic, #state{ client_id = ClientId
ok = update_partitions_count_cache(Ets, TopicMetadataArray),
{{ok, Metadata}, State};
{error, Reason} ->
?BROD_LOG_ERROR("~p failed to fetch metadata for topics: ~p\n"
"reason=~p",
[ClientId, Topics, Reason]),
?LOG_ERROR("~p failed to fetch metadata for topics: ~p\n"
"reason=~p",
[ClientId, Topics, Reason]),
{{error, Reason}, State}
end.

Expand Down Expand Up @@ -634,9 +637,9 @@ maybe_connect(#state{client_id = ClientId} = State,
true ->
connect(State, Endpoint);
false ->
?BROD_LOG_ERROR("~p (re)connect to ~s:~p aborted.\n"
"last failure: ~p",
[ClientId, Host, Port, Reason]),
?LOG_ERROR("~p (re)connect to ~s:~p aborted.\n"
"last failure: ~p",
[ClientId, Host, Port, Reason]),
{{error, Reason}, State}
end.

Expand All @@ -648,15 +651,15 @@ connect(#state{ client_id = ClientId
Conn =
case do_connect(Endpoint, State) of
{ok, Pid} ->
?BROD_LOG_INFO("client ~p connected to ~s:~p~n",
[ClientId, Host, Port]),
?LOG_INFO("client ~p connected to ~s:~p~n",
[ClientId, Host, Port]),
#conn{ endpoint = Endpoint
, pid = Pid
};
{error, Reason} ->
?BROD_LOG_ERROR("client ~p failed to connect to ~s:~p~n"
"reason:~p",
[ClientId, Host, Port, Reason]),
?LOG_ERROR("client ~p failed to connect to ~s:~p~n"
"reason:~p",
[ClientId, Host, Port, Reason]),
#conn{ endpoint = Endpoint
, pid = mark_dead(Reason)
}
Expand Down Expand Up @@ -684,8 +687,8 @@ handle_connection_down(#state{ payload_conns = Conns
} = State, Pid, Reason) ->
case lists:keytake(Pid, #conn.pid, Conns) of
{value, #conn{endpoint = {Host, Port}} = Conn, Rest} ->
?BROD_LOG_INFO("client ~p: payload connection down ~s:~p~n"
"reason:~p", [ClientId, Host, Port, Reason]),
?LOG_INFO("client ~p: payload connection down ~s:~p~n"
"reason:~p", [ClientId, Host, Port, Reason]),
NewConn = Conn#conn{pid = mark_dead(Reason)},
State#state{payload_conns = [NewConn | Rest]};
false ->
Expand Down
34 changes: 19 additions & 15 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ get_connection(Pid) ->

init({Bootstrap, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
logger:update_process_metadata(#{ topic => Topic
, partition => Partition
, domain => [brod, consumer]
}),
Cfg = fun(Name, Default) ->
proplists:get_value(Name, Config, Default)
end,
Expand Down Expand Up @@ -333,8 +337,8 @@ handle_info({'EXIT', Pid, _Reason}, #state{connection = Pid} = State) ->
%% standalone connection spawn-linked to self()
{noreply, handle_conn_down(State)};
handle_info(Info, State) ->
?BROD_LOG_WARNING("~p ~p got unexpected info: ~p",
[?MODULE, self(), Info]),
?LOG_WARNING("~p ~p got unexpected info: ~p",
[?MODULE, self(), Info]),
{noreply, State}.

%% @private
Expand Down Expand Up @@ -382,8 +386,8 @@ handle_cast({ack, Offset}, #state{pending_acks = PendingAcks} = State0) ->
State = maybe_send_fetch_request(State1),
{noreply, State};
handle_cast(Cast, State) ->
?BROD_LOG_WARNING("~p ~p got unexpected cast: ~p",
[?MODULE, self(), Cast]),
?LOG_WARNING("~p ~p got unexpected cast: ~p",
[?MODULE, self(), Cast]),
{noreply, State}.

%% @private
Expand All @@ -403,8 +407,8 @@ terminate(Reason, #state{ bootstrap = Bootstrap
false -> ok
end,
%% write a log if it's not a normal reason
IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p",
[Topic, Partition, Reason]),
IsNormal orelse ?LOG_ERROR("Consumer ~s-~w terminate reason: ~p",
[Topic, Partition, Reason]),
ok.

%% @private
Expand Down Expand Up @@ -579,8 +583,8 @@ handle_fetch_error(#kafka_fetch_error{error_code = ErrorCode} = Error,
} = State) ->
case err_op(ErrorCode) of
reset_connection ->
?BROD_LOG_INFO("Fetch error ~s-~p: ~p",
[Topic, Partition, ErrorCode]),
?LOG_INFO("Fetch error ~s-~p: ~p",
[Topic, Partition, ErrorCode]),
%% The current connection in use is not connected to the partition leader,
%% so we dereference and demonitor the connection pid, but leave it alive,
%% Can not kill it because it might be shared with other partition workers
Expand All @@ -596,8 +600,8 @@ handle_fetch_error(#kafka_fetch_error{error_code = ErrorCode} = Error,
{noreply, maybe_send_fetch_request(State)};
stop ->
ok = cast_to_subscriber(Subscriber, Error),
?BROD_LOG_ERROR("Consumer ~s-~p shutdown\nReason: ~p",
[Topic, Partition, ErrorCode]),
?LOG_ERROR("Consumer ~s-~p shutdown\nReason: ~p",
[Topic, Partition, ErrorCode]),
{stop, normal, State};
reset_offset ->
handle_reset_offset(State, Error);
Expand All @@ -611,13 +615,13 @@ handle_reset_offset(#state{ subscriber = Subscriber
} = State, Error) ->
ok = cast_to_subscriber(Subscriber, Error),
%% Suspend, no more fetch request until the subscriber re-subscribes
?BROD_LOG_INFO("~p ~p consumer is suspended, "
"waiting for subscriber ~p to resubscribe with "
"new begin_offset", [?MODULE, self(), Subscriber]),
?LOG_INFO("~p ~p consumer is suspended, "
"waiting for subscriber ~p to resubscribe with "
"new begin_offset", [?MODULE, self(), Subscriber]),
{noreply, State#state{is_suspended = true}};
handle_reset_offset(#state{offset_reset_policy = Policy} = State, _Error) ->
?BROD_LOG_INFO("~p ~p offset out of range, applying reset policy ~p",
[?MODULE, self(), Policy]),
?LOG_INFO("~p ~p offset out of range, applying reset policy ~p",
[?MODULE, self(), Policy]),
BeginOffset = case Policy of
reset_to_earliest -> ?OFFSET_EARLIEST;
reset_to_latest -> ?OFFSET_LATEST
Expand Down
Loading