Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 138 additions & 3 deletions packages/node-core/src/indexer/dynamic-ds.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ class TestDynamicDsService extends DynamicDsService<BaseDataSource, ISubqueryPro
}

// Make it public
getTemplate(templateName: string, startBlock?: number | undefined): BaseDataSource {
return super.getTemplate(templateName, startBlock);
getTemplate(templateName: string, startBlock?: number | undefined, endBlock?: number | undefined): BaseDataSource {
return super.getTemplate(templateName, startBlock, endBlock);
}
}

const testParam1 = {templateName: 'Test', startBlock: 1};
const testParam2 = {templateName: 'Test', startBlock: 2};
const testParam3 = {templateName: 'Test', startBlock: 3};
const testParam4 = {templateName: 'Test', startBlock: 4};
const testParamOther = {templateName: 'Other', startBlock: 5};

const mockMetadata = (initData: DatasourceParams[] = []) => {
let datasourceParams: DatasourceParams[] = initData;
Expand All @@ -40,7 +41,7 @@ const mockMetadata = (initData: DatasourceParams[] = []) => {
describe('DynamicDsService', () => {
let service: TestDynamicDsService;
const project = {
templates: [{name: 'Test'}],
templates: [{name: 'Test'}, {name: 'Other'}],
} as any as ISubqueryProject;

beforeEach(() => {
Expand Down Expand Up @@ -70,6 +71,67 @@ describe('DynamicDsService', () => {
]);
});

it('can destroy a dynamic datasource', async () => {
const meta = mockMetadata([testParam1, testParam2]);
await service.init(meta);

await service.destroyDynamicDatasource('Test', 50);

const updatedParams = (service as any)._datasourceParams;
expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50});
expect(updatedParams[1]).toEqual(testParam2);

const datasources = (service as any)._datasources;
expect(datasources[0].endBlock).toBe(50);
});

it('throws error when destroying non-existent datasource', async () => {
const meta = mockMetadata([testParam1]);
await service.init(meta);

await expect(service.destroyDynamicDatasource('NonExistent', 50)).rejects.toThrow(
'Dynamic datasource with template name "NonExistent" not found'
);
});

it('throws error when destroying already destroyed datasource', async () => {
const destroyedParam = {...testParam1, endBlock: 30};
const meta = mockMetadata([destroyedParam]);
await service.init(meta);

await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow(
'Dynamic datasource "Test" is already destroyed'
);
});

it('allows creating new datasource after destroying existing one', async () => {
const meta = mockMetadata([testParam1]);
await service.init(meta);

expect((service as any)._datasourceParams).toEqual([testParam1]);

await service.destroyDynamicDatasource('Test', 50);

const paramsAfterDestroy = (service as any)._datasourceParams;
expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50});

const newParam = {templateName: 'Test', startBlock: 60};
await service.createDynamicDatasource(newParam);

const finalParams = (service as any)._datasourceParams;
const destroyedCount = finalParams.filter((p) => p.endBlock !== undefined).length;
const activeCount = finalParams.filter((p) => p.endBlock === undefined).length;

expect(destroyedCount).toBeGreaterThanOrEqual(1);
expect(activeCount).toBeGreaterThanOrEqual(1);

const destroyedParam = finalParams.find((p) => p.startBlock === 1 && p.endBlock === 50);
expect(destroyedParam).toBeDefined();

const newParamFound = finalParams.find((p) => p.startBlock === 60 && !p.endBlock);
expect(newParamFound).toBeDefined();
});

it('resets dynamic datasources', async () => {
const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]);
await service.init(meta);
Expand All @@ -83,6 +145,25 @@ describe('DynamicDsService', () => {
]);
});

it('handles reset after datasource destruction correctly', async () => {
const params = [testParam1, testParam2, testParam3, testParam4];
const meta = mockMetadata(params);
await service.init(meta);

await service.destroyDynamicDatasource('Test', 25); // Destroys testParam1

const paramsAfterDestroy = (service as any)._datasourceParams;
expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25});

// Reset to block 2 (should keep testParam1 and testParam2)
await service.resetDynamicDatasource(2, null as any);

const paramsAfterReset = (service as any)._datasourceParams;
expect(paramsAfterReset).toHaveLength(2);
expect(paramsAfterReset[0]).toEqual({...testParam1, endBlock: 25});
expect(paramsAfterReset[1]).toEqual(testParam2);
});

