Skip to content

MultiProcessingPool

Classes

MultiProcessingPool

The base multiprocessing Pool class used for launching FabSim3 tasks.

Source code in fabsim/base/MultiProcessingPool.py
class MultiProcessingPool:
    """
    The base multiprocessing Pool class used for launching FabSim3 tasks.

    Tasks are submitted with ``add_task`` and their outputs are gathered
    with ``wait_for_tasks``.
    """

    def __init__(self, PoolSize=1):
        """
        Create the worker Pool.

        Args:
            PoolSize: requested number of worker processes. The value is
                capped at ``cpu_count() - 1`` and is never lower than 1.
        """
        try:
            set_start_method("fork")
        except RuntimeError:
            # best effort: the start method may already have been set,
            # in which case the existing one is kept
            pass
        self.PoolSize = self._resolve_pool_size(PoolSize, cpu_count())
        self.Pool = Pool(
            processes=self.PoolSize,
            initializer=start_process,
            initargs=(self.PoolSize,),
        )
        # AsyncResult handles of the submitted tasks, in submission order
        self.Pool_tasks = []

    @staticmethod
    def _resolve_pool_size(requested, available):
        """
        Return the number of workers to start for a requested Pool size.

        Args:
            requested: number of workers asked for by the caller.
            available: number of CPUs reported for this host.
        """
        # to be a little more stable : use one less process maximum.
        # The lower bound of 1 keeps the Pool constructible on a
        # single-CPU host, where ``available - 1`` would be 0.
        return max(1, min(requested, available - 1))

    def add_task(self, func, func_args=dict(), callback_func=None):
        """
        Submit a task to the Pool for asynchronous execution.

        Args:
            func: callable executed by a Pool worker.
            func_args: the single positional argument handed to ``func``.
                The default dict is never modified by this method.
            callback_func: optional callable passed to the Pool as the
                success callback of the task.

        If the submission itself fails, the Pool is shut down, the error is
        reported through ``error_callback`` and the program exits with
        status 1.
        """
        # TODO: it would be better to collect the output results of
        #     each task within a callback function, instead of
        #     iterating over the Pool_tasks
        try:
            self.Pool_tasks.append(
                self.Pool.apply_async(
                    func=func,
                    args=(func_args,),
                    callback=callback_func,
                    error_callback=error_callback,
                )
            )
        except Exception as e:
            self.Pool.close()
            self.Pool.terminate()
            self.Pool.join()
            error_callback(e)
            sys.exit(1)

    def wait_for_tasks(self):
        """
        Wait until all tasks in the Pool are finished, then collect and
        return their outputs.

        Returns:
            A flat list built by concatenating the iterable returned by
            each task, in submission order.

        Raises:
            Whatever ``get()`` raises for a task handle, e.g. the exception
            of a task that failed. ``Pool_tasks`` is emptied even then.
        """
        print("Waiting for tasks to be completed ...")
        # tells the pool not to accept any new job
        self.Pool.close()
        # tells the pool to wait until all jobs finished then exit,
        # effectively cleaning up the pool
        self.Pool.join()

        print("All tasks are finished ...")

        try:
            results = [task.get() for task in self.Pool_tasks]
        finally:
            # make Pool_tasks empty list, also when a task failed, so no
            # stale handles are kept
            self.Pool_tasks = []

        # Flatten a list of lists
        return list(itertools.chain.from_iterable(results))

Methods

add_task(self, func, func_args={}, callback_func=None)

Adds the task to the Pool for asynchronous execution.

Source code in fabsim/base/MultiProcessingPool.py
def add_task(self, func, func_args=dict(), callback_func=None):
    """
    Submit ``func`` to the Pool for asynchronous execution.

    ``func_args`` is handed to ``func`` as its single positional argument
    and ``callback_func`` is registered as the success callback. The handle
    returned by the Pool is stored in ``Pool_tasks``. If the submission
    fails, the Pool is shut down, the error is reported through
    ``error_callback`` and the program exits with status 1.
    """
    # TODO: it would be better to collect the output results of
    #     each task within a callback function, instead of
    #     iterating over the Pool_tasks
    try:
        pending = self.Pool.apply_async(
            func=func,
            args=(func_args,),
            callback=callback_func,
            error_callback=error_callback,
        )
        self.Pool_tasks.append(pending)
    except Exception as submit_error:
        # shut the Pool down before reporting the failure and exiting
        pool = self.Pool
        pool.close()
        pool.terminate()
        pool.join()
        error_callback(submit_error)
        sys.exit(1)
wait_for_tasks(self)

Waits until all tasks in the Pool are finished, then collects and returns their outputs.

Source code in fabsim/base/MultiProcessingPool.py
def wait_for_tasks(self):
    """
    Wait until all tasks in the Pool are finished, then collect and
    return their outputs.

    Returns:
        A flat list built by concatenating the iterable returned by each
        task, in submission order.

    Raises:
        Whatever ``get()`` raises for a task handle, e.g. the exception of
        a task that failed. ``Pool_tasks`` is emptied even then.
    """
    print("Waiting for tasks to be completed ...")
    # tells the pool not to accept any new job
    self.Pool.close()
    # tells the pool to wait until all jobs finished then exit,
    # effectively cleaning up the pool
    self.Pool.join()

    print("All tasks are finished ...")

    try:
        results = [task.get() for task in self.Pool_tasks]
    finally:
        # make Pool_tasks empty list, also when a task failed, so no stale
        # handles are kept
        self.Pool_tasks = []

    # Flatten a list of lists
    return list(itertools.chain.from_iterable(results))

Functions

error_callback(e)

The error callback function attached to each process; it shows the error message if the process call failed.

Source code in fabsim/base/MultiProcessingPool.py
def error_callback(e):
    """
    The error callback function attached to each process; it prints the
    error report if the process call failed.

    Args:
        e: the exception instance to report.

    The report contains the name of the current process, the name of a
    newly constructed (unstarted) ``Process`` object, the exception type
    and the formatted exception.
    """
    # format_exception renders the exception message and any chained cause
    # in addition to the stack frames. format_tb alone prints nothing when
    # e.__traceback__ is None.
    # NOTE(review): exceptions handed back by a multiprocessing Pool appear
    # to carry the worker traceback as a chained cause - confirm.
    trace = "".join(traceback.format_exception(type(e), e, e.__traceback__))
    print(
        "{} error_callback from process {}:{} {}"
        "\nEXCEPTION TRACE:{}\n{}\n".format(
            "=" * 10,
            current_process().name,
            Process().name,
            "=" * 10,
            type(e),
            trace,
        )
    )

start_process(PoolSize)

the initializer function for multiprocessing Pool

Source code in fabsim/base/MultiProcessingPool.py
def start_process(PoolSize):
    """
    Initializer handed to the multiprocessing Pool: print a start-up line
    for the process, then make it ignore SIGINT.
    """
    banner = (
        "[MultiProcessingPool] Starting Process child "
        ": Name = {} {} , PID = {},  parentPID = {} "
        "Max PoolSize = {} requested PoolSize = {}"
    )
    own_name = current_process().name
    # name of a newly constructed, unstarted Process object
    fresh_name = Process().name
    own_pid = os.getpid()
    max_pool_size = cpu_count() - 1
    print(
        banner.format(
            own_name,
            fresh_name,
            own_pid,
            parent_id,
            max_pool_size,
            PoolSize,
        )
    )
    # SIG_IGN: interrupt signals are ignored from here on in this process
    signal.signal(signal.SIGINT, signal.SIG_IGN)