-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdag_test.go
89 lines (85 loc) · 3.45 KB
/
dag_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package lambdag_test
import (
"context"
"sort"
"testing"
"github.com/mashiike/lambdag"
"github.com/stretchr/testify/require"
)
// TestDAG builds a five-task DAG and checks the graph queries on it: start
// tasks, all tasks, direct upstream/downstream neighbours, transitive
// ancestors/descendants, and which tasks are reported executable for a given
// list of finished task IDs.
func TestDAG(t *testing.T) {
	dag, err := lambdag.NewDAG("test")
	require.NoError(t, err)

	// Every task shares one no-op handler; only the graph shape is under test.
	noop := lambdag.TaskHandlerFunc(func(ctx context.Context, tr *lambdag.TaskRequest) (interface{}, error) {
		return nil, nil
	})
	// newTask registers a task on dag and fails the test immediately on error.
	newTask := func(id string) *lambdag.Task {
		t.Helper()
		task, err := dag.NewTask(id, noop)
		require.NoError(t, err)
		return task
	}
	task1 := newTask("task1")
	task2 := newTask("task2")
	task3 := newTask("task3")
	task4 := newTask("task4")
	task5 := newTask("task5")

	// task1 ─> task2 ───┓
	//   │               v
	//   └───────────> task3 ──> task4
	//                             ^
	//                task5 ───────┘
	edges := []struct {
		from, to *lambdag.Task
	}{
		{task1, task2},
		{task2, task3},
		{task1, task3},
		{task3, task4},
		{task5, task4},
	}
	for _, e := range edges {
		require.NoError(t, e.from.SetDownstream(e.to), "%v -> %v", e.from.ID(), e.to.ID())
	}

	// Structural queries. The EqualValues checks are order-sensitive; the
	// requireTasksMatch checks compare the lists regardless of order.
	require.EqualValues(t, []*lambdag.Task{task1, task5}, dag.GetStartTasks(), "StartTasks")
	requireTasksMatch(t, []*lambdag.Task{task1, task2, task3, task4, task5}, dag.GetAllTasks(), "AllTasks")
	require.EqualValues(t, []*lambdag.Task{task2, task3}, dag.GetDownstreamTasks("task1"), "DownstreamTasks")
	require.EqualValues(t, []*lambdag.Task{task1, task2}, dag.GetUpstreamTasks("task3"), "UpstreamTasks")
	requireTasksMatch(t, []*lambdag.Task{task3, task5, task2, task1}, dag.GetAncestorTasks("task4"), "AncestorTasks")
	requireTasksMatch(t, []*lambdag.Task{task3, task4}, dag.GetDescendantTasks("task2"), "DescendantTasks")

	// Executability of a single task given the IDs of finished tasks.
	require.False(t, dag.IsExecutableTask("task2", []string{}), "task2 is executable? task1 not finished")
	require.True(t, dag.IsExecutableTask("task2", []string{"task1"}), "task2 is executable? task1 finished")
	require.False(t, dag.IsExecutableTask("task3", []string{"task1"}), "task3 is executable? task2 not finished")
	require.True(t, dag.IsExecutableTask("task3", []string{"task1", "task2"}), "task3 is executable? task2 finished")

	// The set of executable tasks as upstream tasks finish.
	requireTasksMatch(t, []*lambdag.Task{task1, task5}, dag.GetExecutableTasks([]string{}), "first executable tasks")
	requireTasksMatch(t, []*lambdag.Task{task2, task5}, dag.GetExecutableTasks([]string{"task1"}), "executable if task1 finished")
	requireTasksMatch(t, []*lambdag.Task{task3, task5}, dag.GetExecutableTasks([]string{"task1", "task2"}), "executable if task1, task2 finished")
}
// requireTasksMatch fails the test unless listA and listB contain the same
// tasks, ignoring order. Both lists are compared after being ordered by
// descending task ID.
//
// The comparison works on copies: the slices passed in are never reordered, so
// a slice returned by the DAG under test is not mutated as a side effect of
// asserting on it. msgAndArgs is forwarded to require.EqualValues unchanged.
func requireTasksMatch(t *testing.T, listA []*lambdag.Task, listB []*lambdag.Task, msgAndArgs ...interface{}) {
	t.Helper()

	// sortedCopy returns tasks ordered by descending ID without touching the
	// input. A nil input stays nil so nil-vs-empty comparisons behave exactly
	// as they would on the original slices.
	sortedCopy := func(tasks []*lambdag.Task) []*lambdag.Task {
		if tasks == nil {
			return nil
		}
		sorted := make([]*lambdag.Task, len(tasks))
		copy(sorted, tasks)
		sort.SliceStable(sorted, func(i, j int) bool {
			return sorted[i].ID() > sorted[j].ID()
		})
		return sorted
	}

	require.EqualValues(t, sortedCopy(listA), sortedCopy(listB), msgAndArgs...)
}