Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions backend/bin/deal-observer-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,31 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuil
import { countStoredActiveDealsWithUnresolvedPayloadCid, resolvePayloadCids, countRevertedActiveDeals, countStoredActiveDealsWithPayloadState } from '../lib/resolve-payload-cids.js'
import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'
import { payloadCidRequest } from '../lib/piece-indexer-service.js'
// @ts-ignore
import { ethers } from 'ethers'
import {
getIndexProviderPeerId,
MINER_TO_PEERID_CONTRACT_ADDRESS, MINER_TO_PEERID_CONTRACT_ABI
// @ts-ignore
} from 'index-provider-peer-id'
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import {MakeRpcRequest, MakePayloadCidRequest} from '../lib/typings.d.ts' */

const {
INFLUXDB_TOKEN,
SPARK_API_BASE_URL,
SPARK_API_TOKEN,
SPARK_API_SUBMIT_DEALS_BATCH_SIZE = 100
SPARK_API_SUBMIT_DEALS_BATCH_SIZE = 100,
RPC_URL,
GLIF_TOKEN
} = process.env

if (!INFLUXDB_TOKEN) {
console.error('INFLUXDB_TOKEN not provided. Telemetry will not be recorded.')
}
assert(SPARK_API_BASE_URL, 'SPARK_API_BASE_URL required')
assert(SPARK_API_TOKEN, 'SPARK_API_TOKEN required')
assert(RPC_URL, 'RPC_URL required')

const LOOP_INTERVAL = 10 * 1000

