diff --git a/.env.example b/.env.example index 74850e275..2a9dbac9d 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,13 @@ REDIS_PASSWORD="" REDIS_PORT=6379 REDIS_DB=0 +#Redis common +REDIS_COMMON_URL="localhost" +REDIS_COMMON_PREFIX="development_env" +REDIS_COMMON_PASSWORD="" +REDIS_COMMON_PORT=6384 +REDIS_COMMON_DB=0 + #MongoDB MONGODB_URL="mongodb://localhost:27017" MONGODB_DATABASE="development" diff --git a/src/config/default.json b/src/config/default.json index 13ffe4d62..7ff302a84 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -655,7 +655,8 @@ "VOLUME_WEIGHT": 0.25, "TRADES_COUNT_WEIGHT": 0.25 }, - "AWS_QUERY_CACHE_WARMER_DELAY": 50 + "AWS_QUERY_CACHE_WARMER_DELAY": 50, + "TIMESCALEDB_INSERT_CHUNK_SIZE": 30 }, "dataApi": { "tableName": "XEXCHANGE_ANALYTICS" diff --git a/src/helpers/api.config.service.ts b/src/helpers/api.config.service.ts index 27b1184da..99879331b 100644 --- a/src/helpers/api.config.service.ts +++ b/src/helpers/api.config.service.ts @@ -124,6 +124,29 @@ export class ApiConfigService { return password !== '' ? password : undefined; } + getCommonRedisUrl(): string { + const redisUrl = this.configService.get('REDIS_COMMON_URL'); + if (!redisUrl) { + throw new Error('No common redis url present'); + } + return redisUrl; + } + + getCommonRedisPort(): number { + const redisPort = this.configService.get('REDIS_COMMON_PORT'); + if (!redisPort) { + throw new Error('No common redis port present'); + } + return redisPort; + } + + getCommonRedisPassword(): string | undefined { + const password = this.configService.get( + 'REDIS_COMMON_PASSWORD', + ); + return password !== '' ? password : undefined; + } + getApiUrl(): string { const apiUrl = this.configService.get('MX_API_URL'); if (!apiUrl) { diff --git a/src/modules/rabbitmq/rabbitmq.consumer.ts b/src/modules/rabbitmq/rabbitmq.consumer.ts index b84c44e7d..7fc04f62a 100644 --- a/src/modules/rabbitmq/rabbitmq.consumer.ts +++ b/src/modules/rabbitmq/rabbitmq.consumer.ts @@ -66,6 +66,10 @@ import { governanceContractsAddresses } from '../../utils/governance'; import { GovernanceHandlerService } from './handlers/governance.handler.service'; import { RemoteConfigGetterService } from '../remote-config/remote-config.getter.service'; import { StakingHandlerService } from './handlers/staking.handler.service'; +import { + RedisCacheService, + RedlockService, +} from '@multiversx/sdk-nestjs-cache'; @Injectable() export class RabbitMqConsumer { @@ -91,6 +95,8 @@ export class RabbitMqConsumer { private readonly analyticsWrite: AnalyticsWriteService, private readonly governanceHandler: GovernanceHandlerService, private readonly remoteConfig: RemoteConfigGetterService, + private readonly redisService: RedisCacheService, + private readonly redlockService: RedlockService, @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} @@ -332,14 +338,64 @@ export class RabbitMqConsumer { } if (Object.keys(this.data).length > 0) { - await this.analyticsWrite.ingest({ - data: this.data, - Time: timestamp, - }); + await this.ingestData(this.data, timestamp); } this.logger.info('Finish Processing events...'); } + async ingestData(data: any, timestamp: number): Promise { + const lastProcessedTimestamp = await this.redisService.get( + 'eventsIngester.lastProcessedTimestamp', + ); + + if (!lastProcessedTimestamp) { + await this.redlockService.using( + `eventsIngester`, + `eventsIngesterTimestamp:${timestamp}`, + async () => { + await this.analyticsWrite.ingest({ + data: data, + Time: timestamp, + }); + }, + { + keyExpiration: 120 * 1000, + maxRetries: 10, + retryInterval: 5000, + }, + ); + return; + } + + if (lastProcessedTimestamp > timestamp) { + this.logger.info( + `Last processed timestamp (${lastProcessedTimestamp}) is newer than current timestamp (${timestamp})`, + ); + return; + } + + await this.redlockService.using( + `eventsIngester`, + `eventsIngesterTimestamp:${timestamp}`, + async () => { + await this.analyticsWrite.ingest({ + data: data, + Time: timestamp, + }); + + await this.redisService.set( + 'eventsIngester.lastProcessedTimestamp', + timestamp, + ); + }, + { + keyExpiration: 120 * 1000, + maxRetries: 10, + retryInterval: 5000, + }, + ); + } + async getFilterAddresses(): Promise { this.filterAddresses = []; this.filterAddresses = await this.routerAbi.getAllPairsAddressRaw(); diff --git a/src/modules/rabbitmq/rabbitmq.module.ts b/src/modules/rabbitmq/rabbitmq.module.ts index d55056177..e52617b8a 100644 --- a/src/modules/rabbitmq/rabbitmq.module.ts +++ b/src/modules/rabbitmq/rabbitmq.module.ts @@ -40,6 +40,7 @@ import { FarmModuleV2 } from '../farm/v2/farm.v2.module'; import { RemoteConfigModule } from '../remote-config/remote-config.module'; import { StakingHandlerService } from './handlers/staking.handler.service'; import { StakingModule } from '../staking/staking.module'; +import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; @Module({ imports: [ @@ -66,6 +67,8 @@ import { StakingModule } from '../staking/staking.module'; GovernanceModule, EscrowModule, RemoteConfigModule, + DynamicModuleUtils.getCommonRedisModule(), + DynamicModuleUtils.getRedlockModule(), ], providers: [ RabbitMqConsumer, diff --git a/src/services/analytics/timescaledb/timescaledb.write.service.ts b/src/services/analytics/timescaledb/timescaledb.write.service.ts index 3f14071c1..447b4c57b 100644 --- a/src/services/analytics/timescaledb/timescaledb.write.service.ts +++ b/src/services/analytics/timescaledb/timescaledb.write.service.ts @@ -10,6 +10,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { XExchangeAnalyticsEntity } from './entities/timescaledb.entities'; import { Repository } from 'typeorm'; import { IngestRecord } from '../entities/ingest.record'; +import { constantsConfig } from 'src/config'; @Injectable() export class TimescaleDBWriteService implements AnalyticsWriteInterface { @@ -62,29 +63,38 @@ export class TimescaleDBWriteService implements AnalyticsWriteInterface { ): Promise { const profiler = new PerformanceProfiler('ingestData'); - try { - await this.dexAnalytics.save(records); - } catch (errors) { - const logMessage = generateLogMessage( - TimescaleDBWriteService.name, - this.writeRecords.name, - '', - { - message: 'Internal Server Error', - status: 500, - response: errors, - }, - ); - this.logger.error(logMessage); - } finally { - profiler.stop(); + const chunkSize = constantsConfig.TIMESCALEDB_INSERT_CHUNK_SIZE; + for (let i = 0; i < records.length; i += chunkSize) { + try { + const recordsChunk = records.slice(i, i + chunkSize); - MetricsCollector.setExternalCall( - TimescaleDBWriteService.name, - 'ingestData', - profiler.duration, - ); + await this.dexAnalytics.upsert(recordsChunk, [ + 'timestamp', + 'series', + 'key', + ]); + } catch (errors) { + const logMessage = generateLogMessage( + TimescaleDBWriteService.name, + this.writeRecords.name, + '', + { + message: 'Internal Server Error', + status: 500, + response: errors, + }, + ); + this.logger.error(logMessage); + } } + + profiler.stop(); + + MetricsCollector.setExternalCall( + TimescaleDBWriteService.name, + 'ingestData', + profiler.duration, + ); } createRecords({ data, Time }): XExchangeAnalyticsEntity[] { diff --git a/src/utils/dynamic.module.utils.ts b/src/utils/dynamic.module.utils.ts index 99e540fab..b8afefb91 100644 --- a/src/utils/dynamic.module.utils.ts +++ b/src/utils/dynamic.module.utils.ts @@ -1,4 +1,9 @@ -import { RedisCacheModuleOptions } from '@multiversx/sdk-nestjs-cache'; +import { + RedisCacheModule, + RedisCacheModuleOptions, + RedlockConnectionOptions, + RedlockModule, +} from '@multiversx/sdk-nestjs-cache'; import { ElasticModule, ElasticModuleOptions, @@ -25,6 +30,33 @@ export class DynamicModuleUtils { }); } + static getCommonRedisModule(): DynamicModule { + return RedisCacheModule.forRootAsync({ + imports: [CommonAppModule], + useFactory: (configService: ApiConfigService) => + new RedisCacheModuleOptions({ + host: configService.getCommonRedisUrl(), + port: configService.getCommonRedisPort(), + password: configService.getCommonRedisPassword(), + }), + inject: [ApiConfigService], + }); + } + + static getRedlockModule(): DynamicModule { + return RedlockModule.forRootAsync({ + imports: [CommonAppModule], + useFactory: (configService: ApiConfigService) => [ + new RedlockConnectionOptions({ + host: configService.getCommonRedisUrl(), + port: configService.getCommonRedisPort(), + password: configService.getCommonRedisPassword(), + }), + ], + inject: [ApiConfigService], + }); + } + static getElasticModule(): DynamicModule { return ElasticModule.forRootAsync({ imports: [CommonAppModule],