@@ -10,11 +10,11 @@ import (
1010 "github.com/go-kit/log"
1111 "github.com/prometheus/client_golang/prometheus"
1212 "github.com/stretchr/testify/require"
13- "github.com/twmb/franz-go/pkg/kgo"
1413
1514 "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
1615 "github.com/grafana/loki/v3/pkg/dataobj/metastore"
1716 "github.com/grafana/loki/v3/pkg/dataobj/uploader"
17+ "github.com/grafana/loki/v3/pkg/kafka/partition"
1818 "github.com/grafana/loki/v3/pkg/logproto"
1919 "github.com/grafana/loki/v3/pkg/scratch"
2020
@@ -49,9 +49,9 @@ func TestPartitionProcessor_Flush(t *testing.T) {
4949 }
5050 b , err := s .Marshal ()
5151 require .NoError (t , err )
52- p .processRecord (& kgo .Record {
53- Key : [] byte ( "test-tenant" ) ,
54- Value : b ,
52+ p .processRecord (partition .Record {
53+ TenantID : "test-tenant" ,
54+ Content : b ,
5555 Timestamp : now ,
5656 })
5757
@@ -90,9 +90,9 @@ func TestPartitionProcessor_Flush(t *testing.T) {
9090 }
9191 b , err := s .Marshal ()
9292 require .NoError (t , err )
93- p .processRecord (& kgo .Record {
94- Key : [] byte ( "test-tenant" ) ,
95- Value : b ,
93+ p .processRecord (partition .Record {
94+ TenantID : "test-tenant" ,
95+ Content : b ,
9696 Timestamp : now ,
9797 })
9898
@@ -144,9 +144,9 @@ func TestPartitionProcessor_IdleFlush(t *testing.T) {
144144 }
145145 b , err := s .Marshal ()
146146 require .NoError (t , err )
147- p .processRecord (& kgo .Record {
148- Key : [] byte ( "test-tenant" ) ,
149- Value : b ,
147+ p .processRecord (partition .Record {
148+ TenantID : "test-tenant" ,
149+ Content : b ,
150150 Timestamp : clock .Now (),
151151 })
152152 // A modification should have happened.
@@ -189,36 +189,36 @@ func TestPartitionProcessor_OffsetsCommitted(t *testing.T) {
189189 }
190190 b , err := s .Marshal ()
191191 require .NoError (t , err )
192- p .processRecord (& kgo .Record {
193- Key : [] byte ( "test-tenant" ) ,
194- Value : b ,
192+ p .processRecord (partition .Record {
193+ TenantID : "test-tenant" ,
194+ Content : b ,
195195 Timestamp : now1 ,
196196 Offset : 1 ,
197197 })
198198
199199 // No flush should have occurred and no offsets should be committed.
200200 require .True (t , p .lastFlushed .IsZero ())
201- require .Nil (t , committer .records )
201+ require .Nil (t , committer .offsets )
202202
203203 // Mark the builder as full.
204204 wrappedBuilder .nextErr = logsobj .ErrBuilderFull
205205
206206 // Append another record.
207207 clock .Advance (time .Minute )
208208 now2 := clock .Now ()
209- p .processRecord (& kgo .Record {
210- Key : [] byte ( "test-tenant" ) ,
211- Value : b ,
209+ p .processRecord (partition .Record {
210+ TenantID : "test-tenant" ,
211+ Content : b ,
212212 Timestamp : now2 ,
213213 Offset : 2 ,
214214 })
215215
216216 // A flush should have occurred and offsets should be committed.
217217 require .Equal (t , now2 , p .lastFlushed )
218- require .Len (t , committer .records , 1 )
218+ require .Len (t , committer .offsets , 1 )
219219 // The offset committed should be the offset of the first record, as that
220220 // was the record that was flushed.
221- require .Equal (t , int64 (1 ), committer .records [0 ]. Offset )
221+ require .Equal (t , int64 (1 ), committer .offsets [0 ])
222222 })
223223
224224 t .Run ("when idle timeout is exceeded" , func (t * testing.T ) {
@@ -240,16 +240,16 @@ func TestPartitionProcessor_OffsetsCommitted(t *testing.T) {
240240 }
241241 b , err := s .Marshal ()
242242 require .NoError (t , err )
243- p .processRecord (& kgo .Record {
244- Key : [] byte ( "test-tenant" ) ,
245- Value : b ,
243+ p .processRecord (partition .Record {
244+ TenantID : "test-tenant" ,
245+ Content : b ,
246246 Timestamp : now1 ,
247247 Offset : 1 ,
248248 })
249249
250250 // No flush should have occurred and no offsets should be committed.
251251 require .True (t , p .lastFlushed .IsZero ())
252- require .Nil (t , committer .records )
252+ require .Nil (t , committer .offsets )
253253
254254 // Advance the clock past the idle timeout.
255255 clock .Advance (61 * time .Minute )
@@ -260,11 +260,11 @@ func TestPartitionProcessor_OffsetsCommitted(t *testing.T) {
260260
261261 // A flush should have occurred and offsets should be committed.
262262 require .Equal (t , now2 , p .lastFlushed )
263- require .Len (t , committer .records , 1 )
263+ require .Len (t , committer .offsets , 1 )
264264
265265 // The offset committed should be the offset of the first record, as that
266266 // was the record that was flushed.
267- require .Equal (t , int64 (1 ), committer .records [0 ]. Offset )
267+ require .Equal (t , int64 (1 ), committer .offsets [0 ])
268268 })
269269}
270270
@@ -286,9 +286,9 @@ func TestPartitionProcessor_ProcessRecord(t *testing.T) {
286286 }
287287 b , err := s .Marshal ()
288288 require .NoError (t , err )
289- p .processRecord (& kgo .Record {
290- Key : [] byte ( "test-tenant" ) ,
291- Value : b ,
289+ p .processRecord (partition .Record {
290+ TenantID : "test-tenant" ,
291+ Content : b ,
292292 Timestamp : clock .Now (),
293293 })
294294
@@ -300,16 +300,14 @@ func TestPartitionProcessor_ProcessRecord(t *testing.T) {
300300func newTestPartitionProcessor (_ * testing.T , clock quartz.Clock ) * partitionProcessor {
301301 p := newPartitionProcessor (
302302 context .Background (),
303- & kgo. Client {},
303+ & mockCommitter {},
304304 testBuilderConfig ,
305305 uploader.Config {},
306306 metastore.Config {
307307 PartitionRatio : 10 ,
308308 },
309309 newMockBucket (),
310310 nil ,
311- "topic" ,
312- 0 ,
313311 log .NewNopLogger (),
314312 prometheus .NewRegistry (),
315313 60 * time .Minute ,
0 commit comments