Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent workers register duplicate task execution for nextstep #787

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tasks/workflow/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func TestRunConsulWorkflowPackageTests(t *testing.T) {
t.Run("TestRunWorkflowStepReplay", func(t *testing.T) {
testRunWorkflowStepReplay(t, srv, client)
})
t.Run("testConcurrentTaskExecutionsForNextStep", func(t *testing.T) {
testConcurrentTaskExecutionsForNextStep(t, srv, client)
})
})
}

Expand Down
24 changes: 22 additions & 2 deletions tasks/workflow/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"path"
"strconv"
"strings"
"time"

"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -141,6 +142,7 @@ func numberOfRunningExecutionsForTask(cc *api.Client, taskID string) (*consuluti
func (t *taskExecution) notifyEnd() error {
log.Debugf("notifyEnd for taskExecution %q", t.id)
execPath := path.Join(consulutil.TasksPrefix, t.taskID, ".runningExecutions")
registeredPath := path.Join(consulutil.TasksPrefix, t.taskID, ".registeredExecutions")
l, e, err := numberOfRunningExecutionsForTask(t.cc, t.taskID)
if err != nil {
return err
Expand All @@ -149,10 +151,28 @@ func (t *taskExecution) notifyEnd() error {

kv := t.cc.KV()
// Delete our execID
ops := api.KVTxnOps{
&api.KVTxnOp{
Verb: api.KVDelete,
Key: path.Join(execPath, t.id),
},
&api.KVTxnOp{
Verb: api.KVDelete,
Key: path.Join(registeredPath, t.step),
},
}
log.Debugf("Deleting runningExecutions with id %q for task %q", t.id, t.taskID)
_, err = kv.Delete(path.Join(execPath, t.id), nil)
ok, response, _, err := kv.Txn(ops, nil)
if err != nil {
return errors.Wrap(err, consulutil.ConsulGenericErrMsg)
return errors.Wrap(err, "Failed to execute transaction")
}
if !ok {
// Check the response
var errs []string
for _, e := range response.Errors {
errs = append(errs, e.What)
}
return errors.Errorf("Failed to execute transaction: %s", strings.Join(errs, ", "))
}
if e <= 1 && t.finalFunction != nil {
log.Debugf("notifyEnd running finalFunction for taskExecution %q", t.id)
Expand Down
22 changes: 22 additions & 0 deletions tasks/workflow/testdata/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ topology_template:
protocol: tcp
network_name: PRIVATE
initiator: source

Compute_2:
type: ystia.yorc.tests.nodes.WFCompute
capabilities:
scalable:
properties:
min_instances: 1
max_instances: 1
default_instances: 1
endpoint:
properties:
secure: true
protocol: tcp
network_name: PRIVATE
initiator: source

JobNode:
type: ystia.yorc.tests.nodes.JobNode
workflows:
Expand Down Expand Up @@ -142,6 +158,12 @@ topology_template:
on_success:
- WFNode_hostedOnComputeHost_add_target
- WFNode_creating
Compute_2_install:
target: Compute_2
activities:
- delegate: install
on_success:
- WFNode_creating
WFNode_initial:
target: WFNode
activities:
Expand Down
75 changes: 75 additions & 0 deletions tasks/workflow/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,81 @@ func testRunWorkflowStepReplay(t *testing.T, srv *testutil.TestServer, client *a
require.Equal(t, true, foundStep, "Did not find step %s in next steps to execute", expectedNextStep)
}

func testConcurrentTaskExecutionsForNextStep(t *testing.T, srv *testutil.TestServer, client *api.Client) {
myWorker := &worker{
consulClient: client,
cfg: config.Configuration{
// Ensure we are not deleting filesystem files elsewhere
WorkingDirectory: "./testdata/work/",
},
}
myWorker2 := &worker{
consulClient: client,
cfg: config.Configuration{
// Ensure we are not deleting filesystem files elsewhere
WorkingDirectory: "./testdata/work/",
},
}
deploymentID := "TestRunWf"
taskID := "tWorkflow"
topologyPath := "testdata/workflow.yaml"
ctx := context.Background()
err := deployments.StoreDeploymentDefinition(ctx, deploymentID, topologyPath)
require.NoError(t, err, "Unexpected error storing %s", topologyPath)

// Registering test data with the first step of the workflow already done
srv.PopulateKV(t, testData(deploymentID))

wfName := "install"

srv.PopulateKV(t, testData(deploymentID))
deployments.SetDeploymentStatus(context.Background(), deploymentID, deployments.DEPLOYMENT_IN_PROGRESS)

mockExecutor := &mockExecutor{}
registry.GetRegistry().RegisterDelegates([]string{"ystia.yorc.tests.nodes.WFCompute"}, mockExecutor, "tests")

var myTaskExecution taskExecution
myTaskExecution.cc = client
myTaskExecution.targetID = deploymentID
myTaskExecution.taskID = taskID
myTaskExecution.step = "Compute_install"

var myTaskExecution2 taskExecution
myTaskExecution2.cc = client
myTaskExecution2.targetID = deploymentID
myTaskExecution2.taskID = taskID
myTaskExecution2.step = "Compute_2_install"

expectedNextStep := "WFNode_creating"

// Test for duplicated task execution
// When Compute_install and Compute_2_install steps complete, they continue to register the next step WFNode_creating
// but only one task execution should be registered for the step WFNode_creating
srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_install"), []byte("initial"))
srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_2_install"), []byte("DONE"))
srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "WFNode_initial"), []byte("DONE"))

err = myWorker.runWorkflowStep(context.Background(), &myTaskExecution, wfName, false)
require.NoError(t, err)
err = myWorker2.runWorkflowStep(context.Background(), &myTaskExecution2, wfName, false)
require.NoError(t, err)

// Test that consul contains an execution for next step
execKeys, _, err := consulutil.GetKV().Keys(consulutil.ExecutionsTaskPrefix+"/", "/", nil)
require.NoError(t, err)
foundStep := 0
for _, execKey := range execKeys {
execID := path.Base(execKey)
execPath := path.Join(consulutil.ExecutionsTaskPrefix, execID)
found, value, err := consulutil.GetStringValue(path.Join(execPath, "step"))
require.NoError(t, err)
if found && value == expectedNextStep {
foundStep++
}
}
require.Equal(t, 1, foundStep, "Found more than one task execution is registered for one step %s", expectedNextStep)
}

// Construct key/value to initialise KV before running test
func testData(deploymentID string) map[string][]byte {
return map[string][]byte{
Expand Down
21 changes: 21 additions & 0 deletions tasks/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps,
ops := make(api.KVTxnOps, 0)
var stepOps api.KVTxnOps
for _, step := range steps {
exist, err := existTaskExecutionForStep(taskID, step.Name)
if err != nil {
return ops, errors.Wrapf(err, "Failed to check registered task execution for TaskID:%q, step:%q", taskID, step.Name)
}
if exist {
log.Debugf("Step:%q in TaskID:%q has already been registered for execution", step.Name, taskID)
continue
}
// Add execution key for initial steps only
u, err := uuid.NewV4()
if err != nil {
Expand All @@ -73,13 +81,26 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps,
Key: path.Join(consulutil.TasksPrefix, taskID, ".runningExecutions", execID),
Value: []byte(""),
},
&api.KVTxnOp{
Verb: api.KVSet,
Key: path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", step.Name),
Value: []byte(""),
},
}
ops = append(ops, stepOps...)
log.Debugf("Will store runningExecutions with id %q in txn for task %q", execID, taskID)
}
return ops, nil
}

func existTaskExecutionForStep(taskID string, stepName string) (bool, error) {
exist, _, err := consulutil.GetValue(path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", stepName))
if err != nil {
return false, err
}
return exist, nil
}

func getCallOperationsFromStep(s *step) []string {
ops := make([]string, 0)
for _, a := range s.Activities {
Expand Down