diff --git a/package.json b/package.json index 66b4100..f1221c0 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "kubernetes-client": "^6.6.1", "lodash": "^4.17.11", "morgan": "~1.9.0", + "promise-queue": "^2.2.5", "request": "^2.88.0", "request-promise": "^4.2.2", "winston": "^3.1.0" diff --git a/src/api/codefresh.api.js b/src/api/codefresh.api.js index 440c599..9a42020 100644 --- a/src/api/codefresh.api.js +++ b/src/api/codefresh.api.js @@ -7,15 +7,17 @@ const logger = require('../logger'); const config = require('../config'); const MetadataFilter = require('../filters/MetadataFilter'); const statistics = require('../statistics'); +const queue = require('promise-queue'); let metadataFilter; let counter; +let messageCounter = 0; let eventsPackage = []; class CodefreshAPI { - constructor(kubernetes) { + constructor(kubernetes, opts) { this.kubernetes = kubernetes; this.initEvents = this.initEvents.bind(this); @@ -28,7 +30,7 @@ class CodefreshAPI { this.getMetadata = this.getMetadata.bind(this); this._request = this._request.bind(this); this._getIdentifyOptions = this._getIdentifyOptions.bind(this); - + this.queue = new queue(Number.parseInt(_.get(opts, 'concurrent', '1'))); setInterval(this._sendPackage, 120 * 1000); } @@ -69,12 +71,26 @@ class CodefreshAPI { */ async sendEvents(payload) { - let data = _.cloneDeep(payload); - if (data.kind === 'Status') { - logger.debug(`Status: ${data.status}. Message: ${data.message}.`); + + if (payload.kind === 'Status') { + logger.debug(`Status: ${payload.status}. Message: ${payload.message}.`); return; } + console.log(`PUSHED : ${messageCounter}`); + messageCounter++; + await this.queue.add(this.processEvent(payload, messageCounter)); + + + + } + + async processEvent(payload, id) { + + try { + let data = _.cloneDeep(payload); + console.log(`PROCESSED : ${id}`); + let filteredMetadata = metadataFilter ? metadataFilter.buildResponse(payload.object, payload.object.kind) : payload.object; @@ -134,6 +150,10 @@ class CodefreshAPI { else { logger.info(`Skip packages sending - size ${eventsPackage.length}`); } + } catch (error) { + logger.error(error); + } + } async buildReleaseMetadata(payload) { diff --git a/src/app.js b/src/app.js index a052808..4c870dd 100644 --- a/src/app.js +++ b/src/app.js @@ -32,9 +32,11 @@ async function init() { logger.error(`Can't parse binded accounts. Only main account will be updating. Reason: ${error}`); newRelicMonitor.noticeError(error); } - + const opts = { + concurrent: config.k8sConcurrentCalls + }; const client = await clientFactory(); - const monitor = new Monitor(kubernetes); + const monitor = new Monitor(kubernetes, opts); const metadata = await monitor.getMetadata(); // Get instances for each resource and init cache for them diff --git a/src/config.js b/src/config.js index c9dbdce..f73d17b 100644 --- a/src/config.js +++ b/src/config.js @@ -25,6 +25,7 @@ module.exports = { statisticsInterval: 60 * 1000 * 60, // 60 min stateInterval: 60 * 1000, // 1 min + k8sConcurrentCalls: process.env.K8S_CONCURRENT_CALLS, newrelic: { license_key: process.env.NEWRELIC_LICENSE_KEY }, diff --git a/yarn.lock b/yarn.lock index 85497f4..a490c1b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5938,6 +5938,11 @@ promise-inflight@^1.0.1, promise-inflight@~1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/promise-inflight/-/promise-inflight-1.0.1.tgz#98472870bf228132fcbdd868129bad12c3c029e3" +promise-queue@^2.2.5: + version "2.2.5" + resolved "https://registry.yarnpkg.com/promise-queue/-/promise-queue-2.2.5.tgz#2f6f5f7c0f6d08109e967659c79b88a9ed5e93b4" + integrity sha1-L29ffA9tCBCelnZZx5uIqe1ek7Q= + promise-retry@^1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/promise-retry/-/promise-retry-1.1.1.tgz#6739e968e3051da20ce6497fb2b50f6911df3d6d"