Skip to content

Commit d903ebf

Browse files
committed
chore: progress towards collecting payments
1 parent 65d1457 commit d903ebf

20 files changed

+731
-51
lines changed

.sqlx/query-f36803bd99ba0b93b312950e8d88c94a80335ea34bf7200aef5d18e6983a2cb1.json

+154
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.lock

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/config/maximal-config-example.toml

+1
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,4 @@ max_receipts_per_request = 10000
153153
host = "0.0.0.0"
154154
port = "7601"
155155
allowed_payers = ["0x3333333333333333333333333333333333333333"]
156+
dipper_grpc_url = "https://dipper.thegraph.com/"

crates/config/src/config.rs

+3
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ pub struct DipsConfig {
390390
pub host: String,
391391
pub port: String,
392392
pub allowed_payers: Vec<Address>,
393+
pub dipper_grpc_url: String,
393394
}
394395

395396
impl Default for DipsConfig {
@@ -398,6 +399,7 @@ impl Default for DipsConfig {
398399
host: "0.0.0.0".to_string(),
399400
port: "7601".to_string(),
400401
allowed_payers: vec![],
402+
dipper_grpc_url: "".to_string(),
401403
}
402404
}
403405
}
@@ -462,6 +464,7 @@ mod tests {
462464
allowed_payers: vec![Address(
463465
FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(),
464466
)],
467+
dipper_grpc_url: "https://dipper.thegraph.com/".to_string(),
465468
..Default::default()
466469
});
467470

crates/dips/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7+
axum.workspace = true
8+
build-info.workspace = true
79
thiserror.workspace = true
810
anyhow.workspace = true
911
alloy-rlp = "0.3.10"
@@ -15,6 +17,7 @@ prost-types.workspace = true
1517
uuid.workspace = true
1618
base64.workspace = true
1719
tokio.workspace = true
20+
sqlx.workspace = true
1821

1922
[build-dependencies]
2023
tonic-build = { workspace = true }

crates/dips/build.rs

-2
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ fn main() {
66
tonic_build::configure()
77
.out_dir("src/proto")
88
.include_file("indexer.rs")
9-
.build_client(false)
109
.protoc_arg("--experimental_allow_proto3_optional")
1110
.compile_protos(&["proto/indexer.proto"], &["proto/"])
1211
.expect("Failed to compile DIPs indexer RPC proto(s)");
1312

1413
tonic_build::configure()
1514
.out_dir("src/proto")
1615
.include_file("gateway.rs")
17-
.build_server(false)
1816
.protoc_arg("--experimental_allow_proto3_optional")
1917
.compile_protos(&["proto/gateway.proto"], &["proto"])
2018
.expect("Failed to compile DIPs gateway RPC proto(s)");

crates/dips/proto/gateway.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
package graphprotocol.gateway.dips;
44

5-
service DipsService {
5+
service DipperService {
66
/**
77
* Cancel an _indexing agreement_.
88
*

crates/dips/proto/indexer.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
package graphprotocol.indexer.dips;
44

5-
service DipsService {
5+
service IndexerDipsService {
66
/**
77
* Propose a new _indexing agreement_ to an _indexer_.
88
*

crates/service/src/database/dips.rs crates/dips/src/database.rs

+47-5
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ use std::str::FromStr;
55

66
use axum::async_trait;
77
use build_info::chrono::{DateTime, Utc};
8-
use indexer_dips::{
8+
use sqlx::{types::BigDecimal, PgPool};
9+
use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType};
10+
use uuid::Uuid;
11+
12+
use crate::{
913
store::{AgreementStore, StoredIndexingAgreement},
1014
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
1115
SubgraphIndexingVoucherMetadata,
1216
};
13-
use sqlx::{types::BigDecimal, PgPool};
14-
use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType};
15-
use uuid::Uuid;
1617

1718
#[derive(Debug)]
1819
pub struct PsqlAgreementStore {
@@ -51,6 +52,47 @@ impl AgreementStore for PsqlAgreementStore {
5152
last_allocation_id: item.last_allocation_id,
5253
}))
5354
}
55+
async fn get_by_last_allocation_id(
56+
&self,
57+
allocation_id: String,
58+
) -> Result<Vec<StoredIndexingAgreement>, DipsError> {
59+
let items = sqlx::query!(
60+
"SELECT * FROM indexing_agreements WHERE last_allocation_id=$1",
61+
allocation_id,
62+
)
63+
.fetch_all(&self.pool)
64+
.await;
65+
66+
let items = match items {
67+
Ok(items) => items,
68+
Err(sqlx::Error::RowNotFound) => return Ok(vec![]),
69+
Err(err) => return Err(DipsError::UnknownError(err.into())),
70+
};
71+
72+
// Note: we discard any agreements that fail to decode
73+
let agreements = items
74+
.into_iter()
75+
.map(|item| {
76+
let signed =
77+
SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true)
78+
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
79+
let metadata = SubgraphIndexingVoucherMetadata::abi_decode(
80+
signed.voucher.metadata.as_ref(),
81+
true,
82+
)
83+
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
84+
Ok(StoredIndexingAgreement {
85+
voucher: signed,
86+
metadata,
87+
cancelled: item.cancelled_at.is_some(),
88+
current_allocation_id: item.current_allocation_id,
89+
last_allocation_id: item.last_allocation_id,
90+
})
91+
})
92+
.filter_map(|agreement: Result<StoredIndexingAgreement, DipsError>| agreement.ok())
93+
.collect();
94+
Ok(agreements)
95+
}
5496
async fn create_agreement(
5597
&self,
5698
agreement: SignedIndexingAgreementVoucher,
@@ -133,7 +175,6 @@ pub(crate) mod test {
133175
use std::sync::Arc;
134176

135177
use build_info::chrono::Duration;
136-
use indexer_dips::{CancellationRequest, IndexingAgreementVoucher};
137178
use sqlx::PgPool;
138179
use thegraph_core::alloy::{
139180
primitives::{ruint::aliases::U256, Address},
@@ -142,6 +183,7 @@ pub(crate) mod test {
142183
use uuid::Uuid;
143184

144185
use super::*;
186+
use crate::{CancellationRequest, IndexingAgreementVoucher};
145187

146188
#[sqlx::test(migrations = "../../migrations")]
147189
async fn test_store_agreement(pool: PgPool) {

crates/dips/src/lib.rs

+21
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ use std::{str::FromStr, sync::Arc};
55

66
use thegraph_core::alloy::{
77
core::primitives::Address,
8+
hex::ToHexExt,
89
primitives::{b256, ChainId, PrimitiveSignature as Signature, B256},
910
signers::SignerSync,
1011
sol,
1112
sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue},
1213
};
1314

15+
pub mod database;
1416
pub mod proto;
1517
pub mod server;
1618
pub mod store;
@@ -323,6 +325,25 @@ pub async fn validate_and_cancel_agreement(
323325
Ok(id)
324326
}
325327

328+
pub async fn collect_indexing_agreements(
329+
store: Arc<dyn AgreementStore>,
330+
allocation_id: Address,
331+
_dipper_grpc_url: &str,
332+
) -> Result<(), DipsError> {
333+
let agreements = store
334+
.get_by_last_allocation_id(allocation_id.encode_hex())
335+
.await?;
336+
337+
for _agreement in agreements {
338+
// TODO get the entities count for the deployment from graph node
339+
// TODO create collection request
340+
// TODO sign collection request
341+
// TODO send collection request to the dipper via grpc
342+
}
343+
344+
Ok(())
345+
}
346+
326347
#[cfg(test)]
327348
mod test {
328349
use std::{

0 commit comments

Comments
 (0)