diff --git a/sdks/python/apache_beam/internal/cloudpickle/__init__.py b/sdks/python/apache_beam/internal/cloudpickle/__init__.py
index 3768a936efd3..9233aebcf33d 100644
--- a/sdks/python/apache_beam/internal/cloudpickle/__init__.py
+++ b/sdks/python/apache_beam/internal/cloudpickle/__init__.py
@@ -9,6 +9,7 @@
     "__version__",
     "Pickler",
     "CloudPickler",
+    "PurePythonPickler",
     "dumps",
     "loads",
     "dump",
diff --git a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
index 48980526cf18..c90fd7b03c4e 100644
--- a/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
+++ b/sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
@@ -1205,7 +1205,14 @@ def _get_dataclass_field_type_sentinel(name):
   return _DATACLASSE_FIELD_TYPE_SENTINELS[name]


-class Pickler(pickle.Pickler):
+class BaseCloudPickler:
+  """Class for logic that is common between FastPickler and PurePythonPickler.
+  Cloudpickle provides two picklers: one extending the C implementation of
+  the CPython pickler and another extending the pure-Python pickler.
+  FastPickler and PurePythonPickler inherit from BaseCloudPickler and provide
+  BaseCloudPickler access to either the C or the pure-Python pickler by
+  implementing the _super_pickler() method.
+  """
   # set of reducers defined and used by cloudpickle (private)
   _dispatch_table = {}
   _dispatch_table[classmethod] = _classmethod_reduce
@@ -1294,7 +1301,7 @@ def _function_getnewargs(self, func):

   def dump(self, obj):
     try:
-      return super().dump(obj)
+      return self._super_pickler().dump(obj)
     except RuntimeError as e:
       if len(e.args) > 0 and "recursion" in e.args[0]:
         msg = "Could not pickle object as excessively deep recursion required."
@@ -1305,14 +1312,32 @@ def __init__(self, file, protocol=None, buffer_callback=None):
     if protocol is None:
       protocol = DEFAULT_PROTOCOL
-    super().__init__(file, protocol=protocol, buffer_callback=buffer_callback)
     # map functions __globals__ attribute ids, to ensure that functions
     # sharing the same global namespace at pickling time also share
     # their global namespace at unpickling time.
     self.globals_ref = {}
     self.proto = int(protocol)
+    self._super_pickler().__init__(
+        file, protocol=protocol, buffer_callback=buffer_callback)
+
+  def _super_pickler(self):
+    """Returns a proxy object for an instance of the pickler being extended."""
+    raise NotImplementedError
+
+
+if not PYPY:
+
+  class FastPickler(BaseCloudPickler, pickle.Pickler):
+    """Fast pickler extending the C implementation of the CPython pickler.
+    The FastPickler is not available on PyPy and does not support
+    overriding how built-in types are pickled.
+    """
+    def __init__(self, file, protocol=None, buffer_callback=None):
+      super().__init__(file, protocol, buffer_callback)
+
+    def _super_pickler(self):
+      return super(BaseCloudPickler, self)

-  if not PYPY:
     # pickle.Pickler is the C implementation of the CPython pickler and
     # therefore we rely on the reducer_override method to customize the
     # pickler behavior.
@@ -1328,7 +1353,7 @@ def __init__(self, file, protocol=None, buffer_callback=None):
     # name was not a great choice given that it would collide with a
     # similarly named attribute in the pure-Python `pickle._Pickler`
     # implementation in the standard library.
-    dispatch = dispatch_table
+    dispatch = BaseCloudPickler.dispatch_table

     # Implementation of the reducer_override callback, in order to
     # efficiently serialize dynamic functions and classes by subclassing
@@ -1385,111 +1410,121 @@ def reducer_override(self, obj):
       # dispatch_table
       return NotImplemented

-  else:
-    # When reducer_override is not available, hack the pure-Python
-    # Pickler's types.FunctionType and type savers. Note: the type saver
-    # must override Pickler.save_global, because pickle.py contains a
-    # hard-coded call to save_global when pickling meta-classes.
-    dispatch = pickle.Pickler.dispatch.copy()
-
-    def _save_reduce_pickle5(
-        self,
+
+class PurePythonPickler(BaseCloudPickler, pickle._Pickler):
+  """Pure-Python pickler.
+  This pickler supports overriding how built-in types are pickled.
+  """
+  def __init__(self, file, protocol=None, buffer_callback=None):
+    super().__init__(file, protocol, buffer_callback)
+
+  def _super_pickler(self):
+    return super(BaseCloudPickler, self)
+
+  # When reducer_override is not available, hack the pure-Python
+  # Pickler's types.FunctionType and type savers. Note: the type saver
+  # must override Pickler.save_global, because pickle.py contains a
+  # hard-coded call to save_global when pickling meta-classes.
+  dispatch = pickle._Pickler.dispatch.copy()
+
+  def _save_reduce_pickle5(
+      self,
+      func,
+      args,
+      state=None,
+      listitems=None,
+      dictitems=None,
+      state_setter=None,
+      obj=None,
+  ):
+    save = self.save
+    write = self.write
+    self.save_reduce(
         func,
         args,
         state=None,
-        listitems=None,
-        dictitems=None,
-        state_setter=None,
-        obj=None,
-    ):
-      save = self.save
-      write = self.write
-      self.save_reduce(
-          func,
-          args,
-          state=None,
-          listitems=listitems,
-          dictitems=dictitems,
-          obj=obj,
-      )
-      # backport of the Python 3.8 state_setter pickle operations
-      save(state_setter)
-      save(obj)  # simple BINGET opcode as obj is already memoized.
-      save(state)
-      write(pickle.TUPLE2)
-      # Trigger a state_setter(obj, state) function call.
-      write(pickle.REDUCE)
-      # The purpose of state_setter is to carry out an
-      # inplace modification of obj. We do not care about what the
-      # method might return, so its output is eventually removed from
-      # the stack.
-      write(pickle.POP)
-
-    def save_global(self, obj, name=None, pack=struct.pack):
-      """Main dispatch method.
-
-      The name of this method is somewhat misleading: all types get
-      dispatched here.
-      """
-      if obj is type(None):  # noqa
-        return self.save_reduce(type, (None, ), obj=obj)
-      elif obj is type(Ellipsis):
-        return self.save_reduce(type, (Ellipsis, ), obj=obj)
-      elif obj is type(NotImplemented):
-        return self.save_reduce(type, (NotImplemented, ), obj=obj)
-      elif obj in _BUILTIN_TYPE_NAMES:
-        return self.save_reduce(
-            _builtin_type, (_BUILTIN_TYPE_NAMES[obj], ), obj=obj)
-
-      if name is not None:
-        super().save_global(obj, name=name)
-      elif not _should_pickle_by_reference(obj, name=name):
-        self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
-      else:
-        super().save_global(obj, name=name)
+        listitems=listitems,
+        dictitems=dictitems,
+        obj=obj,
+    )
+    # backport of the Python 3.8 state_setter pickle operations
+    save(state_setter)
+    save(obj)  # simple BINGET opcode as obj is already memoized.
+    save(state)
+    write(pickle.TUPLE2)
+    # Trigger a state_setter(obj, state) function call.
+    write(pickle.REDUCE)
+    # The purpose of state_setter is to carry out an
+    # inplace modification of obj. We do not care about what the
+    # method might return, so its output is eventually removed from
+    # the stack.
+    write(pickle.POP)
+
+  def save_global(self, obj, name=None, pack=struct.pack):
+    """Main dispatch method.
+
+    The name of this method is somewhat misleading: all types get
+    dispatched here.
+    """
+    if obj is type(None):  # noqa
+      return self.save_reduce(type, (None, ), obj=obj)
+    elif obj is type(Ellipsis):
+      return self.save_reduce(type, (Ellipsis, ), obj=obj)
+    elif obj is type(NotImplemented):
+      return self.save_reduce(type, (NotImplemented, ), obj=obj)
+    elif obj in _BUILTIN_TYPE_NAMES:
+      return self.save_reduce(
+          _builtin_type, (_BUILTIN_TYPE_NAMES[obj], ), obj=obj)
+
+    if name is not None:
+      super().save_global(obj, name=name)
+    elif not _should_pickle_by_reference(obj, name=name):
+      self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
+    else:
+      super().save_global(obj, name=name)

-    dispatch[type] = save_global
+  dispatch[type] = save_global

-    def save_function(self, obj, name=None):
-      """Registered with the dispatch to handle all function types.
+  def save_function(self, obj, name=None):
+    """Registered with the dispatch to handle all function types.

-      Determines what kind of function obj is (e.g. lambda, defined at
-      interactive prompt, etc) and handles the pickling appropriately.
-      """
-      if _should_pickle_by_reference(obj, name=name):
-        return super().save_global(obj, name=name)
-      elif PYPY and isinstance(obj.__code__, builtin_code_type):
-        return self.save_pypy_builtin_func(obj)
-      else:
-        return self._save_reduce_pickle5(
-            *self._dynamic_function_reduce(obj), obj=obj)
-
-    def save_pypy_builtin_func(self, obj):
-      """Save pypy equivalent of builtin functions.
-
-      PyPy does not have the concept of builtin-functions. Instead,
-      builtin-functions are simple function instances, but with a
-      builtin-code attribute.
-      Most of the time, builtin functions should be pickled by attribute.
-      But PyPy has flaky support for __qualname__, so some builtin
-      functions such as float.__new__ will be classified as dynamic. For
-      this reason only, we created this special routine. Because
-      builtin-functions are not expected to have closure or globals,
-      there is no additional hack (compared to the one already implemented
-      in pickle) to protect ourselves from reference cycles. A simple
-      (reconstructor, newargs, obj.__dict__) tuple is save_reduced. Note
-      also that PyPy improved their support for __qualname__ in v3.6, so
-      this routine should be removed when cloudpickle supports only PyPy
-      3.6 and later.
-      """
-      rv = (
-          types.FunctionType,
-          (obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
-          obj.__dict__,
-      )
-      self.save_reduce(*rv, obj=obj)
+    Determines what kind of function obj is (e.g. lambda, defined at
+    interactive prompt, etc) and handles the pickling appropriately.
+    """
+    if _should_pickle_by_reference(obj, name=name):
+      return super().save_global(obj, name=name)
+    elif PYPY and isinstance(obj.__code__, builtin_code_type):
+      return self.save_pypy_builtin_func(obj)
+    else:
+      return self._save_reduce_pickle5(
+          *self._dynamic_function_reduce(obj), obj=obj)
+
+  def save_pypy_builtin_func(self, obj):
+    """Save pypy equivalent of builtin functions.
+
+    PyPy does not have the concept of builtin-functions. Instead,
+    builtin-functions are simple function instances, but with a
+    builtin-code attribute.
+    Most of the time, builtin functions should be pickled by attribute.
+    But PyPy has flaky support for __qualname__, so some builtin
+    functions such as float.__new__ will be classified as dynamic. For
+    this reason only, we created this special routine. Because
+    builtin-functions are not expected to have closure or globals,
+    there is no additional hack (compared to the one already implemented
+    in pickle) to protect ourselves from reference cycles. A simple
+    (reconstructor, newargs, obj.__dict__) tuple is save_reduced. Note
+    also that PyPy improved their support for __qualname__ in v3.6, so
+    this routine should be removed when cloudpickle supports only PyPy
+    3.6 and later.
+    """
+    rv = (
+        types.FunctionType,
+        (obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
+        obj.__dict__,
+    )
+    self.save_reduce(*rv, obj=obj)

-    dispatch[types.FunctionType] = save_function
+  dispatch[types.FunctionType] = save_function


 # Shorthands similar to pickle.dump/pickle.dumps
@@ -1533,5 +1568,12 @@ def dumps(obj, protocol=None, buffer_callback=None):
 # Include pickles unloading functions in this namespace for convenience.
 load, loads = pickle.load, pickle.loads

+# Use the fast pickler extending the C implementation of the pickler if it
+# is available.
+if PYPY:
+  Pickler = PurePythonPickler
+else:
+  Pickler = FastPickler
+
 # Backward compat alias.
 CloudPickler = Pickler
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
index da069954754b..3cd592f21d83 100644
--- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -111,7 +111,7 @@ def dumps(o, enable_trace=True, use_zlib=False) -> bytes:
   """For internal use only; no backwards-compatibility guarantees."""
   with _pickle_lock:
     with io.BytesIO() as file:
-      pickler = cloudpickle.CloudPickler(file)
+      pickler = cloudpickle.PurePythonPickler(file)
       try:
         pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
       except NameError:
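Reviewer note: below is a minimal usage sketch, not part of the patch. It assumes the vendored module is importable as apache_beam.internal.cloudpickle.cloudpickle (matching the file paths in the diff headers above); the roundtrip helper and the lambda payload are illustrative only.

import io

from apache_beam.internal.cloudpickle import cloudpickle


def roundtrip(obj):
  # PurePythonPickler extends pickle._Pickler, so the way built-in types are
  # saved can be overridden through its `dispatch` table; FastPickler
  # (CPython only) relies on reducer_override instead.
  with io.BytesIO() as file:
    pickler = cloudpickle.PurePythonPickler(file)
    pickler.dump(obj)
    data = file.getvalue()  # grab the payload before the buffer closes
  return cloudpickle.loads(data)


# Dynamic functions are pickled by value, so a lambda survives the round trip.
double = roundtrip(lambda x: 2 * x)
assert double(21) == 42

# The module-level alias added at the end of cloudpickle.py picks the
# implementation: FastPickler on CPython, PurePythonPickler on PyPy.
print(cloudpickle.Pickler.__name__)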