@@ -50,6 +50,7 @@ type watchable interface {
5050
5151type watchableStore struct {
5252 * store
53+ watchBatchMaxSize int
5354
5455 // mu protects watcher groups and batches. It should never be locked
5556 // before locking store.mu to avoid deadlock.
@@ -76,24 +77,30 @@ var _ WatchableKV = (*watchableStore)(nil)
7677// cancel operations.
7778type cancelFunc func ()
7879
79- func New (lg * zap.Logger , b backend.Backend , le lease.Lessor , cfg StoreConfig ) * watchableStore {
80+ func New (lg * zap.Logger , b backend.Backend , le lease.Lessor , cfg WatchableStoreConfig ) * watchableStore {
8081 s := newWatchableStore (lg , b , le , cfg )
8182 s .wg .Add (2 )
8283 go s .syncWatchersLoop ()
8384 go s .syncVictimsLoop ()
8485 return s
8586}
8687
87- func newWatchableStore (lg * zap.Logger , b backend.Backend , le lease.Lessor , cfg StoreConfig ) * watchableStore {
88+ type WatchableStoreConfig struct {
89+ StoreConfig
90+ WatchBatchMaxSize int
91+ }
92+
93+ func newWatchableStore (lg * zap.Logger , b backend.Backend , le lease.Lessor , cfg WatchableStoreConfig ) * watchableStore {
8894 if lg == nil {
8995 lg = zap .NewNop ()
9096 }
9197 s := & watchableStore {
92- store : NewStore (lg , b , le , cfg ),
93- victimc : make (chan struct {}, 1 ),
94- unsynced : newWatcherGroup (),
95- synced : newWatcherGroup (),
96- stopc : make (chan struct {}),
98+ store : NewStore (lg , b , le , cfg .StoreConfig ),
99+ victimc : make (chan struct {}, 1 ),
100+ unsynced : newWatcherGroup (),
101+ synced : newWatcherGroup (),
102+ stopc : make (chan struct {}),
103+ watchBatchMaxSize : cfg .WatchBatchMaxSize ,
97104 }
98105 s .store .ReadView = & readView {s }
99106 s .store .WriteView = & writeView {s }
@@ -373,7 +380,7 @@ func (s *watchableStore) syncWatchers() int {
373380 tx .RUnlock ()
374381
375382 victims := make (watcherBatch )
376- wb := newWatcherBatch (wg , evs )
383+ wb := newWatcherBatch (wg , evs , s . watchBatchMaxSize )
377384 for w := range wg .watchers {
378385 if w .minRev < compactionRev {
379386 // Skip the watcher that failed to send compacted watch response due to w.ch is full.
@@ -449,7 +456,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
449456// watchers that watch on the key of the event.
450457func (s * watchableStore ) notify (rev int64 , evs []mvccpb.Event ) {
451458 victim := make (watcherBatch )
452- for w , eb := range newWatcherBatch (& s .synced , evs ) {
459+ for w , eb := range newWatcherBatch (& s .synced , evs , s . watchBatchMaxSize ) {
453460 if w .send (WatchResponse {WatchID : w .id , Events : eb .evs , Revision : rev }) {
454461 pendingEventsGauge .Add (float64 (len (eb .evs )))
455462 } else {
0 commit comments