Module cnvpytor.pool
cnvpytor.pool
Multiprocessing pool functions
Source code
""" cnvpytor.pool
Multiprocessing pool functions
"""
import multiprocessing
import logging
_logger = logging.getLogger("cnvpytor.pool")


class _WorkerFailure:
    """Marks an exception raised by ``f`` inside a worker process.

    Wrapping the exception lets the parent tell a failed task apart from a
    task whose legitimate return value happens to be an exception object.
    """

    def __init__(self, error):
        self.error = error


def _fun(f, q_in, q_out):
    """
    Worker loop: applies f to queued tasks until a stop sentinel arrives.

    Parameters
    ----------
    f : callable
        Function applied to every task argument.
    q_in : multiprocessing.Queue
        Source of ``(index, argument)`` tasks. A task whose index is None
        is the stop sentinel.
    q_out : multiprocessing.Queue
        Receives one ``(index, outcome)`` pair per task, where outcome is
        either the return value of f or a ``_WorkerFailure``.

    """
    while True:
        i, x = q_in.get()
        if i is None:
            break
        try:
            outcome = f(x)
        except Exception as error:
            # The parent waits for exactly one answer per task. Dying here
            # without answering would leave it blocked forever, so report
            # the failure instead.
            outcome = _WorkerFailure(error)
        q_out.put((i, outcome))


def parmap(f, x_arg, cores=multiprocessing.cpu_count(), info=True):
    """
    Calculates list(map(f, x_arg)) using multiprocessing module.

    Parameters
    ----------
    f : callable
        Function applied to each argument inside a worker process.
    x_arg : iterable of function arguments
    cores : maximal number of cores
        Capped at ``multiprocessing.cpu_count()``; values below 1 are
        treated as 1.
    info : bool
        If True the core count is logged at INFO level, otherwise at DEBUG.

    Returns
    -------
    results : list of values
        List of results, in the order of x_arg.

    Raises
    ------
    Exception
        If f raises for one or more arguments, the exception belonging to
        the lowest argument index is re-raised after all workers finished.

    """
    # At least one worker is required: with none, the bounded input queue
    # fills up and the second put() below would block forever.
    cores = max(1, min(cores, multiprocessing.cpu_count()))
    log = _logger.info if info else _logger.debug
    log("Parallel processing using %d cores", cores)

    # Size-1 input queue: tasks are handed out only as fast as workers
    # pick them up.
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()
    proc = [multiprocessing.Process(target=_fun, args=(f, q_in, q_out))
            for _ in range(cores)]
    for p in proc:
        p.daemon = True
        p.start()

    n_sent = 0
    for task in enumerate(x_arg):
        q_in.put(task)
        n_sent += 1
    # One stop sentinel per worker.
    for _ in proc:
        q_in.put((None, None))

    res = [q_out.get() for _ in range(n_sent)]
    for p in proc:
        p.join()

    # Results arrive in completion order; restore input order by index only,
    # so the outcomes themselves are never compared.
    res.sort(key=lambda pair: pair[0])
    for _, outcome in res:
        if isinstance(outcome, _WorkerFailure):
            raise outcome.error
    return [outcome for _, outcome in res]
Functions
def parmap(f, x_arg, cores=4, info=True)
-
Calculates list(map(f, x_arg)) using the multiprocessing module.
Parameters
f : callable
x_arg : list of function arguments
cores : maximal number of cores
Returns
results : list of values
List of results.
Source code
def parmap(f, x_arg, cores=multiprocessing.cpu_count(), info=True):
    """
    Calculates list(map(f, x_arg)) using multiprocessing module.

    Parameters
    ----------
    f : callable
        Function applied to each argument inside a worker process.
    x_arg : iterable of function arguments
    cores : maximal number of cores
        Capped at ``multiprocessing.cpu_count()``; values below 1 are
        treated as 1.
    info : bool
        If True the core count is logged at INFO level, otherwise at DEBUG.

    Returns
    -------
    results : list of values
        List of results, in the order of x_arg.

    """
    # At least one worker is required: with none, the bounded input queue
    # fills up and the second put() below would block forever.
    cores = max(1, min(cores, multiprocessing.cpu_count()))
    log = _logger.info if info else _logger.debug
    log("Parallel processing using %d cores", cores)

    # Size-1 input queue: tasks are handed out only as fast as workers
    # pick them up.
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()
    proc = [multiprocessing.Process(target=_fun, args=(f, q_in, q_out))
            for _ in range(cores)]
    for p in proc:
        p.daemon = True
        p.start()

    n_sent = 0
    for task in enumerate(x_arg):
        q_in.put(task)
        n_sent += 1
    # One (None, None) stop sentinel per worker.
    for _ in proc:
        q_in.put((None, None))

    res = [q_out.get() for _ in range(n_sent)]
    for p in proc:
        p.join()

    # Results arrive in completion order; restore input order by index only.
    return [value for _, value in sorted(res, key=lambda pair: pair[0])]