-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathratelimiter.go
195 lines (183 loc) · 5.29 KB
/
ratelimiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package irdmtools
import (
"fmt"
"io"
"net/http"
"os"
"strconv"
"time"
)
// RateLimit holds the values used to play nice with OAI-PMH or REST API.
// It normally is extracted from the response header (see FromResponse and
// FromHeader).
type RateLimit struct {
	// Limit maps to X-RateLimit-Limit. It is set to 0 when the header is
	// present but cannot be parsed as an integer.
	Limit int `json:"limit,omitempty"`
	// OldLimit holds the last value of rate limit before change. Throttle
	// compares it with Limit to notice that the limit changed. It is never
	// serialized to JSON.
	OldLimit int `json:"-"`
	// Remaining maps to X-RateLimit-Remaining.
	Remaining int `json:"remaining,omitempty"`
	// Reset maps to X-RateLimit-Reset. The methods of this type interpret
	// it as a Unix timestamp in seconds.
	Reset int `json:"reset,omitempty"`
}
// FromResponse takes an http.Response struct and extracts
// the header values related to rate limits (e.g. X-RateLimit-Limit).
//
// It is a thin wrapper around FromHeader: the response's Header is handed
// to FromHeader unchanged, so both entry points always parse the headers
// the same way. A nil response carries no rate limit information, so the
// receiver is left untouched rather than panicking.
//
// ```
// rl := new(RateLimit)
// rl.FromResponse(response)
// ```
func (rl *RateLimit) FromResponse(resp *http.Response) {
	if resp == nil {
		return
	}
	rl.FromHeader(resp.Header)
}
// FromHeader reads the X-RateLimit-Limit, X-RateLimit-Remaining and
// X-RateLimit-Reset entries of an http.Header (e.g. http.Response.Header)
// and stores them in the rate limit struct.
//
// Only the first value of each header is considered. A header that is absent
// leaves the matching field unchanged; a header that is present but is not a
// valid integer sets the field to 0. When a valid limit differs from
// OldLimit, the limit held before this call is saved in OldLimit.
//
// ```
// rl := new(RateLimit)
// rl.FromHeader(header)
// ```
func (rl *RateLimit) FromHeader(header http.Header) {
	// first returns the first value recorded for name and whether the
	// header was present at all.
	first := func(name string) (string, bool) {
		values := header.Values(name)
		if len(values) == 0 {
			return "", false
		}
		return values[0], true
	}

	if raw, found := first("X-RateLimit-Limit"); found {
		limit, err := strconv.Atoi(raw)
		switch {
		case err != nil:
			// Unparsable limit: forget the limit, keep OldLimit as is.
			rl.Limit = 0
		default:
			if limit != rl.OldLimit {
				rl.OldLimit = rl.Limit
			}
			rl.Limit = limit
		}
	}

	if raw, found := first("X-RateLimit-Remaining"); found {
		remaining, err := strconv.Atoi(raw)
		if err != nil {
			remaining = 0
		}
		rl.Remaining = remaining
	}

	if raw, found := first("X-RateLimit-Reset"); found {
		reset, err := strconv.Atoi(raw)
		if err != nil {
			reset = 0
		}
		rl.Reset = reset
	}
}
// ResetString describes when the rate limit resets: the time left until the
// reset (truncated to whole seconds) and the clock time of the reset, with
// Reset read as a Unix timestamp in seconds. It returns an empty string when
// Reset is zero or negative.
func (rl *RateLimit) ResetString() string {
	if rl.Reset <= 0 {
		return ""
	}
	at := time.Unix(int64(rl.Reset), 0)
	left := time.Until(at).Truncate(time.Second)
	return fmt.Sprintf("reset in %s at %s", left, at.Format("03:04PM"))
}
// String renders the rate limit as "limits <remaining>/<limit>, " followed
// by whatever ResetString returns (possibly nothing).
func (rl *RateLimit) String() string {
	reset := rl.ResetString()
	return fmt.Sprintf("limits %d/%d, %s", rl.Remaining, rl.Limit, reset)
}
// Fprintf writes the String form of the rate limit to out, terminated by a
// newline. Despite its name it takes no format string. Write errors are not
// reported.
func (rl *RateLimit) Fprintf(out io.Writer) {
	fmt.Fprintf(out, "%s\n", rl.String())
}
// TimeToWait returns how long to pause between requests (as a time.Duration)
// to avoid a http status code 429: unit, the length of the window the limit
// applies to (e.g. time.Minute or time.Hour), divided by Limit using integer
// division.
//
// When Limit is zero or negative no usable limit is known (Limit is 0 on a
// fresh RateLimit and after an unparsable header). In that case unit itself
// is returned, i.e. one request per window, instead of dividing by zero.
//
// ```
// rl := new(RateLimit)
// rl.FromHeader(response.Header)
// timeToWait := rl.TimeToWait(time.Minute)
// time.Sleep(timeToWait)
// ```
func (rl *RateLimit) TimeToWait(unit time.Duration) time.Duration {
	if rl.Limit <= 0 {
		return unit
	}
	return unit / time.Duration(rl.Limit)
}
// TimeToReset reads Reset as a Unix timestamp in seconds and returns the
// time left until that instant together with the instant itself. The
// duration is negative when the reset time is already in the past.
func (rl *RateLimit) TimeToReset() (time.Duration, time.Time) {
	at := time.Unix(int64(rl.Reset), 0)
	return time.Until(at), at
}
// Throttle looks at the rate limit structure and sleeps for an
// appropriate time based on the rate limits.
//
// i is the current iteration and tot the total number of records; i is only
// used in the progress message, tot also selects the per-request pause.
//
// The per-request pause ("speed bump") is time unit / request limit:
//   - tot == 1 or tot >= 5000: paced for 5000 requests per hour,
//   - Limit == 5000: one hour divided by Limit,
//   - any other positive Limit: one minute divided by Limit,
//   - unknown limit (Limit <= 0): one second.
//
// What happens next depends on the state of the limits:
//   - Limit differs from OldLimit: if a previous limit was known
//     (OldLimit > 0) Throttle waits for the reset time plus ten seconds of
//     padding; OldLimit is then set to Limit.
//   - a quarter or less of the allowed calls remain: Throttle waits for the
//     reset time plus padding.
//   - otherwise: Throttle sleeps for the per-request pause.
//
// Whenever the padded reset time is not in the future (Reset was never set
// or is stale), or no limit is known, Throttle sleeps for the per-request
// pause instead of "waiting" a negative amount of time, which would not
// throttle at all.
//
// Waits for a reset are announced on standard error.
//
// ```
//
//	i, tot := 0, 1000 // The i-th iteration and total number of records
//	rl := new(RateLimit)
//	// Set our rate limit from the response
//	rl.FromResponse(response)
//	rl.Throttle(i, tot)
//
// ```
func (rl *RateLimit) Throttle(i int, tot int) {
	const (
		// hourlyLimit is the requests-per-hour limit used by some RDM
		// APIs; others use 500 per minute.
		hourlyLimit = 5000
		// resetPadding is added to the time until reset so requests do
		// not resume right at the edge of the window.
		resetPadding = 10 * time.Second
		// lowRemaining is the fraction of remaining calls at or below
		// which Throttle stops and waits for the reset.
		lowRemaining = 0.25
	)

	// wait time = time unit / request limit
	var speedBump time.Duration
	switch {
	case tot == 1 || tot >= hourlyLimit:
		// NOTE: Picking the slower of the two rate limits, otherwise we
		// would stall for an hour at each 5000 records retrieved.
		speedBump = time.Hour / hourlyLimit
	case rl.Limit == hourlyLimit:
		// Slow down: the rate limit is 5000 per hour.
		speedBump = time.Hour / time.Duration(rl.Limit)
	case rl.Limit > 0:
		// The rate limit is per minute (e.g. 500 per minute).
		speedBump = time.Minute / time.Duration(rl.Limit)
	default:
		// Default rate limit of one per second.
		speedBump = time.Second
	}

	// waitForReset sleeps until the reset time plus padding, announcing the
	// wait on stderr. If that moment is not in the future the reset
	// information is unusable, so it falls back to the per-request pause.
	waitForReset := func(prefix string) {
		untilReset, resetAt := rl.TimeToReset()
		wait := untilReset + resetPadding
		if wait <= 0 {
			time.Sleep(speedBump)
			return
		}
		fmt.Fprintf(os.Stderr, "%swaiting %s for reset (%s) before continuing (%d/%d)\n", prefix, wait.Truncate(time.Second), resetAt.Format("3:04PM"), i, tot)
		time.Sleep(wait)
	}

	if rl.OldLimit != rl.Limit {
		// The limit changed; only wait when there was a previous limit,
		// i.e. this is not the first time a limit is seen.
		if rl.OldLimit > 0 {
			waitForReset("limits changed, ")
		}
		// Update the old limit
		rl.OldLimit = rl.Limit
		return
	}

	// With an unknown limit the remaining ratio is meaningless, so only
	// consult it when Limit is positive.
	if rl.Limit > 0 && float64(rl.Remaining)/float64(rl.Limit) <= lowRemaining {
		waitForReset("")
		return
	}
	time.Sleep(speedBump)
}