diff --git a/src/notify/amqp/ampq.service.ts b/src/notify/amqp/ampq.service.ts index 93af3ae..1da7db2 100644 --- a/src/notify/amqp/ampq.service.ts +++ b/src/notify/amqp/ampq.service.ts @@ -87,7 +87,12 @@ export class AmqpService implements NotifyProvider, OnModuleDestroy { const { url: _, exchange, routingKey, responseTimeout, reply, ...options } = settings; const channel = await this.getChannel(args.service); - const message = args.message ?? createMessage(process, args.service); + const message = + args.message ?? { + process: process.id, + ...createMessage(process, args.service), + etag: etag(process), + }; options.appId ??= DEFAULT_APPID; options.messageId = etag(process); diff --git a/src/notify/notify.service.spec.ts b/src/notify/notify.service.spec.ts index 358e3a6..bb1d2ba 100644 --- a/src/notify/notify.service.spec.ts +++ b/src/notify/notify.service.spec.ts @@ -59,4 +59,27 @@ describe('NotifyService', () => { it('should be defined', () => { expect(service).toBeDefined(); }); + + describe('onRetry', () => { + const process = { current: { notify: [{ service: 'a' }, { service: 'b' }] } } as any; + + it('should notify all services if no list provided', async () => { + const notifySpy = jest.spyOn(service as any, 'notify').mockResolvedValue(undefined); + + await service.onRetry({ process }); + + expect(notifySpy).toHaveBeenCalledTimes(2); + expect(notifySpy).toHaveBeenCalledWith(process, { service: 'a' }); + expect(notifySpy).toHaveBeenCalledWith(process, { service: 'b' }); + }); + + it('should notify only listed services', async () => { + const notifySpy = jest.spyOn(service as any, 'notify').mockResolvedValue(undefined); + + await service.onRetry({ process, services: ['b'] }); + + expect(notifySpy).toHaveBeenCalledTimes(1); + expect(notifySpy).toHaveBeenCalledWith(process, { service: 'b' }); + }); + }); }); diff --git a/src/notify/notify.service.ts b/src/notify/notify.service.ts index f10cc0e..0046045 100644 --- a/src/notify/notify.service.ts +++ b/src/notify/notify.service.ts @@ -29,7 +29,7 @@ export class NotifyService implements NotifyProvider { @OnEvent('process.retry') async onRetry({ process, services }: { process: Process; services?: string[] }) { for (const args of process.current.notify) { - if (!services && !services.includes(args.service)) continue; + if (services && !services.includes(args.service)) continue; await this.notify(process, args); } } diff --git a/src/notify/webhook/webhook.service.ts b/src/notify/webhook/webhook.service.ts index a9f209d..17c67ea 100644 --- a/src/notify/webhook/webhook.service.ts +++ b/src/notify/webhook/webhook.service.ts @@ -1,6 +1,6 @@ import { Inject, Injectable } from '@nestjs/common'; import { ConfigService } from '@/common/config/config.service'; -import { createMessage, Notify, Process } from '@letsflow/core/process'; +import { createMessage, etag, Notify, Process } from '@letsflow/core/process'; import { NotifyProvider } from '@/notify/notify-provider.interface'; interface WebhookSettings extends RequestInit { @@ -27,7 +27,12 @@ export class WebhookService implements NotifyProvider { throw new Error(`Service '${args.service}' is missing url setting`); } - const message = args.message ?? createMessage(process, args.service); + const message = + args.message ?? { + process: process.id, + ...createMessage(process, args.service), + etag: etag(process), + }; settings.headers ??= {}; settings.headers['Content-Type'] ??= typeof message === 'string' ? 'text/plain' : 'application/json'; diff --git a/src/notify/zeromq/zeromq.service.ts b/src/notify/zeromq/zeromq.service.ts index 3905503..1690829 100644 --- a/src/notify/zeromq/zeromq.service.ts +++ b/src/notify/zeromq/zeromq.service.ts @@ -1,5 +1,5 @@ import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common'; -import { createMessage, Notify, Process } from '@letsflow/core/process'; +import { createMessage, etag, Notify, Process } from '@letsflow/core/process'; import { Push, Reply, SocketOptions } from 'zeromq'; import { ConfigService } from '@/common/config/config.service'; import { NotifyProvider } from '../notify-provider.interface'; @@ -41,7 +41,12 @@ export class ZeromqService implements NotifyProvider, OnModuleDestroy { async notify(process: Process, args: Notify): Promise { const socket = this.getSocket(args.service); - const message = args.message ?? createMessage(process, args.service); + const message = + args.message ?? { + process: process.id, + ...createMessage(process, args.service), + etag: etag(process), + }; await socket.send(typeof message === 'string' ? message : JSON.stringify(message)); diff --git a/src/process/process.service.spec.ts b/src/process/process.service.spec.ts index c410853..e0e689a 100644 --- a/src/process/process.service.spec.ts +++ b/src/process/process.service.spec.ts @@ -231,7 +231,6 @@ describe('ProcessService', () => { expect(process.current.key).toEqual('initial'); expect(process.current.actions).toEqual([ { - $schema: 'https://schemas.letsflow.io/v1.0/action', actor: ['actor'], description: '', title: 'complete', @@ -307,7 +306,6 @@ describe('ProcessService', () => { key: 'initial', actions: { complete: { - $schema: 'https://schemas.letsflow.io/v1.0/action', actor: ['actor'], description: '', title: 'complete', @@ -355,7 +353,6 @@ describe('ProcessService', () => { key: 'initial', actions: { complete: { - $schema: 'https://schemas.letsflow.io/v1.0/action', actor: ['actor'], description: '', title: 'complete',