-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
Copy pathfake_app.py
63 lines (46 loc) · 1.8 KB
/
fake_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import multiprocessing
import typing
from multiprocessing import Event as PEvent
from multiprocessing import Process
from threading import Event, Thread
from unittest.mock import patch
from redis import Redis
class FakeApp:
    """Drive a user-supplied callable against a Redis client in a thread.

    While the callable is being run, the client's ``_disconnect_raise``
    method is wrapped with a spy so that the number of times it was invoked
    can be read from ``disconnects`` once the worker has finished.
    """

    def __init__(self, client: Redis, logic: typing.Callable[[Redis], None]):
        """Store the client and the callable to run against it.

        Args:
            client: Client handed to ``logic`` on every iteration.
            logic: Callable invoked repeatedly with ``client`` until the
                stop event returned by :meth:`run` is set.
        """
        self.client = client
        self.logic = logic
        # Written by ``_run_logic`` when the worker loop ends.
        self.disconnects = 0

    def run(self) -> typing.Tuple[Event, Thread]:
        """Start the worker thread.

        Returns:
            A ``(stop_event, thread)`` pair. Set ``stop_event`` to make the
            loop exit after the current ``logic`` call, then ``join`` the
            thread before reading ``disconnects``.
        """
        stop_event = Event()
        worker = Thread(target=self._run_logic, args=(stop_event,))
        worker.start()
        return stop_event, worker

    def _run_logic(self, e: Event):
        """Call ``logic`` in a loop until ``e`` is set, counting disconnects.

        Args:
            e: Event that signals the loop to stop.
        """
        with patch.object(
            self.client, "_disconnect_raise", wraps=self.client._disconnect_raise
        ) as spy:
            try:
                while not e.is_set():
                    self.logic(self.client)
            finally:
                # Record the count even when ``logic`` raises; otherwise a
                # failing run would report zero disconnects.
                self.disconnects = spy.call_count
class FakeSubscriber:
    """Consume pub/sub messages in a child process and count disconnects.

    While the subscriber loop is running, the pub/sub object's
    ``_disconnect_raise_connect`` method is wrapped with a spy; the number of
    times it was invoked is stored in ``disconnects`` when the loop ends.
    """

    def __init__(self, client: Redis, logic: typing.Callable[[dict], None]):
        """Store the client and the per-message callback.

        Args:
            client: Client used to create the pub/sub object.
            logic: Callable invoked with every non-empty message returned by
                ``get_message``.
        """
        self.client = client
        self.logic = logic
        # ``multiprocessing.Value`` rather than a plain int: the count is
        # written from the child process started by ``run``.
        self.disconnects = multiprocessing.Value("i", 0)

    def run(self, channel: str) -> typing.Tuple[PEvent, PEvent, Process]:
        """Start the subscriber process for ``channel``.

        Args:
            channel: Channel name to subscribe to.

        Returns:
            A ``(stop_event, started_event, process)`` triple. ``started_event``
            is set once the subscription has been issued; set ``stop_event``
            to make the loop exit, then ``join`` the process before reading
            ``disconnects.value``.
        """
        stop_event, started = PEvent(), PEvent()
        worker = Process(
            target=self._run_logic, args=(stop_event, started, channel)
        )
        worker.start()
        return stop_event, started, worker

    def _run_logic(self, should_stop: PEvent, started: PEvent, channel: str):
        """Subscribe to ``channel`` and dispatch messages until told to stop.

        Args:
            should_stop: Event that signals the loop to exit.
            started: Event set right after ``subscribe`` has been called.
            channel: Channel name to subscribe to.
        """
        pubsub = self.client.pubsub()
        try:
            with patch.object(
                pubsub,
                "_disconnect_raise_connect",
                wraps=pubsub._disconnect_raise_connect,
            ) as spy_pubsub:
                try:
                    pubsub.subscribe(channel)
                    started.set()
                    while not should_stop.is_set():
                        message = pubsub.get_message(
                            ignore_subscribe_messages=True, timeout=1
                        )
                        if message:
                            self.logic(message)
                finally:
                    # Record the count even when subscribing, polling or
                    # ``logic`` raises, so a failed run is still observable.
                    self.disconnects.value = spy_pubsub.call_count
        finally:
            # Release the pub/sub connection instead of leaking it.
            pubsub.close()