it('getDynamicDatasources with force reloads from metadata', async () => {
const meta = mockMetadata([testParam1, testParam2]);
await service.init(meta);
Expand All @@ -107,6 +188,29 @@ describe('DynamicDsService', () => {
]);
});

it('loads destroyed datasources with endBlock correctly', async () => {
const destroyedParam = {...testParam1, endBlock: 100};
const meta = mockMetadata([destroyedParam, testParam2]);
await service.init(meta);

const datasources = await service.getDynamicDatasources();
expect(datasources).toHaveLength(2);
expect((datasources[0] as any).endBlock).toBe(100);
expect((datasources[1] as any).endBlock).toBeUndefined();
});

it('updates metadata correctly when destroying datasource', async () => {
const meta = mockMetadata([testParam1, testParam2]);
await service.init(meta);

await service.destroyDynamicDatasource('Test', 75);

const metadataParams = await meta.find('dynamicDatasources');
expect(metadataParams).toBeDefined();
expect(metadataParams![0]).toEqual({...testParam1, endBlock: 75});
expect(metadataParams![1]).toEqual(testParam2);
});

it('can find a template and cannot mutate the template', () => {
const template1 = service.getTemplate('Test', 1);
const template2 = service.getTemplate('Test', 2);
Expand All @@ -120,4 +224,35 @@ describe('DynamicDsService', () => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
expect(project.templates![0]).toEqual({name: 'Test'});
});

it('can create template with endBlock', () => {
const template = service.getTemplate('Test', 1, 100);

expect(template.startBlock).toBe(1);
expect((template as any).endBlock).toBe(100);
expect((template as any).name).toBeUndefined();
});

it('handles multiple templates with same name during destruction', async () => {
const param1 = {templateName: 'Test', startBlock: 1};
const param2 = {templateName: 'Test', startBlock: 5};
const param3 = {templateName: 'Other', startBlock: 3};

const meta = mockMetadata([param1, param2, param3]);
await service.init(meta);

// Should destroy the first matching one
await service.destroyDynamicDatasource('Test', 10);

const updatedParams = (service as any)._datasourceParams;
expect(updatedParams[0]).toEqual({...param1, endBlock: 10});
expect(updatedParams[1]).toEqual(param2); // Not destroyed
expect(updatedParams[2]).toEqual(param3); // Not destroyed
});

it('throws error when service not initialized for destruction', async () => {
await expect(service.destroyDynamicDatasource('Test', 50)).rejects.toThrow(
'DynamicDsService has not been initialized'
);
});
});
38 changes: 34 additions & 4 deletions packages/node-core/src/indexer/dynamic-ds.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ export interface DatasourceParams {
templateName: string;
args?: Record<string, unknown>;
startBlock: number;
endBlock?: number;
}

export interface IDynamicDsService<DS> {
dynamicDatasources: DS[];
createDynamicDatasource(params: DatasourceParams): Promise<DS>;
destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise<void>;
getDynamicDatasources(forceReload?: boolean): Promise<DS[]>;
}

Expand Down Expand Up @@ -91,6 +93,34 @@ export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P exte
}
}

