@@ -81,8 +81,7 @@ async def shutdown(self):
81
81
def _bind_concurrent (self ):
82
82
for attr_name in dir (self .__class__ ):
83
83
v = getattr (self .__class__ , attr_name )
84
- if isinstance (v , Concurrent ):
85
- setattr (self , attr_name , v .bind (self ))
84
+ isinstance (v , Concurrent ) and v .bind (self )
86
85
87
86
async def enqueue_job (self , func_name : str , * args , queue : str = None , ** kwargs ):
88
87
"""
@@ -128,30 +127,30 @@ class Concurrent:
128
127
"""
129
128
__slots__ = ['_func' , '_dft_queue' , '_self_obj' ]
130
129
131
- def __init__ (self , func , dft_queue = None , self_obj = None ):
132
- if self_obj is None :
130
+ def __init__ (self , * , func , dft_queue = None , self_obj = None ):
131
+ self ._self_obj = self_obj
132
+ # if we're already bound we assume func is of the correct type and skip repeat logging
133
+ if not self .bound :
133
134
if not inspect .iscoroutinefunction (func ):
134
135
raise TypeError ('{} is not a coroutine function' .format (func .__qualname__ ))
135
136
136
137
main_logger .debug ('registering concurrent function %s' , func .__qualname__ )
137
138
self ._func = func
138
139
self ._dft_queue = dft_queue
139
- self ._self_obj = self_obj
140
140
141
- def bind (self , obj : object ) -> object :
141
+ def bind (self , obj : object ):
142
142
"""
143
- Equivalent of binding a normal function to an object.
144
-
145
- A new instance of Concurrent needs to exist for each function it's bound to.
143
+ Equivalent of binding a normal function to an object. A new instance of Concurrent is created and then
144
+ the reference on the parent object updated to that.
146
145
147
146
:param obj: object to bind the function to eg. "self" in the eyes of func.
148
- :return: instance of Concurrent, self if it's not yet bound, otherwise a new instance
149
147
"""
150
- if self ._self_obj is None :
151
- self ._self_obj = obj
152
- return self
153
- else :
154
- return Concurrent (func = self ._func , dft_queue = self ._dft_queue , self_obj = obj )
148
+ new_inst = Concurrent (func = self ._func , dft_queue = self ._dft_queue , self_obj = obj )
149
+ setattr (obj , self ._func .__name__ , new_inst )
150
+
151
+ @property
152
+ def bound (self ):
153
+ return self ._self_obj is not None
155
154
156
155
async def __call__ (self , * args , ** kwargs ):
157
156
return await self .defer (* args , ** kwargs )
0 commit comments