Source code for optimeed.consolidate.sensitivity_analysis

import shutil
from abc import ABCMeta, abstractmethod
from optimeed.core import SaveableObject, AutosaveStruct, create_unique_dirname, SHOW_INFO, printIfShown, Performance_ListDataStruct, ListDataStruct
from typing import List
from .sensitivity_analysis_evaluation import evaluate
from optimeed.core import SingleObjectSaveLoad
from optimeed.core import getPath_workspace, SHOW_ERROR, order_lists
from optimeed.optimize import Real_OptimizationVariable
from multiprocessing import Pool
import math
import numpy as np
import os


_filename_sensitivityparams = "sensitivity_params.json"
_foldername_embarrassingly_parallel_results = "_jobs_results"
_filename_sensitivityresults = "sensitivity.json"


[docs]class SensitivityResults(SaveableObject): paramsToEvaluate: List[float] success: bool index: int def __init__(self): self.paramsToEvaluate = [0.0] self.device = None self.success = False self.index = 0
[docs] def add_data(self, params, device, success, index): self.device = device self.success = success self.paramsToEvaluate = params self.index = index
[docs] def get_additional_attributes_to_save(self): return ["device"]
[docs]class SensitivityParameters(SaveableObject): list_of_optimization_variables: List[Real_OptimizationVariable] param_values: List[List[float]] def __init__(self, param_values, list_of_optimization_variables, theDevice, theMathsToPhys, theCharacterization): """ These are the parameters t :param list_of_optimization_variables: list of OptiVariables that are analyzed :param theDevice: / :param theMathsToPhys: / :param theCharacterization: /""" self.theDevice = theDevice self.theMathsToPhys = theMathsToPhys self.theCharacterization = theCharacterization self.list_of_optimization_variables = list_of_optimization_variables self.param_values = param_values
[docs] def get_device(self): return self.theDevice
[docs] def get_M2P(self): return self.theMathsToPhys
[docs] def get_charac(self): return self.theCharacterization
[docs] def get_optivariables(self): return self.list_of_optimization_variables
[docs] def get_paramvalues(self): return self.param_values
[docs] def get_additional_attributes_to_save(self): return ["theDevice", "theMathsToPhys", "theCharacterization"]
[docs]class Restrained_SensitivityParameters(SensitivityParameters): """Class to perform Sensitivty Analysis on a subset of the full parameters""" def __init__(self, *args): super().__init__(*args) self.selected = list() @staticmethod def create_from_sensitivityParameters(theSensitivityParameters): restrained_SA = Restrained_SensitivityParameters(theSensitivityParameters.get_device(), theSensitivityParameters.get_M2P(), theSensitivityParameters.get_charac(), theSensitivityParameters.get_optivariables(), theSensitivityParameters.get_paramvalues()) return restrained_SA
[docs] def set_selected(self, selection): self.selected = selection
[docs] def get_optivariables(self): init_optivariables = super().get_optivariables() return [init_optivariables[index] for index in self.selected]
[docs] def get_paramvalues(self): init_paramvalues = super().get_paramvalues() return [[line[index] for index in self.selected] for line in init_paramvalues]
[docs]class SensitivityAnalysis_LibInterface(metaclass=ABCMeta): """Interface a library for sensitivity analysis :param theSensitivityParameters: :class:`optimeed.consolidate.sensitivity_analysis.SensitivityParameters` :param theObjectives: array-like objective associated to evaluation, using Sobol sampling""" def __init__(self, theSensitivityParameters: SensitivityParameters, theObjectives): self.theSensitivityParameters = theSensitivityParameters self.theObjectives = theObjectives self.performed = False @staticmethod @abstractmethod def sample_sobol(theOptimizationVariables, N): pass @abstractmethod def get_sobol_S1(self): """ Get first order sobol indices :return: """ pass @abstractmethod def get_sobol_S1conf(self): pass @abstractmethod def get_sobol_S2(self): """ Get second order sobol indices :return: """ pass @abstractmethod def get_sobol_ST(self): """ Get total order sobol indices :return: """ pass @abstractmethod def get_sobol_STconf(self): pass
[docs] def get_summary(self): """Display a summary of the sobol indices""" S1 = self.get_sobol_S1() S1conf = self.get_sobol_S1conf() ST = self.get_sobol_ST() STconf = self.get_sobol_STconf() nb_params = len(self.get_SA_params().get_optivariables()) _, ordered_S1 = order_lists(S1, list(range(nb_params))) _, ordered_ST = order_lists(ST, list(range(nb_params))) ordered_S1.reverse() ordered_ST.reverse() def format_array(permutations, S, Sconf, name): theStr = '' theStr += '─' * 120 + '\n' theStr += "{:^12}{:^14}{:^25}{:<}".format(*["Rank (" + name + ")", "Sobol value", "+- 95% conf", "Param name"]) + '\n' theStr += '─' * 120 + '\n' for i, map_index in enumerate(permutations): row = [i + 1, S[map_index], Sconf[map_index], self.get_SA_params().get_optivariables()[map_index].get_attribute_name()] theStr += "{:^12}{:^14.3f}{:^25.3f}{}".format(*row) + '\n' theStr += '─' * 50 + '\n' theStr += "{:^12}{:^14.3f}{:^25}{:<}".format("SUM", sum(S), "", "") + '\n' return theStr S1_array = format_array(ordered_S1, S1, S1conf, "S1") ST_array = format_array(ordered_ST, ST, STconf, "ST") print(S1_array) print(ST_array)
[docs] def get_convergence_S1(self, stepsize=1): """ Create dictionary for convergence plot - First order index :param stepsize: increments of sampling size :return: Dictionary """ return self._get_convergence(self.get_sobol_S1, stepsize)
[docs] def get_convergence_ST(self, stepsize=1): """ Create dictionary for convergence plot - Total order index :param stepsize: increments of sampling size :return: Dictionary """ return self._get_convergence(self.get_sobol_ST, stepsize)
[docs] def _get_convergence(self, method, stepsize): opti_variables = self.get_SA_params().get_optivariables() nb_params = len(opti_variables) max_nb_step = math.floor(len(self.theObjectives) / (2 * nb_params + 2)) outputs = list() steps = list(range(2, max_nb_step, stepsize)) theObjectives = np.array(self.theObjectives, copy=True) for sample_size in steps: printIfShown("Doing {} over {}".format(sample_size, max_nb_step)) self.set_objectives(np.array(theObjectives[0:sample_size * (2 * nb_params + 2)])) outputs.append(method()) outputs_dict = dict() for i in range(nb_params): outputs_dict[i] = {'S': [output[i] for output in outputs], 'step': steps, 'name': opti_variables[i].get_attribute_name()} self.set_objectives(theObjectives) return outputs_dict
[docs] def get_SA_params(self): return self.theSensitivityParameters
[docs] def set_SA_params(self, theSensitivityParameters): self.theSensitivityParameters = theSensitivityParameters self.performed = False
[docs] def set_objectives(self, theObjectives): self.theObjectives = theObjectives self.performed = False
[docs]def _get_sensitivity_result(output): """Convert output of "evaluate" function to SensitivityResult""" result = SensitivityResults() result.add_data(output["x"], output["device"], output["success"], output["index"]) return result
[docs]def _get_job_args(theSensitivityParameters, index): """Convert sensitivityparameters at index to args used in "evaluate" function""" return [theSensitivityParameters.get_paramvalues()[index], theSensitivityParameters.get_device(), theSensitivityParameters.get_M2P(), theSensitivityParameters.get_charac(), theSensitivityParameters.get_optivariables(), index]
[docs]def _find_missings(theSensitivityParameters, studyname): missings = list() for index, _ in enumerate(theSensitivityParameters.get_paramvalues()): saved_filename = os.path.join(getPath_workspace(), studyname, _foldername_embarrassingly_parallel_results, "{}.json".format(index)) print(saved_filename) if not os.path.exists(saved_filename): missings.append(index) return missings
[docs]def prepare_embarrassingly_parallel_sensitivity(theSensitivityParameters, studyname): """ Initialize sensitivity analysis folder :param theSensitivityParameters: :param studyname: Folder to be created in Workspace :return: """ project_foldername = os.path.join(getPath_workspace(), studyname) foldername_tempfiles = os.path.join(project_foldername, _foldername_embarrassingly_parallel_results) shutil.rmtree(foldername_tempfiles, ignore_errors=True) os.makedirs(foldername_tempfiles) # Also create project_foldername dir SingleObjectSaveLoad.save(theSensitivityParameters, os.path.join(project_foldername, _filename_sensitivityparams)) printIfShown("Files created. There will be {} indices to evaluate".format(len(theSensitivityParameters.get_paramvalues())), SHOW_INFO)
[docs]def launch_embarrassingly_parallel_sensitivity(theSensitivityParameters, studyname, base_index, mult_factor=1): """ Single job launcher for an embarrassingly parallel evaluation :param theSensitivityParameters: :param studyname: Name of the folder in Workspace in which the study is performed :param base_index: start index (Formula: index to evaluate = base_index*mult_factor) :param mult_factor: Multiplication factor of the base_index. Allows to overcome QOSMaxJobPerUserLimit in clusters. :return: """ for offset_index in range(mult_factor): new_index = base_index*mult_factor + offset_index saved_filename = os.path.join(getPath_workspace(), studyname, _foldername_embarrassingly_parallel_results, "{}.json".format(new_index)) if not os.path.exists(saved_filename): output = evaluate(_get_job_args(theSensitivityParameters, new_index)) result = _get_sensitivity_result(output) SingleObjectSaveLoad.save(result, saved_filename)
[docs]def launch_missing_embarrassingly_parallel_sensitivity(theSensitivityParameters, studyname, missing_list, base_index, mult_factor=1): """Same as launch_embarrassingly_parallel_sensitivity, but using the 'missing_list' arg used for mapping""" for offset_index in range(mult_factor): new_index = missing_list[base_index*mult_factor + offset_index] saved_filename = os.path.join(getPath_workspace(), studyname, _foldername_embarrassingly_parallel_results, "{}.json".format(new_index)) if not os.path.exists(saved_filename): output = evaluate(_get_job_args(theSensitivityParameters, new_index)) result = _get_sensitivity_result(output) SingleObjectSaveLoad.save(result, saved_filename)
[docs]def gather_embarrassingly_parallel_sensitivity(theSensitivityParameters, studyname): """ Gather the results. If some are missing, display the indices. :param theSensitivityParameters: :param studyname: :return: """ missings = _find_missings(theSensitivityParameters, studyname) if len(missings): printIfShown("Could not gather results yet, several parameters remain unevaluated:", SHOW_ERROR) printIfShown("{}".format(missings), SHOW_ERROR) exit(-1) results = Performance_ListDataStruct() for index, _ in enumerate(theSensitivityParameters.get_paramvalues()): saved_filename = os.path.join(getPath_workspace(), studyname, _foldername_embarrassingly_parallel_results, "{}.json".format(index)) with open(saved_filename, 'r') as f: theStr = f.read() results.add_json_data(theStr) results.save(os.path.join(getPath_workspace(), studyname, _filename_sensitivityresults))
[docs]def condition_aborted_sensitivities(foldername): theSensitivityParameters = SingleObjectSaveLoad.load(os.path.join(foldername, _filename_sensitivityparams)) myDataStruct = Performance_ListDataStruct().load(os.path.join(foldername, _filename_sensitivityresults)) max_nb_eval = len(theSensitivityParameters.param_values) index_SA_to_index_in_col = [None]*max_nb_eval for index_elem in range(myDataStruct.get_nbr_elements()): index_SA = myDataStruct.get_attribute_value_at_index_fast("index", index_elem) index_SA_to_index_in_col[int(index_SA)] = index_elem max_available_SA = index_SA_to_index_in_col.index(None) trunc_available_SA = index_SA_to_index_in_col[0:max_available_SA] new_col = myDataStruct.extract_collection_from_indices(trunc_available_SA) theSensitivityParameters.param_values = theSensitivityParameters.param_values[0:max_available_SA] # Save items save_dir = "{}_conditioned".format(foldername) shutil.copytree(foldername, save_dir) new_col.save(os.path.join(save_dir, _filename_sensitivityresults)) SingleObjectSaveLoad.save(theSensitivityParameters, os.path.join(save_dir, _filename_sensitivityparams)) printIfShown("Succeeded to condition SA. \n Saved directory: {} \n Number of elems before: {} \n Number of elems in SA: {} \n" .format(save_dir, max_nb_eval, max_available_SA), SHOW_INFO)
[docs]def evaluate_sensitivities(theSensitivityParameters: SensitivityParameters, numberOfCores=2, studyname="sensitivity", indices_to_evaluate=None): """ Evaluate the sensitivities :param theSensitivityParameters: class`~SensitivityParameters` :param numberOfCores: number of core for multicore evaluation :param studyname: Name of the study, that will be the subfolder name in workspace :param indices_to_evaluate: if None, evaluate all param_values, otherwise if list: evaluate subset of param_values defined by indices_to_evaluate :return: collection of class`~SensitivityResults` """ myDataStruct = Performance_ListDataStruct() foldername = create_unique_dirname(os.path.join(getPath_workspace(), studyname)) SingleObjectSaveLoad.save(theSensitivityParameters, os.path.join(foldername, _filename_sensitivityparams)) # Start saving autosaveStruct = AutosaveStruct(myDataStruct, filename=os.path.join(foldername, _filename_sensitivityresults)) autosaveStruct.start_autosave(timer_autosave=60*5) param_values = theSensitivityParameters.get_paramvalues() try: param_values = param_values.tolist() except AttributeError: pass if indices_to_evaluate is None: indices = list(range(len(param_values))) else: indices = indices_to_evaluate param_values = [param_values[index] for index in indices_to_evaluate] # create jobs jobs = [_get_job_args(theSensitivityParameters, index) for index in indices] pool = Pool(numberOfCores) nb_to_do = len(param_values) nb_done = 0 permutations = list() for output in pool.imap_unordered(evaluate, jobs): result = _get_sensitivity_result(output) myDataStruct.add_data(result) permutations.append(output["index"]) nb_done += 1 printIfShown("did {} over {}".format(nb_done, nb_to_do), SHOW_INFO) # save results autosaveStruct.stop_autosave() myDataStruct.reorder(permutations=permutations) autosaveStruct.save() pool.close() pool.join() return myDataStruct
[docs]def evaluate_sensitivities_fast(theSensitivityParameters: SensitivityParameters): """Deactivate multicore and save management for fast results""" myDataStruct = ListDataStruct() [myDataStruct.add_data(_get_sensitivity_result(evaluate(_get_job_args(theSensitivityParameters, index)))) for index in range(len(theSensitivityParameters.get_paramvalues()))] return myDataStruct