Skip to content

Commit 525bff1

Browse files
authored
Add timeout to jetstream (#494)
1 parent 1185b05 commit 525bff1

File tree

5 files changed

+24
-9
lines changed

5 files changed

+24
-9
lines changed

boilerplate/ts/infra/nest-jetstream/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@bitloops/bl-boilerplate-infra-nest-jetstream",
3-
"version": "0.0.6",
3+
"version": "0.0.7",
44
"description": "TypeScript nats jetstream code for Bitloops Language generated projects using NestJS",
55
"engines": {
66
"node": ">= 13"

boilerplate/ts/infra/nest-jetstream/src/buses/nats-pubsub-command-bus.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Inject, Injectable, Logger } from '@nestjs/common';
22
import { NatsConnection, JSONCodec, headers, Msg, MsgHdrs } from 'nats';
33
import { Application, Infra } from '@bitloops/bl-boilerplate-core';
4-
import { ASYNC_LOCAL_STORAGE, ProvidersConstants } from '../jetstream.constants';
4+
import { ASYNC_LOCAL_STORAGE, TIMEOUT_MILLIS, ProvidersConstants } from '../jetstream.constants';
55
import { ContextPropagation } from './utils/context-propagation';
66

77
const jsonCodec = JSONCodec();
@@ -16,6 +16,8 @@ export class NatsPubSubCommandBus implements Infra.CommandBus.IPubSubCommandBus
1616
@Inject(ProvidersConstants.JETSTREAM_PROVIDER) private readonly nats: any,
1717
@Inject(ASYNC_LOCAL_STORAGE)
1818
private readonly asyncLocalStorage: any,
19+
@Inject(TIMEOUT_MILLIS)
20+
private readonly timeoutMillis: number,
1921
) {
2022
this.nc = this.nats.getConnection();
2123
}
@@ -42,8 +44,8 @@ export class NatsPubSubCommandBus implements Infra.CommandBus.IPubSubCommandBus
4244
async request(command: Application.Command): Promise<any> {
4345
const topic = NatsPubSubCommandBus.getTopicFromCommandInstance(command);
4446

45-
this.logger.log('Requesting in haha :' + topic);
46-
console.log('this.asyncLocalStorage.getStore()', this.asyncLocalStorage.getStore());
47+
this.logger.log('Requesting in topic:' + topic);
48+
// console.log('this.asyncLocalStorage.getStore()', this.asyncLocalStorage.getStore());
4749
command.correlationId = this.getCorelationId();
4850
command.context = this.getContext();
4951
// command
@@ -52,7 +54,7 @@ export class NatsPubSubCommandBus implements Infra.CommandBus.IPubSubCommandBus
5254
try {
5355
const response = await this.nc.request(topic, jsonCodec.encode(command), {
5456
headers,
55-
timeout: 10000,
57+
timeout: this.timeoutMillis,
5658
});
5759
return jsonCodec.decode(response.data);
5860
} catch (error: any) {

boilerplate/ts/infra/nest-jetstream/src/jetstream-core.module.ts

+14-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ import { JetstreamModuleFeatureConfig } from './interfaces/module-feature-input.
99
import { BUSES_TOKENS } from './buses/constants';
1010
import { NatsStreamingIntegrationEventBus, NatsStreamingCommandBus } from './buses';
1111
import { SubscriptionsService } from './jetstream.subscriptions.service';
12-
import { ASYNC_LOCAL_STORAGE, HANDLERS_TOKENS, ProvidersConstants } from './jetstream.constants';
12+
import {
13+
ASYNC_LOCAL_STORAGE,
14+
HANDLERS_TOKENS,
15+
ProvidersConstants,
16+
TIMEOUT_MILLIS,
17+
} from './jetstream.constants';
1318
import { NatsPubSubIntegrationEventsBus } from './buses/nats-pubsub-integration-events-bus';
1419

1520
const pubSubCommandBus = {
@@ -48,7 +53,7 @@ const pubSubIntegrationEventBus = {
4853
@Global()
4954
@Module({})
5055
export class JetstreamCoreModule {
51-
static forRoot(connectionOptions: ConnectionOptions): DynamicModule {
56+
static forRoot(connectionOptions: ConnectionOptions, timeoutMillis?: number): DynamicModule {
5257
const jetstreamProviders = {
5358
provide: ProvidersConstants.JETSTREAM_PROVIDER,
5459
useFactory: (): any => {
@@ -61,6 +66,11 @@ export class JetstreamCoreModule {
6166
useValue: asyncLocalStorage,
6267
};
6368

69+
const timeoutMillisProvider = {
70+
provide: TIMEOUT_MILLIS,
71+
useValue: timeoutMillis ?? 10000,
72+
};
73+
6474
return {
6575
module: JetstreamCoreModule,
6676
providers: [
@@ -73,6 +83,7 @@ export class JetstreamCoreModule {
7383
streamingMessageBus,
7484
pubSubIntegrationEventBus,
7585
asyncLocalStorageProvider,
86+
timeoutMillisProvider,
7687
],
7788
exports: [
7889
jetstreamProviders,
@@ -84,6 +95,7 @@ export class JetstreamCoreModule {
8495
streamingMessageBus,
8596
pubSubIntegrationEventBus,
8697
asyncLocalStorageProvider,
98+
timeoutMillisProvider,
8799
],
88100
};
89101
}

boilerplate/ts/infra/nest-jetstream/src/jetstream.constants.ts

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export const HANDLERS_TOKENS = {
1111
};
1212

1313
export const ASYNC_LOCAL_STORAGE = Symbol('ASYNC_LOCAL_STORAGE');
14+
export const TIMEOUT_MILLIS = Symbol('TIMEOUT_MILLIS');
1415

1516
export const METADATA_HEADERS = {
1617
CORRELATION_ID: 'correlationId',

boilerplate/ts/infra/nest-jetstream/src/jetstream.module.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import { JetstreamCoreModule } from './jetstream-core.module';
66

77
@Module({})
88
export class JetstreamModule {
9-
static forRoot(connectionOptions: ConnectionOptions): DynamicModule {
9+
static forRoot(connectionOptions: ConnectionOptions, timeoutMillis?: number): DynamicModule {
1010
return {
1111
module: JetstreamModule,
12-
imports: [JetstreamCoreModule.forRoot(connectionOptions)],
12+
imports: [JetstreamCoreModule.forRoot(connectionOptions, timeoutMillis)],
1313
};
1414
}
1515

0 commit comments

Comments
 (0)