Adds utils.BatchReadChannel() utility
#52342
base: master
Conversation
Adds a utility function that blocks on a channel until it receives a message, and then reads all the messages in the channel until a read would block. This is used by the IC and provisioning integrations to handle monitoring events in batches to minimize re-work.
I don't quite follow why this is necessary.
Isn't this exactly how a for range loop would behave?
Range loops are awkward to use with context.
Sure, then do a for loop with a select covering the channel and the context. The behavior here looks like it will be racy: we do one spot check to see if anything is sitting in the channel after a read, and if we miss it by a millisecond, the batch read operation returns and we have to call it again to pick up the item we just missed. Why is that behavior preferable to a loop that picks up new items as they become available?
It's the batching that is important here, i.e. draining all of the currently pending messages in the channel. The motivating examples are the Provisioning and Identity Center services. In those, I specifically want to read a batch of messages so I can filter out duplicate or obsolete events in the batch and do less work. The events generated by the resource monitor for these services tend to be quite bursty (for example, you can get a bunch of notifications for a single Access List change, because you get notified once for the Access List itself and once for each member, since I need to watch for individual member changes as well). There are also other events that cause those services to recalculate everyone's state, so any outstanding requests to re-evaluate an individual User or Access List's state immediately become obsolete. And multiple "recalculate everything" events can be issued together, so they should obviously be merged into one if at all possible.

Usage looks something like this:

```go
for some_condition {
	pendingMessages := make(map[string]*Msg)
	for msg, ok := range BatchReadChannel(ctx, ch, maxBatchSize) {
		if !ok {
			return
		}
		// last message received for any given target wins
		pendingMessages[msg.target] = msg
	}
	for _, msg := range pendingMessages {
		// handle single message for each affected target
	}
}
```

In this case, it's not 100% critical that duplicate events are handled only once; it's just wasteful if they aren't. Handling the events in batches lets me filter out a bunch of duplicate events while still being responsive. It's a pretty niche use case, so I can understand if
I'm hesitant about any code that attempts to implement some sort of new concurrency primitive, especially when it's put in a generic package that encourages reuse. If batching is what you care about, then why not limit your batches based on a time interval instead of based on a timing-dependent emptiness check?

```go
// readBatch reads as many items as it can from ch within d.
func readBatch(d time.Duration, ch <-chan foo) []foo {
	deadline := time.After(d)
	var result []foo
	for {
		select {
		case <-deadline:
			return result
		case f := <-ch:
			result = append(result, f)
		}
	}
}
```

You can even extend this and add a max argument so that you can limit the size of a batch. This comes from someone who has written a number of flaky tests due to a select with a default clause 😇