10.3.5. Writing a custom dispatcher

This time we build a custom asynchronous job dispatcher. As before, the jobProcessor() function is defined in file funclib.py in folder demo/parallel/cooperative/.

from pyopus.parallel.mpi import MPI

def jobProcessor(value):
	# Process a job (value) and return the result (the value multiplied by 2)
	hostID=MPI.hostID()
	taskID=MPI.taskID()
	
	print("Processing "+str(value)+" on "+str(hostID)+" "+str(taskID))
	return 2*value
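
The dispatcher in 05-asyncloop.py below is built around two cOS primitives: cOS.Spawn(), which starts a task and returns its task identifier, and cOS.Join(), which waits until a running task finishes and returns a dictionary mapping the finished task's identifier to its return value. Stripped of all bookkeeping, a single spawn/join round trip looks roughly like this (a minimal sketch that assumes the same imports and the cOS.setVM(MPI()) setup used in the listing below):

# A sketch of one spawn/join round trip, assuming cOS.setVM(MPI()) has been called
tid=cOS.Spawn(jobProcessor, args=[10], remote=True, block=True)
# Join blocks until a task finishes and returns a dictionary, e.g. {tid: 20}
result=cOS.Join(block=True)[tid]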

File 05-asyncloop.py in folder demo/parallel/cooperative/

# This demo does the same as the dyndispatch demo, except that a
# custom dispatcher loop is used. This is how asynchronous parallel 
# optimization algorithms like DE and PSADE are implemented. Run it with
#
#   mpirun -n 4 python3 05-asyncloop.py
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in 
# the system path. 
#
#   mpiexec /np <number of processes> python 05-asyncloop.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import jobProcessor

# Result at which we stop
stopAtResult=150

# Minimal and maximal number of parallel tasks
# The maximal number of parallel tasks can be infinite (set maxTasks to None)
minTasks=1
maxTasks=1000

if __name__=='__main__':
	# Set up MPI
	cOS.setVM(MPI())

	# This list will hold the jobs (values that are doubled)
	jobs=[]

	# This list will be filled with results
	results=[]

	# Flag for stopping the main loop
	stop=False

	# Running task status storage
	running={} 

	# Index of the next job
	atJob=0

	# Main loop
	# Run until the stop flag is set and all tasks are joined
	while not (stop and len(running)==0):
		# Spawn tasks while slots are available and the maximal number of tasks is not reached
		# Spawn at least one task if no tasks are running
		while (
			# Do not spawn any more tasks once the stop flag is set
			not stop and (
				# no tasks running, at least one task is needed, spawn
				len(running)==0 or 
				# in a parallel environment (number of slots > 0) fewer than 
				# minTasks tasks are running, spawn regardless of the free slots
				(cOS.slots()>0 and len(running)<minTasks) or 
				# free slots remain after accounting for finished, not yet joined 
				# tasks, and fewer than maxTasks tasks are running, spawn
				(cOS.freeSlots()-cOS.finishedTasks()>0 and (maxTasks is None or len(running)<maxTasks)) 
			)
		):
			# Job (value to double)
			job=atJob
			
			# Spawn a remote task that processes the job
			tid=cOS.Spawn(jobProcessor, args=[job], remote=True, block=True)
			
			print("Spawned task", tid, "for job", job)
			
			# Store the job
			running[tid]={
				'index': atJob, 
				'job': job, 
			}
			
			# Go to next job
			atJob+=1
			
		# Join a finished task. Note that Join joins one task at a time.
		tid,result = cOS.Join(block=True).popitem()
		
		print("Received", result, "from", tid)
		
		# Get status and remove it from the dictionary of running jobs
		status=running[tid]
		del running[tid]
		index=status['index']
		
		# Make space for the result
		if index>=len(results):
			results.extend([None]*(index+1-len(results)))
		
		# Store result
		results[index]=result
		
		# Check if we need to stop
		if result>=stopAtResult and not stop:
			stop=True
			
			print("Spawning no more tasks")
			
	print("Results: "+str(results))

	# Finish, this must be done when MPI is used
	cOS.finalize()
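
Because tasks finish in an arbitrary order, results can be received out of order; this is why the results list is padded with None entries before an out-of-order result is stored. Once the main loop exits, every spawned task has been joined, so the list has no gaps and each entry is twice its index. A quick sanity check (hypothetical, not part of the demo) could be placed just before cOS.finalize():

	# Hypothetical check: every job i was doubled to 2*i and no entries are missing
	assert all(r==2*i for i,r in enumerate(results))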