Source code for pyopus.design.wcd

"""
.. inheritance-diagram:: pyopus.design.wcd
    :parts: 1

**Worst case distance analysis (PyOPUS subsystem name: WCD)**

Computes the worst case distances of the circuit's performances. 
Statistical parameters are assumed to be independent with 
zero mean and standard deviation equal to 1. 
""" 

from .wc import WorstCase, dbgName
from ..optimizer import optimizerClass
from ..optimizer.base import Plugin, Reporter
from ..misc.debug import DbgMsgOut, DbgMsg
from ..evaluator.performance import PerformanceEvaluator, updateAnalysisCount
from ..evaluator.aggregate import *
from ..evaluator.auxfunc import listParamDesc, paramDict, paramList, dictParamDesc
from ..parallel.cooperative import cOS
from .. import PyOpusError
import numpy as np

import matplotlib.pyplot as plt

__all__ = [ 'WorstCaseDistance' ] 


class Function(object):
	def __init__(self, n, beta0=0.0):
		"""si
		Function - distance from origin. 
		
		First *n* parameters are statistical parameters. 
		"""
		self.n=n 
		self.beta0=beta0
		
	def __call__(self, x):
		return np.array((x[:self.n]**2).sum())+self.beta0**2
	
class Function_gH(object):
	def __init__(self, n):
		"""
		Returns the gradient and the Hessian of the distance from 
		the origin. 
		
		First *n* parameters are statistical parameters. 
		"""
		self.n=n 
	
	def __call__(self, x):
		g=2*x
		g[self.n:]=0.0	# Derivatives wrt operating parameters are 0
		Hdiag=x.copy()
		Hdiag[:self.n]=2.0
		Hdiag[self.n:]=0.0 # Derivatives wrt operating parameters are 0
		
		return g, np.diag(Hdiag)

class optReporter(Reporter):
	def __init__(self, evaluator, wcName, component, lower):
		Reporter.__init__(self)
		self.evaluator=evaluator
		self.wcName=wcName
		self.component=component
		self.worst=None
		self.hworst=None
		self.lower=lower
		
	def reset(self):
		Reporter.reset(self)
		self.worst=None
		self.hworst=None
		
	def __call__(self, x, ft, opt):
		# perf=self.evaluator.results[self.wcName]['default']
		perf=next(iter(self.evaluator.results[self.wcName].values()))
		if self.component is not None and perf is not None:
			perf=perf[self.component]
		if opt.niter==opt.bestIter:
			self.worst=perf
			if type(ft) is tuple:
				self.hworst=opt.aggregateConstraintViolation(ft[1])
		str="iter=%d perf=%e " % (opt.niter, perf)
		if self.hworst is not None:
			str+=" hworst=%e" % (self.hworst)
		str+=" worst=%e" % (self.worst)
		str+=" "+dbgName(self.wcName, self.component)
		if 'step' in opt.__dict__:
			str+=" step=%e" % (opt.step)
		DbgMsgOut("WCD", str)
		
class wcdReporter(Reporter):
	def __init__(self, nsph, evaluator, beta0, wcdSign, wcName, component, goal, lower):
		Reporter.__init__(self)
		self.nsph=nsph
		self.evaluator=evaluator
		self.beta0=beta0
		self.wcdSign=wcdSign
		self.wcName=wcName
		self.component=component
		self.goal=goal
		self.wcd=None
		self.wcdperf=None
		self.hworst=None
		self.lower=lower
		
	def reset(self):
		Reporter.reset(self)
		self.worst=None
		self.hworst=None
		
	def __call__(self, x, ft, opt):
		dist=self.wcdSign*(x[:self.nsph]**2+self.beta0**2).sum()**0.5
		# perf=self.evaluator.results[self.wcName]['default']
		perf=next(iter(self.evaluator.results[self.wcName].values()))
		if self.component is not None and perf is not None:
			perf=perf[self.component]
		if type(ft) is tuple:
			h=opt.aggregateConstraintViolation(ft[1])
		else:
			h=None
		if opt.niter==opt.bestIter:
			self.wcd=dist
			self.hworst=h
			self.wcdperf=perf	
		str="iter=%d dist=%.4f h=%e h_worst=%e wcd=%.4f perf=%e" % (opt.niter, dist, h, self.hworst, self.wcd, perf)
		if 'step' in opt.__dict__:
			str+=" step=%e" % (opt.step)
		str+=" "+dbgName(self.wcName, self.component)
		DbgMsgOut("WCD", str)
		
