Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Go

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ on:
tags:
- v*
branches:
- master
- main
- "**"
pull_request:

jobs:
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func main() {
requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

for _, r := range requests {
r := r
wp.Submit(func() {
fmt.Println("Handling request:", r)
})
Expand Down
2 changes: 1 addition & 1 deletion pacer/pacer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestPacedWorkers(t *testing.T) {
})

// Cause worker to be created, and available for reuse before next task.
for i := 0; i < 10; i++ {
for range 10 {
wp.Submit(pacedTask)
wp.Submit(slowPacedTask)
}
Expand Down
13 changes: 7 additions & 6 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type WorkerPool struct {
stopLock sync.Mutex
stopOnce sync.Once
stopped bool
waiting int32
waiting_old atomic.Int32
wait bool
}

Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *WorkerPool) SubmitWait(task func()) {

// WaitingQueueSize returns the count of tasks in the waiting queue.
func (p *WorkerPool) WaitingQueueSize() int {
	// Read the atomically-maintained queue length and widen it for callers.
	queued := p.waiting_old.Load()
	return int(queued)
}

// Pause causes all workers to wait on the given Context, thereby making them
Expand All @@ -148,7 +148,7 @@ func (p *WorkerPool) Pause(ctx context.Context) {
}
ready := new(sync.WaitGroup)
ready.Add(p.maxWorkers)
for i := 0; i < p.maxWorkers; i++ {
for range p.maxWorkers {
p.Submit(func() {
ready.Done()
select {
Expand Down Expand Up @@ -199,7 +199,8 @@ Loop:
} else {
// Enqueue task to be executed by next available worker.
p.waitingQueue.PushBack(task)
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
p.waitingQueue.Len()
p.waiting_old.Store(int32(p.waitingQueue.Len()))
}
}
idle = false
Expand Down Expand Up @@ -275,7 +276,7 @@ func (p *WorkerPool) processWaitingQueue() bool {
// A worker was ready, so gave task to worker.
p.waitingQueue.PopFront()
}
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
p.waiting_old.Store(int32(p.waitingQueue.Len()))
return true
}

Expand All @@ -296,6 +297,6 @@ func (p *WorkerPool) runQueuedTasks() {
for p.waitingQueue.Len() != 0 {
// A worker is ready, so give task to worker.
p.workerQueue <- p.waitingQueue.PopFront()
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
p.waiting_old.Store(int32(p.waitingQueue.Len()))
}
}
37 changes: 18 additions & 19 deletions workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func TestExample(t *testing.T) {

rspChan := make(chan string, len(requests))
for _, r := range requests {
r := r
wp.Submit(func() {
rspChan <- r
})
Expand Down Expand Up @@ -62,7 +61,7 @@ func TestMaxWorkers(t *testing.T) {
release := make(chan struct{})

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < max; i++ {
for range max {
wp.Submit(func() {
started <- struct{}{}
<-release
Expand Down Expand Up @@ -96,7 +95,7 @@ func TestReuseWorkers(t *testing.T) {
release := make(chan struct{})

// Cause worker to be created, and available for reuse before next task.
for i := 0; i < 10; i++ {
for range 10 {
wp.Submit(func() { <-release })
release <- struct{}{}
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -178,7 +177,7 @@ func TestStop(t *testing.T) {

release := make(chan struct{})
finished := make(chan struct{}, max)
for i := 0; i < max; i++ {
for range max {
wp.Submit(func() {
<-release
finished <- struct{}{}
Expand Down Expand Up @@ -216,7 +215,7 @@ func TestStopWait(t *testing.T) {
wp := New(5)
release := make(chan struct{})
finished := make(chan struct{}, max)
for i := 0; i < max; i++ {
for range max {
wp.Submit(func() {
<-release
finished <- struct{}{}
Expand All @@ -229,7 +228,7 @@ func TestStopWait(t *testing.T) {
close(release)
}()
wp.StopWait()
for count := 0; count < max; count++ {
for range max {
select {
case <-finished:
default:
Expand Down Expand Up @@ -298,7 +297,7 @@ func TestOverflow(t *testing.T) {
releaseChan := make(chan struct{})

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < 64; i++ {
for range 64 {
wp.Submit(func() { <-releaseChan })
}

Expand Down Expand Up @@ -331,7 +330,7 @@ func TestStopRace(t *testing.T) {
started.Add(max)

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < max; i++ {
for range max {
wp.Submit(func() {
started.Done()
<-workRelChan
Expand All @@ -342,7 +341,7 @@ func TestStopRace(t *testing.T) {

const doneCallers = 5
stopDone := make(chan struct{}, doneCallers)
for i := 0; i < doneCallers; i++ {
for range doneCallers {
go func() {
wp.Stop()
stopDone <- struct{}{}
Expand All @@ -358,7 +357,7 @@ func TestStopRace(t *testing.T) {
close(workRelChan)

timeout := time.After(time.Second)
for i := 0; i < doneCallers; i++ {
for range doneCallers {
select {
case <-stopDone:
case <-timeout:
Expand All @@ -381,12 +380,12 @@ func TestWaitingQueueSizeRace(t *testing.T) {
defer wp.Stop()

maxChan := make(chan int)
for g := 0; g < goroutines; g++ {
for range goroutines {
go func() {
max := 0
// Submit 100 tasks, checking waiting queue size each time. Report
// the maximum queue size seen.
for i := 0; i < tasks; i++ {
for range tasks {
wp.Submit(func() {
time.Sleep(time.Microsecond)
})
Expand All @@ -401,7 +400,7 @@ func TestWaitingQueueSizeRace(t *testing.T) {

// Find maximum queuesize seen by any goroutine.
maxMax := 0
for g := 0; g < goroutines; g++ {
for range goroutines {
max := <-maxChan
if max > maxMax {
maxMax = max
Expand Down Expand Up @@ -587,7 +586,7 @@ func TestWorkerLeak(t *testing.T) {
wp := New(workerCount)

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < workerCount; i++ {
for range workerCount {
wp.Submit(func() {
time.Sleep(time.Millisecond)
})
Expand Down Expand Up @@ -647,7 +646,7 @@ func BenchmarkEnqueue(b *testing.B) {
b.ResetTimer()

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < b.N; i++ {
for range b.N {
wp.Submit(func() { <-releaseChan })
}
close(releaseChan)
Expand All @@ -660,9 +659,9 @@ func BenchmarkEnqueue2(b *testing.B) {
b.ResetTimer()

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < b.N; i++ {
for range b.N {
releaseChan := make(chan struct{})
for i := 0; i < 64; i++ {
for range 64 {
wp.Submit(func() { <-releaseChan })
}
close(releaseChan)
Expand Down Expand Up @@ -702,8 +701,8 @@ func benchmarkExecWorkers(n int, b *testing.B) {
b.ResetTimer()

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < b.N; i++ {
for j := 0; j < n; j++ {
for range b.N {
for range n {
wp.Submit(func() {
//time.Sleep(100 * time.Microsecond)
allDone.Done()
Expand Down