10.3.6. Performing many differential evolution runs on a cluster of workstations

Suppose we want to evaluate the performance of differential evolution for all combinations of n population sizes and m test functions. Every function/population size combination must be run k times. This gives us k * n * m differential evolution runs, which is quite a bit of work. Suppose we want to distribute this work to a bunch of computers.

First we must define a function that runs differential evolution for a given problem with a given population size. When a run is finished the function returns the best function value found and the function evaluation history.

File funclib.py in folder demo/parallel/desweep/

# A function that executes a run of given function with given population size. 
# Must be in a module so it can be pickled and sent to a task. 

from pyopus.optimizer.de import DifferentialEvolution
from pyopus.optimizer.base import CostCollector
from numpy import array, zeros, random
from pyopus.parallel.mpi import MPI

def deRun(prob, popSize, runIndex, maxiter=75000, maxGen=1500, w=0.5, pc=0.3, seed=None):
	"""
	Performs one differential evolution run on problem *prob* with 
	population size *popSize* and returns its outcome. 
	
	*prob* is the problem object. It is passed to the optimizer as the 
	function to minimize and must provide the ``name``, ``xl``, and ``xh`` 
	members (``xl`` and ``xh`` are passed to the optimizer as bounds). 
	
	*runIndex* is the zero-based index of the run. It is used only in the 
	progress message. 
	
	*maxiter*, *maxGen*, *w*, *pc*, and *seed* are forwarded unchanged to 
	:class:`DifferentialEvolution`. 
	
	Returns a tuple ``(opt.f, cc.fval)`` holding the optimizer's final 
	function value and the values gathered by the :class:`CostCollector` 
	plugin. 
	"""
	hostID=MPI.hostID()
	taskID=MPI.taskID()
	print(str(hostID)+" "+str(taskID)+(" evaluating %s, run=%2d, popsize=%3d" % (prob.name, runIndex+1, popSize)))
	
	# Forward the caller's seed instead of silently discarding it. 
	# Leave seed at its default (None) so that every run has a different 
	# random number sequence. Pass a value only when a run must be repeatable. 
	opt=DifferentialEvolution(
		prob, prob.xl, prob.xh, debug=0, maxiter=maxiter, 
		maxGen=maxGen, populationSize=popSize, w=w, pc=pc, seed=seed
	)
	
	# The collector plugin records the function evaluation history
	cc=CostCollector()
	opt.installPlugin(cc)
	
	# The run starts from the all-zeros vector of the problem's dimension
	opt.reset(zeros(len(prob.xl)))
	opt.run()
	cc.finalize()
	
	return (opt.f, cc.fval)
	

Now let’s take a look at the main program in file depop.py in folder demo/parallel/desweep/. First we import some things

import os
from numpy import array, zeros, arange
from pickle import dump
from pyopus.problems import glbc
from pyopus.parallel.cooperative import cOS 
from pyopus.parallel.mpi import MPI
import funclib

Next we define the parameters of the run.

# Indices into glbc.GlobalBCsuite selecting the test problems to run. 
# Here: the first three problems of GlobalBCsuite. 
funcIndicesList=[0,1,2] 
# For the whole GlobalBCsuite uncomment this
# range(len(glbc.GlobalBCsuite))

# Population sizes for which we want to evaluate the performance of DE. 
# Every problem is run with every population size listed here. 
popSizeList=[10, 20] # range(10, 101, 40) 

# Number of runs per problem/population size combination
nRun=3 

# Number of generations (unlimited), passed to deRun as maxGen
nGen=None # or a number to limit the number of generations

# Maximal number of function evaluations, passed to deRun as maxiter
maxIter=75000 # or None for no limit

# Do we want to write function evaluation history to files (one file per run). 
# Checked by the results collector for every received result. 
writeFhist=True

The job generator produces tuples of the form (function, args, kwargs, misc). The last entry of the tuple contains miscellaneous data that helps the results collector to organize the collected results. This entry does not affect job evaluation.

def jobGenerator():
	"""
	Generates one job for every (problem, population size, run) combination. 
	
	Every job is a tuple ``(function, args, kwargs, misc)`` where *function* 
	is :func:`funclib.deRun`, *args* is an empty list, and *kwargs* holds the 
	keyword arguments of the run. *misc* is the tuple 
	``(atFunc, atPopSize, atRun)`` of positions in ``funcIndicesList``, 
	``popSizeList``, and the run counter. It is not passed to ``deRun``; 
	the results collector uses it to store the result. 
	"""
	for funcPos, funcIndex in enumerate(funcIndicesList):
		for popPos, popSize in enumerate(popSizeList):
			for runPos in range(nRun):
				# A fresh problem object is created for every job
				runArgs={
					'prob': glbc.GlobalBCsuite[funcIndex](), 
					'popSize': popSize, 
					'runIndex': runPos, 
					'maxiter': maxIter, 
					'maxGen': nGen, 
				}
				# Extra data not passed to deRun
				jobTag=(funcPos, popPos, runPos)
				yield (funclib.deRun, [], runArgs, jobTag)

