diff --git a/package-lock.json b/package-lock.json index 9538c99b..613d8383 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "@nmshd/typescript-ioc": "^3.2.5", "@nmshd/typescript-rest": "^3.2.2", "agentkeepalive": "4.6.0", + "amqp-connection-manager": "^5.0.0", "amqplib": "^0.10.9", "axios": "^1.13.2", "compression": "1.8.1", @@ -4395,11 +4396,28 @@ } } }, + "node_modules/amqp-connection-manager": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/amqp-connection-manager/-/amqp-connection-manager-5.0.0.tgz", + "integrity": "sha512-88yQzqa5RSBgnLl504XjvCQJ7d+osskdwvg35Lwm1LRbfLjNU9p7SQUMSP82BB7mseiq9tIUPJ3HE3eXQbpjEw==", + "license": "MIT", + "dependencies": { + "promise-breaker": "^6.0.0" + }, + "engines": { + "node": ">=10.0.0", + "npm": ">5.0.0" + }, + "peerDependencies": { + "amqplib": "*" + } + }, "node_modules/amqplib": { "version": "0.10.9", "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.9.tgz", "integrity": "sha512-jwSftI4QjS3mizvnSnOrPGYiUnm1vI2OP1iXeOUz5pb74Ua0nbf6nPyyTzuiCLEE3fMpaJORXh2K/TQ08H5xGA==", "license": "MIT", + "peer": true, "dependencies": { "buffer-more-ints": "~1.0.0", "url-parse": "~1.5.10" @@ -11732,6 +11750,12 @@ "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==", "license": "MIT" }, + "node_modules/promise-breaker": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/promise-breaker/-/promise-breaker-6.0.0.tgz", + "integrity": "sha512-BthzO9yTPswGf7etOBiHCVuugs2N01/Q/94dIPls48z2zCmrnDptUUZzfIb+41xq0MnYZ/BzmOd6ikDR4ibNZA==", + "license": "MIT" + }, "node_modules/proto3-json-serializer": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/proto3-json-serializer/-/proto3-json-serializer-3.0.2.tgz", diff --git a/package.json b/package.json index d383c703..de47d062 100644 --- a/package.json +++ b/package.json @@ -88,6 +88,7 @@ "@nmshd/typescript-ioc": "^3.2.5", "@nmshd/typescript-rest": "^3.2.2", "agentkeepalive": "4.6.0", + "amqp-connection-manager": "^5.0.0", "amqplib": "^0.10.9", "axios": "^1.13.2", "compression": "1.8.1", diff --git a/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts b/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts index dcf3f419..14ec8379 100644 --- a/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts +++ b/src/modules/messageBrokerPublisher/connectors/AMQPConnector.ts @@ -1,5 +1,5 @@ import { ILogger } from "@js-soft/logging-abstractions"; -import amqp from "amqplib"; +import { AmqpConnectionManager, ChannelWrapper, connect as amqpConnect } from "amqp-connection-manager"; import { MessageBrokerConnector } from "./MessageBrokerConnector"; export interface AMQPConnectorConfiguration { @@ -9,8 +9,8 @@ export interface AMQPConnectorConfiguration { } export class AMQPConnector extends MessageBrokerConnector { - private connection?: amqp.ChannelModel; - private channel?: amqp.Channel; + private connection?: AmqpConnectionManager; + private channel?: ChannelWrapper; public constructor(configuration: AMQPConnectorConfiguration, logger: ILogger) { super(configuration, logger); @@ -21,24 +21,24 @@ export class AMQPConnector extends MessageBrokerConnector { const url = this.configuration.url; - this.connection = await amqp.connect(url, { timeout: this.configuration.timeout ?? 2000 }).catch((e) => { - throw new Error(`Could not connect to RabbitMQ at '${url}' (${e.message})`); - }); + this.connection = amqpConnect(url, { connectionOptions: { timeout: this.configuration.timeout ?? 2000 } }); - this.channel = await this.connection.createChannel().catch((e) => { - throw new Error(`Could not create a channel for RabbitMQ (${e.message})`); + await this.connection.connect().catch((e) => { + throw new Error(`Could not connect to RabbitMQ at '${url}' (${e.message})`); }); const exchange = this.configuration.exchange ?? ""; + this.channel = this.connection.createChannel({ json: true }); + await this.channel.checkExchange(exchange).catch(() => { throw new Error(`The configured exchange '${exchange}' does not exist.`); }); } - public publish(namespace: string, data: Buffer): void { + public async publish(namespace: string, data: Buffer): Promise { const exchangeName = this.configuration.exchange ?? ""; - const sent = this.channel!.publish(exchangeName, namespace, data); + const sent = await this.channel!.publish(exchangeName, namespace, data); if (!sent) { this.logger.error(`Publishing event '${namespace}' to exchange '${exchangeName}' failed.`); }