diff --git a/docs/src/content/docs/developer-guides/storage/storage-operations.mdx b/docs/src/content/docs/developer-guides/storage/storage-operations.mdx
index 628c4dff..efcac39a 100644
--- a/docs/src/content/docs/developer-guides/storage/storage-operations.mdx
+++ b/docs/src/content/docs/developer-guides/storage/storage-operations.mdx
@@ -115,7 +115,7 @@ for (const ds of dataSets) {
   console.log(`Dataset ${ds.pdpVerifierDataSetId}:`, {
     live: ds.isLive,
     cdn: ds.withCDN,
-    pieces: ds.currentPieceCount,
+    pieces: ds.activePieceCount,
     metadata: ds.metadata
   });
 }
diff --git a/packages/synapse-core/src/mocks/jsonrpc/index.ts b/packages/synapse-core/src/mocks/jsonrpc/index.ts
index e23f044f..65f5653e 100644
--- a/packages/synapse-core/src/mocks/jsonrpc/index.ts
+++ b/packages/synapse-core/src/mocks/jsonrpc/index.ts
@@ -475,6 +475,7 @@ export const presets = {
     dataSetLive: () => [true],
     getDataSetListener: () => [ADDRESSES.calibration.warmStorage],
     getNextPieceId: () => [2n],
+    getActivePieceCount: () => [2n],
     getActivePieces: () => [[], [], false],
     getDataSetStorageProvider: () => [ADDRESSES.serviceProvider1, ADDRESSES.zero],
     getDataSetLeafCount: () => [0n],
diff --git a/packages/synapse-core/src/mocks/jsonrpc/pdp.ts b/packages/synapse-core/src/mocks/jsonrpc/pdp.ts
index a2d35575..c59afc29 100644
--- a/packages/synapse-core/src/mocks/jsonrpc/pdp.ts
+++ b/packages/synapse-core/src/mocks/jsonrpc/pdp.ts
@@ -6,6 +6,7 @@ import * as Abis from '../../abis/index.ts'
 import type { AbiToType, JSONRPCOptions } from './types.ts'

 export type getNextPieceId = ExtractAbiFunction
+export type getActivePieceCount = ExtractAbiFunction
 export type dataSetLive = ExtractAbiFunction
 export type getDataSetListener = ExtractAbiFunction
 export type getActivePieces = ExtractAbiFunction
@@ -17,6 +18,7 @@ export interface PDPVerifierOptions {
   dataSetLive?: (args: AbiToType) => AbiToType
   getDataSetListener?: (args: AbiToType) => AbiToType
   getNextPieceId?: (args: AbiToType) => AbiToType
+  getActivePieceCount?: (args: AbiToType) => AbiToType
   getActivePieces?: (args: AbiToType) => AbiToType
   getDataSetStorageProvider?: (
     args: AbiToType
@@ -65,6 +67,14 @@ export function pdpVerifierCallHandler(data: Hex, options: JSONRPCOptions): Hex
         Abis.pdp.find((abi) => abi.type === 'function' && abi.name === 'getNextPieceId')!.outputs,
         options.pdpVerifier.getNextPieceId(args)
       )
+    case 'getActivePieceCount':
+      if (!options.pdpVerifier?.getActivePieceCount) {
+        throw new Error('PDP Verifier: getActivePieceCount is not defined')
+      }
+      return encodeAbiParameters(
+        Abis.pdp.find((abi) => abi.type === 'function' && abi.name === 'getActivePieceCount')!.outputs,
+        options.pdpVerifier.getActivePieceCount(args)
+      )
     case 'getActivePieces': {
       if (!options.pdpVerifier?.getActivePieces) {
         throw new Error('PDP Verifier: getActivePieces is not defined')
diff --git a/packages/synapse-sdk/src/pdp/verifier.ts b/packages/synapse-sdk/src/pdp/verifier.ts
index db98c968..7cf46059 100644
--- a/packages/synapse-sdk/src/pdp/verifier.ts
+++ b/packages/synapse-sdk/src/pdp/verifier.ts
@@ -46,13 +46,23 @@
   /**
    * Get the next piece ID for a data set
    * @param dataSetId - The PDPVerifier data set ID
-   * @returns The next piece ID (which equals the current piece count)
+   * @returns The next piece ID to assign (total pieces ever added; does not decrease when pieces are removed)
    */
   async getNextPieceId(dataSetId: number): Promise<number> {
     const nextPieceId = await this._contract.getNextPieceId(dataSetId)
     return Number(nextPieceId)
   }

+  /**
+   * Get the count of active pieces (non-zero leaf count) for a data set
+   * @param dataSetId - The PDPVerifier data set ID
+   * @returns The number of active pieces in the data set
+   */
+  async getActivePieceCount(dataSetId: number): Promise<number> {
+    const count = await this._contract.getActivePieceCount(dataSetId)
+    return Number(count)
+  }
+
   /**
    * Get the data set listener (record keeper)
    * @param dataSetId - The PDPVerifier data set ID
diff --git a/packages/synapse-sdk/src/retriever/chain.ts b/packages/synapse-sdk/src/retriever/chain.ts
index 5fd43063..7090f1d1 100644
--- a/packages/synapse-sdk/src/retriever/chain.ts
+++ b/packages/synapse-sdk/src/retriever/chain.ts
@@ -44,7 +44,7 @@
     const dataSets = await this.warmStorageService.getClientDataSetsWithDetails(client)

     // Filter for live data sets with pieces
-    const validDataSets = dataSets.filter((ds) => ds.isLive && ds.currentPieceCount > 0)
+    const validDataSets = dataSets.filter((ds) => ds.isLive && ds.activePieceCount > 0)

     if (validDataSets.length === 0) {
       throw createError('ChainRetriever', 'findProviders', `No active data sets with data found for client ${client}`)
diff --git a/packages/synapse-sdk/src/storage/context.ts b/packages/synapse-sdk/src/storage/context.ts
index a5cd06cd..9abda1c8 100644
--- a/packages/synapse-sdk/src/storage/context.ts
+++ b/packages/synapse-sdk/src/storage/context.ts
@@ -491,7 +491,18 @@
   }

   /**
-   * Resolve using a specific provider ID
+   * Resolve the best matching DataSet for a Provider using a specific provider ID
+   *
+   * Optimization Strategy:
+   * Uses `getClientDataSets` fetch followed by batched parallel checks to find
+   * the best matching data set while minimizing RPC calls.
+   *
+   * Selection Logic:
+   * 1. Filters for datasets belonging to this provider
+   * 2. Sorts by dataSetId ascending (oldest first)
+   * 3. Searches in batches (size dynamic based on total count) for metadata match
+   * 4. Prioritizes datasets with pieces > 0, then falls back to the oldest valid dataset
+   * 5. Exits early as soon as a non-empty matching dataset is found
    */
   private static async resolveByProviderId(
     clientAddress: string,
@@ -504,14 +515,13 @@
     // Fetch provider (always) and dataSets (only if not forcing) in parallel
     const [provider, dataSets] = await Promise.all([
       spRegistry.getProvider(providerId),
-      forceCreateDataSet ? Promise.resolve(null) : warmStorageService.getClientDataSetsWithDetails(clientAddress),
+      forceCreateDataSet ? Promise.resolve([]) : warmStorageService.getClientDataSets(clientAddress),
     ])

     if (provider == null) {
       throw createError('StorageContext', 'resolveByProviderId', `Provider ID ${providerId} not found in registry`)
     }

-    // If forcing creation, skip the search for existing data sets
     if (forceCreateDataSet === true) {
       return {
         provider,
@@ -521,35 +531,85 @@
-    // dataSets is guaranteed non-null here since forceCreateDataSet is false
+    // Filter for this provider's active datasets
+    const providerDataSets = dataSets.filter(
+      (dataSet) => Number.isFinite(dataSet.dataSetId) && dataSet.providerId === provider.id && dataSet.pdpEndEpoch === 0
+    )

-    // Filter for this provider's data sets with matching metadata
-    const providerDataSets = (
-      dataSets as Awaited>
-    ).filter((ps) => {
-      if (ps.providerId !== provider.id || !ps.isLive || !ps.isManaged || ps.pdpEndEpoch !== 0) {
-        return false
-      }
-      // Check if metadata matches
-      return metadataMatches(ps.metadata, requestedMetadata)
+    type EvaluatedDataSet = {
+      dataSetId: number
+      dataSetMetadata: Record
+      activePieceCount: number
+    }
+
+    // Sort ascending by ID (oldest first) for deterministic selection
+    const sortedDataSets = providerDataSets.sort((a, b) => {
+      return Number(a.dataSetId) - Number(b.dataSetId)
     })

-    if (providerDataSets.length > 0) {
-      // Sort by preference: data sets with pieces first, then by ID
-      const sorted = providerDataSets.sort((a, b) => {
-        if (a.currentPieceCount > 0 && b.currentPieceCount === 0) return -1
-        if (b.currentPieceCount > 0 && a.currentPieceCount === 0) return 1
-        return a.pdpVerifierDataSetId - b.pdpVerifierDataSetId
-      })
+    // Batch strategy: 1/3 of total datasets per batch, with min & max, to balance latency vs RPC burst
+    const MIN_BATCH_SIZE = 50
+    const MAX_BATCH_SIZE = 200
+    const BATCH_SIZE = Math.min(MAX_BATCH_SIZE, Math.max(MIN_BATCH_SIZE, Math.ceil(sortedDataSets.length / 3), 1))
+    let selectedDataSet: EvaluatedDataSet | null = null
+
+    for (let i = 0; i < sortedDataSets.length; i += BATCH_SIZE) {
+      const batchResults: (EvaluatedDataSet | null)[] = await Promise.all(
+        sortedDataSets.slice(i, i + BATCH_SIZE).map(async (dataSet) => {
+          const dataSetId = Number(dataSet.dataSetId)
+          try {
+            const [dataSetMetadata, activePieceCount] = await Promise.all([
+              warmStorageService.getDataSetMetadata(dataSetId),
+              warmStorageService.getActivePieceCount(dataSetId),
+              warmStorageService.validateDataSet(dataSetId),
+            ])
+
+            if (!metadataMatches(dataSetMetadata, requestedMetadata)) {
+              return null
+            }

-      // Fetch metadata for existing data set
-      const dataSetMetadata = await warmStorageService.getDataSetMetadata(sorted[0].pdpVerifierDataSetId)
+            return {
+              dataSetId,
+              dataSetMetadata,
+              activePieceCount,
+            }
+          } catch (error) {
+            console.warn(
+              `Skipping data set ${dataSetId} for provider ${providerId}:`,
+              error instanceof Error ? error.message : String(error)
+            )
+            return null
+          }
+        })
+      )
+
+      for (const result of batchResults) {
+        if (result == null) continue
+
+        // select the first dataset with pieces and break out of the inner loop
+        if (result.activePieceCount > 0) {
+          selectedDataSet = result
+          break
+        }
+
+        // keep the first (oldest) dataset found so far (no pieces)
+        if (selectedDataSet == null) {
+          selectedDataSet = result
+        }
+      }
+
+      // early exit if we found a dataset with pieces; break out of the outer loop
+      if (selectedDataSet != null && selectedDataSet.activePieceCount > 0) {
+        break
+      }
+    }
+    if (selectedDataSet != null) {
       return {
         provider,
-        dataSetId: sorted[0].pdpVerifierDataSetId,
+        dataSetId: selectedDataSet.dataSetId,
         isExisting: true,
-        dataSetMetadata,
+        dataSetMetadata: selectedDataSet.dataSetMetadata,
       }
     }
@@ -629,8 +689,8 @@
     if (managedDataSets.length > 0 && !forceCreateDataSet) {
       // Prefer data sets with pieces, sort by ID (older first)
       const sorted = managedDataSets.sort((a, b) => {
-        if (a.currentPieceCount > 0 && b.currentPieceCount === 0) return -1
-        if (b.currentPieceCount > 0 && a.currentPieceCount === 0) return 1
+        if (a.activePieceCount > 0 && b.activePieceCount === 0) return -1
+        if (b.activePieceCount > 0 && a.activePieceCount === 0) return 1
         return a.pdpVerifierDataSetId - b.pdpVerifierDataSetId
       })

diff --git a/packages/synapse-sdk/src/test/retriever-chain.test.ts b/packages/synapse-sdk/src/test/retriever-chain.test.ts
index 7e5a694d..6fd10fb3 100644
--- a/packages/synapse-sdk/src/test/retriever-chain.test.ts
+++ b/packages/synapse-sdk/src/test/retriever-chain.test.ts
@@ -572,7 +572,7 @@ describe('ChainRetriever', () => {
             return [dataSetId !== 1n] // Data set 1 not live
           },
           getDataSetListener: () => [Mocks.ADDRESSES.calibration.warmStorage],
-          getNextPieceId: (args) => {
+          getActivePieceCount: (args) => {
            const [dataSetId] = args
            return [dataSetId === 2n ? 0n : 1n] // Data set 2 has no pieces
          },
diff --git a/packages/synapse-sdk/src/test/storage.test.ts b/packages/synapse-sdk/src/test/storage.test.ts
index 8fe68887..999f4c72 100644
--- a/packages/synapse-sdk/src/test/storage.test.ts
+++ b/packages/synapse-sdk/src/test/storage.test.ts
@@ -399,12 +399,37 @@ describe('StorageService', () => {
       })

       it('should prefer data sets with existing pieces', async () => {
+        const expectedDataSetBase = {
+          cacheMissRailId: 0n,
+          cdnRailId: 0n,
+          clientDataSetId: 0n,
+          commissionBps: 100n,
+          dataSetId: 1n,
+          payee: Mocks.ADDRESSES.serviceProvider1,
+          payer: Mocks.ADDRESSES.client1,
+          pdpEndEpoch: 0n,
+          pdpRailId: 1n,
+          providerId: 1n,
+          serviceProvider: Mocks.ADDRESSES.serviceProvider1,
+        }
+        const expectedDataSets = [
+          {
+            ...expectedDataSetBase,
+            dataSetId: 1n,
+            pdpRailId: 1n,
+          },
+          {
+            ...expectedDataSetBase,
+            dataSetId: 2n,
+            pdpRailId: 2n,
+          },
+        ]
         server.use(
           Mocks.JSONRPC({
             ...Mocks.presets.basic,
             pdpVerifier: {
               ...Mocks.presets.basic.pdpVerifier,
-              getNextPieceId: (args) => {
+              getActivePieceCount: (args) => {
                 const [dataSetId] = args
                 if (dataSetId === 2n) {
                   return [2n]
@@ -415,43 +440,11 @@
             },
             warmStorageView: {
               ...Mocks.presets.basic.warmStorageView,
-              clientDataSets: () => [[1n, 2n]],
+              getClientDataSets: () => [expectedDataSets],
               getAllDataSetMetadata: () => [[], []],
               getDataSet: (args) => {
                 const [dataSetId] = args
-                if (dataSetId === 1n) {
-                  return [
-                    {
-                      cacheMissRailId: 0n,
-                      cdnRailId: 0n,
-                      clientDataSetId: 0n,
-                      commissionBps: 100n,
-                      dataSetId: 1n,
-                      payee: Mocks.ADDRESSES.serviceProvider1,
-                      payer: Mocks.ADDRESSES.client1,
-                      pdpEndEpoch: 0n,
-                      pdpRailId: 1n,
-                      providerId: 1n,
-                      serviceProvider: Mocks.ADDRESSES.serviceProvider1,
-                    },
-                  ]
-                } else {
-                  return [
-                    {
-                      cacheMissRailId: 0n,
-                      cdnRailId: 0n,
-                      clientDataSetId: 0n,
-                      commissionBps: 100n,
-                      dataSetId: 2n,
-                      payee: Mocks.ADDRESSES.serviceProvider1,
-                      payer: Mocks.ADDRESSES.client1,
-                      pdpEndEpoch: 0n,
-                      pdpRailId: 2n,
-                      providerId: 1n,
-                      serviceProvider: Mocks.ADDRESSES.serviceProvider1,
-                    },
-                  ]
-                }
+                return [expectedDataSets.find((ds) => ds.dataSetId === dataSetId) ?? ({} as (typeof expectedDataSets)[0])]
               },
             },
           }),
diff --git a/packages/synapse-sdk/src/test/warm-storage-service.test.ts b/packages/synapse-sdk/src/test/warm-storage-service.test.ts
index 7eda0f81..b6e7382d 100644
--- a/packages/synapse-sdk/src/test/warm-storage-service.test.ts
+++ b/packages/synapse-sdk/src/test/warm-storage-service.test.ts
@@ -255,8 +255,7 @@ describe('WarmStorageService', () => {
     assert.lengthOf(detailedDataSets, 1)
     assert.equal(detailedDataSets[0].pdpRailId, 48)
     assert.equal(detailedDataSets[0].pdpVerifierDataSetId, 242)
-    assert.equal(detailedDataSets[0].nextPieceId, 2)
-    assert.equal(detailedDataSets[0].currentPieceCount, 2)
+    assert.equal(detailedDataSets[0].activePieceCount, 2)
     assert.isTrue(detailedDataSets[0].isLive)
     assert.isTrue(detailedDataSets[0].isManaged)
   })
diff --git a/packages/synapse-sdk/src/types.ts b/packages/synapse-sdk/src/types.ts
index bf21ec9c..d65d4482 100644
--- a/packages/synapse-sdk/src/types.ts
+++ b/packages/synapse-sdk/src/types.ts
@@ -229,10 +229,8 @@ export interface DataSetInfo {
 export interface EnhancedDataSetInfo extends DataSetInfo {
   /** PDPVerifier global data set ID */
   pdpVerifierDataSetId: number
-  /** Next piece ID to use when adding pieces */
-  nextPieceId: number
-  /** Current number of pieces in the data set */
-  currentPieceCount: number
+  /** Number of active pieces in the data set (excludes removed pieces) */
+  activePieceCount: number
   /** Whether the data set is live on-chain */
   isLive: boolean
   /** Whether this data set is managed by the current Warm Storage contract */
diff --git a/packages/synapse-sdk/src/warm-storage/service.ts b/packages/synapse-sdk/src/warm-storage/service.ts
index 5dcbad7c..049de2d3 100644
--- a/packages/synapse-sdk/src/warm-storage/service.ts
+++ b/packages/synapse-sdk/src/warm-storage/service.ts
@@ -331,6 +331,7 @@ export class WarmStorageService {
         clientDataSetId: ds.clientDataSetId,
         pdpEndEpoch: Number(ds.pdpEndEpoch),
         providerId: Number(ds.providerId),
+        dataSetId: Number(ds.dataSetId),
       }))
     } catch (error) {
       throw new Error(`Failed to get client data sets: ${error instanceof Error ? error.message : String(error)}`)
@@ -374,14 +375,13 @@
        return null // Will be filtered out
      }

-      // Get next piece ID only if the data set is live
-      const nextPieceId = isLive ? await pdpVerifier.getNextPieceId(pdpVerifierDataSetId) : 0n
+      // Get active piece count only if the data set is live
+      const activePieceCount = isLive ? await pdpVerifier.getActivePieceCount(pdpVerifierDataSetId) : 0

      return {
        ...base,
        pdpVerifierDataSetId,
-        nextPieceId: Number(nextPieceId),
-        currentPieceCount: Number(nextPieceId),
+        activePieceCount,
        isLive,
        isManaged,
        withCDN: base.cdnRailId > 0 && METADATA_KEYS.WITH_CDN in metadata,
@@ -435,6 +435,27 @@
     }
   }

+  /**
+   * Get the next piece ID for a dataset (total pieces ever added; does not decrease when pieces are removed)
+   * @param dataSetId - The PDPVerifier data set ID
+   * @returns The next piece ID as a number
+   */
+  async getNextPieceId(dataSetId: number): Promise<number> {
+    const pdpVerifier = this._getPDPVerifier()
+    const nextPieceId = await pdpVerifier.getNextPieceId(dataSetId)
+    return nextPieceId
+  }
+
+  /**
+   * Get the count of active pieces in a dataset (excludes removed pieces)
+   * @param dataSetId - The PDPVerifier data set ID
+   * @returns The number of active pieces
+   */
+  async getActivePieceCount(dataSetId: number): Promise<number> {
+    const pdpVerifier = this._getPDPVerifier()
+    return await pdpVerifier.getActivePieceCount(dataSetId)
+  }
+
   /**
    * Verify that a data set creation transaction was successful
    * This checks both the transaction status and on-chain data set state
diff --git a/utils/example-piece-details.js b/utils/example-piece-details.js
index c8d3a1d2..be1a6ac8 100755
--- a/utils/example-piece-details.js
+++ b/utils/example-piece-details.js
@@ -59,8 +59,8 @@ async function main() {
     return
   }

-  // Find a data set with pieces (currentPieceCount > 0)
-  const dataSetWithPieces = dataSets.find((ds) => ds.currentPieceCount > 0)
+  // Find a data set with pieces (activePieceCount > 0)
+  const dataSetWithPieces = dataSets.find((ds) => ds.activePieceCount > 0)
   if (!dataSetWithPieces) {
     console.log('❌ No data sets with pieces found. Please upload some data first using example-storage-simple.js')
     return
@@ -70,7 +70,7 @@
   dataSetInfo = {
     dataSetId: dataSetWithPieces.pdpVerifierDataSetId,
     providerId: dataSetWithPieces.providerId,
-    pieceCount: dataSetWithPieces.currentPieceCount,
+    pieceCount: dataSetWithPieces.activePieceCount,
     clientDataSetId: dataSetWithPieces.clientDataSetId,
     isLive: dataSetWithPieces.isLive,
     withCDN: dataSetWithPieces.withCDN,
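
Reviewer note, not part of the diff above: the sketch below shows how a consumer of the SDK sees this change. It is a hypothetical example; the '@filoz/synapse-sdk' import specifier and the already-constructed WarmStorageService instance are assumptions, while getClientDataSetsWithDetails, the renamed activePieceCount field, and the new getNextPieceId/getActivePieceCount helpers come from the diff itself.

// Usage sketch only (hypothetical consumer code, not part of this change).
// Assumption: `WarmStorageService` is exported from the package entry point.
import type { WarmStorageService } from '@filoz/synapse-sdk'

async function summarizeClientDataSets(warmStorage: WarmStorageService, client: string): Promise<void> {
  // EnhancedDataSetInfo now exposes `activePieceCount` (removed pieces excluded)
  // in place of the old `nextPieceId` / `currentPieceCount` pair.
  const dataSets = await warmStorage.getClientDataSetsWithDetails(client)

  for (const ds of dataSets) {
    // `getNextPieceId` only ever grows, so the gap between it and the active
    // count approximates how many pieces have been removed from the data set.
    const nextPieceId = await warmStorage.getNextPieceId(ds.pdpVerifierDataSetId)
    console.log(`Dataset ${ds.pdpVerifierDataSetId}:`, {
      live: ds.isLive,
      activePieces: ds.activePieceCount,
      nextPieceId,
      removedPieces: nextPieceId - ds.activePieceCount,
    })
  }
}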