@@ -115,7 +115,7 @@ for (const ds of dataSets) {
   console.log(`Dataset ${ds.pdpVerifierDataSetId}:`, {
     live: ds.isLive,
     cdn: ds.withCDN,
-    pieces: ds.currentPieceCount,
+    pieces: ds.activePieceCount,
     metadata: ds.metadata
   });
 }
1 change: 1 addition & 0 deletions packages/synapse-core/src/mocks/jsonrpc/index.ts
@@ -475,6 +475,7 @@ export const presets = {
     dataSetLive: () => [true],
     getDataSetListener: () => [ADDRESSES.calibration.warmStorage],
     getNextPieceId: () => [2n],
+    getActivePieceCount: () => [2n],
     getActivePieces: () => [[], [], false],
     getDataSetStorageProvider: () => [ADDRESSES.serviceProvider1, ADDRESSES.zero],
     getDataSetLeafCount: () => [0n],
10 changes: 10 additions & 0 deletions packages/synapse-core/src/mocks/jsonrpc/pdp.ts
@@ -6,6 +6,7 @@ import * as Abis from '../../abis/index.ts'
 import type { AbiToType, JSONRPCOptions } from './types.ts'
 
 export type getNextPieceId = ExtractAbiFunction<typeof Abis.pdp, 'getNextPieceId'>
+export type getActivePieceCount = ExtractAbiFunction<typeof Abis.pdp, 'getActivePieceCount'>
 export type dataSetLive = ExtractAbiFunction<typeof Abis.pdp, 'dataSetLive'>
 export type getDataSetListener = ExtractAbiFunction<typeof Abis.pdp, 'getDataSetListener'>
 export type getActivePieces = ExtractAbiFunction<typeof Abis.pdp, 'getActivePieces'>
@@ -17,6 +18,7 @@ export interface PDPVerifierOptions {
   dataSetLive?: (args: AbiToType<dataSetLive['inputs']>) => AbiToType<dataSetLive['outputs']>
   getDataSetListener?: (args: AbiToType<getDataSetListener['inputs']>) => AbiToType<getDataSetListener['outputs']>
   getNextPieceId?: (args: AbiToType<getNextPieceId['inputs']>) => AbiToType<getNextPieceId['outputs']>
+  getActivePieceCount?: (args: AbiToType<getActivePieceCount['inputs']>) => AbiToType<getActivePieceCount['outputs']>
   getActivePieces?: (args: AbiToType<getActivePieces['inputs']>) => AbiToType<getActivePieces['outputs']>
   getDataSetStorageProvider?: (
     args: AbiToType<getDataSetStorageProvider['inputs']>
@@ -65,6 +67,14 @@ export function pdpVerifierCallHandler(data: Hex, options: JSONRPCOptions): Hex
         Abis.pdp.find((abi) => abi.type === 'function' && abi.name === 'getNextPieceId')!.outputs,
         options.pdpVerifier.getNextPieceId(args)
       )
+    case 'getActivePieceCount':
+      if (!options.pdpVerifier?.getActivePieceCount) {
+        throw new Error('PDP Verifier: getActivePieceCount is not defined')
+      }
+      return encodeAbiParameters(
+        Abis.pdp.find((abi) => abi.type === 'function' && abi.name === 'getActivePieceCount')!.outputs,
+        options.pdpVerifier.getActivePieceCount(args)
+      )
     case 'getActivePieces': {
       if (!options.pdpVerifier?.getActivePieces) {
         throw new Error('PDP Verifier: getActivePieces is not defined')
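A note for reviewers unfamiliar with the mock plumbing: each case looks up the function's outputs in the PDPVerifier ABI and ABI-encodes the tuple returned by the user-supplied override, so an eth_call against the mock yields the same bytes a real node would. A minimal round-trip sketch of the new case, assuming viem's encodeAbiParameters/decodeAbiParameters (the constant names are illustrative, not from the PR):

import { decodeAbiParameters, encodeAbiParameters } from 'viem'
import * as Abis from '../../abis/index.ts'

// Outputs of getActivePieceCount in the PDPVerifier ABI (a single uint256)
const outputs = Abis.pdp.find(
  (abi) => abi.type === 'function' && abi.name === 'getActivePieceCount'
)!.outputs

const hex = encodeAbiParameters(outputs, [2n]) // 32-byte big-endian 0x...02
const [count] = decodeAbiParameters(outputs, hex)
console.log(count) // 2n — the value the SDK's getActivePieceCount() will coerce with Number()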
12 changes: 11 additions & 1 deletion packages/synapse-sdk/src/pdp/verifier.ts
@@ -46,13 +46,23 @@ export class PDPVerifier {
   /**
    * Get the next piece ID for a data set
    * @param dataSetId - The PDPVerifier data set ID
-   * @returns The next piece ID (which equals the current piece count)
+   * @returns The next piece ID to assign (total pieces ever added; does not decrease when pieces are removed)
    */
   async getNextPieceId(dataSetId: number): Promise<number> {
     const nextPieceId = await this._contract.getNextPieceId(dataSetId)
     return Number(nextPieceId)
   }
 
+  /**
+   * Get the count of active pieces (non-zero leaf count) for a data set
+   * @param dataSetId - The PDPVerifier data set ID
+   * @returns The number of active pieces in the data set
+   */
+  async getActivePieceCount(dataSetId: number): Promise<number> {
+    const count = await this._contract.getActivePieceCount(dataSetId)
+    return Number(count)
+  }
+
   /**
    * Get the data set listener (record keeper)
    * @param dataSetId - The PDPVerifier data set ID
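The semantic split the SDK now exposes is easiest to see after a removal. A minimal usage sketch, not part of the diff (`verifier` is a constructed PDPVerifier instance and `dataSetId` a stand-in):

// Suppose 5 pieces were ever added to this data set and 2 were later removed.
const nextPieceId = await verifier.getNextPieceId(dataSetId) // 5 — monotonic, never decreases
const activeCount = await verifier.getActivePieceCount(dataSetId) // 3 — reflects removals

// nextPieceId is only useful for ID bookkeeping when adding pieces;
// activePieceCount is the reliable "does this data set hold data?" check,
// which is exactly how ChainRetriever and StorageContext use it below.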
2 changes: 1 addition & 1 deletion packages/synapse-sdk/src/retriever/chain.ts
@@ -44,7 +44,7 @@ export class ChainRetriever implements PieceRetriever {
     const dataSets = await this.warmStorageService.getClientDataSetsWithDetails(client)
 
     // Filter for live data sets with pieces
-    const validDataSets = dataSets.filter((ds) => ds.isLive && ds.currentPieceCount > 0)
+    const validDataSets = dataSets.filter((ds) => ds.isLive && ds.activePieceCount > 0)
 
     if (validDataSets.length === 0) {
       throw createError('ChainRetriever', 'findProviders', `No active data sets with data found for client ${client}`)
112 changes: 86 additions & 26 deletions packages/synapse-sdk/src/storage/context.ts
@@ -491,7 +491,18 @@ export class StorageContext {
   }
 
   /**
-   * Resolve using a specific provider ID
+   * Resolve the best matching data set for a provider, given a specific provider ID
+   *
+   * Optimization strategy:
+   * Uses a single `getClientDataSets` fetch followed by batched parallel checks to find
+   * the best matching data set while minimizing RPC calls.
+   *
+   * Selection logic:
+   * 1. Filters for data sets belonging to this provider
+   * 2. Sorts by dataSetId ascending (oldest first)
+   * 3. Searches in batches (sized dynamically from the total count) for a metadata match
+   * 4. Prefers data sets with pieces > 0, then falls back to the oldest valid data set
+   * 5. Exits early as soon as a non-empty matching data set is found
    */
   private static async resolveByProviderId(
     clientAddress: string,
@@ -504,14 +515,13 @@
     // Fetch provider (always) and dataSets (only if not forcing) in parallel
     const [provider, dataSets] = await Promise.all([
       spRegistry.getProvider(providerId),
-      forceCreateDataSet ? Promise.resolve(null) : warmStorageService.getClientDataSetsWithDetails(clientAddress),
+      forceCreateDataSet ? Promise.resolve([]) : warmStorageService.getClientDataSets(clientAddress),
     ])
 
     if (provider == null) {
       throw createError('StorageContext', 'resolveByProviderId', `Provider ID ${providerId} not found in registry`)
     }
 
-    // If forcing creation, skip the search for existing data sets
     if (forceCreateDataSet === true) {
       return {
         provider,
@@ -521,35 +531,85 @@
       }
     }
 
-    // dataSets is guaranteed non-null here since forceCreateDataSet is false
-    // Filter for this provider's data sets with matching metadata
-    const providerDataSets = (
-      dataSets as Awaited<ReturnType<typeof warmStorageService.getClientDataSetsWithDetails>>
-    ).filter((ps) => {
-      if (ps.providerId !== provider.id || !ps.isLive || !ps.isManaged || ps.pdpEndEpoch !== 0) {
-        return false
-      }
-      // Check if metadata matches
-      return metadataMatches(ps.metadata, requestedMetadata)
-    })
+    // Filter for this provider's active data sets
+    const providerDataSets = dataSets.filter(
+      (dataSet) => Number.isFinite(dataSet.dataSetId) && dataSet.providerId === provider.id && dataSet.pdpEndEpoch === 0
+    )
 
-    if (providerDataSets.length > 0) {
-      // Sort by preference: data sets with pieces first, then by ID
-      const sorted = providerDataSets.sort((a, b) => {
-        if (a.currentPieceCount > 0 && b.currentPieceCount === 0) return -1
-        if (b.currentPieceCount > 0 && a.currentPieceCount === 0) return 1
-        return a.pdpVerifierDataSetId - b.pdpVerifierDataSetId
-      })
+    type EvaluatedDataSet = {
+      dataSetId: number
+      dataSetMetadata: Record<string, string>
+      activePieceCount: number
+    }
 
-      // Fetch metadata for existing data set
-      const dataSetMetadata = await warmStorageService.getDataSetMetadata(sorted[0].pdpVerifierDataSetId)
+    // Sort ascending by ID (oldest first) for deterministic selection
+    const sortedDataSets = providerDataSets.sort((a, b) => {
+      return Number(a.dataSetId) - Number(b.dataSetId)
+    })
 
-      return {
-        provider,
-        dataSetId: sorted[0].pdpVerifierDataSetId,
-        isExisting: true,
-        dataSetMetadata,
-      }
-    }
+    // Batch strategy: 1/3 of the total data sets per batch, clamped between a min and max, to balance latency against RPC burst
+    const MIN_BATCH_SIZE = 50
+    const MAX_BATCH_SIZE = 200
+    const BATCH_SIZE = Math.min(MAX_BATCH_SIZE, Math.max(MIN_BATCH_SIZE, Math.ceil(sortedDataSets.length / 3), 1))
+    let selectedDataSet: EvaluatedDataSet | null = null
+
+    for (let i = 0; i < sortedDataSets.length; i += BATCH_SIZE) {
+      const batchResults: (EvaluatedDataSet | null)[] = await Promise.all(
+        sortedDataSets.slice(i, i + BATCH_SIZE).map(async (dataSet) => {
+          const dataSetId = Number(dataSet.dataSetId)
+          try {
+            const [dataSetMetadata, activePieceCount] = await Promise.all([
+              warmStorageService.getDataSetMetadata(dataSetId),
+              warmStorageService.getActivePieceCount(dataSetId),
+              warmStorageService.validateDataSet(dataSetId),
+            ])
+
+            if (!metadataMatches(dataSetMetadata, requestedMetadata)) {
+              return null
+            }
+
+            return {
+              dataSetId,
+              dataSetMetadata,
+              activePieceCount,
+            }
+          } catch (error) {
+            console.warn(
+              `Skipping data set ${dataSetId} for provider ${providerId}:`,
+              error instanceof Error ? error.message : String(error)
+            )
+            return null
+          }
+        })
+      )
+
+      for (const result of batchResults) {
+        if (result == null) continue
+
+        // Select the first data set with pieces and break out of the inner loop
+        if (result.activePieceCount > 0) {
+          selectedDataSet = result
+          break
+        }
+
+        // Otherwise remember the first (oldest) match found so far (it has no pieces)
+        if (selectedDataSet == null) {
+          selectedDataSet = result
+        }
+      }
+
+      // Early exit from the outer loop once a data set with pieces is found
+      if (selectedDataSet != null && selectedDataSet.activePieceCount > 0) {
+        break
+      }
+    }
+
+    if (selectedDataSet != null) {
+      return {
+        provider,
+        dataSetId: selectedDataSet.dataSetId,
+        isExisting: true,
+        dataSetMetadata: selectedDataSet.dataSetMetadata,
+      }
+    }
 
@@ -629,8 +689,8 @@ export class StorageContext {
     if (managedDataSets.length > 0 && !forceCreateDataSet) {
       // Prefer data sets with pieces, sort by ID (older first)
       const sorted = managedDataSets.sort((a, b) => {
-        if (a.currentPieceCount > 0 && b.currentPieceCount === 0) return -1
-        if (b.currentPieceCount > 0 && a.currentPieceCount === 0) return 1
+        if (a.activePieceCount > 0 && b.activePieceCount === 0) return -1
+        if (b.activePieceCount > 0 && a.activePieceCount === 0) return 1
         return a.pdpVerifierDataSetId - b.pdpVerifierDataSetId
       })
 
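To make the batch-size heuristic in resolveByProviderId concrete, here is the same formula extracted into a standalone helper (constants copied from the diff; the function name is mine):

const batchSizeFor = (totalDataSets: number): number =>
  Math.min(200, Math.max(50, Math.ceil(totalDataSets / 3), 1))

batchSizeFor(12) // 50  — small clients are padded up to the minimum (a single batch)
batchSizeFor(300) // 100 — one third of the total, i.e. three batches
batchSizeFor(900) // 200 — capped at the maximum to bound the per-batch RPC burst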
2 changes: 1 addition & 1 deletion packages/synapse-sdk/src/test/retriever-chain.test.ts
@@ -572,7 +572,7 @@ describe('ChainRetriever', () => {
           return [dataSetId !== 1n] // Data set 1 not live
         },
         getDataSetListener: () => [Mocks.ADDRESSES.calibration.warmStorage],
-        getNextPieceId: (args) => {
+        getActivePieceCount: (args) => {
           const [dataSetId] = args
           return [dataSetId === 2n ? 0n : 1n] // Data set 2 has no pieces
         },
63 changes: 28 additions & 35 deletions packages/synapse-sdk/src/test/storage.test.ts
@@ -399,12 +399,37 @@ describe('StorageService', () => {
   })
 
   it('should prefer data sets with existing pieces', async () => {
+    const expectedDataSetBase = {
+      cacheMissRailId: 0n,
+      cdnRailId: 0n,
+      clientDataSetId: 0n,
+      commissionBps: 100n,
+      dataSetId: 1n,
+      payee: Mocks.ADDRESSES.serviceProvider1,
+      payer: Mocks.ADDRESSES.client1,
+      pdpEndEpoch: 0n,
+      pdpRailId: 1n,
+      providerId: 1n,
+      serviceProvider: Mocks.ADDRESSES.serviceProvider1,
+    }
+    const expectedDataSets = [
+      {
+        ...expectedDataSetBase,
+        dataSetId: 1n,
+        pdpRailId: 1n,
+      },
+      {
+        ...expectedDataSetBase,
+        dataSetId: 2n,
+        pdpRailId: 2n,
+      },
+    ]
     server.use(
       Mocks.JSONRPC({
         ...Mocks.presets.basic,
         pdpVerifier: {
           ...Mocks.presets.basic.pdpVerifier,
-          getNextPieceId: (args) => {
+          getActivePieceCount: (args) => {
            const [dataSetId] = args
            if (dataSetId === 2n) {
              return [2n]
@@ -415,43 +440,11 @@
           },
           warmStorageView: {
             ...Mocks.presets.basic.warmStorageView,
-            clientDataSets: () => [[1n, 2n]],
+            getClientDataSets: () => [expectedDataSets],
             getAllDataSetMetadata: () => [[], []],
             getDataSet: (args) => {
               const [dataSetId] = args
-              if (dataSetId === 1n) {
-                return [
-                  {
-                    cacheMissRailId: 0n,
-                    cdnRailId: 0n,
-                    clientDataSetId: 0n,
-                    commissionBps: 100n,
-                    dataSetId: 1n,
-                    payee: Mocks.ADDRESSES.serviceProvider1,
-                    payer: Mocks.ADDRESSES.client1,
-                    pdpEndEpoch: 0n,
-                    pdpRailId: 1n,
-                    providerId: 1n,
-                    serviceProvider: Mocks.ADDRESSES.serviceProvider1,
-                  },
-                ]
-              } else {
-                return [
-                  {
-                    cacheMissRailId: 0n,
-                    cdnRailId: 0n,
-                    clientDataSetId: 0n,
-                    commissionBps: 100n,
-                    dataSetId: 2n,
-                    payee: Mocks.ADDRESSES.serviceProvider1,
-                    payer: Mocks.ADDRESSES.client1,
-                    pdpEndEpoch: 0n,
-                    pdpRailId: 2n,
-                    providerId: 1n,
-                    serviceProvider: Mocks.ADDRESSES.serviceProvider1,
-                  },
-                ]
-              }
+              return [expectedDataSets.find((ds) => ds.dataSetId === dataSetId) ?? ({} as (typeof expectedDataSets)[0])]
             },
           },
         }),
@@ -255,8 +255,7 @@ describe('WarmStorageService', () => {
       assert.lengthOf(detailedDataSets, 1)
       assert.equal(detailedDataSets[0].pdpRailId, 48)
       assert.equal(detailedDataSets[0].pdpVerifierDataSetId, 242)
-      assert.equal(detailedDataSets[0].nextPieceId, 2)
-      assert.equal(detailedDataSets[0].currentPieceCount, 2)
+      assert.equal(detailedDataSets[0].activePieceCount, 2)
       assert.isTrue(detailedDataSets[0].isLive)
       assert.isTrue(detailedDataSets[0].isManaged)
     })
6 changes: 2 additions & 4 deletions packages/synapse-sdk/src/types.ts
@@ -229,10 +229,8 @@ export interface DataSetInfo {
 export interface EnhancedDataSetInfo extends DataSetInfo {
   /** PDPVerifier global data set ID */
   pdpVerifierDataSetId: number
-  /** Next piece ID to use when adding pieces */
-  nextPieceId: number
-  /** Current number of pieces in the data set */
-  currentPieceCount: number
+  /** Number of active pieces in the data set (excludes removed pieces) */
+  activePieceCount: number
   /** Whether the data set is live on-chain */
   isLive: boolean
   /** Whether this data set is managed by the current Warm Storage contract */
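Since `EnhancedDataSetInfo` is part of the SDK's exported types, dropping `nextPieceId`/`currentPieceCount` is a breaking change; a hypothetical downstream call site (not from this PR) would migrate like this:

// Hypothetical consumer code; EnhancedDataSetInfo comes from
// packages/synapse-sdk/src/types.ts (import path depends on your setup).
function hasData(info: EnhancedDataSetInfo): boolean {
  // Before: info.currentPieceCount > 0 — but currentPieceCount mirrored
  // nextPieceId, so it kept counting pieces that had already been removed.
  // After: activePieceCount excludes removed pieces.
  return info.activePieceCount > 0
}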