-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
Copy pathtest_connection_interruptions.py
157 lines (120 loc) · 4.55 KB
/
test_connection_interruptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import multiprocessing
import time
from typing import List
import pytest
from redis import BusyLoadingError, Redis
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import TimeoutError as RedisTimeoutError
from redis.retry import Retry
from ..conftest import _get_client
from . import Endpoint, get_endpoint
from .fake_app import FakeApp, FakeSubscriber
from .fault_injection_client import FaultInjectionClient
@pytest.fixture
def endpoint_name():
return "re-standalone"
@pytest.fixture
def endpoint(request: pytest.FixtureRequest, endpoint_name: str):
try:
return get_endpoint(request, endpoint_name)
except FileNotFoundError as e:
pytest.skip(
f"Skipping scenario test because endpoints file is missing: {str(e)}"
)
@pytest.fixture
def clients(request: pytest.FixtureRequest, endpoint: Endpoint):
# Use Recommended settings
retry = Retry(ExponentialBackoff(base=1), 3)
clients = []
for _ in range(2):
r = _get_client(
Redis,
request,
decode_responses=True,
from_url=endpoint.url,
retry=retry,
retry_on_error=[
BusyLoadingError,
RedisConnectionError,
RedisTimeoutError,
# FIXME: This is a workaround for a bug in redis-py
# https://github.com/redis/redis-py/issues/3203
ConnectionError,
TimeoutError,
],
)
r.flushdb()
clients.append(r)
return clients
@pytest.fixture
def fault_injection_client(request: pytest.FixtureRequest):
return FaultInjectionClient()
@pytest.mark.parametrize("action", ("dmc_restart", "network_failure"))
def test_connection_interruptions(
clients: List[Redis],
endpoint: Endpoint,
fault_injection_client: FaultInjectionClient,
action: str,
):
client = clients.pop()
app = FakeApp(client, lambda c: c.set("foo", "bar"))
stop_app, thread = app.run()
triggered_action = fault_injection_client.trigger_action(
action, {"bdb_id": endpoint.bdb_id}
)
triggered_action.wait_until_complete()
stop_app.set()
thread.join()
if triggered_action.status == "failed":
pytest.fail(f"Action failed: {triggered_action.data['error']}")
assert app.disconnects > 0, "Client did not disconnect"
@pytest.mark.parametrize("action", ("dmc_restart", "network_failure"))
def test_pubsub_with_connection_interruptions(
clients: List[Redis],
endpoint: Endpoint,
fault_injection_client: FaultInjectionClient,
action: str,
):
channel = "test"
# Subscriber is executed in a separate process to ensure it reacts
# to the disconnection at the same time as the publisher
with multiprocessing.Manager() as manager:
received_messages = manager.list()
def read_message(message):
nonlocal received_messages
if message and message["type"] == "message":
received_messages.append(message["data"])
subscriber_client = clients.pop()
subscriber = FakeSubscriber(subscriber_client, read_message)
stop_subscriber, subscriber_started, subscriber_t = subscriber.run(channel)
# Allow subscriber subscribe to the channel
subscriber_started.wait(timeout=5)
messages_sent = 0
def publish_message(c):
nonlocal messages_sent, channel
messages_sent += 1
c.publish(channel, messages_sent)
publisher_client = clients.pop()
publisher = FakeApp(publisher_client, publish_message)
stop_publisher, publisher_t = publisher.run()
triggered_action = fault_injection_client.trigger_action(
action, {"bdb_id": endpoint.bdb_id}
)
triggered_action.wait_until_complete()
last_message_sent_after_trigger = messages_sent
time.sleep(3) # Wait for the publisher to send more messages
stop_publisher.set()
publisher_t.join()
stop_subscriber.set()
subscriber_t.join()
assert publisher.disconnects > 0
assert subscriber.disconnects.value > 0
if triggered_action.status == "failed":
pytest.fail(f"Action failed: {triggered_action.data['error']}")
assert (
last_message_sent_after_trigger < messages_sent
), "No messages were sent after the failure"
assert (
int(received_messages[-1]) == messages_sent
), "Not all messages were received"