-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathutil.go
179 lines (158 loc) · 4.71 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package flow
import (
"fmt"
"sync"
"github.com/reugn/go-streams"
)
// DoStream streams data from the outlet to inlet.
func DoStream(outlet streams.Outlet, inlet streams.Inlet) {
go func() {
for element := range outlet.Out() {
inlet.In() <- element
}
close(inlet.In())
}()
}
// Split splits the stream into two flows according to the given boolean predicate.
// T specifies the incoming and outgoing element type.
func Split[T any](outlet streams.Outlet, predicate func(T) bool) [2]streams.Flow {
condTrue := NewPassThrough()
condFalse := NewPassThrough()
go func() {
for element := range outlet.Out() {
if predicate(element.(T)) {
condTrue.In() <- element
} else {
condFalse.In() <- element
}
}
close(condTrue.In())
close(condFalse.In())
}()
return [...]streams.Flow{condTrue, condFalse}
}
// FanOut creates a number of identical flows from the single outlet.
// This can be useful when writing to multiple sinks is required.
func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow {
out := make([]streams.Flow, magnitude)
for i := 0; i < magnitude; i++ {
out[i] = NewPassThrough()
}
go func() {
for element := range outlet.Out() {
for _, flow := range out {
flow.In() <- element
}
}
for _, flow := range out {
close(flow.In())
}
}()
return out
}
// RoundRobin creates a balanced number of flows from the single outlet.
// This can be useful when work can be parallelized across multiple cores.
func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow {
out := make([]streams.Flow, magnitude)
for i := 0; i < magnitude; i++ {
out[i] = NewPassThrough()
go func(o streams.Flow) {
defer close(o.In())
for element := range outlet.Out() {
o.In() <- element
}
}(out[i])
}
return out
}
// Merge merges multiple flows into a single flow.
// When all specified outlets are closed, the resulting flow will close.
func Merge(outlets ...streams.Flow) streams.Flow {
merged := NewPassThrough()
var wg sync.WaitGroup
wg.Add(len(outlets))
for _, out := range outlets {
go func(outlet streams.Outlet) {
for element := range outlet.Out() {
merged.In() <- element
}
wg.Done()
}(out)
}
// close the in channel on the last outlet close.
go func() {
wg.Wait()
close(merged.In())
}()
return merged
}
// ZipWith combines elements from multiple input streams using a combiner function.
// It returns a new Flow with the resulting values. The combiner function is called
// with a slice of elements, where each element is taken from each input outlet.
// The elements in the slice will be in the order of outlets. If an outlet
// is closed, its corresponding element in the slice will be the zero value.
// The returned Flow will close when all the input outlets are closed.
//
// It will panic if provided less than two outlets, or if any of the outlets has an
// element type other than T.
func ZipWith[T, R any](combine func([]T) R, outlets ...streams.Outlet) streams.Flow {
// validate outlets length
if len(outlets) < 2 {
panic(fmt.Errorf("outlets length %d must be at least 2", len(outlets)))
}
combined := NewPassThrough()
// asynchronously populate the flow with zipped elements
go func() {
var zero T
head := outlets[0]
tail := outlets[1:]
for n := 0; n < len(outlets); n++ {
for element := range head.Out() {
// initialize the slice to contain one element per outlet
zipped := make([]T, len(outlets))
// fill zero elements for the closed outlets
for j := 0; j < n; j++ {
zipped[j] = zero
}
// set the value from the head outlet
zipped[n] = element.(T)
// read elements from subsequent outlets
for i, outlet := range tail {
// this read will block until an element is available
// for the outlet, or it is closed by the upstream
e, ok := <-outlet.Out()
if ok {
zipped[i+n+1] = e.(T)
} else {
zipped[i+n+1] = zero
}
}
// send the result of the combiner function downstream;
// at this point zipped has at least one non-empty value
// taken from the current head
combined.In() <- combine(zipped)
}
// when the head channel is closed move the head to the next
// outlet and advance the tail slice
switch n {
case len(outlets) - 1:
// last iteration
case len(outlets) - 2:
head = outlets[n+1]
tail = nil
default:
head = outlets[n+1]
tail = outlets[n+2:]
}
}
close(combined.In()) // all provided outlets are closed
}()
return combined
}
// Flatten creates a Flow to flatten the stream of slices.
// T specifies the outgoing element type, and the incoming element type is []T.
func Flatten[T any](parallelism int) streams.Flow {
return NewFlatMap(func(element []T) []T {
return element
}, parallelism)
}