diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 21140da097b..971fd1c7365 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1585,3 +1585,78 @@ func TestV3NoEventsLostOnCompact(t *testing.T) { } assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount) } + +// TestV3WatchLargeTxnFragmentTrue tests watching multiple large transactions w/ fragment enabled. +func TestV3WatchLargeTxnFragmentTrue(t *testing.T) { + integration.BeforeTest(t) + testV3WatchLargeTxn(t, true) +} + +// TestV3WatchLargeTxnFragmentFalse tests watching multiple large transactions w/ fragment disabled. +// +// Even though each event is only 1.2 MB which is below the maximum response size, the server +// attempts to return all four events in one 4.8 MB response which fails. +func TestV3WatchLargeTxnFragmentFalse(t *testing.T) { + integration.BeforeTest(t) + testV3WatchLargeTxn(t, false) +} + +// testV3WatchLargeTxn tests watching multiple large transactions +func testV3WatchLargeTxn(t *testing.T, fragment bool) { + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, err := integration.ToGRPC(clus.RandClient()).Watch.Watch(ctx) + require.Nil(t, err) + + // Put 12 keys w/ different values of length 100KB in one transaction 4 times. + var n = 4 + var m = 12 + var rev []int64 + var svc = integration.ToGRPC(clus.RandClient()).KV + for j := range n { + req := &pb.TxnRequest{Success: []*pb.RequestOp{}} + for i := range m { + req.Success = append(req.Success, &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(fmt.Sprintf(`test-watch-fragment-%03d`, i)), + Value: bytes.Repeat([]byte{'a' + uint8(i+j)}, 1e5), + }, + }, + }) + } + resp, err := svc.Txn(ctx, req) + require.Nil(t, err, err) + assert.True(t, resp.Succeeded) + rev = append(rev, resp.Header.Revision) + } + + // Create Watch + err = wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte(`test-watch-fragment-000`), + RangeEnd: []byte(`test-watch-fragment-999`), + StartRevision: rev[0], + Fragment: fragment, + }, + }}) + require.Nil(t, err) + resp, err := wStream.Recv() + require.Nil(t, err) + require.NotNil(t, resp) + require.True(t, resp.Created) + + // Receive events + var events []*mvccpb.Event + for len(events) < n*m { + resp, err := wStream.Recv() + require.Nil(t, err, err) + require.NotNil(t, resp) + events = append(events, resp.Events...) + } + // Last response header revision should match mod revision of last returned event + assert.Equal(t, rev[len(rev)-1], events[len(events)-1].Kv.ModRevision) +}