Skip to content

Commit 386f6d8

Browse files
authored
chore: update code documentation and rework the maze example (#167)
1 parent 92faadd commit 386f6d8

File tree

6 files changed

+96
-56
lines changed

6 files changed

+96
-56
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ processing operations. These building blocks can be used to transform and manipu
3030
- **Map:** Transforms each element in the stream.
3131
- **FlatMap:** Transforms each element into a stream of slices of zero or more elements.
3232
- **Filter:** Selects elements from the stream based on a condition.
33+
- **Fold:** Combines elements of the stream with the last folded value and emits the new value.
34+
Requires an initial value.
3335
- **Reduce:** Combines elements of the stream with the last reduced value and emits the new value.
36+
Does not require an initial value.
3437
- **PassThrough:** Passes elements through unchanged.
3538
- **Split<sup>1</sup>:** Divides the stream into two streams based on a boolean predicate.
3639
- **FanOut<sup>1</sup>:** Duplicates the stream to multiple outputs for parallel processing.

examples/maze/.gitignore

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
file
2-
out.txt
1+
maze

examples/maze/main.go

Lines changed: 53 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package main
22

33
import (
4+
"math/rand"
5+
"strings"
46
"time"
57

68
ext "github.com/reugn/go-streams/extension"
@@ -10,61 +12,76 @@ import (
1012
type Move int
1113

1214
const (
13-
LEFT Move = iota
14-
RIGHT
15+
left Move = iota
16+
right
1517
)
1618

17-
// Simulate waling through a maze
18-
var MAZE = "------"
19-
var MOVES = []Move{RIGHT, RIGHT, LEFT, RIGHT, LEFT, LEFT, RIGHT, RIGHT, RIGHT, LEFT}
19+
const (
20+
mazeLength = 10
21+
movesNumber = 20
22+
)
23+
24+
var maze = strings.Repeat("-", mazeLength)
25+
26+
// Simulate walking through a maze.
27+
func main() {
28+
source := ext.NewChanSource(moveChan(500*time.Millisecond, movesNumber))
29+
positionFlow := flow.NewFold(mazeLength/2, move)
30+
formatFlow := flow.NewMap(format, 1)
31+
sink := ext.NewStdoutSink()
32+
33+
source.
34+
Via(positionFlow).
35+
Via(formatFlow).
36+
To(sink)
37+
}
2038

39+
// move calculates the next position given the current position and a Move.
40+
// If the move is invalid (out of bounds), the original position is returned.
2141
func move(m Move, pos int) int {
22-
if m == RIGHT {
42+
switch {
43+
case m == left && pos > 0:
44+
return pos - 1
45+
case m == right && pos < len(maze)-1:
2346
return pos + 1
47+
default:
48+
return pos
2449
}
25-
26-
return pos - 1
2750
}
2851

52+
// format marks the position with an X.
2953
func format(pos int) string {
30-
// Mark the position with an X
31-
positionInMaze := MAZE[:pos] + "X" + MAZE[pos+1:]
32-
return positionInMaze
54+
return maze[:pos] + "X" + maze[pos+1:]
3355
}
3456

35-
// Make a move every interval
36-
func moveChan(interval time.Duration) chan any {
37-
// Create a sequence of moves
57+
// moveChan creates a channel that emits n random moves at the specified interval.
58+
func moveChan(interval time.Duration, n int) chan any {
3859
outChan := make(chan any)
3960

4061
go func() {
41-
ticker := time.NewTicker(interval)
42-
// Start at position 0
43-
pos := 0
44-
for _ = range ticker.C {
45-
// Send the next move
46-
outChan <- MOVES[pos]
47-
// Move to the next position
48-
pos = (pos + 1)
49-
if pos >= len(MOVES) {
50-
// Stop
51-
close(outChan)
52-
break
53-
}
62+
defer close(outChan)
63+
// generate a sequence of moves
64+
moves := generateRandomMoves(n)
65+
for i := 0; i < n; i++ {
66+
time.Sleep(interval)
67+
// send the next move
68+
outChan <- moves[i]
5469
}
5570
}()
5671

5772
return outChan
5873
}
5974

60-
func main() {
61-
source := ext.NewChanSource(moveChan(time.Second))
62-
positionFlow := flow.NewFold(0, move)
63-
formatFlow := flow.NewMap(format, 1)
64-
sink := ext.NewStdoutSink()
75+
// generateRandomMoves creates a random sequence of moves with length n.
76+
func generateRandomMoves(n int) []Move {
77+
if n <= 0 {
78+
return []Move{}
79+
}
6580

66-
source.
67-
Via(positionFlow).
68-
Via(formatFlow).
69-
To(sink)
81+
moves := make([]Move, n)
82+
for i := 0; i < n; i++ {
83+
moves[i] = Move(rand.Intn(2)) //nolint:gosec
84+
}
85+
86+
return moves
7087
}

flow/fold.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
// FoldFunction represents a Fold transformation function.
88
type FoldFunction[T, R any] func(T, R) R
99

10-
// Fold takes one element and produces one element.
10+
// Fold implements a "rolling" fold transformation on a data stream with an
11+
// initial value. Combines the current element with the last folded value and
12+
// emits the new value.
1113
//
1214
// in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
1315
//
@@ -27,7 +29,8 @@ var _ streams.Flow = (*Fold[any, any])(nil)
2729
// NewFold returns a new Fold operator.
2830
// T specifies the incoming element type, and the outgoing element type is R.
2931
//
30-
// FoldFunction is the Fold transformation function.
32+
// init is the initial value for the folding process.
33+
// foldFunction is the function that performs the fold transformation.
3134
func NewFold[T, R any](init R, foldFunction FoldFunction[T, R]) *Fold[T, R] {
3235
foldFlow := &Fold[T, R]{
3336
init: init,
@@ -71,7 +74,7 @@ func (m *Fold[T, R]) transmit(inlet streams.Inlet) {
7174
}
7275

7376
func (m *Fold[T, R]) doStream() {
74-
var lastFolded = m.init
77+
lastFolded := m.init
7578
for element := range m.in {
7679
lastFolded = m.foldFunction(element.(T), lastFolded)
7780
m.out <- lastFolded

flow/fold_test.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,25 @@ func TestFold(t *testing.T) {
1414
tests := []struct {
1515
name string
1616
foldFlow streams.Flow
17+
ptr bool
1718
}{
1819
{
19-
name: "strings",
20+
name: "values",
2021
foldFlow: flow.NewFold(
2122
"",
2223
func(a int, b string) string {
2324
return b + strconv.Itoa(a)
2425
}),
26+
ptr: false,
27+
},
28+
{
29+
name: "pointers",
30+
foldFlow: flow.NewFold(
31+
"",
32+
func(a *int, b string) string {
33+
return b + strconv.Itoa(*a)
34+
}),
35+
ptr: true,
2536
},
2637
}
2738
input := []int{1, 2, 3, 4, 5}
@@ -34,13 +45,22 @@ func TestFold(t *testing.T) {
3445
source := ext.NewChanSource(in)
3546
sink := ext.NewChanSink(out)
3647

37-
ingestSlice(input, in)
38-
close(in)
48+
if tt.ptr {
49+
ingestSlice(ptrSlice(input), in)
50+
close(in)
51+
52+
source.
53+
Via(tt.foldFlow).
54+
To(sink)
55+
} else {
56+
ingestSlice(input, in)
57+
close(in)
3958

40-
source.
41-
Via(tt.foldFlow).
42-
Via(flow.NewPassThrough()). // Via coverage
43-
To(sink)
59+
source.
60+
Via(tt.foldFlow).
61+
Via(flow.NewPassThrough()). // Via coverage
62+
To(sink)
63+
}
4464

4565
output := readSlice[string](out)
4666
assert.Equal(t, expected, output)

flow/reduce.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
// ReduceFunction combines the current element with the last reduced value.
88
type ReduceFunction[T any] func(T, T) T
99

10-
// Reduce represents a “rolling” reduce on a data stream.
10+
// Reduce implements a “rolling” reduce transformation on a data stream.
1111
// Combines the current element with the last reduced value and emits the new value.
1212
//
1313
// in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
@@ -19,7 +19,6 @@ type Reduce[T any] struct {
1919
reduceFunction ReduceFunction[T]
2020
in chan any
2121
out chan any
22-
lastReduced any
2322
}
2423

2524
// Verify Reduce satisfies the Flow interface.
@@ -71,15 +70,14 @@ func (r *Reduce[T]) transmit(inlet streams.Inlet) {
7170
}
7271

7372
func (r *Reduce[T]) doStream() {
73+
var lastReduced any
7474
for element := range r.in {
75-
if r.lastReduced == nil {
76-
r.lastReduced = element
75+
if lastReduced == nil {
76+
lastReduced = element
7777
} else {
78-
r.lastReduced = r.reduceFunction(
79-
r.lastReduced.(T),
80-
element.(T))
78+
lastReduced = r.reduceFunction(lastReduced.(T), element.(T))
8179
}
82-
r.out <- r.lastReduced
80+
r.out <- lastReduced
8381
}
8482
close(r.out)
8583
}

0 commit comments

Comments
 (0)