Skip to content

Commit 1fd032e

Browse files
Feat: Automate account-like optimization (#6209)
* store: Add table_stats materialzed view Signed-off-by: Maksim Dimitrov <[email protected]> * graph, store: Add experimental job to automatically set account-like flag on eligible tables Signed-off-by: Maksim Dimitrov <[email protected]> * docs: Update env vars documentation Signed-off-by: Maksim Dimitrov <[email protected]> * store: Fixes and refactor error handling Signed-off-by: Maksim Dimitrov <[email protected]> * store: Rename materialized view to avoid name clash Signed-off-by: Maksim Dimitrov <[email protected]> --------- Signed-off-by: Maksim Dimitrov <[email protected]>
1 parent 5486ae9 commit 1fd032e

File tree

7 files changed

+222
-1
lines changed

7 files changed

+222
-1
lines changed

docs/environment-variables.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,3 +287,10 @@ those.
287287
- `GRAPH_ENABLE_SQL_QUERIES`: Enable the experimental [SQL query
288288
interface](implementation/sql-interface.md).
289289
(default: false)
290+
- `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`: If set, enables an experimental job that
291+
periodically scans for entity tables that may benefit from an [account-like optimization](https://thegraph.com/docs/en/indexing/tooling/graph-node/#account-like-optimisation) and marks them with an
292+
account-like flag. The value is the interval in hours at which the job
293+
should run. The job reads data from the `info.table_stats` materialized view, which refreshes every six hours. Expects an integer value, e.g., 24. Requires also setting
294+
`GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT` and `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`.
295+
- `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`: Sets the minimum total number of versions a table must have to be considered for account-like flagging. Expects a positive integer value. No default value.
296+
- `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`: Sets the maximum unique entities to version ratio (e.g., 0.01 ≈ 1:100 entity-to-version ratio).

graph/src/env/store.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,18 @@ pub struct EnvVarsStore {
149149
/// The number of rows to fetch from the foreign data wrapper in one go,
150150
/// this will be set as the option 'fetch_size' on all foreign servers
151151
pub fdw_fetch_size: usize,
152+
153+
/// Experimental feature to automatically set the account-like flag on eligible tables
154+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`
155+
/// If not set, the job is disabled.
156+
/// Utilizes materialized view stats that refresh every 6 hours to discover heavy-write tables.
157+
pub account_like_scan_interval_hours: Option<u32>,
158+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`
159+
/// Tables must have at least this many total versions to be considered.
160+
pub account_like_min_versions_count: Option<u64>,
161+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`
162+
/// Defines the maximum share of unique entities (e.g. 0.01 for a 1:100 entity-to-version ratio).
163+
pub account_like_max_unique_ratio: Option<f64>,
152164
}
153165

154166
// This does not print any values avoid accidentally leaking any sensitive env vars
@@ -206,6 +218,9 @@ impl TryFrom<InnerStore> for EnvVarsStore {
206218
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
207219
insert_extra_cols: x.insert_extra_cols,
208220
fdw_fetch_size: x.fdw_fetch_size,
221+
account_like_scan_interval_hours: x.account_like_scan_interval_hours,
222+
account_like_min_versions_count: x.account_like_min_versions_count,
223+
account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
209224
};
210225
if let Some(timeout) = vars.batch_timeout {
211226
if timeout < 2 * vars.batch_target_duration {
@@ -217,6 +232,16 @@ impl TryFrom<InnerStore> for EnvVarsStore {
217232
if vars.batch_workers < 1 {
218233
bail!("GRAPH_STORE_BATCH_WORKERS must be at least 1");
219234
}
235+
if vars.account_like_scan_interval_hours.is_some()
236+
&& (vars.account_like_min_versions_count.is_none()
237+
|| vars.account_like_max_unique_ratio.is_none())
238+
{
239+
bail!(
240+
"Both GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT and \
241+
GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO must be set when \
242+
GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS is set"
243+
);
244+
}
220245
Ok(vars)
221246
}
222247
}
@@ -295,6 +320,12 @@ pub struct InnerStore {
295320
insert_extra_cols: usize,
296321
#[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")]
297322
fdw_fetch_size: usize,
323+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS")]
324+
account_like_scan_interval_hours: Option<u32>,
325+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT")]
326+
account_like_min_versions_count: Option<u64>,
327+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO")]
328+
account_like_max_unique_ratio: Option<ZeroToOneF64>,
298329
}
299330

300331
#[derive(Clone, Copy, Debug)]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP MATERIALIZED VIEW IF EXISTS info.entity_version_stats;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
CREATE MATERIALIZED VIEW info.entity_version_stats AS
2+
WITH table_info AS (
3+
SELECT
4+
s.schemaname AS schema_name,
5+
sd.id AS deployment_id,
6+
sd.subgraph AS subgraph,
7+
s.tablename AS table_name,
8+
c.reltuples AS total_row_count,
9+
s.n_distinct AS n_distinct
10+
FROM pg_stats s
11+
JOIN pg_namespace n ON n.nspname = s.schemaname
12+
JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.tablename
13+
JOIN subgraphs.deployment sd ON sd.id::text = substring(s.schemaname, 4)
14+
WHERE
15+
s.attname = 'id'
16+
AND s.schemaname LIKE 'sgd%'
17+
AND c.relname NOT IN ('poi2$', 'data_sources$')
18+
)
19+
SELECT
20+
schema_name,
21+
deployment_id AS deployment,
22+
subgraph,
23+
table_name,
24+
CASE
25+
WHEN n_distinct < 0 THEN (-n_distinct) * total_row_count
26+
ELSE n_distinct
27+
END::bigint AS entities,
28+
total_row_count::bigint AS versions,
29+
CASE
30+
WHEN total_row_count = 0 THEN 0::float8
31+
WHEN n_distinct < 0 THEN (-n_distinct)::float8
32+
ELSE n_distinct::numeric / total_row_count::numeric
33+
END AS ratio
34+
FROM table_info
35+
WITH NO DATA;

store/postgres/src/deployment_store.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,49 @@ impl DeploymentStore {
770770
catalog::drop_index(&mut conn, schema_name.as_str(), &index_name).await
771771
}
772772

773+
pub(crate) async fn identify_account_like_candidates(
774+
&self,
775+
min_versions: u64,
776+
ratio: f64,
777+
) -> Result<Vec<(String, String)>, StoreError> {
778+
#[derive(QueryableByName)]
779+
struct TableStat {
780+
#[diesel(sql_type = diesel::sql_types::Text)]
781+
subgraph: String,
782+
#[diesel(sql_type = diesel::sql_types::Text)]
783+
table_name: String,
784+
}
785+
786+
let mut conn = self.pool.get().await?;
787+
let query = r#"
788+
SELECT
789+
stats.subgraph,
790+
stats.table_name
791+
FROM info.entity_version_stats AS stats
792+
LEFT JOIN subgraphs.table_stats ts
793+
ON ts.deployment = stats.deployment
794+
AND ts.table_name = stats.table_name
795+
WHERE
796+
stats.versions > $1
797+
AND stats.ratio < $2
798+
AND ts.is_account_like IS NOT TRUE
799+
"#;
800+
801+
let result = diesel::sql_query(query)
802+
.bind::<diesel::sql_types::BigInt, _>(min_versions as i64)
803+
.bind::<diesel::sql_types::Double, _>(ratio)
804+
.load::<TableStat>(&mut conn)
805+
.await
806+
.map_err(Into::into);
807+
808+
result.map(|tables| {
809+
tables
810+
.into_iter()
811+
.map(|table_stat| (table_stat.subgraph, table_stat.table_name))
812+
.collect()
813+
})
814+
}
815+
773816
pub(crate) async fn set_account_like(
774817
&self,
775818
site: Arc<Site>,
@@ -1821,10 +1864,11 @@ impl DeploymentStore {
18211864
// We hardcode our materialized views, but could also use
18221865
// pg_matviews to list all of them, though that might inadvertently
18231866
// refresh materialized views that operators created themselves
1824-
const VIEWS: [&str; 3] = [
1867+
const VIEWS: [&str; 4] = [
18251868
"info.table_sizes",
18261869
"info.subgraph_sizes",
18271870
"info.chain_sizes",
1871+
"info.entity_version_stats",
18281872
];
18291873
let mut conn = store.pool.get_permitted().await?;
18301874
for view in VIEWS {

store/postgres/src/jobs.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ pub fn register(
4848
Arc::new(RefreshMaterializedView::new(store.subgraph_store())),
4949
6 * ONE_HOUR,
5050
);
51+
52+
if let Some(interval) = ENV_VARS.store.account_like_scan_interval_hours {
53+
runner.register(
54+
Arc::new(AccountLikeJob::new(store.subgraph_store())),
55+
interval * ONE_HOUR,
56+
);
57+
}
5158
}
5259

5360
/// A job that vacuums `subgraphs.deployment` and `subgraphs.head`. With a
@@ -235,3 +242,37 @@ impl Job for UnusedJob {
235242
}
236243
}
237244
}
245+
246+
struct AccountLikeJob {
247+
store: Arc<SubgraphStore>,
248+
}
249+
250+
impl AccountLikeJob {
251+
fn new(store: Arc<SubgraphStore>) -> AccountLikeJob {
252+
AccountLikeJob { store }
253+
}
254+
}
255+
256+
#[async_trait]
257+
impl Job for AccountLikeJob {
258+
fn name(&self) -> &str {
259+
"Set account-like flag on eligible tables"
260+
}
261+
262+
async fn run(&self, logger: &Logger) {
263+
// Safe to unwrap due to a startup validation
264+
// which ensures these values are present when account_like_scan_interval_hours is set.
265+
let min_versions_count = ENV_VARS.store.account_like_min_versions_count.unwrap();
266+
let ratio = ENV_VARS.store.account_like_max_unique_ratio.unwrap();
267+
268+
self.store
269+
.identify_and_set_account_like(logger, min_versions_count, ratio)
270+
.await
271+
.unwrap_or_else(|e| {
272+
error!(
273+
logger,
274+
"Failed to set account-like flag on eligible tables: {}", e
275+
)
276+
});
277+
}
278+
}

store/postgres/src/subgraph_store.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,68 @@ impl Inner {
12771277
store.drop_index(site, index_name).await
12781278
}
12791279

1280+
pub(crate) async fn identify_and_set_account_like(
1281+
&self,
1282+
logger: &Logger,
1283+
min_records: u64,
1284+
ratio: f64,
1285+
) -> Result<(), StoreError> {
1286+
for (_shard, store) in &self.stores {
1287+
let candidates = match store
1288+
.identify_account_like_candidates(min_records, ratio)
1289+
.await
1290+
{
1291+
Ok(candidates) => candidates,
1292+
Err(e) => {
1293+
graph::slog::error!(
1294+
logger,
1295+
"Failed to identify account-like candidates in shard {}",
1296+
_shard;
1297+
"error" => e.to_string()
1298+
);
1299+
continue;
1300+
}
1301+
};
1302+
1303+
graph::slog::debug!(
1304+
logger,
1305+
"Found {} account-like candidates in shard {}",
1306+
candidates.len(),
1307+
_shard
1308+
);
1309+
1310+
for (subgraph, table_name) in candidates {
1311+
graph::slog::debug!(
1312+
logger,
1313+
"Setting table {} as account-like for deployment {}",
1314+
table_name,
1315+
subgraph
1316+
);
1317+
1318+
let res = async {
1319+
let hash = DeploymentHash::new(subgraph.clone()).map_err(|_| {
1320+
anyhow!("Failed to create deployment hash for subgraph: {subgraph}")
1321+
})?;
1322+
let (store, site) = self.store(&hash).await?;
1323+
store.set_account_like(site, &table_name, true).await
1324+
}
1325+
.await;
1326+
1327+
if let Err(e) = res {
1328+
graph::slog::error!(
1329+
logger,
1330+
"Failed to set table {} as account-like for deployment {}",
1331+
table_name,
1332+
subgraph;
1333+
"error" => e.to_string()
1334+
);
1335+
}
1336+
}
1337+
}
1338+
1339+
Ok(())
1340+
}
1341+
12801342
pub async fn set_account_like(
12811343
&self,
12821344
deployment: &DeploymentLocator,

0 commit comments

Comments
 (0)