class evalCollector(Plugin):
	"""
	Collects the values of the performance across iterations. 
	
	The values are stored in the ``history`` member. 
	"""
	def __init__(self, evaluator, wcName, component):
		Plugin.__init__(self)
		self.evaluator=evaluator
		self.wcName=wcName
		self.component=component
		self.history=[]
	       
	def __call__(self, x, ft, opt):
		# self.history.append(self.evaluator.results[self.wcName]['default'])
		hval=next(iter(self.evaluator.results[self.wcName].values()))
		if self.component is not None and hval is not None:
			hval=hval[self.component]
		self.history.append(hval)
	
	def reset(self):
		self.history=[]

class wcdStopper(Plugin):
	"""
	Stops the algorithm when the angle between x and the gradient of c 
	becomes smaller than a threshold given by relative tolerance 
	*angleTol* and the constraint function is close enough to 0
	within tolerance *constrTol*. 
	
	*n* is the number of statistical parameters
	"""
	def __init__(self, constrTol, angleTol, maxStep, name, component, n, wcdSign, debug=0):
		Plugin.__init__(self)
		self.constrTol=constrTol
		self.angleTol=angleTol
		self.n=n
		self.wcdSign=wcdSign
		self.maxStep=maxStep
		self.debug=debug
		self.name=name
		self.component=component
		
	def reset(self):
		self.it=None
	
	def optimality(self, x, ct, opt):
		if (
			self.constrTol is not None and self.angleTol is not None and 
			opt.xgo is not None and opt.modJ is not None and self.n>0
		):
			cdelta=np.abs(ct)
			g=(opt.modJ.reshape(x.size))[:self.n]
			xv=x[:self.n]
			l1=(g**2).sum()**0.5
			l2=(xv**2).sum()**0.5
			if l1==0.0 or l2==0.0:
				angle=0.0
			else:
				angle=np.arccos(self.wcdSign*(g*xv).sum()/l1/l2)*180/np.pi
			return cdelta, angle 
		else:
			return None, None
	
	def __call__(self, x, ft, opt):
		stop=False
		
		# Check origin
		cdelta, angle = self.optimality(opt.xgo, opt.co, opt)
		if cdelta is not None and opt.step<=self.maxStep and cdelta<self.constrTol and angle<self.angleTol:
			stop=True
			self.it=opt.ito
			self.x=opt.xgo
		if self.debug>1 and cdelta is not None:
			DbgMsgOut("STOP origin", "h=%.8e angle=%f %s" % (cdelta, angle, dbgName(self.name, self.component)))
			if stop:
				DbgMsgOut("STOP", "Stopping %s at %d" % (dbgName(self.name, self.component), opt.niter))
				
		opt.stop=opt.stop or stop
		return None

