-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
130 lines (118 loc) · 2.77 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
)
// main demonstrates a more advanced pipeline example.
//
// The example in the basic_pipeline package does not account for
// scenarios such as error handling, or upstream channels whose
// values are not all consumed.
//
// For example, if an intermediate stage of the pipeline (neither the
// producer nor the sink) only forwards messages that are evenly
// divisible by 5, the output contains significantly fewer values
// than the input.
//
// main computes the MD5 checksum of the files found under the current
// directory and prints the resulting path-to-checksum map. If the
// pipeline fails, the error is written to standard error and the
// process exits with status 1.
func main() {
	root := "."
	m, err := md5All(root)
	if err != nil {
		// A failure here is an ordinary runtime error (for example an
		// unreadable file), not a programming bug, so report it instead
		// of panicking with a stack trace.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(m)
}
// result encapsulates the data for a single file: the outcome of
// reading the file and digesting its contents. sumFilesStage builds it
// with a positional literal, so the field order is significant.
type result struct {
// path is the file path as reported by the directory walk.
path string
// sum is the MD5 digest of the bytes that were read from path.
sum [md5.Size]byte
// err is the error returned while reading path, or nil on success.
// When err is non-nil, sum does not describe the file's contents.
err error
}
// sumFilesStage walks the file tree rooted at root and digests every
// regular file in its own goroutine; each outcome is sent to its
// downstream result channel.
//
// Directories and other non-regular files are skipped. An error reported
// by the walk itself (for example a root that does not exist) stops the
// walk and is delivered on the returned error channel; an error reading
// an individual file is delivered in that file's result.
//
// Closing done cancels the stage: the walk stops with a
// "walking cancelled" error and pending sends are abandoned.
//
// The result channel is closed once every started digest goroutine has
// finished. The error channel receives exactly one value (nil when the
// walk succeeded).
func sumFilesStage(done <-chan struct{}, root string) (<-chan result, <-chan error) {
	out := make(chan result)
	// Buffered so the walker can always hand over its final error and
	// exit, even if the caller never reads it.
	e := make(chan error, 1)
	go func() {
		var wg sync.WaitGroup
		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				// info may be nil here; stop on the first walk error.
				return err
			}
			if !info.Mode().IsRegular() {
				// Only regular files can be read and digested.
				return nil
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				data, err := ioutil.ReadFile(path)
				select {
				case out <- result{path, md5.Sum(data), err}:
				case <-done:
				}
			}()
			// Abort the walk as soon as the caller cancels.
			select {
			case <-done:
				return errors.New("walking cancelled")
			default:
				return nil
			}
		})
		// Publish the walk error first (never blocks thanks to the
		// buffer), then close out once all digests have been delivered
		// or abandoned.
		e <- err
		wg.Wait()
		close(out)
	}()
	return out, e
}
// md5All digests the files found under root and returns a map from
// each file path to its MD5 checksum (an array of 16 bytes).
// The first failure, whether from reading a file or from walking the
// tree, is returned as the error and the map is nil.
func md5All(root string) (map[string][md5.Size]byte, error) {
	// Closing cancel on return tells the stage to stop any work that is
	// still in flight, including on the early-error path below.
	cancel := make(chan struct{})
	defer close(cancel)

	results, walkErr := sumFilesStage(cancel, root)

	sums := map[string][md5.Size]byte{}
	for {
		r, open := <-results
		if !open {
			break
		}
		if r.err != nil {
			return nil, r.err
		}
		sums[r.path] = r.sum
	}

	// All results are in; now check whether the walk itself failed.
	err := <-walkErr
	if err != nil {
		return nil, err
	}
	return sums, nil
}
// merge is a generic fan-in implementation.
// It launches one goroutine per channel in the inbound slice; each
// goroutine forwards the values it receives to the single returned
// channel.
//
// A forwarding goroutine stops when its inbound channel is closed or
// when done is closed, whichever happens first. The returned channel is
// closed once every forwarding goroutine has stopped, so callers can
// range over it. With no inbound channels it is closed immediately.
func merge[T any](done <-chan struct{}, inbound ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inbound))
	forward := func(c <-chan T) {
		defer wg.Done()
		for {
			select {
			case v, ok := <-c:
				if !ok {
					// Inbound channel drained and closed: nothing left
					// to forward. Without this check a closed channel
					// would yield zero values forever.
					return
				}
				// Deliver v, but give up if the consumer has cancelled.
				select {
				case out <- v:
				case <-done:
					return
				}
			case <-done:
				return
			}
		}
	}
	for _, ch := range inbound {
		go forward(ch)
	}
	// Close out only after every forwarder has exited, so no goroutine
	// can send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}