Source code for optimeed.optimize.optimizer

from mimetypes import init
from .characterization import Characterization
from .mathsToPhysics import MathsToPhysics
from .optiAlgorithms import MultiObjective_GA
from optimeed.core import SaveableObject
from optimeed.core import printIfShown, SHOW_DEBUG
from optimeed.core.commonImport import SHOW_INFO, SHOW_WARNING
import traceback
import time
import copy
import math

default = dict()
default['M2P'] = MathsToPhysics()
default['Charac'] = Characterization()
default['Algo'] = MultiObjective_GA()


[docs]class OptimizerSettings(SaveableObject): def __init__(self, theDevice, theObjectives, theConstraints, theOptimizationVariables, theOptimizationAlgorithm=None, theMathsToPhysics=None, theCharacterization=None): """ Prepare the optimizer for the optimization. :param theDevice: object of type :class:`~optimeed.core.interfaceDevice.InterfaceDevice` :param theObjectives: list of objects of type :class:`~optimeed.optimize.objAndCons.interfaceObjCons.InterfaceObjCons` :param theConstraints: list of objects of type :class:`~optimeed.optimize.objAndCons.interfaceObjCons.InterfaceObjCons` :param theOptimizationVariables: list of objects of type :class:`~optimeed.optimize.optiVariable.OptimizationVariable` :param theOptimizationAlgorithm: list of objects of type :class:`~optimeed.optimize.optiAlgorithms.algorithmInterface.AlgorithmInterface` :param theMathsToPhysics: object of type :class:`~optimeed.optimize.mathsToPhysics.interfaceMathsToPhysics.InterfaceMathsToPhysics` :param theCharacterization: object of type :class:`~optimeed.optimize.characterization.interfaceCharacterization.InterfaceCharacterization` :return: """ self.theDevice = theDevice self.theMathsToPhysics = theMathsToPhysics if theMathsToPhysics is not None else default['M2P'] self.theCharacterization = theCharacterization if theCharacterization is not None else default['Charac'] self.theObjectives = theObjectives self.theConstraints = theConstraints self.theOptimizationAlgorithm = theOptimizationAlgorithm if theOptimizationAlgorithm is not None else default['Algo'] self.theOptimizationVariables = theOptimizationVariables
[docs] def get_additional_attributes_to_save(self): return ["theDevice", "theMathsToPhysics", "theCharacterization", "theOptimizationAlgorithm"]
[docs] def get_additional_attributes_to_save_list(self): return ["theObjectives", "theConstraints", "theOptimizationVariables"]
[docs] def get_device(self): return self.theDevice
[docs] def get_M2P(self): return self.theMathsToPhysics
[docs] def get_charac(self): return self.theCharacterization
[docs] def get_optivariables(self): return self.theOptimizationVariables
[docs] def get_objectives(self): return self.theObjectives
[docs] def get_constraints(self): return self.theConstraints
[docs] def get_optialgorithm(self): return self.theOptimizationAlgorithm
[docs]class _Evaluator: """This is the main class that serves as evaluator. This class is NOT process safe (i.e., copy of it might be generated upon process call)""" def __init__(self, optimization_parameters: OptimizerSettings): # Variables defining an optimization problem self.theDevice = optimization_parameters.get_device() self.theMathsToPhysics = optimization_parameters.get_M2P() self.theCharacterization = optimization_parameters.get_charac() self.theObjectives = optimization_parameters.get_objectives() self.theConstraints = optimization_parameters.get_constraints() self.theOptimizationVariables = optimization_parameters.get_optivariables() self.startingTime = None
[docs] def start(self): self.startingTime = time.time()
[docs] def evaluate(self, x): """ Evaluates the performances of device associated to entrance vector x. Outputs the objective function and the constraints, and other data used in optiHistoric. This function is NOT process safe: "self." is a FORK in multiprocessing algorithms. It means that the motor originally contained in self. is modified only in the fork, and only gathered by reaching the end of the fork. :param x: Input mathematical vector from optimization algorithm :return: dictionary, containing objective values (list of scalar), constraint values (list of scalar), and other info (motor, time) """ copyDevice = copy.copy(self.theDevice) self.theMathsToPhysics.fromMathsToPhys(x, copyDevice, self.theOptimizationVariables) characterization_failed = False # noinspection PyBroadException try: self.theCharacterization.compute(copyDevice) except Exception: characterization_failed = True printIfShown("An error in characterization. Set objectives to inf. Error :" + traceback.format_exc(), SHOW_WARNING) nbr_of_objectives = len(self.theObjectives) objective_values = [float('inf')]*nbr_of_objectives nbr_of_constraints = len(self.theConstraints) constraint_values = [float('inf')] * nbr_of_constraints if not characterization_failed: for i in range(nbr_of_objectives): # noinspection PyBroadException try: objective_values[i] = self.theObjectives[i].compute(copyDevice) if math.isnan(objective_values[i]): objective_values[i] = float('inf') except Exception: objective_values[i] = float('inf') printIfShown("An error in objectives. inf value has been set to continue execution. Error:" + traceback.format_exc(), SHOW_DEBUG) for i in range(nbr_of_constraints): # noinspection PyBroadException try: constraint_values[i] = self.theConstraints[i].compute(copyDevice) except Exception: constraint_values[i] = float('inf') printIfShown("An error in constraints. NaN value has been set to continue execution. Error:" + traceback.format_exc(), SHOW_DEBUG) valuesToReturn = dict() valuesToReturn["params"] = x valuesToReturn["device"] = copyDevice valuesToReturn["time"] = time.time()-self.startingTime valuesToReturn["objectives"] = objective_values valuesToReturn["constraints"] = constraint_values return valuesToReturn # objective_values, constraint_values
[docs] def reevaluate_solutions(self, x_solutions): resultsdevices = [None] * len(x_solutions) for i, x_solution in enumerate(x_solutions): self.theMathsToPhysics.fromMathsToPhys(x_solution, self.theDevice, self.theOptimizationVariables) self.theCharacterization.compute(self.theDevice) resultsdevices[i] = copy.deepcopy(self.theDevice) return resultsdevices
[docs]class _ArrayEvaluator(_Evaluator): """Same as _evaluator, using array as inputs. Allows to evaluate all generation at once, in case of numpy array computations. Drawbacks: devices can not be visualized and saved. Use this with care."""
[docs] def evaluate(self, list_of_x): self.theMathsToPhysics.fromMathsToPhys(list_of_x, self.theDevice, self.theOptimizationVariables) objective_values = [objective.compute(self.theDevice) for objective in self.theObjectives] constraint_values = [constraint.compute(self.theDevice) for constraint in self.theConstraints] valuesToReturn = dict() valuesToReturn["objectives"] = [[objective[k] for objective in objective_values] for k in range(len(list_of_x))] valuesToReturn["constraints"] = [[constraint[k] for constraint in constraint_values] for k in range(len(list_of_x))] return valuesToReturn
[docs] def reevaluate_solutions(self, x_solutions): self.theMathsToPhysics.fromMathsToPhys(x_solutions, self.theDevice, self.theOptimizationVariables) self.theCharacterization.compute(self.theDevice) return [self.theDevice]
[docs]def run_optimization(optimizer_settings: OptimizerSettings, opti_historic, max_opti_time_sec=10, return_x_solutions=False, array_evaluator=False, initialVectorGuess=None): """ Perform the optimization. :param optimizer_settings: :class:`OptimizerSettings` containing all information in models, objectives and optimization variable :param opti_historic: OptiHistoric to log evaluations and results :param max_opti_time_sec: Maximum optimization time (default stopping criterion, unless modified in algorithm) :param return_x_solutions: If True, returns raw parameters in reults :param array_evaluator: If True, evaluate each generation at once using numpy array. Use it only with care, as it dismisses some features (expert mode) :return: list of the best optimized devices, convergence information and [if return_x_solutions=True] best solutions """ if array_evaluator: evaluator = _ArrayEvaluator(optimizer_settings) else: evaluator = _Evaluator(optimizer_settings) theOptimizationAlgorithm = optimizer_settings.get_optialgorithm() # Initialize opti algorithms opti_historic.start(optimizer_settings) evaluator.start() if initialVectorGuess is None: initialVectorGuess = evaluator.theMathsToPhysics.fromPhysToMaths(evaluator.theDevice, evaluator.theOptimizationVariables) theOptimizationAlgorithm.set_maxtime(max_opti_time_sec) theOptimizationAlgorithm.set_evaluationFunction(evaluator.evaluate, opti_historic.log_after_evaluation, len(evaluator.theObjectives), len(evaluator.theConstraints), array_evaluator) # Initialize the algorithm theOptimizationAlgorithm.initialize(initialVectorGuess, evaluator.theOptimizationVariables) # Get track of convergence convergence = theOptimizationAlgorithm.get_convergence() opti_historic.set_convergence(convergence) # Start optimization printIfShown("Performing optimization", SHOW_INFO) x_solutions = theOptimizationAlgorithm.compute() printIfShown("Optimization ended", SHOW_INFO) # Manage results success, best_devices = opti_historic.get_best_devices_without_reevaluating(x_solutions) if not success: printIfShown("Could not retrieve best devices from database ... Reevaluating", SHOW_INFO) best_devices = evaluator.reevaluate_solutions(x_solutions) # Set results and save opti_historic.set_results(best_devices) opti_historic.save() if not return_x_solutions: return best_devices, convergence return best_devices, convergence, x_solutions