The results collector is an unprimed coroutine. It writes the function evaluation history to a file named fhist_f<index>_p<popSize>_r<runIndex>.pck. It also stores the lowest function value in the finalF dictionary.

def resultsCollector(finalF):
	"""
	Coroutine that receives the results of finished jobs. 
	
	Every value sent to the coroutine must be a triple 
	``(index, job, result)`` where *job* is the tuple produced by the job 
	generator and *result* is the ``(best value, history)`` pair returned 
	by the job. 
	
	The best value is stored in ``finalF[atFunc][atPopSize][atRun]``. If 
	``writeFhist`` is true the history is pickled to 
	``fhist_f<function index>_p<population size>_r<run index>.pck``. 
	
	A message is printed when the coroutine is closed. 
	"""
	try:
		while True:
			# Wait for the next finished job
			_, finishedJob, outcome = yield
			
			# Positions stored by the job generator as extra data
			funcPos, popPos, runPos = finishedJob[3]
			bestValue, history = outcome
			
			funcIndex=funcIndicesList[funcPos]
			popSize=popSizeList[popPos]
			
			message="Received results for %s, run=%2d, popsize=%3d " % (
				glbc.GlobalBCsuite[funcIndex].name, runPos+1, popSize
			)
			print(message)
			
			if writeFhist:
				# One history file per run
				fileName="fhist_f%d_p%d_r%d.pck" % (funcIndex, popSize, runPos)
				with open(fileName, "wb") as historyFile:
					dump(history, historyFile, protocol=-1)
			
			finalF[funcPos][popPos][runPos]=bestValue
	except GeneratorExit:
		print("No more results to collect.")

Finally the main part of the program dispatches the generated jobs and writes some summary information to a file named fsummary.pck.

if __name__=='__main__':
	# Set MPI as the virtual machine; tasks start in the current directory
	vm=MPI(startupDir=os.getcwd())
	cOS.setVM(vm)
	
	# Results storage, one entry per (function, population size, run)
	storageShape=(len(funcIndicesList), len(popSizeList), nRun)
	finalF=zeros(storageShape)
	
	# Dispatch the jobs; the collector fills finalF as results arrive
	jobs=jobGenerator()
	collector=resultsCollector(finalF)
	cOS.dispatch(jobList=jobs, collector=collector, remote=True)
	
	# Instantiate every selected problem once to get its name and dimension
	problems=[glbc.GlobalBCsuite[i]() for i in funcIndicesList]
	names=[p.name for p in problems]
	dims=[p.n for p in problems]
	
	# Store summary
	summary={
		'funcIndices': funcIndicesList, 
		'names': names, 
		'dims': dims, 
		'populationSizes': popSizeList, 
		'finalF': finalF
	}
	with open("fsummary.pck", "wb") as summaryFile:
		dump(summary, summaryFile, protocol=-1)
	
	cOS.finalize()

You can run the example on a local computer in parallel using the available processors with

mpirun -n <nproc> python depop.py

See Using MPI with PyOPUS on how to run a program across multiple machines.

After the run is finished you end up with a bunch of .pck files. To display the stored function evaluation history for a particular run, you can use the program in file an.py in folder demo/parallel/desweep/

# Load results summary, plot function value history for given function, 
# population size, and run. 
# Run this by typing 
#  python3 an.py

from numpy import array, zeros, arange
from pickle import dump, load
from pyopus.plotter import interface as pyopl
from pprint import pprint

if __name__=='__main__':
	# Selects the history file to load. These three values must match the 
	# numbers in the name of an existing fhist_f*_p*_r*.pck file. 
	findex=0     # Function index
	popsize=10   # Population size
	runindex=0   # Run index (starting with 0)
	
	# Load the summary and print it
	with open("fsummary.pck", "rb") as summaryFile:
		summary=load(summaryFile)
	pprint(summary)
	
	# Read the history of the selected run
	historyName="fhist_f%d_p%d_r%d.pck" % (findex, popsize, runindex)
	with open(historyName, "rb") as historyFile:
		fhist=load(historyFile)
	
	# Create a figure
	fig=pyopl.figure()
	
	# NOTE(review): lock(True)/lock(False) presumably guard access to the 
	# figure while it is modified - confirm against pyopus.plotter docs
	pyopl.lock(True)
	if pyopl.alive(fig):
		# Draw the history; the x axis is the evaluation count divided by 
		# the population size
		axes=fig.add_subplot(1,1,1)
		generations=arange(len(fhist))/popsize
		axes.semilogy(generations, fhist)
		axes.set_xlabel('generation')
		axes.set_ylabel('f')
		axes.set_title('Progress of differential evolution')
		axes.grid()
		pyopl.draw(fig)
	
	pyopl.lock(False)
	
	# Wait for the plot window to close
	pyopl.join()