Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ REDIS_PASSWORD=""
REDIS_PORT=6379
REDIS_DB=0

#Redis common
REDIS_COMMON_URL="localhost"
REDIS_COMMON_PREFIX="development_env"
REDIS_COMMON_PASSWORD=""
REDIS_COMMON_PORT=6384
REDIS_COMMON_DB=0

#MongoDB
MONGODB_URL="mongodb://localhost:27017"
MONGODB_DATABASE="development"
Expand Down
3 changes: 2 additions & 1 deletion src/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@
"VOLUME_WEIGHT": 0.25,
"TRADES_COUNT_WEIGHT": 0.25
},
"AWS_QUERY_CACHE_WARMER_DELAY": 50
"AWS_QUERY_CACHE_WARMER_DELAY": 50,
"TIMESCALEDB_INSERT_CHUNK_SIZE": 30
},
"dataApi": {
"tableName": "XEXCHANGE_ANALYTICS"
Expand Down
23 changes: 23 additions & 0 deletions src/helpers/api.config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ export class ApiConfigService {
return password !== '' ? password : undefined;
}

getCommonRedisUrl(): string {
const redisUrl = this.configService.get<string>('REDIS_COMMON_URL');
if (!redisUrl) {
throw new Error('No common redis url present');
}
return redisUrl;
}

getCommonRedisPort(): number {
const redisPort = this.configService.get<number>('REDIS_COMMON_PORT');
if (!redisPort) {
throw new Error('No common redis port present');
}
return redisPort;
}

getCommonRedisPassword(): string | undefined {
const password = this.configService.get<string>(
'REDIS_COMMON_PASSWORD',
);
return password !== '' ? password : undefined;
}

getApiUrl(): string {
const apiUrl = this.configService.get<string>('MX_API_URL');
if (!apiUrl) {
Expand Down
64 changes: 60 additions & 4 deletions src/modules/rabbitmq/rabbitmq.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ import { governanceContractsAddresses } from '../../utils/governance';
import { GovernanceHandlerService } from './handlers/governance.handler.service';
import { RemoteConfigGetterService } from '../remote-config/remote-config.getter.service';
import { StakingHandlerService } from './handlers/staking.handler.service';
import {
RedisCacheService,
RedlockService,
} from '@multiversx/sdk-nestjs-cache';

@Injectable()
export class RabbitMqConsumer {
Expand All @@ -91,6 +95,8 @@ export class RabbitMqConsumer {
private readonly analyticsWrite: AnalyticsWriteService,
private readonly governanceHandler: GovernanceHandlerService,
private readonly remoteConfig: RemoteConfigGetterService,
private readonly redisService: RedisCacheService,
private readonly redlockService: RedlockService,
@Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger,
) {}

Expand Down Expand Up @@ -332,14 +338,64 @@ export class RabbitMqConsumer {
}

if (Object.keys(this.data).length > 0) {
await this.analyticsWrite.ingest({
data: this.data,
Time: timestamp,
});
await this.ingestData(this.data, timestamp);
}
this.logger.info('Finish Processing events...');
}

async ingestData(data: any, timestamp: number): Promise<void> {
const lastProcessedTimestamp = await this.redisService.get<number>(
'eventsIngester.lastProcessedTimestamp',
);

if (!lastProcessedTimestamp) {
await this.redlockService.using(
`eventsIngester`,
`eventsIngesterTimestamp:${timestamp}`,
async () => {
await this.analyticsWrite.ingest({
data: data,
Time: timestamp,
});
},
{
keyExpiration: 120 * 1000,
maxRetries: 10,
retryInterval: 5000,
},
);
return;
}

if (lastProcessedTimestamp > timestamp) {
this.logger.info(
`Last processed timestamp (${lastProcessedTimestamp}) is newer than current timestamp (${timestamp})`,
);
return;
}

await this.redlockService.using(
`eventsIngester`,
`eventsIngesterTimestamp:${timestamp}`,
async () => {
await this.analyticsWrite.ingest({
data: data,
Time: timestamp,
});

await this.redisService.set(
'eventsIngester.lastProcessedTimestamp',
timestamp,
);
},
{
keyExpiration: 120 * 1000,
maxRetries: 10,
retryInterval: 5000,
},
);
}

async getFilterAddresses(): Promise<void> {
this.filterAddresses = [];
this.filterAddresses = await this.routerAbi.getAllPairsAddressRaw();
Expand Down
3 changes: 3 additions & 0 deletions src/modules/rabbitmq/rabbitmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { FarmModuleV2 } from '../farm/v2/farm.v2.module';
import { RemoteConfigModule } from '../remote-config/remote-config.module';
import { StakingHandlerService } from './handlers/staking.handler.service';
import { StakingModule } from '../staking/staking.module';
import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils';

@Module({
imports: [
Expand All @@ -66,6 +67,8 @@ import { StakingModule } from '../staking/staking.module';
GovernanceModule,
EscrowModule,
RemoteConfigModule,
DynamicModuleUtils.getCommonRedisModule(),
DynamicModuleUtils.getRedlockModule(),
],
providers: [
RabbitMqConsumer,
Expand Down
52 changes: 31 additions & 21 deletions src/services/analytics/timescaledb/timescaledb.write.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { XExchangeAnalyticsEntity } from './entities/timescaledb.entities';
import { Repository } from 'typeorm';
import { IngestRecord } from '../entities/ingest.record';
import { constantsConfig } from 'src/config';

@Injectable()
export class TimescaleDBWriteService implements AnalyticsWriteInterface {
Expand Down Expand Up @@ -62,29 +63,38 @@ export class TimescaleDBWriteService implements AnalyticsWriteInterface {
): Promise<void> {
const profiler = new PerformanceProfiler('ingestData');

try {
await this.dexAnalytics.save(records);
} catch (errors) {
const logMessage = generateLogMessage(
TimescaleDBWriteService.name,
this.writeRecords.name,
'',
{
message: 'Internal Server Error',
status: 500,
response: errors,
},
);
this.logger.error(logMessage);
} finally {
profiler.stop();
const chunkSize = constantsConfig.TIMESCALEDB_INSERT_CHUNK_SIZE;
for (let i = 0; i < records.length; i += chunkSize) {
try {
const recordsChunk = records.slice(i, i + chunkSize);

MetricsCollector.setExternalCall(
TimescaleDBWriteService.name,
'ingestData',
profiler.duration,
);
await this.dexAnalytics.upsert(recordsChunk, [
'timestamp',
'series',
'key',
]);
} catch (errors) {
const logMessage = generateLogMessage(
TimescaleDBWriteService.name,
this.writeRecords.name,
'',
{
message: 'Internal Server Error',
status: 500,
response: errors,
},
);
this.logger.error(logMessage);
}
}

profiler.stop();

MetricsCollector.setExternalCall(
TimescaleDBWriteService.name,
'ingestData',
profiler.duration,
);
}

createRecords({ data, Time }): XExchangeAnalyticsEntity[] {
Expand Down
34 changes: 33 additions & 1 deletion src/utils/dynamic.module.utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { RedisCacheModuleOptions } from '@multiversx/sdk-nestjs-cache';
import {
RedisCacheModule,
RedisCacheModuleOptions,
RedlockConnectionOptions,
RedlockModule,
} from '@multiversx/sdk-nestjs-cache';
import {
ElasticModule,
ElasticModuleOptions,
Expand All @@ -25,6 +30,33 @@ export class DynamicModuleUtils {
});
}

static getCommonRedisModule(): DynamicModule {
return RedisCacheModule.forRootAsync({
imports: [CommonAppModule],
useFactory: (configService: ApiConfigService) =>
new RedisCacheModuleOptions({
host: configService.getCommonRedisUrl(),
port: configService.getCommonRedisPort(),
password: configService.getCommonRedisPassword(),
}),
inject: [ApiConfigService],
});
}

static getRedlockModule(): DynamicModule {
return RedlockModule.forRootAsync({
imports: [CommonAppModule],
useFactory: (configService: ApiConfigService) => [
new RedlockConnectionOptions({
host: configService.getCommonRedisUrl(),
port: configService.getCommonRedisPort(),
password: configService.getCommonRedisPassword(),
}),
],
inject: [ApiConfigService],
});
}

static getElasticModule(): DynamicModule {
return ElasticModule.forRootAsync({
imports: [CommonAppModule],
Expand Down
Loading