@@ -4182,6 +4182,119 @@ public void testEmptyBatch() throws Exception {
41824182 assertNull (complete1 .get (5 , TimeUnit .SECONDS ));
41834183 }
41844184
4185+ @ Test
4186+ public void testRecordAppendLingerTime () throws Exception {
4187+ MockTimer timer = new MockTimer ();
4188+
4189+ // Writer sleeps for 10ms before appending records.
4190+ MockPartitionWriter writer = new MockPartitionWriter (timer .time (), Integer .MAX_VALUE , false );
4191+ CoordinatorRuntimeMetrics runtimeMetrics = mock (CoordinatorRuntimeMetrics .class );
4192+
4193+ CoordinatorRuntime <MockCoordinatorShard , String > runtime =
4194+ new CoordinatorRuntime .Builder <MockCoordinatorShard , String >()
4195+ .withTime (timer .time ())
4196+ .withTimer (timer )
4197+ .withDefaultWriteTimeOut (Duration .ofMillis (20 ))
4198+ .withLoader (new MockCoordinatorLoader ())
4199+ .withEventProcessor (new DirectEventProcessor ())
4200+ .withPartitionWriter (writer )
4201+ .withCoordinatorShardBuilderSupplier (new MockCoordinatorShardBuilderSupplier ())
4202+ .withCoordinatorRuntimeMetrics (runtimeMetrics )
4203+ .withCoordinatorMetrics (mock (CoordinatorMetrics .class ))
4204+ .withSerializer (new StringSerializer ())
4205+ .withAppendLingerMs (10 )
4206+ .withExecutorService (mock (ExecutorService .class ))
4207+ .build ();
4208+
4209+ // Schedule the loading.
4210+ runtime .scheduleLoadOperation (TP , 10 );
4211+
4212+ // Verify the initial state.
4213+ CoordinatorRuntime <MockCoordinatorShard , String >.CoordinatorContext ctx = runtime .contextOrThrow (TP );
4214+ assertNull (ctx .currentBatch );
4215+
4216+ // Get the max batch size.
4217+ int maxBatchSize = writer .config (TP ).maxMessageSize ();
4218+
4219+ // Create records with a quarter of the max batch size each. Keep in mind that
4220+ // each batch has a header so it is not possible to have those four records
4221+ // in one single batch.
4222+ List <String > records = Stream .of ('1' , '2' , '3' , '4' ).map (c -> {
4223+ char [] payload = new char [maxBatchSize / 4 ];
4224+ Arrays .fill (payload , c );
4225+ return new String (payload );
4226+ }).collect (Collectors .toList ());
4227+
4228+ // Write #1 with two records.
4229+ long firstBatchTimestamp = timer .time ().milliseconds ();
4230+ CompletableFuture <String > write1 = runtime .scheduleWriteOperation ("write#1" , TP , Duration .ofMillis (50 ),
4231+ state -> new CoordinatorResult <>(records .subList (0 , 2 ), "response1" )
4232+ );
4233+
4234+ // A batch has been created.
4235+ assertNotNull (ctx .currentBatch );
4236+
4237+ // Write #2 with one record.
4238+ CompletableFuture <String > write2 = runtime .scheduleWriteOperation ("write#2" , TP , Duration .ofMillis (50 ),
4239+ state -> new CoordinatorResult <>(records .subList (2 , 3 ), "response2" )
4240+ );
4241+
4242+ // Verify the state. Records are replayed but no batch written.
4243+ assertEquals (List .of (), writer .entries (TP ));
4244+ verify (runtimeMetrics , times (0 )).recordFlushTime (10 );
4245+
4246+ // Write #3 with one record. This one cannot go into the existing batch
4247+ // so the existing batch should be flushed and a new one should be created.
4248+ long secondBatchTimestamp = timer .time ().milliseconds ();
4249+ CompletableFuture <String > write3 = runtime .scheduleWriteOperation ("write#3" , TP , Duration .ofMillis (50 ),
4250+ state -> new CoordinatorResult <>(records .subList (3 , 4 ), "response3" )
4251+ );
4252+
4253+ // Verify the state. Records are replayed. The previous batch
4254+ // got flushed with all the records but the new one from #3.
4255+ // The new batch's timestamp comes from before the flush.
4256+ assertEquals (3L , ctx .coordinator .lastWrittenOffset ());
4257+ assertEquals (0L , ctx .coordinator .lastCommittedOffset ());
4258+ assertEquals (List .of (
4259+ new MockCoordinatorShard .RecordAndMetadata (0 , records .get (0 )),
4260+ new MockCoordinatorShard .RecordAndMetadata (1 , records .get (1 )),
4261+ new MockCoordinatorShard .RecordAndMetadata (2 , records .get (2 )),
4262+ new MockCoordinatorShard .RecordAndMetadata (3 , records .get (3 ))
4263+ ), ctx .coordinator .coordinator ().fullRecords ());
4264+ assertEquals (List .of (
4265+ records (firstBatchTimestamp , records .subList (0 , 3 ))
4266+ ), writer .entries (TP ));
4267+ verify (runtimeMetrics , times (1 )).recordLingerTime (0 );
4268+
4269+ // Advance past the linger time.
4270+ timer .advanceClock (11 );
4271+
4272+ // Verify the state. The pending batch is flushed.
4273+ assertEquals (4L , ctx .coordinator .lastWrittenOffset ());
4274+ assertEquals (0L , ctx .coordinator .lastCommittedOffset ());
4275+ assertEquals (List .of (
4276+ new MockCoordinatorShard .RecordAndMetadata (0 , records .get (0 )),
4277+ new MockCoordinatorShard .RecordAndMetadata (1 , records .get (1 )),
4278+ new MockCoordinatorShard .RecordAndMetadata (2 , records .get (2 )),
4279+ new MockCoordinatorShard .RecordAndMetadata (3 , records .get (3 ))
4280+ ), ctx .coordinator .coordinator ().fullRecords ());
4281+ assertEquals (List .of (
4282+ records (secondBatchTimestamp , records .subList (0 , 3 )),
4283+ records (secondBatchTimestamp , records .subList (3 , 4 ))
4284+ ), writer .entries (TP ));
4285+ verify (runtimeMetrics , times (1 )).recordLingerTime (21 );
4286+
4287+ // Commit and verify that writes are completed.
4288+ writer .commit (TP );
4289+ assertTrue (write1 .isDone ());
4290+ assertTrue (write2 .isDone ());
4291+ assertTrue (write3 .isDone ());
4292+ assertEquals (4L , ctx .coordinator .lastCommittedOffset ());
4293+ assertEquals ("response1" , write1 .get (5 , TimeUnit .SECONDS ));
4294+ assertEquals ("response2" , write2 .get (5 , TimeUnit .SECONDS ));
4295+ assertEquals ("response3" , write3 .get (5 , TimeUnit .SECONDS ));
4296+ }
4297+
41854298 @ Test
41864299 public void testRecordFlushTime () throws Exception {
41874300 MockTimer timer = new MockTimer ();
0 commit comments