[docs]class WorstCaseDistance(WorstCase): """ Performs worst case distance analysis. See the :class:`~pyopus.evaluator.performance.PerformanceEvaluator` class for details on *heads*, *analyses*, *measures*, and *variables*. *corners* is the dictionary of corner definitions, exactly one corner for every head. These are prototype corners and do not specify any operating or statistical parameters. Performance measures that are vectors can also be a subject of worst case distance analysis. See the :class:`~pyopus.design.wc.WorstCase` class for the explanation of *statParamDesc*, *opParamDesc*, *fixedParams*, *maxPass*, *wcStepTol*, *stepTol*, *constrTol*, *angleTol*, *stepScaling*, *perturbationScaling*, *maximalStopperStep*, *evaluatorOptions*, *sensOptions*, *screenOptions*, *opOptimizarOptions*, *optimizerOptions*, and *spawnerLevel* options. Setting *debug* to a value greater than 0 turns on debug messages. The verbosity is proportional to the specified number. *sigmaBox* is the box constraint on normalized statistical parameters (i.e. normalized to N(0,1)). If *linearWc* is set to ``True`` the initial point in the space of the statistical parameters is computed using linear worst case distance analysis. If *alternating* is set to ``True`` the worst case is computed by alternating optimizations in the operating and statistical parameter space. This is a callable object with an optional argument specifying which worst case distances to compute. If given, the argument must be a list of entries. Every entry is * a tuple of the form (name,type), where name is the measure name and type is ``lower`` or ``upper`` * a string specifying the measure name. In this case the type of computed worst case distance is given by the presence of the ``lower`` and the ``upper`` entries in the measure's description. If no argument is specified, all worst case distances corresponding to lower/upper bounds of all *measures* are computed. The results are stored in the :attr:`results` member. They are represented as a list of dictionaries with the following members: * ``name`` - name of the performance measure * ``component`` - index of the performance measure's component ``None`` for scalar performance measures * ``type`` - ``lower`` or ``upper`` * ``passes`` - number of algorithm passes * ``evals`` - number of evaluations * ``status`` - ``OK``, ``FAILED``, ``OUTSIDE+``, ``OUTSIDE-``, ``SENS+``, or ``SENS-`` * ``nominal`` - nominal performance * ``nominal_wcop`` - performance at nominal statistical parameters and initial worst case operating parameters * ``linwc`` - performance at the linearized worst case distance point * ``wc`` - performance at the worst case distance point * ``lindist`` - linearized worst case distance * ``dist`` - worst case distance * ``op`` - operating parameter values at the worst case distance point * ``stat`` - statistical parameter values at the worst case distance point * ``modules`` - input file modules from the corner where the measure was evaluated * ``head`` - name of the head used for evaluating this measure Status FAILED means that the algorithm failed to converge in *maxPass* passes. Status OUTSIDE+ means that the algorithm faield to find a point satisfying the WCD search constraint for positive WCD. This means that the WCD is a large positive value. Status OUTSIDE- means that the algorithm faield to find a point satisfying the WCD search constraint for negative WCD. This means that the WCD is a large negative value. SENS+ and SENS- are similar to OUTSIDE+ and OUTSIDE-, except that they occur when the sensitivity to statistical parameters is very small (i.e. linear WCD point is at a distance greater than half the sigmaBox diagonal). SENS0 indicates the norm of the sensitivity is zero and the linear worst case distance point is not computed. If the computed worst case distance is zero then it should be considered as not valid. Objects of this type store the number of analyses performed during the last call to the object in a dictionary stored in the :attr:`analysisCount` member. The return value of a call to an object of this class is a tuple holding the results structure and the analysis count dictionary. """ def __init__( self, heads, analyses, measures, corners, statParamDesc, opParamDesc, fixedParams={}, variables={}, debug=0, sigmaBox=10, linearWc=True, alternating=True, maxPass=20, wcStepTol=0.01, stepTol=0.01, constrTol=0.01, angleTol=15, stepScaling=4, perturbationScaling=64, maximalStopperStep=0.5, evaluatorOptions={}, sensOptions={}, screenOptions={ 'contribThreshold': 0.01, 'cumulativeThreshold': 0.25, 'useSens': False, 'squared': True, }, opOptimizerOptions={}, optimizerOptions={}, spawnerLevel=1 ): WorstCase.__init__( self, heads=heads, analyses=analyses, measures=measures, corners=corners, statParamDesc=statParamDesc, opParamDesc=opParamDesc, fixedParams=fixedParams, variables=variables, debug=debug, sigmaBox=sigmaBox, linearWc=linearWc, alternating=alternating, maxPass=maxPass, wcStepTol=wcStepTol, stepTol=stepTol, constrTol=constrTol, angleTol=angleTol, stepScaling=stepScaling, perturbationScaling=perturbationScaling, maximalStopperStep=maximalStopperStep, evaluatorOptions=evaluatorOptions, sensOptions=sensOptions, screenOptions=screenOptions, opOptimizerOptions=opOptimizerOptions, optimizerOptions=optimizerOptions, spawnerLevel=spawnerLevel ) def jobGenerator(self, wcSpecs=None): # Prepare jobs if wcSpecs is None: wcSpecs=list(self.measures.keys()) ii=0 for wcSpec in wcSpecs: if type(wcSpec) is tuple: wcName, wcType = wcSpec wcTypes=[wcType] else: wcName=wcSpec wcTypes=[] if 'lower' in self.measures[wcName]: wcTypes+=['lower'] if 'upper' in self.measures[wcName]: wcTypes+=['upper'] if ( "vector" in self.measures[wcName] and self.measures[wcName]["vector"] ): if ( self.vectorLengths is None or wcName not in self.vectorLengths ): # Error raise PyOpusError(DbgMsg("WCD", ("Number of components unknown for '%s'." % wcName))) components=range(self.vectorLengths[wcName]) else: components=[None] for wcType in wcTypes: for component in components: yield ( self.jobProcessor, [ ii, wcName, self.measureCHD[wcName], component, wcType ] ) ii+=1 def jobProcessor(self, index, name, chd, component, wcType): target=self.measures[name][wcType] return self.compute(name, chd, component, target, wcType=="lower") def jobCollector(self, results, analysisCount): # Collect results while True: index, job, retval = (yield) summary, anCount = retval updateAnalysisCount(analysisCount, anCount) if len(results)<=index: results.extend([None]*(index+1-len(results))) results[index]=summary if self.debug>1: DbgMsgOut("WCD", self.formatResults(results)) def __call__(self, wcSpecs=None): # Clean up results self.results=[] self.analysisCount={} # Initialize results=[] analysisCount=self.initialEvaluation(wcSpecs) cOS.dispatch( jobList=self.jobGenerator(wcSpecs), collector=self.jobCollector(results, analysisCount), remote=self.spawnerLevel<=1 ) # Store results self.results=results self.analysisCount=analysisCount if self.debug>1: DbgMsgOut("WCD", "Analysis count: %s" % str(self.analysisCount)) DbgMsgOut("WCD", "Results:") DbgMsgOut("WCD", self.formatResults(details=True)) return self.results, self.analysisCount def wcdOptimization(self, atStatx, atOpx, lastStep, inNdxStat, inNdxOp, outNdxStat, outNdxOp, ev, beta0, wcdSign, target, norm, wcName, component, lower=True): #print("lower", lower, "target", target, "norm", norm, "sign", wcdSign) # Linear worst case distance linDist=(atStatx**2).sum()**0.5 # Prepare fixed parameters fixedParams={} # Copy fixed parameters fixedParams.update(self.fixedParams) # Screened out statistical parameters initial point fixedParams.update( paramDict(atStatx, self.statNames, outNdxStat) ) # Screened out op parameters initial point fixedParams.update( paramDict(atOpx, self.opNames, outNdxOp) ) # Prepare optimization parameter names, bounds screenedStatNames=[] screenedOpNames=[] nScrStat=len(inNdxStat) for ii in inNdxStat: screenedStatNames.append(self.statNames[ii]) for ii in inNdxOp: screenedOpNames.append(self.opNames[ii]) paramNames=screenedStatNames+screenedOpNames # Prepare bounds paramLo=np.hstack((-np.ones(len(inNdxStat))*self.sigmaBox, self.opLo[inNdxOp])) paramHi=np.hstack(( np.ones(len(inNdxStat))*self.sigmaBox, self.opHi[inNdxOp])) # Join statistical and op parameters as initial point paramIni=np.hstack(( atStatx[inNdxStat], atOpx[inNdxOp] )) # Param scaling for op parameters scale=(paramHi-paramLo)/self.stepScaling # Function (distance) fun=Function(nScrStat) # Prepare gradient and Hessian gH=Function_gH(nScrStat) # Constraint ev.setParameters(fixedParams) # Positive constraint = infeasible agg=Aggregator( ev, [{ 'measure': wcName, # Requirement satified for normalized value <=0 'norm': Nabove(target, norm, 1e6) if lower else Nbelow(target, norm, 1e6), 'shape': Slinear2(1.0,1.0), 'reduce': Rworst(component) }], paramNames ) # TODO # Wrap in an array (this is not picklable, needs to be replaced when a parallel algorithm is used) con=lambda x: np.array([agg(x)]) # Prepare optimizer optimizerDefaults={ #'stepBase': 8.0, 'meshBase': 32.0, 'initialMeshDensity': 32.0, #'maxStep': 1.0, 'stopStep': 0.01, 'startStep': lastStep, 'stepBase': 4.0, 'meshBase': 16.0, 'initialMeshDensity': 2.0**20, # 16384.0, 'maxStep': 1.0, 'stopStep': self.stepTol, 'protoset': 2, # minimal=0, maximal=1, 2=orthogonal n+1 'unifmat': 5, # 5 = nxn Sobol 'generator': 2, # UniMADS 'rounding': 0, 'modelOrdering': True, 'lastDirectionOrdering': True, 'roundOnFinestMeshEntry': True, 'speculativePollSearch': True, 'speculativeModelSearch': False, 'model': True, 'HinitialDiag': 0.0, 'boundSnap': True, 'boundSlide': True, 'qpFeasibilityRestoration': False, 'stepCutAffectsUpdate': True, 'speculativeStepAffectsUpdate': True, 'rho':16, 'rhoNeg': 1.0, 'linearUpdate': True, 'simplicalUpdate': True, 'boundStepMirroring': False, 'linearUpdateExtend': False, # 'debug': 2, 'maxiter': None, 'hmax': 100.0, 'cache': True, } optimizerDefaults.update(self.optimizerOptions) # Constraint depends on the wcd type if wcdSign>0: # Look in infeasible region clo=np.array([0]) chi=np.array([np.Inf]) else: # Look in feasible region clo=np.array([-np.Inf]) chi=np.array([0]) opt=optimizerClass("QPMADS")( fun, paramLo, paramHi, fgH=gH, constraints=con, clo=clo, chi=chi, scaling=scale, debug=0, **optimizerDefaults ) collector=evalCollector(ev, wcName, component) # Install plugins if self.debug>1: opt.installPlugin(wcdReporter(nScrStat, ev, beta0, wcdSign, wcName, component, target, lower)) opt.installPlugin(collector) # 5%, 5 deg tol stopper=wcdStopper( constrTol=self.constrTol, angleTol=self.angleTol, maxStep=self.maximalStopperStep, name=wcName, component=component, n=nScrStat, wcdSign=wcdSign, debug=self.debug ) opt.installPlugin(stopper) opt.reset(paramIni) opt.run() # Use stopper's iteration if stopper is not None and stopper.it is not None: it=stopper.it x=stopper.x else: it=opt.bestIter x=opt.x # Extract statistical parameters solStat=paramDict(x[:nScrStat], screenedStatNames) solStat.update(paramDict(atStatx, self.statNames, outNdxStat)) atStatx=np.array(paramList(solStat, self.statNames)) # Extract operating parameters solOp=paramDict(x[nScrStat:], screenedOpNames) solOp.update(paramDict(atOpx, self.opNames, outNdxOp)) atOpx=np.array(paramList(solOp, self.opNames)) # Extract worst case worstPerf=collector.history[it-1] linPerf=collector.history[0] analysisCount={} nevals=opt.niter updateAnalysisCount(analysisCount, ev.analysisCount, opt.niter) # Transform to actual parameter values for storage result={ 'wc': worstPerf, 'dist': wcdSign*(atStatx**2).sum()**0.5, 'stat': self.transform(solStat), 'op': solOp, } if self.linearWc: result['linwc']=linPerf result['lindist']=linDist return result, atStatx, atOpx, opt.step, analysisCount, nevals def compute(self, wcName, chd, component, target, lower=True): # Reset analysis counter analysisCount={} # Extract CHD cornerName, headName, cornerDef = chd if self.debug: if lower: str="lower" else: str="upper" DbgMsgOut("WCD", "Running %s, %s" % (dbgName(wcName, component), str)) # Construct evaluator ev=PerformanceEvaluator( self.heads, self.analyses, self.measures, self.corners, variables=self.variables, activeMeasures=[wcName], paramTransform=self.transform ) # Check corner count for headName, corners in ev.cornersHC.items(): if len(corners)>1: raise PyOpusError(DbgMsg("WC", ("More than one corner specified for head '%s'." % headName))) # Intial statistical parameters atStatx=np.zeros(self.nStat) # Evaluations counter nevals=[] # Worst op parameters #result, atOpx, anCount, nev = cOS.dispatchSingle( # self.opWorstCase, # args=[ev, wcName], # kwargs={'lower': lower}, # remote=self.spawnerLevel<=2 #) result, atOpx, anCount, nev = self.opWorstCase(ev, wcName, component, lower=lower) updateAnalysisCount(analysisCount, anCount) nevals.append(nev) result['modules']=cornerDef['modules'] result['head']=headName # Initial worst performance atWorstCase=result['nominal_wcop'] # Determine the type of the WCD problem alternating=self.alternating if (lower and atWorstCase>=target) or (not lower and atWorstCase<=target): wcdSign=1 else: wcdSign=-1 # Do not optimize alternating op and stat parameters when wcd is negative alternating=False if self.debug: if wcdSign>0: DbgMsgOut("WCD", "Positive wcd expected for %s, %s" % (dbgName(wcName, component), str)) else: DbgMsgOut("WCD", "Negative wcd expected for %s, %s" % (dbgName(wcName, component), str)) atPass=0 inNdxStat=[] inNdxOp=[] lastStep=0.25 zeroSens=False zeroSensNorm=False while True: # Assume op parameters did not change opChanged=False # Confirm op parameters if alternating and atPass>0: # Start with small step #result1, newAtOpx, anCount, nev = cOS.dispatchSingle( # self.opWorstCase, # args=[ev, wcName], # kwargs={'startStep': 1.0/4**2, 'atStatx': atStatx, 'lower': lower}, # remote=self.spawnerLevel<=2 #) result1, newAtOpx, anCount, nev = self.opWorstCase( ev, wcName, component, startStep=1.0/4**2, atStatx=atStatx, lower=lower ) updateAnalysisCount(analysisCount, anCount) nevals.append(nev) # Get new worst case newWorstCase=result1['nominal_wcop'] if ( ((lower and newWorstCase<atWorstCase) or (not lower and newWorstCase>atWorstCase)) and (np.abs(newWorstCase-atWorstCase)/np.abs(result['nominal']-atWorstCase)>=self.constrTol) ): opChanged=True atOpx=newAtOpx atWorstCase=newWorstCase if self.debug: # print(atOpx) # print(newAtOpx) if opChanged: DbgMsgOut("WCD", " OP parameters changed") else: DbgMsgOut("WCD", " OP parameters unchanged") # Sensitivity and screening if self.debug: DbgMsgOut("WCD", " Computing sensitivity, %s, pass %d" % (dbgName(wcName, component), atPass+1)) if wcdSign>0: useType=lower else: useType=not lower # Skip op sensitivity when alternating # No need to spawn this one, it spawns its own subtasks propDiff, delta, anCount, nev, inStat, inOp = self.sensitivityAndScreening(ev, atStatx, atOpx, wcName, component, useType, skipOp=alternating) updateAnalysisCount(analysisCount, anCount) nevals.append(nev) screenChanged=not (set(inStat).issubset(set(inNdxStat)) and set(inOp).issubset(set(inNdxOp))) # Set of screened parameters unchanged and op parameters values unchanged ... stop if not screenChanged and not opChanged: if self.debug: DbgMsgOut("WCD", " Stopping") result['status']="OK" break # New parameter set - accumulate newNdxStat=set(inNdxStat).union(inStat) newNdxOp=set(inNdxOp).union(inOp) # New parameter set - replace # newNdxStat=set(inStat) # newNdxOp=set(inOp) # Report if self.debug>1: DbgMsgOut("WCD", " Parameter set (%d)" % (len(newNdxStat)+len(newNdxOp))) for ii in newNdxStat: flag=" " if ii in inStat and ii not in inNdxStat: flag="*" DbgMsgOut("WCD", " "+flag+self.statNames[ii]) for ii in newNdxOp: flag=" " if ii in inOp and ii not in inNdxOp: flag="*" DbgMsgOut("WCD", " "+flag+self.opNames[ii]) # Update inNdxStat=newNdxStat inNdxOp=newNdxOp # Complement outNdxStat=list(set(range(self.nStat)).difference(inNdxStat)) outNdxOp=list(set(range(self.nOp)).difference(inNdxOp)) # Make it a list and sort it inNdxStat=list(inNdxStat) inNdxOp=list(inNdxOp) inNdxStat.sort() inNdxOp.sort() # Use beta=3.0 for norm computation norm=np.abs(3.0*((propDiff/delta)[:self.nStat]**2).sum()**0.5) if norm==0.0: norm=1.0 # Compute sensitivity to statistical parameters sens=(propDiff/delta) ssens=sens[:self.nStat] # Verify zero sensitivity nrm=(ssens**2).sum()**0.5 if ( nrm==0.0 or # compare required distance for reaching target # with half of sigmaBox main diagonal np.abs(target-atWorstCase)/nrm>=self.sigmaBox*(len(self.statNames))**0.5 ): zeroSens=True else: zeroSens=False # Compute linear worst case distance point zeroSensNorm=False if atPass==0 and self.linearWc and len(inNdxStat)>0: # if self.linearWc and len(inNdxStat)>0: # Consider only screened parameters ssens[outNdxStat]=0.0 # Compute sensitivity norm snorm=(ssens**2).sum()**0.5 # print(target, atWorstCase, snorm) if snorm!=0.0: # Compute linear step length dl=(target-atWorstCase)/snorm # Limit step length to 10 if dl>10.0: dl=10.0 if dl<-10.0: dl=-10.0 # Compute step atStatdxLin=dl*ssens/snorm else: # Zero sensitivity atStatdxLin=np.zeros(self.nStat) zeroSensNorm=True atStatxLin=atStatx+atStatdxLin # Slide along boundary atStatxLin=np.where(atStatxLin>self.sigmaBox, self.sigmaBox, atStatxLin) atStatxLin=np.where(atStatxLin<-self.sigmaBox, -self.sigmaBox, atStatxLin) # Move atStatx=atStatxLin # Compute beta0 corresponding to screened out statistical parameters beta0=(np.array(atStatx)[outNdxStat]**2).sum()**0.5 # Main optimization if self.debug: DbgMsgOut("WCD", " Optimization, %s, pass %d" % (dbgName(wcName, component), atPass+1)) #result1, atStatx, atOpx, lastStep, anCount, nev = cOS.dispatchSingle( # self.wcdOptimization, # args=[atStatx, atOpx, lastStep*4, inNdxStat, inNdxOp, outNdxStat, outNdxOp, ev, beta0, wcdSign, target, norm, wcName, lower], # remote=self.spawnerLevel<=2 #) result1, atStatx, atOpx, lastStep, anCount, nev = self.wcdOptimization( atStatx, atOpx, lastStep*4, inNdxStat, inNdxOp, outNdxStat, outNdxOp, ev, beta0, wcdSign, target, norm, wcName, component, lower ) updateAnalysisCount(analysisCount, anCount) nevals.append(nev) # Get new worst case atWorstCase=result1['wc'] # Ignore linear wc result for all but the first pass if atPass>0 or not self.linearWc: del result1['linwc'] del result1['lindist'] result.update(result1) # Increase pass count atPass+=1 if atPass>=self.maxPass: if self.debug: DbgMsgOut("WCD", " Maximum number of passes reached, stopping (%s)" % dbgName(wcName, component)) result['status']="FAILED" break # print(wcName, atPass, ":", nevals, " ..", np.array(nevals).sum()) result['name']=wcName result['component']=component result['target']=target result['passes']=atPass if lower: result['type']="lower" else: result['type']="upper" result['evals']=np.array(nevals).sum() # Verify if result is outside search region if result['status']=='OK': # Check if it is in the search region constraint=(result['wc']-result['target'])/norm if result['type']=='lower': constraint=-constraint if wcdSign<0: constraint=-constraint # Values above -constrTol are OK if constraint<-self.constrTol: result['status']='OUTSIDE'+('+' if wcdSign>0 else '-') # Verify zero sensitivity if zeroSens: if zeroSensNorm: result['status']='SENS0' else: result['status']='SENS'+('+' if wcdSign>0 else '-') return result, analysisCount
[docs] def formatResults(self, results=None, nMeasureName=10, nResult=15, nPrec=5, nEvalPrec=4, details=False): """ Formats the results as a string. *nMeasureName* specifies the formatting width for the performance measure name. *nResult* and *nPrec* specify the formatting width and the number of significant digits for the performance measure values. *nEvalPrec* is the number of spaces reserved for the evaluations count. If *details* is ``True`` the nominal performance, the performance the worst case operating parameters and the nominal statistical parameters, and the linear worst case distance are also formatted. """ if results is None: results=self.results txt="" for res in results: if res is None: continue txt+="%*s" % (nMeasureName, dbgName(res['name'],res['component'])) if res['type']=="upper": txt+=" < " else: txt+=" > " txt+="%*.*e" % (nResult, nPrec, res['target']) if details: txt+=" nom=%*.*e" % (nResult, nPrec, res['nominal']) txt+=" wcop=%*.*e" % (nResult, nPrec, res['nominal_wcop']) if self.linearWc: txt+=" lin_wcd=%*.*e" % (nResult, nPrec, res['lindist']) txt+=" wc=%*.*e wcd=%*.*e neval=%*d" % (nResult, nPrec, res['wc'], nResult, nPrec, res['dist'], nEvalPrec, res['evals']) txt+=" %d" % res['passes'] txt+=" "+res['status']+"\n" return txt