With Dotflow Go, you get a powerful and easy-to-use library designed to create execution pipelines without complication. Add tasks intuitively and control the entire process with just a few commands.
Our goal is to make task management faster and more secure, without overwhelming you with complexity. Simply instantiate the DotFlow struct, add your tasks with the Add method, and start execution with the Start method.
This is the Go implementation of the dotflow Python library, following the same interface and patterns.
Start with the basics here.
We use GitHub issues to track bugs and feature requests, and we have limited bandwidth to address them. If you need anything, please follow our templates when opening issues or discussions.
To install dotflow-go, run the following command:
go get github.com/dotflow-io/dotflow-go
To run the examples, you have several options:
Option 1: From the project root
# Clone the repository first
git clone https://github.com/dotflow-io/dotflow-go.git
cd dotflow-go
# Run the basic example
go run examples/basic/main.go
Option 2: Using go.work (Go 1.18+)
# From the project root
go work init .
go run examples/basic/main.go
Option 3: From the example directory
cd examples/basic
go mod init example
go mod edit -replace github.com/dotflow-io/dotflow-go=../..
go mod tidy
go run main.go
The simplest file could look like this:
package main

import (
    "fmt"

    "github.com/dotflow-io/dotflow-go"
    "github.com/dotflow-io/dotflow-go/pkg/action"
    "github.com/dotflow-io/dotflow-go/pkg/context"
)

func myCallback(task *dotflow.Task) error {
    fmt.Printf("Callback: %+v\n", task)
    return nil
}

func main() {
    workflow := dotflow.NewDotFlow(nil)

    myActionX := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
        fmt.Println("Executing my_task_x")
        return context.NewContext("result from x", 0, nil), nil
    }, nil)

    myActionY := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
        fmt.Println("Executing my_task_y")
        return context.NewContext("result from y", 0, nil), nil
    }, nil)

    workflow.Task().Add(myActionX, myCallback, nil, "default")
    workflow.Task().Add(myActionY, myCallback, nil, "default")
    workflow.Start()
}
Start with the basics, which is importing the necessary packages.
import (
    "fmt"

    "github.com/dotflow-io/dotflow-go"
    "github.com/dotflow-io/dotflow-go/pkg/action"
    "github.com/dotflow-io/dotflow-go/pkg/context"
)
Create a myCallback function to receive execution information about a task. This function is optional: you still get a report at the end of the execution in the instantiated DotFlow struct. You only need myCallback if something must happen after the task runs, for example sending a message to someone, making a phone call, or sending a letter.
func myCallback(task *dotflow.Task) error {
    fmt.Printf("Task %d completed with status: %s\n", task.TaskID, task.Status)
    return nil
}
Now, create the function responsible for executing your task. Just wrap your function with action.NewAction, and you have created a task.
myAction := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    fmt.Println("Executing task")
    return context.NewContext("result", 0, nil), nil
}, nil)
You can also use action.ActionFromFunc to convert a simple function to an action:
func mySimpleTask() (string, error) {
    return "result", nil
}

myAction := action.ActionFromFunc(mySimpleTask)
Instantiate the DotFlow struct in a workflow variable to be used in the following steps.
workflow := dotflow.NewDotFlow(nil)
You can also provide a custom configuration:
config := dotflow.NewConfig()
workflow := dotflow.NewDotFlow(config)
Now, simply add the action and callback you created earlier to the workflow using the code below. This step defines which tasks will be executed and in what order: tasks run in the sequence in which they were added to the workflow.
- Adding one step at a time:
workflow.Task().Add(myActionX, myCallback, nil, "default")
workflow.Task().Add(myActionY, myCallback, nil, "default")
- Adding a step with initial context:
initialCtx := context.NewContext(map[string]interface{}{
    "data": "initial value",
}, 0, nil)
workflow.Task().Add(myAction, myCallback, initialCtx, "default")
- Adding a step with a group name:
workflow.Task().Add(myAction, myCallback, nil, "group1")
workflow.Task().Add(myAction2, myCallback, nil, "group2")
Parameters:
- myAction: The action to execute
- myCallback: The callback function (can be nil)
- initialContext: Initial context (can be nil)
- groupName: Group name for the task (default: "default")
Finally, just execute the workflow with the following code snippet.
workflow.Start()
Or with custom options:
workflow.StartWithOptions(
    func(tasks []*dotflow.Task) error {
        fmt.Println("All tasks completed successfully")
        return nil
    },
    func(tasks []*dotflow.Task) error {
        fmt.Println("Some tasks failed")
        return nil
    },
    dotflow.TypeExecutionSequential,
    false, // keepGoing - continue execution even if a task fails
)
You can also access results after execution:
workflow.Start()
// Get all tasks
tasks := workflow.ResultTask()
// Get all contexts
contexts := workflow.ResultContext()
// Get all storage objects
storage := workflow.ResultStorage()
Execute tasks one after another in sequence. This is the default mode.
workflow.Task().Add(taskFoo, nil, nil, "default")
workflow.Task().Add(taskBar, nil, nil, "default")
workflow.Start()
// or explicitly:
workflow.StartWithOptions(nil, nil, dotflow.TypeExecutionSequential, false)
flowchart TD
A[Start] -->|run| B
B[task_foo] -->|response to| C
C[task_bar] -->|response| D
D[Finish]
Execute tasks in a background goroutine. Tasks still run sequentially, but the entire workflow runs asynchronously.
workflow.Task().Add(taskFoo, nil, nil, "default")
workflow.Task().Add(taskBar, nil, nil, "default")
workflow.StartWithOptions(nil, nil, dotflow.TypeExecutionBackground, false)
flowchart TD
A[Start] -->|run| B
B[task_foo] -->|response to| C
C[task_bar] -->|response| D
D[Finish]
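The idea behind background mode can be sketched with the standard library alone: one goroutine runs the tasks in order while the caller keeps going. The startBackground function and its done channel are hypothetical; how dotflow-go itself signals completion is not covered here.

```go
package main

import (
    "fmt"
    "time"
)

// startBackground runs the tasks one after another inside a single
// goroutine and returns immediately. The returned channel is closed
// once the last task has finished.
func startBackground(tasks []func()) <-chan struct{} {
    done := make(chan struct{})
    go func() {
        defer close(done)
        for _, task := range tasks {
            task()
        }
    }()
    return done
}

func main() {
    order := make(chan string, 2)
    done := startBackground([]func(){
        func() { time.Sleep(10 * time.Millisecond); order <- "task_foo" },
        func() { order <- "task_bar" },
    })

    fmt.Println("caller continues while the workflow runs")
    <-done // wait for the background workflow to finish
    fmt.Println(<-order, <-order)
}
```

Even though task_foo sleeps, task_bar still runs after it, because both share one goroutine.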
Execute all tasks in parallel. All tasks start simultaneously and run concurrently.
workflow.Task().Add(taskA, nil, nil, "default")
workflow.Task().Add(taskB, nil, nil, "default")
workflow.Task().Add(taskC, nil, nil, "default")
workflow.Task().Add(taskD, nil, nil, "default")
workflow.StartWithOptions(nil, nil, dotflow.TypeExecutionParallel, false)
flowchart TD
S[Start] -->|run| A[task_a]
S[Start] -->|run| B[task_b]
S[Start] -->|run| C[task_c]
S[Start] -->|run| D[task_d]
A --> H[Finish]
B --> H[Finish]
C --> H[Finish]
D --> H[Finish]
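For readers new to Go concurrency, the fan-out and wait pattern in the diagram can be sketched with goroutines and sync.WaitGroup. This is a minimal standalone example, not the library's code; runParallel is a hypothetical helper.

```go
package main

import (
    "fmt"
    "sync"
)

// runParallel starts every task in its own goroutine and waits for all of
// them. Each goroutine writes only to its own slot in results, so no
// further locking is needed.
func runParallel(tasks []func() string) []string {
    results := make([]string, len(tasks))
    var wg sync.WaitGroup
    for i, task := range tasks {
        wg.Add(1)
        go func(i int, task func() string) {
            defer wg.Done()
            results[i] = task()
        }(i, task)
    }
    wg.Wait()
    return results
}

func main() {
    results := runParallel([]func() string{
        func() string { return "task_a" },
        func() string { return "task_b" },
        func() string { return "task_c" },
        func() string { return "task_d" },
    })
    fmt.Println(results)
}
```

Storing results by index keeps the output order stable even though the goroutines may finish in any order.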
Tasks with the same group name run sequentially within their group, but different groups run in parallel.
workflow.Task().Add(taskFoo1, nil, nil, "foo")
workflow.Task().Add(taskFoo2, nil, nil, "foo")
workflow.Task().Add(taskBar1, nil, nil, "bar")
workflow.Task().Add(taskBar2, nil, nil, "bar")
workflow.Start()
flowchart TD
A[Start] -->|run| C(Parallel Groups)
C -->|run| D[task_foo1]
C -->|run| E[task_bar1]
D -->|response| X[task_foo2]
X --> H[Finish]
E -->|response| Y[task_bar2]
Y --> H[Finish]
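One way to picture group execution is one goroutine per group, each running its own tasks in insertion order. The sketch below uses only the standard library; groupedTask and runGroups are hypothetical names, and the real scheduling inside dotflow-go may differ.

```go
package main

import (
    "fmt"
    "sync"
)

// groupedTask is a hypothetical task record: a group name plus a task name.
type groupedTask struct {
    group string
    name  string
}

// runGroups gives each group one goroutine that runs its tasks in insertion
// order, while the groups themselves run concurrently. It returns the
// execution order observed inside each group.
func runGroups(tasks []groupedTask) map[string][]string {
    // Bucket tasks by group, preserving insertion order within a group.
    buckets := make(map[string][]groupedTask)
    for _, t := range tasks {
        buckets[t.group] = append(buckets[t.group], t)
    }

    var (
        mu    sync.Mutex
        wg    sync.WaitGroup
        order = make(map[string][]string)
    )
    for group, bucket := range buckets {
        wg.Add(1)
        go func(group string, bucket []groupedTask) {
            defer wg.Done()
            for _, t := range bucket {
                mu.Lock() // the map is shared between groups
                order[group] = append(order[group], t.name)
                mu.Unlock()
            }
        }(group, bucket)
    }
    wg.Wait()
    return order
}

func main() {
    order := runGroups([]groupedTask{
        {"foo", "task_foo1"},
        {"foo", "task_foo2"},
        {"bar", "task_bar1"},
        {"bar", "task_bar2"},
    })
    fmt.Println(order["foo"], order["bar"])
}
```

Within a group the order is fixed; across groups nothing is guaranteed, which is why the shared map is guarded by a mutex.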
You can configure actions with retry, timeout, and backoff:
config := &action.ActionConfig{
    Retry: 5,
}
myAction := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    // Your task logic here
    return context.NewContext("result", 0, nil), nil
}, config)

config := &action.ActionConfig{
    Timeout: 60 * time.Second,
}
myAction := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    // Your task logic here
    return context.NewContext("result", 0, nil), nil
}, config)

config := &action.ActionConfig{
    Retry:      5,
    RetryDelay: 5 * time.Second,
}
myAction := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    // Your task logic here
    return context.NewContext("result", 0, nil), nil
}, config)

config := &action.ActionConfig{
    Retry:      5,
    Backoff:    true,
    RetryDelay: 1 * time.Second, // Initial delay, will double on each retry
}
myAction := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    // Your task logic here
    return context.NewContext("result", 0, nil), nil
}, config)

config := &action.ActionConfig{
    Retry:      3,
    Timeout:    30 * time.Second,
    RetryDelay: 2 * time.Second,
    Backoff:    true,
}
myAction := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    // Your task logic here
    return context.NewContext("result", 0, nil), nil
}, config)
See the examples/ directory for more examples:
- examples/basic/main.go: Basic workflow example
To run the examples, navigate to the project root and execute:
# Run the basic example
go run examples/basic/main.go
For more detailed instructions, see EXAMPLES.md.
Access data from previous tasks:
workflow := dotflow.NewDotFlow(nil)

task1 := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    return context.NewContext("data from task 1", 0, nil), nil
}, nil)

task2 := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    // Access previous context
    if previousContext != nil && previousContext.Storage != nil {
        fmt.Printf("Previous task result: %v\n", previousContext.Storage)
    }
    return context.NewContext("data from task 2", 0, nil), nil
}, nil)

workflow.Task().Add(task1, nil, nil, "default")
workflow.Task().Add(task2, nil, nil, "default")
workflow.Start()
Pass initial data to tasks:
workflow := dotflow.NewDotFlow(nil)

initialCtx := context.NewContext(map[string]interface{}{
    "user_id": 123,
    "name":    "John",
}, 0, nil)

task := action.NewAction(func(initialContext, previousContext *context.Context, task interface{}) (*context.Context, error) {
    if initialContext != nil {
        // The comma-ok form avoids a panic if Storage holds another type.
        if data, ok := initialContext.Storage.(map[string]interface{}); ok {
            fmt.Printf("User ID: %v, Name: %v\n", data["user_id"], data["name"])
        }
    }
    return context.NewContext("processed", 0, nil), nil
}, nil)

workflow.Task().Add(task, nil, initialCtx, "default")
workflow.Start()
Continue execution even if a task fails:
workflow.StartWithOptions(
    nil,
    nil,
    dotflow.TypeExecutionSequential,
    true, // keepGoing - continue even if a task fails
)
Handle success and failure at the workflow level:
onSuccess := func(tasks []*dotflow.Task) error {
    fmt.Println("All tasks completed successfully!")
    for _, task := range tasks {
        fmt.Printf("Task %d: %s\n", task.TaskID, task.Status)
    }
    return nil
}

onFailure := func(tasks []*dotflow.Task) error {
    fmt.Println("Some tasks failed!")
    for _, task := range tasks {
        if task.Status == dotflow.TypeStatusFailed {
            fmt.Printf("Task %d failed: %v\n", task.TaskID, task.Error)
        }
    }
    return nil
}

workflow.StartWithOptions(onSuccess, onFailure, dotflow.TypeExecutionSequential, false)
While dotflow-go follows the same interface and patterns as the Python version, there are some Go-specific adaptations:
- Decorators: Go doesn't have decorators, so use action.NewAction() instead of @action
- Function Signatures: Actions use Go's type system with explicit context parameters
- Concurrency: Parallel execution uses Go's goroutines and sync.WaitGroup
- Error Handling: Uses Go's standard error interface
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the terms of the MIT License.