From dc3180d50e79432fc6b06236f39781c6e3ff3660 Mon Sep 17 00:00:00 2001 From: Ian Lewis Date: Mon, 17 Jan 2022 09:05:30 +0900 Subject: [PATCH] Add locking to groups and tasks in eager backend Fixes #737 --- v1/backends/eager/eager.go | 54 +++++++++++++++++++++++++------------- v2/backends/eager/eager.go | 53 ++++++++++++++++++++++++------------- 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/v1/backends/eager/eager.go b/v1/backends/eager/eager.go index 6fb6bf100..300809d4c 100644 --- a/v1/backends/eager/eager.go +++ b/v1/backends/eager/eager.go @@ -45,33 +45,46 @@ func (e ErrTasknotFound) Error() string { // Backend represents an "eager" in-memory result backend type Backend struct { common.Backend - groups map[string][]string - tasks map[string][]byte - stateMutex sync.Mutex + groups struct { + sync.RWMutex + groups map[string][]string + } + tasks struct { + sync.RWMutex + tasks map[string][]byte + } } // New creates EagerBackend instance func New() iface.Backend { - return &Backend{ + b := Backend{ Backend: common.NewBackend(new(config.Config)), - groups: make(map[string][]string), - tasks: make(map[string][]byte), } + b.groups.groups = make(map[string][]string) + b.tasks.tasks = make(map[string][]byte) + return &b } // InitGroup creates and saves a group meta data object func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error { + b.groups.Lock() + defer b.groups.Unlock() + b.tasks.Lock() + defer b.tasks.Unlock() + tasks := make([]string, 0, len(taskUUIDs)) // copy every task tasks = append(tasks, taskUUIDs...) - b.groups[groupUUID] = tasks + b.groups.groups[groupUUID] = tasks return nil } // GroupCompleted returns true if all tasks in a group finished func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) { - tasks, ok := b.groups[groupUUID] + b.groups.RLock() + defer b.groups.RUnlock() + tasks, ok := b.groups.groups[groupUUID] if !ok { return false, NewErrGroupNotFound(groupUUID) } @@ -93,7 +106,9 @@ func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, er // GroupTaskStates returns states of all tasks in the group func (b *Backend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) { - taskUUIDs, ok := b.groups[groupUUID] + b.groups.RLock() + defer b.groups.RUnlock() + taskUUIDs, ok := b.groups.groups[groupUUID] if !ok { return nil, NewErrGroupNotFound(groupUUID) } @@ -157,7 +172,9 @@ func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error // GetState returns the latest task state func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) { - tasktStateBytes, ok := b.tasks[taskUUID] + b.tasks.RLock() + defer b.tasks.RUnlock() + tasktStateBytes, ok := b.tasks.tasks[taskUUID] if !ok { return nil, NewErrTasknotFound(taskUUID) } @@ -174,35 +191,36 @@ func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) { // PurgeState deletes stored task state func (b *Backend) PurgeState(taskUUID string) error { - _, ok := b.tasks[taskUUID] + b.tasks.Lock() + defer b.tasks.Unlock() + _, ok := b.tasks.tasks[taskUUID] if !ok { return NewErrTasknotFound(taskUUID) } - delete(b.tasks, taskUUID) + delete(b.tasks.tasks, taskUUID) return nil } // PurgeGroupMeta deletes stored group meta data func (b *Backend) PurgeGroupMeta(groupUUID string) error { - _, ok := b.groups[groupUUID] + _, ok := b.groups.groups[groupUUID] if !ok { return NewErrGroupNotFound(groupUUID) } - delete(b.groups, groupUUID) + delete(b.groups.groups, groupUUID) return nil } func (b *Backend) updateState(s *tasks.TaskState) error { // simulate the behavior of json marshal/unmarshal - b.stateMutex.Lock() - defer b.stateMutex.Unlock() + b.tasks.Lock() + defer b.tasks.Unlock() msg, err := json.Marshal(s) if err != nil { return fmt.Errorf("Marshal task state error: %v", err) } - - b.tasks[s.TaskUUID] = msg + b.tasks.tasks[s.TaskUUID] = msg return nil } diff --git a/v2/backends/eager/eager.go b/v2/backends/eager/eager.go index 292b63b73..874b5e9e9 100644 --- a/v2/backends/eager/eager.go +++ b/v2/backends/eager/eager.go @@ -45,33 +45,43 @@ func (e ErrTasknotFound) Error() string { // Backend represents an "eager" in-memory result backend type Backend struct { common.Backend - groups map[string][]string - tasks map[string][]byte - stateMutex sync.Mutex + groups struct { + sync.RWMutex + groups map[string][]string + } + tasks struct { + sync.RWMutex + tasks map[string][]byte + } } // New creates EagerBackend instance func New() iface.Backend { - return &Backend{ + b := Backend{ Backend: common.NewBackend(new(config.Config)), - groups: make(map[string][]string), - tasks: make(map[string][]byte), } + b.groups.groups = make(map[string][]string) + b.tasks.tasks = make(map[string][]byte) + return &b } // InitGroup creates and saves a group meta data object func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error { + b.groups.Lock() + defer b.groups.Unlock() tasks := make([]string, 0, len(taskUUIDs)) // copy every task tasks = append(tasks, taskUUIDs...) - b.groups[groupUUID] = tasks + b.groups.groups[groupUUID] = tasks return nil } // GroupCompleted returns true if all tasks in a group finished func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) { - tasks, ok := b.groups[groupUUID] + b.groups.RLock() + defer b.groups.RUnlock() + tasks, ok := b.groups.groups[groupUUID] if !ok { return false, NewErrGroupNotFound(groupUUID) } @@ -93,7 +103,9 @@ func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, er // GroupTaskStates returns states of all tasks in the group func (b *Backend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) { - taskUUIDs, ok := b.groups[groupUUID] + b.groups.RLock() + defer b.groups.RUnlock() + taskUUIDs, ok := b.groups.groups[groupUUID] if !ok { return nil, NewErrGroupNotFound(groupUUID) } @@ -157,7 +169,9 @@ func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error // GetState returns the latest task state func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) { - tasktStateBytes, ok := b.tasks[taskUUID] + b.tasks.RLock() + defer b.tasks.RUnlock() + tasktStateBytes, ok := b.tasks.tasks[taskUUID] if !ok { return nil, NewErrTasknotFound(taskUUID) } @@ -174,35 +188,38 @@ func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) { // PurgeState deletes stored task state func (b *Backend) PurgeState(taskUUID string) error { - _, ok := b.tasks[taskUUID] + b.tasks.Lock() + defer b.tasks.Unlock() + _, ok := b.tasks.tasks[taskUUID] if !ok { return NewErrTasknotFound(taskUUID) } - delete(b.tasks, taskUUID) + delete(b.tasks.tasks, taskUUID) return nil } // PurgeGroupMeta deletes stored group meta data func (b *Backend) PurgeGroupMeta(groupUUID string) error { - _, ok := b.groups[groupUUID] + b.groups.Lock() + defer b.groups.Unlock() + _, ok := b.groups.groups[groupUUID] if !ok { return NewErrGroupNotFound(groupUUID) } - delete(b.groups, groupUUID) + delete(b.groups.groups, groupUUID) return nil } func (b *Backend) updateState(s *tasks.TaskState) error { // simulate the behavior of json marshal/unmarshal - b.stateMutex.Lock() - defer b.stateMutex.Unlock() + b.tasks.Lock() + defer b.tasks.Unlock() msg, err := json.Marshal(s) if err != nil { return fmt.Errorf("Marshal task state error: %v", err) } - - b.tasks[s.TaskUUID] = msg + b.tasks.tasks[s.TaskUUID] = msg return nil }