Expand Down Expand Up @@ -127,12 +137,28 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken,
* @param {Queryable} pgPool
*/
export const resolvePayloadCidsLoop = async (makeRpcRequest, makePayloadCidRequest, pgPool) => {
// Initialize contract using your RPC configuration
const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', `Bearer ${GLIF_TOKEN}`)
const provider = new ethers.JsonRpcProvider(fetchRequest)
const smartContract = new ethers.Contract(
MINER_TO_PEERID_CONTRACT_ADDRESS,
MINER_TO_PEERID_CONTRACT_ABI,
provider
)
const getPeerId = async (/** @type {number} */ minerId) => {
return await getIndexProviderPeerId(
`f0${minerId}`,
smartContract,
{ rpcFn: makeRpcRequest }
)
}
while (true) {
const start = Date.now()
// Maximum number of deals to resolve payload CIDs for in one loop iteration
const maxDeals = 1000
try {
const numOfPayloadCidsResolved = await resolvePayloadCids(makeRpcRequest, makePayloadCidRequest, pgPool, maxDeals)
const numOfPayloadCidsResolved = await resolvePayloadCids(getPeerId, makePayloadCidRequest, pgPool, maxDeals)
const totalNumOfUnresolvedPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool)
const totalNumOfDealsWithPayloadCidResolved = await countStoredActiveDealsWithPayloadState(pgPool, 'PAYLOAD_CID_RESOLVED')
const totalNumOfDealsWithPayloadCidUnresolved = await countStoredActiveDealsWithPayloadState(pgPool, 'PAYLOAD_CID_UNRESOLVED')
Expand Down
3 changes: 2 additions & 1 deletion backend/lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ if (RPC_URL.includes('glif') && GLIF_TOKEN) {
export {
RPC_URL,
rpcHeaders,
PIECE_INDEXER_URL
PIECE_INDEXER_URL,
GLIF_TOKEN
}
8 changes: 4 additions & 4 deletions backend/lib/resolve-payload-cids.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { loadDeals } from './deal-observer.js'
import * as util from 'node:util'
import { getMinerPeerId } from './rpc-service/service.js'
import { PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js'

/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
Expand All @@ -11,16 +10,17 @@ const THREE_DAYS_IN_MILLISECONDS = 1000 * 60 * 60 * 24 * 3

/**
*
* @param {import('./typings.js').MakeRpcRequest} makeRpcRequest
* @param {import('./typings.js').GetIndexProviderPeerId} getIndexProviderPeerId
* @param {import('./typings.js').MakePayloadCidRequest} makePayloadCidRequest
* @param {Queryable} pgPool
* @param {number} maxDeals
* @param {number} now - The current timestamp in milliseconds
* @returns {Promise<number>}
*/
export const resolvePayloadCids = async (makeRpcRequest, makePayloadCidRequest, pgPool, maxDeals, now = Date.now()) => {
export const resolvePayloadCids = async (getIndexProviderPeerId, makePayloadCidRequest, pgPool, maxDeals, now = Date.now()) => {
let payloadCidsResolved = 0
for (const deal of await fetchDealsWithUnresolvedPayloadCid(pgPool, maxDeals, new Date(now - THREE_DAYS_IN_MILLISECONDS))) {
const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest)
const minerPeerId = (await getIndexProviderPeerId(deal.miner_id)).peerId
const payloadCid = await makePayloadCidRequest(minerPeerId, deal.piece_cid)
if (payloadCid) deal.payload_cid = payloadCid
if (!deal.payload_cid) {
Expand Down
22 changes: 0 additions & 22 deletions backend/lib/rpc-service/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,6 @@ export async function getChainHead (makeRpcRequest) {
}
}

/**
* @param {number} minerId
* @param {MakeRpcRequest} makeRpcRequest
* @returns {Promise<string>}
*/
export async function getMinerPeerId (minerId, makeRpcRequest) {
/** @typedef {{
* PeerId: string;
* }} MinerInfo
*/
try {
const params = getMinerInfoCallParams(minerId)
const res = /** @type {MinerInfo} */(await makeRpcRequest('Filecoin.StateMinerInfo', params))
if (!res || !res.PeerId) {
throw Error(`Failed to get peer ID for miner ${minerId}, result: ${res}`)
}
return res.PeerId
} catch (err) {
throw Error(`Failed to get peer ID for miner ${minerId}.`, { cause: err })
}
}

/**
* @param {number} blockHeight
* @param {string} eventTypeString
Expand Down
5 changes: 5 additions & 0 deletions backend/lib/typings.d.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
import { ethers } from 'ethers';

export type MakeRpcRequest = (method: string, params: unknown[]) => Promise<unknown>;
export type MakePayloadCidRequest = (providerId:string,pieceCid:string) => Promise<string|null>;
export type GetIndexProviderPeerId = (
minerId:number,
) => Promise<{ peerId: string, source: string }>;
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@sentry/node": "^9.10.1",
"@sinclair/typebox": "^0.34.31",
"debug": "^4.4.0",
"index-provider-peer-id": "^1.0.0",
"multiformats": "^13.3.2",
"p-retry": "^6.2.1",
"pg": "^8.14.1",
Expand Down
41 changes: 23 additions & 18 deletions backend/test/resolve-payload-cids.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ describe('deal-observer-backend resolve payload CIDs', () => {
assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number')
return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight)
}
case 'Filecoin.StateMinerInfo':
assert(typeof params[0] === 'string', 'params[0] must be a string')
return minerPeerIds.get(params[0])
default:
console.error('Unknown method')
}
}
const getPeerId = async (/** @type {number} */ minerId) => {
const peerId = minerPeerIds.get(`f0${minerId}`)?.PeerId
if (!peerId) {
throw new Error(`Peer ID not found for miner ID: ${minerId}`)
}
return Promise.resolve({ peerId, source: 'TEST' })
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either remove the async keyword or make it return { peerId, source.. }, having both is redundant (you're creating a promise that's resolving with a promise that's resolving with the value).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 330d290

/**
* @type {import('@filecoin-station/deal-observer-db').PgPool}}
* */
Expand All @@ -60,7 +64,7 @@ describe('deal-observer-backend resolve payload CIDs', () => {
)
})

it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => {
it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async () => {
const resolvePayloadCidCalls = []
/**
* @type {import('../lib/typings.d.ts').MakePayloadCidRequest}
Expand All @@ -75,7 +79,7 @@ describe('deal-observer-backend resolve payload CIDs', () => {
(await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length,
336
)
await resolvePayloadCids(makeRpcRequest, makePayloadCidRequest, pgPool, 10000)
await resolvePayloadCids(getPeerId, makePayloadCidRequest, pgPool, 10000)
assert.strictEqual(resolvePayloadCidCalls.length, 336)
assert.strictEqual(
(await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length,
Expand All @@ -96,7 +100,7 @@ describe('deal-observer-backend resolve payload CIDs', () => {
return payloadCid ? payloadCid.payloadCid : null
}

await resolvePayloadCids(makeRpcRequest, resolvePayloadCid, pgPool, 10000)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000)
unresolvedPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool)
assert.strictEqual(unresolvedPayloadCids, 85n)
})
Expand All @@ -110,9 +114,6 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
const payloadCid = 'PAYLOAD_CID'
const minerPeerId = 'MINER_PEER_ID'
const now = Date.now()
const fetchMinerId = async () => {
return { PeerId: minerPeerId }
}
/**
* @param {Static<typeof ActiveDeal >[]} activeDeals
**/
Expand Down Expand Up @@ -179,6 +180,10 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
reverted: false
})

const getPeerId = async (/** @type {number} */ minerId) => {
return Promise.resolve({ peerId: minerPeerId, source: 'TEST' })
}

before(async () => {
pgPool = await createPgPool()
await migrateWithPgClient(pgPool)
Expand All @@ -192,7 +197,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
await pgPool.query('DELETE FROM active_deals')
await pgPool.query('ALTER SEQUENCE active_deals_id_seq RESTART WITH 1')
})
it('piece indexer does not retry to fetch unresolved payloads if the last retrieval was too recent', async (t) => {
it('piece indexer does not retry to fetch unresolved payloads if the last retrieval was too recent', async () => {
const returnPayload = false
let payloadsCalled = 0
const resolvePayloadCid = async () => {
Expand All @@ -204,18 +209,18 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
const expectedDealDbEntry = { id: 1, ...DEFAULT_ACTIVE_DEAL }
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry])
// The payload is unretrievable and the last retrieval timestamp should be updated
await resolvePayloadCids(fetchMinerId, resolvePayloadCid, pgPool, 10000, now)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000, now)
// The timestamp on when the last retrieval of the payload was, was not yet set, so the piece indexer will try to fetch the payload
assert.strictEqual(payloadsCalled, 1)
expectedDealDbEntry.last_payload_retrieval_attempt = new Date(now)
expectedDealDbEntry.payload_retrievability_state = PayloadRetrievabilityState.Unresolved
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry])
// If we retry now without changing the field last_payload_retrieval_attempt the function for calling payload should not be called
await resolvePayloadCids(fetchMinerId, resolvePayloadCid, pgPool, 10000, now)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
})

it('piece indexer sets the payload to be unresolvable if the second attempt fails', async (t) => {
it('piece indexer sets the payload to be unresolvable if the second attempt fails', async () => {
const returnPayload = false
let payloadsCalled = 0
const resolvePayloadCid = async () => {
Expand All @@ -229,7 +234,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
})

await withUniqueActiveDeals([deal])
await resolvePayloadCids(fetchMinerId, resolvePayloadCid, pgPool, 10000, now)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
// This is the second attempt that failed to fetch the payload CID so the deal should be marked as unretrievable
const expectedDealDbEntry = {
Expand All @@ -240,11 +245,11 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
}
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry])
// Now the piece indexer should no longer call the payload request for this deal
await resolvePayloadCids(fetchMinerId, resolvePayloadCid, pgPool, 10000, now)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
})

it('piece indexer correctly udpates the payloads if the retry succeeeds', async (t) => {
it('piece indexer correctly udpates the payloads if the retry succeeds', async () => {
const returnPayload = true
let payloadsCalled = 0
const resolvePayloadCid = async () => {
Expand All @@ -258,7 +263,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
})

await withUniqueActiveDeals([deal])
await resolvePayloadCids(fetchMinerId, resolvePayloadCid, pgPool, 10000, now)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
const expectedDealDbEntry = {
id: 1,
Expand All @@ -271,7 +276,7 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
assert.deepStrictEqual((await loadDeals(pgPool, 'SELECT * FROM active_deals')), [expectedDealDbEntry])

// Now the piece indexer should no longer call the payload request for this deal
await resolvePayloadCids(fetchMinerId, resolvePayloadCid, pgPool, 10000, now)
await resolvePayloadCids(getPeerId, resolvePayloadCid, pgPool, 10000, now)
assert.strictEqual(payloadsCalled, 1)
})

Expand Down
Loading