Skip to content

Commit 1989a6d

Browse files
committed
store: Make sure we always map the right set of foreign tables
1 parent fa0757b commit 1989a6d

File tree

1 file changed

+22
-19
lines changed

1 file changed

+22
-19
lines changed

store/postgres/src/connection_pool.rs

+22-19
Original file line numberDiff line numberDiff line change
@@ -1037,16 +1037,14 @@ impl PoolInner {
10371037
let result = pool
10381038
.configure_fdw(coord.servers.as_ref())
10391039
.and_then(|()| pool.drop_cross_shard_views())
1040-
.and_then(|()| migrate_schema(&pool.logger, &mut conn))
1041-
.and_then(|count| {
1042-
pool.create_cross_shard_views(coord.servers.as_ref())
1043-
.map(|()| count)
1044-
});
1040+
.and_then(|()| migrate_schema(&pool.logger, &mut conn));
10451041
debug!(&pool.logger, "Release migration lock");
10461042
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
10471043
die(&pool.logger, "failed to release migration lock", &err);
10481044
});
1049-
let result = result.and_then(|count| coord.propagate(&pool, count));
1045+
let result = result
1046+
.and_then(|count| coord.propagate(&pool, count))
1047+
.and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref()));
10501048
result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err));
10511049

10521050
// Locale check
@@ -1178,9 +1176,9 @@ impl PoolInner {
11781176
.await
11791177
}
11801178

1181-
// The foreign server `server` had schema changes, and we therefore need
1182-
// to remap anything that we are importing via fdw to make sure we are
1183-
// using this updated schema
1179+
/// The foreign server `server` had schema changes, and we therefore
1180+
/// need to remap anything that we are importing via fdw to make sure we
1181+
/// are using this updated schema
11841182
pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
11851183
if &server.shard == &*PRIMARY_SHARD {
11861184
info!(&self.logger, "Mapping primary");
@@ -1211,10 +1209,6 @@ impl MigrationCount {
12111209
fn had_migrations(&self) -> bool {
12121210
self.old != self.new
12131211
}
1214-
1215-
fn is_new(&self) -> bool {
1216-
self.old == 0
1217-
}
12181212
}
12191213

12201214
/// Run all schema migrations.
@@ -1334,13 +1328,22 @@ impl PoolCoordinator {
13341328
/// code that does _not_ hold the migration lock as it will otherwise
13351329
/// deadlock
13361330
fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
1337-
// pool is a new shard, map all other shards into it
1338-
if count.is_new() {
1339-
for server in self.servers.iter() {
1340-
pool.remap(server)?;
1341-
}
1331+
// Strictly speaking, we only need to remap all these servers into
1332+
// `pool` if `pool` is a new shard (`count.is_new())`) or if the
1333+
// list of tables that are mapped have changed from the previous
1334+
// verison. Since creating foreign tables is pretty fast, we don't
1335+
// try to be too clever and just recreate these mappings every time.
1336+
// If that slows startup down unacceptably, we need to do something
1337+
// better here
1338+
for server in self.servers.iter() {
1339+
pool.remap(server)?;
13421340
}
1343-
// pool had schema changes, refresh the import from pool into all other shards
1341+
1342+
// pool had schema changes, refresh the import from pool into all
1343+
// other shards. This makes sure that schema changes to
1344+
// already-mapped tables are propagated to all other shards. Since
1345+
// we run `propagate` after migrations have been applied to `pool`,
1346+
// we can be sure that these mappings use the correct schema
13441347
if count.had_migrations() {
13451348
let server = self.server(&pool.shard)?;
13461349
for pool in self.pools.lock().unwrap().values() {

0 commit comments

Comments
 (0)