Source code for pyopus.optimizer.psade

# -*- coding: UTF-8 -*-

"""
.. inheritance-diagram:: pyopus.optimizer.psade
    :parts: 1

**Box constrained parallel SADE global optimizer 
(PyOPUS subsystem name: PSADEOPT)**

SADE stands for Simulated Annealing with Differential Evolution. 

A provably convergent (parallel) global optimization algorithm. 

The algorithm was published in [psade]_. 

.. [psade] Olenšek J., Tuma T., Puhan J., Bűrmen Á.: A new asynchronous parallel 
           global optimization meth od based on simulated annealing and differential 
           evolution. Applied Soft Computing Journal, vol. 11, pp. 1481-1489, 2011. 
"""

from ..misc.debug import DbgMsgOut, DbgMsg
from .base import BoxConstrainedOptimizer, normalizer, denormalizer
from ..parallel.cooperative import cOS

from numpy import array, concatenate, arange, zeros, random, isfinite, log, exp, tan, pi, concatenate, reshape
from numpy.random import RandomState
from time import sleep, time

__all__ = [ 'ParallelSADE' ] 


[docs]class ParallelSADE(BoxConstrainedOptimizer): """ Parallel SADE global optimizer class If *debug* is above 0, debugging messages are printed. The lower and upper bound (*xlo* and *xhi*) must both be finite. *populationSize* is the number of inidividuals (points) in the population. *pLocal* is the probability of performing a local step. *Tmin* is the minimal temperature of the annealers. *Rmin* and *Rmax* are the lower and upper bound on the range parameter of the annealers. *wmin*, *wmax*, *pxmin*, and *pxmax* are the lower and upper bounds for the differential evolution's weight and crossover probability parameters. *seed* is the random number generator seed. Setting it to ``None`` uses the builtin default seed. All operations are performed on normalized points ([0,1] interval corresponding to the range defined by the bounds). If *spawnerLevel* is not greater than 1, evaluations are distributed across available computing nodes (that is unless task distribution takes place at a higher level). See the :class:`~pyopus.optimizer.base.BoxConstrainedOptimizer` for more information. """ def __init__(self, function, xlo, xhi, debug=0, fstop=None, maxiter=None, populationSize=20, pLocal=0.01, Tmin=1e-10, Rmin=1e-10, Rmax=1.0, wmin=0.5, wmax=1.5, pxmin=0.1, pxmax=0.9, seed=0, minSlaves=1, maxSlaves=None, spawnerLevel=1 ): BoxConstrainedOptimizer.__init__(self, function, xlo, xhi, debug, fstop, maxiter) # Local random generator with fixed seed self.randGen=RandomState(seed) if seed is None: self.randGen.seed() if debug>1: debugEvt=1 else: debugEvt=0 # Number of variables is the number of dimensions # Pupulation size self.Np=populationSize # Local step probability self.pLocal=pLocal # Minimal temperature self.Tmin=Tmin # Minimal range self.Rmin=Rmin # Maximal range self.Rmax=Rmax # Differential operator weight bounds self.wmin=wmin self.wmax=wmax # Crossover probability bounds self.pxmin=pxmin self.pxmax=pxmax # Population (normalized points) self.population=None self.fpopulation=None # Temperatures and ranges self.T=None self.R=None # Differential operator weights and # Crossover probabilities self.w=None self.px=None # Indices of temperatures corresponding to population members self.indices=None # Stats self.accBetter=None self.accWorst=None self.rej=None self.localAcc=None self.localRej=None self.parentCount=None # Point status (set of points to be evaluated and points being evaluated) self.pointsForEvaluation=set() self.pointsBeingEvaluated=set() # Which population point to send out next. This is for master's use. self.ip=None self.spawnerLevel=spawnerLevel self.minSlaves=minSlaves self.maxSlaves=maxSlaves
[docs] def initialPopulation(self, Np): """ Constructs and returns the initial population with *Np* members. """ # Random permutations of Np subintervals for every variable # One column is one variable (it has Np rows) perm=zeros([Np, self.ndim]) for i in range(self.ndim): # perm[:,i]=(permutation(Np)) perm[:,i]=(self.randGen.permutation(Np)) # Random relative interval coordinates (0,1) # randNum=rand(Np, self.ndim) randNum=self.randGen.rand(Np, self.ndim) # Build Np points from random subintervals return self.denormalize((perm+randNum)/Np)
[docs] def initialTempRange(self): """ Chooses the values of the range and temperature parameters for the annealers. """ # Maximal temperature Tmax=-(self.fpopulation.max()-self.fpopulation.min())/log(0.9) # Exponential constants cT=1.0/(self.Np-1)*log(Tmax/self.Tmin) cR=1.0/(self.Np-1)*log(self.Rmax/self.Rmin) cw=1.0/(self.Np-1)*log(self.wmax/self.wmin) cpx=1.0/(self.Np-1)*log(self.pxmax/self.pxmin) # Temperatures self.T=Tmax*exp(-cT*arange(self.Np)) # Ranges self.R=self.Rmax*exp(-cR*arange(self.Np)) # Differential operator weights self.w=self.wmax*exp(-cw*arange(self.Np)) # Crossover probabilities self.px=self.pxmax*exp(-cpx*arange(self.Np)) # Stats self.accBetter=zeros(self.Np) self.accWorse=zeros(self.Np) self.rej=zeros(self.Np) self.localAcc=0 self.localRej=0 self.parentCount=zeros(self.Np)
[docs] def contest(self, ip): """ Performs a contest between two random points in the population for better values of the temperature and range parameter. The first point's index is *ip*. The second point is chosen randomly. """ # Select two random points # rp=permutation(self.Np) rp=self.randGen.permutation(self.Np) i1=ip i2=rp[0] if i2==i1: i2=rp[1] # Function values f1=self.fpopulation[i1] f2=self.fpopulation[i2] # Temperature indices it1=self.indices[i1] it2=self.indices[i2] # Temperatures T1=self.T[it1] T2=self.T[it2] # Calculate PT # PT=min(1, exp((f1-f2)*(1/T1-1/T2))) PT=exp(min(0, (f1-f2)*(1/T1-1/T2))) # Random number, is it lower than PT # if rand(1)[0]<PT: if self.randGen.rand(1)[0]<PT: # Yes, swap T, R, w, and px by swapping the indices self.indices[i1]=it2 self.indices[i2]=it1
[docs] def selectControlParameters(self): """ Selects the point (annealer) whose range, temperature, differential operator weight and crossover probability will be used in the global step. Returns the index of the point. """ # Sort cost function values (lowest first) - get indices ndx=self.fpopulation.argsort() # Selection probabilities rank=zeros(self.Np) rank[ndx]=arange(self.Np) probs=exp(-rank) probs/=probs.sum() # Cumulative probability cumprobs=probs.cumsum() cumprobs=concatenate((array([0.0]),cumprobs)) # Select random number # nr=rand(1)[0] nr=self.randGen.rand(1)[0] # Find the interval to which it belongs inInterval=(cumprobs[:-1]<=nr) & (nr<cumprobs[1:]) iR=inInterval.nonzero()[0][0] # Find the corresponding temperature index itR=self.indices[iR] return itR
[docs] def generateTrialPrerequisites(self): """ Generates all the prerequisites for the generation of a trial point. Choosed 5 random normalized points (xi1..xi5) from the population. Returns a tuple comprising the normalized point xi1, and two differential vectors xi2-xi3, xi4-xi5. """ # Generate random permutation # rp=permutation(self.Np) rp=self.randGen.permutation(self.Np) i1=rp[0] i2=rp[1] i3=rp[2] i4=rp[3] i5=rp[4] # Points xi1=self.population[i1,:] xi2=self.population[i2,:] xi3=self.population[i3,:] xi4=self.population[i4,:] xi5=self.population[i5,:] return (xi1, xi2-xi3, xi4-xi5)
[docs] def generateTrial(self, xip, xi1, delta1, delta2, R, w, px): """ Generates a normalized trial point for the global search step. A mutated normalized point is generated as ``xi1 + delta1*w*random1 + delta2*w*random2`` where *random1* and *random2* are two random numbers from the [0,1] interval. A component-wise crossover of the mutated point and *xip* is performed with the crossover probability *px*. Then every component of the resulting point is changed by a random value generated from the Cauchy probalility distribution with parameter *R*. Finally the bounds are enforced by selecting a random value between *xip* and the violated bound for every component of the generated point that violates a bound. Returns a normalized point. """ # Mutated point # xm=xi1+delta1*w*rand(1)[0]+delta2*w*rand(1)[0] xm=xi1+delta1*w*self.randGen.rand(1)[0]+delta2*w*self.randGen.rand(1)[0] # Crossover # mask=(rand(self.ndim)<px) mask=(self.randGen.rand(self.ndim)<px) indices=mask.nonzero()[0] xt=xip.copy() xt[indices]=xm[indices] # Random step (Cauchy) # xt=xt+R*tan(pi*(rand(self.ndim)-0.5)) xt=xt+R*tan(pi*(self.randGen.rand(self.ndim)-0.5)) # Lower bound violated, fix it mask=xt<0.0 indices=mask.nonzero()[0] if len(indices)>0: # xt[indices]=xip[indices]+rand(len(indices))*(0.0-xip[indices]) xt[indices]=xip[indices]+self.randGen.rand(len(indices))*(0.0-xip[indices]) # Upper bound violated, fix it mask=xt>1.0 indices=mask.nonzero()[0] if len(indices)>0: # xt[indices]=xip[indices]+rand(len(indices))*(1.0-xip[indices]) xt[indices]=xip[indices]+self.randGen.rand(len(indices))*(1.0-xip[indices]) return xt
[docs] def accept(self, xt, ft, ip, itR): """ Decides if a normalized point *xt* should be accepted. *ft* is the corresponding cost function value. *ip* is the index of the best point in the population. *itR* is the index of the point (annealer) whose temperature is used in the Metropolis criterion. Returns a tuple (*accepted*, *bestReplaced*) where *accepted* is ``True`` if the point should be accpeted and *bestReplaced* is ``True`` if accepting *xt* will replace the best point in the population. """ # w and px adaptation is not implemented # Acceptance probability (Metropolis) fp=self.fpopulation[ip] # PM=min(1.0, exp(-(ft-fp)/self.T[itR])) PM=exp(min(0, -(ft-fp)/self.T[itR])) # Is ip the best point in the population ipIsBest=(self.fpopulation[ip]==self.fpopulation.min()) # Test acceptance # if rand(1)[0]<PM: if self.randGen.rand(1)[0]<PM: # Are we trying to replace best point with a worse one if ft>=fp and ipIsBest: # Yes, but won't do it pass else: # Replace parent self.population[ip,:]=xt self.fpopulation[ip]=ft # Stats if ft<fp: self.accBetter[itR]+=1 else: self.accWorse[itR]+=1 return True, ipIsBest # Stats self.rej[itR]+=1 return False, ipIsBest
[docs] @classmethod def localStep(cls, xa, fa, d, origin, scale, rnum1, rnum2, evf, args): """ Performs a local step starting at normalized point *xa* with the corresponding cost function value *fa* in direction *d*. Runs remotely. The local step is performed with the help of a quadratic model. Two or three additional points are evaluated. The return value is a tuple of three tuples. The furst tuple lists the evaluated normalized point, the second one lists the corresponding cost function values and the third one the corresponding annotations. All three tuples must have the same size. Returns ``None`` if something goes wrong (like a failure to move a point within bounds). """ # Relative position of xb # db=rand(1)[0] db=rnum1 xb=xa+d*db # Force xb inside bounds count=0 while (xb<0.0).any() or (xb>1.0).any(): db/=2.0 xb=xa+d*db if count>10: return None count+=1 # Evaluate f(xb), store annotations # fb=self.fun(self.denormalize(xb)) args[0]=denormalizer(xb, origin, scale) fb,ab=evf(*args) #fb=self.fun() #ab=self.annotations # Direction of decrease if fb<fa: # Origin in xb doffs=db # dc=2*rand(1)[0]*db dc=2*rnum2*db else: # Origin in xa doffs=0 # dc=-2*rand(1)[0]*db dc=-2*rnum2*db # Third point xc=xa+d*(doffs+dc) # Force xc inside bounds count=0 while (xc<0.0).any() or (xc>1.0).any(): dc/=2.0 xc=xa+d*(doffs+dc) if count>10: # Giving up return ((xb, ), array([fb]), (ab, )) count+=1 # Fix dc so that the origin is in xa dc=dc+doffs # Evaluate f(xc) args[0]=denormalizer(xc, origin, scale) fc,ac=evf(*args) # fc=self.fun(self.denormalize(xc)) # ac=self.annotations # Quadratic model # dd: 0 db dc # f: fa fb fc # f(dd) = coefA * dd^2 + coefB * dd + coefC if db==0 or dc==0 or db==dc: # Can't calculate model, giving up f=zeros(2) f[0]=fb f[1]=fc return ((xb, xc), f, (ab, ac)) coefC=fa coefA=((fb-fa)/db-(fc-fa)/dc)/(db-dc) coefB=((fb-fa)/db*dc-(fc-fa)/dc*db)/(dc-db) # Is the model convex if coefA>0: # Minimum dmin=-coefB/(2*coefA) xd=xa+d*dmin # Is minimum inside bounds? if (xd<0.0).any() or (xd>1.0).any(): # Minimum outside bounds # Force xd inside bounds count=0 while (xd<0.0).any() or (xd>1.0).any(): dmin/=2.0 xd=xa+d*dmin if count>10: # Giving up f=zeros(2) f[0]=fb f[1]=fc return ((xb, xc), f, (ab, ac)) count+=1 # Evaluate f(xd) args[0]=denormalizer(xd, origin, scale) fd,ad=evf(*args) # fd=self.fun(self.denormalize(xd)) # ad=self.annotations # Return evaluated points f=zeros(3) f[0]=fb f[1]=fc f[2]=fd return ((xb, xc, xd), f, (ab, ac, ad)) else: # Return evaluated points f=zeros(2) f[0]=fb f[1]=fc return ((xb, xc), f, (ab, ac))
# Generate evaluators for points from the initial population def initPopJobGen(self): for ii in range(self.population.shape[0]): if self.debug: DbgMsgOut("PSADEOPT", "Inital point evaluation #%d" % ii) x=self.denormalize(self.population[ii,:]) yield self.getEvaluator(x) # Handle the result of an initial population point evaluation def initPopJobCol(self): while True: index, job, retval = (yield) evf, args = job x=args[0] if self.debug: DbgMsgOut("PSADEOPT", "Inital point evaluation result received #%d" % index) f, annot = retval self.newResult(x, f, annot) self.fpopulation[index]=f
[docs] def run(self): """ Run the algorithm. """ # Reset stop flag of the Optimizer class self.stop=False # Evaluate initial population in parallel cOS.dispatch( jobList=self.initPopJobGen(), collector=self.initPopJobCol(), remote=self.spawnerLevel<=1 ) # Set the parent point index to 0 self.ip=0 # Initialize temperatures and range parameters self.initialTempRange() # Main loop tidStatus={} # Status storage # Run until stop flag set and all tasks are joined while not (self.stop and len(tidStatus)==0): # Spawn initial tasks if slots are available and maximal number of tasks is not reached # Spawn one task if there are no tasks while ( # Spawn global search if stop flag not set not self.stop and ( # no tasks running, need at least one task, spawn len(tidStatus)==0 or # too few slaves in a parallel environment, force spawn regardless of free slots (cOS.slots()>0 and len(tidStatus)<self.minSlaves) or # free slots (with joined tasks) available and less than maximal slaves, spawn (cOS.freeSlots()-cOS.finishedTasks()>0 and (self.maxSlaves is None or len(tidStatus)<self.maxSlaves)) ) ): # Contest for better temperature and range parameters self.contest(self.ip) # Choose control parameters itR = self.selectControlParameters() # Get parent point xip=self.population[self.ip,:] # Generate trial point prerequisites (xi1, delta1, delta2) = self.generateTrialPrerequisites() # Generate trial point xt=self.generateTrial(xip, xi1, delta1, delta2, self.R[itR], self.w[itR], self.px[itR]) # Prepare evaluator evaluator=self.getEvaluator(self.denormalize(xt)) # Spawn a global search task tid=cOS.Spawn(evaluator[0], args=evaluator[1], remote=self.spawnerLevel<=1, block=True) # Store the job tidStatus[tid]={ 'itR': itR, 'ip': self.ip, 'global': True, 'xt': xt.copy(), # normalized point 'job': evaluator, } # Go to next parent self.ip = (self.ip + 1) % self.Np if self.debug: DbgMsgOut("PSADEOPT", "Started global search, task "+str(tid)) # If there are no free slots left, stop spawning if cOS.freeSlots()<=0: break # Join task tid,retval = cOS.Join(block=True).popitem() st=tidStatus[tid] del tidStatus[tid] # Get stored information itR=st['itR'] ip=st['ip'] # What was it running? if st['global']: # Global search finished evf, args = st['job'] xdn=args[0] # denormalized point f, annot = retval xt=st['xt'] if self.debug: DbgMsgOut("PSADEOPT", "Received global search result from task "+str(tid)) # Register result self.newResult(xdn, f, annot) # Accept point (accepted, ipIsBest)=self.accept(xt, f, ip, itR) self.parentCount[ip]+=1 # Debug message if self.debug and accepted: DbgMsgOut("PSADEOPT", "Global search point accepted, isBest=%d" % ipIsBest) # Do we want local search # if accepted or ipIsBest or rand(1)[0]<self.pLocal: if accepted or ipIsBest or self.randGen.rand(1)[0]<self.pLocal: # Local search # Choose two random points # rp=permutation(self.Np) rp=self.randGen.permutation(self.Np) i1=rp[0] i2=rp[1] # Points xi1=self.population[i1,:] xi2=self.population[i2,:] # Difference vector delta=(xi1-xi2) # Origin xa=self.population[ip,:] fa=self.fpopulation[ip] # Two random numbers # rnum1=rand(1)[0] # rnum2=rand(1)[0] rnum1=self.randGen.rand(1)[0] rnum2=self.randGen.rand(1)[0] # Spawn a local search task tid=cOS.Spawn( self.localStep, args=[xa, fa, delta, self.normOrigin, self.normScale, rnum1, rnum2, evf, args], remote=self.spawnerLevel<=1, block=True ) # Store the job st['global']=False tidStatus[tid]=st # Debug message if self.debug: DbgMsgOut("PSADEOPT", "Started local search, task "+str(tid)) else: # Local search finished if retval is None: # Local step failed if self.debug: DbgMsgOut("PSADEOPT", "Local step failed, task "+str(tid)) else: # Local step OK if self.debug: DbgMsgOut("PSADEOPT", "Received local search points from task "+str(tid)) # Unpack results x,f,annot = retval # Get parent xip=self.population[ip,:] fip=self.fpopulation[ip] # Sort function values (lowest f last), get indices ndx=(f.argsort())[-1::-1] # Register results for ii in ndx: self.newResult(self.denormalize(x[ii]), f[ii], annot[ii]) # Is the best point better than parent ibest=ndx[-1] if f[ibest]<fip: # Yes, replace parent self.population[ip,:]=x[ibest] self.fpopulation[ip]=f[ibest] self.localAcc+=1 # Debug message if self.debug: DbgMsgOut("PSADEOPT", "Replacing parent with local step result.") else: self.localRej+=1
[docs] def check(self): """ Checks the optimization algorithm's settings and raises an exception if something is wrong. """ BoxConstrainedOptimizer.check(self) # We require box constraints if (self.xlo is None): raise Exception(DbgMsg("PSADEOPT", "Lower bound is not defined.")) if (self.xhi is None): raise Exception(DbgMsg("PSADEOPT", "Upper bound is not defined.")) # Check if constraints are finite if (~isfinite(self.xlo)).any() or (~isfinite(self.xhi)).any(): raise Exception(DbgMsg("PSADEOPT", "Bounds must be finite."))
[docs] def reset(self, x0=None): """ Puts the optimizer in its initial state. If it is a 2-dimensional array or list the first index is the initial population member index while the second index is the component index. The initial population must lie within bounds *xlo* and *xhi* and have *populationSize* members. If the initial point *x0* is a 1-dimensional array or list, Np-1 population members are generated. Point *x0* is the Np-th member. See the :meth:`initialPopulation` method. If *x0* is ``None`` the Np members of the initial population are generated automatically. """ if x0 is None: # No initial point noInitialPoint=True x0=zeros(len(self.xlo)) self.bound(x0) else: # Initial point/population given noInitialPoint=False x0=array(x0) if len(x0.shape)==2: # Initial population if x0.shape[0]!=self.Np: raise Exception(DbgMsg("DEOPT", "Initial population has must have %d members." % self.Np)) # Take first point to get the dimension BoxConstrainedOptimizer.reset(self, x0[0]) # Check if the population is within bounds if (x0<self.xlo).any() or (x0>self.xhi).any(): raise Exception(DbgMsg("DEOPT", "Initial population is outside bounds.")) # Set initial population self.population=self.normalize(x0.copy()) # Build functiom values vector self.fpopulation=zeros(self.Np) # Build indices # self.indices=permutation(self.Np) self.indices=self.randGen.permutation(self.Np) elif len(x0.shape)==1: # No initial point or initial vector x0 given BoxConstrainedOptimizer.reset(self, x0) # Initialize population ngen=self.Np if noInitialPoint else self.Np-1 self.population=self.normalize(self.initialPopulation(ngen)) # Add initial point to population if not noInitialPoint: self.population=concatenate([ self.population, self.normalize(reshape(x0,(1,self.ndim))) ]) # Build functiom values vector self.fpopulation=zeros(self.Np) # Build indices # self.indices=permutation(self.Np) self.indices=self.randGen.permutation(self.Np) else: raise Exception(DbgMsg("PSADEOPT", "Only initial point (1D) or population (2D) can be set."))