Skip to content

Commit

Permalink
fix: fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
pcarranzav committed Feb 5, 2025
1 parent da44d64 commit 1a4c74b
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 66 deletions.
23 changes: 12 additions & 11 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{
str::FromStr,
sync::Arc,
};
use std::{str::FromStr, sync::Arc};

use thegraph_core::alloy::{
core::primitives::Address,
Expand Down Expand Up @@ -83,7 +80,7 @@ sol! {

uint32 minEpochsPerCollection;
uint32 maxEpochsPerCollection;

// Deadline for the indexer to accept the agreement
uint64 deadline;
bytes metadata;
Expand Down Expand Up @@ -280,8 +277,9 @@ pub async fn validate_and_create_agreement(
) -> Result<Uuid, DipsError> {
let voucher = SignedIndexingAgreementVoucher::abi_decode(&mut voucher.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let metadata = SubgraphIndexingVoucherMetadata::abi_decode(&mut voucher.voucher.metadata.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let metadata =
SubgraphIndexingVoucherMetadata::abi_decode(&mut voucher.voucher.metadata.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;

voucher.validate(domain, expected_payee, allowed_payers)?;

Expand All @@ -298,7 +296,9 @@ pub async fn validate_and_cancel_agreement(
let request = SignedCancellationRequest::abi_decode(&mut cancellation_request.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;

let result = store.get_by_id(Uuid::from_bytes(request.request.agreement_id.into())).await?;
let result = store
.get_by_id(Uuid::from_bytes(request.request.agreement_id.into()))
.await?;
let agreement_and_cancelled = result.ok_or(DipsError::AgreementNotFound)?;
let agreement = agreement_and_cancelled.0;
let cancelled = agreement_and_cancelled.1;
Expand Down Expand Up @@ -330,7 +330,8 @@ mod test {

pub use crate::store::{AgreementStore, InMemoryAgreementStore};
use crate::{
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, CancellationRequest, DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, CancellationRequest,
DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
};

#[tokio::test]
Expand Down Expand Up @@ -525,7 +526,7 @@ mod test {
#[tokio::test]
async fn test_create_and_cancel_agreement() -> anyhow::Result<()> {
let deployment_id = "Qmbg1qF4YgHjiVfsVt6a13ddrVcRtWyJQfD4LA3CwHM29f".to_string();
let payee = PrivateKeySigner::random();
let payee = PrivateKeySigner::random();
let payee_addr = payee.address();
let payer = PrivateKeySigner::random();
let payer_addr = payer.address();
Expand Down Expand Up @@ -559,7 +560,7 @@ mod test {

let domain = dips_agreement_eip712_domain();
let signed_voucher = voucher.sign(&domain, payer.clone())?;

// Create agreement
let agreement_id = super::validate_and_create_agreement(
store.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/dips/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct DipsServer {
pub agreement_store: Arc<dyn AgreementStore>,
pub expected_payee: Address,
pub allowed_payers: Vec<Address>,
pub domain: Eip712Domain
pub domain: Eip712Domain,
}

#[async_trait]
Expand Down Expand Up @@ -66,7 +66,7 @@ impl DipsService for DipsServer {
) -> Result<Response<CancelAgreementResponse>, Status> {
let CancelAgreementRequest {
version,
signed_cancellation
signed_cancellation,
} = request.into_inner();

if version != 1 {
Expand Down
35 changes: 26 additions & 9 deletions crates/dips/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ use async_trait::async_trait;
use uuid::Uuid;

use crate::{
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher, SubgraphIndexingVoucherMetadata
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};

#[async_trait]
pub trait AgreementStore: Sync + Send + std::fmt::Debug {
async fn get_by_id(&self, id: Uuid) -> anyhow::Result<Option<(SignedIndexingAgreementVoucher, bool)>>;
async fn get_by_id(
&self,
id: Uuid,
) -> anyhow::Result<Option<(SignedIndexingAgreementVoucher, bool)>>;
async fn create_agreement(
&self,
agreement: SignedIndexingAgreementVoucher,
Expand All @@ -31,17 +35,21 @@ pub struct InMemoryAgreementStore {

#[async_trait]
impl AgreementStore for InMemoryAgreementStore {
async fn get_by_id(&self, id: Uuid) -> anyhow::Result<Option<(SignedIndexingAgreementVoucher, bool)>> {
async fn get_by_id(
&self,
id: Uuid,
) -> anyhow::Result<Option<(SignedIndexingAgreementVoucher, bool)>> {
Ok(self.data.try_read()?.get(&id).cloned())
}
async fn create_agreement(
&self,
agreement: SignedIndexingAgreementVoucher,
_medatadata: SubgraphIndexingVoucherMetadata,
) -> anyhow::Result<()> {
self.data
.try_write()?
.insert(Uuid::from_bytes(agreement.voucher.agreement_id.into()), (agreement.clone(), false));
self.data.try_write()?.insert(
Uuid::from_bytes(agreement.voucher.agreement_id.into()),
(agreement.clone(), false),
);

Ok(())
}
Expand All @@ -52,11 +60,20 @@ impl AgreementStore for InMemoryAgreementStore {
let id = Uuid::from_bytes(signed_cancellation.request.agreement_id.into());

let agreement = {
let read_lock = self.data.try_read().map_err(|e| DipsError::UnknownError(e.into()))?;
read_lock.get(&id).cloned().ok_or(DipsError::AgreementNotFound)?
let read_lock = self
.data
.try_read()
.map_err(|e| DipsError::UnknownError(e.into()))?;
read_lock
.get(&id)
.cloned()
.ok_or(DipsError::AgreementNotFound)?
};

let mut write_lock = self.data.try_write().map_err(|e| DipsError::UnknownError(e.into()))?;
let mut write_lock = self
.data
.try_write()
.map_err(|e| DipsError::UnknownError(e.into()))?;
write_lock.insert(id, (agreement.0, true));

Ok(id)
Expand Down
120 changes: 77 additions & 43 deletions crates/service/src/database/dips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ use anyhow::bail;
use axum::async_trait;
use build_info::chrono::{DateTime, Utc};
use indexer_dips::{
store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher, SubgraphIndexingVoucherMetadata
store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};
use sqlx::{types::BigDecimal, PgPool};
use thegraph_core::alloy::{
core::primitives::U256 as uint256,
sol_types::SolType
};
use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType};
use uuid::Uuid;
use thegraph_core::alloy::hex::ToHexExt;

#[derive(Debug)]
pub struct PsqlAgreementStore {
Expand All @@ -31,10 +28,12 @@ fn uint32_to_i64(uint32: u32) -> i64 {
uint32.try_into().unwrap()
}


#[async_trait]
impl AgreementStore for PsqlAgreementStore {
async fn get_by_id(&self, id: Uuid) -> anyhow::Result<Option<(SignedIndexingAgreementVoucher, bool)>> {
async fn get_by_id(
&self,
id: Uuid,
) -> anyhow::Result<Option<(SignedIndexingAgreementVoucher, bool)>> {
let item = sqlx::query!("SELECT * FROM indexing_agreements WHERE id=$1", id,)
.fetch_one(&self.pool)
.await;
Expand All @@ -45,7 +44,8 @@ impl AgreementStore for PsqlAgreementStore {
Err(err) => bail!(err),
};

let signed = SignedIndexingAgreementVoucher::abi_decode(&mut item.signed_payload.as_ref(), true)?;
let signed =
SignedIndexingAgreementVoucher::abi_decode(&mut item.signed_payload.as_ref(), true)?;
let cancelled = item.cancelled_at.is_some();
Ok(Some((signed, cancelled)))
}
Expand All @@ -57,12 +57,14 @@ impl AgreementStore for PsqlAgreementStore {
let id = Uuid::from_bytes(agreement.voucher.agreement_id.into());
let bs = agreement.encode_vec();
let now = Utc::now();
let deadline = DateTime::from_timestamp(agreement.voucher.deadline.try_into().unwrap(), 0).unwrap();
let deadline =
DateTime::from_timestamp(agreement.voucher.deadline.try_into().unwrap(), 0).unwrap();
let base_price_per_epoch = uint256_to_bigdecimal(&metadata.basePricePerEpoch);
let price_per_entity = uint256_to_bigdecimal(&metadata.pricePerEntity);
let duration_epochs = uint32_to_i64(agreement.voucher.durationEpochs);
let max_initial_amount = uint256_to_bigdecimal(&agreement.voucher.maxInitialAmount);
let max_ongoing_amount_per_epoch = uint256_to_bigdecimal(&agreement.voucher.maxOngoingAmountPerEpoch);
let max_ongoing_amount_per_epoch =
uint256_to_bigdecimal(&agreement.voucher.maxOngoingAmountPerEpoch);
let min_epochs_per_collection = uint32_to_i64(agreement.voucher.minEpochsPerCollection);
let max_epochs_per_collection = uint32_to_i64(agreement.voucher.maxEpochsPerCollection);
sqlx::query!(
Expand Down Expand Up @@ -117,14 +119,17 @@ impl AgreementStore for PsqlAgreementStore {
#[cfg(test)]
pub(crate) mod test {
use std::sync::Arc;
use super::*;

use build_info::chrono::Duration;
use indexer_dips::{CancellationRequest, IndexingAgreementVoucher};
use sqlx::PgPool;
use thegraph_core::alloy::primitives::{ruint::aliases::U256, Address};
use thegraph_core::alloy::{
primitives::{ruint::aliases::U256, Address},
sol_types::{SolType, SolValue},
};
use uuid::Uuid;
use thegraph_core::alloy::sol_types::SolValue;
use thegraph_core::alloy::sol_types::SolType;

use super::*;

#[sqlx::test(migrations = "../../migrations")]
async fn test_store_agreement(pool: PgPool) {
Expand All @@ -149,26 +154,26 @@ pub(crate) mod test {
payer: Address::from_str("1234567890123456789012345678901234567890").unwrap(),
recipient: Address::from_str("2345678901234567890123456789012345678901").unwrap(),
service: Address::from_str("3456789012345678901234567890123456789012").unwrap(),
durationEpochs: 30, // 30 epochs duration
durationEpochs: 30, // 30 epochs duration
maxInitialAmount: U256::from(1000),
maxOngoingAmountPerEpoch: U256::from(100),
maxEpochsPerCollection: 5,
minEpochsPerCollection: 1,
metadata: metadata.abi_encode().into(), // Convert Vec<u8> to Bytes
metadata: metadata.abi_encode().into(), // Convert Vec<u8> to Bytes
},
};

// Store agreement
store.create_agreement(agreement.clone(), metadata).await.unwrap();
store
.create_agreement(agreement.clone(), metadata)
.await
.unwrap();

// Verify stored agreement
let row = sqlx::query!(
"SELECT * FROM indexing_agreements WHERE id = $1",
id
)
.fetch_one(&store.pool)
.await
.unwrap();
let row = sqlx::query!("SELECT * FROM indexing_agreements WHERE id = $1", id)
.fetch_one(&store.pool)
.await
.unwrap();

assert_eq!(row.id, id);
assert_eq!(row.signature, agreement.signature);
Expand Down Expand Up @@ -210,27 +215,53 @@ pub(crate) mod test {
};

// Store agreement
store.create_agreement(agreement.clone(), metadata.clone()).await.unwrap();
store
.create_agreement(agreement.clone(), metadata.clone())
.await
.unwrap();

// Retrieve agreement
let retrieved = store.get_by_id(id).await.unwrap().unwrap();

let retrieved_voucher = &retrieved.0.voucher;
let cancelled = retrieved.1;
let retrieved_metadata = <indexer_dips::SubgraphIndexingVoucherMetadata as SolType>::abi_decode(&mut retrieved_voucher.metadata.as_ref(), true).unwrap();
let retrieved_metadata =
<indexer_dips::SubgraphIndexingVoucherMetadata as SolType>::abi_decode(
&mut retrieved_voucher.metadata.as_ref(),
true,
)
.unwrap();
// Verify retrieved agreement matches original
assert_eq!(retrieved.0.signature, agreement.signature);
assert_eq!(retrieved_voucher.durationEpochs, agreement.voucher.durationEpochs);
assert_eq!(
retrieved_voucher.durationEpochs,
agreement.voucher.durationEpochs
);
assert_eq!(retrieved_metadata.protocolNetwork, metadata.protocolNetwork);
assert_eq!(retrieved_metadata.chainId, metadata.chainId);
assert_eq!(retrieved_metadata.subgraphDeploymentId, metadata.subgraphDeploymentId);
assert_eq!(
retrieved_metadata.subgraphDeploymentId,
metadata.subgraphDeploymentId
);
assert_eq!(retrieved_voucher.payer, agreement.voucher.payer);
assert_eq!(retrieved_voucher.recipient, agreement.voucher.recipient);
assert_eq!(retrieved_voucher.service, agreement.voucher.service);
assert_eq!(retrieved_voucher.maxInitialAmount, agreement.voucher.maxInitialAmount);
assert_eq!(retrieved_voucher.maxOngoingAmountPerEpoch, agreement.voucher.maxOngoingAmountPerEpoch);
assert_eq!(retrieved_voucher.maxEpochsPerCollection, agreement.voucher.maxEpochsPerCollection);
assert_eq!(retrieved_voucher.minEpochsPerCollection, agreement.voucher.minEpochsPerCollection);
assert_eq!(
retrieved_voucher.maxInitialAmount,
agreement.voucher.maxInitialAmount
);
assert_eq!(
retrieved_voucher.maxOngoingAmountPerEpoch,
agreement.voucher.maxOngoingAmountPerEpoch
);
assert_eq!(
retrieved_voucher.maxEpochsPerCollection,
agreement.voucher.maxEpochsPerCollection
);
assert_eq!(
retrieved_voucher.minEpochsPerCollection,
agreement.voucher.minEpochsPerCollection
);
assert_eq!(cancelled, false);
}

Expand Down Expand Up @@ -267,8 +298,11 @@ pub(crate) mod test {
};

// Store agreement
store.create_agreement(agreement.clone(), metadata).await.unwrap();

store
.create_agreement(agreement.clone(), metadata)
.await
.unwrap();

// Cancel agreement
let cancellation = SignedCancellationRequest {
signature: vec![1, 2, 3].into(),
Expand All @@ -279,15 +313,15 @@ pub(crate) mod test {
store.cancel_agreement(cancellation.clone()).await.unwrap();

// Verify stored agreement
let row = sqlx::query!(
"SELECT * FROM indexing_agreements WHERE id = $1",
id
)
.fetch_one(&store.pool)
.await
.unwrap();
let row = sqlx::query!("SELECT * FROM indexing_agreements WHERE id = $1", id)
.fetch_one(&store.pool)
.await
.unwrap();

assert!(row.cancelled_at.is_some());
assert_eq!(row.signed_cancellation_payload, Some(cancellation.encode_vec()));
assert_eq!(
row.signed_cancellation_payload,
Some(cancellation.encode_vec())
);
}
}
2 changes: 1 addition & 1 deletion crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub async fn run() -> anyhow::Result<()> {
agreement_store: Arc::new(PsqlAgreementStore { pool: database }),
expected_payee: indexer_address,
allowed_payers: allowed_payers.clone(),
domain: domain_separator
domain: domain_separator,
};

info!("starting dips grpc server on {}", addr);
Expand Down

0 comments on commit 1a4c74b

Please sign in to comment.