-
Notifications
You must be signed in to change notification settings - Fork 0
/
RateLimiter.js
145 lines (130 loc) · 4.18 KB
/
RateLimiter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
const EventEmitter = require('events');
const ONE_SECOND = 1000;
const RECENT_JOB_CRITERIA = (now, endDate) => now - endDate <= ONE_SECOND
const MAKE_RECENT_JOB_CHECKER = () => {
const now = new Date();
return RECENT_JOB_CRITERIA.bind(null, now);
};
/**
* Counts jobs per second to provide stats and commit to rating limit.
* @fires stats
* @fires drain
*/
class RateLimiter extends EventEmitter {
/**
* Instantiate a RateLimiter.
* @param {Object} opts
* @param {number} opts.rateLimit - Maximum number of jobs to be run per second. If `null`, jobs will be run sequentially.
* @param {number} opts.statsInterval - interval for emitting `stats`, in ms
* @param {SimpleNodeLogger} opts.log - instance of simple-node-logger (optional)
*/
constructor(opts) {
super(opts);
this.opts = Object.assign({}, opts);
this.log = this.opts.log;
this.running = 0; // number of jobs that are currently running
this.recentJobs = []; // end dates of jobs ended <=1 second ago
this.processedJobs = 0; // number of jobs processed since last call to start()
this.statsInterval = null; // will hold the interval that emits `stats` events
this.timeOfLastStart = null; // will hold the time of last call to start()
}
/**
* @returns array of end dates of jobs ended <=1 second ago.
* @private
*/
_cleanRecentJobs() {
return this.recentJobs.filter(MAKE_RECENT_JOB_CHECKER());
}
/**
* adds the date of last ended job in this.recentJobs, after cleaning.
* @private
*/
_appendEndedJob() {
this.recentJobs = this._cleanRecentJobs().concat([ new Date() ]);
}
/**
* emit a `stats` event
* @private
*/
_stats() {
this.log.trace('RateLimiter ⚡️ stats');
/**
* Fired every `opts.statsInterval` milliseconds, to tell how many jobs are processed per second.
* @event stats
* @memberof RateLimiter
* @type {object}
* @property {number} nbJobsPerSecond - number of jobs that are processed per second
*/
this.emit('stats', {
nbJobsPerSecond: ONE_SECOND * this.processedJobs / (new Date() - this.timeOfLastStart)
});
}
/**
* Toggles the interval that emits `stats` events.
* @private
* @param {boolean} enable - true will (re)start the interval, false will stop it.
*/
toggle(enable) {
this.log.trace('RateLimiter:toggle ', enable || 'false');
if (!!enable === !!this.statsInterval) return;
if (this.statsInterval) {
clearInterval(this.statsInterval)
this.statsInterval = null;
}
if (enable) {
this.timeOfLastStart = new Date();
this.processedJobs = 0;
this.statsInterval = setInterval(this._stats.bind(this), this.opts.statsInterval);
}
}
/**
* Informs the RateLimiter that a job has just started
*/
jobStarted() {
++this.running;
this.log.trace('RateLimiter:jobStarted => running: ', this.running || '0');
++this.processedJobs;
}
/**
* Informs the RateLimiter that a job has just ended
*/
jobEnded() {
--this.running;
this._appendEndedJob(); // mutates this.recentJobs
this.log.trace('RateLimiter:jobEnded => running: ', this.running || '0');
if (this.running === 0) {
this.log.trace('RateLimiter ⚡️ drain');
/**
* Fired every when RateLimiter is not running any jobs.
* @event drain
* @memberof RateLimiter
*/
this.emit('drain');
}
}
/**
* determines whether or not it's possible to start another job now, according to rate limits.
* @returns true if it's possible to start another job now
*/
canRunMore() {
if (this.opts.rateLimit === null) {
return this.running === 0; // run jobs sequentially, without applying rate limit
} else {
const nbJobsEndedDuringLastSecond = this._cleanRecentJobs().length;
return this.running + nbJobsEndedDuringLastSecond < this.opts.rateLimit;
}
}
/**
* @returns a promise that resolves when all jobs ended running.
*/
waitForDrain() {
return new Promise((resolve, reject) => {
if (this.running === 0) {
resolve();
} else {
this.once('drain', resolve);
}
});
}
}
module.exports = RateLimiter;