From d0ac20f47f276b609285ba3e9961e656dde3948e Mon Sep 17 00:00:00 2001 From: trihoangvo Date: Mon, 10 Oct 2022 16:51:38 +0200 Subject: [PATCH] Fix concurrent workers register duplicate task execution for nextstep in the joined node * When one or more steps (e.g., Step1 and Step2) join one Step 3, it may happen occasionally that two workers register two duplicate task executions for one next step. See bug report in https://github.com/ystia/yorc/issues/786 * Fix: Before registering a new task execution, a worker checks if the task execution has already been registered for the given step (at _yorc/tasks//.registeredExecutions/) When deleting the ".runningExecutions" in notifyEnd(), also delete the ".registeredExecution". * Note: If the step status is DONE, ERROR, or CANCELED, we still register a task execution. This is the case where we resume a workflow. The task executions are registered run again. But they will be bypass if the given step is DONE. --- tasks/workflow/consul_test.go | 3 ++ tasks/workflow/task_execution.go | 24 ++++++++- tasks/workflow/testdata/workflow.yaml | 22 ++++++++ tasks/workflow/worker_test.go | 75 +++++++++++++++++++++++++++ tasks/workflow/workflow.go | 21 ++++++++ 5 files changed, 143 insertions(+), 2 deletions(-) diff --git a/tasks/workflow/consul_test.go b/tasks/workflow/consul_test.go index 6730cf4ce..53ec2e891 100644 --- a/tasks/workflow/consul_test.go +++ b/tasks/workflow/consul_test.go @@ -79,6 +79,9 @@ func TestRunConsulWorkflowPackageTests(t *testing.T) { t.Run("TestRunWorkflowStepReplay", func(t *testing.T) { testRunWorkflowStepReplay(t, srv, client) }) + t.Run("testConcurrentTaskExecutionsForNextStep", func(t *testing.T) { + testConcurrentTaskExecutionsForNextStep(t, srv, client) + }) }) } diff --git a/tasks/workflow/task_execution.go b/tasks/workflow/task_execution.go index 86dcaf350..5601a5d6e 100644 --- a/tasks/workflow/task_execution.go +++ b/tasks/workflow/task_execution.go @@ -19,6 +19,7 @@ import ( "fmt" "path" "strconv" + "strings" "time" "github.com/hashicorp/consul/api" @@ -141,6 +142,7 @@ func numberOfRunningExecutionsForTask(cc *api.Client, taskID string) (*consuluti func (t *taskExecution) notifyEnd() error { log.Debugf("notifyEnd for taskExecution %q", t.id) execPath := path.Join(consulutil.TasksPrefix, t.taskID, ".runningExecutions") + registeredPath := path.Join(consulutil.TasksPrefix, t.taskID, ".registeredExecutions") l, e, err := numberOfRunningExecutionsForTask(t.cc, t.taskID) if err != nil { return err @@ -149,10 +151,28 @@ func (t *taskExecution) notifyEnd() error { kv := t.cc.KV() // Delete our execID + ops := api.KVTxnOps{ + &api.KVTxnOp{ + Verb: api.KVDelete, + Key: path.Join(execPath, t.id), + }, + &api.KVTxnOp{ + Verb: api.KVDelete, + Key: path.Join(registeredPath, t.step), + }, + } log.Debugf("Deleting runningExecutions with id %q for task %q", t.id, t.taskID) - _, err = kv.Delete(path.Join(execPath, t.id), nil) + ok, response, _, err := kv.Txn(ops, nil) if err != nil { - return errors.Wrap(err, consulutil.ConsulGenericErrMsg) + return errors.Wrap(err, "Failed to execute transaction") + } + if !ok { + // Check the response + var errs []string + for _, e := range response.Errors { + errs = append(errs, e.What) + } + return errors.Errorf("Failed to execute transaction: %s", strings.Join(errs, ", ")) } if e <= 1 && t.finalFunction != nil { log.Debugf("notifyEnd running finalFunction for taskExecution %q", t.id) diff --git a/tasks/workflow/testdata/workflow.yaml b/tasks/workflow/testdata/workflow.yaml index 89600bd64..3055696a4 100644 --- a/tasks/workflow/testdata/workflow.yaml +++ b/tasks/workflow/testdata/workflow.yaml @@ -79,6 +79,22 @@ topology_template: protocol: tcp network_name: PRIVATE initiator: source + + Compute_2: + type: ystia.yorc.tests.nodes.WFCompute + capabilities: + scalable: + properties: + min_instances: 1 + max_instances: 1 + default_instances: 1 + endpoint: + properties: + secure: true + protocol: tcp + network_name: PRIVATE + initiator: source + JobNode: type: ystia.yorc.tests.nodes.JobNode workflows: @@ -142,6 +158,12 @@ topology_template: on_success: - WFNode_hostedOnComputeHost_add_target - WFNode_creating + Compute_2_install: + target: Compute_2 + activities: + - delegate: install + on_success: + - WFNode_creating WFNode_initial: target: WFNode activities: diff --git a/tasks/workflow/worker_test.go b/tasks/workflow/worker_test.go index 29502746e..d88ac5daa 100644 --- a/tasks/workflow/worker_test.go +++ b/tasks/workflow/worker_test.go @@ -233,6 +233,81 @@ func testRunWorkflowStepReplay(t *testing.T, srv *testutil.TestServer, client *a require.Equal(t, true, foundStep, "Did not find step %s in next steps to execute", expectedNextStep) } +func testConcurrentTaskExecutionsForNextStep(t *testing.T, srv *testutil.TestServer, client *api.Client) { + myWorker := &worker{ + consulClient: client, + cfg: config.Configuration{ + // Ensure we are not deleting filesystem files elsewhere + WorkingDirectory: "./testdata/work/", + }, + } + myWorker2 := &worker{ + consulClient: client, + cfg: config.Configuration{ + // Ensure we are not deleting filesystem files elsewhere + WorkingDirectory: "./testdata/work/", + }, + } + deploymentID := "TestRunWf" + taskID := "tWorkflow" + topologyPath := "testdata/workflow.yaml" + ctx := context.Background() + err := deployments.StoreDeploymentDefinition(ctx, deploymentID, topologyPath) + require.NoError(t, err, "Unexpected error storing %s", topologyPath) + + // Registering test data with the first step of the workflow already done + srv.PopulateKV(t, testData(deploymentID)) + + wfName := "install" + + srv.PopulateKV(t, testData(deploymentID)) + deployments.SetDeploymentStatus(context.Background(), deploymentID, deployments.DEPLOYMENT_IN_PROGRESS) + + mockExecutor := &mockExecutor{} + registry.GetRegistry().RegisterDelegates([]string{"ystia.yorc.tests.nodes.WFCompute"}, mockExecutor, "tests") + + var myTaskExecution taskExecution + myTaskExecution.cc = client + myTaskExecution.targetID = deploymentID + myTaskExecution.taskID = taskID + myTaskExecution.step = "Compute_install" + + var myTaskExecution2 taskExecution + myTaskExecution2.cc = client + myTaskExecution2.targetID = deploymentID + myTaskExecution2.taskID = taskID + myTaskExecution2.step = "Compute_2_install" + + expectedNextStep := "WFNode_creating" + + // Test for duplicated task execution + // When Compute_install and Compute_2_install steps complete, they continue to register the next step WFNode_creating + // but only one task execution should be registered for the step WFNode_creating + srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_install"), []byte("initial")) + srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_2_install"), []byte("DONE")) + srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "WFNode_initial"), []byte("DONE")) + + err = myWorker.runWorkflowStep(context.Background(), &myTaskExecution, wfName, false) + require.NoError(t, err) + err = myWorker2.runWorkflowStep(context.Background(), &myTaskExecution2, wfName, false) + require.NoError(t, err) + + // Test that consul contains an execution for next step + execKeys, _, err := consulutil.GetKV().Keys(consulutil.ExecutionsTaskPrefix+"/", "/", nil) + require.NoError(t, err) + foundStep := 0 + for _, execKey := range execKeys { + execID := path.Base(execKey) + execPath := path.Join(consulutil.ExecutionsTaskPrefix, execID) + found, value, err := consulutil.GetStringValue(path.Join(execPath, "step")) + require.NoError(t, err) + if found && value == expectedNextStep { + foundStep++ + } + } + require.Equal(t, 1, foundStep, "Found more than one task execution is registered for one step %s", expectedNextStep) +} + // Construct key/value to initialise KV before running test func testData(deploymentID string) map[string][]byte { return map[string][]byte{ diff --git a/tasks/workflow/workflow.go b/tasks/workflow/workflow.go index 097732d05..a99d246c6 100644 --- a/tasks/workflow/workflow.go +++ b/tasks/workflow/workflow.go @@ -49,6 +49,14 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps, ops := make(api.KVTxnOps, 0) var stepOps api.KVTxnOps for _, step := range steps { + exist, err := existTaskExecutionForStep(taskID, step.Name) + if err != nil { + return ops, errors.Wrapf(err, "Failed to check registered task execution for TaskID:%q, step:%q", taskID, step.Name) + } + if exist { + log.Debugf("Step:%q in TaskID:%q has already been registered for execution", step.Name, taskID) + continue + } // Add execution key for initial steps only u, err := uuid.NewV4() if err != nil { @@ -73,6 +81,11 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps, Key: path.Join(consulutil.TasksPrefix, taskID, ".runningExecutions", execID), Value: []byte(""), }, + &api.KVTxnOp{ + Verb: api.KVSet, + Key: path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", step.Name), + Value: []byte(""), + }, } ops = append(ops, stepOps...) log.Debugf("Will store runningExecutions with id %q in txn for task %q", execID, taskID) @@ -80,6 +93,14 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps, return ops, nil } +func existTaskExecutionForStep(taskID string, stepName string) (bool, error) { + exist, _, err := consulutil.GetValue(path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", stepName)) + if err != nil { + return false, err + } + return exist, nil +} + func getCallOperationsFromStep(s *step) []string { ops := make([]string, 0) for _, a := range s.Activities {