import ast
import inspect
from abc import ABCMeta
# from typing import Dict, List, Tuple
try:
from typing import GenericMeta as _Generic
except ImportError:
from typing import _GenericAlias as _Generic
import traceback
import numpy as np
from optimeed.core.tools import rgetattr, rsetattr, printIfShown, SHOW_ERROR, SHOW_WARNING
import sys
import os
import re
"""
Expected behaviour:
Object attributes values can either be of "known type" (=> type hinted) or "unknown type" (=> Saveable object).
- If type hinted, then save/load is pretty straightforward:
1. save module/class of parent
2. Get typehinted attributes
3. For each attribute, only save the value
----
1. load module/class of parent
2. Recover from annotations where the module/class of attributes are
3. For each attribute, cast to this module/class
- If not, then save/load:
1. Save module/class of parent
2. For each attribute to save (get_additional_attributes_to_save):
3. Save the value AND its module/class
----
1. load module/class of parent
2. Recover from parent the attributes that have been saved
3. For each attribute, reconstitute the object using the added module/class information during save stage
"""
MODULE_TAG = '__module__'
CLASS_TAG = '__class__'
EXCLUDED_TAGS = [CLASS_TAG, MODULE_TAG]
[docs]def getExecPath():
try:
sFile = os.path.abspath(sys.modules['__main__'].__file__)
except:
sFile = sys.executable
basename = os.path.basename(sFile)
return os.path.splitext(basename)[0]
[docs]class SaveableObject(metaclass=ABCMeta):
"""Abstract class for dynamically type-hinted objects.
This class is to solve the special case where the exact type of an attribute is not known before runtime, yet has to be saved."""
[docs] def get_additional_attributes_to_save(self):
"""Return list of attributes corresponding to object, whose type cannot be determined statically (e.g. topology change)"""
return list()
[docs] def get_additional_attributes_to_save_list(self):
"""Same behavior as get_additional_attributes_to_save, but where the attributes contains list of unknown items"""
return list()
[docs]def _isclass(theObject):
"""Extends the default isclass method with typing"""
if isinstance(theObject, type):
return True
# Test if comes from typing class
return isinstance(theObject, _Generic)
[docs]def get_type_class(typ):
"""Get the type of the class. used to compare objects from Typing."""
try:
# Python 3.5 / 3.6
return typ.__extra__
except AttributeError:
try:
# Python 3.7
return typ.__origin__
except AttributeError:
return typ
[docs]def _get_object_class(theObj):
return theObj.__class__.__qualname__
[docs]def _get_object_module(theObj):
module = theObj.__class__.__module__
if module == str.__class__.__module__:
return None
if module == "__main__":
return getExecPath()
return module
[docs]def _object_to_FQCN(theobj):
"""Gets module path of object"""
module = theobj.__class__.__module__
if module is None or module == str.__class__.__module__:
return theobj.__class__.__qualname__
return module + '.' + theobj.__class__.__qualname__
#
# return theobj.__module__ + '.' + theobj.__class__.__name__
[docs]def _find_class(moduleName, className):
import importlib
splitted_name = className.split(".")
begin = True
parentClass = None
for name in splitted_name:
if begin:
parentClass = getattr(importlib.import_module(moduleName), name)
begin = False
else:
parentClass = getattr(parentClass, name)
return parentClass
[docs]def json_to_obj(json_dict):
"""Convenience class to create object from dictionary. Only works if CLASS_TAG is valid
:param json_dict: dictionary loaded from a json file.
:raise TypeError: if class can not be found
:raise KeyError: if CLASS_TAG not present in dictionary
"""
try:
cls = _find_class(json_dict[MODULE_TAG], json_dict[CLASS_TAG])
return json_to_obj_safe(json_dict, cls)
except TypeError:
raise
except KeyError:
raise KeyError("Object class not defined, can not create data. Use from_json_safe method to specify which class to load.")
[docs]def json_to_obj_safe(json_dict, cls):
"""Safe class to create object from dictionary.
:param json_dict: dictionary loaded from a json file
:param cls: class object to instantiate with dictionary
"""
type_class = get_type_class(cls)
if issubclass(type_class, list) or issubclass(type_class, tuple):
list_type = cls.__args__[0]
instance = list()
for value in json_dict:
instance.append(json_to_obj_safe(value, list_type))
if issubclass(type_class, list):
return instance
else:
return tuple(instance)
elif issubclass(type_class, dict):
instance = dict()
for key, value in json_dict.items():
key_type = cls.__args__[0]
val_type = cls.__args__[1]
# Check key
if issubclass(key_type, str):
key_instance = key
else:
key_instance = json_to_obj_safe(ast.literal_eval(key), key_type)
# Check value
try:
isdict = issubclass(val_type, dict)
except TypeError:
isdict = False
value_instance = value if isdict else json_to_obj_safe(value, val_type)
# Updates
instance.update({key_instance: value_instance})
return instance
elif issubclass(type_class, (float, int, complex, bool, str)):
return json_dict
elif issubclass(type_class, np.ndarray):
return np.array(json_dict)
else: # Object must be instantiated
theInstance = _instantiates_annotated_object(json_dict, cls)
return theInstance
[docs]def _instantiates_annotated_object(_json_dict, _cls):
# annotations: dict = _cls.__annotations__ if hasattr(_cls, '__annotations__') else None
# Instantiate the object
entranceParams = []
try:
params = inspect.signature(_cls).parameters
for param in params.values():
if param.default is param.empty:
if param.annotation is not param.empty:
entranceParams.append(_instantiates_annotated_object({}, param.annotation))
else:
entranceParams.append(None)
else:
entranceParams.append(param.default)
except ValueError:
pass
try:
instance = _cls(*entranceParams)
except TypeError:
printIfShown("Error while loading:\n{}".format(traceback.format_exc()), SHOW_ERROR)
return None
annotations = _get_annotations(_cls)
# Set object attributes from data items
# # Special care for option classes:
# if issubclass(_cls, Option_class):
# # Take all attributes associated to options
# for additionalAttribute in Option_class.get_option_attributes():
# # If they have been saved (because options exist)
# if additionalAttribute in _json_dict:
# # Retrieve saved values
# value = _json_dict[additionalAttribute]
# field_type = annotations.get(additionalAttribute)
# # Set them
# rsetattr(instance, additionalAttribute, json_to_obj_safe(value, field_type))
# # Reinitialize instance. That will NOT reinitialize the options due to its "if not hasattr" in init:)
# instance.__init__(*entranceParams)
# Special care for annotations
if annotations:
for name, value in _json_dict.items():
if name not in EXCLUDED_TAGS:
field_type = annotations.get(name)
if _isclass(field_type) and isinstance(value, (dict, tuple, list, set, frozenset)):
try:
rsetattr(instance, name, json_to_obj_safe(value, field_type))
except AttributeError:
printIfShown("Failed to instantiate subfield **{}** with value **{}**".format(name, value, SHOW_WARNING))
else:
rsetattr(instance, name, value)
# Special care for Savable classes
if isinstance(instance, SaveableObject):
# Manage direct unknown attribute
for additionalAttribute in instance.get_additional_attributes_to_save():
try:
theSubDict = _json_dict[additionalAttribute]
if isinstance(theSubDict, (float, int, complex, bool, str)): # Smallest quantity => use it has it is
rsetattr(instance, additionalAttribute, theSubDict)
else:
try:
rsetattr(instance, additionalAttribute, json_to_obj(theSubDict))
except TypeError:
printIfShown("Failed to instantiate attribute **{}**".format(additionalAttribute, theSubDict), SHOW_WARNING)
except KeyError:
printIfShown("Attribute {} was not saved. Used default value.".format(additionalAttribute), SHOW_WARNING)
# Manage list of unknown attributes
for additionalAttribute in instance.get_additional_attributes_to_save_list():
list_of_json = _json_dict[additionalAttribute]
list_of_obj = list()
for theSubDict in list_of_json:
try:
list_of_obj.append(json_to_obj(theSubDict))
except TypeError:
printIfShown("Failed to instantiate attribute **{}**".format(additionalAttribute, theSubDict), SHOW_WARNING)
rsetattr(instance, additionalAttribute, list_of_obj)
return instance
[docs]def _get_annotations(theObj):
"""Return annotated attributes (theObj being the type of the object)"""
annotations: dict = theObj.__annotations__ if hasattr(theObj, '__annotations__') else dict()
parents = inspect.getmro(theObj)[1:] # Starting index 1 because 0 is self
for parentClass in parents:
if parentClass is not None and parentClass != object:
theAnnotations = _get_annotations(parentClass)
annotations.update(theAnnotations)
return annotations
[docs]def obj_to_json(theObj):
"""Extract the json dictionary from the object. The data saved are automatically detected, using typehints.
ex: x: int=5 will be saved, x=5 won't.
Inheritance of annotation is managed by this function
"""
def _to_json(recObj, is_first=False):
"""Use of is_first to save module class name"""
if isinstance(recObj, list) or isinstance(recObj, tuple):
theList = list()
for elem in recObj:
theList.append(_to_json(elem, is_first=is_first))
return theList
elif isinstance(recObj, dict):
output_dict = dict()
for key in list(recObj):
output_dict[str(key)] = _to_json(recObj[key])
return output_dict
elif isinstance(recObj, int) or isinstance(recObj, float):
return recObj
elif isinstance(recObj, str):
return recObj
elif np.issubdtype(type(recObj), np.integer) or np.issubdtype(type(recObj), np.floating):
return float(recObj)
elif isinstance(recObj, bool):
return str(recObj).lower()
elif isinstance(recObj, np.ndarray):
return _to_json(recObj.tolist())
elif recObj is None:
return "null"
else: # Item is a user-defined object
output_dict = dict()
if recObj is not None:
if is_first:
output_dict[MODULE_TAG] = _get_object_module(recObj)
output_dict[CLASS_TAG] = _get_object_class(recObj)
for attribute, is_first in _get_attributes_to_save(recObj):
try:
value = rgetattr(recObj, attribute)
output_dict[attribute] = _to_json(value, is_first=is_first)
except AttributeError:
pass
return output_dict
return _to_json(theObj, is_first=True)
[docs]def _get_attributes_to_save(theObj):
"""Return list (attribute, is_first)"""
list_1 = list()
list_2 = list()
# List 1
annotations = _get_annotations(type(theObj))
if annotations is not None:
list_1 = list(zip(annotations, [False]*len(annotations)))
# List 2
if isinstance(theObj, SaveableObject):
additional_attributes = theObj.get_additional_attributes_to_save() + theObj.get_additional_attributes_to_save_list()
list_2 = list(zip(additional_attributes, [True]*len(additional_attributes)))
return list_1 + list_2
# def get_json_module_tree(theObj):
# """Return dict containing {CLASS_TAG: "class_name", MODULE_TAG: "module_name", "attribute1":{"class_name": "module_name", ...}}"""
# def _nested_module_tree(recObj):
# theDict = dict()
# theClass = _get_object_class(recObj)
# theModule = _get_object_module(recObj)
# if theModule is not None:
# theDict[CLASS_TAG] = theClass
# theDict[MODULE_TAG] = theModule
# for attribute, _ in _get_attributes_to_save(recObj):
# nestedObj = rgetattr(recObj, attribute)
# nested_subtree = _nested_module_tree(nestedObj)
# if nested_subtree:
# theDict[attribute] = nested_subtree
# return theDict
# return _nested_module_tree(theObj)
[docs]def get_json_module_tree_from_dict(jsonDict):
"""Return dict containing {CLASS_TAG: "class_name", MODULE_TAG: "module_name", "attribute1":{"class_name": "module_name", ...}}"""
def _nested_module_tree(recDict):
theDict = dict()
theClass = recDict.get(CLASS_TAG)
theModule = recDict.get(MODULE_TAG)
if theModule is not None:
theDict[CLASS_TAG] = theClass
theDict[MODULE_TAG] = theModule
for key in recDict:
nestedObj = recDict[key]
if isinstance(nestedObj, dict):
nested_subtree = _nested_module_tree(nestedObj)
if nested_subtree:
theDict[key] = nested_subtree
return theDict
return _nested_module_tree(jsonDict)
[docs]def remove_module_tree_from_string(theStr):
"""Used to compress string by removing __module__ and __class__ entries (used with get_json_module_tree_from_dict)"""
str_copy = str(theStr)
str_copy = re.sub("\s*\"__module__\" *: *(\"(.*?)\"(,|\s|)|\s*\{(.*?)\}(,|\s|))", '', str_copy)
str_copy = re.sub("\s*\"__class__\" *: *(\"(.*?)\"(,|\s|)|\s*\{(.*?)\}(,|\s|))", '', str_copy)
return str_copy
[docs]def apply_module_tree_to_dict(nestedTree, nestedObject, raiseError=False):
"""Restore __module__ and __class__ entries from nestedTree in nestedDict"""
for key in nestedTree:
if key == CLASS_TAG or key == MODULE_TAG:
try:
nestedObject[key] = nestedTree[key]
except TypeError as e:
printIfShown("(following error when loading, might come from numpy elementary types or 'null' values) {}".format(nestedTree, e), SHOW_WARNING)
if raiseError:
raise
else:
if isinstance(nestedObject, dict):
apply_module_tree_to_dict(nestedTree[key], nestedObject[key])
[docs]def encode_str_json(theStr):
return theStr.__repr__().replace("\\", "\\\\")[1:-1]
[docs]def decode_str_json(theStr):
return str(theStr.__repr__().replace("\\\\", "\\"))