Skip to content

MultiProcessingPool

Classes

MultiProcessingPool

The base multiprocessing Pool class used for launching FabSim3 tasks.

Methods

add_task(self, func, func_args={}, callback_func=None)

Adds the task to the process Pool for execution.

Source code in base/MultiProcessingPool.py
def add_task(self, func, func_args=None, callback_func=None):
    """
    Submit a task to the process Pool for asynchronous execution.

    ``func`` is scheduled with ``self.Pool.apply_async`` and receives
    ``func_args`` as its single positional argument. The returned async
    result handle is appended to ``self.Pool_tasks``.

    Args:
        func: the callable to execute in the Pool.
        func_args: the object handed to ``func`` as its only positional
            argument. When omitted (``None``), a fresh empty dict is used,
            so no dict instance is shared between calls.
        callback_func: optional callable passed to ``apply_async`` as its
            ``callback``.

    If submitting the task raises, the Pool is closed, terminated and
    joined, the exception is reported through ``error_callback`` and the
    program exits with status 1.
    """
    # A mutable default (``dict()``) is created once at definition time and
    # shared by every call; build a new dict per call instead.
    if func_args is None:
        func_args = {}

    # TODO: it would be better to collect the output results of
    #     each task within a callback function, instead of
    #     iterating over the Pool_tasks
    try:
        self.Pool_tasks.append(
            self.Pool.apply_async(
                func=func,
                args=(func_args,),
                callback=callback_func,
                error_callback=error_callback,
            )
        )
    except Exception as e:
        # Submission failed: shut the Pool down before reporting and exiting.
        self.Pool.close()
        self.Pool.terminate()
        self.Pool.join()
        error_callback(e)
        sys.exit(1)
wait_for_tasks(self)

Wait until all tasks in the Pool are finished, then collect the outputs of the tasks and return them.

Source code in base/MultiProcessingPool.py
def wait_for_tasks(self):
    """
    Block until every submitted task has finished, then return the outputs.

    The Pool is closed and joined first. Each entry of ``self.Pool_tasks``
    is then resolved with ``get()``, ``self.Pool_tasks`` is reset to an
    empty list, and the per-task outputs (each one is iterated, so it must
    be iterable) are concatenated into a single flat list.

    Returns:
        a flat list holding the items of every task's output, in the order
        the tasks were stored in ``self.Pool_tasks``.
    """
    print("Waiting for tasks to be completed ...")
    pool = self.Pool
    # stop accepting new jobs ...
    pool.close()
    # ... and wait for the submitted ones to finish
    pool.join()
    print("All tasks are finished ...")

    per_task_outputs = []
    for pending in self.Pool_tasks:
        per_task_outputs.append(pending.get())

    # start from an empty task list again
    self.Pool_tasks = []

    # flatten the list of per-task outputs into one list
    return list(itertools.chain.from_iterable(per_task_outputs))

Functions

error_callback(e)

The error callback function attached to each process; it shows the error message if the process call failed.

Source code in base/MultiProcessingPool.py
def error_callback(e):
    """
    Print a banner and the full error report for a failed process call.

    The report contains the exception type, its traceback, its message and
    any chained cause/context.

    Args:
        e: the exception instance raised by the failed call.
    """
    # format_exception (unlike format_tb) also renders the exception message
    # and chained exceptions. This matters when ``e`` carries no local
    # traceback: format_tb would then produce an empty trace.
    trace_text = "".join(
        traceback.format_exception(type(e), e, e.__traceback__)
    )
    # NOTE(review): Process() builds a new, unstarted Process object, so the
    # second name printed is an auto-generated one, not necessarily the
    # failing worker's -- confirm this is intended.
    print(
        "{} error_callback from process {}:{} {}"
        "\nEXCEPTION TRACE:{}\n{}\n".format(
            "=" * 10,
            current_process().name,
            Process().name,
            "=" * 10,
            type(e),
            trace_text,
        )
    )

start_process(PoolSize)

the initializer function for multiprocessing Pool

Source code in base/MultiProcessingPool.py
def start_process(PoolSize):
    """
    Initializer for the multiprocessing Pool: announce the starting process.

    Prints one line with the current process name, the name of a newly
    constructed ``Process`` object, the current PID, the module-level
    ``parent_id``, ``cpu_count() - 1`` and the requested ``PoolSize``.

    Args:
        PoolSize: the requested Pool size; only reported in the message.
    """
    template = (
        "[MultiProcessingPool] Starting Process child "
        ": Name = {} {} , PID = {},  parentPID = {} "
        "Max PoolSize = {} requested PoolSize = {}"
    )
    current_name = current_process().name
    fresh_name = Process().name
    pid = os.getpid()
    # one core is left out of the reported maximum
    max_pool_size = cpu_count() - 1
    print(
        template.format(
            current_name, fresh_name, pid, parent_id, max_pool_size, PoolSize
        )
    )