1 change: 1 addition & 0 deletions sdks/python/apache_beam/internal/cloudpickle/__init__.py
@@ -9,6 +9,7 @@
"__version__",
"Pickler",
"CloudPickler",
"PurePythonPickler",
"dumps",
"loads",
"dump",
248 changes: 145 additions & 103 deletions sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
@@ -1205,7 +1205,14 @@ def _get_dataclass_field_type_sentinel(name):
return _DATACLASSE_FIELD_TYPE_SENTINELS[name]


class Pickler(pickle.Pickler):
class BaseCloudPickler:
"""Class for logic that is common between FastPickler and PurePythonPickler.
Cloudpickle provides two picklers: one extending the C implementation of
the CPython pickler and another extending the pure-Python pickler.
FastPickler and PurePythonPickler inherit from BaseCloudPickler and provide
BaseCloudPickler access to either the C or pure-Python pickler by
implementing the _super_pickler() method.
"""
# set of reducers defined and used by cloudpickle (private)
_dispatch_table = {}
_dispatch_table[classmethod] = _classmethod_reduce
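The delegation pattern introduced here can be sketched standalone. The following is illustrative only (the names are not from this PR): a shared base class routes calls through _super_pickler(), and super(_Base, self) skips past the base class in the MRO so the proxy resolves methods on whichever pickle implementation the concrete subclass mixed in.

    import io
    import pickle

    class _Base:
        # Shared logic; concrete subclasses decide which pickler backs it.
        def dump(self, obj):
            return self._super_pickler().dump(obj)

        def _super_pickler(self):
            raise NotImplementedError

    class _Fast(_Base, pickle.Pickler):
        # MRO is _Fast -> _Base -> pickle.Pickler, so super(_Base, self)
        # resolves methods on the C pickler.
        def _super_pickler(self):
            return super(_Base, self)

    class _Pure(_Base, pickle._Pickler):
        # Same trick against CPython's private pure-Python pickler.
        def _super_pickler(self):
            return super(_Base, self)

    fast_buf, pure_buf = io.BytesIO(), io.BytesIO()
    _Fast(fast_buf).dump({"x": 1})
    _Pure(pure_buf).dump({"x": 1})
    assert pickle.loads(fast_buf.getvalue()) == pickle.loads(pure_buf.getvalue())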
@@ -1294,7 +1301,7 @@ def _function_getnewargs(self, func):

def dump(self, obj):
try:
return super().dump(obj)
return self._super_pickler().dump(obj)
except RuntimeError as e:
if len(e.args) > 0 and "recursion" in e.args[0]:
msg = "Could not pickle object as excessively deep recursion required."
@@ -1305,14 +1312,32 @@ def dump(self, obj):
def __init__(self, file, protocol=None, buffer_callback=None):
if protocol is None:
protocol = DEFAULT_PROTOCOL
super().__init__(file, protocol=protocol, buffer_callback=buffer_callback)
# map functions' __globals__ attribute ids to ensure that functions
# sharing the same global namespace at pickling time also share
# their global namespace at unpickling time.
self.globals_ref = {}
self.proto = int(protocol)
self._super_pickler().__init__(
file, protocol=protocol, buffer_callback=buffer_callback)

def _super_pickler(self):
"""Returns a proxy object for an instance of the pickler being extended."""
raise NotImplemented


if not PYPY:

class FastPickler(BaseCloudPickler, pickle.Pickler):
"""Fast pickler extending the C implementation of the CPython pickler.
The FastPickler is not available for PYPY and does not support
overriding how built-in types are pickled.
"""
def __init__(self, file, protocol=None, buffer_callback=None):
super().__init__(file, protocol, buffer_callback)

def _super_pickler(self):
return super(BaseCloudPickler, self)

if not PYPY:
# pickle.Pickler is the C implementation of the CPython pickler and
# therefore we rely on the reducer_override method to customize the pickler
# behavior.
@@ -1328,7 +1353,7 @@ def __init__(self, file, protocol=None, buffer_callback=None):
# name was not a great choice because it would collide with a
# similarly named attribute in the pure-Python `pickle._Pickler`
# implementation in the standard library.
dispatch = dispatch_table
dispatch = BaseCloudPickler.dispatch_table

# Implementation of the reducer_override callback, in order to
# efficiently serialize dynamic functions and classes by subclassing
@@ -1385,111 +1410,121 @@ def reducer_override(self, obj):
# dispatch_table
return NotImplemented
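Since the C pickler's per-type dispatch cannot be overridden from Python, reducer_override (available on pickle.Pickler since Python 3.8) is the hook the fast path relies on: it is consulted for each object before normal dispatch, and returning NotImplemented falls back to the default behavior. A minimal sketch with an illustrative type, not taken from this PR:

    import io
    import pickle

    class Point:
        def __init__(self, x, y):
            self.x, self.y = x, y

    class _Demo(pickle.Pickler):
        def reducer_override(self, obj):
            if isinstance(obj, Point):
                # A (callable, args) reduce value; load time calls Point(x, y).
                return (Point, (obj.x, obj.y))
            # Fall back to the default pickling behavior for everything else.
            return NotImplemented

    buf = io.BytesIO()
    _Demo(buf).dump([Point(1, 2), "left as-is"])
    restored = pickle.loads(buf.getvalue())
    assert (restored[0].x, restored[0].y) == (1, 2)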

else:
# When reducer_override is not available, hack the pure-Python
# Pickler's types.FunctionType and type savers. Note: the type saver
# must override Pickler.save_global, because pickle.py contains a
# hard-coded call to save_global when pickling meta-classes.
dispatch = pickle.Pickler.dispatch.copy()


class PurePythonPickler(BaseCloudPickler, pickle._Pickler):
"""Pure-Python pickler.
This picker supports overriding how built-in types are pickled.
"""
def __init__(self, file, protocol=None, buffer_callback=None):
super().__init__(file, protocol, buffer_callback)

def _super_pickler(self):
return super(BaseCloudPickler, self)

# When reducer_override is not available, hack the pure-Python
# Pickler's types.FunctionType and type savers. Note: the type saver
# must override Pickler.save_global, because pickle.py contains a
# hard-coded call to save_global when pickling meta-classes.
dispatch = pickle._Pickler.dispatch.copy()

def _save_reduce_pickle5(
self,
func,
args,
state=None,
listitems=None,
dictitems=None,
state_setter=None,
obj=None,
):
save = self.save
write = self.write
self.save_reduce(
def _save_reduce_pickle5(
self,
func,
args,
state=None,
listitems=None,
dictitems=None,
state_setter=None,
obj=None,
):
save = self.save
write = self.write
self.save_reduce(
func,
args,
state=None,
listitems=listitems,
dictitems=dictitems,
obj=obj,
)
# backport of the Python 3.8 state_setter pickle operations
save(state_setter)
save(obj) # simple BINGET opcode as obj is already memoized.
save(state)
write(pickle.TUPLE2)
# Trigger a state_setter(obj, state) function call.
write(pickle.REDUCE)
# The purpose of state_setter is to carry-out an
# inplace modification of obj. We do not care about what the
# method might return, so its output is eventually removed from
# the stack.
write(pickle.POP)

def save_global(self, obj, name=None, pack=struct.pack):
"""Main dispatch method.

The name of this method is somewhat misleading: all types get
dispatched here.
"""
if obj is type(None): # noqa
return self.save_reduce(type, (None, ), obj=obj)
elif obj is type(Ellipsis):
return self.save_reduce(type, (Ellipsis, ), obj=obj)
elif obj is type(NotImplemented):
return self.save_reduce(type, (NotImplemented, ), obj=obj)
elif obj in _BUILTIN_TYPE_NAMES:
return self.save_reduce(
_builtin_type, (_BUILTIN_TYPE_NAMES[obj], ), obj=obj)

if name is not None:
super().save_global(obj, name=name)
elif not _should_pickle_by_reference(obj, name=name):
self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
else:
super().save_global(obj, name=name)
listitems=listitems,
dictitems=dictitems,
obj=obj,
)
# backport of the Python 3.8 state_setter pickle operations
save(state_setter)
save(obj) # simple BINGET opcode as obj is already memoized.
save(state)
write(pickle.TUPLE2)
# Trigger a state_setter(obj, state) function call.
write(pickle.REDUCE)
# The purpose of state_setter is to carry-out an
# inplace modification of obj. We do not care about what the
# method might return, so its output is eventually removed from
# the stack.
write(pickle.POP)
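The TUPLE2/REDUCE/POP sequence written above backports the protocol-5 state_setter behavior: at load time the stack reduces to a state_setter(obj, state) call whose return value is discarded. On Python 3.8+ the same effect is available natively via a six-element reduce tuple; a small illustrative example, not from this PR:

    import pickle

    def _set_state(obj, state):
        # Called as state_setter(obj, state); the return value is ignored,
        # which is what the trailing POP opcode accomplishes.
        obj.__dict__.update(state)

    class Box:
        def __init__(self, value=None):
            self.value = value

        def __reduce__(self):
            # The optional sixth element is the state_setter applied after
            # reconstruction.
            return (Box, (), {"value": self.value}, None, None, _set_state)

    box = pickle.loads(pickle.dumps(Box(7), protocol=5))
    assert box.value == 7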

def save_global(self, obj, name=None, pack=struct.pack):
"""Main dispatch method.

The name of this method is somewhat misleading: all types get
dispatched here.
"""
if obj is type(None): # noqa
return self.save_reduce(type, (None, ), obj=obj)
elif obj is type(Ellipsis):
return self.save_reduce(type, (Ellipsis, ), obj=obj)
elif obj is type(NotImplemented):
return self.save_reduce(type, (NotImplemented, ), obj=obj)
elif obj in _BUILTIN_TYPE_NAMES:
return self.save_reduce(
_builtin_type, (_BUILTIN_TYPE_NAMES[obj], ), obj=obj)

if name is not None:
super().save_global(obj, name=name)
elif not _should_pickle_by_reference(obj, name=name):
self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
else:
super().save_global(obj, name=name)

dispatch[type] = save_global
dispatch[type] = save_global

def save_function(self, obj, name=None):
"""Registered with the dispatch to handle all function types.

Determines what kind of function obj is (e.g. lambda, defined at
interactive prompt, etc) and handles the pickling appropriately.
"""
if _should_pickle_by_reference(obj, name=name):
return super().save_global(obj, name=name)
elif PYPY and isinstance(obj.__code__, builtin_code_type):
return self.save_pypy_builtin_func(obj)
else:
return self._save_reduce_pickle5(
*self._dynamic_function_reduce(obj), obj=obj)

def save_pypy_builtin_func(self, obj):
"""Save pypy equivalent of builtin functions.

PyPy does not have the concept of builtin-functions. Instead,
builtin-functions are simple function instances, but with a
builtin-code attribute.
Most of the time, builtin functions should be pickled by attribute.
But PyPy has flaky support for __qualname__, so some builtin
functions such as float.__new__ will be classified as dynamic. For
this reason only, we created this special routine. Because
builtin-functions are not expected to have closure or globals,
there is no additional hack (compared to the one already implemented
in pickle) to protect ourselves from reference cycles. A simple
(reconstructor, newargs, obj.__dict__) tuple is passed to save_reduce. Note
also that PyPy improved their support for __qualname__ in v3.6, so
this routine should be removed when cloudpickle supports only PyPy
3.6 and later.
"""
rv = (
types.FunctionType,
(obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
obj.__dict__,
)
self.save_reduce(*rv, obj=obj)
def save_function(self, obj, name=None):
"""Registered with the dispatch to handle all function types.

Determines what kind of function obj is (e.g. lambda, defined at
interactive prompt, etc) and handles the pickling appropriately.
"""
if _should_pickle_by_reference(obj, name=name):
return super().save_global(obj, name=name)
elif PYPY and isinstance(obj.__code__, builtin_code_type):
return self.save_pypy_builtin_func(obj)
else:
return self._save_reduce_pickle5(
*self._dynamic_function_reduce(obj), obj=obj)

def save_pypy_builtin_func(self, obj):
"""Save pypy equivalent of builtin functions.

PyPy does not have the concept of builtin-functions. Instead,
builtin-functions are simple function instances, but with a
builtin-code attribute.
Most of the time, builtin functions should be pickled by attribute.
But PyPy has flaky support for __qualname__, so some builtin
functions such as float.__new__ will be classified as dynamic. For
this reason only, we created this special routine. Because
builtin-functions are not expected to have closure or globals,
there is no additional hack (compared to the one already implemented
in pickle) to protect ourselves from reference cycles. A simple
(reconstructor, newargs, obj.__dict__) tuple is passed to save_reduce. Note
also that PyPy improved their support for __qualname__ in v3.6, so
this routine should be removed when cloudpickle supports only PyPy
3.6 and later.
"""
rv = (
types.FunctionType,
(obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
obj.__dict__,
)
self.save_reduce(*rv, obj=obj)

dispatch[types.FunctionType] = save_function
dispatch[types.FunctionType] = save_function
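Unlike the reducer_override hook used by the fast path, the pure-Python pickler dispatches on exact type through a class-level dispatch dict of saver functions, which is what the copy-and-assign statements above rely on. A minimal sketch of that mechanism, assuming CPython's private pickle._Pickler and not taken from this PR:

    import io
    import pickle

    class _Demo(pickle._Pickler):
        # Savers are looked up by exact type and invoked as f(self, obj).
        dispatch = pickle._Pickler.dispatch.copy()

        def save_complex(self, obj):
            # Route complex numbers through an explicit reduce.
            self.save_reduce(complex, (obj.real, obj.imag), obj=obj)

        dispatch[complex] = save_complex

    buf = io.BytesIO()
    _Demo(buf).dump(3 + 4j)
    assert pickle.loads(buf.getvalue()) == 3 + 4j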


# Shorthands similar to pickle.dump/pickle.dumps
@@ -1533,5 +1568,12 @@ def dumps(obj, protocol=None, buffer_callback=None):
# Include pickles unloading functions in this namespace for convenience.
load, loads = pickle.load, pickle.loads

# Use the fast pickler, which extends the C implementation of the CPython
# pickler, when it is available.
if PYPY:
Pickler = PurePythonPickler
else:
Pickler = FastPickler

# Backward compat alias.
CloudPickler = Pickler
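With the alias in place, existing call sites keep working on both runtimes. A hedged usage sketch (the import path follows the vendored module in this diff):

    import io

    from apache_beam.internal.cloudpickle import cloudpickle

    # Module-level helpers are backed by whichever pickler the alias selected.
    payload = cloudpickle.dumps(lambda x: x * 2)
    doubler = cloudpickle.loads(payload)
    assert doubler(21) == 42

    # The backward-compat name resolves to the same class.
    buf = io.BytesIO()
    cloudpickle.CloudPickler(buf).dump({"k": [1, 2, 3]})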
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -111,7 +111,7 @@ def dumps(o, enable_trace=True, use_zlib=False) -> bytes:
"""For internal use only; no backwards-compatibility guarantees."""
with _pickle_lock:
with io.BytesIO() as file:
pickler = cloudpickle.CloudPickler(file)
pickler = cloudpickle.PurePythonPickler(file)
Collaborator: As discussed offline, we probably want to have this default to
the fast pickler, and add an option to use the slow pickler.
try:
pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
except NameError:
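If the reviewer's suggestion were adopted, it might take roughly the following shape; the flag name and wiring are hypothetical, not part of this PR:

    import io

    from apache_beam.internal.cloudpickle import cloudpickle

    def dumps(o, use_fast_pickler=True) -> bytes:
        # Default to the fast pickler; the pure-Python pickler remains an
        # opt-in (and is the only choice on PyPy).
        pickler_cls = (
            cloudpickle.Pickler if use_fast_pickler
            else cloudpickle.PurePythonPickler)
        with io.BytesIO() as file:
            pickler_cls(file).dump(o)
            return file.getvalue()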