Source code for parallelize.parallelize

import math
import time
import functools
import tempfile
import os
    import cPickle as pickle
    import pickle
import gc
from pathlib import Path
from multiprocessing import Process, Queue
from typing import Callable, List, Tuple, Any

def time_function(func: Callable) -> Callable:
    """
    Decorator that reports how long each call to a function takes.

    The elapsed wall-clock time is measured with ``time.perf_counter`` and
    printed to standard output once the wrapped call has returned. Nothing
    is printed if the wrapped call raises.

    Args:
        func (Callable): Function to be timed

    Returns:
        Callable: Wrapper that forwards all arguments to `func`, prints
            the elapsed time and returns the result of `func` unchanged
    """
    @functools.wraps(func)
    def timed(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        report = "Completed in {} in {:.4f} secs".format(
            repr(func.__name__), elapsed
        )
        print(report)
        return result

    return timed
@time_function
def parallel(
    func: Callable,
    iterable: list,
    n_jobs: int = 2,
    write_to_file: bool = False,
    args: tuple = (),
    kwargs: dict = None
) -> Any:
    """
    Parallelises a function that operates on a list by splitting the list
    into contiguous chunks and spawning one `multiprocessing.Process` per
    chunk.

    Each worker calls ``func(chunk, *args, **kwargs)`` and reports its
    result back through a queue, tagged with the chunk index so that the
    original order can be restored.

    Args:
        func (Callable): Function to be parallelised. It receives a slice
            of `iterable` as its first positional argument.
        iterable (list): List which `func` will operate over
        n_jobs (int): Number of chunks to make, i.e. number of worker
            processes to start
        write_to_file (bool): (Optional) Default False. If True, each
            worker pickles its result to a temporary file and only the
            file path travels through the queue.
        args (tuple): Extra positional arguments to pass to `func`
        kwargs (dict): (Optional) Keyword arguments to pass to `func`.
            None (the default) means no keyword arguments.

    Raises:
        TypeError: Raised if `iterable` is not a list
        ValueError: Propagated from `make_divisions` when it rejects
            `n_jobs` for the length of `iterable`

    Returns:
        Any: If `write_to_file` is True, whatever `retrieve_output`
            returns. Otherwise, if every chunk result is a list, the
            chunk results concatenated in chunk order; else a list with
            one result per chunk, in chunk order.
    """
    if not isinstance(iterable, list):
        raise TypeError('Iterable must be a list')

    # A None default avoids sharing one mutable dict between calls.
    call_kwargs = {} if kwargs is None else kwargs

    divisions = make_divisions(iterable, n_jobs)
    queue = Queue()

    # Consecutive pairs of division indices delimit each worker's chunk.
    processes = [
        Process(
            target=capture_output,
            args=(
                func, iterable[start:end], index, queue, write_to_file, *args
            ),
            kwargs=call_kwargs
        )
        for index, (start, end) in enumerate(zip(divisions, divisions[1:]))
    ]

    for process in processes:
        process.start()

    # Drain the queue before joining: one item is expected per worker.
    # NOTE(review): a worker only puts its item after `func` returns, so a
    # worker that raises leaves this `get()` waiting indefinitely.
    result_output = [queue.get() for _ in processes]

    for process in processes:
        process.join()

    if write_to_file:
        return retrieve_output(result_output)

    if all(isinstance(item[1], list) for item in result_output):
        return merge_results(result_output)

    # Results arrive in completion order; restore chunk order by index.
    ordered = sorted(result_output, key=lambda item: item[0])
    return [item[1] for item in ordered]
@time_function
def merge_results(results: List[Tuple[int, list]]) -> list:
    """
    Merges indexed partial results into a single flat list.

    Args:
        results (List[Tuple[int, list]]): ``(index, items)`` pairs, in any
            order. `index` determines where `items` is placed in the
            merged output.

    Returns:
        list: The items of every pair, concatenated in ascending order of
            index.
    """
    ordered = sorted(results, key=lambda pair: pair[0])
    # Flatten in a single pass; `sum(lists, [])` copies the accumulated
    # list on every addition and is quadratic in the total item count.
    return [item for pair in ordered for item in pair[1]]
def capture_output(
    func: Callable,
    iterable: list,
    index: int,
    queue: Queue,
    write_to_file: bool=False,
    *args,
    **kwargs
) -> None:
    """
    Runs the parallelised function on one chunk and reports the result.

    Calls ``func(iterable, *args, **kwargs)`` and puts a tuple
    ``(index, payload)`` on `queue`. The payload is the result itself, or,
    when `write_to_file` is true, the path of a temporary file that
    `write_output_to_temp_file` wrote the result to.

    Args:
        func (Callable): Function which is to be parallelised.
        iterable (list): Chunk of the list which `func` will operate over.
        index (int): Index number of the current chunk. Sent back with the
            payload so the receiver can restore the order of results.
        queue (Queue): Queue object used to pass the payload back.
        write_to_file (bool): (Optional) Default False. If True, the
            result is written to a temporary file and its path is queued
            in place of the result.
        *args: Extra positional arguments forwarded to `func`.
        **kwargs: Keyword arguments forwarded to `func`.

    Returns:
        None: No output returned.
    """
    result = func(iterable, *args, **kwargs)
    payload = write_output_to_temp_file(result) if write_to_file else result
    queue.put((index, payload))
@time_function
def write_output_to_temp_file(output: list) -> Path:
    """
    Pickles the output of the parallelised function to a temporary file.

    The file is created with `tempfile.mkstemp` and is not deleted here;
    the caller is responsible for removing it.

    Args:
        output (list): The output from the parallelised function.

    Returns:
        Path: File path to the temporary file.
    """
    fd, path = tempfile.mkstemp()

    # Garbage collection is suspended while pickling (presumably to speed
    # up serialising large objects -- confirm) and restored to its previous
    # state afterwards, even if pickling fails.
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        # The context manager flushes and closes the descriptor so the file
        # is complete on disk before its path is handed to a reader.
        with os.fdopen(fd, 'wb') as handle:
            pickle.dump(output, handle, protocol=-1)
    finally:
        if gc_was_enabled:
            gc.enable()

    return Path(path)
@time_function
def retrieve_output(file_paths: List[Tuple[int, Path]]) -> list:
    """
    Loads the pickled outputs of the parallelised function from their
    temporary files and deletes each file once it has been read.

    Args:
        file_paths (List[Tuple[int, Path]]): ``(index, path)`` pairs, in
            any order, where `path` points to a pickle file and `index`
            gives the position of its content in the final result.

    Returns:
        list: If every loaded output is a list, the outputs concatenated
            in ascending order of index; otherwise a list with one loaded
            output per file, in ascending order of index.
    """
    indexed_outputs = []
    for index, path in file_paths:
        # NOTE: unpickling can execute arbitrary code. The paths are
        # expected to be temporary files written by this module's own
        # worker processes -- never pass untrusted files here.
        with open(path, 'rb') as handle:
            loaded = pickle.load(handle)
        os.remove(path)
        indexed_outputs.append((index, loaded))

    # Inspect the payload (second element), not the (index, payload) tuple.
    if all(isinstance(item[1], list) for item in indexed_outputs):
        return merge_results(indexed_outputs)

    # Strip the bookkeeping index after restoring the original order.
    ordered = sorted(indexed_outputs, key=lambda item: item[0])
    return [item[1] for item in ordered]
def make_divisions(iterable: list, n_splits: int) -> list:
    """
    Generates indices to divide iterable into near-equal contiguous
    portions.

    Every portion has ``len(iterable) // n_splits`` items except the last,
    which also takes the remainder.

    Args:
        iterable (list): Iterable to divide.
        n_splits (int): Number of splits to make.

    Raises:
        ValueError: Raised if number of splits is less than 1 or more than
            the number of items in the iterable.

    Returns:
        list: ``n_splits + 1`` boundary indices, starting at 0 and ending
            at ``len(iterable)``; portion ``i`` is
            ``iterable[divisions[i]:divisions[i + 1]]``.
    """
    length = len(iterable)
    if n_splits < 1:
        raise ValueError('Number of splits must be at least 1')
    # Exactly one item per split is valid, so only reject strictly greater.
    if n_splits > length:
        raise ValueError(
            'Number of splits must not be greater than length of iterable'
        )

    unit = length // n_splits
    divisions = [unit * i for i in range(n_splits)]
    # The final boundary is the end of the list so the last portion absorbs
    # any remainder.
    divisions.append(length)
    return divisions