Skip to content

Commit 1c8262f

Browse files
authored
Merge pull request #172 from deploymenttheory/dev
Refactored semaphore permit management
2 parents e20d59b + daba3c8 commit 1c8262f

File tree

5 files changed

+235
-145
lines changed

5 files changed

+235
-145
lines changed

concurrency/const.go

+45-13
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,54 @@ package concurrency
44
import "time"
55

66
const (
7-
// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
8-
// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
9-
// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
7+
// Concurrency constants define parameters related to managing concurrent requests.
8+
9+
// MaxConcurrency represents the maximum number of concurrent requests the system is designed to handle safely.
10+
MaxConcurrency = 10
11+
12+
// MinConcurrency is the minimum number of concurrent requests that the system will maintain,
13+
// even under low traffic conditions or when scaling down due to low resource utilization.
14+
MinConcurrency = 1
15+
16+
// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
17+
// to make decisions about scaling concurrency up or down.
18+
EvaluationInterval = 1 * time.Minute
19+
20+
// Threshold constants define critical operational metrics for adjusting concurrency.
21+
22+
// MaxAcceptableTTFB (Time to First Byte) is the threshold for the longest acceptable delay
23+
// between making a request and receiving the first byte of data in the response. If response
24+
// times exceed this threshold, it indicates potential performance issues, and the system may
25+
// scale down concurrency to reduce load on the server.
1026
MaxAcceptableTTFB = 300 * time.Millisecond
1127

12-
// MaxAcceptableThroughput represents the maximum acceptable network throughput in bytes per second.
13-
// Throughput is the amount of data transferred over the network within a specific time interval.
14-
// Adjustments in concurrency will be made if the network throughput exceeds this threshold.
15-
MaxAcceptableThroughput = 5 * 1024 * 1024
28+
// MaxAcceptableThroughput is the threshold for the maximum network data transfer rate. If the
29+
// system's throughput exceeds this value, it may be an indicator of high traffic demanding
30+
// significant bandwidth, which could warrant a scale-up in concurrency to maintain performance.
31+
MaxAcceptableThroughput = 5 * 1024 * 1024 // 5 MiB/s, expressed in bytes per second
1632

17-
// MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
18-
// It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
33+
// MaxAcceptableResponseTimeVariability is the threshold for the maximum allowed variability or
34+
// fluctuations in response times. A high variability often indicates an unstable system, which
35+
// could trigger a scale-down to allow the system to stabilize.
1936
MaxAcceptableResponseTimeVariability = 500 * time.Millisecond
2037

21-
// ErrorRateThreshold represents the threshold for error rate above which concurrency will be adjusted.
22-
// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
23-
// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
24-
ErrorRateThreshold = 0.1
38+
// ErrorRateThreshold is the maximum acceptable rate of error responses (such as rate-limit errors
39+
// and 5xx server errors) compared to the total number of requests. Exceeding this threshold suggests
40+
// the system is encountering issues that may be alleviated by scaling down concurrency.
41+
ErrorRateThreshold = 0.1 // 10% error rate
42+
43+
// RateLimitCriticalThreshold defines the number of available rate limit slots considered critical.
44+
// Falling at or below this threshold suggests the system is close to hitting the rate limit enforced
45+
// by the external service, and it should scale down to prevent rate-limiting errors.
46+
RateLimitCriticalThreshold = 5
47+
48+
// ErrorResponseThreshold is the threshold for the error rate that, once exceeded, indicates the system
49+
// should consider scaling down. It is a ratio of the number of error responses to the total number of
50+
// requests, reflecting the health of the interaction with the external system.
51+
ErrorResponseThreshold = 0.2 // 20% error rate
52+
53+
// ResponseTimeCriticalThreshold is the duration beyond which the response time is considered critically
54+
// high. If response times exceed this threshold, it could signal that the system or the external service
55+
// is under heavy load and may benefit from scaling down concurrency to alleviate pressure.
56+
ResponseTimeCriticalThreshold = 2 * time.Second
2557
)

concurrency/handler.go

+1-8
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,6 @@ import (
88
"github.com/deploymenttheory/go-api-http-client/logger"
99
)
1010

11-
// Constants and Data Structures:
12-
const (
13-
MaxConcurrency = 10 // Maximum allowed concurrent requests
14-
MinConcurrency = 1 // Minimum allowed concurrent requests
15-
EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
16-
)
17-
1811
// ConcurrencyHandler controls the number of concurrent HTTP requests.
1912
type ConcurrencyHandler struct {
2013
sem chan struct{}
@@ -30,7 +23,7 @@ type ConcurrencyMetrics struct {
3023
TotalRequests int64 // Total number of requests made
3124
TotalRetries int64 // Total number of retry attempts
3225
TotalRateLimitErrors int64 // Total number of rate limit errors encountered
33-
TokenWaitTime time.Duration // Total time spent waiting for tokens
26+
PermitWaitTime time.Duration // Total time spent waiting for permits
3427
TTFB struct { // Metrics related to Time to First Byte (TTFB)
3528
Total time.Duration // Total Time to First Byte (TTFB) for all requests
3629
Count int64 // Count of requests used for calculating TTFB

concurrency/metrics.go

+90-42
Original file line numberDiff line numberDiff line change
@@ -10,62 +10,110 @@ import (
1010
"go.uber.org/zap"
1111
)
1212

13-
// EvaluateAndAdjustConcurrency evaluates the HTTP response from a server along with the request's response time
14-
// and adjusts the concurrency level of the system accordingly. It utilizes three monitoring functions:
15-
// MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which
16-
// provides feedback on different aspects of the response and system's current state. The function aggregates
17-
// feedback from these monitoring functions to make a decision on whether to scale up or scale down the concurrency.
18-
// The decision is based on a simple majority of suggestions: if more functions suggest scaling down (return -1),
19-
// it scales down; if more suggest scaling up (return 1), it scales up. This method centralizes concurrency control
20-
// decision-making, providing a systematic approach to managing request handling capacity based on real-time
21-
// operational metrics.
13+
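// metricWeights holds the multiplier applied to each monitor's feedback value when EvaluateAndAdjustConcurrency computes its cumulative score.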
14+
var metricWeights = map[string]float64{
15+
"RateLimit": 5.0, // High importance
16+
"ServerError": 3.0, // High importance
17+
"ResponseTime": 1.0, // Lower importance
18+
}
19+
20+
// EvaluateAndAdjustConcurrency assesses the current state of system metrics and decides whether to scale
21+
// up or down the number of concurrent operations allowed. It employs a combination of strategies:
22+
// a weighted scoring system, threshold-based direct actions, and cumulative impact assessment.
23+
//
24+
// A weighted scoring system is used to prioritize the importance of different system metrics. Each metric
25+
// can influence the scaling decision based on its assigned weight, reflecting its relative impact on system performance.
26+
//
27+
// Threshold-based scaling provides a fast-track decision path for critical metrics that have exceeded predefined limits.
28+
// If a critical metric, such as the rate limit remaining slots or server error rates, crosses a specified threshold,
29+
// immediate action is taken to scale down the concurrency to prevent system overload.
30+
//
31+
// Cumulative impact assessment calculates a cumulative score from all monitored metrics, taking into account
32+
// their respective weights. This score determines the overall tendency of the system to either scale up or down.
33+
// If the score indicates a negative trend (i.e., below zero), the system will scale down to reduce load.
34+
// Conversely, a positive score suggests that there is capacity to handle more concurrent operations, leading
35+
// to a scale-up decision.
2236
//
2337
// Parameters:
38+
// - resp: The HTTP response received from the server, providing status codes and headers for rate limiting.
39+
// - responseTime: The time duration between sending the request and receiving the response, indicating the server's responsiveness.
2440
//
25-
// resp - The HTTP response received from the server.
26-
// responseTime - The time duration between sending the request and receiving the response.
41+
// The function logs the decision process at each step, providing traceability and insight into the scaling mechanism.
42+
// The method should be called after each significant interaction with the external system (e.g., an HTTP request) to
43+
// ensure concurrency levels are adapted to current conditions.
2744
//
28-
// It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
45+
// Returns: None. The function directly calls the ScaleUp or ScaleDown methods as needed.
46+
//
47+
// Note: This function does not return any value; it performs actions based on internal assessments and logs outcomes.
2948
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
30-
// Call monitoring functions
3149
rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
3250
responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
3351
responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)
3452

35-
// Log the feedback from each monitoring function for debugging
36-
ch.logger.Debug("Concurrency Adjustment Feedback",
37-
zap.Int("RateLimitFeedback", rateLimitFeedback),
38-
zap.Int("ResponseCodeFeedback", responseCodeFeedback),
39-
zap.Int("ResponseTimeFeedback", responseTimeFeedback))
40-
41-
// Determine overall action based on feedback
42-
suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
43-
scaleDownCount := 0
44-
scaleUpCount := 0
45-
46-
for _, suggestion := range suggestions {
47-
switch suggestion {
48-
case -1:
49-
scaleDownCount++
50-
case 1:
51-
scaleUpCount++
52-
}
53+
// Use weighted scores for each metric.
54+
weightedRateLimitScore := float64(rateLimitFeedback) * metricWeights["RateLimit"]
55+
weightedResponseCodeScore := float64(responseCodeFeedback) * metricWeights["ServerError"]
56+
weightedResponseTimeScore := float64(responseTimeFeedback) * metricWeights["ResponseTime"]
57+
58+
// Calculate the cumulative score.
59+
cumulativeScore := weightedRateLimitScore + weightedResponseCodeScore + weightedResponseTimeScore
60+
61+
// Log the feedback from each monitoring function for debugging.
62+
ch.logger.Debug("Evaluate and Adjust Concurrency",
63+
zap.String("event", "EvaluateConcurrency"),
64+
zap.Float64("weightedRateLimitScore", weightedRateLimitScore),
65+
zap.Float64("weightedResponseCodeScore", weightedResponseCodeScore),
66+
zap.Float64("weightedResponseTimeScore", weightedResponseTimeScore),
67+
zap.Float64("cumulativeScore", cumulativeScore),
68+
zap.Int("rateLimitFeedback", rateLimitFeedback),
69+
zap.Int("responseCodeFeedback", responseCodeFeedback),
70+
zap.Int("responseTimeFeedback", responseTimeFeedback),
71+
zap.Duration("responseTime", responseTime),
72+
)
73+
74+
// Check critical thresholds
75+
if rateLimitFeedback <= RateLimitCriticalThreshold || weightedResponseCodeScore >= ErrorResponseThreshold {
76+
ch.logger.Warn("Scaling down due to critical threshold breach",
77+
zap.String("event", "CriticalThresholdBreach"),
78+
zap.Int("rateLimitFeedback", rateLimitFeedback),
79+
zap.Float64("errorResponseRate", weightedResponseCodeScore),
80+
)
81+
ch.ScaleDown()
82+
return
5383
}
5484

55-
// Log the counts for scale down and up suggestions
56-
ch.logger.Info("Scaling Decision Counts",
57-
zap.Int("ScaleDownCount", scaleDownCount),
58-
zap.Int("ScaleUpCount", scaleUpCount))
59-
60-
// Decide on scaling action
61-
if scaleDownCount > scaleUpCount {
62-
ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
85+
// Evaluate cumulative impact and make a scaling decision.
86+
if cumulativeScore < 0 {
87+
utilizedBefore := len(ch.sem) // Tokens in use before scaling down.
6388
ch.ScaleDown()
64-
} else if scaleUpCount > scaleDownCount {
65-
ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
89+
utilizedAfter := len(ch.sem) // Tokens in use after scaling down.
90+
ch.logger.Info("Concurrency scaling decision: scale down.",
91+
zap.Float64("cumulativeScore", cumulativeScore),
92+
zap.Int("utilizedTokensBefore", utilizedBefore),
93+
zap.Int("utilizedTokensAfter", utilizedAfter),
94+
zap.Int("availableTokensBefore", cap(ch.sem)-utilizedBefore),
95+
zap.Int("availableTokensAfter", cap(ch.sem)-utilizedAfter),
96+
zap.String("reason", "Cumulative impact of metrics suggested an overload."),
97+
)
98+
} else if cumulativeScore > 0 {
99+
utilizedBefore := len(ch.sem) // Tokens in use before scaling up.
66100
ch.ScaleUp()
101+
utilizedAfter := len(ch.sem) // Tokens in use after scaling up.
102+
ch.logger.Info("Concurrency scaling decision: scale up.",
103+
zap.Float64("cumulativeScore", cumulativeScore),
104+
zap.Int("utilizedTokensBefore", utilizedBefore),
105+
zap.Int("utilizedTokensAfter", utilizedAfter),
106+
zap.Int("availableTokensBefore", cap(ch.sem)-utilizedBefore),
107+
zap.Int("availableTokensAfter", cap(ch.sem)-utilizedAfter),
108+
zap.String("reason", "Metrics indicate available resources to handle more load."),
109+
)
67110
} else {
68-
ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
111+
ch.logger.Info("Concurrency scaling decision: no change.",
112+
zap.Float64("cumulativeScore", cumulativeScore),
113+
zap.Int("currentUtilizedTokens", len(ch.sem)),
114+
zap.Int("currentAvailableTokens", cap(ch.sem)-len(ch.sem)),
115+
zap.String("reason", "Metrics are stable, maintaining current concurrency level."),
116+
)
69117
}
70118
}
71119

0 commit comments

Comments
 (0)