diff --git a/docs/action-queue.md b/docs/action-queue.md index 5a882c4d7..e2591e599 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -191,6 +191,7 @@ enum ActionStatus { queued approved pending + deploying success failed canceled diff --git a/docs/networks/arbitrum-one.md b/docs/networks/arbitrum-one.md index 69ffff803..5bf940edf 100644 --- a/docs/networks/arbitrum-one.md +++ b/docs/networks/arbitrum-one.md @@ -49,8 +49,8 @@ Other network contracts can be found in [graphprotocol/contracts](https://github | `INDEXER_AGENT_NETWORK_SUBGRAPH_ENDPOINT` | `--network-subgraph-endpoint` | `https://gateway-arbitrum.network.thegraph.com/api/[api-key]/subgraphs/id/DZz4kDTdmzWLWsV373w2bSmoar3umKKH9y82SUKr5qmp` | | `INDEXER_AGENT_EPOCH_SUBGRAPH_DEPLOYMENT` | `--epoch-subgraph-deployment` | [![Dynamic JSON Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fthegraph.com%2Fexplorer%2F_next%2Fdata%2F5PBypsdmUEy39BcWLsyp0%2Fsubgraphs%2F4KFYqUWRTZQ9gn7GPHC6YQ2q15chJfVrX43ezYcwkgxB.json%3Fview%3DAbout%26chain%3Darbitrum-one%26id%3D4KFYqUWRTZQ9gn7GPHC6YQ2q15chJfVrX43ezYcwkgxB&query=%24.pageProps.subgraph.currentVersion.subgraphDeployment.ipfsHash&label=deployment-id)](https://thegraph.com/explorer/subgraphs/4KFYqUWRTZQ9gn7GPHC6YQ2q15chJfVrX43ezYcwkgxB) | | `INDEXER_AGENT_EPOCH_SUBGRAPH_ENDPOINT` | `--epoch-subgraph-endpoint` | `https://gateway-arbitrum.network.thegraph.com/api/[api-key]/subgraphs/id/4KFYqUWRTZQ9gn7GPHC6YQ2q15chJfVrX43ezYcwkgxB` | -| `INDEXER_AGENT_TAP_SUBGRAPH_DEPLOYMENT` | `--tap-subgraph-deployment` | [![Dynamic JSON Badge](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fthegraph.com%2Fexplorer%2F_next%2Fdata%2F5PBypsdmUEy39BcWLsyp0%2Fsubgraphs%2F4sukbNVTzGELnhdnpyPqsf1QqtzNHEYKKmJkgaT8z6M1.json%3Fview%3DAbout%26chain%3Darbitrum-one%26id%3D4sukbNVTzGELnhdnpyPqsf1QqtzNHEYKKmJkgaT8z6M1&query=%24.pageProps.subgraph.currentVersion.subgraphDeployment.ipfsHash&label=deployment-id)](https://thegraph.com/explorer/subgraphs/4sukbNVTzGELnhdnpyPqsf1QqtzNHEYKKmJkgaT8z6M1) | -| `INDEXER_AGENT_TAP_SUBGRAPH_ENDPOINT` | `--tap-subgraph-endpoint` | `https://gateway-arbitrum.network.thegraph.com/api/[api-key]/subgraphs/id/4sukbNVTzGELnhdnpyPqsf1QqtzNHEYKKmJkgaT8z6M1` | +| `INDEXER_AGENT_TAP_SUBGRAPH_ENDPOINT` | `--tap-subgraph-endpoint` | `https://gateway-arbitrum.network.thegraph.com/api/[api-key]/subgraphs/id/4sukbNVTzGELnhdnpyPqsf1QqtzNHEYKKmJkgaT8z6M1` | +| `INDEXER_AGENT_IPFS_ENDPOINT` | `--ipfs-endpoint` | `https://ipfs.network.thegraph.com` | In order to avoid collecting or claiming query fees below a certain threshold diff --git a/docs/networks/arbitrum-sepolia.md b/docs/networks/arbitrum-sepolia.md index b1fe7ab40..c42bb4f6f 100644 --- a/docs/networks/arbitrum-sepolia.md +++ b/docs/networks/arbitrum-sepolia.md @@ -56,6 +56,7 @@ testnet (for now) are Mainnet subgraphs. This means: | `INDEXER_AGENT_EPOCH_SUBGRAPH_ENDPOINT` | `--epoch-subgraph-endpoint` | `https://gateway-arbitrum.network.thegraph.com/api/[api-key]/subgraphs/id/BhnsdeZihU4SuokxZMLF4FQBVJ3jgtZf6v51gHvz3bSS` | | `INDEXER_AGENT_TAP_SUBGRAPH_DEPLOYMENT` | `--tap-subgraph-deployment` | `QmUiLdbsk6c51UMdcNBxsP3KadJpkmp6a3k2NCprR4ZFeM` | | `INDEXER_AGENT_TAP_SUBGRAPH_ENDPOINT` | `--tap-subgraph-endpoint` | `https://gateway-arbitrum.network.thegraph.com/api/[api-key]/subgraphs/id/7ubx365MiqBH5iUz6XWXWT8PTof5BVAyEzdb8m17RvbD` | +| `INDEXER_AGENT_IPFS_ENDPOINT` | `--ipfs-endpoint` | `https://ipfs.network.thegraph.com` | In order to avoid collecting or claiming query fees below a certain threshold (e.g. below the cost of the two transactions), the following configuration diff --git a/packages/indexer-agent/README.md b/packages/indexer-agent/README.md index a0173135d..af735993a 100644 --- a/packages/indexer-agent/README.md +++ b/packages/indexer-agent/README.md @@ -49,9 +49,6 @@ Indexer Infrastructure --graph-node-admin-endpoint Graph Node endpoint for applying and updating subgraph deployments [string] [required] - --enable-auto-migration-support Auto migrate allocations from L1 to L2 - (multi-network mode must be enabled) - [boolean] [default: false] --public-indexer-url Indexer endpoint for receiving requests from the network [string] [required] --indexer-geo-coordinates Coordinates describing the Indexer's @@ -161,9 +158,6 @@ Indexer Infrastructure etc. [string] [required] --graph-node-admin-endpoint Graph Node endpoint for applying and updating subgraph deployments [string] [required] - --enable-auto-migration-support Auto migrate allocations from L1 to L2 - (multi-network mode must be enabled) - [boolean] [default: false] Postgres --postgres-host Postgres host [string] [required] diff --git a/packages/indexer-agent/src/__tests__/indexer.ts b/packages/indexer-agent/src/__tests__/indexer.ts index f8c1567de..29ae5ab28 100644 --- a/packages/indexer-agent/src/__tests__/indexer.ts +++ b/packages/indexer-agent/src/__tests__/indexer.ts @@ -18,7 +18,6 @@ import { specification, QueryFeeModels, defineQueryFeeModels, - MultiNetworks, loadTestYamlConfig, } from '@graphprotocol/indexer-common' import { BigNumber } from 'ethers' @@ -139,6 +138,7 @@ const setup = async () => { 'http://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', 'https://test-status-endpoint.xyz', + 'https://test-ipfs-endpoint.xyz', ) const yamlObj = loadTestYamlConfig() @@ -152,11 +152,6 @@ const setup = async () => { metrics, ) - const multiNetworks = new MultiNetworks( - [network], - (n: Network) => n.specification.networkIdentifier, - ) - indexerManagementClient = await createIndexerManagementClient({ models, graphNode, @@ -167,7 +162,7 @@ const setup = async () => { parallelAllocations: 1, }, }, - multiNetworks, + network, }) operator = new Operator(logger, indexerManagementClient, networkSpecification) diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 82e14c57e..f3caa6908 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -8,7 +8,6 @@ import { timer, } from '@graphprotocol/common-ts' import { - ActivationCriteria, ActionStatus, Allocation, AllocationManagementMode, @@ -30,11 +29,6 @@ import { GraphNode, Operator, validateProviderNetworkIdentifier, - MultiNetworks, - NetworkMapped, - TransferredSubgraphDeployment, - networkIsL2, - networkIsL1, DeploymentManagementMode, SubgraphStatus, sequentialTimerMap, @@ -43,12 +37,8 @@ import { import PQueue from 'p-queue' import pMap from 'p-map' import pFilter from 'p-filter' -import mapValues from 'lodash.mapvalues' -import zip from 'lodash.zip' import { AgentConfigs, NetworkAndOperator } from './types' -type ActionReconciliationContext = [AllocationDecision[], number, number] - const deploymentInList = ( list: SubgraphDeploymentID[], deployment: SubgraphDeploymentID, @@ -128,65 +118,11 @@ export const convertSubgraphBasedRulesToDeploymentBased = ( return rules } -// Extracts the network identifier from a pair of matching Network and Operator objects. -function networkAndOperatorIdentity({ - network, - operator, -}: NetworkAndOperator): string { - const networkId = network.specification.networkIdentifier - const operatorId = operator.specification.networkIdentifier - if (networkId !== operatorId) { - throw new Error( - `Network and Operator pairs have different network identifiers: ${networkId} != ${operatorId}`, - ) - } - return networkId -} - -// Helper function to produce a `MultiNetworks` while validating its -// inputs. -function createMultiNetworks( - networks: Network[], - operators: Operator[], -): MultiNetworks { - // Validate that Networks and Operator arrays have even lengths and - // contain unique, matching network identifiers. - const visited = new Set() - const validInputs = - networks.length === operators.length && - networks.every((network, index) => { - const sameIdentifier = - network.specification.networkIdentifier === - operators[index].specification.networkIdentifier - if (!sameIdentifier) { - return false - } - if (visited.has(network.specification.networkIdentifier)) { - return false - } - visited.add(network.specification.networkIdentifier) - return true - }) - - if (!validInputs) { - throw new Error( - 'Invalid Networks and Operator pairs used in Agent initialization', - ) - } - // Note on undefineds: `lodash.zip` can return `undefined` if array lengths are - // uneven, but we have just checked that. - const networksAndOperators = zip(networks, operators).map(pair => { - const [network, operator] = pair - return { network: network!, operator: operator! } - }) - return new MultiNetworks(networksAndOperators, networkAndOperatorIdentity) -} - export class Agent { logger: Logger metrics: Metrics graphNode: GraphNode - multiNetworks: MultiNetworks + networkAndOperator: NetworkAndOperator indexerManagement: IndexerManagementClient offchainSubgraphs: SubgraphDeploymentID[] autoMigrationSupport: boolean @@ -198,10 +134,10 @@ export class Agent { this.metrics = configs.metrics this.graphNode = configs.graphNode this.indexerManagement = configs.indexerManagement - this.multiNetworks = createMultiNetworks( - configs.networks, - configs.operators, - ) + this.networkAndOperator = { + network: configs.network, + operator: configs.operator, + } this.offchainSubgraphs = configs.offchainSubgraphs this.autoMigrationSupport = !!configs.autoMigrationSupport this.deploymentManagement = configs.deploymentManagement @@ -228,93 +164,85 @@ export class Agent { // * Ensure NetworkSubgraph is indexing // * Register the Indexer in the Network // -------------------------------------------------------------------------------- - await this.multiNetworks.map( - async ({ network, operator }: NetworkAndOperator) => { - try { - await operator.ensureGlobalIndexingRule() - await this.ensureAllSubgraphsIndexing(network) - await network.register() - } catch (err) { - this.logger.critical( - `Failed to prepare indexer for ${network.specification.networkIdentifier}`, - { - error: err.message, - }, - ) - process.exit(1) - } - }, - ) + const { network, operator }: NetworkAndOperator = this.networkAndOperator + try { + await operator.ensureGlobalIndexingRule() + await this.ensureAllSubgraphsIndexing(network) + await network.register() + } catch (err) { + this.logger.critical( + `Failed to prepare indexer for ${network.specification.networkIdentifier}`, + { + error: err.message, + }, + ) + process.exit(1) + } this.reconciliationLoop() return this } reconciliationLoop() { + const { network, operator } = this.networkAndOperator const requestIntervalSmall = this.pollingInterval const requestIntervalLarge = this.pollingInterval * 5 const logger = this.logger.child({ component: 'ReconciliationLoop' }) - const currentEpochNumber: Eventual> = - sequentialTimerMap( - { logger, milliseconds: requestIntervalLarge }, - async () => - await this.multiNetworks.map(({ network }) => { - logger.trace('Fetching current epoch number', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.networkMonitor.currentEpochNumber() - }), - { - onError: error => - logger.warn(`Failed to fetch current epoch`, { error }), - }, - ) + const currentEpochNumber: Eventual = sequentialTimerMap( + { logger, milliseconds: requestIntervalLarge }, + async () => { + logger.trace('Fetching current epoch number', { + protocolNetwork: network.specification.networkIdentifier, + }) + return await network.networkMonitor.currentEpochNumber() + }, + { + onError: error => + logger.warn(`Failed to fetch current epoch`, { error }), + }, + ) - const maxAllocationEpochs: Eventual> = - sequentialTimerMap( - { logger, milliseconds: requestIntervalLarge }, - () => - this.multiNetworks.map(({ network }) => { - logger.trace('Fetching max allocation epochs', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.contracts.staking.maxAllocationEpochs() - }), - { - onError: error => - logger.warn(`Failed to fetch max allocation epochs`, { error }), - }, - ) + const maxAllocationEpochs: Eventual = sequentialTimerMap( + { logger, milliseconds: requestIntervalLarge }, + async () => { + logger.trace('Fetching max allocation epochs', { + protocolNetwork: network.specification.networkIdentifier, + }) + return network.contracts.staking.maxAllocationEpochs() + }, + { + onError: error => + logger.warn(`Failed to fetch max allocation epochs`, { error }), + }, + ) - const indexingRules: Eventual> = + const indexingRules: Eventual = sequentialTimerMap( { logger, milliseconds: requestIntervalSmall }, async () => { - return this.multiNetworks.map(async ({ network, operator }) => { - logger.trace('Fetching indexing rules', { - protocolNetwork: network.specification.networkIdentifier, - }) - let rules = await operator.indexingRules(true) - const subgraphRuleIds = rules - .filter( - rule => rule.identifierType == SubgraphIdentifierType.SUBGRAPH, - ) - .map(rule => rule.identifier!) - const subgraphsMatchingRules = - await network.networkMonitor.subgraphs(subgraphRuleIds) - if (subgraphsMatchingRules.length >= 1) { - const epochLength = - await network.contracts.epochManager.epochLength() - const blockPeriod = 15 - const bufferPeriod = epochLength.toNumber() * blockPeriod * 100 // 100 epochs - rules = convertSubgraphBasedRulesToDeploymentBased( - rules, - subgraphsMatchingRules, - bufferPeriod, - ) - } - return rules + logger.trace('Fetching indexing rules', { + protocolNetwork: network.specification.networkIdentifier, }) + let rules = await operator.indexingRules(true) + const subgraphRuleIds = rules + .filter( + rule => rule.identifierType == SubgraphIdentifierType.SUBGRAPH, + ) + .map(rule => rule.identifier!) + const subgraphsMatchingRules = + await network.networkMonitor.subgraphs(subgraphRuleIds) + if (subgraphsMatchingRules.length >= 1) { + const epochLength = + await network.contracts.epochManager.epochLength() + const blockPeriod = 15 + const bufferPeriod = epochLength.toNumber() * blockPeriod * 100 // 100 epochs + rules = convertSubgraphBasedRulesToDeploymentBased( + rules, + subgraphsMatchingRules, + bufferPeriod, + ) + } + return rules }, { onError: error => @@ -329,8 +257,11 @@ export class Agent { sequentialTimerMap( { logger, milliseconds: requestIntervalLarge }, async () => { - if (this.deploymentManagement === DeploymentManagementMode.AUTO) { - logger.debug('Fetching active deployments') + if ( + this.deploymentManagement === DeploymentManagementMode.AUTO || + network.networkMonitor.poiDisputeMonitoringEnabled() + ) { + logger.trace('Fetching active deployments') const assignments = await this.graphNode.subgraphDeploymentsAssignments( SubgraphStatus.ACTIVE, @@ -351,16 +282,15 @@ export class Agent { }, ) - const networkDeployments: Eventual> = + const networkDeployments: Eventual = sequentialTimerMap( { logger, milliseconds: requestIntervalSmall }, - async () => - await this.multiNetworks.map(({ network }) => { - logger.trace('Fetching network deployments', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.networkMonitor.subgraphDeployments() - }), + async () => { + logger.trace('Fetching network deployments', { + protocolNetwork: network.specification.networkIdentifier, + }) + return network.networkMonitor.subgraphDeployments() + }, { onError: error => logger.warn( @@ -370,173 +300,26 @@ export class Agent { }, ) - const eligibleTransferDeployments: Eventual< - NetworkMapped - > = sequentialTimerMap( - { logger, milliseconds: requestIntervalLarge }, - async () => { - // Return early if the auto migration feature is disabled. - if (!this.autoMigrationSupport) { - logger.trace( - 'Auto Migration feature is disabled, skipping querying transferred subgraphs', - ) - return this.multiNetworks.map(async () => []) - } - - const statuses = await this.graphNode.indexingStatus([]) - return this.multiNetworks.map(async ({ network }) => { - const protocolNetwork = network.specification.networkIdentifier - logger.trace('Fetching deployments eligible for L2 transfer', { - protocolNetwork, - }) - const transfers = - await network.networkMonitor.transferredDeployments() - logger.trace( - `Found ${transfers.length} transferred subgraphs in the network`, - { protocolNetwork }, - ) - return transfers - .map(transfer => { - const status = statuses.find( - status => - status.subgraphDeployment.ipfsHash == transfer.ipfsHash, - ) - if (status) { - transfer.ready = status.synced && status.health == 'healthy' - } - return transfer - }) - .filter(transfer => transfer.ready == true) - }) - }, - { - onError: error => - logger.warn( - `Failed to obtain transferred deployments, trying again later`, - { error }, - ), - }, - ) - - // While in the L1 -> L2 transfer period this will be an intermediate value - // with the final value including transfer considerations - const intermediateNetworkDeploymentAllocationDecisions: Eventual< - NetworkMapped - > = join({ - networkDeployments, - indexingRules, - }).tryMap( - ({ indexingRules, networkDeployments }) => { - return mapValues( - this.multiNetworks.zip(indexingRules, networkDeployments), - ([indexingRules, networkDeployments]: [ - IndexingRuleAttributes[], - SubgraphDeployment[], - ]) => { - // Identify subgraph deployments on the network that are worth picking up; - // these may overlap with the ones we're already indexing - logger.trace('Evaluating which deployments are worth allocating to') - return indexingRules.length === 0 - ? [] - : evaluateDeployments(logger, networkDeployments, indexingRules) - }, - ) - }, - { - onError: error => - logger.warn(`Failed to evaluate deployments, trying again later`, { - error, - }), - }, - ) - - // Update targetDeployments and networkDeplomentAllocationDecisions using transferredSubgraphDeployments data - // This will be somewhat custom and will likely be yanked out later after the transfer stage is complete - // Cases: - // - L1 subgraph that had the transfer started: keep synced and allocated to for at least one week - // post transfer. - // - L2 subgraph that has been transferred: - // - if already synced, allocate to it immediately using default allocation amount - // - if not synced, no changes - const networkDeploymentAllocationDecisions: Eventual< - NetworkMapped - > = join({ - intermediateNetworkDeploymentAllocationDecisions, - eligibleTransferDeployments, - }).tryMap( - ({ - intermediateNetworkDeploymentAllocationDecisions, - eligibleTransferDeployments, - }) => - mapValues( - this.multiNetworks.zip( - intermediateNetworkDeploymentAllocationDecisions, - eligibleTransferDeployments, - ), - ([allocationDecisions, eligibleTransferDeployments]: [ - AllocationDecision[], - TransferredSubgraphDeployment[], - ]) => { - logger.debug( - `Found ${eligibleTransferDeployments.length} deployments eligible for transfer`, - { eligibleTransferDeployments }, - ) - const oneWeekAgo = Math.floor(Date.now() / 1_000) - 86_400 * 7 - return allocationDecisions.map(decision => { - const matchingTransfer = eligibleTransferDeployments.find( - deployment => - deployment.ipfsHash == decision.deployment.ipfsHash && - deployment.startedTransferToL2At.toNumber() > oneWeekAgo, - ) - if (matchingTransfer) { - logger.debug('Found a matching subgraph transfer', { - matchingTransfer, - }) - // L1 deployments being transferred need to be supported for one week post transfer - // to ensure continued support. - if (networkIsL1(matchingTransfer.protocolNetwork)) { - decision.toAllocate = true - decision.ruleMatch.activationCriteria = - ActivationCriteria.L2_TRANSFER_SUPPORT - logger.debug( - `Allocating towards L1 subgraph deployment to support its transfer`, - { - subgraphDeployment: matchingTransfer, - allocationDecision: decision, - }, - ) - } - // L2 Deployments - if ( - networkIsL2(matchingTransfer.protocolNetwork) && - !!matchingTransfer.transferredToL2 - ) { - decision.toAllocate = true - decision.ruleMatch.activationCriteria = - ActivationCriteria.L2_TRANSFER_SUPPORT - logger.debug( - `Allocating towards transferred L2 subgraph deployment`, - { - subgraphDeployment: matchingTransfer, - allocationDecision: decision, - }, - ) - } - } - return decision - }) - }, - ), - { - onError: error => - logger.warn( - `Failed to merge L2 transfer decisions, trying again later`, - { + const networkDeploymentAllocationDecisions: Eventual = + join({ + networkDeployments, + indexingRules, + }).tryMap( + ({ indexingRules, networkDeployments }) => { + // Identify subgraph deployments on the network that are worth picking up; + // these may overlap with the ones we're already indexing + logger.trace('Evaluating which deployments are worth allocating to') + return indexingRules.length === 0 + ? [] + : evaluateDeployments(logger, networkDeployments, indexingRules) + }, + { + onError: error => + logger.warn(`Failed to evaluate deployments, trying again later`, { error, - }, - ), - }, - ) + }), + }, + ) // let targetDeployments be an union of targetAllocations // and offchain subgraphs. @@ -546,8 +329,12 @@ export class Agent { }).tryMap( async ({ indexingRules, networkDeploymentAllocationDecisions }) => { logger.trace('Resolving target deployments') - const targetDeploymentIDs: Set = - consolidateAllocationDecisions(networkDeploymentAllocationDecisions) + const targetDeploymentIDs: Set = new Set( + Object.values(networkDeploymentAllocationDecisions) + .flat() + .filter(decision => decision.toAllocate === true) + .map(decision => decision.deployment), + ) // Add offchain subgraphs to the deployment list from rules Object.values(indexingRules) @@ -572,23 +359,21 @@ export class Agent { }, ) - const activeAllocations: Eventual> = - sequentialTimerMap( - { logger, milliseconds: requestIntervalSmall }, - () => - this.multiNetworks.map(({ network }) => { - logger.trace('Fetching active allocations', { - protocolNetwork: network.specification.networkIdentifier, - }) - return network.networkMonitor.allocations(AllocationStatus.ACTIVE) - }), - { - onError: () => - logger.warn( - `Failed to obtain active allocations, trying again later`, - ), - }, - ) + const activeAllocations: Eventual = sequentialTimerMap( + { logger, milliseconds: requestIntervalSmall }, + async () => { + logger.trace('Fetching active allocations', { + protocolNetwork: network.specification.networkIdentifier, + }) + return network.networkMonitor.allocations(AllocationStatus.ACTIVE) + }, + { + onError: () => + logger.warn( + `Failed to obtain active allocations, trying again later`, + ), + }, + ) // `activeAllocations` is used to trigger this Eventual, but not really needed // inside. @@ -598,20 +383,12 @@ export class Agent { }).tryMap( // eslint-disable-next-line @typescript-eslint/no-unused-vars async ({ activeAllocations: _, currentEpochNumber }) => { - const allocationsByNetwork = await this.multiNetworks.mapNetworkMapped( - currentEpochNumber, - async ({ network }, epochNumber): Promise => { - logger.trace('Fetching recently closed allocations', { - protocolNetwork: network.specification.networkIdentifier, - currentEpochNumber, - }) - return network.networkMonitor.recentlyClosedAllocations( - epochNumber, - 1, - ) - }, - ) - return Object.values(allocationsByNetwork).flat() + const recentlyClosedAllocations = + await network.networkMonitor.recentlyClosedAllocations( + currentEpochNumber, + 1, + ) + return Object.values(recentlyClosedAllocations).flat() }, { onError: () => @@ -621,26 +398,21 @@ export class Agent { }, ) - const disputableAllocations: Eventual> = join({ + const disputableAllocations: Eventual = join({ currentEpochNumber, activeDeployments, }).tryMap( - async ({ currentEpochNumber, activeDeployments }) => - this.multiNetworks.mapNetworkMapped( + async ({ currentEpochNumber, activeDeployments }) => { + logger.trace('Fetching disputable allocations', { + protocolNetwork: network.specification.networkIdentifier, currentEpochNumber, - ({ network }: NetworkAndOperator, currentEpochNumber: number) => { - logger.trace('Fetching disputable allocations', { - protocolNetwork: network.specification.networkIdentifier, - currentEpochNumber, - }) - return network.networkMonitor.disputableAllocations( - currentEpochNumber, - activeDeployments, - 0, - ) - }, - ), - + }) + return network.networkMonitor.disputableAllocations( + currentEpochNumber, + activeDeployments, + 0, + ) + }, { onError: () => logger.warn( @@ -675,30 +447,16 @@ export class Agent { }) try { - const disputableEpochs = await this.multiNetworks.mapNetworkMapped( - currentEpochNumber, - async ( - { network }: NetworkAndOperator, - currentEpochNumber: number, - ) => - currentEpochNumber - - network.specification.indexerOptions.poiDisputableEpochs, - ) + const disputableEpochs = + currentEpochNumber - + network.specification.indexerOptions.poiDisputableEpochs // Find disputable allocations - await this.multiNetworks.mapNetworkMapped( - this.multiNetworks.zip(disputableEpochs, disputableAllocations), - async ( - { network, operator }: NetworkAndOperator, - [disputableEpoch, disputableAllocations]: [number, Allocation[]], - ): Promise => { - await this.identifyPotentialDisputes( - disputableAllocations, - disputableEpoch, - operator, - network, - ) - }, + await this.identifyPotentialDisputes( + disputableAllocations, + disputableEpochs, + operator, + network, ) } catch (err) { logger.warn(`Failed POI dispute monitoring`, { err }) @@ -916,15 +674,14 @@ export class Agent { // Ensure the network subgraph deployment is _always_ indexed // ---------------------------------------------------------------------------------------- let indexingNetworkSubgraph = false - await this.multiNetworks.map(async ({ network }) => { - if (network.networkSubgraph.deployment) { - const networkDeploymentID = network.networkSubgraph.deployment.id - if (!deploymentInList(targetDeployments, networkDeploymentID)) { - targetDeployments.push(networkDeploymentID) - indexingNetworkSubgraph = true - } + const { network } = this.networkAndOperator + if (network.networkSubgraph.deployment) { + const networkDeploymentID = network.networkSubgraph.deployment.id + if (!deploymentInList(targetDeployments, networkDeploymentID)) { + targetDeployments.push(networkDeploymentID) + indexingNetworkSubgraph = true } - }) + } // ---------------------------------------------------------------------------------------- // Inspect Deployments and Networks @@ -1143,116 +900,95 @@ export class Agent { } async reconcileActions( - networkDeploymentAllocationDecisions: NetworkMapped, - epoch: NetworkMapped, - maxAllocationEpochs: NetworkMapped, + allocationDecisions: AllocationDecision[], + epoch: number, + maxAllocationEpochs: number, ): Promise { // -------------------------------------------------------------------------------- // Filter out networks set to `manual` allocation management mode, and ensure the // Network Subgraph is NEVER allocated towards // -------------------------------------------------------------------------------- - const validatedAllocationDecisions = - await this.multiNetworks.mapNetworkMapped( - networkDeploymentAllocationDecisions, - async ( - { network }: NetworkAndOperator, - allocationDecisions: AllocationDecision[], - ) => { - if ( - network.specification.indexerOptions.allocationManagementMode === - AllocationManagementMode.MANUAL - ) { - this.logger.trace( - `Skipping allocation reconciliation since AllocationManagementMode = 'manual'`, - { - protocolNetwork: network.specification.networkIdentifier, - targetDeployments: allocationDecisions - .filter(decision => decision.toAllocate) - .map(decision => decision.deployment.ipfsHash), - }, - ) - return [] as AllocationDecision[] - } - const networkSubgraphDeployment = network.networkSubgraph.deployment - if ( - networkSubgraphDeployment && - !network.specification.indexerOptions.allocateOnNetworkSubgraph - ) { - const networkSubgraphIndex = allocationDecisions.findIndex( - decision => - decision.deployment.bytes32 == - networkSubgraphDeployment.id.bytes32, - ) - if (networkSubgraphIndex >= 0) { - allocationDecisions[networkSubgraphIndex].toAllocate = false - } - } - return allocationDecisions + const { network, operator } = this.networkAndOperator + let validatedAllocationDecisions = [...allocationDecisions] + + if ( + network.specification.indexerOptions.allocationManagementMode === + AllocationManagementMode.MANUAL + ) { + this.logger.trace( + `Skipping allocation reconciliation since AllocationManagementMode = 'manual'`, + { + protocolNetwork: network.specification.networkIdentifier, + targetDeployments: allocationDecisions + .filter(decision => decision.toAllocate) + .map(decision => decision.deployment.ipfsHash), }, ) + validatedAllocationDecisions = [] as AllocationDecision[] + } else { + const networkSubgraphDeployment = network.networkSubgraph.deployment + if ( + networkSubgraphDeployment && + !network.specification.indexerOptions.allocateOnNetworkSubgraph + ) { + const networkSubgraphIndex = validatedAllocationDecisions.findIndex( + decision => + decision.deployment.bytes32 == networkSubgraphDeployment.id.bytes32, + ) + if (networkSubgraphIndex >= 0) { + validatedAllocationDecisions[networkSubgraphIndex].toAllocate = false + } + } + } //---------------------------------------------------------------------------------------- // For every network, loop through all deployments and queue allocation actions if needed //---------------------------------------------------------------------------------------- - await this.multiNetworks.mapNetworkMapped( - this.multiNetworks.zip3( - validatedAllocationDecisions, - epoch, - maxAllocationEpochs, - ), - async ( - { network, operator }: NetworkAndOperator, - [ - allocationDecisions, - epoch, - maxAllocationEpochs, - ]: ActionReconciliationContext, - ) => { - // Do nothing if there are already approved actions in the queue awaiting execution - const approvedActions = await operator.fetchActions({ - status: ActionStatus.APPROVED, - protocolNetwork: network.specification.networkIdentifier, - }) - if (approvedActions.length > 0) { - this.logger.info( - `There are ${approvedActions.length} approved actions awaiting execution, will reconcile with the network once they are executed`, - { protocolNetwork: network.specification.networkIdentifier }, - ) - return - } - // Accuracy check: re-fetch allocations to ensure that we have a fresh state since the - // start of the reconciliation loop. This means we don't use the allocations coming from - // the Eventual input. - const activeAllocations: Allocation[] = - await network.networkMonitor.allocations(AllocationStatus.ACTIVE) + // Do nothing if there are already approved actions in the queue awaiting execution + const approvedActions = await operator.fetchActions({ + status: ActionStatus.APPROVED, + protocolNetwork: network.specification.networkIdentifier, + }) + if (approvedActions.length > 0) { + this.logger.info( + `There are ${approvedActions.length} approved actions awaiting execution, will reconcile with the network once they are executed`, + { protocolNetwork: network.specification.networkIdentifier }, + ) + return + } - this.logger.trace(`Reconcile allocation actions`, { - protocolNetwork: network.specification.networkIdentifier, - epoch, - maxAllocationEpochs, - targetDeployments: allocationDecisions - .filter(decision => decision.toAllocate) - .map(decision => decision.deployment.ipfsHash), - activeAllocations: activeAllocations.map(allocation => ({ - id: allocation.id, - deployment: allocation.subgraphDeployment.id.ipfsHash, - createdAtEpoch: allocation.createdAtEpoch, - })), - }) + // Accuracy check: re-fetch allocations to ensure that we have a fresh state since the + // start of the reconciliation loop. This means we don't use the allocations coming from + // the Eventual input. + const activeAllocations: Allocation[] = + await network.networkMonitor.allocations(AllocationStatus.ACTIVE) - return pMap(allocationDecisions, async decision => - this.reconcileDeploymentAllocationAction( - decision, - activeAllocations, - epoch, - maxAllocationEpochs, - network, - operator, - ), - ) - }, + this.logger.trace(`Reconcile allocation actions`, { + protocolNetwork: network.specification.networkIdentifier, + epoch, + maxAllocationEpochs, + targetDeployments: validatedAllocationDecisions + .filter(decision => decision.toAllocate) + .map(decision => decision.deployment.ipfsHash), + activeAllocations: activeAllocations.map(allocation => ({ + id: allocation.id, + deployment: allocation.subgraphDeployment.id.ipfsHash, + createdAtEpoch: allocation.createdAtEpoch, + })), + }) + + await pMap(validatedAllocationDecisions, async decision => + this.reconcileDeploymentAllocationAction( + decision, + activeAllocations, + epoch, + maxAllocationEpochs, + network, + operator, + ), ) + return } // TODO: After indexer-service deprecation: Move to be an initialization check inside Network.create() diff --git a/packages/indexer-agent/src/commands/common-options.ts b/packages/indexer-agent/src/commands/common-options.ts index 0ed8e67c7..e93b5d2e9 100644 --- a/packages/indexer-agent/src/commands/common-options.ts +++ b/packages/indexer-agent/src/commands/common-options.ts @@ -101,6 +101,13 @@ export function injectCommonStartupOptions(argv: Argv): Argv { default: 50, group: 'Postgres', }) + .option('ipfs-endpoint', { + description: 'Ipfs endpoint for querying manifests.', + type: 'string', + required: true, + group: 'Indexer Infrastructure', + default: 'https://ipfs.network.thegraph.com', + }) .option('graph-node-query-endpoint', { description: 'Graph Node endpoint for querying subgraphs', type: 'string', @@ -120,14 +127,6 @@ export function injectCommonStartupOptions(argv: Argv): Argv { required: true, group: 'Indexer Infrastructure', }) - .option('enable-auto-migration-support', { - description: - 'Auto migrate allocations from L1 to L2 (multi-network mode must be enabled)', - type: 'boolean', - required: false, - default: false, - group: 'Indexer Infrastructure', - }) .option('deployment-management', { describe: 'Subgraph deployments management mode', required: false, diff --git a/packages/indexer-agent/src/commands/start-multi-network.ts b/packages/indexer-agent/src/commands/start-multi-network.ts deleted file mode 100644 index 491cd8189..000000000 --- a/packages/indexer-agent/src/commands/start-multi-network.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Argv } from 'yargs' -import { injectCommonStartupOptions } from './common-options' - -export const startMultiNetwork = { - command: 'start', - describe: 'Start the Agent in multiple Protocol Networks', - builder: (args: Argv): Argv => { - const updatedArgs = injectCommonStartupOptions(args) - return updatedArgs.option('network-specifications-directory', { - alias: 'dir', - description: 'Path to a directory containing network specification files', - type: 'string', - required: true, - }) - }, - // eslint-disable-next-line @typescript-eslint/no-empty-function, @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any - handler: (_argv: any) => {}, -} diff --git a/packages/indexer-agent/src/commands/start.ts b/packages/indexer-agent/src/commands/start.ts index caee24b45..72914dfe6 100644 --- a/packages/indexer-agent/src/commands/start.ts +++ b/packages/indexer-agent/src/commands/start.ts @@ -18,7 +18,6 @@ import { GraphNode, indexerError, IndexerErrorCode, - MultiNetworks, Network, Operator, registerIndexerErrorMetrics, @@ -28,8 +27,6 @@ import { import { Agent } from '../agent' import { createSyncingServer } from '../syncing-server' import { injectCommonStartupOptions } from './common-options' -import pMap from 'p-map' -import { NetworkSpecification } from '@graphprotocol/indexer-common/dist/network-specification' import { BigNumber } from 'ethers' import { displayZodParsingError } from '@graphprotocol/indexer-common' import { readFileSync } from 'fs' @@ -49,7 +46,15 @@ export const start = { describe: 'Start the agent', builder: (args: Argv): Argv => { const updatedArgs = injectCommonStartupOptions(args) + return updatedArgs + .option('network-specifications-directory', { + alias: 'dir', + description: + 'Path to a directory containing network specification files', + type: 'string', + required: false, + }) .option('network-provider', { alias: 'ethereum', description: 'Ethereum node or provider URL', @@ -459,7 +464,7 @@ function loadFile(path: string | undefined): unknown | undefined { export async function run( argv: AgentOptions, - networkSpecifications: spec.NetworkSpecification[], + networkSpecification: spec.NetworkSpecification, logger: Logger, ): Promise { // -------------------------------------------------------------------------------- @@ -500,6 +505,7 @@ export async function run( argv.graphNodeAdminEndpoint, argv.graphNodeQueryEndpoint, argv.graphNodeStatusEndpoint, + argv.ipfsEndpoint, ) // -------------------------------------------------------------------------------- @@ -543,7 +549,7 @@ export async function run( queryInterface: sequelize.getQueryInterface(), logger, graphNodeAdminEndpoint: argv.graphNodeAdminEndpoint, - networkSpecifications, + networkSpecifications: [networkSpecification], graphNode: graphNode, }, storage: new SequelizeStorage({ sequelize }), @@ -573,23 +579,21 @@ export async function run( // -------------------------------------------------------------------------------- // * Networks // -------------------------------------------------------------------------------- - logger.info('Connect to network/s', { - networks: networkSpecifications.map(spec => spec.networkIdentifier), + logger.info('Connect to network', { + networks: networkSpecification.networkIdentifier, }) - const networks: Network[] = await pMap( - networkSpecifications, - async (spec: NetworkSpecification) => - Network.create(logger, spec, queryFeeModels, graphNode, metrics), + const network = await Network.create( + logger, + networkSpecification, + queryFeeModels, + graphNode, + metrics, ) // -------------------------------------------------------------------------------- // * Indexer Management (GraphQL) Server // -------------------------------------------------------------------------------- - const multiNetworks = new MultiNetworks( - networks, - (n: Network) => n.specification.networkIdentifier, - ) const indexerManagementClient = await createIndexerManagementClient({ models: managementModels, @@ -602,7 +606,7 @@ export async function run( parallelAllocations: 1, }, }, - multiNetworks, + network, }) // -------------------------------------------------------------------------------- @@ -625,9 +629,7 @@ export async function run( await createSyncingServer({ logger, - networkSubgraphs: await multiNetworks.map( - async network => network.networkSubgraph, - ), + networkSubgraph: network.networkSubgraph, port: argv.syncingPort, }) logger.info(`Successfully launched syncing server`) @@ -635,10 +637,10 @@ export async function run( // -------------------------------------------------------------------------------- // * Operator // -------------------------------------------------------------------------------- - const operators: Operator[] = await pMap( - networkSpecifications, - async (spec: NetworkSpecification) => - new Operator(logger, indexerManagementClient, spec), + const operator = new Operator( + logger, + indexerManagementClient, + networkSpecification, ) // -------------------------------------------------------------------------------- @@ -648,9 +650,9 @@ export async function run( logger, metrics, graphNode, - operators, + operator, indexerManagement: indexerManagementClient, - networks, + network, deploymentManagement: argv.deploymentManagement, autoMigrationSupport: argv.enableAutoMigrationSupport, offchainSubgraphs: argv.offchainSubgraphs.map( diff --git a/packages/indexer-agent/src/db/migrations/18-actions-expand-action-status-add-deploying.ts b/packages/indexer-agent/src/db/migrations/18-actions-expand-action-status-add-deploying.ts new file mode 100644 index 000000000..bba663681 --- /dev/null +++ b/packages/indexer-agent/src/db/migrations/18-actions-expand-action-status-add-deploying.ts @@ -0,0 +1,51 @@ +import { Logger } from '@graphprotocol/common-ts' +import { DataTypes, QueryInterface } from 'sequelize' + +interface MigrationContext { + queryInterface: QueryInterface + logger: Logger +} + +interface Context { + context: MigrationContext +} + +export async function up({ context }: Context): Promise { + const { queryInterface, logger } = context + + logger.debug(`Checking if 'Actions' table exists`) + const tables = await queryInterface.showAllTables() + if (!tables.includes('Actions')) { + logger.info(`Actions table does not exist, migration not necessary`) + return + } + + logger.debug(`Checking if 'Actions' table needs to be migrated`) + const table = await queryInterface.describeTable('Actions') + const statusColumn = table.status + if (statusColumn) { + logger.debug(`'status' column exists with type = ${statusColumn.type}`) + logger.info(`Update 'status' column to support variant 'deploying' status`) + await queryInterface.changeColumn('Actions', 'status', { + type: DataTypes.ENUM( + 'queued', + 'approved', + 'deploying', + 'pending', + 'success', + 'failed', + 'canceled', + ), + allowNull: false, + }) + return + } +} + +export async function down({ context }: Context): Promise { + const { logger } = context + logger.info( + `No 'down' migration needed since the 'up' migration simply expanded the 'failureReason' column size`, + ) + return +} diff --git a/packages/indexer-agent/src/index.ts b/packages/indexer-agent/src/index.ts index c6fa21074..675cb0051 100644 --- a/packages/indexer-agent/src/index.ts +++ b/packages/indexer-agent/src/index.ts @@ -7,24 +7,14 @@ import { AgentOptions, run, } from './commands/start' -import { startMultiNetwork } from './commands/start-multi-network' -import { parseNetworkSpecifications } from '@graphprotocol/indexer-common' - -const MULTINETWORK_MODE: boolean = - !!process.env.INDEXER_AGENT_MULTINETWORK_MODE && - process.env.INDEXER_AGENT_MULTINETWORK_MODE.toLowerCase() !== 'false' +import { parseNetworkSpecification } from '@graphprotocol/indexer-common' function parseArguments(): AgentOptions { let builder = yargs.scriptName('indexer-agent').env('INDEXER_AGENT') // Dynamic argument parser construction based on network mode - if (MULTINETWORK_MODE) { - console.log('Starting the Indexer Agent in multi-network mode') - builder = builder.command(startMultiNetwork) - } else { - console.log('Starting the Indexer Agent in single-network mode') - builder = builder.command(start) - } + console.log('Starting the Indexer Agent') + builder = builder.command(start) return ( builder @@ -53,14 +43,15 @@ async function processArgumentsAndRun(args: AgentOptions): Promise { async: false, level: args.logLevel, }) - if (MULTINETWORK_MODE) { - const specifications = parseNetworkSpecifications(args, logger) - await run(args, specifications, logger) + + let specification + if (args.dir || args['network-specifications-directory']) { + specification = parseNetworkSpecification(args, logger) } else { + specification = await createNetworkSpecification(args, logger) reviewArgumentsForWarnings(args, logger) - const specification = await createNetworkSpecification(args, logger) - await run(args, [specification], logger) } + await run(args, specification!, logger) } async function main(): Promise { diff --git a/packages/indexer-agent/src/syncing-server.ts b/packages/indexer-agent/src/syncing-server.ts index 2c08da48b..2920eb7e0 100644 --- a/packages/indexer-agent/src/syncing-server.ts +++ b/packages/indexer-agent/src/syncing-server.ts @@ -5,21 +5,17 @@ import bodyParser from 'body-parser' import morgan from 'morgan' import { Logger } from '@graphprotocol/common-ts' import { parse } from 'graphql' -import { - NetworkMapped, - SubgraphClient, - resolveChainId, -} from '@graphprotocol/indexer-common' +import { SubgraphClient, resolveChainId } from '@graphprotocol/indexer-common' export interface CreateSyncingServerOptions { logger: Logger - networkSubgraphs: NetworkMapped + networkSubgraph: SubgraphClient port: number } export const createSyncingServer = async ({ logger, - networkSubgraphs, + networkSubgraph, port, }: CreateSyncingServerOptions): Promise => { logger = logger.child({ component: 'SyncingServer' }) @@ -64,7 +60,6 @@ export const createSyncingServer = async ({ .send(`Unknown network identifier: '${unvalidatedNetworkIdentifier}'`) } - const networkSubgraph = networkSubgraphs[networkIdentifier] if (!networkSubgraph) { return res .status(404) diff --git a/packages/indexer-agent/src/types.ts b/packages/indexer-agent/src/types.ts index f71f9d295..b945bae59 100644 --- a/packages/indexer-agent/src/types.ts +++ b/packages/indexer-agent/src/types.ts @@ -18,9 +18,9 @@ export interface AgentConfigs { logger: Logger metrics: Metrics graphNode: GraphNode - operators: Operator[] + network: Network + operator: Operator indexerManagement: IndexerManagementClient - networks: Network[] deploymentManagement: DeploymentManagementMode autoMigrationSupport: boolean offchainSubgraphs: SubgraphDeploymentID[] diff --git a/packages/indexer-cli/src/__tests__/cli.test.ts b/packages/indexer-cli/src/__tests__/cli.test.ts index 40fcda30c..9a6178b62 100644 --- a/packages/indexer-cli/src/__tests__/cli.test.ts +++ b/packages/indexer-cli/src/__tests__/cli.test.ts @@ -1,10 +1,10 @@ -import { cliTest, setupMultiNetworks, teardown } from './util' +import { cliTest, setupSingleNetwork, teardown } from './util' import path from 'path' const baseDir = path.join(__dirname) describe('Indexer cli tests', () => { - beforeEach(setupMultiNetworks) + beforeEach(setupSingleNetwork) afterEach(teardown) describe('General', () => { diff --git a/packages/indexer-cli/src/__tests__/indexer/actions.test.ts b/packages/indexer-cli/src/__tests__/indexer/actions.test.ts index 6d432b3d4..00d90538b 100644 --- a/packages/indexer-cli/src/__tests__/indexer/actions.test.ts +++ b/packages/indexer-cli/src/__tests__/indexer/actions.test.ts @@ -2,7 +2,7 @@ import { cliTest, deleteFromAllTables, seedActions, - setupMultiNetworks, + setupSingleNetwork, teardown, } from '../util' import path from 'path' @@ -10,7 +10,7 @@ import path from 'path' const baseDir = path.join(__dirname, '..') describe('Indexer actions tests', () => { describe('With indexer management server', () => { - beforeAll(setupMultiNetworks) + beforeAll(setupSingleNetwork) afterAll(teardown) beforeEach(seedActions) afterEach(deleteFromAllTables) diff --git a/packages/indexer-cli/src/__tests__/indexer/cost.test.ts b/packages/indexer-cli/src/__tests__/indexer/cost.test.ts index 4b08860c7..2532c68cd 100644 --- a/packages/indexer-cli/src/__tests__/indexer/cost.test.ts +++ b/packages/indexer-cli/src/__tests__/indexer/cost.test.ts @@ -5,7 +5,6 @@ import { deleteFromAllTables, seedCostModels, setupSingleNetwork, - setupMultiNetworks, } from '../util' import path from 'path' @@ -215,30 +214,3 @@ describe('Indexer cost tests singleNetwork', () => { ) }) }) - -describe('Indexer cost tests multiNetworks', () => { - beforeAll(setupMultiNetworks) - afterAll(teardown) - beforeEach(seedCostModels) - afterEach(deleteFromAllTables) - - describe('Cost set...', () => { - cliTest( - 'Indexer cost set model deployment id - reject multinetwork mode', - [ - 'indexer', - 'cost', - 'set', - 'model', - 'QmXRpJW3qBuYaiBYHdhv8DF4bHDZhXBmh91MtrnhJfQ5Lk', - 'references/basic.agora', - ], - 'references/indexer-cost-model-deployment-multinetworks', - { - expectedExitCode: 1, - cwd: baseDir, - timeout: 10000, - }, - ) - }) -}) diff --git a/packages/indexer-cli/src/__tests__/indexer/rules.test.ts b/packages/indexer-cli/src/__tests__/indexer/rules.test.ts index 708440e35..d32231de5 100644 --- a/packages/indexer-cli/src/__tests__/indexer/rules.test.ts +++ b/packages/indexer-cli/src/__tests__/indexer/rules.test.ts @@ -4,7 +4,7 @@ import { teardown, deleteFromAllTables, seedIndexingRules, - setupMultiNetworks, + setupSingleNetwork, } from '../util' import path from 'path' @@ -12,7 +12,7 @@ const baseDir = path.join(__dirname, '..') describe('Indexer rules tests', () => { describe('With indexer management server', () => { - beforeAll(setupMultiNetworks) + beforeAll(setupSingleNetwork) afterAll(teardown) beforeEach(seedIndexingRules) afterEach(deleteFromAllTables) diff --git a/packages/indexer-cli/src/__tests__/util.ts b/packages/indexer-cli/src/__tests__/util.ts index 273a3d649..4753e9132 100644 --- a/packages/indexer-cli/src/__tests__/util.ts +++ b/packages/indexer-cli/src/__tests__/util.ts @@ -19,7 +19,6 @@ import { IndexerManagementModels, IndexingDecisionBasis, loadTestYamlConfig, - MultiNetworks, Network, QueryFeeModels, specification, @@ -56,15 +55,11 @@ let metrics: Metrics const yamlObj = loadTestYamlConfig() const testNetworkSpecification = specification.NetworkSpecification.parse(yamlObj) -export const setupMultiNetworks = async () => { - return await setup(true) -} - export const setupSingleNetwork = async () => { - return await setup(false) + return await setup() } -export const setup = async (multiNetworksEnabled: boolean) => { +export const setup = async () => { logger = createLogger({ name: 'Setup', async: false, @@ -87,6 +82,7 @@ export const setup = async (multiNetworksEnabled: boolean) => { 'http://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', statusEndpoint, + 'https://test-ipfs-endpoint.xyz', ) const network = await Network.create( @@ -100,13 +96,6 @@ export const setup = async (multiNetworksEnabled: boolean) => { const fakeMainnetNetwork = cloneDeep(network) as Network fakeMainnetNetwork.specification.networkIdentifier = 'eip155:1' - const multiNetworks = multiNetworksEnabled - ? new MultiNetworks( - [network, fakeMainnetNetwork], - (n: Network) => n.specification.networkIdentifier, - ) - : new MultiNetworks([network], (n: Network) => n.specification.networkIdentifier) - const defaults: IndexerManagementDefaults = { globalIndexingRule: { allocationAmount: parseGRT('100'), @@ -121,7 +110,7 @@ export const setup = async (multiNetworksEnabled: boolean) => { graphNode, logger, defaults, - multiNetworks, + network, }) server = await createIndexerManagementServer({ diff --git a/packages/indexer-cli/src/commands/indexer/actions/delete.ts b/packages/indexer-cli/src/commands/indexer/actions/delete.ts index 84940bf0e..e69247937 100644 --- a/packages/indexer-cli/src/commands/indexer/actions/delete.ts +++ b/packages/indexer-cli/src/commands/indexer/actions/delete.ts @@ -13,7 +13,7 @@ ${chalk.bold('graph indexer actions delete')} [options] [ ...] ${chalk.dim('Options:')} -h, --help Show usage information - --status queued|approved|pending|success|failed|canceled Filter by status + --status queued|approved|deploying|pending|success|failed|canceled Filter by status -o, --output table|json|yaml Choose the output format: table (default), JSON, or YAML ` diff --git a/packages/indexer-cli/src/commands/indexer/actions/get.ts b/packages/indexer-cli/src/commands/indexer/actions/get.ts index ce73d2740..f1916eb04 100644 --- a/packages/indexer-cli/src/commands/indexer/actions/get.ts +++ b/packages/indexer-cli/src/commands/indexer/actions/get.ts @@ -114,12 +114,18 @@ module.exports = { if ( status && - !['queued', 'approved', 'pending', 'success', 'failed', 'canceled'].includes( - status, - ) + ![ + 'queued', + 'approved', + 'deploying', + 'pending', + 'success', + 'failed', + 'canceled', + ].includes(status) ) { throw Error( - `Invalid '--status' provided, must be one of ['queued', 'approved', 'pending', 'success', 'failed', 'canceled]`, + `Invalid '--status' provided, must be one of ['queued', 'approved', 'deploying', 'pending', 'success', 'failed', 'canceled]`, ) } diff --git a/packages/indexer-cli/src/commands/indexer/actions/update.ts b/packages/indexer-cli/src/commands/indexer/actions/update.ts index 6ea470cac..0d27d8f31 100644 --- a/packages/indexer-cli/src/commands/indexer/actions/update.ts +++ b/packages/indexer-cli/src/commands/indexer/actions/update.ts @@ -22,13 +22,13 @@ ${chalk.bold('graph indexer actions update')} [options] [ ...] ${chalk.dim('Options:')} - -h, --help Show usage information - --id Filter by actionID - --type allocate|unallocate|reallocate Filter by type - --status queued|approved|pending|success|failed|canceled Filter by status - --source Filter by source - --reason Filter by reason string - -o, --output table|json|yaml Choose the output format: table (default), JSON, or YAML + -h, --help Show usage information + --id Filter by actionID + --type allocate|unallocate|reallocate Filter by type + --status queued|approved|deploying|pending|success|failed|canceled Filter by status + --source Filter by source + --reason Filter by reason string + -o, --output table|json|yaml Choose the output format: table (default), JSON, or YAML ` module.exports = { diff --git a/packages/indexer-common/src/__tests__/ipfs.test.ts b/packages/indexer-common/src/__tests__/ipfs.test.ts new file mode 100644 index 000000000..3345cb6be --- /dev/null +++ b/packages/indexer-common/src/__tests__/ipfs.test.ts @@ -0,0 +1,183 @@ +import { createLogger, SubgraphDeploymentID } from '@graphprotocol/common-ts' +import { SubgraphDependencies, SubgraphManifestResolver } from '../graph-node' +import express, { Request, Response } from 'express' +import { AddressInfo } from 'net' +import { utils } from 'ethers' + +const EXAMPLE_VALID_IPFS_HASH = 'Qmd9nZKCH8UZU1pBzk7G8ECJr3jX3a2vAf3vowuTwFvrQg' +const EXAMPLE_NON_MANIFEST_VALID_IPFS_HASH = + 'QmddQDkcHHM7mGvYrrnoGnQ1q9GdHQfbTvj2mfbyz2Q49K' + +function mockManifestHash(input: string): string { + const utf8Bytes = utils.toUtf8Bytes(input) + const hash = utils.keccak256(utf8Bytes) // Generate a keccak256 hash of the input + return new SubgraphDeploymentID(hash).ipfsHash +} + +const DEP_ROOT_HASH = mockManifestHash('root') +const DEP_1 = mockManifestHash('dep1') +const DEP_2 = mockManifestHash('dep2') +const DEP_3 = mockManifestHash('dep3') + +describe(SubgraphManifestResolver, () => { + let ipfs: SubgraphManifestResolver + const app = express() + + /* eslint-disable @typescript-eslint/no-explicit-any */ + let server: any + + const manifestMap = new Map() + manifestMap.set( + EXAMPLE_VALID_IPFS_HASH, + ` + specVersion: "0.0.2" + name: "test" + graft: + base: "test" + block: 5 + `, + ) + + // this example is a real world contract schema + manifestMap.set( + EXAMPLE_NON_MANIFEST_VALID_IPFS_HASH, + ` + { + "name": "test" + } + `, + ) + + manifestMap.set( + DEP_ROOT_HASH, + ` + specVersion: "0.0.2" + name: "root" + graft: + base: ${DEP_1} + block: 1000 + `, + ) + manifestMap.set( + DEP_1, + ` + specVersion: "0.0.2" + name: "dep1" + graft: + base: ${DEP_2} + block: 550 + `, + ) + manifestMap.set( + DEP_2, + ` + specVersion: "0.0.2" + name: "dep2" + graft: + base: ${DEP_3} + block: 250 + `, + ) + manifestMap.set( + DEP_3, + ` + specVersion: "0.0.2" + name: "dep3" + `, + ) + + beforeAll(async () => { + // Mock endpoint for IPFS CID requests + app.post('/api/v0/cat', (req: Request, res: Response) => { + const cid = req.query.arg as string + if (!cid) { + const err = new Error('CID parameter is required') + res.status(400) + return res.send(err.message) + } + + // console.log(`got cid ${cid}`) + + // Example: Respond with different data based on the CID + if (manifestMap.has(cid)) { + res.send(manifestMap.get(cid)) + } else { + // console.log(`CID not found: ${cid}`) + res.status(404).send('Not Found') + } + }) + + // Handler for all other routes + app.use((req: Request, res: Response, next) => { + // console.log(`404: ${req.url}, ${req.method}`) + res.status(404).send('Not Found') + next() + }) + + // Start server and bind to a random port + server = await new Promise((resolve, reject) => { + const s = app.listen(0, () => { + const address: AddressInfo = s.address() as AddressInfo + console.log(`Mock server running on ${address.address}:${address.port}`) + const serverAddress = `http://localhost:${address.port}` + ipfs = new SubgraphManifestResolver(serverAddress, createLogger({ name: 'test' })) + resolve(s) + }) + s.on('error', reject) + }) + }) + + afterAll(async () => { + // Shut down the server + if (server) { + await new Promise((resolve, reject) => { + server.close((err: Error | undefined) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + console.log('Mock server shut down') + } + }) + + it('should fetch and parse a valid manifest', async () => { + const manifest = await ipfs.resolve(new SubgraphDeploymentID(EXAMPLE_VALID_IPFS_HASH)) + expect(manifest).toEqual({ + specVersion: '0.0.2', + name: 'test', + graft: { base: 'test', block: 5 }, + }) + }) + + it('should throw an error when fetching an invalid manifest', async () => { + await expect( + ipfs.resolve(new SubgraphDeploymentID(EXAMPLE_NON_MANIFEST_VALID_IPFS_HASH)), + ).rejects.toThrow() + }) + + it('should throw an error when fetching a non-existent manifest', async () => { + await expect( + ipfs.resolve( + new SubgraphDeploymentID('QmeDVcAvgYPKFCw2VCqTK3JRexHT8jkgvQ7AJ9WxhuFNM8'), + ), + ).rejects.toThrow() + }) + + it('should resolve dependencies', async () => { + const manifest: SubgraphDependencies = await ipfs.resolveGraftDependencies( + new SubgraphDeploymentID(DEP_ROOT_HASH), + ) + expect(manifest).toEqual({ + root: new SubgraphDeploymentID(DEP_ROOT_HASH), + dependencies: [ + // dependencies are fetched in oldest to newest order + { base: new SubgraphDeploymentID(DEP_3), block: 250 }, + { base: new SubgraphDeploymentID(DEP_2), block: 550 }, + { base: new SubgraphDeploymentID(DEP_1), block: 1000 }, + ], + }) + }) +}) diff --git a/packages/indexer-common/src/actions.ts b/packages/indexer-common/src/actions.ts index 9aec66814..afa265d02 100644 --- a/packages/indexer-common/src/actions.ts +++ b/packages/indexer-common/src/actions.ts @@ -108,7 +108,7 @@ export const validateActionInputs = async ( ) } - // Must have status QUEUED or APPROVED + // Must have status QUEUED or APPROVED, or DEPLOYING if ( [ ActionStatus.FAILED, @@ -156,7 +156,7 @@ export const validateActionInputs = async ( export interface ActionFilter { id?: number | undefined type?: ActionType - status?: ActionStatus + status?: ActionStatus | ActionStatus[] source?: string reason?: string updatedAt?: WhereOperators @@ -204,6 +204,7 @@ export enum ActionStatus { QUEUED = 'queued', APPROVED = 'approved', PENDING = 'pending', + DEPLOYING = 'deploying', SUCCESS = 'success', FAILED = 'failed', CANCELED = 'canceled', diff --git a/packages/indexer-common/src/allocations/__tests__/tap.test.ts b/packages/indexer-common/src/allocations/__tests__/tap.test.ts index b096a9220..d85e50c8f 100644 --- a/packages/indexer-common/src/allocations/__tests__/tap.test.ts +++ b/packages/indexer-common/src/allocations/__tests__/tap.test.ts @@ -51,6 +51,7 @@ const setup = async () => { 'https://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', 'https://test-status-endpoint.xyz', + 'https://test-ipfs-endpoint.xyz', ) const network = await Network.create( diff --git a/packages/indexer-common/src/allocations/__tests__/validate-queries.test.ts b/packages/indexer-common/src/allocations/__tests__/validate-queries.test.ts index dbfec4b11..0eaf46722 100644 --- a/packages/indexer-common/src/allocations/__tests__/validate-queries.test.ts +++ b/packages/indexer-common/src/allocations/__tests__/validate-queries.test.ts @@ -44,6 +44,7 @@ const setup = async () => { 'https://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', 'https://test-status-endpoint.xyz', + 'https://test-ipfs-endpoint.xyz', ) const network = await Network.create( diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index 494247e44..1b7edb300 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -12,6 +12,7 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types' import pRetry, { Options } from 'p-retry' import axios, { AxiosInstance } from 'axios' import fetch from 'isomorphic-fetch' +import yaml from 'yaml' interface indexNode { id: string @@ -67,9 +68,111 @@ export const parseGraphQLBlockPointer = (block: any): BlockPointer | null => } : null +export interface SubgraphDependencies { + root: SubgraphDeploymentID + dependencies: SubgraphDependency[] +} + +export interface SubgraphDependency { + base: SubgraphDeploymentID + block: number +} + +export class SubgraphManifestResolver { + private ipfsBaseUrl: URL + private ipfsClient: AxiosInstance + private logger: Logger + + constructor(ipfsEndpoint: string, logger: Logger) { + this.ipfsBaseUrl = new URL(`/api/v0/`, ipfsEndpoint) + this.ipfsClient = axios.create({}) + this.ipfsClient.interceptors.request.use((config) => { + logger.info(`Subgraph Manifest IPFS request: ${config.url}`) + return config + }) + this.logger = logger + } + + /** + * Resolves a subgraph's manifest. + * + * @param subgraphDeploymentId + * @returns Promise + */ + public async resolve( + subgraphDeploymentId: SubgraphDeploymentID, + ): Promise { + const response = await this.ipfsClient.post( + `${this.ipfsBaseUrl}cat?arg=${subgraphDeploymentId.ipfsHash}`, + ) + return yaml.parse(response.data) + } + + /** + * resolveCompositionDependencies + * resolves the composed dependencies in datasource[].source.address that is a Qm hash (starts with Qm) + * @param subgraphDeploymentId + * @returns Promise + */ + public async resolveCompositionDependencies( + subgraphDeploymentId: SubgraphDeploymentID, + ): Promise { + const manifest = await this.resolve(subgraphDeploymentId) + const dependencies: SubgraphDependency[] = [] + if (manifest['dataSources']) { + for (const dataSource of manifest['dataSources']) { + if ( + dataSource['source'] && + dataSource['source']['address'] && + dataSource['source']['address'].startsWith('Qm') + ) { + const dep = { + block: dataSource['source']['startBlock'], + base: new SubgraphDeploymentID(dataSource['source']['address']), + } + dependencies.push(dep) + } + } + } + return dependencies + } + + /** + * Resolves a subgraph's manifest and its dependencies in the order that they need to be resolved. + * + * @param subgraphDeploymentId + * @returns Promise + */ + public async resolveGraftDependencies( + subgraphDeploymentId: SubgraphDeploymentID, + ): Promise { + const deps: SubgraphDependencies = { + root: subgraphDeploymentId, + dependencies: [], + } + const root = await this.resolve(subgraphDeploymentId) + let currentManifest = root + let dependency = currentManifest['graft'] + while (dependency) { + const dep = { + block: dependency.block, + base: new SubgraphDeploymentID(dependency.base), + } + // push onto the front of the list so we always have the deepest deps first + deps.dependencies.unshift(dep) + const nextManifest = await this.resolve(dep.base) + currentManifest = nextManifest + dependency = currentManifest['graft'] + } + return deps + } +} + export class GraphNode { admin: RpcClient private queryBaseURL: URL + private manifestResolver: SubgraphManifestResolver + status: Client logger: Logger @@ -78,6 +181,7 @@ export class GraphNode { adminEndpoint: string, queryEndpoint: string, statusEndpoint: string, + ipfsEndpoint: string, ) { this.logger = logger.child({ component: 'GraphNode' }) this.status = createClient({ @@ -95,6 +199,10 @@ export class GraphNode { } this.queryBaseURL = new URL(`/subgraphs/id/`, queryEndpoint) + this.manifestResolver = new SubgraphManifestResolver( + ipfsEndpoint, + this.logger.child({ component: 'SubgraphManifestResolver' }), + ) } async connect(): Promise { @@ -192,6 +300,16 @@ export class GraphNode { throw nodeOnlyResult.error } + if ( + !nodeOnlyResult.data.indexingStatuses || + nodeOnlyResult.data.indexingStatuses.length === 0 + ) { + this.logger.debug(`No 'indexingStatuses' data returned from index nodes`, { + data: nodeOnlyResult.data, + }) + return [] + } + const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses .filter( (result: { node: string | null }) => @@ -603,6 +721,11 @@ export class GraphNode { }) await this.resume(deployment) } else { + // Subgraph deployment not found + await this.autoGraftDeployDependencies(deployment, deploymentAssignments, name) + await this.autoCompositionDependencies(deployment, deploymentAssignments, name) + + // Create and deploy the subgraph this.logger.debug( 'Subgraph deployment not found, creating subgraph name and deploying...', { @@ -625,6 +748,281 @@ export class GraphNode { } } + /** + * Automatically deploy composition dependencies + * @param deployment + * @param deploymentAssignments + * @param name + * @returns void + */ + private async autoCompositionDependencies( + deployment: SubgraphDeploymentID, + deploymentAssignments: SubgraphDeploymentAssignment[], + name: string, + ) { + this.logger.debug('Auto composition deploy subgraph dependencies') + const { network: subgraphChainName } = await this.subgraphFeatures(deployment) + const dependencies = + await this.manifestResolver.resolveCompositionDependencies(deployment) + // these dependencies should be deployed in parallel + await Promise.all( + dependencies.map(async (dependency) => { + await this.ensureDependency(dependency, name) + await this.syncToBlock( + dependency.block, + dependency.base, + subgraphChainName, + false, // dependencies in composition should continue indexing + ) + }), + ) + } + + /** + * Automatically deploy any dependencies of the subgraph, returning only when they are sync'd to the specified block. + * + * Note: All dependencies must be present on the same network as the root deployment. + * + * @param deployment + * @param deploymentAssignments + * @param name + * @returns + */ + private async autoGraftDeployDependencies( + deployment: SubgraphDeploymentID, + deploymentAssignments: SubgraphDeploymentAssignment[], + name: string, + ) { + this.logger.debug('Auto graft deploy subgraph dependencies') + const { network: subgraphChainName } = await this.subgraphFeatures(deployment) + const dependencies = await this.manifestResolver.resolveGraftDependencies(deployment) + if (dependencies.dependencies.length == 0) { + this.logger.debug('No subgraph dependencies found', { + name, + deployment: deployment.display, + }) + } else { + this.logger.debug( + 'graft dep chain found', + dependencies.dependencies.map((d) => d.base.ipfsHash), + ) + + for (const dependency of dependencies.dependencies) { + await this.ensureDependency(dependency, name) + await this.syncToBlock(dependency.block, dependency.base, subgraphChainName, true) + } + } + } + + /** + * Ensure a dependency is deployed and healthy + * @param dependency + * @param name + */ + private async ensureDependency(dependency: SubgraphDependency, name: string) { + const queriedAssignments = await this.subgraphDeploymentAssignmentsByDeploymentID( + SubgraphStatus.ACTIVE, + [dependency.base.ipfsHash], + ) + this.logger.debug( + 'queried graph-node for assignment', + queriedAssignments.map((a: SubgraphDeploymentAssignment) => { + return { ipfsHash: a.id.ipfsHash, ...a } + }), + ) + const dependencyAssignment = queriedAssignments.find( + (assignment) => assignment.id.ipfsHash == dependency.base.ipfsHash, + ) + + if (dependencyAssignment) { + this.logger.info("Dependency subgraph found, checking if it's healthy", { + name, + deployment: dependency.base.display, + block_required: dependency.block, + }) + + const indexingStatus = await this.indexingStatus([dependency.base]) + const deploymentStatus = indexingStatus.find( + (status) => status.subgraphDeployment.ipfsHash === dependency.base.ipfsHash, + ) + if (!deploymentStatus) { + this.logger.error(`Subgraph not found in indexing status`, { + subgraph: dependency.base.ipfsHash, + indexingStatus, + }) + throw new Error(`Subgraph not found in indexing status`) + } else { + this.logger.info( + 'Dependency subgraph found, will try to sync it to the block required', + { + deploymentStatus, + }, + ) + } + } else { + this.logger.debug( + 'Dependency subgraph not found, creating, deploying and pausing...', + { + name, + deployment: dependency.base.display, + block_required: dependency.block, + }, + ) + await this.create(name) + await this.deploy(name, dependency.base) + } + } + + /** + * Wait for the block to be synced, polling indexing status until it is + * The Deployment should already be created and deployed to graph-node + * This will resume a paused subgraph if the block height target is higher than the + * current block height + */ + public async syncToBlock( + blockHeight: number, + subgraphDeployment: SubgraphDeploymentID, + chainName: string | null, + shouldPauseWhenComplete: boolean, + ): Promise { + async function waitForMs(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) + } + + this.logger.info(`Begin syncing subgraph deployment to block`, { + subgraph: subgraphDeployment.ipfsHash, + blockHeight, + }) + + // loop-wait for the block to be synced + for (;;) { + // first ensure it's been deployed and is active, or already paused + let deployed: SubgraphDeploymentAssignment[] = [] + let attempt = 0 + + const maxAttempts = 5 + const waitTimeMs = 3000 + + // wait and poll for the assignment to be created. + while (attempt < maxAttempts) { + await waitForMs(waitTimeMs) + deployed = await this.subgraphDeploymentAssignmentsByDeploymentID( + SubgraphStatus.ALL, + [subgraphDeployment.ipfsHash], + ) + if (deployed.length > 0) { + this.logger.info(`Subgraph deployment active or already paused`, { + subgraph: subgraphDeployment.ipfsHash, + status: deployed, + }) + break + } + this.logger.info(`Subgraph deployment not yet active, waiting...`, { + subgraph: subgraphDeployment.ipfsHash, + attempt, + deployed, + }) + attempt += 1 + if (attempt >= maxAttempts) { + this.logger.error(`Subgraph not deployed and active`, { + subgraph: subgraphDeployment.ipfsHash, + }) + throw new Error( + `Subgraph ${subgraphDeployment.ipfsHash} not deployed and active after ${maxAttempts} attempts, cannot sync to block ${blockHeight}`, + ) + } + } + + const indexingStatus = await this.indexingStatus([subgraphDeployment]) + const deploymentStatus = indexingStatus.find( + (status) => status.subgraphDeployment.ipfsHash === subgraphDeployment.ipfsHash, + ) + + if (!deploymentStatus) { + this.logger.error(`Subgraph not found in indexing status`, { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }) + throw new Error(`Subgraph not found in indexing status`) + } + + const chain = deploymentStatus.chains.find((chain) => chain.network === chainName) + + if (!chain) { + this.logger.error(`Chain not found in indexing status for deployment`, { + subgraph: subgraphDeployment.ipfsHash, + chainName, + status: deploymentStatus, + }) + throw new Error(`Chain not found in indexing status for deployment`) + } + + // NOTES: + // - latestBlock is the latest block that has been indexed + // - earliestBlock and chainHeadBlock are the earliest and latest blocks on the chain, respectively + // if the deployment is paused and latestBlock is null or lower than we need, unpause it, + // otherwise, if it's paused, we can't unpause it, so just wait + if ( + deployed[0].paused && + (!chain.latestBlock || chain.latestBlock.number < blockHeight) + ) { + this.logger.debug(`Subgraph paused and not yet synced to block, resuming`, { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }) + await this.resume(subgraphDeployment) + } + + // Is the graftBaseBlock within the range of the earliest and head of the chain? + if (chain.latestBlock && chain.latestBlock.number >= blockHeight) { + if (!deployed[0].paused) { + this.logger.debug(`Subgraph synced to block! Pausing as requirement is met.`, { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }) + // pause the subgraph to prevent further indexing (grafting path) + if (shouldPauseWhenComplete) { + this.logger.debug("Pausing subgraph deployment as it's synced to block", { + subgraph: subgraphDeployment.ipfsHash, + blockHeight, + }) + await this.pause(subgraphDeployment) + } // Composition path + else { + this.logger.debug( + `Subgraph is syncd to block ${chain.latestBlock.number} not pausing.`, + { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }, + ) + break + } + } else { + this.logger.debug(`Subgraph already paused and synced to block.`, { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }) + } + break + } + + this.logger.debug( + `Subgraph not yet synced to block ${blockHeight}, waiting for 3s`, + { + subgraph: subgraphDeployment.ipfsHash, + indexingStatus, + }, + ) + await waitForMs(waitTimeMs) + } + + this.logger.debug(`End syncing subgraph deployment synced to block`, { + subgraph: subgraphDeployment.ipfsHash, + blockHeight, + }) + } + // -------------------------------------------------------------------------------- // * Indexing Status // -------------------------------------------------------------------------------- diff --git a/packages/indexer-common/src/index.ts b/packages/indexer-common/src/index.ts index ab3eedd97..e22722403 100644 --- a/packages/indexer-common/src/index.ts +++ b/packages/indexer-common/src/index.ts @@ -15,5 +15,4 @@ export * from './types' export * from './utils' export * from './parsers' export * as specification from './network-specification' -export * from './multi-networks' export * from './sequential-timer' diff --git a/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts b/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts index 5c3a0ad28..eafd14665 100644 --- a/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts +++ b/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts @@ -56,6 +56,7 @@ const setup = async () => { 'https://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', 'https://test-status-endpoint.xyz', + 'https://test-ipfs-endpoint.xyz', ) const network = await Network.create( diff --git a/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts b/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts index c97df0722..a10f953c8 100644 --- a/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts +++ b/packages/indexer-common/src/indexer-management/__tests__/helpers.test.ts @@ -14,7 +14,6 @@ import { IndexingRuleAttributes, } from '../models' import { defineQueryFeeModels, specification as spec } from '../../index' -import { networkIsL1, networkIsL2 } from '../types' import { fetchIndexingRules, upsertIndexingRule } from '../rules' import { SubgraphFreshnessChecker, SubgraphIdentifierType } from '../../subgraphs' import { ActionManager } from '../actions' @@ -104,6 +103,7 @@ const setupMonitor = async () => { 'http://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', statusEndpoint, + 'https://test-ipfs-endpoint.xyz', ) const indexerOptions = spec.IndexerOptions.parse({ @@ -479,51 +479,3 @@ describe.skip('Monitor: CI', () => { await expect(deployments.length).toBeGreaterThan(500) }, 40000) }) - -describe('Network layer detection', () => { - interface NetworkLayer { - name: string - l1: boolean - l2: boolean - } - - // Should be true for L1 and false for L2 - const l1Networks: NetworkLayer[] = ['mainnet', 'eip155:1', 'sepolia'].map( - (name: string) => ({ name, l1: true, l2: false }), - ) - - // Should be false for L1 and true for L2 - const l2Networks: NetworkLayer[] = [ - 'arbitrum-one', - 'eip155:42161', - 'eip155:421614', - ].map((name: string) => ({ name, l1: false, l2: true })) - - // Those will be false for L1 and L2 - const nonProtocolNetworks: NetworkLayer[] = [ - 'fantom', - 'eip155:250', - 'hardhat', - 'eip155:1337', - 'matic', - 'eip155:137', - 'gnosis', - 'eip155:100', - ].map((name: string) => ({ name, l1: false, l2: false })) - - const testCases = [...l1Networks, ...l2Networks, ...nonProtocolNetworks] - - test.each(testCases)('Can detect network layer [$name]', (network) => { - expect(networkIsL1(network.name)).toStrictEqual(network.l1) - expect(networkIsL2(network.name)).toStrictEqual(network.l2) - }) - - const invalidTProtocolNetworkNames = ['invalid-name', 'eip155:9999'] - - test.each(invalidTProtocolNetworkNames)( - 'Throws error when protocol network is unknown [%s]', - (invalidProtocolNetworkName) => { - expect(() => networkIsL1(invalidProtocolNetworkName)).toThrow() - }, - ) -}) diff --git a/packages/indexer-common/src/indexer-management/__tests__/util.ts b/packages/indexer-common/src/indexer-management/__tests__/util.ts index a4a14c605..3beed0320 100644 --- a/packages/indexer-common/src/indexer-management/__tests__/util.ts +++ b/packages/indexer-common/src/indexer-management/__tests__/util.ts @@ -9,7 +9,6 @@ import { IndexerManagementClient, IndexerManagementDefaults, loadTestYamlConfig, - MultiNetworks, Network, specification, } from '@graphprotocol/indexer-common' @@ -40,6 +39,7 @@ export const createTestManagementClient = async ( 'http://test-admin-endpoint.xyz', 'https://test-query-endpoint.xyz', statusEndpoint, + 'https://test-ipfs-endpoint.xyz', ) const networkSpecification = { ...testNetworkSpecification } @@ -66,17 +66,12 @@ export const createTestManagementClient = async ( network.specification.networkIdentifier = networkIdentifierOverride } - const multiNetworks = new MultiNetworks( - [network], - (n: Network) => n.specification.networkIdentifier, - ) - return await createIndexerManagementClient({ models: managementModels, graphNode, logger, defaults, - multiNetworks, + network, }) } diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index 237dd718f..d2df47954 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -13,8 +13,6 @@ import { IndexerErrorCode, IndexerManagementModels, isActionFailure, - MultiNetworks, - NetworkMapped, Network, OrderDirection, GraphNode, @@ -23,37 +21,34 @@ import { import { Order, Transaction } from 'sequelize' import { Eventual, join, Logger } from '@graphprotocol/common-ts' -import groupBy from 'lodash.groupby' export class ActionManager { - declare multiNetworks: MultiNetworks + declare network: Network declare logger: Logger declare models: IndexerManagementModels - declare allocationManagers: NetworkMapped + declare allocationManager: AllocationManager executeBatchActionsPromise: Promise | undefined static async create( - multiNetworks: MultiNetworks, + network: Network, logger: Logger, models: IndexerManagementModels, graphNode: GraphNode, ): Promise { const actionManager = new ActionManager() - actionManager.multiNetworks = multiNetworks + actionManager.network = network actionManager.logger = logger.child({ component: 'ActionManager' }) actionManager.models = models - actionManager.allocationManagers = await multiNetworks.map(async (network) => { - return new AllocationManager( - logger.child({ - component: 'AllocationManager', - protocolNetwork: network.specification.networkIdentifier, - }), - models, - graphNode, - network, - ) - }) + actionManager.allocationManager = new AllocationManager( + logger.child({ + component: 'AllocationManager', + protocolNetwork: network.specification.networkIdentifier, + }), + models, + graphNode, + network, + ) logger.info('Begin monitoring the queue for approved actions to execute') await actionManager.monitorQueue() @@ -129,7 +124,7 @@ export class ActionManager { let actions: Action[] = [] try { actions = await ActionManager.fetchActions(this.models, null, { - status: ActionStatus.APPROVED, + status: [ActionStatus.APPROVED, ActionStatus.DEPLOYING], }) logger.trace(`Fetched ${actions.length} approved actions`) } catch (err) { @@ -147,62 +142,53 @@ export class ActionManager { join({ approvedActions }).pipe(async ({ approvedActions }) => { logger.debug('Approved actions found, evaluating batch') - const approvedActionsByNetwork: NetworkMapped = groupBy( - approvedActions, - (action: Action) => action.protocolNetwork, - ) - await this.multiNetworks.mapNetworkMapped( - approvedActionsByNetwork, - async (network: Network, approvedActions: Action[]) => { - const networkLogger = logger.child({ - protocolNetwork: network.specification.networkIdentifier, - indexer: network.specification.indexerOptions.address, - operator: network.transactionManager.wallet.address, + const networkLogger = logger.child({ + protocolNetwork: this.network.specification.networkIdentifier, + indexer: this.network.specification.indexerOptions.address, + operator: this.network.transactionManager.wallet.address, + }) + + if (await this.batchReady(approvedActions, this.network, networkLogger)) { + const paused = await this.network.paused.value() + const isOperator = await this.network.isOperator.value() + networkLogger.debug('Batch ready, preparing to execute', { + paused, + isOperator, + protocolNetwork: this.network.specification.networkIdentifier, + }) + // Do nothing else if the network is paused + if (paused) { + networkLogger.info( + `The network is currently paused, not doing anything until it resumes`, + ) + return + } + + // Do nothing if we're not authorized as an operator for the indexer + if (!isOperator) { + networkLogger.error(`Not authorized as an operator for the indexer`, { + err: indexerError(IndexerErrorCode.IE034), }) + return + } - if (await this.batchReady(approvedActions, network, networkLogger)) { - const paused = await network.paused.value() - const isOperator = await network.isOperator.value() - networkLogger.debug('Batch ready, preparing to execute', { - paused, - isOperator, - protocolNetwork: network.specification.networkIdentifier, - }) - // Do nothing else if the network is paused - if (paused) { - networkLogger.info( - `The network is currently paused, not doing anything until it resumes`, - ) - return - } - - // Do nothing if we're not authorized as an operator for the indexer - if (!isOperator) { - networkLogger.error(`Not authorized as an operator for the indexer`, { - err: indexerError(IndexerErrorCode.IE034), - }) - return - } - - networkLogger.info('Executing batch of approved actions', { - actions: approvedActions, - note: 'If actions were approved very recently they may be missing from this batch', - }) + networkLogger.info('Executing batch of approved actions', { + actions: approvedActions, + note: 'If actions were approved very recently they may be missing from this batch', + }) - try { - const attemptedActions = await this.executeApprovedActions(network) - networkLogger.trace('Attempted to execute all approved actions', { - actions: attemptedActions, - }) - } catch (error) { - networkLogger.error('Failed to execute batch of approved actions', { - error, - }) - } - } - }, - ) + try { + const attemptedActions = await this.executeApprovedActions(this.network) + networkLogger.trace('Attempted to execute all approved actions', { + actions: attemptedActions, + }) + } catch (error) { + networkLogger.error('Failed to execute batch of approved actions', { + error, + }) + } + } }) } @@ -299,7 +285,7 @@ export class ActionManager { { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, async (transaction) => { batchStartTime = Date.now() - let approvedActions + let approvedAndDeployingActions try { // Execute already approved actions in the order of type and priority. // Unallocate actions are prioritized to free up stake that can be used @@ -307,9 +293,12 @@ export class ActionManager { // Reallocate actions are prioritized before allocate as they are for // existing syncing deployments with relatively smaller changes made. const actionTypePriority = ['unallocate', 'reallocate', 'allocate'] - approvedActions = ( + approvedAndDeployingActions = ( await this.models.Action.findAll({ - where: { status: ActionStatus.APPROVED, protocolNetwork }, + where: { + status: [ActionStatus.APPROVED, ActionStatus.DEPLOYING], + protocolNetwork, + }, order: [['priority', 'ASC']], transaction, lock: transaction.LOCK.UPDATE, @@ -324,25 +313,31 @@ export class ActionManager { transaction, }) if (pendingActions.length > 0) { - logger.warn(`${pendingActions} Actions found in PENDING state when execution began. Was there a crash? \ - These indicate that execution was interrupted and will need to be cleared manually.`) + logger.warn( + `${pendingActions} Actions found in PENDING state when execution began. Was there a crash?` + + `These indicate that execution was interrupted while calling contracts, and will need to be cleared manually.`, + ) } - if (approvedActions.length === 0) { + if (approvedAndDeployingActions.length === 0) { logger.debug('No approved actions were found for this network') return [] } logger.debug( - `Found ${approvedActions.length} approved actions for this network `, - { approvedActions }, + `Found ${approvedAndDeployingActions.length} approved actions for this network `, + { approvedActions: approvedAndDeployingActions }, ) } catch (error) { logger.error('Failed to query approved actions for network', { error }) return [] } - // mark all approved actions as PENDING, this serves as a lock on other processing of them - await this.markActions(approvedActions, transaction, ActionStatus.PENDING) - return approvedActions + // mark all approved actions as DEPLOYING, this serves as a lock on other processing of them + await this.markActions( + approvedAndDeployingActions, + transaction, + ActionStatus.DEPLOYING, + ) + return approvedAndDeployingActions }, ) @@ -352,13 +347,30 @@ export class ActionManager { startTimeMs: Date.now() - batchStartTime, }) - const allocationManager = - this.allocationManagers[network.specification.networkIdentifier] - let results try { + // TODO: we should lift the batch execution (graph-node, then contracts) up to here so we can + // mark the actions appropriately + const onFinishedDeploying = async (validatedActions) => { + // After we ensure that we have finished deploying new subgraphs (and possibly their dependencies) to graph-node, + // we can mark the actions as PENDING. + logger.debug('Finished deploying actions, marking as PENDING') + this.models.Action.sequelize!.transaction( + { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, + async (transaction) => { + return await this.markActions( + validatedActions, + transaction, + ActionStatus.PENDING, + ) + }, + ) + } // This will return all results if successful, if failed it will return the failed actions - results = await allocationManager.executeBatch(prioritizedActions) + results = await this.allocationManager.executeBatch( + prioritizedActions, + onFinishedDeploying, + ) logger.debug('Completed batch action execution', { results, endTimeMs: Date.now() - batchStartTime, diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 035aa0932..fcc614f07 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -105,10 +105,13 @@ export class AllocationManager { private network: Network, ) {} - async executeBatch(actions: Action[]): Promise { + async executeBatch( + actions: Action[], + onFinishedDeploying: (actions: Action[]) => Promise, + ): Promise { const logger = this.logger.child({ function: 'executeBatch' }) logger.trace('Executing action batch', { actions }) - const result = await this.executeTransactions(actions) + const result = await this.executeTransactions(actions, onFinishedDeploying) if (Array.isArray(result)) { logger.trace('Execute batch transaction failed', { actionBatchResult: result }) return result as ActionFailure[] @@ -116,7 +119,10 @@ export class AllocationManager { return await this.confirmTransactions(result, actions) } - private async executeTransactions(actions: Action[]): Promise { + private async executeTransactions( + actions: Action[], + onFinishedDeploying: (actions: Action[]) => Promise, + ): Promise { const logger = this.logger.child({ function: 'executeTransactions' }) logger.trace('Begin executing transactions', { actions }) if (actions.length < 1) { @@ -127,6 +133,7 @@ export class AllocationManager { logger.trace('Validated actions', { validatedActions }) await this.deployBeforeAllocating(logger, validatedActions) + await onFinishedDeploying(validatedActions) const populateTransactionsResults = await this.prepareTransactions(validatedActions) @@ -462,6 +469,7 @@ export class AllocationManager { if (receipt === 'paused' || receipt === 'unauthorized') { throw indexerError( IndexerErrorCode.IE062, + `Allocation not created. ${ receipt === 'paused' ? 'Network paused' : 'Operator not authorized' }`, diff --git a/packages/indexer-common/src/indexer-management/client.ts b/packages/indexer-common/src/indexer-management/client.ts index 280eb62cb..c1b545a15 100644 --- a/packages/indexer-common/src/indexer-management/client.ts +++ b/packages/indexer-common/src/indexer-management/client.ts @@ -14,7 +14,7 @@ import poiDisputeResolvers from './resolvers/poi-disputes' import statusResolvers from './resolvers/indexer-status' import { BigNumber } from 'ethers' import { GraphNode } from '../graph-node' -import { ActionManager, MultiNetworks, Network } from '@graphprotocol/indexer-common' +import { ActionManager, Network } from '@graphprotocol/indexer-common' export interface IndexerManagementResolverContext { models: IndexerManagementModels @@ -22,7 +22,7 @@ export interface IndexerManagementResolverContext { logger: Logger defaults: IndexerManagementDefaults actionManager: ActionManager | undefined - multiNetworks: MultiNetworks | undefined + network: Network } const SCHEMA_SDL = gql` @@ -104,6 +104,7 @@ const SCHEMA_SDL = gql` enum ActionStatus { queued approved + deploying pending success failed @@ -452,7 +453,7 @@ export interface IndexerManagementClientOptions { logger: Logger models: IndexerManagementModels graphNode: GraphNode - multiNetworks: MultiNetworks | undefined + network: Network defaults: IndexerManagementDefaults } @@ -468,12 +469,10 @@ export class IndexerManagementClient extends Client { } } -// TODO:L2: Put the IndexerManagementClient creation inside the Agent, and receive -// MultiNetworks from it export const createIndexerManagementClient = async ( options: IndexerManagementClientOptions, ): Promise => { - const { models, graphNode, logger, defaults, multiNetworks } = options + const { models, graphNode, logger, defaults, network } = options const schema = buildSchema(print(SCHEMA_SDL)) const resolvers = { ...indexingRuleResolvers, @@ -484,8 +483,8 @@ export const createIndexerManagementClient = async ( ...actionResolvers, } - const actionManager = multiNetworks - ? await ActionManager.create(multiNetworks, logger, models, graphNode) + const actionManager = network + ? await ActionManager.create(network, logger, models, graphNode) : undefined const context: IndexerManagementResolverContext = { @@ -493,7 +492,7 @@ export const createIndexerManagementClient = async ( graphNode, defaults, logger: logger.child({ component: 'IndexerManagementClient' }), - multiNetworks, + network, actionManager, } diff --git a/packages/indexer-common/src/indexer-management/models/action.ts b/packages/indexer-common/src/indexer-management/models/action.ts index 642a9fbca..f89a644b3 100644 --- a/packages/indexer-common/src/indexer-management/models/action.ts +++ b/packages/indexer-common/src/indexer-management/models/action.ts @@ -81,6 +81,7 @@ export const defineActionModels = (sequelize: Sequelize): ActionModels => { ActionStatus.QUEUED, ActionStatus.APPROVED, ActionStatus.PENDING, + ActionStatus.DEPLOYING, ActionStatus.CANCELED, ), allowNull: false, diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index e09e43fef..69219258c 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -16,7 +16,6 @@ import { BlockPointer, resolveChainId, resolveChainAlias, - TransferredSubgraphDeployment, sequentialTimerReduce, } from '@graphprotocol/indexer-common' import { @@ -561,93 +560,6 @@ export class NetworkMonitor { } } - async transferredDeployments(): Promise { - this.logger.debug('Querying the Network for transferred subgraph deployments') - try { - const result = await this.networkSubgraph.checkedQuery( - // TODO: Consider querying for the same time range as the Agent's evaluation, limiting - // results to recent transfers. - gql` - { - subgraphs( - where: { startedTransferToL2: true } - orderBy: startedTransferToL2At - orderDirection: asc - ) { - id - idOnL1 - idOnL2 - startedTransferToL2 - startedTransferToL2At - startedTransferToL2AtBlockNumber - startedTransferToL2AtTx - transferredToL2 - transferredToL2At - transferredToL2AtBlockNumber - transferredToL2AtTx - versions { - subgraphDeployment { - ipfsHash - } - } - } - } - `, - ) - - if (result.error) { - throw result.error - } - - const transferredDeployments = result.data.subgraphs - - // There may be no transferred subgraphs, handle gracefully - if (transferredDeployments.length == 0) { - this.logger.warn( - 'Failed to query subgraph deployments transferred to L2: no deployments found', - ) - throw new Error('No transferred subgraph deployments returned') - } - - // Flatten multiple subgraphDeployment versions into a single `TransferredSubgraphDeployment` object - // TODO: We could use `zod` to parse GraphQL responses into the expected type - // eslint-disable-next-line @typescript-eslint/no-explicit-any - return transferredDeployments.flatMap((deployment: any) => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - return deployment.versions.map((version: any) => { - return { - id: deployment.id, - idOnL1: deployment.idOnL1, - idOnL2: deployment.idOnL2, - startedTransferToL2: deployment.startedTransferToL2, - startedTransferToL2At: BigNumber.from(deployment.startedTransferToL2At), - startedTransferToL2AtBlockNumber: BigNumber.from( - deployment.startedTransferToL2AtBlockNumber, - ), - startedTransferToL2AtTx: deployment.startedTransferToL2AtTx, - transferredToL2: deployment.transferredToL2, - transferredToL2At: deployment.transferredToL2At - ? BigNumber.from(deployment.transferredToL2At) - : null, - transferredToL2AtTx: deployment.transferredToL2AtTx, - transferredToL2AtBlockNumber: deployment.transferredToL2AtBlockNumber - ? BigNumber.from(deployment.transferredToL2AtBlockNumber) - : null, - ipfsHash: version.subgraphDeployment.ipfsHash, - protocolNetwork: this.networkCAIPID, - ready: null, - } - }) - }) - } catch (err) { - const error = indexerError(IndexerErrorCode.IE009, err.message) - this.logger.error(`Failed to query transferred subgraph deployments`, { - error, - }) - throw error - } - } - async subgraphDeployments(): Promise { const deployments: SubgraphDeployment[] = [] const queryProgress = { diff --git a/packages/indexer-common/src/indexer-management/resolvers/actions.ts b/packages/indexer-common/src/indexer-management/resolvers/actions.ts index 1e246754c..6890797fd 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/actions.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/actions.ts @@ -11,15 +11,12 @@ import { ActionType, ActionUpdateInput, IndexerManagementModels, - Network, - NetworkMapped, OrderDirection, validateActionInputs, validateNetworkIdentifier, } from '@graphprotocol/indexer-common' import { literal, Op, Transaction } from 'sequelize' import { ActionManager } from '../actions' -import groupBy from 'lodash.groupby' // Perform insert, update, or no-op depending on existing queue data // INSERT - No item in the queue yet targeting this deploymentID @@ -151,13 +148,13 @@ export default { queueActions: async ( { actions }: { actions: ActionInput[] }, - { actionManager, logger, multiNetworks, models }: IndexerManagementResolverContext, + { actionManager, logger, network, models }: IndexerManagementResolverContext, ): Promise => { logger.debug(`Execute 'queueActions' mutation`, { actions, }) - if (!actionManager || !multiNetworks) { + if (!actionManager || !network) { throw Error('IndexerManagementClient must be in `network` mode to modify actions') } @@ -171,11 +168,7 @@ export default { }) // Let Network Monitors validate actions based on their protocol networks - await multiNetworks.mapNetworkMapped( - groupBy(actions, (action) => action.protocolNetwork), - (network: Network, actions: ActionInput[]) => - validateActionInputs(actions, network.networkMonitor, logger), - ) + await validateActionInputs(actions, network.networkMonitor, logger) let results: ActionResult[] = [] @@ -360,23 +353,20 @@ export default { if (!actionManager) { throw Error('IndexerManagementClient must be in `network` mode to modify actions') } - const result: NetworkMapped = await actionManager.multiNetworks.map( - async (network: Network) => { - logger.debug(`Execute 'executeApprovedActions' mutation`, { - protocolNetwork: network.specification.networkIdentifier, - }) - try { - return await actionManager.executeApprovedActions(network) - } catch (error) { - logger.error('Failed to execute approved actions for network', { - protocolNetwork: network.specification.networkIdentifier, - error, - }) - return [] - } - }, - ) - return Object.values(result).flat() + const { network } = actionManager + logger.debug(`Execute 'executeApprovedActions' mutation`, { + protocolNetwork: network.specification.networkIdentifier, + }) + try { + const result = await actionManager.executeApprovedActions(network) + return result + } catch (error) { + logger.error('Failed to execute approved actions for network', { + protocolNetwork: network.specification.networkIdentifier, + error, + }) + return [] + } }, } diff --git a/packages/indexer-common/src/indexer-management/resolvers/allocations.ts b/packages/indexer-common/src/indexer-management/resolvers/allocations.ts index 5d2e72690..3af7e6f8f 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/allocations.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/allocations.ts @@ -1,4 +1,4 @@ -import { epochElapsedBlocks, Network } from '@graphprotocol/indexer-common' +import { epochElapsedBlocks } from '@graphprotocol/indexer-common' /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ /* eslint-disable @typescript-eslint/ban-types */ @@ -29,7 +29,6 @@ import { SubgraphIdentifierType, uniqueAllocationID, } from '@graphprotocol/indexer-common' -import { extractNetwork } from './utils' interface AllocationFilter { status: 'active' | 'closed' @@ -298,70 +297,71 @@ async function queryAllocations( export default { allocations: async ( { filter }: { filter: AllocationFilter }, - { multiNetworks, logger }: IndexerManagementResolverContext, + { network, logger }: IndexerManagementResolverContext, ): Promise => { logger.debug('Execute allocations() query', { filter, }) - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch allocations', ) } - const allocationsByNetwork = await multiNetworks.map( - async (network: Network): Promise => { - // Return early if a different protocol network is specifically requested - if ( - filter.protocolNetwork && - filter.protocolNetwork !== network.specification.networkIdentifier - ) { - return [] - } + // Return early if a different protocol network is specifically requested + if ( + filter.protocolNetwork && + filter.protocolNetwork !== network.specification.networkIdentifier + ) { + return [] + } - const { - networkMonitor, - networkSubgraph, - contracts, - specification: { - indexerOptions: { address }, - }, - } = network - - const [currentEpoch, maxAllocationEpochs, epochLength] = await Promise.all([ - networkMonitor.networkCurrentEpoch(), - contracts.staking.maxAllocationEpochs(), - contracts.epochManager.epochLength(), - ]) - - const allocation = filter.allocation - ? filter.allocation === 'all' - ? null - : toAddress(filter.allocation) - : null - - const variables = { - indexer: toAddress(address), - allocation, - status: filter.status, - } + const { + networkMonitor, + networkSubgraph, + contracts, + specification: { + indexerOptions: { address }, + }, + } = network + + const [currentEpoch, maxAllocationEpochs, epochLength] = await Promise.all([ + networkMonitor.networkCurrentEpoch(), + contracts.staking.maxAllocationEpochs(), + contracts.epochManager.epochLength(), + ]) + + const allocation = filter.allocation + ? filter.allocation === 'all' + ? null + : toAddress(filter.allocation) + : null + + const variables = { + indexer: toAddress(address), + allocation, + status: filter.status, + } - const context = { - currentEpoch: currentEpoch.epochNumber, - currentEpochStartBlock: currentEpoch.startBlockNumber, - currentEpochElapsedBlocks: epochElapsedBlocks(currentEpoch), - latestBlock: currentEpoch.latestBlock, - maxAllocationEpochs, - blocksPerEpoch: epochLength.toNumber(), - avgBlockTime: 13000, - protocolNetwork: network.specification.networkIdentifier, - } + const context = { + currentEpoch: currentEpoch.epochNumber, + currentEpochStartBlock: currentEpoch.startBlockNumber, + currentEpochElapsedBlocks: epochElapsedBlocks(currentEpoch), + latestBlock: currentEpoch.latestBlock, + maxAllocationEpochs, + blocksPerEpoch: epochLength.toNumber(), + avgBlockTime: 13000, + protocolNetwork: network.specification.networkIdentifier, + } - return queryAllocations(logger, networkSubgraph, variables, context) - }, + const allocationsResult = await queryAllocations( + logger, + networkSubgraph, + variables, + context, ) - return Object.values(allocationsByNetwork).flat() + return Object.values(allocationsResult).flat() }, createAllocation: async ( @@ -374,19 +374,18 @@ export default { amount: string protocolNetwork: string }, - { multiNetworks, graphNode, logger, models }: IndexerManagementResolverContext, + { network, graphNode, logger, models }: IndexerManagementResolverContext, ): Promise => { logger.debug('Execute createAllocation() mutation', { deployment, amount, protocolNetwork, }) - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch allocations', ) } - const network = extractNetwork(protocolNetwork, multiNetworks) const networkMonitor = network.networkMonitor const contracts = network.contracts const transactionManager = network.transactionManager @@ -604,25 +603,22 @@ export default { allocation, poi, force, - protocolNetwork, }: { allocation: string poi: string | undefined force: boolean - protocolNetwork: string }, - { logger, models, multiNetworks }: IndexerManagementResolverContext, + { logger, models, network }: IndexerManagementResolverContext, ): Promise => { logger.debug('Execute closeAllocation() mutation', { allocationID: allocation, poi: poi || 'none provided', }) - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch allocations', ) } - const network = extractNetwork(protocolNetwork, multiNetworks) const networkMonitor = network.networkMonitor const contracts = network.contracts const transactionManager = network.transactionManager @@ -769,7 +765,7 @@ export default { force: boolean protocolNetwork: string }, - { logger, models, multiNetworks }: IndexerManagementResolverContext, + { logger, models, network }: IndexerManagementResolverContext, ): Promise => { logger = logger.child({ component: 'reallocateAllocationResolver', @@ -782,14 +778,13 @@ export default { force, }) - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch allocations', ) } // Obtain the Network object and its associated components and data - const network = extractNetwork(protocolNetwork, multiNetworks) const networkMonitor = network.networkMonitor const contracts = network.contracts const transactionManager = network.transactionManager @@ -1072,18 +1067,17 @@ export default { allocation: string protocolNetwork: string }, - { logger, multiNetworks }: IndexerManagementResolverContext, + { logger, network }: IndexerManagementResolverContext, ): Promise => { logger.debug('Execute collectAllocationReceipts() mutation', { allocationID: allocation, protocolNetwork, }) - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to collect receipts for an allocation', ) } - const network = extractNetwork(protocolNetwork, multiNetworks) const networkMonitor = network.networkMonitor const receiptCollector = network.receiptCollector diff --git a/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts b/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts index 45587363e..54e015714 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts @@ -80,14 +80,11 @@ export default { setCostModel: async ( { costModel }: { deployment: string; costModel: GraphQLCostModel }, - { models, multiNetworks }: IndexerManagementResolverContext, + { models, network }: IndexerManagementResolverContext, ): Promise => { - if (!multiNetworks) { + if (!network) { throw new Error('No network configuration available') } - if (Object.keys(multiNetworks.inner).length !== 1) { - throw Error('Must be in single network mode to set cost models') - } const update = parseGraphQLCostModel(costModel) // Validate cost model diff --git a/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts b/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts index c040113a2..e1e3eae95 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/indexer-status.ts @@ -10,7 +10,6 @@ import { Network, validateNetworkIdentifier, } from '@graphprotocol/indexer-common' -import { extractNetwork } from './utils' interface Test { test: (url: string) => string run: (url: string) => Promise @@ -62,16 +61,15 @@ const URL_VALIDATION_TEST: Test = { export default { indexerRegistration: async ( - { protocolNetwork: unvalidatedProtocolNetwork }: { protocolNetwork: string }, - { multiNetworks }: IndexerManagementResolverContext, + _: { protocolNetwork: string | null }, + { network }: IndexerManagementResolverContext, ): Promise => { - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch indexer registration information', ) } - const network = extractNetwork(unvalidatedProtocolNetwork, multiNetworks) const protocolNetwork = network.specification.networkIdentifier const address = network.specification.indexerOptions.address const contracts = network.contracts @@ -100,7 +98,7 @@ export default { }, indexerDeployments: async ( - _: {}, + _: { protocolNetwork: string | null }, { graphNode }: IndexerManagementResolverContext, ): Promise => { const result = await graphNode.indexingStatus([]) @@ -111,16 +109,15 @@ export default { }, indexerAllocations: async ( - { protocolNetwork }: { protocolNetwork: string }, - { multiNetworks, logger }: IndexerManagementResolverContext, + _: { protocolNetwork: string | null }, + { network, logger }: IndexerManagementResolverContext, ): Promise => { - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch indexer allocations', ) } - const network = extractNetwork(protocolNetwork, multiNetworks) const address = network.specification.indexerOptions.address try { @@ -188,9 +185,9 @@ export default { indexerEndpoints: async ( { protocolNetwork: unvalidatedProtocolNetwork }: { protocolNetwork: string | null }, - { multiNetworks, logger }: IndexerManagementResolverContext, + { network, logger }: IndexerManagementResolverContext, ): Promise => { - if (!multiNetworks) { + if (!network) { throw Error( 'IndexerManagementClient must be in `network` mode to fetch indexer endpoints', ) @@ -209,25 +206,22 @@ export default { ) } - await multiNetworks.map(async (network: Network) => { - // Skip if this query asks for another protocol network - if ( - networkIdentifier && - networkIdentifier !== network.specification.networkIdentifier - ) { - return - } - try { - const networkEndpoints = await endpointForNetwork(network) - endpoints.push(networkEndpoints) - } catch (err) { - // Ignore endpoints for this network - logger?.warn(`Failed to detect service endpoints for network`, { - err, - protocolNetwork: network.specification.networkIdentifier, - }) - } - }) + if ( + networkIdentifier && + networkIdentifier !== network.specification.networkIdentifier + ) { + return endpoints + } + try { + const networkEndpoints = await endpointForNetwork(network) + endpoints.push(networkEndpoints) + } catch (err) { + // Ignore endpoints for this network + logger?.warn(`Failed to detect service endpoints for network`, { + err, + protocolNetwork: network.specification.networkIdentifier, + }) + } return endpoints }, } diff --git a/packages/indexer-common/src/indexer-management/resolvers/utils.ts b/packages/indexer-common/src/indexer-management/resolvers/utils.ts deleted file mode 100644 index baa061fdf..000000000 --- a/packages/indexer-common/src/indexer-management/resolvers/utils.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { - MultiNetworks, - Network, - validateNetworkIdentifier, -} from '@graphprotocol/indexer-common' - -export function extractNetwork( - unvalidatedNetworkIdentifier: string, - multiNetworks: MultiNetworks, -): Network { - let networkIdentifier: string - try { - networkIdentifier = validateNetworkIdentifier(unvalidatedNetworkIdentifier) - } catch (parseError) { - throw new Error( - `Invalid protocol network identifier: '${unvalidatedNetworkIdentifier}'. Error: ${parseError}`, - ) - } - const network = multiNetworks.inner[networkIdentifier] - if (!network) { - throw new Error( - `Could not find a configured protocol network named ${networkIdentifier}`, - ) - } - return network -} diff --git a/packages/indexer-common/src/indexer-management/types.ts b/packages/indexer-common/src/indexer-management/types.ts index beeab7f6a..cf801f4cb 100644 --- a/packages/indexer-common/src/indexer-management/types.ts +++ b/packages/indexer-common/src/indexer-management/types.ts @@ -290,17 +290,3 @@ export async function validateProviderNetworkIdentifier( throw new Error(errorMsg) } } - -// Convenience function to check if a given network identifier is a supported Layer-1 protocol network -export function networkIsL1(networkIdentifier: string): boolean { - // Normalize network identifier - networkIdentifier = resolveChainId(networkIdentifier) - return networkIdentifier === 'eip155:1' || networkIdentifier === 'eip155:11155111' -} - -// Convenience function to check if a given network identifier is a supported Layer-2 protocol network -export function networkIsL2(networkIdentifier: string): boolean { - // Normalize network identifier - networkIdentifier = resolveChainId(networkIdentifier) - return networkIdentifier === 'eip155:42161' || networkIdentifier === 'eip155:421614' -} diff --git a/packages/indexer-common/src/multi-networks.ts b/packages/indexer-common/src/multi-networks.ts deleted file mode 100644 index f305d2087..000000000 --- a/packages/indexer-common/src/multi-networks.ts +++ /dev/null @@ -1,111 +0,0 @@ -import pReduce from 'p-reduce' -import isEqual from 'lodash.isequal' -import xor from 'lodash.xor' - -// A mapping of different values of type T keyed by their network identifiers -export type NetworkMapped = Record - -// Function to extract the network identifier from a value of type T -type NetworkIdentity = (element: T) => string - -// Wrapper type for performing calls over multiple values of any type, most notably -// Network and Operator instances. -// All public-facing methods should return a `NetworkMapped` or `void`. -export class MultiNetworks { - inner: NetworkMapped - constructor(elements: T[], networkIdentity: NetworkIdentity) { - if (elements.length === 0) { - throw new Error('MultiNetworks component was initialized with empty values') - } - - function reducer(accumulator: NetworkMapped, current: T): NetworkMapped { - const key = networkIdentity(current) - if (key in accumulator) { - throw new Error( - `Duplicate network identifier found while mapping value's network: ${key}`, - ) - } - // TODO: parse and validate network identifiers to standardize them - accumulator[key] = current - return accumulator - } - this.inner = elements.reduce(reducer, {}) - } - - private checkEqualKeys(a: NetworkMapped, b: NetworkMapped) { - const aKeys = Object.keys(a) - const bKeys = Object.keys(b) - if (!isEqual(aKeys, bKeys)) { - const differentKeys = xor(aKeys, bKeys) - throw new Error(`Network Mapped objects have different keys: ${differentKeys}`) - } - } - - async map(func: (value: T) => Promise): Promise> { - const entries: [string, T][] = Object.entries(this.inner) - return pReduce( - entries, - async (acc, pair) => { - const [networkIdentifier, element]: [string, T] = pair - const result = await func(element) - acc[networkIdentifier] = result - return acc - }, - {} as NetworkMapped, - ) - } - - zip(a: NetworkMapped, b: NetworkMapped): NetworkMapped<[U, V]> { - this.checkEqualKeys(a, b) - const result = {} as NetworkMapped<[U, V]> - for (const key in a) { - result[key] = [a[key], b[key]] - } - return result - } - - zip3( - a: NetworkMapped, - b: NetworkMapped, - c: NetworkMapped, - ): NetworkMapped<[U, V, W]> { - this.checkEqualKeys(a, b) - const result = {} as NetworkMapped<[U, V, W]> - for (const key in a) { - result[key] = [a[key], b[key], c[key]] - } - return result - } - - zip4( - a: NetworkMapped, - b: NetworkMapped, - c: NetworkMapped, - d: NetworkMapped, - ): NetworkMapped<[U, V, W, Y]> { - this.checkEqualKeys(a, b) - const result = {} as NetworkMapped<[U, V, W, Y]> - for (const key in a) { - result[key] = [a[key], b[key], c[key], d[key]] - } - return result - } - - async mapNetworkMapped( - nmap: NetworkMapped, - func: (inner: T, value: U) => Promise, - ): Promise> { - return pReduce( - Object.entries(nmap), - async (acc, [networkIdentifier, value]: [string, U]) => { - const inner = this.inner[networkIdentifier] - if (!inner) { - throw new Error(`Network identifier not found: ${networkIdentifier}`) - } - acc[networkIdentifier] = await func(inner, value) - return acc - }, - {} as NetworkMapped, - ) - } -} diff --git a/packages/indexer-common/src/parsers/test-utils.ts b/packages/indexer-common/src/parsers/test-utils.ts index 3463cbdec..0c80fc416 100644 --- a/packages/indexer-common/src/parsers/test-utils.ts +++ b/packages/indexer-common/src/parsers/test-utils.ts @@ -72,18 +72,22 @@ function parseYamlFile(filePath: string): NetworkSpecification { } } -function parseYamlFiles(filePaths: string[]): NetworkSpecification[] { - return filePaths.map(parseYamlFile) -} - -export function parseNetworkSpecifications( +export function parseNetworkSpecification( // eslint-disable-next-line @typescript-eslint/no-explicit-any argv: any, logger: Logger, -): NetworkSpecification[] { +): NetworkSpecification | undefined { const dir: string = argv.dir || argv['network-specifications-directory'] const yamlFiles = scanDirectoryForYamlFiles(dir, logger) - return parseYamlFiles(yamlFiles) + if (yamlFiles.length === 0) { + logger.info('No network specification files found in the provided directory') + return undefined + } else if (yamlFiles.length === 1) { + logger.info(`Found yaml config at ${dir}/${yamlFiles[0]} (ignoring others})`) + return parseYamlFile(yamlFiles[0]) + } else { + throw new Error(`Multiple network specification files found in ${dir}.`) + } } function scanDirectoryForYamlFiles(directoryPath: string, logger: Logger): string[] { @@ -91,13 +95,13 @@ function scanDirectoryForYamlFiles(directoryPath: string, logger: Logger): strin // Check if the directory exists if (!fs.existsSync(directoryPath)) { - throw new Error(`Directory does not exist: ${directoryPath}`) + throw new Error(`Directory does not exist: ${directoryPath} `) } // Check if the provided path is a directory const isDirectory = fs.lstatSync(directoryPath).isDirectory() if (!isDirectory) { - throw new Error(`Provided path is not a directory: ${directoryPath}`) + throw new Error(`Provided path is not a directory: ${directoryPath} `) } // Read the directory @@ -124,7 +128,7 @@ function scanDirectoryForYamlFiles(directoryPath: string, logger: Logger): strin fs.accessSync(filePath, fs.constants.R_OK) yamlFiles.push(filePath) } catch (error) { - throw new Error(`Cannot read file: ${filePath}`) + throw new Error(`Cannot read file: ${filePath} `) } } } @@ -132,7 +136,7 @@ function scanDirectoryForYamlFiles(directoryPath: string, logger: Logger): strin // Check if at least one YAMl file was found if (yamlFiles.length === 0) { throw new Error( - `No YAML file was found in '${directoryPath}'. At least one file is required.`, + `No YAML file was found in '${directoryPath}'.At least one file is required.`, ) } diff --git a/packages/indexer-common/src/subgraph-client.ts b/packages/indexer-common/src/subgraph-client.ts index d634abc99..b0a432a5d 100644 --- a/packages/indexer-common/src/subgraph-client.ts +++ b/packages/indexer-common/src/subgraph-client.ts @@ -165,11 +165,11 @@ export class SubgraphClient { const healthy = status.synced && status.health === 'healthy' if (healthy) { - this.logger.trace(`Use own deployment for ${this.name} query`) + this.logger.trace(`Use own deployment for ${this.name} query`, { status }) this.endpoint = this.subgraphDeploymentEndpoint return this.deployment.endpointClient } else if (this.endpointClient) { - this.logger.trace(`Use provided endpoint for ${this.name} query`) + this.logger.trace(`Use provided endpoint for ${this.name} query`, { status }) this.endpoint = this.subgraphConfigEndpoint return this.endpointClient } else { diff --git a/packages/indexer-common/src/types.ts b/packages/indexer-common/src/types.ts index cfd62747a..cb6f81fd4 100644 --- a/packages/indexer-common/src/types.ts +++ b/packages/indexer-common/src/types.ts @@ -66,25 +66,6 @@ export interface SubgraphDeployment { protocolNetwork: string } -// L1 Network Subgraph will always return `null` for the -// `transferredToL2*` set of fields -export interface TransferredSubgraphDeployment { - id: string - idOnL1: string - idOnL2: string - startedTransferToL2L: boolean - startedTransferToL2At: BigNumber - startedTransferToL2AtBlockNumber: BigNumber - startedTransferToL2AtTx: string - transferredToL2: boolean | null - transferredToL2At: BigNumber | null - transferredToL2AtTx: string | null - transferredToL2AtBlockNumber: BigNumber | null - ipfsHash: string - protocolNetwork: string - ready: boolean | null -} - export enum TransactionType { ZERO, TWO,