|
| 1 | +import time |
1 | 2 | import unittest
|
2 | 3 | from crate.client import connect
|
3 | 4 | from crate.client.exceptions import ProgrammingError
|
@@ -88,6 +89,10 @@ def _test_rolling_upgrade(self, path, nodes):
|
88 | 89 | }
|
89 | 90 | cluster = self._new_cluster(path.from_version, nodes, settings=settings)
|
90 | 91 | cluster.start()
|
| 92 | + replica_cluster = None |
| 93 | + if path.from_version.startswith("5"): |
| 94 | + replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) |
| 95 | + replica_cluster.start() |
91 | 96 | with connect(cluster.node().http_url, error_trace=True) as conn:
|
92 | 97 | c = conn.cursor()
|
93 | 98 | c.execute("create user arthur with (password = 'secret')")
|
@@ -133,6 +138,21 @@ def _test_rolling_upgrade(self, path, nodes):
|
133 | 138 | ) partitioned by (a) clustered into 1 shards with (number_of_replicas = 0)
|
134 | 139 | ''')
|
135 | 140 |
|
| 141 | + # FDW: two CrateDB clusters setting up foreign data wrappers bidirectionally |
| 142 | + if int(path.from_version.split('.')[1]) >= 7: |
| 143 | + c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") |
| 144 | + expected_active_shards += 1 |
| 145 | + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: |
| 146 | + rc = replica_conn.cursor() |
| 147 | + psql_port = cluster.node().addresses.psql.port |
| 148 | + replica_psql_port = replica_cluster.node().addresses.psql.port |
| 149 | + assert 5430 <= psql_port <= 5440 and 5430 <= replica_psql_port <= 5440 |
| 150 | + rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)") |
| 151 | + rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{psql_port}/')") |
| 152 | + rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')") |
| 153 | + c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{replica_psql_port}/')") |
| 154 | + c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')") |
| 155 | + |
136 | 156 | c.execute('''
|
137 | 157 | CREATE FUNCTION foo(INT)
|
138 | 158 | RETURNS INT
|
@@ -282,6 +302,26 @@ def _test_rolling_upgrade(self, path, nodes):
|
282 | 302 | c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx])
|
283 | 303 | self.assertEqual(c.fetchall(), [[partition_version]])
|
284 | 304 |
|
| 305 | + # Ensure FDWs are functional |
| 306 | + if int(path.from_version.split('.')[1]) >= 7: |
| 307 | + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: |
| 308 | + rc = replica_conn.cursor() |
| 309 | + wait_for_active_shards(c) |
| 310 | + wait_for_active_shards(rc) |
| 311 | + rc.execute("select count(a) from doc.remote_y") |
| 312 | + count = rc.fetchall()[0][0] |
| 313 | + c.execute("insert into doc.y values (1)") |
| 314 | + c.execute("refresh table doc.y") |
| 315 | + rc.execute("select count(a) from doc.remote_y") |
| 316 | + self.assertEqual(rc.fetchall()[0][0], count + 1) |
| 317 | + |
| 318 | + c.execute("select count(a) from doc.remote_y") |
| 319 | + count = c.fetchall()[0][0] |
| 320 | + rc.execute("insert into doc.y values (1)") |
| 321 | + rc.execute("refresh table doc.y") |
| 322 | + c.execute("select count(a) from doc.remote_y") |
| 323 | + self.assertEqual(c.fetchall()[0][0], count + 1) |
| 324 | + |
285 | 325 | # Finally validate that all shards (primaries and replicas) of all partitions are started
|
286 | 326 | # and writes into the partitioned table while upgrading were successful
|
287 | 327 | with connect(cluster.node().http_url, error_trace=True) as conn:
|
|
0 commit comments