From 68d1264bb34ec8c290e2620b2884b424edabb764 Mon Sep 17 00:00:00 2001 From: Oren Gurfinkel Date: Tue, 12 Nov 2019 17:38:19 +0200 Subject: [PATCH 1/2] throthle by queue --- package.json | 1 + src/api/codefresh.api.js | 20 +++++++++++++++----- src/app.js | 6 ++++-- src/config.js | 2 ++ yarn.lock | 19 +++++++++++++++++++ 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index e6487e0..3ed61b4 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ }, "dependencies": { "@codefresh-io/kube-integration": "1.6.0", + "better-queue": "^3.8.10", "bluebird": "^3.5.4", "body-parser": "^1.18.3", "cookie-parser": "~1.4.3", diff --git a/src/api/codefresh.api.js b/src/api/codefresh.api.js index 117269a..582189e 100644 --- a/src/api/codefresh.api.js +++ b/src/api/codefresh.api.js @@ -6,6 +6,7 @@ const logger = require('../logger'); const config = require('../config'); const MetadataFilter = require('../filters/MetadataFilter'); const statistics = require('../statistics'); +const queue = require('better-queue'); let metadataFilter; let counter; @@ -14,7 +15,7 @@ let eventsPackage = []; class CodefreshAPI { - constructor(kubernetes) { + constructor(kubernetes, opts) { this.kubernetes = kubernetes; this.initEvents = this.initEvents.bind(this); @@ -27,7 +28,10 @@ class CodefreshAPI { this.getMetadata = this.getMetadata.bind(this); this._request = this._request.bind(this); this._getIdentifyOptions = this._getIdentifyOptions.bind(this); - + this.queue = new queue({ + process: processEvent, + batchSize: _.get(opt, concurrent) + }) setInterval(this._sendPackage, 120 * 1000); } @@ -65,13 +69,19 @@ class CodefreshAPI { */ async sendEvents(payload) { - let data = _.cloneDeep(payload); - if (data.kind === 'Status') { - logger.debug(`Status: ${data.status}. Message: ${data.message}.`); + if (payload.kind === 'Status') { + logger.debug(`Status: ${payload.status}. Message: ${payload.message}.`); return; } + this.queue.push(payload); + } + + async processEvent(payload) { + + let data = _.cloneDeep(payload); + let filteredMetadata = metadataFilter ? metadataFilter.buildResponse(payload.object, payload.object.kind) : payload.object; // For release override configmap by release diff --git a/src/app.js b/src/app.js index 44e5d54..d886c1f 100644 --- a/src/app.js +++ b/src/app.js @@ -30,9 +30,11 @@ async function init() { accounts = null; logger.error(`Can't parse binded accounts. Only main account will be updating. Reason: ${error}`); } - + const opts = { + concurrent: config.k8sConcurrentCalls + }; const client = await clientFactory(); - const monitor = new Monitor(kubernetes); + const monitor = new Monitor(kubernetes, opts); const metadata = await monitor.getMetadata(); // Get instances for each resource and init cache for them diff --git a/src/config.js b/src/config.js index afb0ed8..77d0cbc 100644 --- a/src/config.js +++ b/src/config.js @@ -23,6 +23,8 @@ module.exports = { statisticsInterval: 60 * 1000 * 60, // 60 min stateInterval: 60 * 1000, // 1 min + k8sConcurrentCalls: process.env.K8S_CONCURRENT_CALLS, + port: 9020, logLevel: 'info', }; diff --git a/yarn.lock b/yarn.lock index 9270f6f..6f20a1b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1059,6 +1059,20 @@ bcrypt-pbkdf@^1.0.0: dependencies: tweetnacl "^0.14.3" +better-queue-memory@^1.0.1: + version "1.0.4" + resolved "https://registry.yarnpkg.com/better-queue-memory/-/better-queue-memory-1.0.4.tgz#f390d6b30bb3b36aaf2ce52b37a483e8a7a81a22" + integrity sha512-SWg5wFIShYffEmJpI6LgbL8/3Dqhku7xI1oEiy6FroP9DbcZlG0ZDjxvPdP9t7hTGW40IpIcC6zVoGT1oxjOuA== + +better-queue@^3.8.10: + version "3.8.10" + resolved "https://registry.yarnpkg.com/better-queue/-/better-queue-3.8.10.tgz#1c93b9ec4cb3d1b72eb91d0efcb84fc80e8c6835" + integrity sha512-e3gwNZgDCnNWl0An0Tz6sUjKDV9m6aB+K9Xg//vYeo8+KiH8pWhLFxkawcXhm6FpM//GfD9IQv/kmvWCAVVpKA== + dependencies: + better-queue-memory "^1.0.1" + node-eta "^0.9.0" + uuid "^3.0.0" + bin-links@^1.1.0, bin-links@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/bin-links/-/bin-links-1.1.2.tgz#fb74bd54bae6b7befc6c6221f25322ac830d9757" @@ -5620,6 +5634,11 @@ nice-try@^1.0.4: resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366" integrity sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ== +node-eta@^0.9.0: + version "0.9.0" + resolved "https://registry.yarnpkg.com/node-eta/-/node-eta-0.9.0.tgz#9fb0b099bcd2a021940e603c64254dc003d9a7a8" + integrity sha1-n7CwmbzSoCGUDmA8ZCVNwAPZp6g= + node-fetch-npm@^2.0.2: version "2.0.2" resolved "https://registry.yarnpkg.com/node-fetch-npm/-/node-fetch-npm-2.0.2.tgz#7258c9046182dca345b4208eda918daf33697ff7" From efe26526bce44244533a3daf5bba1de4359c9150 Mon Sep 17 00:00:00 2001 From: Oren Gurfinkel Date: Tue, 12 Nov 2019 21:24:53 +0200 Subject: [PATCH 2/2] move to promise queue --- package.json | 2 +- src/api/codefresh.api.js | 26 ++++++++++++++++++-------- yarn.lock | 24 +++++------------------- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/package.json b/package.json index cbaee33..f1221c0 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,6 @@ }, "dependencies": { "@codefresh-io/kube-integration": "1.6.0", - "better-queue": "^3.8.10", "bluebird": "^3.5.4", "body-parser": "^1.18.3", "cf-monitor": "^0.0.2", @@ -23,6 +22,7 @@ "kubernetes-client": "^6.6.1", "lodash": "^4.17.11", "morgan": "~1.9.0", + "promise-queue": "^2.2.5", "request": "^2.88.0", "request-promise": "^4.2.2", "winston": "^3.1.0" diff --git a/src/api/codefresh.api.js b/src/api/codefresh.api.js index da3181b..9a42020 100644 --- a/src/api/codefresh.api.js +++ b/src/api/codefresh.api.js @@ -7,10 +7,11 @@ const logger = require('../logger'); const config = require('../config'); const MetadataFilter = require('../filters/MetadataFilter'); const statistics = require('../statistics'); -const queue = require('better-queue'); +const queue = require('promise-queue'); let metadataFilter; let counter; +let messageCounter = 0; let eventsPackage = []; @@ -29,10 +30,7 @@ class CodefreshAPI { this.getMetadata = this.getMetadata.bind(this); this._request = this._request.bind(this); this._getIdentifyOptions = this._getIdentifyOptions.bind(this); - this.queue = new queue({ - process: processEvent, - batchSize: _.get(opt, concurrent) - }) + this.queue = new queue(Number.parseInt(_.get(opts, 'concurrent', '1'))); setInterval(this._sendPackage, 120 * 1000); } @@ -74,17 +72,25 @@ class CodefreshAPI { async sendEvents(payload) { + if (payload.kind === 'Status') { logger.debug(`Status: ${payload.status}. Message: ${payload.message}.`); return; } + console.log(`PUSHED : ${messageCounter}`); + messageCounter++; + await this.queue.add(this.processEvent(payload, messageCounter)); + + - this.queue.push(payload); } - async processEvent(payload) { + async processEvent(payload, id) { + + try { + let data = _.cloneDeep(payload); + console.log(`PROCESSED : ${id}`); - let data = _.cloneDeep(payload); let filteredMetadata = metadataFilter ? metadataFilter.buildResponse(payload.object, payload.object.kind) : payload.object; @@ -144,6 +150,10 @@ class CodefreshAPI { else { logger.info(`Skip packages sending - size ${eventsPackage.length}`); } + } catch (error) { + logger.error(error); + } + } async buildReleaseMetadata(payload) { diff --git a/yarn.lock b/yarn.lock index fb38ee1..a490c1b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -929,20 +929,6 @@ bcrypt-pbkdf@^1.0.0: dependencies: tweetnacl "^0.14.3" -better-queue-memory@^1.0.1: - version "1.0.4" - resolved "https://registry.yarnpkg.com/better-queue-memory/-/better-queue-memory-1.0.4.tgz#f390d6b30bb3b36aaf2ce52b37a483e8a7a81a22" - integrity sha512-SWg5wFIShYffEmJpI6LgbL8/3Dqhku7xI1oEiy6FroP9DbcZlG0ZDjxvPdP9t7hTGW40IpIcC6zVoGT1oxjOuA== - -better-queue@^3.8.10: - version "3.8.10" - resolved "https://registry.yarnpkg.com/better-queue/-/better-queue-3.8.10.tgz#1c93b9ec4cb3d1b72eb91d0efcb84fc80e8c6835" - integrity sha512-e3gwNZgDCnNWl0An0Tz6sUjKDV9m6aB+K9Xg//vYeo8+KiH8pWhLFxkawcXhm6FpM//GfD9IQv/kmvWCAVVpKA== - dependencies: - better-queue-memory "^1.0.1" - node-eta "^0.9.0" - uuid "^3.0.0" - bin-links@^1.1.0, bin-links@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/bin-links/-/bin-links-1.1.2.tgz#fb74bd54bae6b7befc6c6221f25322ac830d9757" @@ -4939,11 +4925,6 @@ nice-try@^1.0.4: version "1.0.5" resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366" -node-eta@^0.9.0: - version "0.9.0" - resolved "https://registry.yarnpkg.com/node-eta/-/node-eta-0.9.0.tgz#9fb0b099bcd2a021940e603c64254dc003d9a7a8" - integrity sha1-n7CwmbzSoCGUDmA8ZCVNwAPZp6g= - node-fetch-npm@^2.0.2: version "2.0.2" resolved "https://registry.yarnpkg.com/node-fetch-npm/-/node-fetch-npm-2.0.2.tgz#7258c9046182dca345b4208eda918daf33697ff7" @@ -5957,6 +5938,11 @@ promise-inflight@^1.0.1, promise-inflight@~1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/promise-inflight/-/promise-inflight-1.0.1.tgz#98472870bf228132fcbdd868129bad12c3c029e3" +promise-queue@^2.2.5: + version "2.2.5" + resolved "https://registry.yarnpkg.com/promise-queue/-/promise-queue-2.2.5.tgz#2f6f5f7c0f6d08109e967659c79b88a9ed5e93b4" + integrity sha1-L29ffA9tCBCelnZZx5uIqe1ek7Q= + promise-retry@^1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/promise-retry/-/promise-retry-1.1.1.tgz#6739e968e3051da20ce6497fb2b50f6911df3d6d"