Skip to content

Commit

Permalink
Fix concurrent workers register duplicate task execution for nextstep…
Browse files Browse the repository at this point in the history
… in the joined node

* When one or more steps (e.g., Step1 and Step2) join one Step 3, it may happen occasionally
  that two workers register two duplicate task executions for one next step.
  See bug report in ystia#786
* Fix: Before registering a new task execution, a worker checks if the task execution has already
  been registered for the given step (at _yorc/tasks/<taskID>/.registeredExecutions/<stepName>)
  When deleting the ".runningExecutions" in notifyEnd(), also delete the ".registeredExecution".
* Note: If the step status is DONE, ERROR, or CANCELED, we still register a task execution.
  This is the case where we resume a workflow. The task executions are registered run again.
  But they will be bypass if the given step is DONE.
  • Loading branch information
trihoangvo committed Oct 19, 2022
1 parent 8aac43e commit d0ac20f
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 2 deletions.
3 changes: 3 additions & 0 deletions tasks/workflow/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func TestRunConsulWorkflowPackageTests(t *testing.T) {
t.Run("TestRunWorkflowStepReplay", func(t *testing.T) {
testRunWorkflowStepReplay(t, srv, client)
})
t.Run("testConcurrentTaskExecutionsForNextStep", func(t *testing.T) {
testConcurrentTaskExecutionsForNextStep(t, srv, client)
})
})
}

Expand Down
24 changes: 22 additions & 2 deletions tasks/workflow/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"path"
"strconv"
"strings"
"time"

