Skip to content

Commit a36dccd

Browse files
author
mouxin
committed
[Feature] Update Counter Release
1 parent 876d6ab commit a36dccd

File tree

3 files changed

+247
-46
lines changed

3 files changed

+247
-46
lines changed

fastdeploy/golang_router/internal/scheduler/common/counter.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@ func (c *Counter) Inc() {
1212
c.count.Add(1)
1313
}
1414

15-
func (c *Counter) Dec() {
16-
c.count.Add(^uint64(0))
15+
func (c *Counter) Dec() bool {
16+
for {
17+
old := c.count.Load()
18+
if old == 0 {
19+
return false
20+
}
21+
if c.count.CompareAndSwap(old, old-1) {
22+
return true
23+
}
24+
}
1725
}
1826

1927
func (c *Counter) Get() uint64 {

fastdeploy/golang_router/internal/scheduler/handler/handler.go

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,22 @@ func SelectWorker(ctx context.Context, workers []string, message string, workerT
137137
return selectWorkerURL, nil
138138
}
139139

140-
// Release decreases the counter for the specified worker URL
140+
// Release decreases the counter for the specified worker URL.
141+
// Uses GetCounter (not GetOrCreateCounter) to avoid creating ghost entries
142+
// when the counter has already been cleaned up.
141143
func Release(ctx context.Context, url string) {
142144
if DefaultScheduler == nil {
143145
return
144146
}
145-
counter := GetOrCreateCounter(ctx, url)
146-
counter.Dec()
147+
counter, exists := GetCounter(ctx, url)
148+
if !exists {
149+
logger.Warn(ctx, "release worker: %s skipped, counter already cleaned up", url)
150+
return
151+
}
152+
if !counter.Dec() {
153+
logger.Warn(ctx, "release worker: %s skipped, counter already zero (possible double-release)", url)
154+
return
155+
}
147156
logger.Info(ctx, "release worker: %s, count: %d", url, counter.Get())
148157
}
149158

@@ -172,7 +181,9 @@ func GetOrCreateCounter(ctx context.Context, url string) *scheduler_common.Count
172181
return newCounter
173182
}
174183

175-
// CleanupUnhealthyCounter removes counters for unhealthy worker URLs
184+
// CleanupUnhealthyCounter removes counters for unhealthy worker URLs only
185+
// when the counter has reached zero (no inflight requests). If there are
186+
// still inflight requests, the counter is preserved so Dec() works correctly.
176187
func CleanupUnhealthyCounter(ctx context.Context, unhealthyRootURL string) {
177188
if unhealthyRootURL == "" {
178189
return
@@ -185,12 +196,27 @@ func CleanupUnhealthyCounter(ctx context.Context, unhealthyRootURL string) {
185196
DefaultScheduler.mu.Lock()
186197
defer DefaultScheduler.mu.Unlock()
187198

188-
delete(DefaultScheduler.IdCounterMap, unhealthyRootURL)
189-
delete(DefaultScheduler.tokenMap, unhealthyRootURL)
190-
logger.Info(ctx, "cleanup unhealthy worker counter: %s", unhealthyRootURL)
199+
if counter, exists := DefaultScheduler.IdCounterMap[unhealthyRootURL]; exists {
200+
if counter.Get() > 0 {
201+
logger.Info(ctx, "unhealthy worker counter preserved (inflight requests): %s, count: %d", unhealthyRootURL, counter.Get())
202+
} else {
203+
delete(DefaultScheduler.IdCounterMap, unhealthyRootURL)
204+
logger.Info(ctx, "cleanup unhealthy worker counter: %s", unhealthyRootURL)
205+
}
206+
}
207+
208+
if tokenCounter, exists := DefaultScheduler.tokenMap[unhealthyRootURL]; exists {
209+
if tokenCounter.Get() > 0 {
210+
logger.Info(ctx, "unhealthy worker token counter preserved (inflight requests): %s, tokens: %d", unhealthyRootURL, tokenCounter.Get())
211+
} else {
212+
delete(DefaultScheduler.tokenMap, unhealthyRootURL)
213+
logger.Info(ctx, "cleanup unhealthy worker token counter: %s", unhealthyRootURL)
214+
}
215+
}
191216
}
192217

193218
// CleanupInvalidCounters removes counters for invalid or unreachable workers
219+
// only when their counter has reached zero (no inflight requests).
194220
func CleanupInvalidCounters(ctx context.Context) {
195221
if DefaultScheduler == nil {
196222
return
@@ -212,22 +238,32 @@ func CleanupInvalidCounters(ctx context.Context) {
212238
defer DefaultScheduler.mu.Unlock()
213239

214240
var removed []string
215-
for rootURL := range DefaultScheduler.IdCounterMap {
241+
var preserved []string
242+
for rootURL, counter := range DefaultScheduler.IdCounterMap {
216243
if _, exists := healthyMap[rootURL]; !exists {
217-
delete(DefaultScheduler.IdCounterMap, rootURL)
218-
removed = append(removed, rootURL)
244+
if counter.Get() > 0 {
245+
preserved = append(preserved, rootURL)
246+
} else {
247+
delete(DefaultScheduler.IdCounterMap, rootURL)
248+
removed = append(removed, rootURL)
249+
}
219250
}
220251
}
221252

222-
for rootURL := range DefaultScheduler.tokenMap {
253+
for rootURL, tokenCounter := range DefaultScheduler.tokenMap {
223254
if _, exists := healthyMap[rootURL]; !exists {
224-
delete(DefaultScheduler.tokenMap, rootURL)
255+
if tokenCounter.Get() == 0 {
256+
delete(DefaultScheduler.tokenMap, rootURL)
257+
}
225258
}
226259
}
227260

228261
if len(removed) > 0 {
229262
logger.Info(ctx, "removed counters for %d unhealthy workers: %v", len(removed), removed)
230263
}
264+
if len(preserved) > 0 {
265+
logger.Info(ctx, "preserved counters for %d workers with inflight requests: %v", len(preserved), preserved)
266+
}
231267
}
232268

233269
// StartBackupCleanupTask starts a background task for cleaning up invalid counters
@@ -280,12 +316,16 @@ func estimateTokens(message string) uint64 {
280316
return uint64(runeCount * 2)
281317
}
282318

283-
// ReleasePrefillTokens releases the corresponding token load when request ends
319+
// ReleasePrefillTokens releases the corresponding token load when request ends.
320+
// Uses GetTokenCounter (not GetOrCreateTokenCounter) to avoid creating ghost entries.
284321
func ReleasePrefillTokens(ctx context.Context, url, message string) {
285322
if DefaultScheduler == nil || url == "" || message == "" {
286323
return
287324
}
288-
tokenCounter := GetOrCreateTokenCounter(ctx, url)
325+
tokenCounter, exists := GetTokenCounter(ctx, url)
326+
if !exists {
327+
return
328+
}
289329
tokenCounter.Sub(estimateTokens(message))
290330
logger.Info(ctx, "release prefill tokens: %s, tokens: %d", url, tokenCounter.Get())
291331
}

fastdeploy/golang_router/internal/scheduler/handler/handler_test.go

Lines changed: 183 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,16 @@ func TestCounterOperations(t *testing.T) {
9797
counter.Inc()
9898
assert.Equal(t, uint64(1), counter.Get())
9999

100-
counter.Dec()
100+
ok := counter.Dec()
101+
assert.True(t, ok)
102+
assert.Equal(t, uint64(0), counter.Get())
103+
})
104+
105+
t.Run("counter underflow protection", func(t *testing.T) {
106+
counter := GetOrCreateCounter(ctx, "test-underflow")
107+
assert.Equal(t, uint64(0), counter.Get())
108+
ok := counter.Dec()
109+
assert.False(t, ok)
101110
assert.Equal(t, uint64(0), counter.Get())
102111
})
103112

@@ -117,28 +126,49 @@ func TestCleanupInvalidCounters(t *testing.T) {
117126
ctx := context.Background()
118127
Init(&config.Config{}, &mockManagerAPI{})
119128

120-
// Add some counters
121-
c1 := GetOrCreateCounter(ctx, "worker1")
122-
c1.Inc()
123-
GetOrCreateCounter(ctx, "invalid-worker") // Should be cleaned up
129+
t.Run("idle invalid counter deleted", func(t *testing.T) {
130+
// Add some counters
131+
c1 := GetOrCreateCounter(ctx, "worker1")
132+
c1.Inc()
133+
GetOrCreateCounter(ctx, "invalid-worker") // idle, should be cleaned up
134+
135+
tc1 := GetOrCreateTokenCounter(ctx, "worker1")
136+
tc1.Add(100)
137+
GetOrCreateTokenCounter(ctx, "invalid-worker") // idle, should be cleaned up
138+
139+
CleanupInvalidCounters(ctx)
124140

125-
tc1 := GetOrCreateTokenCounter(ctx, "worker1")
126-
tc1.Add(100)
127-
GetOrCreateTokenCounter(ctx, "invalid-worker") // Should be cleaned up
141+
// Healthy worker counters remain
142+
_, exists := GetCounter(ctx, "worker1")
143+
assert.True(t, exists)
144+
_, exists = GetTokenCounter(ctx, "worker1")
145+
assert.True(t, exists)
128146

129-
CleanupInvalidCounters(ctx)
147+
// Idle invalid worker counters deleted
148+
_, exists = GetCounter(ctx, "invalid-worker")
149+
assert.False(t, exists)
150+
_, exists = GetTokenCounter(ctx, "invalid-worker")
151+
assert.False(t, exists)
152+
})
130153

131-
// Verify counters
132-
_, exists := GetCounter(ctx, "worker1")
133-
assert.True(t, exists)
134-
_, exists = GetCounter(ctx, "invalid-worker")
135-
assert.False(t, exists)
154+
t.Run("inflight invalid counter preserved", func(t *testing.T) {
155+
Init(&config.Config{}, &mockManagerAPI{})
136156

137-
// Verify token counters
138-
_, exists = GetTokenCounter(ctx, "worker1")
139-
assert.True(t, exists)
140-
_, exists = GetTokenCounter(ctx, "invalid-worker")
141-
assert.False(t, exists)
157+
inflightCounter := GetOrCreateCounter(ctx, "inflight-invalid-worker")
158+
inflightCounter.Inc() // simulate inflight request
159+
inflightTC := GetOrCreateTokenCounter(ctx, "inflight-invalid-worker")
160+
inflightTC.Add(50)
161+
162+
CleanupInvalidCounters(ctx)
163+
164+
// Inflight invalid worker counters preserved
165+
_, exists := GetCounter(ctx, "inflight-invalid-worker")
166+
assert.True(t, exists)
167+
_, exists = GetTokenCounter(ctx, "inflight-invalid-worker")
168+
assert.True(t, exists)
169+
assert.Equal(t, uint64(1), inflightCounter.Get())
170+
assert.Equal(t, uint64(50), inflightTC.Get())
171+
})
142172
}
143173

144174
func TestEstimateTokens(t *testing.T) {
@@ -182,19 +212,35 @@ func TestCleanupUnhealthyCounter(t *testing.T) {
182212
ctx := context.Background()
183213
Init(&config.Config{}, nil)
184214

185-
// Add counters
186-
c := GetOrCreateCounter(ctx, "unhealthy-worker")
187-
c.Inc()
188-
tc := GetOrCreateTokenCounter(ctx, "unhealthy-worker")
189-
tc.Add(100)
215+
t.Run("counter preserved when inflight requests exist", func(t *testing.T) {
216+
c := GetOrCreateCounter(ctx, "unhealthy-worker-inflight")
217+
c.Inc()
218+
tc := GetOrCreateTokenCounter(ctx, "unhealthy-worker-inflight")
219+
tc.Add(100)
190220

191-
CleanupUnhealthyCounter(ctx, "unhealthy-worker")
221+
CleanupUnhealthyCounter(ctx, "unhealthy-worker-inflight")
192222

193-
// Verify cleanup
194-
_, exists := GetCounter(ctx, "unhealthy-worker")
195-
assert.False(t, exists)
196-
_, exists = GetTokenCounter(ctx, "unhealthy-worker")
197-
assert.False(t, exists)
223+
// Counter should be preserved (inflight requests)
224+
_, exists := GetCounter(ctx, "unhealthy-worker-inflight")
225+
assert.True(t, exists)
226+
_, exists = GetTokenCounter(ctx, "unhealthy-worker-inflight")
227+
assert.True(t, exists)
228+
assert.Equal(t, uint64(1), c.Get())
229+
assert.Equal(t, uint64(100), tc.Get())
230+
})
231+
232+
t.Run("counter deleted when no inflight requests", func(t *testing.T) {
233+
GetOrCreateCounter(ctx, "unhealthy-worker-idle")
234+
GetOrCreateTokenCounter(ctx, "unhealthy-worker-idle")
235+
236+
CleanupUnhealthyCounter(ctx, "unhealthy-worker-idle")
237+
238+
// Counter should be deleted (no inflight requests)
239+
_, exists := GetCounter(ctx, "unhealthy-worker-idle")
240+
assert.False(t, exists)
241+
_, exists = GetTokenCounter(ctx, "unhealthy-worker-idle")
242+
assert.False(t, exists)
243+
})
198244
}
199245

200246
func TestStartBackupCleanupTask(t *testing.T) {
@@ -215,3 +261,110 @@ func TestStartBackupCleanupTask(t *testing.T) {
215261
_, exists := GetCounter(ctx, "invalid-worker")
216262
assert.False(t, exists)
217263
}
264+
265+
func TestCounterLifecycle_UnhealthyAndReregister(t *testing.T) {
266+
ctx := context.Background()
267+
Init(&config.Config{}, &mockManagerAPI{})
268+
269+
url := "http://10.0.0.1:8080"
270+
271+
// 1. Simulate request arrival: Inc
272+
counter := GetOrCreateCounter(ctx, url)
273+
counter.Inc()
274+
assert.Equal(t, uint64(1), counter.Get())
275+
276+
tokenCounter := GetOrCreateTokenCounter(ctx, url)
277+
tokenCounter.Add(100)
278+
assert.Equal(t, uint64(100), tokenCounter.Get())
279+
280+
// 2. Instance becomes unhealthy → CleanupUnhealthyCounter (counter preserved due to inflight)
281+
CleanupUnhealthyCounter(ctx, url)
282+
283+
// Counter still exists, value unchanged
284+
sameCounter := GetOrCreateCounter(ctx, url)
285+
assert.Equal(t, counter, sameCounter) // same object
286+
assert.Equal(t, uint64(1), sameCounter.Get())
287+
288+
// 3. Inflight request completes → Release
289+
Release(ctx, url)
290+
assert.Equal(t, uint64(0), counter.Get())
291+
292+
ReleasePrefillTokens(ctx, url, "dummy message with 10 chars")
293+
294+
// 4. Another Release does not underflow
295+
Release(ctx, url)
296+
assert.Equal(t, uint64(0), counter.Get()) // stays 0, no underflow
297+
298+
// 5. Instance re-registers → new request Inc
299+
counter.Inc()
300+
assert.Equal(t, uint64(1), counter.Get())
301+
302+
// 6. Request completes → Release
303+
Release(ctx, url)
304+
assert.Equal(t, uint64(0), counter.Get()) // back to zero
305+
306+
// 7. Multiple concurrent requests full cycle
307+
counter.Inc()
308+
counter.Inc()
309+
counter.Inc()
310+
assert.Equal(t, uint64(3), counter.Get())
311+
Release(ctx, url)
312+
Release(ctx, url)
313+
Release(ctx, url)
314+
assert.Equal(t, uint64(0), counter.Get()) // back to zero
315+
}
316+
317+
func TestCounterLifecycle_CleanupBeforeRelease(t *testing.T) {
318+
ctx := context.Background()
319+
Init(&config.Config{}, &mockManagerAPI{})
320+
321+
url := "http://10.0.0.2:8080"
322+
323+
t.Run("cleanup deletes counter then release is no-op", func(t *testing.T) {
324+
// 1. Request arrives → counter=1
325+
counter := GetOrCreateCounter(ctx, url)
326+
counter.Inc()
327+
assert.Equal(t, uint64(1), counter.Get())
328+
329+
tc := GetOrCreateTokenCounter(ctx, url)
330+
tc.Add(200)
331+
332+
// 2. Request finishes → Release → counter=0
333+
Release(ctx, url)
334+
assert.Equal(t, uint64(0), counter.Get())
335+
336+
// 3. Cleanup runs, sees counter=0, deletes it
337+
CleanupUnhealthyCounter(ctx, url)
338+
_, exists := GetCounter(ctx, url)
339+
assert.False(t, exists) // counter deleted
340+
341+
// 4. A late/duplicate Release after cleanup should NOT create ghost counter
342+
Release(ctx, url)
343+
344+
// Verify no ghost counter was created
345+
_, exists = GetCounter(ctx, url)
346+
assert.False(t, exists, "Release should not create ghost counter after cleanup")
347+
})
348+
349+
t.Run("cleanup deletes token counter then ReleasePrefillTokens is no-op", func(t *testing.T) {
350+
Init(&config.Config{}, &mockManagerAPI{})
351+
tokenURL := "http://10.0.0.3:8080"
352+
353+
tc := GetOrCreateTokenCounter(ctx, tokenURL)
354+
tc.Add(200)
355+
356+
// Sub all tokens so counter=0
357+
tc.Sub(200)
358+
assert.Equal(t, uint64(0), tc.Get())
359+
360+
// Cleanup deletes the token counter
361+
CleanupUnhealthyCounter(ctx, tokenURL)
362+
_, exists := GetTokenCounter(ctx, tokenURL)
363+
assert.False(t, exists)
364+
365+
// Late ReleasePrefillTokens should not create ghost token counter
366+
ReleasePrefillTokens(ctx, tokenURL, "hello world")
367+
_, exists = GetTokenCounter(ctx, tokenURL)
368+
assert.False(t, exists, "ReleasePrefillTokens should not create ghost token counter after cleanup")
369+
})
370+
}

0 commit comments

Comments
 (0)