Serialization of un-picklable objects

This example highlights the options for tampering with loky's serialization process.

# Code source: Thomas Moreau
# License: BSD 3 clause

import sys
import time
import traceback
from loky import set_loky_pickler
from loky import get_reusable_executor
from loky import wrap_non_picklable_objects

First, define a function that cannot be pickled with the standard pickle protocol. It cannot be serialized with pickle because it is defined in the __main__ module. It can, however, be serialized with cloudpickle.

def func_async(i, *args):
    return 2 * i
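
To see why the defining module matters, it may help to look at what pickle actually stores for a function. The sketch below uses only the standard library, with json.dumps standing in for any importable function (an assumption made for illustration; it is not part of the loky example). The payload holds the module and function names rather than the code, so loading it requires the receiving process to be able to look up the same name.

```python
import json
import pickle

# Pickle a function that lives in an importable module. json.dumps is only a
# stand-in chosen for illustration.
payload = pickle.dumps(json.dumps)

# The payload records where to find the function, not its bytecode.
stores_module_name = b"json" in payload
stores_function_name = b"dumps" in payload

# Loading performs an import and an attribute lookup, so it returns the very
# same function object rather than a copy.
restored = pickle.loads(payload)
same_object = restored is json.dumps

print(stores_module_name, stores_function_name, same_object)
```

cloudpickle differs in that it can serialize a function by value when the function cannot be found by name in the receiving process.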

By default, loky uses cloudpickle to serialize the objects that are sent to the workers.

executor = get_reusable_executor(max_workers=1)
print(executor.submit(func_async, 21).result())
42

For most use cases, cloudpickle is efficient enough. However, it can be very slow to serialize large Python objects, such as a dict or a list, compared to the standard pickle serialization.

# We have to pass an extra argument with a large list (or another large python
# object).
large_list = list(range(1000000))

t_start = time.time()
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async, 21, large_list).result()
print(f"With cloudpickle serialization: {time.time() - t_start:.3f}s")
With cloudpickle serialization: 0.072s
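
The timing above includes the cost of obtaining a worker and of the round trip, not only serialization. As a rough sketch, assuming only the standard library is available, the cost of the serialization step alone can be estimated by timing a dumps call directly. The helper name time_dumps is hypothetical; if cloudpickle is installed, cloudpickle.dumps could be passed as the second argument to compare the two.

```python
import pickle
import time


def time_dumps(obj, dumps=pickle.dumps):
    """Return (elapsed seconds, payload size in bytes) for one dumps call."""
    t_start = time.perf_counter()
    payload = dumps(obj)
    return time.perf_counter() - t_start, len(payload)


large_list = list(range(1000000))
elapsed, size = time_dumps(large_list)
print(f"pickle.dumps alone: {elapsed:.3f}s for {size} bytes")
```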

To mitigate this, it is possible to rely fully on pickle to serialize all communications between the main process and the workers. This can be done by setting the environment variable LOKY_PICKLER=pickle before the script is launched, or with the set_loky_pickler switch provided in the loky API.

# Now set the `loky_pickler` to use the pickle serialization from stdlib. Here,
# we do not pass the desired function ``func_async`` as it is not picklable
# with pickle; it is replaced by ``id`` for demonstration purposes.
set_loky_pickler('pickle')
t_start = time.time()
executor = get_reusable_executor(max_workers=1)
executor.submit(id, large_list).result()
print(f"With pickle serialization: {time.time() - t_start:.3f}s")
With pickle serialization: 0.075s
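
The environment-variable route can be sketched from Python by launching a script in a child process with LOKY_PICKLER set in its environment. In this minimal sketch the child merely echoes the variable, so the example runs without loky installed; a real script would import loky and submit tasks instead.

```python
import os
import subprocess
import sys

# Copy the current environment and select the stdlib pickler for the child.
env = dict(os.environ, LOKY_PICKLER="pickle")

# Placeholder child script: it only reports the variable it received.
child_code = "import os; print(os.environ['LOKY_PICKLER'])"
result = subprocess.run(
    [sys.executable, "-c", child_code],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
child_value = result.stdout.strip()
print(child_value)
```

Setting the variable this way affects only the child process, which matches the requirement that it be set before the script is launched.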

However, functions and objects defined in __main__ are no longer serializable with pickle, so func_async cannot be called with this pickler.

try:
    executor = get_reusable_executor(max_workers=1)
    executor.submit(func_async, 21, large_list).result()
except Exception:
    traceback.print_exc(file=sys.stdout)
loky.process_executor._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/loky/envs/stable/lib/python3.8/site-packages/loky-3.3.0-py3.8.egg/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "/home/docs/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func_async' on <module 'loky.backend.popen_loky_posix' from '/home/docs/checkouts/readthedocs.org/user_builds/loky/envs/stable/lib/python3.8/site-packages/loky-3.3.0-py3.8.egg/loky/backend/popen_loky_posix.py'>
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/loky/checkouts/stable/examples/cloudpickle_wrapper.py", line 84, in <module>
    executor.submit(func_async, 21, large_list).result()
  File "/home/docs/.pyenv/versions/3.8.6/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/docs/.pyenv/versions/3.8.6/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
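
The AttributeError in the remote traceback comes from a lookup by name failing in the worker, whose main module is not the user's script. The sketch below reproduces the same kind of failure in a single process with the standard library only. The module name fake_main is hypothetical and stands in for the parent's __main__.

```python
import pickle
import sys
import types

# Hypothetical module playing the role of the parent's __main__.
fake_main = types.ModuleType("fake_main")
sys.modules["fake_main"] = fake_main
exec("def func_async(i, *args):\n    return 2 * i\n", fake_main.__dict__)

# Pickling succeeds: only the module and function names are stored.
payload = pickle.dumps(fake_main.func_async)

# Simulate a worker whose module of that name lacks the function.
del fake_main.func_async

error_message = None
try:
    pickle.loads(payload)
except AttributeError as exc:
    error_message = str(exc)
finally:
    # Remove the throwaway module so the sketch leaves no trace.
    del sys.modules["fake_main"]

print(error_message)
```

Note that the dumps call does not fail; the error only appears on the loading side, which is why loky reports a task that failed to un-serialize.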

loky provides a wrapper function wrap_non_picklable_objects() to wrap the non-picklable function and indicate to the serialization process that this specific function should be serialized using cloudpickle. This changes the serialization behavior only for this function and keeps using pickle for all other objects. The drawback of this solution is that it modifies the object. This should not cause many issues with functions but can have side effects with object instances.

@wrap_non_picklable_objects
def func_async_wrapped(i, *args):
    return 2 * i


t_start = time.time()
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async_wrapped, 21, large_list).result()
print(f"With default and wrapper: {time.time() - t_start:.3f}s")
With default and wrapper: 0.390s

The same wrapper can also be used for non-picklable classes. Note that the side effects of wrap_non_picklable_objects() on objects can break magic methods such as __add__ and can interfere with the isinstance and issubclass functions. Some improvements will be considered if use cases are reported.

Total running time of the script: ( 0 minutes 0.660 seconds)
