Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions docs/graphman.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,21 +371,30 @@ Inspect all blocks after block `13000000`:

Remove the call cache of the specified chain.

If block numbers are not mentioned in `--from` and `--to`, then all the call cache will be removed.
Either remove entries in the range `--from` and `--to`, remove stale contracts which have not been accessed for a specified duration `--ttl_days`, or remove the entire cache with `--remove-entire-cache`. Removing the entire cache can reduce indexing performance significantly and should generally be avoided.

USAGE:
graphman chain call-cache <CHAIN_NAME> remove [OPTIONS]
Usage: graphman chain call-cache <CHAIN_NAME> remove [OPTIONS]

OPTIONS:
-f, --from <FROM>
Starting block number
Options:
--remove-entire-cache
Remove the entire cache

--ttl-days <TTL_DAYS>
Remove stale contracts based on call_meta table

-h, --help
Print help information
--ttl-max-contracts <TTL_MAX_CONTRACTS>
Limit the number of contracts to consider for stale contract removal

-f, --from <FROM>
Starting block number

-t, --to <TO>
-t, --to <TO>
Ending block number

-h, --help
Print help (see a summary with '-h')


### DESCRIPTION

Remove the call cache of a specified chain.
Expand All @@ -404,6 +413,15 @@ the first block number will be used as the starting block number.
The `to` option is used to specify the ending block number of the block range. In the absence of `to` option,
the last block number will be used as the ending block number.

#### `--remove-entire-cache`
The `--remove-entire-cache` option is used to remove the entire call cache of the specified chain.

#### `--ttl-days <TTL_DAYS>`
The `--ttl-days` option is used to remove stale contracts based on the `call_meta.accessed_at` field. For example, if `--ttl-days` is set to 7, all calls to a contract that has not been accessed in the last 7 days will be removed from the call cache.

#### `--ttl-max-contracts <TTL_MAX_CONTRACTS>`
The `--ttl-max-contracts` option is used to limit the maximum number of contracts to be removed when using the `--ttl-days` option. For example, if `--ttl-max-contracts` is set to 100, at most 100 contracts will be removed from the call cache even if more contracts meet the TTL criteria.

### EXAMPLES

Remove the call cache for all blocks numbered from 10 to 20:
Expand All @@ -412,5 +430,12 @@ Remove the call cache for all blocks numbered from 10 to 20:

Remove all the call cache of the specified chain:

graphman --config config.toml chain call-cache ethereum remove
graphman --config config.toml chain call-cache ethereum remove --remove-entire-cache

Remove stale contracts from the call cache that have not been accessed in the last 7 days:

graphman --config config.toml chain call-cache ethereum remove --ttl-days 7

Remove stale contracts from the call cache that have not been accessed in the last 7 days, limiting the removal to a maximum of 100 contracts:
graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 --ttl-max-contracts 100

