10.3.2. Remote tasks (task outsourcing)

Tasks can be outsourced to other processors in the system, either on the same physical machine or on a remote one. First, the task must be defined as a function that returns the task result. The function must be defined in an importable module so that it can be pickled and sent to a remote processor.
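The module requirement comes from the way Python's pickle handles functions: it stores only the module name and the function name, and the receiving process imports the function again. The following minimal sketch uses only the standard library, not PyOPUS, to show the difference between a module-level function and a lambda. The helper names is_picklable and roundtrip are hypothetical and serve only as an illustration.

```python
import json
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def roundtrip(func):
    """Pickle and unpickle func, as would happen when it is sent to another process."""
    return pickle.loads(pickle.dumps(func))


if __name__ == "__main__":
    # json.dumps lives in an importable module, so it is pickled by reference
    print("module-level function:", is_picklable(json.dumps))
    # A lambda has no importable name, so pickling it fails
    print("lambda:", is_picklable(lambda x: x))
    # Unpickling looks the function up by name and returns the same object
    print("same object after round trip:", roundtrip(json.dumps) is json.dumps)
```

A function kept in a separate file such as funclib.py can be imported under the same name by every process, which is what makes it suitable for outsourcing.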

In this example the printMsgMPI() function is defined in the file funclib.py in the folder demo/parallel/cooperative/. It prints the message msg n times, prefixing each line with the host ID and the task ID assigned by the MPI subsystem, and returns n.

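from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI

def printMsgMPI(msg, n):
	# Same as the previous example, except that it prints the host ID and the task ID before the message
	hostID=MPI.hostID()
	taskID=MPI.taskID()
	
	for ii in range(n):
		print("h="+str(hostID)+" t="+str(taskID)+": "+msg+" : "+str(ii))
		# Give other tasks a chance to run
		cOS.Yield()
	return n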

The following Python program spawns two concurrent local tasks and two remote tasks. The actual CPU where a remote task will run is assigned by MPI.

File 02-remote.py in folder demo/parallel/cooperative/

# Outsources tasks. Run this example as
#  mpirun -n 4 python3 02-remote.py
#
# Under Windows you should use Microsoft MPI. Both mpiexec and python
# must be in the system path.
#
#   mpiexec /np <number of processes> python 02-remote.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import printMsgMPI

if __name__=='__main__':
	# Set up MPI
	cOS.setVM(MPI())

	# Spawn two tasks (locally)
	tidA=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello A', 'n': 10})
	tidB=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello B', 'n': 20})

	# Spawn two remote tasks
	tidC=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello C', 'n': 15}, remote=True)
	tidD=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello D', 'n': 18}, remote=True)

	# IDs of running tasks
	running=set([tidA,tidB,tidC,tidD])

	# Wait for all tasks to finish
	while len(running)>0:
		# Wait for any task
		retval=cOS.Join()
		# Alternatively, wait only for tasks with the specified IDs
		# retval=cOS.Join(running)
		
		# Remove IDs of finished tasks
		for tid in retval.keys():
			print("Task: "+str(tid)+" finished, return value: "+str(retval[tid]))
			running.remove(tid)

	# Clean up and shut down MPI. This is required whenever MPI is used.
	cOS.finalize()
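
The wait loop in 02-remote.py follows a general pattern: keep a collection of outstanding tasks, block until at least one of them finishes, record its return value, and remove it from the collection. The sketch below reproduces that pattern with the standard library module concurrent.futures instead of PyOPUS, so that it can be tried without an MPI installation. It is only an analogy: the names count_msgs and run_all are hypothetical, and threads stand in for the local and remote tasks.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def count_msgs(msg, n):
    """Stand-in for printMsgMPI: builds n messages and returns n."""
    lines = [msg + " : " + str(ii) for ii in range(n)]
    return len(lines)


def run_all(jobs):
    """Run jobs given as {name: (msg, n)} and return {name: result}."""
    results = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Map every outstanding future to the name of its job
        running = {
            pool.submit(count_msgs, msg, n): name
            for name, (msg, n) in jobs.items()
        }
        # Wait for all tasks to finish, handling them as they complete
        while running:
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for fut in done:
                name = running.pop(fut)
                results[name] = fut.result()
    return results


if __name__ == "__main__":
    jobs = {"A": ("Hello A", 10), "B": ("Hello B", 20)}
    for name, value in run_all(jobs).items():
        print("Task: " + name + " finished, return value: " + str(value))
```

As in the PyOPUS example, the loop ends when the collection of running tasks is empty, regardless of the order in which the tasks finish.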