diff --git a/node/src/manager/commands/copy.rs b/node/src/manager/commands/copy.rs
index ab007ea319d..9ca80bc9b20 100644
--- a/node/src/manager/commands/copy.rs
+++ b/node/src/manager/commands/copy.rs
@@ -2,7 +2,7 @@ use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQuery
 use std::{collections::HashMap, sync::Arc, time::SystemTime};
 
 use graph::{
-    components::store::{BlockStore as _, DeploymentId},
+    components::store::{BlockStore as _, DeploymentId, DeploymentLocator},
     data::query::QueryTarget,
     prelude::{
         anyhow::{anyhow, bail, Error},
@@ -84,10 +84,9 @@ impl CopyState {
     }
 }
 
-pub async fn create(
+async fn create_inner(
     store: Arc<Store>,
-    primary: ConnectionPool,
-    src: DeploymentSearch,
+    src: &DeploymentLocator,
     shard: String,
     shards: Vec<String>,
     node: String,
@@ -104,7 +103,6 @@ pub async fn create(
     };
 
     let subgraph_store = store.subgraph_store();
-    let src = src.locate_unique(&primary)?;
     let query_store = store
         .query_store(QueryTarget::Deployment(
             src.hash.clone(),
@@ -154,6 +152,32 @@ pub async fn create(
     Ok(())
 }
 
+pub async fn create(
+    store: Arc<Store>,
+    primary: ConnectionPool,
+    src: DeploymentSearch,
+    shard: String,
+    shards: Vec<String>,
+    node: String,
+    block_offset: u32,
+    activate: bool,
+    replace: bool,
+) -> Result<(), Error> {
+    let src = src.locate_unique(&primary)?;
+    create_inner(
+        store,
+        &src,
+        shard,
+        shards,
+        node,
+        block_offset,
+        activate,
+        replace,
+    )
+    .await
+    .map_err(|e| anyhow!("cannot copy {src}: {e}"))
+}
+
 pub fn activate(store: Arc<SubgraphStore>, deployment: String, shard: String) -> Result<(), Error> {
     let shard = Shard::new(shard)?;
     let deployment =
@@ -231,13 +255,11 @@ pub fn list(pools: HashMap<Shard, ConnectionPool>) -> Result<(), Error> {
 }
 
 pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) -> Result<(), Error> {
+    const CHECK: &str = "✓";
+
     use catalog::active_copies as ac;
     use catalog::deployment_schemas as ds;
 
-    fn done(ts: &Option<UtcDateTime>) -> String {
-        ts.map(|_| "✓").unwrap_or(".").to_string()
-    }
-
     fn duration(start: &UtcDateTime, end: &Option<UtcDateTime>) -> String {
         let start = *start;
         let end = *end;
@@ -290,7 +312,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
     };
 
     let progress = match &state.finished_at {
-        Some(_) => done(&state.finished_at),
+        Some(_) => CHECK.to_string(),
         None => {
             let target: i64 = tables.iter().map(|table| table.target_vid).sum();
             let next: i64 = tables.iter().map(|table| table.next_vid).sum();
@@ -339,13 +361,15 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
     );
     println!("{:-<74}", "-");
     for table in tables {
-        let status = if table.next_vid > 0 && table.next_vid < table.target_vid {
-            ">".to_string()
-        } else if table.target_vid < 0 {
+        let status = match &table.finished_at {
+            // table finished
+            Some(_) => CHECK,
             // empty source table
-            "✓".to_string()
-        } else {
-            done(&table.finished_at)
+            None if table.target_vid < 0 => CHECK,
+            // copying in progress
+            None if table.duration_ms > 0 => ">",
+            // not started
+            None => ".",
         };
         println!(
             "{} {:<28} | {:>8} | {:>8} | {:>8} | {:>8}",
diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs
index 1524a768acc..ba532dd53ff 100644
--- a/store/postgres/src/catalog.rs
+++ b/store/postgres/src/catalog.rs
@@ -398,6 +398,16 @@ pub fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), St
     Ok(())
 }
 
+pub fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>, StoreError> {
+    use foreign_tables as ft;
+
+    ft::table
+        .filter(ft::foreign_table_schema.eq(nsp))
+        .select(ft::foreign_table_name)
+        .get_results::<String>(conn)
+        .map_err(StoreError::from)
+}
+
 /// Drop the schema `nsp` and all its contents if it exists, and create it
 /// again so that `nsp` is an empty schema
 pub fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {
diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs
index 374a1adc5ab..6267a41628a 100644
--- a/store/postgres/src/connection_pool.rs
+++ b/store/postgres/src/connection_pool.rs
@@ -37,6 +37,11 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
 use crate::{advisory_lock, catalog};
 use crate::{Shard, PRIMARY_SHARD};
 
+/// Tables that we map from the primary into `primary_public` in each shard
+const PRIMARY_TABLES: [&str; 3] = ["deployment_schemas", "chains", "active_copies"];
+
+/// Tables that we map from each shard into each other shard into the
+/// `shard_<shard>_subgraphs` namespace
 const SHARDED_TABLES: [(&str, &[&str]); 2] = [
     ("public", &["ethereum_networks"]),
     (
@@ -47,7 +52,6 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [
             "dynamic_ethereum_contract_data_source",
             "subgraph_deployment",
             "subgraph_error",
-            "subgraph_features",
             "subgraph_manifest",
             "table_stats",
         ],
@@ -185,7 +189,7 @@ impl ForeignServer {
             alter server \"{name}\"
                 options (set host '{remote_host}', \
                     {set_port} port '{remote_port}', \
-                    set dbname '{remote_db}, \
+                    set dbname '{remote_db}', \
                     {set_fetch_size} fetch_size '{fetch_size}');
             alter user mapping
                 for current_user server \"{name}\"
@@ -209,7 +213,7 @@ impl ForeignServer {
         catalog::recreate_schema(conn, Self::PRIMARY_PUBLIC)?;
 
         let mut query = String::new();
-        for table_name in ["deployment_schemas", "chains", "active_copies"] {
+        for table_name in PRIMARY_TABLES {
             let create_stmt = if shard == &*PRIMARY_SHARD {
                 format!(
                     "create view {nsp}.{table_name} as select * from public.{table_name};",
@@ -246,6 +250,33 @@ impl ForeignServer {
         }
         Ok(conn.batch_execute(&query)?)
     }
+
+    fn needs_remap(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
+        fn different(mut existing: Vec<String>, mut needed: Vec<String>) -> bool {
+            existing.sort();
+            needed.sort();
+            existing != needed
+        }
+
+        if &self.shard == &*PRIMARY_SHARD {
+            let existing = catalog::foreign_tables(conn, Self::PRIMARY_PUBLIC)?;
+            let needed = PRIMARY_TABLES
+                .into_iter()
+                .map(String::from)
+                .collect::<Vec<_>>();
+            if different(existing, needed) {
+                return Ok(true);
+            }
+        }
+
+        let existing = catalog::foreign_tables(conn, &Self::metadata_schema(&self.shard))?;
+        let needed = SHARDED_TABLES
+            .iter()
+            .flat_map(|(_, tables)| *tables)
+            .map(|table| table.to_string())
+            .collect::<Vec<_>>();
+        Ok(different(existing, needed))
+    }
 }
 
 /// How long to keep connections in the `fdw_pool` around before closing
@@ -1037,16 +1068,14 @@ impl PoolInner {
         let result = pool
             .configure_fdw(coord.servers.as_ref())
             .and_then(|()| pool.drop_cross_shard_views())
-            .and_then(|()| migrate_schema(&pool.logger, &mut conn))
-            .and_then(|count| {
-                pool.create_cross_shard_views(coord.servers.as_ref())
-                    .map(|()| count)
-            });
+            .and_then(|()| migrate_schema(&pool.logger, &mut conn));
         debug!(&pool.logger, "Release migration lock");
         advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
             die(&pool.logger, "failed to release migration lock", &err);
         });
-        let result = result.and_then(|count| coord.propagate(&pool, count));
+        let result = result
+            .and_then(|count| coord.propagate(&pool, count))
+            .and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref()));
         result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err));
 
         // Locale check
@@ -1178,9 +1207,9 @@ impl PoolInner {
         .await
     }
 
-    // The foreign server `server` had schema changes, and we therefore need
-    // to remap anything that we are importing via fdw to make sure we are
-    // using this updated schema
+    /// The foreign server `server` had schema changes, and we therefore
+    /// need to remap anything that we are importing via fdw to make sure we
+    /// are using this updated schema
     pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
         if &server.shard == &*PRIMARY_SHARD {
             info!(&self.logger, "Mapping primary");
@@ -1198,6 +1227,15 @@ impl PoolInner {
         }
         Ok(())
     }
+
+    pub fn needs_remap(&self, server: &ForeignServer) -> Result<bool, StoreError> {
+        if &server.shard == &self.shard {
+            return Ok(false);
+        }
+
+        let mut conn = self.get()?;
+        server.needs_remap(&mut conn)
+    }
 }
 
 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
@@ -1211,10 +1249,6 @@ impl MigrationCount {
     fn had_migrations(&self) -> bool {
         self.old != self.new
     }
-
-    fn is_new(&self) -> bool {
-        self.old == 0
-    }
 }
 
 /// Run all schema migrations.
@@ -1334,13 +1368,22 @@ impl PoolCoordinator {
     /// code that does _not_ hold the migration lock as it will otherwise
     /// deadlock
     fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
-        // pool is a new shard, map all other shards into it
-        if count.is_new() {
-            for server in self.servers.iter() {
+        // We need to remap all these servers into `pool` if the list of
+        // tables that are mapped have changed from the code of the previous
+        // version. Since dropping and recreating the foreign table
+        // definitions can slow the startup of other nodes down because of
+        // locking, we try to only do this when it is actually needed
+        for server in self.servers.iter() {
+            if pool.needs_remap(server)? {
                 pool.remap(server)?;
             }
         }
-        // pool had schema changes, refresh the import from pool into all other shards
+
+        // pool had schema changes, refresh the import from pool into all
+        // other shards. This makes sure that schema changes to
+        // already-mapped tables are propagated to all other shards. Since
+        // we run `propagate` after migrations have been applied to `pool`,
+        // we can be sure that these mappings use the correct schema
        if count.had_migrations() {
             let server = self.server(&pool.shard)?;
             for pool in self.pools.lock().unwrap().values() {
diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs
index 5a31acfb959..d82bc33e4a8 100644
--- a/store/postgres/src/copy.rs
+++ b/store/postgres/src/copy.rs
@@ -730,7 +730,7 @@ impl Connection {
                 &table.src.name.to_string(),
                 &table.dst,
                 true,
-                true,
+                false,
             )?;
 
             for (_, sql) in arr {
@@ -748,7 +748,11 @@ impl Connection {
             .iter()
             .map(|c| c.name.to_string())
             .collect_vec();
-        for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() {
+        for sql in table
+            .dst
+            .create_postponed_indexes(orig_colums, false)
+            .into_iter()
+        {
             let query = sql_query(sql);
             query.execute(conn)?;
         }
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index b196cd3c539..01f705158d3 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1553,6 +1553,12 @@ impl DeploymentStore {
 
         catalog::copy_account_like(conn, &src.site, &dst.site)?;
 
+        // Analyze all tables for this deployment
+        info!(logger, "Analyzing all {} tables", dst.tables.len());
+        for entity_name in dst.tables.keys() {
+            self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
+        }
+
         // Rewind the subgraph so that entity versions that are
         // clamped in the future (beyond `block`) become valid for
         // all blocks after `block`. `revert_block` gets rid of
@@ -1563,6 +1569,7 @@ impl DeploymentStore {
             .number
             .checked_add(1)
             .expect("block numbers fit into an i32");
+        info!(logger, "Rewinding to block {}", block.number);
         let count = dst.revert_block(conn, block_to_revert)?;
         deployment::update_entity_count(conn, &dst.site, count)?;
 
@@ -1575,11 +1582,6 @@ impl DeploymentStore {
             src_deployment.manifest.history_blocks,
         )?;
 
-        // Analyze all tables for this deployment
-        for entity_name in dst.tables.keys() {
-            self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
-        }
-
         // The `earliest_block` for `src` might have changed while
         // we did the copy if `src` was pruned while we copied;
         // adjusting it very late in the copy process ensures that
diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs
index 55e116272d1..980bca2b9fd 100644
--- a/store/postgres/src/relational/ddl.rs
+++ b/store/postgres/src/relational/ddl.rs
@@ -269,7 +269,11 @@ impl Table {
         (method, index_expr)
     }
 
-    pub(crate) fn create_postponed_indexes(&self, skip_colums: Vec<String>) -> Vec<String> {
+    pub(crate) fn create_postponed_indexes(
+        &self,
+        skip_colums: Vec<String>,
+        concurrently: bool,
+    ) -> Vec<String> {
         let mut indexing_queries = vec![];
         let columns = self.columns_to_index();
 
@@ -281,8 +285,9 @@ impl Table {
                 && column.name.as_str() != "id"
                 && !skip_colums.contains(&column.name.to_string())
             {
+                let conc = if concurrently { "concurrently " } else { "" };
                 let sql = format!(
-                    "create index concurrently if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n",
+                    "create index {conc}if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n",
                     table_index = self.position,
                     table_name = self.name,
                     column_name = column.name,
diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs
index 86e9f232d49..bb1dcc67f46 100644
--- a/store/postgres/src/relational/ddl_tests.rs
+++ b/store/postgres/src/relational/ddl_tests.rs
@@ -158,7 +158,7 @@ fn generate_postponed_indexes() {
     let layout = test_layout(THING_GQL);
     let table = layout.table(&SqlName::from("Scalar")).unwrap();
     let skip_colums = vec!["id".to_string()];
-    let query_vec = table.create_postponed_indexes(skip_colums);
+    let query_vec = table.create_postponed_indexes(skip_colums, true);
     assert!(query_vec.len() == 7);
     let queries = query_vec.join(" ");
     check_eqv(THING_POSTPONED_INDEXES, &queries)