-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
123 lines (109 loc) · 3.38 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"fmt"
"sync"
"time"
)
// main demonstrates the request sequence pattern.
// by restore sequence, we mean that with multiple
// goroutines performing work, we can ensure that they
// each take their turn and are given a fair chance to
// provide their results.
//
// Fan in has no guarantee and you could have two goroutines
// where goroutine A yields all values before any of Goroutine B's
//
// Restore Sequence forces turn based results by utilising a blocking
// chan shared between messages of each goroutine.
// resulting in A, B, A, B and so on and so forth.
func main() {
fanned := fanInMerge(respond(1), respond(2), respond(3))
/*
Even tho we have 3 invocations of fanInMerge, it actually spawns 5
working routines internally, so we have 15 goroutines total (3x5)
all being fanned in through the fanned channel.
Each call to respond(1), response(2), respond(3) will yield 5 responses
and internally shares a done channel to ensure only any one of its internal
goroutines can yield a result when expected.
Here we are attempting to achieve that every 3 reads of the channel (fanned)
comes from each of the individual channels we originally made. Order is not
entirely guaranteed due to the nature of scheduling but there should always be
a case of A, B, C results in each block of 3 reads.
*/
expected := 5 // 3 x 5 responses
for i := 0; i < expected; i++ {
// for each of the results, attempt to get a value
// from each
first := <-fanned
second := <-fanned
third := <-fanned
fmt.Println(first)
fmt.Println(second)
fmt.Println(third)
// signal to each in the order we received, it's ok to yield again
first.wait <- struct{}{}
second.wait <- struct{}{}
third.wait <- struct{}{}
}
}
// reply encapsulates some response from a server
// each reply stores an id to easily demonstrate
// the sequence restoration in practice.
type reply struct {
id int
duration time.Duration
message string
wait chan struct{}
}
// String implements fmt.Stringer and returns a string
// representation of the reply instance.
func (r reply) String() string {
return fmt.Sprintf("id: %d, duration: %s, message: %s", r.id, r.duration, r.message)
}
// fanInMerge merges the multiple channels of work
func fanInMerge(ch ...<-chan reply) <-chan reply {
out := make(chan reply)
var wg sync.WaitGroup
wg.Add(len(ch))
for _, channel := range ch {
go func(c <-chan reply) {
defer wg.Done()
for v := range channel {
out <- v
}
}(channel)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// respond simulates IO bound calls to a server and returns a number
// of responses on the out channel.
// Each goroutine spawned by this function will push a new instance
// onto the out channel, then wait until it is signalled to yield
// another value.
//
// This ensures that all the goroutines from multiple invocations of
// respond operate in a turn based manner and the order of responses
// across multiple calls to this function are controlled.
func respond(id int) <-chan reply {
out := make(chan reply)
waiter := make(chan struct{})
var wg sync.WaitGroup
wg.Add(5)
go func() {
for i := range 5 {
out <- reply{duration: time.Duration(1000 * i), message: fmt.Sprintf("message %d", i), id: id, wait: waiter}
time.Sleep(100 * time.Millisecond)
<-waiter
}
}()
go func() {
wg.Wait()
close(out)
close(waiter)
}()
return out
}