async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, tx?: Transaction): Promise<void> {
if (!this._datasources || !this._datasourceParams) {
throw new Error('DynamicDsService has not been initialized');
}

const dsIndex = this._datasourceParams.findIndex((params) => params.templateName === templateName);
if (dsIndex === -1) {
throw new Error(`Dynamic datasource with template name "${templateName}" not found`);
}

const dsParam = this._datasourceParams[dsIndex];

if (dsParam.endBlock !== undefined) {
throw new Error(`Dynamic datasource "${templateName}" is already destroyed`);
}

const updatedParams = {...dsParam, endBlock: currentBlockHeight};
this._datasourceParams[dsIndex] = updatedParams;

if (this._datasources[dsIndex]) {
(this._datasources[dsIndex] as any).endBlock = currentBlockHeight;
}

await this.metadata.set(METADATA_KEY, this._datasourceParams, tx);

logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`);
}

// Not force only seems to be used for project changes
async getDynamicDatasources(forceReload?: boolean): Promise<DS[]> {
// Workers should not cache this result in order to keep in sync
Expand All @@ -117,19 +147,19 @@ export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P exte
*
* This will throw if the template cannot be found by name.
*
* Inserts the startBlock into the template.
* Inserts the startBlock and optionally endBlock into the template.
* */
protected getTemplate(templateName: string, startBlock?: number): DS {
protected getTemplate(templateName: string, startBlock?: number, endBlock?: number): DS {
const t = (this.project.templates ?? []).find((t) => t.name === templateName);
if (!t) {
throw new Error(`Unable to find matching template in project for name: "${templateName}"`);
}
const {name, ...template} = cloneDeep(t);
return {...template, startBlock} as DS;
return {...template, startBlock, endBlock} as DS;
}

private async getDatasource(params: DatasourceParams): Promise<DS> {
const dsObj = this.getTemplate(params.templateName, params.startBlock);
const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock);

try {
await this.blockchainService.updateDynamicDs(params, dsObj);
Expand Down
23 changes: 23 additions & 0 deletions packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,29 @@ export abstract class BaseIndexerManager<
dynamicDsCreated = true;
}, 'createDynamicDatasource');

// Inject function to destroy ds into vm
vm.freeze(async (templateName?: string) => {
if (!templateName) {
throw new Error('Cannot destroy datasource: template name must be provided');
}

await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight);

// Mark datasources with this template for removal from current processing
filteredDataSources.forEach((fds) => {
const dsParams = this.dynamicDsService.dynamicDatasources.find((dynamicDs) => {
// Find the corresponding params for this datasource
const params = (this.dynamicDsService as any)._datasourceParams?.find(
(p: any) => p.templateName === templateName && p.startBlock === (fds as any).startBlock
);
return params !== undefined;
});
if (dsParams) {
(fds as any).endBlock = blockHeight;
}
});
}, 'destroyDynamicDatasource');

return vm;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service';

export type HostDynamicDS<DS> = {
dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise<DS>;
dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number) => Promise<void>;
dynamicDsGetDynamicDatasources: () => Promise<DS[]>;
};

export const hostDynamicDsKeys: (keyof HostDynamicDS<any>)[] = [
'dynamicDsCreateDynamicDatasource',
'dynamicDsDestroyDynamicDatasource',
'dynamicDsGetDynamicDatasources',
];

Expand All @@ -32,6 +34,10 @@ export class WorkerDynamicDsService<DS> implements IDynamicDsService<DS> {
return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params)));
}

async destroyDynamicDatasource(templateName: string, currentBlockHeight: number): Promise<void> {
return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight);
}

async getDynamicDatasources(): Promise<DS[]> {
return this.host.dynamicDsGetDynamicDatasources();
}
Expand All @@ -40,6 +46,7 @@ export class WorkerDynamicDsService<DS> implements IDynamicDsService<DS> {
export function dynamicDsHostFunctions<DS>(dynamicDsService: IDynamicDsService<DS>): HostDynamicDS<DS> {
return {
dynamicDsCreateDynamicDatasource: dynamicDsService.createDynamicDatasource.bind(dynamicDsService),
dynamicDsDestroyDynamicDatasource: dynamicDsService.destroyDynamicDatasource.bind(dynamicDsService),
dynamicDsGetDynamicDatasources: dynamicDsService.getDynamicDatasources.bind(dynamicDsService),
};
}
3 changes: 2 additions & 1 deletion packages/types-core/src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import type Pino from 'pino';
import {Cache, DynamicDatasourceCreator} from './interfaces';
import {Cache, DynamicDatasourceCreator, DynamicDatasourceDestructor} from './interfaces';
import {Store} from './store';

// base global
Expand All @@ -12,4 +12,5 @@ declare global {
const cache: Cache;
const chainId: string;
const createDynamicDatasource: DynamicDatasourceCreator;
const destroyDynamicDatasource: DynamicDatasourceDestructor;
}
1 change: 1 addition & 0 deletions packages/types-core/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

export type DynamicDatasourceCreator = (name: string, args: Record<string, unknown>) => Promise<void>;
export type DynamicDatasourceDestructor = (name?: string) => Promise<void>;

export interface Cache<T extends Record<string, any> = Record<string, any>> {
set(key: keyof T, value: T[keyof T]): Promise<void>;
Expand Down