|  | 
| 2 | 2 | from crate.client import connect | 
| 3 | 3 | from crate.client.exceptions import ProgrammingError | 
| 4 | 4 | 
 | 
| 5 |  | -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath | 
|  | 5 | +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy | 
| 6 | 6 | 
 | 
| 7 | 7 | ROLLING_UPGRADES_V4 = ( | 
| 8 | 8 |     # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug | 
|  | 
| 30 | 30 | ) | 
| 31 | 31 | 
 | 
| 32 | 32 | ROLLING_UPGRADES_V5 = ( | 
| 33 |  | -    UpgradePath('5.0.x', '5.1.x'), | 
| 34 |  | -    UpgradePath('5.1.x', '5.2.x'), | 
| 35 |  | -    UpgradePath('5.2.x', '5.3.x'), | 
| 36 |  | -    UpgradePath('5.3.x', '5.4.x'), | 
| 37 |  | -    UpgradePath('5.4.x', '5.5.x'), | 
| 38 |  | -    UpgradePath('5.5.x', '5.6.x'), | 
| 39 |  | -    UpgradePath('5.6.x', '5.7.x'), | 
| 40 |  | -    UpgradePath('5.7.x', '5.8.x'), | 
| 41 |  | -    UpgradePath('5.8.x', '5.9.x'), | 
| 42 |  | -    UpgradePath('5.9.x', '5.10.x'), | 
| 43 |  | -    UpgradePath('5.10.x', '5.10'), | 
| 44 |  | -    UpgradePath('5.10', 'latest-nightly'), | 
|  | 33 | +    UpgradePath('5.10', 'branch:jeeminso/temp'), | 
| 45 | 34 | ) | 
| 46 | 35 | 
 | 
| 47 | 36 | 
 | 
| @@ -88,6 +77,10 @@ def _test_rolling_upgrade(self, path, nodes): | 
| 88 | 77 |         } | 
| 89 | 78 |         cluster = self._new_cluster(path.from_version, nodes, settings=settings) | 
| 90 | 79 |         cluster.start() | 
|  | 80 | +        replica_cluster = None | 
|  | 81 | +        if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: | 
|  | 82 | +            replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) | 
|  | 83 | +            replica_cluster.start() | 
| 91 | 84 |         with connect(cluster.node().http_url, error_trace=True) as conn: | 
| 92 | 85 |             c = conn.cursor() | 
| 93 | 86 |             c.execute("create user arthur with (password = 'secret')") | 
| @@ -152,6 +145,32 @@ def _test_rolling_upgrade(self, path, nodes): | 
| 152 | 145 |             # Add the shards of the new partition primaries | 
| 153 | 146 |             expected_active_shards += shards | 
| 154 | 147 | 
 | 
|  | 148 | +            # Set up tables for logical replications | 
|  | 149 | +            def num_docs_x(cursor): | 
|  | 150 | +                cursor.execute("select count(*) from doc.x") | 
|  | 151 | +                return cursor.fetchall()[0][0] | 
|  | 152 | +            def num_docs_rx(cursor): | 
|  | 153 | +                cursor.execute("select count(*) from doc.rx") | 
|  | 154 | +                return cursor.fetchall()[0][0] | 
|  | 155 | + | 
|  | 156 | +            if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: | 
|  | 157 | +                c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)") | 
|  | 158 | +                c.execute("create publication p for table doc.x") | 
|  | 159 | +                with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: | 
|  | 160 | +                    rc = replica_conn.cursor() | 
|  | 161 | +                    transport_port = cluster.node().addresses.transport.port | 
|  | 162 | +                    replica_transport_port = replica_cluster.node().addresses.transport.port | 
|  | 163 | +                    assert 4300 <= transport_port <= 4310 and 4300 <= replica_transport_port <= 4310 | 
|  | 164 | +                    rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)") | 
|  | 165 | +                    rc.execute("create publication rp for table doc.rx") | 
|  | 166 | +                    rc.execute(f"create subscription rs connection 'crate://localhost:{transport_port}?user=crate&sslmode=sniff' publication p") | 
|  | 167 | +                    wait_for_active_shards(rc, 2) # doc.rx created via create-table and doc.x that is subscribed | 
|  | 168 | +                    assert_busy(lambda: self.assertEqual(num_docs_x(rc), 0)) | 
|  | 169 | +                c.execute(f"create subscription s connection 'crate://localhost:{replica_transport_port}?user=crate&sslmode=sniff' publication rp") | 
|  | 170 | +                expected_active_shards += 2 | 
|  | 171 | +                wait_for_active_shards(c, expected_active_shards) | 
|  | 172 | +                assert_busy(lambda: self.assertEqual(num_docs_rx(c), 0)) | 
|  | 173 | + | 
| 155 | 174 |         for idx, node in enumerate(cluster): | 
| 156 | 175 |             # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. | 
| 157 | 176 |             # Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works. | 
| @@ -282,6 +301,25 @@ def _test_rolling_upgrade(self, path, nodes): | 
| 282 | 301 |                         c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx]) | 
| 283 | 302 |                         self.assertEqual(c.fetchall(), [[partition_version]]) | 
| 284 | 303 | 
 | 
|  | 304 | +                # Ensure logical replications works | 
|  | 305 | +                if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: | 
|  | 306 | +                    with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: | 
|  | 307 | +                        rc = replica_conn.cursor() | 
|  | 308 | + | 
|  | 309 | +                        # Cannot drop replicated tables | 
|  | 310 | +                        with self.assertRaises(ProgrammingError): | 
|  | 311 | +                            rc.execute("drop table doc.x") | 
|  | 312 | +                            c.execute("drop table doc.rx") | 
|  | 313 | + | 
|  | 314 | +                        count = num_docs_x(rc) | 
|  | 315 | +                        count2 = num_docs_rx(c) | 
|  | 316 | + | 
|  | 317 | +                        c.execute("insert into doc.x values (1)") | 
|  | 318 | +                        rc.execute("insert into doc.rx values (1)") | 
|  | 319 | + | 
|  | 320 | +                        assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1)) | 
|  | 321 | +                        assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1)) | 
|  | 322 | + | 
| 285 | 323 |         # Finally validate that all shards (primaries and replicas) of all partitions are started | 
| 286 | 324 |         # and writes into the partitioned table while upgrading were successful | 
| 287 | 325 |         with connect(cluster.node().http_url, error_trace=True) as conn: | 
|  | 
0 commit comments