This repository was archived by the owner on Feb 28, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflow.go
More file actions
51 lines (41 loc) · 1.17 KB
/
flow.go
File metadata and controls
51 lines (41 loc) · 1.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package executor
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
)
type flow struct {
maxActions int64
actions *semaphore.Weighted
calls *semaphore.Weighted
ex Interface
}
// ControlFlow wraps e in a decorator that bounds concurrency. The returned
// Interface uses one semaphore of weight maxCalls to limit concurrent Execute
// calls and another of weight maxActions to limit the total number of actions
// held by those calls. maxActions is also the per-call ceiling on the number
// of actions.
func ControlFlow(e Interface, maxCalls, maxActions int64) Interface {
	callSlots := semaphore.NewWeighted(maxCalls)
	actionSlots := semaphore.NewWeighted(maxActions)

	return flow{
		maxActions: maxActions,
		actions:    actionSlots,
		calls:      callSlots,
		ex:         e,
	}
}
// Execute runs actions through the decorated executor once capacity is
// available.
//
// A call carrying more than maxActions actions is rejected immediately with
// an error. Otherwise Execute first acquires one unit of the calls semaphore,
// then len(actions) units of the actions semaphore, and finally delegates to
// the wrapped executor. If either acquisition fails, its error is returned
// as-is and the wrapped executor is not invoked. Everything acquired is
// released when Execute returns.
func (f flow) Execute(ctx context.Context, actions ...Action) error {
	n := int64(len(actions))

	// Reject oversized batches up front instead of waiting on the semaphore.
	if f.maxActions < n {
		return fmt.Errorf("maximum %d actions allowed", f.maxActions)
	}

	// Take the call slot first; it is held while waiting for action capacity.
	err := f.calls.Acquire(ctx, 1)
	if err != nil {
		return err
	}
	defer f.calls.Release(1)

	err = f.actions.Acquire(ctx, n)
	if err != nil {
		return err
	}
	defer f.actions.Release(n)

	return f.ex.Execute(ctx, actions...)
}
// Compile-time assertion that flow implements Interface.
var _ Interface = flow{}