"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -141,6 +142,7 @@ func numberOfRunningExecutionsForTask(cc *api.Client, taskID string) (*consuluti
func (t *taskExecution) notifyEnd() error {
log.Debugf("notifyEnd for taskExecution %q", t.id)
execPath := path.Join(consulutil.TasksPrefix, t.taskID, ".runningExecutions")
registeredPath := path.Join(consulutil.TasksPrefix, t.taskID, ".registeredExecutions")
l, e, err := numberOfRunningExecutionsForTask(t.cc, t.taskID)
if err != nil {
return err
Expand All @@ -149,10 +151,28 @@ func (t *taskExecution) notifyEnd() error {

kv := t.cc.KV()
// Delete our execID
ops := api.KVTxnOps{
&api.KVTxnOp{
Verb: api.KVDelete,
Key: path.Join(execPath, t.id),
},
&api.KVTxnOp{
Verb: api.KVDelete,
Key: path.Join(registeredPath, t.step),
},
}
log.Debugf("Deleting runningExecutions with id %q for task %q", t.id, t.taskID)
_, err = kv.Delete(path.Join(execPath, t.id), nil)
ok, response, _, err := kv.Txn(ops, nil)
if err != nil {
return errors.Wrap(err, consulutil.ConsulGenericErrMsg)
return errors.Wrap(err, "Failed to execute transaction")
}
if !ok {
// Check the response
var errs []string
for _, e := range response.Errors {
errs = append(errs, e.What)
}
return errors.Errorf("Failed to execute transaction: %s", strings.Join(errs, ", "))
}
if e <= 1 && t.finalFunction != nil {
log.Debugf("notifyEnd running finalFunction for taskExecution %q", t.id)
Expand Down
22 changes: 22 additions & 0 deletions tasks/workflow/testdata/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ topology_template:
protocol: tcp
network_name: PRIVATE
initiator: source

Compute_2:
type: ystia.yorc.tests.nodes.WFCompute
capabilities:
scalable:
properties:
min_instances: 1
max_instances: 1
default_instances: 1
endpoint:
properties:
secure: true
protocol: tcp
network_name: PRIVATE
initiator: source

JobNode:
type: ystia.yorc.tests.nodes.JobNode
workflows:
Expand Down Expand Up @@ -142,6 +158,12 @@ topology_template:
on_success:
- WFNode_hostedOnComputeHost_add_target
- WFNode_creating
Compute_2_install:
target: Compute_2
activities:
- delegate: install
on_success:
- WFNode_creating
WFNode_initial:
target: WFNode
activities:
Expand Down
75 changes: 75 additions & 0 deletions tasks/workflow/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,81 @@ func testRunWorkflowStepReplay(t *testing.T, srv *testutil.TestServer, client *a
require.Equal(t, true, foundStep, "Did not find step %s in next steps to execute", expectedNextStep)
}

func testConcurrentTaskExecutionsForNextStep(t *testing.T, srv *testutil.TestServer, client *api.Client) {
myWorker := &worker{
consulClient: client,
cfg: config.Configuration{
// Ensure we are not deleting filesystem files elsewhere
WorkingDirectory: "./testdata/work/",
},
}
myWorker2 := &worker{
consulClient: client,
cfg: config.Configuration{
// Ensure we are not deleting filesystem files elsewhere
WorkingDirectory: "./testdata/work/",
},
}
deploymentID := "TestRunWf"
taskID := "tWorkflow"
topologyPath := "testdata/workflow.yaml"
ctx := context.Background()
err := deployments.StoreDeploymentDefinition(ctx, deploymentID, topologyPath)
require.NoError(t, err, "Unexpected error storing %s", topologyPath)

// Registering test data with the first step of the workflow already done
srv.PopulateKV(t, testData(deploymentID))

wfName := "install"

srv.PopulateKV(t, testData(deploymentID))
deployments.SetDeploymentStatus(context.Background(), deploymentID, deployments.DEPLOYMENT_IN_PROGRESS)

mockExecutor := &mockExecutor{}
registry.GetRegistry().RegisterDelegates([]string{"ystia.yorc.tests.nodes.WFCompute"}, mockExecutor, "tests")

var myTaskExecution taskExecution
myTaskExecution.cc = client
myTaskExecution.targetID = deploymentID
myTaskExecution.taskID = taskID
myTaskExecution.step = "Compute_install"

var myTaskExecution2 taskExecution
myTaskExecution2.cc = client
myTaskExecution2.targetID = deploymentID
myTaskExecution2.taskID = taskID
myTaskExecution2.step = "Compute_2_install"

expectedNextStep := "WFNode_creating"

// Test for duplicated task execution
// When Compute_install and Compute_2_install steps complete, they continue to register the next step WFNode_creating
// but only one task execution should be registered for the step WFNode_creating
srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_install"), []byte("initial"))
srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_2_install"), []byte("DONE"))
srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "WFNode_initial"), []byte("DONE"))

err = myWorker.runWorkflowStep(context.Background(), &myTaskExecution, wfName, false)
require.NoError(t, err)
err = myWorker2.runWorkflowStep(context.Background(), &myTaskExecution2, wfName, false)
require.NoError(t, err)

// Test that consul contains an execution for next step
execKeys, _, err := consulutil.GetKV().Keys(consulutil.ExecutionsTaskPrefix+"/", "/", nil)
require.NoError(t, err)
foundStep := 0
for _, execKey := range execKeys {
execID := path.Base(execKey)
execPath := path.Join(consulutil.ExecutionsTaskPrefix, execID)
found, value, err := consulutil.GetStringValue(path.Join(execPath, "step"))
require.NoError(t, err)
if found && value == expectedNextStep {
foundStep++
}
}
require.Equal(t, 1, foundStep, "Found more than one task execution is registered for one step %s", expectedNextStep)
}

// Construct key/value to initialise KV before running test
func testData(deploymentID string) map[string][]byte {
return map[string][]byte{
Expand Down
21 changes: 21 additions & 0 deletions tasks/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps,
ops := make(api.KVTxnOps, 0)
var stepOps api.KVTxnOps
for _, step := range steps {
exist, err := existTaskExecutionForStep(taskID, step.Name)
if err != nil {
return ops, errors.Wrapf(err, "Failed to check registered task execution for TaskID:%q, step:%q", taskID, step.Name)
}
if exist {
log.Debugf("Step:%q in TaskID:%q has already been registered for execution", step.Name, taskID)
continue
}
// Add execution key for initial steps only
u, err := uuid.NewV4()
if err != nil {
Expand All @@ -73,13 +81,26 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps,
Key: path.Join(consulutil.TasksPrefix, taskID, ".runningExecutions", execID),
Value: []byte(""),
},
&api.KVTxnOp{
Verb: api.KVSet,
Key: path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", step.Name),
Value: []byte(""),
},
}
ops = append(ops, stepOps...)
log.Debugf("Will store runningExecutions with id %q in txn for task %q", execID, taskID)
}
return ops, nil
}

func existTaskExecutionForStep(taskID string, stepName string) (bool, error) {
exist, _, err := consulutil.GetValue(path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", stepName))
if err != nil {
return false, err
}
return exist, nil
}

func getCallOperationsFromStep(s *step) []string {
ops := make([]string, 0)
for _, a := range s.Activities {
Expand Down

0 comments on commit d0ac20f

Please sign in to comment.