From acee822346558cbf8c93558738631085ed37be08 Mon Sep 17 00:00:00 2001
From: ditas <ditasandditas@gmail.com>
Date: Tue, 29 Nov 2022 12:26:03 +0000
Subject: [PATCH] Add support for rack id

---
 src/brod_consumer.erl      |  6 +++++-
 src/brod_kafka_request.erl | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl
index 2b2daec9..41d5db73 100644
--- a/src/brod_consumer.erl
+++ b/src/brod_consumer.erl
@@ -115,6 +115,7 @@
                , prefetch_bytes      :: non_neg_integer()
                , connection_mref     :: ?undef | reference()
                , isolation_level     :: isolation_level()
+               , rack_id             :: ?undef | binary()
                }).
 
 -type state() :: #state{}.
@@ -301,6 +302,7 @@ init({Bootstrap, Topic, Partition, Config}) ->
   BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET),
   OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY),
   IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL),
+  RackId = Cfg(rack_id, ?undef),
 
   %% If bootstrap is a client pid, register self to the client
   case is_shared_conn(Bootstrap) of
@@ -328,6 +330,7 @@ init({Bootstrap, Topic, Partition, Config}) ->
              , size_stat_window    = Cfg(size_stat_window, ?DEFAULT_AVG_WINDOW)
              , connection_mref     = ?undef
              , isolation_level     = IsolationLevel
+             , rack_id             = RackId
              }}.
 
 %% @private
@@ -730,7 +733,8 @@ send_fetch_request(#state{ begin_offset = BeginOffset
                              State#state.max_wait_time,
                              State#state.min_bytes,
                              State#state.max_bytes,
-                             State#state.isolation_level),
+                             State#state.isolation_level,
+                             State#state.rack_id),
   case kpro:request_async(Connection, Request) of
     ok ->
       State#state{last_req_ref = Request#kpro_req.ref};
diff --git a/src/brod_kafka_request.erl b/src/brod_kafka_request.erl
index a934cc51..ab4dc424 100644
--- a/src/brod_kafka_request.erl
+++ b/src/brod_kafka_request.erl
@@ -20,6 +20,7 @@
 -export([ create_topics/3
         , delete_topics/3
         , fetch/8
+        , fetch/9
         , list_groups/1
         , list_offsets/4
         , join_group/2
@@ -89,6 +90,20 @@ fetch(Pid, Topic, Partition, Offset,
                       , isolation_level => IsolationLevel
                       }).
 
+-spec fetch(conn(), topic(), partition(), offset(),
+            kpro:wait(), kpro:count(), kpro:count(),
+            kpro:isolation_level(), ?undef | binary()) -> kpro:req().
+fetch(Pid, Topic, Partition, Offset,
+      WaitTime, MinBytes, MaxBytes, IsolationLevel, RackId) ->
+  Vsn = pick_version(fetch, Pid),
+  kpro_req_lib:fetch(Vsn, Topic, Partition, Offset,
+                      #{ max_wait_time => WaitTime
+                      , min_bytes => MinBytes
+                      , max_bytes => MaxBytes
+                      , isolation_level => IsolationLevel
+                      , rack_id => RackId
+                      }).
+
 %% @doc Make a `list_offsets' request message for offset resolution.
 %% In kafka protocol, -2 and -1 are semantic 'time' to request for
 %% 'earliest' and 'latest' offsets.