10.3.3. Dispatching a set of tasks and collecting the results
Outsourcing a set of tasks and collecting the results is simple. You don’t have to
do any calls to cOS.Spawn()
or cOS.Yield()
. Instead you can use
cOS.dispatch()
.
A job is evaluated by the jobProcessor()
function defined in
file funclib.py
in folder demo/parallel/cooperative/.
def jobProcessor(value):
# Process a job (value), return result (multiply value by 2)
hostID=MPI.hostID()
taskID=MPI.taskID()
print("Processing "+str(value)+ " on "+ str(hostID)+" "+str(taskID))
return 2*value
Every job is specified in the form of a tuple where the first entry is the function and the second entry is the list of the positional arguments. The jobs are generated by a generator that is defined with a simple generator expression.
Dispatching is asynchronous. This means that if you have less processors than jobs a processor will receive a new job as soon as the previous one is finished. This is repeated until all jobs are processed.
File 03-dispatch.py in folder demo/parallel/cooperative/
# Dispatches a fixed number of tasks to computational nodes
# Run it by typing
# mpirun -n 4 python3 03-dispatch.py
# If you run it with
# python3 03-dispatch.py
# only the local processor will be used.
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in
# the system path.
#
# mpiexec /np <number of processes> python 03-dispatch.py
from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import jobProcessor
if __name__=='__main__':
# Set up MPI
cOS.setVM(MPI())
# This generator produces 100 jobs which are tuples of the form
# (function, args)
jobGen=((jobProcessor, [value]) for value in range(100))
# Dispatch jobs and collect results
results=cOS.dispatch(jobList=jobGen, remote=True)
# Results are put in the list in the ame order as the jobs are generated by jobGen
print("Results: "+str(results))
# Finish, need to do this if MPI is used
cOS.finalize()