Skip to content

Commit cb9dcb8

Browse files
committed
test: add timeouts to prevent integration tests from hanging
1 parent 74ae5de commit cb9dcb8

File tree

13 files changed: +95 -35 lines changed

13 files changed: +95 -35 lines changed

.github/workflows/integration_tests.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ on:
55
push:
66
branches:
77
- main
8+
- test/hanging-efm2-integration-test
89

910
permissions:
1011
id-token: write # This is required for requesting the JWT
@@ -17,9 +18,9 @@ jobs:
1718
strategy:
1819
fail-fast: false
1920
matrix:
20-
python-version: [ "3.8", "3.11" ]
21-
engine-version: [ "lts", "latest"]
22-
environment: ["mysql", "pg"]
21+
python-version: [ "3.11" ]
22+
engine-version: [ "lts", "latest" ]
23+
environment: ["mysql"]
2324

2425
steps:
2526
- name: 'Clone repository'
@@ -56,6 +57,7 @@ jobs:
5657
RDS_DB_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
5758
AURORA_MYSQL_DB_ENGINE_VERSION: ${{ matrix.engine-version }}
5859
AURORA_PG_ENGINE_VERSION: ${{ matrix.engine-version }}
60+
FILTER: test_aurora_failover
5961

6062
- name: 'Archive results'
6163
if: always()

aws_advanced_python_wrapper/failover_plugin.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ def _failover_writer(self):
314314
elif result is None or not result.is_connected:
315315
raise FailoverFailedError(Messages.get("FailoverPlugin.UnableToConnectToWriter"))
316316

317+
logger.info("fetching new writer")
317318
writer_host = self._get_writer(result.topology)
318319
allowed_hosts = self._plugin_service.hosts
319320
allowed_hostnames = [host.host for host in allowed_hosts]
@@ -324,6 +325,7 @@ def _failover_writer(self):
324325
"<null>" if writer_host is None else writer_host.host,
325326
LogUtils.log_topology(allowed_hosts)))
326327

328+
logger.info("setting new writer: " + writer_host.host)
327329
self._plugin_service.set_current_connection(result.new_connection, writer_host)
328330
logger.info("FailoverPlugin.EstablishedConnection", self._plugin_service.current_host_info)
329331

aws_advanced_python_wrapper/states/session_state.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,6 @@ def copy(self):
9292
new_session_state.readonly = self.readonly.copy()
9393

9494
return new_session_state
95+
96+
def __str__(self):
97+
return f"autocommit: {self.auto_commit}, readonly: {self.readonly}"

aws_advanced_python_wrapper/utils/pg_exception_handler.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,19 @@ class PgExceptionHandler(ExceptionHandler):
2727
_PAM_AUTHENTICATION_FAILED_MSG = "PAM authentication failed"
2828
_CONNECTION_FAILED = "connection failed"
2929
_CONSUMING_INPUT_FAILED = "consuming input failed"
30+
_CONNECTION_SOCKET_CLOSED = "connection socket closed"
3031

31-
_NETWORK_ERRORS: List[str]
32-
_ACCESS_ERRORS: List[str]
32+
_NETWORK_ERROR_MESSAGES: List[str] = [
33+
_CONNECTION_FAILED,
34+
_CONSUMING_INPUT_FAILED,
35+
_CONNECTION_SOCKET_CLOSED
36+
]
37+
_ACCESS_ERROR_MESSAGES: List[str] = [
38+
_PASSWORD_AUTHENTICATION_FAILED_MSG,
39+
_PAM_AUTHENTICATION_FAILED_MSG
40+
]
41+
_NETWORK_ERROR_CODES: List[str]
42+
_ACCESS_ERROR_CODES: List[str]
3343

