This repository was archived by the owner on Sep 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 79
/
Copy path_async.py
85 lines (68 loc) · 3.31 KB
/
_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
"""Decorators for async methods and functions to dispatch on threads and support chained calls."""
from __future__ import absolute_import
from __future__ import unicode_literals
from builtins import object
import abc
import concurrent.futures
import functools
from . import _job
from future.utils import with_metaclass
class async_(with_metaclass(abc.ABCMeta, object)):
""" Base class for async_function/async_method. Creates a wrapped function/method that will
run the original function/method on a thread pool worker thread and return a Job instance
for monitoring the status of the thread.
"""
executor = concurrent.futures.ThreadPoolExecutor(max_workers=50) # Pool for doing the work.
def __init__(self, function):
self._function = function
# Make the wrapper get attributes like docstring from wrapped method.
functools.update_wrapper(self, function)
@staticmethod
def _preprocess_args(*args):
# Pre-process arguments - if any are themselves Futures block until they can be resolved.
return [arg.result() if isinstance(arg, concurrent.futures.Future) else arg for arg in args]
@staticmethod
def _preprocess_kwargs(**kwargs):
# Pre-process keyword arguments - if any are Futures block until they can be resolved.
return {kw: (arg.result() if isinstance(arg, concurrent.futures.Future) else arg)
for kw, arg in list(kwargs.items())}
@abc.abstractmethod
def _call(self, *args, **kwargs):
return
def __call__(self, *args, **kwargs):
# Queue the call up in the thread pool.
return _job.Job(future=self.executor.submit(self._call, *args, **kwargs))
class async_function(async_):
""" This decorator can be applied to any static function that makes blocking calls to create
a modified version that creates a Job and returns immediately; the original
method will be called on a thread pool worker thread.
"""
def _call(self, *args, **kwargs):
# Call the wrapped method.
return self._function(*async_._preprocess_args(*args), **async_._preprocess_kwargs(**kwargs))
class async_method(async_):
""" This decorator can be applied to any class instance method that makes blocking calls to create
a modified version that creates a Job and returns immediately; the original method will be
called on a thread pool worker thread.
"""
def _call(self, *args, **kwargs):
# Call the wrapped method.
return self._function(self.obj, *async_._preprocess_args(*args),
**async_._preprocess_kwargs(**kwargs))
def __get__(self, instance, owner):
# This is important for attribute inheritance and setting self.obj so it can be
# passed as first argument to wrapped method.
self.cls = owner
self.obj = instance
return self