-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathasyncblink.py
62 lines (44 loc) · 1.68 KB
/
asyncblink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# -*- coding: utf-8 -*-
import asyncio
from collections import defaultdict
import inspect
from blinker import Signal
from blinker.base import ANY
class AsyncSignal(Signal):
    """Signal whose receivers may be plain callables or coroutine functions.

    Plain receivers are yielded unchanged.  A coroutine-function receiver
    is replaced by a synchronous wrapper that schedules the coroutine with
    ``asyncio.ensure_future`` and returns the resulting task/future.

    Note that when using a coroutine as receiver you will get a task as
    the result.  You must await it yourself to get the result of the
    coroutine.
    """

    # Tasks that have been scheduled and have not finished yet.  This is a
    # class attribute, so it is shared by every instance that does not
    # override it.  Holding the task here keeps a strong reference to it
    # until its done-callback fires (asyncio documents that the event loop
    # itself only keeps weak references to tasks).
    _tasks = set()

    def receivers_for(self, sender):
        """Yield each receiver connected for *sender*.

        Coroutine functions are yielded as scheduling wrappers (see
        :meth:`_wrap_coroutine_receiver`); everything else is yielded as
        returned by the parent class.
        """
        for receiver in super().receivers_for(sender):
            if asyncio.iscoroutinefunction(receiver):
                # Build the wrapper in a separate call so it captures THIS
                # receiver.  A closure defined directly in the loop would
                # see whatever `receiver` is bound to when it is finally
                # called, which is wrong once the generator has advanced.
                yield self._wrap_coroutine_receiver(receiver)
            else:
                yield receiver

    def _wrap_coroutine_receiver(self, receiver):
        """Return a sync callable that schedules *receiver* and returns the task."""

        def wrap(*args, **kwargs):
            # Resolve the set once so the add and the later discard are
            # guaranteed to hit the same object.
            tasks = type(self)._tasks
            task = asyncio.ensure_future(receiver(*args, **kwargs))
            tasks.add(task)
            # discard (not remove): a task already dropped from the set,
            # e.g. by a manual clear(), must not raise in the callback.
            task.add_done_callback(tasks.discard)
            return task

        return wrap
class NamedAsyncSignal(AsyncSignal):
    """An :class:`AsyncSignal` that carries a name."""

    def __init__(self, name, doc=None):
        """Create the signal, passing *doc* to the parent and storing *name*."""
        super().__init__(doc)

        #: The name of this signal.
        self.name = name

    def __repr__(self):
        """Return the parent repr with ``; <name repr>>`` replacing its last character."""
        parent_repr = super().__repr__()
        # Drop the final character of the parent repr (presumably its
        # closing '>') and re-close after appending the name.
        head = parent_repr[:-1]
        return "{}; {!r}>".format(head, self.name)
class Namespace(dict):
    """A dict mapping signal names to signal objects."""

    def signal(self, name, doc=None):
        """Return the signal stored under *name*, creating it on first use.

        If *name* is not yet a key, a new :class:`NamedAsyncSignal` built
        from *name* and *doc* is stored and returned.  Repeated calls with
        the same *name* return the same stored object; *doc* is only used
        when the signal is created.
        """
        try:
            found = self[name]
        except KeyError:
            found = self.setdefault(name, NamedAsyncSignal(name, doc))
        return found
# Module-level shortcut: the bound ``signal`` method of one Namespace
# instance created at import time, so every call to ``signal(name)``
# through this module looks up / creates signals in the same mapping.
signal = Namespace().signal