-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
161 lines (138 loc) · 4.66 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package nodejs
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
)
// Pool manages a pool of NodeJS processes that can be used to execute JavaScript code.
// It automatically maintains a queue of ready processes and creates new ones as needed.
//
// The zero value is not usable; construct a Pool via (*Factory).NewPool, which
// initializes the condition variables and starts the spawner goroutine.
type Pool struct {
	queue []*Process // FIFO of ready-to-use processes; guarded by lk
	factory *Factory // Factory used to spawn new NodeJS processes
	maxProcs uint64 // Upper bound on live processes enforced by the spawner (run)
	procs uint64 // Current count of live processes; accessed only via sync/atomic
	spawner atomic.Bool // True while the single run() spawner goroutine is active
	addcond *sync.Cond // NOTE(review): initialized in NewPool but never waited on or signaled in this file — confirm it is still needed
	availcond *sync.Cond // Signaled (Broadcast) whenever a process is appended to queue
	lk sync.Mutex // Protects queue; also the Locker backing both condition variables
	Timeout time.Duration // Timeout for running nodejs, defaults to 10 second if zero
}
// NewPool returns a pool of nodejs processes generated by the factory. The pool will
// always generate at least queueSize processes in memory and will start new ones as
// they are taken. Passing queueSize <=0 will use the number of CPUs.
func (f *Factory) NewPool(queueSize, maxProcs int) *Pool {
	cpus := runtime.NumCPU()
	if queueSize <= 0 {
		queueSize = cpus
	}
	if maxProcs <= 0 {
		maxProcs = cpus
	}
	// NOTE(review): queueSize is normalized above but never stored or read
	// afterwards — the spawner in run() caps on maxProcs only. Confirm whether
	// the documented queueSize behavior is still intended.
	pool := &Pool{
		factory:  f,
		maxProcs: uint64(maxProcs),
	}
	// Both condition variables share the pool mutex as their Locker.
	pool.addcond = sync.NewCond(&pool.lk)
	pool.availcond = sync.NewCond(&pool.lk)
	// Kick off the background spawner that fills the pool.
	go pool.run()
	return pool
}
// TakeIfAvailable returns a Process if any is immediately available, or nil if not.
// This method never blocks waiting for a process to appear.
func (pool *Pool) TakeIfAvailable() *Process {
	pool.lk.Lock()
	defer pool.lk.Unlock()
	var proc *Process
	if len(pool.queue) > 0 {
		// Pop the head of the FIFO.
		proc = pool.queue[0]
		pool.queue = pool.queue[1:]
	}
	return proc
}
// Take returns a [Process] from the pool, blocking until one becomes available
// or ctx is cancelled, in which case ctx.Err() is returned.
func (pool *Pool) Take(ctx context.Context) (*Process, error) {
	pool.lk.Lock()
	defer pool.lk.Unlock()
	// When the context ends, wake every goroutine parked on availcond so the
	// loop below can observe the cancellation instead of sleeping forever.
	stop := context.AfterFunc(ctx, func() {
		pool.availcond.L.Lock()
		defer pool.availcond.L.Unlock()
		pool.availcond.Broadcast()
	})
	defer stop()
	for {
		// Cancellation wins even if a process happens to be queued.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		if len(pool.queue) > 0 {
			proc := pool.queue[0]
			pool.queue = pool.queue[1:]
			return proc, nil
		}
		// Releases the lock while waiting; reacquires it before returning.
		pool.availcond.Wait()
	}
}
// TakeTimeout will return a [Process] taken from the pool if any is available
// before the timeout expires; otherwise it returns the deadline error.
func (pool *Pool) TakeTimeout(t time.Duration) (*Process, error) {
	// Delegate to Take with a deadline-bounded context.
	deadline, stop := context.WithTimeout(context.Background(), t)
	defer stop()
	return pool.Take(deadline)
}
// addProcess appends p to the queue of available processes and wakes any
// goroutines blocked in Take waiting for one.
func (pool *Pool) addProcess(p *Process) {
	pool.lk.Lock()
	pool.queue = append(pool.queue, p)
	pool.lk.Unlock()
	// Broadcast is permitted without holding the Locker (see sync.Cond docs);
	// waiters reacquire the lock before re-checking the queue.
	pool.availcond.Broadcast()
}
// run is the spawner goroutine: it creates NodeJS processes and hands them to
// the availability queue until the pool holds maxProcs live processes. Only
// one spawner runs at a time; procsSubOne restarts it when a process exits.
func (pool *Pool) run() {
	// Ensure only one spawner is running; a losing CAS means another run()
	// goroutine is already live and this one must not duplicate its work.
	if !pool.spawner.CompareAndSwap(false, true) {
		return
	}
	defer pool.spawner.Store(false)
	for {
		// Stop once the pool already holds the maximum number of processes.
		// Using >= enforces the configured cap exactly; the previous strict >
		// allowed one extra process (maxProcs+1) to be created before the
		// check fired.
		if atomic.LoadUint64(&pool.procs) >= pool.maxProcs {
			// Note: a narrow race remains if every process exits at the exact
			// moment we return; procsSubOne restarts the spawner, so the
			// window is self-healing in practice.
			return
		}
		// Use the configured timeout, or default to 10 seconds when unset.
		timeout := pool.Timeout
		if timeout <= 0 {
			timeout = 10 * time.Second
		}
		// Create a new NodeJS process.
		newProc, err := pool.factory.NewWithTimeout(timeout)
		if err != nil {
			// Creation failed (e.g. the node binary is unavailable); back off
			// before retrying rather than spinning.
			time.Sleep(10 * time.Second)
			continue
		}
		// Count the process and register a cleanup handler so the counter is
		// decremented (and the spawner restarted) when the process exits.
		atomic.AddUint64(&pool.procs, 1)
		newProc.cleanup = append(newProc.cleanup, pool.procsSubOne)
		// Publish the new process to the availability queue.
		pool.addProcess(newProc)
	}
}
// procsSubOne is installed as a process cleanup handler: it decrements the
// live-process counter and, if no spawner is currently running, starts one so
// the pool is refilled.
func (pool *Pool) procsSubOne() {
	// Adding ^uint64(0) (all bits set) is the sync/atomic idiom for
	// subtracting 1 from an unsigned counter.
	atomic.AddUint64(&pool.procs, ^uint64(0))
	if pool.spawner.Load() {
		// A spawner is already active; it will top the pool back up.
		return
	}
	go pool.run()
}