MultiProcessingPool
Classes¶
MultiProcessingPool
¶
The base multiprocessing pool class used for launching FabSim3 tasks.
Source code in base/MultiProcessingPool.py
class MultiProcessingPool:
    """
    Base multiprocessing pool class used for launching FabSim3 tasks.

    Tasks are queued with ``add_task`` and their outputs are gathered with
    ``wait_for_tasks``.
    """

    def __init__(self, PoolSize=1):
        """
        Create the worker pool.

        Args:
            PoolSize: requested number of worker processes. The value
                actually used is stored in ``self.PoolSize``; it is capped
                below the CPU count and is never less than 1.
        """
        try:
            set_start_method("fork")
        except RuntimeError:
            # best effort: keep whatever start method is already in place
            pass
        self.PoolSize = self._bounded_pool_size(PoolSize, cpu_count())
        self.Pool = Pool(
            processes=self.PoolSize,
            initializer=start_process,
            initargs=(self.PoolSize,),
        )
        # AsyncResult handles of the submitted tasks, in submission order
        self.Pool_tasks = []

    @staticmethod
    def _bounded_pool_size(requested, available):
        """
        Return the number of workers to start for a requested pool size.

        Args:
            requested: pool size asked for by the caller.
            available: number of CPUs on the machine.

        Returns:
            ``requested`` when it is smaller than ``available``, otherwise
            ``available - 1``; the result is never lower than 1.
        """
        # to be a little more stable : use one less process maximum
        size = requested if requested < available else available - 1
        # Pool() rejects a process count below 1, which the rule above
        # would produce on a single-CPU machine or for a request <= 0
        return max(1, size)

    def add_task(self, func, func_args=dict(), callback_func=None):
        """
        Submit a task to the Pool for asynchronous execution.

        Args:
            func: callable to run; it is called with ``func_args`` as its
                single positional argument.
            func_args: the argument object handed to ``func``. The default
                dict is shared between calls; it is only forwarded here,
                never modified.
            callback_func: optional callable passed to ``apply_async`` as
                its ``callback``.

        If the submission itself raises, the pool is shut down, the error
        is reported through ``error_callback`` and the program exits with
        status 1.
        """
        # TODO: it would be better to collect the output results of
        #       each task within a callback function, instead of
        #       iterating over the Pool_tasks
        try:
            self.Pool_tasks.append(
                self.Pool.apply_async(
                    func=func,
                    args=(func_args,),
                    callback=callback_func,
                    error_callback=error_callback,
                )
            )
        except Exception as e:
            self.Pool.close()
            self.Pool.terminate()
            self.Pool.join()
            error_callback(e)
            sys.exit(1)

    def wait_for_tasks(self):
        """
        Wait until all tasks in the Pool are finished, then collect and
        return their outputs.

        Returns:
            A single flat list built by concatenating the iterable result
            of every submitted task, in submission order.

        The pool is closed by this call and ``self.Pool_tasks`` is reset
        to an empty list.
        """
        print("Waiting for tasks to be completed ...")
        # tells the pool not to accept any new job
        self.Pool.close()
        # tells the pool to wait until all jobs finished then exit,
        # effectively cleaning up the pool
        self.Pool.join()
        print("All tasks are finished ...")
        results = [task.get() for task in self.Pool_tasks]
        # make Pool_tasks empty list
        self.Pool_tasks = []
        # Flatten the list of per-task result lists into one list
        return list(itertools.chain.from_iterable(results))
Methods¶
add_task(self, func, func_args={}, callback_func=None)
¶
Adds a task to the Pool for asynchronous execution.
Source code in base/MultiProcessingPool.py
def add_task(self, func, func_args=dict(), callback_func=None):
    """
    Submit ``func`` to the Pool for asynchronous execution.

    ``func`` is called with ``func_args`` as its single positional
    argument, and ``callback_func`` is forwarded to ``apply_async`` as its
    ``callback``. The handle returned by ``apply_async`` is stored in
    ``self.Pool_tasks``.

    If the submission raises, the pool is shut down, the error is reported
    through ``error_callback`` and the program exits with status 1.
    """
    # TODO: it would be better to collect the output results of
    #       each task within a callback function, instead of
    #       iterating over the Pool_tasks
    try:
        pending = self.Pool.apply_async(
            func=func,
            args=(func_args,),
            callback=callback_func,
            error_callback=error_callback,
        )
        self.Pool_tasks.append(pending)
    except Exception as err:
        # submission failed: stop the pool, report the error, then abort
        pool = self.Pool
        pool.close()
        pool.terminate()
        pool.join()
        error_callback(err)
        sys.exit(1)
wait_for_tasks(self)
¶
Waits until all tasks in the Pool have finished, then collects and returns their outputs.
Source code in base/MultiProcessingPool.py
def wait_for_tasks(self):
"""
wait until all tasks in the Pool are finished, then collect the output
tasks and return the outputs
"""
results = []
print("Waiting for tasks to be completed ...")
# tells the pool not to accept any new job
self.Pool.close()
# tells the pool to wait until all jobs finished then exit,
# effectively cleaning up the pool
self.Pool.join()
print("All tasks are finished ...")
results = [task.get() for task in self.Pool_tasks]
# make Pool_tasks empty list
self.Pool_tasks = []
# Flatten a list of lists
# source :
# https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
flatten_results = list(itertools.chain(*results))
return flatten_results
Functions¶
error_callback(e)
¶
The error callback function attached to each task; it shows the error message if the process call fails.
Source code in base/MultiProcessingPool.py
def error_callback(e):
    """
    Error callback attached to each task: print a report for a failed
    process call.

    Args:
        e: the exception instance describing the failure.

    The report contains the current process name, the exception type and
    the fully formatted exception (traceback frames when present, chained
    causes, and the exception message).
    """
    # format_exception is used instead of format_tb so that the exception
    # message is always part of the report, even when ``e`` carries no
    # traceback of its own (format_tb would then produce an empty string).
    # NOTE(review): exceptions handed back from pool workers presumably
    # arrive with the remote traceback as a chained cause - confirm.
    trace = "".join(traceback.format_exception(type(e), e, e.__traceback__))
    print(
        "{} error_callback from process {}:{} {}"
        "\nEXCEPTION TRACE:{}\n{}\n".format(
            "=" * 10,
            current_process().name,
            Process().name,
            "=" * 10,
            type(e),
            trace,
        )
    )
start_process(PoolSize)
¶
The initializer function for the multiprocessing Pool.
Source code in base/MultiProcessingPool.py
def start_process(PoolSize):
    """
    Initializer run for the multiprocessing Pool.

    Prints a start-up line (process names, PID, parent PID, maximum and
    requested pool size) and then makes the process ignore SIGINT.

    Args:
        PoolSize: the requested pool size, reported in the start-up line.
    """
    template = (
        "[MultiProcessingPool] Starting Process child "
        ": Name = {} {} , PID = {}, parentPID = {} "
        "Max PoolSize = {} requested PoolSize = {}"
    )
    current_name = current_process().name
    fresh_name = Process().name
    pid = os.getpid()
    # NOTE(review): parent_id is not defined inside this function; it is
    # presumably a module-level value - verify against the full file.
    parent_pid = parent_id
    max_pool_size = cpu_count() - 1
    print(
        template.format(
            current_name,
            fresh_name,
            pid,
            parent_pid,
            max_pool_size,
            PoolSize,
        )
    )
    # ignore SIGINT in this process
    signal.signal(signal.SIGINT, signal.SIG_IGN)