diff --git a/server/dockerfile b/server/dockerfile index febd9a6..c775bd4 100644 --- a/server/dockerfile +++ b/server/dockerfile @@ -9,6 +9,7 @@ COPY package*.json ./ # Only copy package.json to avoid copying package-lock.json with local registry paths #COPY package.json ./ +ENV KAFKA_BROKER=${KAFKA_BROKER} ENV CONTROLLER_USER=${CONTROLLER_USER} ENV CONTROLLER_PASSWORD=${CONTROLLER_PASSWORD} ENV DEVICE_USER=${DEVICE_USER} diff --git a/server/index.js b/server/index.js index 348b1ba..b6b49eb 100644 --- a/server/index.js +++ b/server/index.js @@ -6,6 +6,7 @@ var http = require('http'); var oas3Tools = require('oas3-tools'); var appCommons = require('onf-core-model-ap/applicationPattern/commons/AppCommons'); +const kafkaClient = require("./service/individualServices/KafkaClient"); const logger = require('./service/LoggingService.js').getLogger(); var serverPort = 9092; @@ -35,6 +36,9 @@ http.createServer(app).listen(serverPort, function () { global.databasePath = './database/load.json' +// connect to Kafka +kafkaClient.connect(); + // 1-integrate-loadfile // 3-integrate-authorization diff --git a/server/package-lock.json b/server/package-lock.json index 79e0c29..f766729 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -13,6 +13,7 @@ "connect": "^3.2.0", "eventsource": "^2.0.2", "js-yaml": "^3.3.0", + "kafkajs": "^2.2.4", "oas3-tools": "^2.2.3", "onf-core-model-ap": "2.1.2", "onf-core-model-ap-bs": "2.1.2", @@ -3900,6 +3901,14 @@ "node": ">=6" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", @@ -8743,6 +8752,11 @@ "integrity": "sha512-XmOWe7eyHYH14cLdVPoyg+GOH3rYX++KpzrylJwSW98t3Nk+U8XOl8FWKOgwtzdb8lXGf6zYwDUzeHMWfxasyg==", "dev": true }, + "kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==" + }, "kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", diff --git a/server/package.json b/server/package.json index aa06e6c..84ed321 100644 --- a/server/package.json +++ b/server/package.json @@ -4,9 +4,9 @@ "description": "No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen)", "main": "index.js", "scripts": { - "prestart": "npm install", - "start": "node index.js", - "test": "jest" + "prestart": "npm install", + "start": "node index.js", + "test": "jest" }, "keywords": [ "swagger" @@ -14,10 +14,11 @@ "license": "Unlicense", "private": true, "dependencies": { - "connect": "^3.2.0", - "js-yaml": "^3.3.0", "axios": "^1.5.1", + "connect": "^3.2.0", "eventsource": "^2.0.2", + "js-yaml": "^3.3.0", + "kafkajs": "^2.2.4", "oas3-tools": "^2.2.3", "onf-core-model-ap": "2.1.2", "onf-core-model-ap-bs": "2.1.2", diff --git a/server/service/individualServices/KafkaClient.js b/server/service/individualServices/KafkaClient.js new file mode 100644 index 0000000..c13ed45 --- /dev/null +++ b/server/service/individualServices/KafkaClient.js @@ -0,0 +1,37 @@ +const { Kafka } = require("kafkajs"); +const process = require('process'); +const logger = require("../LoggingService.js").getLogger(); + +const clientId = "notification-proxy"; +const brokers = [process.env['KAFKA_BROKER'] || "localhost:9092"]; // Default to localhost if not set +let producer = null; + +exports.connect = async function () { + const kafka = new Kafka({ + clientId, + brokers, + }); + producer = kafka.producer(); + await producer.connect(); + logger.info(`Kafka producer connected to brokers: ${brokers.join(", ")}`); +}; + +exports.sendMessage = async function (topic, message) { + if (!producer) { + logger.error("Kafka producer is not connected. Call connect() first."); + return; + } + try { + await producer.send({ + topic: topic, + messages: [ + { + value: JSON.stringify(message), + }, + ], + }); + logger.info(`Message sent to topic ${topic}: ${JSON.stringify(message)}`); + } catch (error) { + logger.error(`Error sending message to topic ${topic}: ${error}`); + } +}; \ No newline at end of file diff --git a/server/service/individualServices/NotificationManagement.js b/server/service/individualServices/NotificationManagement.js index 53cfbf4..ece38cc 100644 --- a/server/service/individualServices/NotificationManagement.js +++ b/server/service/individualServices/NotificationManagement.js @@ -15,6 +15,7 @@ const logger = require('../LoggingService.js').getLogger(); const controllerManagement = require('./ControllerManagement'); const notificationManagement = require("./NotificationManagement"); const crypto = require("crypto"); +const kafkaClient = require("./KafkaClient"); const CONTROLLER_SUB_MODE_CONFIGURATION = "CONFIGURATION"; const CONTROLLER_SUB_MODE_OPERATIONAL = "OPERATIONAL"; @@ -724,20 +725,29 @@ function handleDeviceNotification(message, controllerName, controllerRelease, co * @param controllerTargetUrl */ async function notifyAllDeviceSubscribers(deviceNotificationType, controllerNotification, controllerName, controllerRelease, controllerTargetUrl) { - let activeSubscribers = await exports.getActiveSubscribers(deviceNotificationType); - - if (activeSubscribers.length > 0) { - logger.debug("starting notification of " + activeSubscribers.length + " subscribers for '" + deviceNotificationType + "', source-stream is " + controllerName + " -> " + controllerTargetUrl); - - //build one notification for all subscribers - let notificationMessage = notificationConverter.convertNotification(controllerNotification, deviceNotificationType, controllerName, controllerRelease); - - for (let subscriber of activeSubscribers) { - sendMessageToSubscriber(deviceNotificationType, subscriber.targetOperationURL, subscriber.operationKey, notificationMessage); - } - } else { - logger.debug("no subscribers for " + deviceNotificationType + ", message discarded"); - } + // Kafka + let notificationMessage = notificationConverter.convertNotification(controllerNotification, deviceNotificationType, controllerName, controllerRelease); + const topic = deviceNotificationType === configConstants.OAM_PATH_DEVICE_ALARMS ? 'device-alarms' : 'device-object-change'; + await kafkaClient.sendMessage(topic, { + type: deviceNotificationType, + payload: notificationMessage, + }); + + // Webhook + // let activeSubscribers = await exports.getActiveSubscribers(deviceNotificationType); + + // if (activeSubscribers.length > 0) { + // logger.debug("starting notification of " + activeSubscribers.length + " subscribers for '" + deviceNotificationType + "', source-stream is " + controllerName + " -> " + controllerTargetUrl); + + // //build one notification for all subscribers + // let notificationMessage = notificationConverter.convertNotification(controllerNotification, deviceNotificationType, controllerName, controllerRelease); + + // for (let subscriber of activeSubscribers) { + // sendMessageToSubscriber(deviceNotificationType, subscriber.targetOperationURL, subscriber.operationKey, notificationMessage); + // } + // } else { + // logger.debug("no subscribers for " + deviceNotificationType + ", message discarded"); + // } }