diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 2b2daec9..41d5db73 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -115,6 +115,7 @@ , prefetch_bytes :: non_neg_integer() , connection_mref :: ?undef | reference() , isolation_level :: isolation_level() + , rack_id :: ?undef | binary() }). -type state() :: #state{}. @@ -301,6 +302,7 @@ init({Bootstrap, Topic, Partition, Config}) -> BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET), OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY), IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL), + RackId = Cfg(rack_id, ?undef), %% If bootstrap is a client pid, register self to the client case is_shared_conn(Bootstrap) of @@ -328,6 +330,7 @@ init({Bootstrap, Topic, Partition, Config}) -> , size_stat_window = Cfg(size_stat_window, ?DEFAULT_AVG_WINDOW) , connection_mref = ?undef , isolation_level = IsolationLevel + , rack_id = RackId }}. %% @private @@ -730,7 +733,8 @@ send_fetch_request(#state{ begin_offset = BeginOffset State#state.max_wait_time, State#state.min_bytes, State#state.max_bytes, - State#state.isolation_level), + State#state.isolation_level, + State#state.rack_id), case kpro:request_async(Connection, Request) of ok -> State#state{last_req_ref = Request#kpro_req.ref}; diff --git a/src/brod_kafka_request.erl b/src/brod_kafka_request.erl index a934cc51..ab4dc424 100644 --- a/src/brod_kafka_request.erl +++ b/src/brod_kafka_request.erl @@ -20,6 +20,7 @@ -export([ create_topics/3 , delete_topics/3 , fetch/8 + , fetch/9 , list_groups/1 , list_offsets/4 , join_group/2 @@ -89,6 +90,20 @@ fetch(Pid, Topic, Partition, Offset, , isolation_level => IsolationLevel }). +-spec fetch(conn(), topic(), partition(), offset(), + kpro:wait(), kpro:count(), kpro:count(), + kpro:isolation_level(), ?undef | binary()) -> kpro:req(). +fetch(Pid, Topic, Partition, Offset, + WaitTime, MinBytes, MaxBytes, IsolationLevel, RackId) -> + Vsn = pick_version(fetch, Pid), + kpro_req_lib:fetch(Vsn, Topic, Partition, Offset, + #{ max_wait_time => WaitTime + , min_bytes => MinBytes + , max_bytes => MaxBytes + , isolation_level => IsolationLevel + , rack_id => RackId + }). + %% @doc Make a `list_offsets' request message for offset resolution. %% In kafka protocol, -2 and -1 are semantic 'time' to request for %% 'earliest' and 'latest' offsets.