7 changes: 7 additions & 0 deletions graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,13 @@ impl ChainStore for MockChainStore {
async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> {
unimplemented!()
}
async fn clear_stale_call_cache(
&self,
_ttl_days: i32,
_ttl_max_contracts: Option<i64>,
) -> Result<(), Error> {
unimplemented!()
}
fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
unimplemented!()
}
Expand Down
7 changes: 7 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,13 @@ pub trait ChainStore: ChainHeadStore {
/// Clears call cache of the chain for the given `from` and `to` block number.
async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>;

/// Clears stale call cache entries for the given TTL in days.
async fn clear_stale_call_cache(
&self,
ttl_days: i32,
ttl_max_contracts: Option<i64>,
) -> Result<(), Error>;

/// Return the chain identifier for this store.
fn chain_identifier(&self) -> Result<ChainIdentifier, Error>;

Expand Down
22 changes: 20 additions & 2 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,21 @@ pub enum ChainCommand {
pub enum CallCacheCommand {
/// Remove the call cache of the specified chain.
///
/// Either remove entries in the range `--from` and `--to`, or remove
/// the entire cache with `--remove-entire-cache`. Removing the entire
/// Either remove entries in the range `--from` and `--to`,
/// remove the cache for contracts that have not been accessed for the specified duration --ttl_days,
/// or remove the entire cache with `--remove-entire-cache`. Removing the entire
/// cache can reduce indexing performance significantly and should
/// generally be avoided.
Remove {
/// Remove the entire cache
#[clap(long, conflicts_with_all = &["from", "to"])]
remove_entire_cache: bool,
/// Remove the cache for contracts that have not been accessed in the last <TTL_DAYS> days
#[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(i32).range(1..))]
ttl_days: Option<i32>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't allow negative values here

/// Limits the number of contracts to consider for cache removal when using --ttl_days
#[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(i64).range(1..))]
ttl_max_contracts: Option<i64>,
/// Starting block number
#[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")]
from: Option<i32>,
Expand Down Expand Up @@ -1472,8 +1479,19 @@ async fn main() -> anyhow::Result<()> {
from,
to,
remove_entire_cache,
ttl_days,
ttl_max_contracts,
} => {
let chain_store = ctx.chain_store(&chain_name)?;
if let Some(ttl_days) = ttl_days {
return commands::chain::clear_stale_call_cache(
chain_store,
ttl_days,
ttl_max_contracts,
)
.await;
}

if !remove_entire_cache && from.is_none() && to.is_none() {
bail!("you must specify either --from and --to or --remove-entire-cache");
}
Expand Down
15 changes: 15 additions & 0 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ pub async fn clear_call_cache(
Ok(())
}

pub async fn clear_stale_call_cache(
chain_store: Arc<ChainStore>,
ttl_days: i32,
ttl_max_contracts: Option<i64>,
) -> Result<(), Error> {
println!(
"Removing stale entries from the call cache for `{}`",
chain_store.chain
);
chain_store
.clear_stale_call_cache(ttl_days, ttl_max_contracts)
.await?;
Ok(())
}

pub async fn info(
primary: ConnectionPool,
store: Arc<BlockStore>,
Expand Down
199 changes: 198 additions & 1 deletion store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub use data::Storage;

/// Encapuslate access to the blocks table for a chain.
mod data {
use crate::diesel::dsl::IntervalDsl;
use diesel::sql_types::{Array, Binary, Bool, Nullable};
use diesel::{connection::SimpleConnection, insert_into};
use diesel::{delete, prelude::*, sql_query};
Expand All @@ -104,8 +105,10 @@ mod data {
use graph::prelude::transaction_receipt::LightTransactionReceipt;
use graph::prelude::web3::types::H256;
use graph::prelude::{
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError,
info, serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger,
StoreError,
};

use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
Expand Down Expand Up @@ -1398,6 +1401,190 @@ mod data {
}
}

pub fn clear_stale_call_cache(
&self,
conn: &mut PgConnection,
logger: &Logger,
ttl_days: i32,
ttl_max_contracts: Option<i64>,
) -> Result<(), Error> {
let mut total_calls: usize = 0;
let mut total_contracts: i64 = 0;
// We process contracts in batches to avoid loading too many entries into memory
// at once. Each contract can have many calls, so we also delete calls in batches.
// Note: The batch sizes were chosen based on experimentation. Potentially, they
// could be made configurable via ENV vars.
let contracts_batch_size: i64 = 2000;
let cache_batch_size: usize = 10000;

// Limits the number of contracts to process if ttl_max_contracts is set.
// Used also to adjust the final batch size, so we don't process more
// contracts than the set limit.
let remaining_contracts = |processed: i64| -> Option<i64> {
ttl_max_contracts.map(|limit| limit.saturating_sub(processed))
};

match self {
Storage::Shared => {
use public::eth_call_cache as cache;
use public::eth_call_meta as meta;

loop {
if let Some(0) = remaining_contracts(total_contracts) {
info!(
logger,
"Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)",
total_calls,
total_contracts
);
break;
}

let batch_limit = remaining_contracts(total_contracts)
.map(|left| left.min(contracts_batch_size))
.unwrap_or(contracts_batch_size);

let stale_contracts = meta::table
.select(meta::contract_address)
.filter(
meta::accessed_at
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
)
.limit(batch_limit)
.get_results::<Vec<u8>>(conn)?;

if stale_contracts.is_empty() {
info!(
logger,
"Finished cleaning call cache: deleted {} entries for {} contracts",
total_calls,
total_contracts
);
break;
}

loop {
let next_batch = cache::table
.select(cache::id)
.filter(cache::contract_address.eq_any(&stale_contracts))
.limit(cache_batch_size as i64)
.get_results::<Vec<u8>>(conn)?;
let deleted_count =
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
.execute(conn)?;

total_calls += deleted_count;

if deleted_count < cache_batch_size {
break;
}
}

let deleted_contracts = diesel::delete(
meta::table.filter(meta::contract_address.eq_any(&stale_contracts)),
)
.execute(conn)?;

total_contracts += deleted_contracts as i64;
}

Ok(())
}
Storage::Private(Schema {
call_cache,
call_meta,
..
}) => {
let select_query = format!(
"WITH stale_contracts AS (
SELECT contract_address
FROM {}
WHERE accessed_at < current_date - interval '{} days'
LIMIT $1
)
SELECT contract_address FROM stale_contracts",
call_meta.qname, ttl_days
);

let delete_cache_query = format!(
"WITH targets AS (
SELECT id
FROM {}
WHERE contract_address = ANY($1)
LIMIT {}
)
DELETE FROM {} USING targets
WHERE {}.id = targets.id",
call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname
);

let delete_meta_query = format!(
"DELETE FROM {} WHERE contract_address = ANY($1)",
call_meta.qname
);

#[derive(QueryableByName)]
struct ContractAddress {
#[diesel(sql_type = Bytea)]
contract_address: Vec<u8>,
}

loop {
if let Some(0) = remaining_contracts(total_contracts) {
info!(
logger,
"Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)",
total_calls,
total_contracts
);
break;
}

let batch_limit = remaining_contracts(total_contracts)
.map(|left| left.min(contracts_batch_size))
.unwrap_or(contracts_batch_size);

let stale_contracts: Vec<Vec<u8>> = sql_query(&select_query)
.bind::<BigInt, _>(batch_limit)
.load::<ContractAddress>(conn)?
.into_iter()
.map(|r| r.contract_address)
.collect();

if stale_contracts.is_empty() {
info!(
logger,
"Finished cleaning call cache: deleted {} entries for {} contracts",
total_calls,
total_contracts
);
break;
}

loop {
let deleted_count = sql_query(&delete_cache_query)
.bind::<Array<Bytea>, _>(&stale_contracts)
.execute(conn)?;

total_calls += deleted_count;

if deleted_count < cache_batch_size {
break;
}
}

let deleted_contracts = sql_query(&delete_meta_query)
.bind::<Array<Bytea>, _>(&stale_contracts)
.execute(conn)?;

total_contracts += deleted_contracts as i64;
}

Ok(())
}
}
}

pub(super) fn update_accessed_at(
&self,
conn: &mut PgConnection,
Expand Down Expand Up @@ -2508,6 +2695,16 @@ impl ChainStoreTrait for ChainStore {
Ok(())
}

async fn clear_stale_call_cache(
&self,
ttl_days: i32,
ttl_max_contracts: Option<i64>,
) -> Result<(), Error> {
let conn = &mut *self.get_conn()?;
self.storage
.clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts)
}

async fn transaction_receipts_in_block(
&self,
block_hash: &H256,
Expand Down
Loading