API Reference¶
The `loky` module manages a pool of workers that can be re-used across time. It provides a robust and dynamic implementation of the `ProcessPoolExecutor` and a function `get_reusable_executor()` which hides the pool management under the hood.
- loky.get_reusable_executor(max_workers=None, context=None, timeout=10, kill_workers=False, reuse='auto', job_reducers=None, result_reducers=None, initializer=None, initargs=(), env=None)[source]¶
Return the current ReusableExecutor instance.
Start a new instance if it has not been started already or if the previous instance was left in a broken state.
If the previous instance does not have the requested number of workers, the executor is dynamically resized to adjust the number of workers prior to returning.
Reusing a singleton instance spares the overhead of starting new worker processes and importing common Python packages each time.

`max_workers` controls the maximum number of tasks that can run in parallel in worker processes. By default this is set to the number of CPUs on the host.

Setting `timeout` (in seconds) makes idle workers automatically shut down so as to release system resources. New workers are respawned upon submission of new tasks so that `max_workers` workers are available to accept the newly submitted tasks. Setting `timeout` to around 100 times the time required to spawn new processes and import packages in them (on the order of 100ms) ensures that the overhead of spawning workers is negligible.

Setting `kill_workers=True` makes it possible to forcibly interrupt previously spawned jobs to get a new instance of the reusable executor with new constructor argument values.

The `job_reducers` and `result_reducers` arguments are used to customize the pickling of tasks and results sent to the executor.

When provided, the `initializer` is run first in newly spawned processes with the arguments `initargs`.

The environment variables in the child processes are a copy of the values in the main process. One can provide a dict `{ENV: VAL}` where `ENV` and `VAL` are string literals to overwrite the environment variable `ENV` in the child processes to the value `VAL`. The environment variables are set in the children before any module is loaded. This only works with the `loky` context.
- class loky.ProcessPoolExecutor(max_workers=None, job_reducers=None, result_reducers=None, timeout=None, context=None, initializer=None, initargs=(), env=None)[source]¶
Initializes a new ProcessPoolExecutor instance.
- Args:
- max_workers: int, optional (default: cpu_count())
The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the number of CPUs the current process can use.
- job_reducers, result_reducers: dict(type: reducer_func)
Custom reducers for pickling the jobs and the results from the Executor. If only job_reducers is provided, result_reducers will use the same reducers.
- timeout: int, optional (default: None)
Idle workers exit after timeout seconds. If a new job is submitted after the timeout, the executor will start enough new Python processes to make sure the pool of workers is full.
- context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process.
- initializer: A callable used to initialize worker processes.
- initargs: A tuple of arguments to pass to the initializer.
- env: A dict of environment variables to overwrite in the child processes. The environment variables are set before any module is loaded. Note that this only works with the loky context.
- map(fn, *iterables, **kwargs)[source]¶
Returns an iterator equivalent to map(fn, *iterables).
- Args:
- fn: A callable that will take as many arguments as there are
passed iterables.
- timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
- chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time.
- Returns:
An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.
- Raises:
- TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
- shutdown(wait=True, kill_workers=False)[source]¶
Clean-up the resources associated with the Executor.
It is safe to call this method several times; no other methods can be called after this one.
- Args:
- wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the executor have been reclaimed.
- cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be cancelled.
Task & results serialization¶
To share function definitions across multiple Python processes, it is necessary to rely on a serialization protocol. The standard protocol in Python is `pickle` but its default implementation in the standard library has several limitations. For instance, it cannot serialize functions which are defined interactively or in the `__main__` module.

To avoid this limitation, `loky` relies on `cloudpickle` when it is present. `cloudpickle` is a fork of the pickle protocol which allows the serialization of a greater number of objects; it can be installed with `pip install cloudpickle`. As this library is slower than the `pickle` module in the standard library, by default `loky` uses it only to serialize objects which are detected to be in the `__main__` module.
There are three ways to tamper with the serialization in `loky`:

- Using the arguments `job_reducers` and `result_reducers`, it is possible to register custom reducers for the serialization process.
- Setting the variable `LOKY_PICKLER` to an available and valid serialization module. This module must expose a valid `Pickler` object. Setting the environment variable `LOKY_PICKLER=cloudpickle` will force `loky` to serialize everything with `cloudpickle` instead of only serializing the objects detected to be in the `__main__` module.
- Finally, it is possible to wrap an unpicklable object using the `loky.wrap_non_picklable_objects` decorator. In this case, all other objects are serialized as in the default behavior and the wrapped object is pickled through `cloudpickle`.
The benefits and drawbacks of each method are highlighted in this example.
- loky.wrap_non_picklable_objects(obj, keep_wrapper=True)[source]¶
Wrapper for non-picklable objects that uses cloudpickle to serialize them.
Note that this wrapper tends to slow down the serialization process, as it is done with cloudpickle, which is typically slower than pickle. The proper way to solve serialization issues is to avoid defining functions and objects in the main script and to implement __reduce__ methods for complex classes.
Process start methods in loky¶
The API in `loky` provides a `set_start_method()` function to set the default `start_method`, which controls the way `Process` objects are started. The available methods are {'loky', 'loky_init_main', 'spawn'}. On Unix, the start methods {'fork', 'forkserver'} are also available.
Note that `loky` is not compatible with the `multiprocessing.set_start_method()` function. The default start method needs to be set with the provided function to ensure proper behavior.
Protection against memory leaks¶
The memory size of long-running worker processes can increase indefinitely if a memory leak is created. This can result in processes being shut down by the OS if those leaks are not resolved. To prevent it, `loky` provides leak detection, memory cleanups, and worker shutdown.
If `psutil` is installed, each worker periodically [1] checks its memory usage after it completes a task. If the usage is found to be unusual [2], an additional `gc.collect()` is triggered to remove objects with potential cyclic references. If the memory usage of a worker process remains too high even after that, it will shut down safely, and a fresh process will automatically be spawned by the executor.
If `psutil` is not installed, there is no easy way to monitor worker process memory usage. `gc.collect()` will still be called periodically [1] inside each worker, but there is no guarantee that a leak is not happening.
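The kind of check each worker performs can be sketched as follows (hypothetical threshold; `loky`'s internal heuristics differ):

```python
import gc
import os

import psutil

# Hypothetical threshold: treat more than 300 MiB of resident memory
# as unusual for a worker of this application.
MAX_RSS_BYTES = 300 * 1024 ** 2

def memory_check_after_task():
    """Sketch of the check a worker could run after each task."""
    process = psutil.Process(os.getpid())
    if process.memory_info().rss > MAX_RSS_BYTES:
        # First try to reclaim objects kept alive by cyclic references.
        gc.collect()
        if process.memory_info().rss > MAX_RSS_BYTES:
            # Still too high: signal that this worker should exit so
            # that the executor respawns a fresh process.
            return False
    return True

print(memory_check_after_task())  # True for a small process
```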
Footnotes