Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add locking to groups and tasks in eager backend #740

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions v1/backends/eager/eager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,46 @@ func (e ErrTasknotFound) Error() string {
// Backend represents an "eager" in-memory result backend
type Backend struct {
common.Backend
groups map[string][]string
tasks map[string][]byte
stateMutex sync.Mutex
groups struct {
sync.RWMutex
groups map[string][]string
}
tasks struct {
sync.RWMutex
tasks map[string][]byte
}
}

// New creates EagerBackend instance
func New() iface.Backend {
return &Backend{
b := Backend{
Backend: common.NewBackend(new(config.Config)),
groups: make(map[string][]string),
tasks: make(map[string][]byte),
}
b.groups.groups = make(map[string][]string)
b.tasks.tasks = make(map[string][]byte)
return &b
}

// InitGroup creates and saves a group meta data object
func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error {
b.groups.Lock()
defer b.groups.Unlock()
b.tasks.Lock()
defer b.tasks.Unlock()

tasks := make([]string, 0, len(taskUUIDs))
// copy every task
tasks = append(tasks, taskUUIDs...)

b.groups[groupUUID] = tasks
b.groups.groups[groupUUID] = tasks
return nil
}

// GroupCompleted returns true if all tasks in a group finished
func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
tasks, ok := b.groups[groupUUID]
b.groups.RLock()
defer b.groups.RUnlock()
tasks, ok := b.groups.groups[groupUUID]
if !ok {
return false, NewErrGroupNotFound(groupUUID)
}
Expand All @@ -93,7 +106,9 @@ func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, er

// GroupTaskStates returns states of all tasks in the group
func (b *Backend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
taskUUIDs, ok := b.groups[groupUUID]
b.groups.RLock()
defer b.groups.RUnlock()
taskUUIDs, ok := b.groups.groups[groupUUID]
if !ok {
return nil, NewErrGroupNotFound(groupUUID)
}
Expand Down Expand Up @@ -157,7 +172,9 @@ func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error

// GetState returns the latest task state
func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {
tasktStateBytes, ok := b.tasks[taskUUID]
b.tasks.RLock()
defer b.tasks.RUnlock()
tasktStateBytes, ok := b.tasks.tasks[taskUUID]
if !ok {
return nil, NewErrTasknotFound(taskUUID)
}
Expand All @@ -174,35 +191,36 @@ func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {

// PurgeState deletes stored task state
func (b *Backend) PurgeState(taskUUID string) error {
_, ok := b.tasks[taskUUID]
b.tasks.Lock()
defer b.tasks.Unlock()
_, ok := b.tasks.tasks[taskUUID]
if !ok {
return NewErrTasknotFound(taskUUID)
}

delete(b.tasks, taskUUID)
delete(b.tasks.tasks, taskUUID)
return nil
}

// PurgeGroupMeta deletes stored group meta data
func (b *Backend) PurgeGroupMeta(groupUUID string) error {
_, ok := b.groups[groupUUID]
_, ok := b.groups.groups[groupUUID]
if !ok {
return NewErrGroupNotFound(groupUUID)
}

delete(b.groups, groupUUID)
delete(b.groups.groups, groupUUID)
return nil
}

func (b *Backend) updateState(s *tasks.TaskState) error {
// simulate the behavior of json marshal/unmarshal
b.stateMutex.Lock()
defer b.stateMutex.Unlock()
b.tasks.Lock()
defer b.tasks.Unlock()
msg, err := json.Marshal(s)
if err != nil {
return fmt.Errorf("Marshal task state error: %v", err)
}

b.tasks[s.TaskUUID] = msg
b.tasks.tasks[s.TaskUUID] = msg
return nil
}
53 changes: 35 additions & 18 deletions v2/backends/eager/eager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,43 @@ func (e ErrTasknotFound) Error() string {
// Backend represents an "eager" in-memory result backend
type Backend struct {
common.Backend
groups map[string][]string
tasks map[string][]byte
stateMutex sync.Mutex
groups struct {
sync.RWMutex
groups map[string][]string
}
tasks struct {
sync.RWMutex
tasks map[string][]byte
}
}

// New creates EagerBackend instance
func New() iface.Backend {
return &Backend{
b := Backend{
Backend: common.NewBackend(new(config.Config)),
groups: make(map[string][]string),
tasks: make(map[string][]byte),
}
b.groups.groups = make(map[string][]string)
b.tasks.tasks = make(map[string][]byte)
return &b
}

// InitGroup creates and saves a group meta data object
func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error {
b.groups.Lock()
defer b.groups.Unlock()
tasks := make([]string, 0, len(taskUUIDs))
// copy every task
tasks = append(tasks, taskUUIDs...)

b.groups[groupUUID] = tasks
b.groups.groups[groupUUID] = tasks
return nil
}

// GroupCompleted returns true if all tasks in a group finished
func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
tasks, ok := b.groups[groupUUID]
b.groups.RLock()
defer b.groups.RUnlock()
tasks, ok := b.groups.groups[groupUUID]
if !ok {
return false, NewErrGroupNotFound(groupUUID)
}
Expand All @@ -93,7 +103,9 @@ func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, er

// GroupTaskStates returns states of all tasks in the group
func (b *Backend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
taskUUIDs, ok := b.groups[groupUUID]
b.groups.RLock()
defer b.groups.RUnlock()
taskUUIDs, ok := b.groups.groups[groupUUID]
if !ok {
return nil, NewErrGroupNotFound(groupUUID)
}
Expand Down Expand Up @@ -157,7 +169,9 @@ func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error

// GetState returns the latest task state
func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {
tasktStateBytes, ok := b.tasks[taskUUID]
b.tasks.RLock()
defer b.tasks.RUnlock()
tasktStateBytes, ok := b.tasks.tasks[taskUUID]
if !ok {
return nil, NewErrTasknotFound(taskUUID)
}
Expand All @@ -174,35 +188,38 @@ func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {

// PurgeState deletes stored task state
func (b *Backend) PurgeState(taskUUID string) error {
_, ok := b.tasks[taskUUID]
b.tasks.Lock()
defer b.tasks.Unlock()
_, ok := b.tasks.tasks[taskUUID]
if !ok {
return NewErrTasknotFound(taskUUID)
}

delete(b.tasks, taskUUID)
delete(b.tasks.tasks, taskUUID)
return nil
}

// PurgeGroupMeta deletes stored group meta data
func (b *Backend) PurgeGroupMeta(groupUUID string) error {
_, ok := b.groups[groupUUID]
b.groups.Lock()
defer b.groups.Unlock()
_, ok := b.groups.groups[groupUUID]
if !ok {
return NewErrGroupNotFound(groupUUID)
}

delete(b.groups, groupUUID)
delete(b.groups.groups, groupUUID)
return nil
}

func (b *Backend) updateState(s *tasks.TaskState) error {
// simulate the behavior of json marshal/unmarshal
b.stateMutex.Lock()
defer b.stateMutex.Unlock()
b.tasks.Lock()
defer b.tasks.Unlock()
msg, err := json.Marshal(s)
if err != nil {
return fmt.Errorf("Marshal task state error: %v", err)
}

b.tasks[s.TaskUUID] = msg
b.tasks.tasks[s.TaskUUID] = msg
return nil
}