3444
def is_network_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
3545
if isinstance(error, QueryTimeoutError) or isinstance(error, ConnectionTimeout):
@@ -43,15 +53,15 @@ def is_network_exception(self, error: Optional[Exception] = None, sql_state: Opt
4353
# getattr may throw an AttributeError if the error does not have a `sqlstate` attribute
4454
pass
4555

46-
if sql_state is not None and sql_state in self._NETWORK_ERRORS:
56+
if sql_state is not None and sql_state in self._NETWORK_ERROR_CODES:
4757
return True
4858

4959
if isinstance(error, OperationalError):
5060
if len(error.args) == 0:
5161
return False
5262
# Check the error message if this is a generic error
5363
error_msg: str = error.args[0]
54-
return self._CONNECTION_FAILED in error_msg or self._CONSUMING_INPUT_FAILED in error_msg
64+
return any(msg in error_msg for msg in self._NETWORK_ERROR_MESSAGES)
5565

5666
return False
5767

@@ -63,7 +73,7 @@ def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optio
6373
if sql_state is None and hasattr(error, "sqlstate") and error.sqlstate is not None:
6474
sql_state = error.sqlstate
6575

66-
if sql_state is not None and sql_state in self._ACCESS_ERRORS:
76+
if sql_state is not None and sql_state in self._ACCESS_ERROR_CODES:
6777
return True
6878

6979
if isinstance(error, OperationalError):
@@ -72,15 +82,14 @@ def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optio
7282

7383
# Check the error message if this is a generic error
7484
error_msg: str = error.args[0]
75-
if self._PASSWORD_AUTHENTICATION_FAILED_MSG in error_msg \
76-
or self._PAM_AUTHENTICATION_FAILED_MSG in error_msg:
85+
if any(msg in error_msg for msg in self._ACCESS_ERROR_MESSAGES):
7786
return True
7887

7988
return False
8089

8190

8291
class SingleAzPgExceptionHandler(PgExceptionHandler):
83-
_NETWORK_ERRORS: List[str] = [
92+
_NETWORK_ERROR_CODES: List[str] = [
8493
"53", # insufficient resources
8594
"57P01", # admin shutdown
8695
"57P02", # crash shutdown
@@ -92,14 +101,14 @@ class SingleAzPgExceptionHandler(PgExceptionHandler):
92101
"XX" # internal error(backend)
93102
]
94103

95-
_ACCESS_ERRORS: List[str] = [
104+
_ACCESS_ERROR_CODES: List[str] = [
96105
"28000", # PAM authentication errors
97106
"28P01"
98107
]
99108

100109

101110
class MultiAzPgExceptionHandler(PgExceptionHandler):
102-
_NETWORK_ERRORS: List[str] = [
111+
_NETWORK_ERROR_CODES: List[str] = [
103112
"28000", # access denied during reboot, this should be considered a temporary failure
104113
"53", # insufficient resources
105114
"57P01", # admin shutdown
@@ -112,4 +121,4 @@ class MultiAzPgExceptionHandler(PgExceptionHandler):
112121
"XX" # internal error(backend)
113122
]
114123

115-
_ACCESS_ERRORS: List[str] = ["28P01"]
124+
_ACCESS_ERROR_CODES: List[str] = ["28P01"]

tests/integration/container/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def pytest_runtest_setup(item):
9292
# Wait up to 5min
9393
instances: List[str] = list()
9494
start_time = timeit.default_timer()
95-
while (len(instances) != request.get_num_of_instances()
95+
while (len(instances) < request.get_num_of_instances()
9696
or len(instances) == 0
9797
or not rds_utility.is_db_instance_writer(instances[0])) and (
9898
timeit.default_timer() - start_time) < 300: # 5 min

tests/integration/container/test_aurora_failover.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,15 @@ def aurora_utility(self):
5656

5757
@pytest.fixture(scope='class')
5858
def props(self):
59-
p: Properties = Properties({"plugins": "failover", "connect_timeout": 60, "topology_refresh_ms": 10, "autocommit": True})
59+
p: Properties = Properties({
60+
"plugins": "failover",
61+
"socket_timeout": 30,
62+
"connect_timeout": 10,
63+
"monitoring-connect_timeout": 5,
64+
"monitoring-socket_timeout": 5,
65+
"topology_refresh_ms": 10,
66+
"autocommit": True
67+
})
6068

6169
features = TestEnvironment.get_current().get_features()
6270
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \

tests/integration/container/test_basic_connectivity.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,15 @@ def test_proxied_wrapper_connection_failed(
131131
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
132132
@enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
133133
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins):
134+
props: Properties = Properties({
135+
WrapperProperties.PLUGINS.name: plugins,
136+
"socket_timeout": 5,
137+
"connect_timeout": 5,
138+
"monitoring-connect_timeout": 3,
139+
"monitoring-socket_timeout": 3,
140+
"autocommit": True})
134141
target_driver_connect = DriverHelper.get_connect_func(test_driver)
135-
conn = AwsWrapperConnection.connect(
136-
target_driver_connect,
137-
**conn_utils.get_connect_params(conn_utils.reader_cluster_host),
138-
plugins=plugins, connect_timeout=10)
142+
conn = AwsWrapperConnection.connect(target_driver_connect, **conn_utils.get_connect_params(conn_utils.reader_cluster_host), **props)
139143
cursor = conn.cursor()
140144
cursor.execute("SELECT 1")
141145
result = cursor.fetchone()

tests/integration/container/test_host_monitoring_v2.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
from aws_advanced_python_wrapper.utils.properties import (Properties,
2323
WrapperProperties)
2424
from tests.integration.container.utils.conditions import (
25-
disable_on_features, enable_on_deployments)
25+
disable_on_engines, disable_on_features, enable_on_deployments)
26+
from tests.integration.container.utils.database_engine import DatabaseEngine
2627
from tests.integration.container.utils.database_engine_deployment import \
2728
DatabaseEngineDeployment
2829
from tests.integration.container.utils.driver_helper import DriverHelper
@@ -44,6 +45,7 @@
4445
@disable_on_features([TestEnvironmentFeatures.PERFORMANCE,
4546
TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY,
4647
TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT])
48+
@disable_on_engines([DatabaseEngine.MYSQL])
4749
class TestHostMonitoringV2:
4850
@pytest.fixture(scope='class')
4951
def rds_utils(self):
@@ -55,6 +57,8 @@ def props(self):
5557
p: Properties = Properties({"plugins": "host_monitoring_v2",
5658
"socket_timeout": 30,
5759
"connect_timeout": 10,
60+
"monitoring-connect_timeout": 5,
61+
"monitoring-socket_timeout": 5,
5862
"failure_detection_time_ms": 5_000,
5963
"failure_detection_interval_ms": 5_000,
6064
"failure_detection_count": 1,

tests/integration/container/test_read_write_splitting.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,13 @@ def props(self):
8282
@pytest.fixture(scope='class')
8383
def failover_props(self):
8484
return {
85-
"plugins": "read_write_splitting,failover", "connect_timeout": 10, "autocommit": True}
85+
"plugins": "read_write_splitting,failover,host_monitoring",
86+
"socket_timeout": 30,
87+
"connect_timeout": 10,
88+
"monitoring-connect_timeout": 5,
89+
"monitoring-socket_timeout": 5,
90+
"autocommit": True
91+
}
8692

8793
@pytest.fixture(scope='class')
8894
def proxied_props(self, props, conn_utils):
@@ -349,15 +355,13 @@ def test_failover_to_new_writer__switch_read_only(
349355
current_id = rds_utils.query_instance_id(conn)
350356
assert new_writer_id == current_id
351357

352-
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring_v2"])
353358
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
354359
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
355360
@enable_on_num_instances(min_instances=3)
356361
@disable_on_engines([DatabaseEngine.MYSQL])
357362
def test_failover_to_new_reader__switch_read_only(
358363
self, test_environment: TestEnvironment, test_driver: TestDriver,
359-
proxied_failover_props, conn_utils, rds_utils, plugins):
360-
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
364+
proxied_failover_props, conn_utils, rds_utils):
361365
WrapperProperties.FAILOVER_MODE.set(proxied_failover_props, "reader-or-writer")
362366

363367
target_driver_connect = DriverHelper.get_connect_func(test_driver)
@@ -398,16 +402,13 @@ def test_failover_to_new_reader__switch_read_only(
398402
current_id = rds_utils.query_instance_id(conn)
399403
assert other_reader_id == current_id
400404

401-
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
402-
"read_write_splitting,failover,host_monitoring_v2"])
403405
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
404406
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
405407
@enable_on_num_instances(min_instances=3)
406408
@disable_on_engines([DatabaseEngine.MYSQL])
407409
def test_failover_reader_to_writer__switch_read_only(
408410
self, test_environment: TestEnvironment, test_driver: TestDriver,
409-
proxied_failover_props, conn_utils, rds_utils, plugins):
410-
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
411+
proxied_failover_props, conn_utils, rds_utils):
411412
target_driver_connect = DriverHelper.get_connect_func(test_driver)
412413
with AwsWrapperConnection.connect(
413414
target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props) as conn:
@@ -519,19 +520,16 @@ def test_pooled_connection__cluster_url_failover(
519520
new_driver_conn = conn.target_connection
520521
assert initial_driver_conn is not new_driver_conn
521522

522-
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
523-
"read_write_splitting,failover,host_monitoring_v2"])
524523
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
525524
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
526525
@disable_on_engines([DatabaseEngine.MYSQL])
527526
def test_pooled_connection__failover_failed(
528527
self, test_environment: TestEnvironment, test_driver: TestDriver,
529-
rds_utils, conn_utils, proxied_failover_props, plugins):
528+
rds_utils, conn_utils, proxied_failover_props):
530529
writer_host = test_environment.get_writer().get_host()
531530
provider = SqlAlchemyPooledConnectionProvider(lambda _, __: {"pool_size": 1}, None, lambda host_info, props: writer_host in host_info.host)
532531
ConnectionProviderManager.set_connection_provider(provider)
533532

534-
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
535533
WrapperProperties.FAILOVER_TIMEOUT_SEC.set(proxied_failover_props, "1")
536534
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(proxied_failover_props, "1000")
537535
WrapperProperties.FAILURE_DETECTION_COUNT.set(proxied_failover_props, "1")

tests/integration/container/utils/rds_test_utility.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,13 @@ def wait_until_instance_has_desired_status(
132132
"RdsTestUtility.InstanceDescriptionTimeout", instance_id, desired_status, wait_time_mins))
133133

134134
def wait_until_cluster_has_desired_status(self, cluster_id: str, desired_status: str) -> None:
135+
stop_time = datetime.now() + timedelta(minutes=10)
135136
cluster_info = self.get_db_cluster(cluster_id)
136137
status = cluster_info.get("Status")
137138
while status != desired_status:
138-
sleep(1)
139+
if datetime.now() > stop_time:
140+
raise TimeoutError(f"Cluster {cluster_id} did not reach status '{desired_status}' within 10 minutes.")
141+
sleep(10)
139142
cluster_info = self.get_db_cluster(cluster_id)
140143
status = cluster_info.get("Status")
141144

0 commit comments

Comments (0)