Source code for optimeed.core.collection
import copy
import json
import os
from abc import abstractmethod, ABCMeta
from threading import Timer, Lock
from optimeed.core.tools import getPath_workspace
from .myjson import obj_to_json, json_to_obj, json_to_obj_safe, encode_str_json, decode_str_json, get_json_module_tree_from_dict, remove_module_tree_from_string, apply_module_tree_to_dict
from .tools import indentParagraph, rgetattr, printIfShown, SHOW_WARNING, SHOW_DEBUG, SHOW_INFO, SHOW_ERROR, delete_indices_from_list
import re
import inspect
from shutil import rmtree
from math import floor, ceil
import linecache
[docs]class SingleObjectSaveLoad:
@staticmethod
def load(filename):
with open(filename, "r") as f:
return json_to_obj(json.load(f))
@staticmethod
def save(theObject, filename):
with open(filename, "w") as f:
f.write(json.dumps(obj_to_json(theObject)))
[docs]class DataStruct_Interface(metaclass=ABCMeta):
@abstractmethod
def save(self, filename):
"""Save the datastructure to filename"""
pass
@abstractmethod
def clone(self, filename):
"""Clone the datastructure to a new location"""
pass
@staticmethod
@abstractmethod
def load(filename, **kwargs):
"""Load the datastructure from filename"""
pass
@staticmethod
def get_extension():
"""File extension used for datastructure"""
return ".json"
[docs]class ListDataStruct_Interface(DataStruct_Interface, metaclass=ABCMeta):
@abstractmethod
def add_data(self, data_in):
pass
@abstractmethod
def get_data_at_index(self, index):
pass
@abstractmethod
def get_data_generator(self):
pass
@abstractmethod
def delete_points_at_indices(self, indices):
"""
Delete several elements from the Collection
:param indices: list of indices to delete
"""
pass
@abstractmethod
def get_nbr_elements(self):
"""
:return: the number of elements contained inside the structure
"""
pass
@abstractmethod
def extract_collection_from_indices(self, theIndices):
"""
Extract data from the collection at specific indices, and return it as new collection
:param theIndices:
:return: A new collection
"""
pass
@staticmethod
def _jsondata_to_obj(item, theClass=None):
return json_to_obj(item) if theClass is None else json_to_obj_safe(item, theClass)
[docs] def get_list_attributes(self, attributeName):
"""
Get the value of attributeName of all the data in the Collection
:param attributeName: string (name of the attribute to get)
:return: list
"""
if not attributeName:
return list()
return [rgetattr(data, attributeName) for data in self.get_data_generator()]
[docs]class AutosaveStruct:
"""Structure that provides automated save of DataStructures"""
def __init__(self, dataStruct, filename='', change_filename_if_exists=True):
if not filename:
self.filename = getPath_workspace() + '/default_collection'
else:
self.filename = os.path.abspath(filename)
self.theDataStruct = dataStruct
self.set_filename(self.filename, change_filename_if_exists)
self.timer = None
[docs] def set_filename(self, filename, change_filename_if_exists):
"""
:param filename: Filename to set
:param change_filename_if_exists: If already exists, create a new filename
"""
if not filename.endswith(self.theDataStruct.get_extension()):
filename += self.theDataStruct.get_extension()
if change_filename_if_exists:
curr_index = 1
head, tail = os.path.split(filename)
base_name = os.path.splitext(tail)[0]
while os.path.exists(filename):
filename = head + '/' + base_name + '_' + str(curr_index) + self.theDataStruct.get_extension()
curr_index += 1
self.filename = filename
[docs] def start_autosave(self, timer_autosave, safe_save=True):
"""Start autosave"""
self.save(safe_save=safe_save)
if timer_autosave > 0:
self.timer = Timer(timer_autosave, lambda: self.start_autosave(timer_autosave, safe_save=safe_save))
self.timer.daemon = True
self.timer.start()
[docs] def save(self, safe_save=True):
"""Save"""
if not os.path.exists(os.path.dirname(self.filename)):
os.makedirs(os.path.dirname(self.filename))
if os.path.exists(self.filename) and safe_save:
temp_file = self.filename + 'temp'
self.theDataStruct.save(temp_file)
try:
os.replace(temp_file, self.filename)
except FileNotFoundError:
pass
else:
self.theDataStruct.save(self.filename)
[docs] def get_datastruct(self):
"""Return :class:'~DataStruct_Interface'"""
return self.theDataStruct
[docs]class ListDataStruct(ListDataStruct_Interface):
_DATA_STR = "data"
_COMPRESS_SAVE_STR = "module_tree"
def __init__(self, compress_save=False):
"""
Structure of data containing a single list of objects.
:param compress_save: Set to True if all the data have the same class and module names, so that emitted file is more efficient
"""
super().__init__()
self.theData = list()
self.compress_save = compress_save
self.json_module_tree = ''
[docs] def clone(self, filename):
self.save(filename) # Same behaviour as 'save': all the memory is dumped
[docs] def save(self, filename):
"""Save data using json format. The data to be saved are automatically detected, see :meth:`~optimeed.core.myjson.obj_to_json` """
theStr = self._format_str_save()
with open(filename, "w", encoding='utf-8') as f:
f.write(theStr)
[docs] def extract_collection_from_indices(self, indices):
"""Extract data from the collection at specific indices, and return it as new collection"""
new_collection = ListDataStruct()
for index in indices:
new_collection.add_data(self.get_data_at_index(index))
return new_collection
[docs] def _format_str_save(self):
"""Save data using json format. The data to be saved are automatically detected, see :meth:`~optimeed.core.myjson.obj_to_json` """
# Header
theStr = "{\n"
theStr += '\t"{}": \n{}'.format(ListDataStruct._DATA_STR, indentParagraph(self._format_data_lines(), 3)) # Core lines
# if compress save: add module and class tree at end of file
if self.compress_save:
self._get_json_module_tree()
theStr += '\t,"{}": {}\n'.format(ListDataStruct._COMPRESS_SAVE_STR, self.json_module_tree)
theStr += "}"
return theStr
[docs] def _format_data_lines(self):
# Format data as json object
listStr = "[\n"
has_previous_element = False
for k, item in enumerate(self.get_data_generator()):
obj_str = ""
if has_previous_element: # Add separator from previous element
obj_str += ",\n"
obj_str += "\t" + json.dumps(obj_to_json(item))
# if compress save: remove all information relative to module and class
if self.compress_save:
remove_module_tree_from_string(obj_str)
listStr += obj_str
has_previous_element = True
listStr += "\n]"
return listStr
[docs] def _get_json_module_tree(self):
if not self.json_module_tree and len(self.theData):
new_module_tree = json.dumps(get_json_module_tree_from_dict(obj_to_json(self.theData[-1])))
if len(new_module_tree) > len(self.json_module_tree):
self.json_module_tree = new_module_tree
@staticmethod
def load(filename, theClass=None):
"""
Load the file filename.
:param filename: file to load
:param theClass: optional. Can be used to fix unpickling errors.
:return: self-like object
"""
if filename:
if not os.path.splitext(filename)[-1].lower():
filename += ListDataStruct.get_extension()
try:
newStruct = ListDataStruct()
with open(filename, 'r', encoding="utf-8", newline="\n") as f:
theDict = json.load(f)
# If datastruct has been compressed,
try:
theModuleTree = theDict[ListDataStruct._COMPRESS_SAVE_STR]
for item in theDict[ListDataStruct._DATA_STR]:
apply_module_tree_to_dict(theModuleTree, item)
except KeyError:
pass
if theClass is not None:
theData = [json_to_obj_safe(item, theClass) for item in theDict[ListDataStruct._DATA_STR]]
else:
theData = [json_to_obj(item) for item in theDict[ListDataStruct._DATA_STR]]
newStruct.set_data(theData)
return newStruct
# except TypeError as e:
# printIfShown("{}, recommended to use theclass keyword in method load".format(e), SHOW_WARNING)
except OSError:
printIfShown("Could not read file ! " + filename, SHOW_WARNING)
except EOFError:
printIfShown("File empty ! " + filename, SHOW_WARNING)
else:
printIfShown("INVALID COLLECTION FILENAME: " + filename, SHOW_WARNING)
[docs] def add_data(self, data_in):
"""Add a data to the list"""
self.theData.append(copy.deepcopy(data_in))
[docs] def get_data(self):
"""Get full list of datas"""
printIfShown("get_data() deprecated! Please use get_data_at_index or get_data_generator whenever possible", SHOW_WARNING)
inspect.stack()
return self.theData
[docs] def get_data_generator(self):
"""Get a generator to all the data stored"""
for k in range(len(self)):
yield self.get_data_at_index(k)
[docs] def set_data_at_index(self, data_in, index):
"""Replace data at specific index"""
self.theData[index] = data_in
[docs] def extract_collection_from_attribute(self, attributeName):
"""Convenience class to create a sub-collection from an attribute of all the items.
:param attributeName: Name of the attribute to extract
:return: ListDataStruct
"""
subdata = self.get_list_attributes(attributeName)
newListDataStruct = ListDataStruct()
newListDataStruct.set_data(subdata)
return newListDataStruct
# def set_attribute_data(self, the_attribute, the_value):
# """Set attribute to all data"""
# for item in self.get_data():
# setattr(item, the_attribute, the_value)
#
# def set_attribute_equation(self, attribute_name, equation_str):
# """
# Advanced method to set the value of attribute_name from equation_str
#
# :param attribute_name: string (name of the attribute to set)
# :param equation_str: formatted equation, check :meth:`~optimeed.core.CommonFunctions_Library.applyEquation`
# :return:
# """
# for item in self.get_data():
# setattr(item, attribute_name, applyEquation(item, equation_str))
[docs] def delete_points_at_indices(self, indices):
"""
Delete several elements from the Collection
:param indices: list of indices to delete
"""
for index in sorted(indices, reverse=True):
del self.theData[index]
[docs] def merge(self, collection):
"""
Merge a collection with the current collection
:param collection: :class:`~optimeed.core.Collection.Collection` to merge
"""
for item in collection.get_data():
self.add_data(item)
theLock = Lock()
[docs]class Performance_ListDataStruct(ListDataStruct_Interface):
_NBR_ELEMENTS = "nbr_elements"
_STACK_SIZE = "stack_size"
_COMPRESS_SAVE_STR = "module_tree"
def __init__(self, stack_size=500):
"""
This class is an extension to the classical :class:`ListDataStruct`.
It is designed to be well suited to opimization: split huge files into smaller ones, efficient CPU usage, efficient RAM usage.
:param stack_size: Number of elements to store on each file
"""
super().__init__()
self.compress_save = True
self.data_buffer = list()
self.curr_elements_in_last_file = 0
self.curr_number_of_files = 1
self.has_been_initialized = False
self.stack_size = stack_size
self.json_module_tree = ''
self._mainfilename = ''
[docs] def _initialize(self, filename):
rmtree(self._get_subfolder_path(filename), ignore_errors=True)
os.makedirs(self._get_subfolder_path(filename))
self._mainfilename = filename
self.has_been_initialized = True
@staticmethod
def _get_subfolder_path(main_filename):
folder_main = os.path.dirname(main_filename)
filename = os.path.basename(main_filename)
sub_folder, _ = os.path.splitext(filename)
sub_folder += '_datafiles'
return os.path.join(folder_main, sub_folder)
[docs] def _get_list_from_file(self, filenumber):
for file in range(filenumber, self.curr_number_of_files):
with open(self._get_path_from_filenumber(self._mainfilename, filenumber)) as f:
return f.read().splitlines()
@staticmethod
def _get_path_from_filenumber(main_filename, filenumber):
return os.path.join(Performance_ListDataStruct._get_subfolder_path(main_filename), "data_{}".format(filenumber))
@staticmethod
def split_list(numb_in_list, stack_size, thing_to_hash):
# Returns [index_begin, index_end, lefts] to hash
item_put_in_list = stack_size - numb_in_list
item_still_to_be_put = len(thing_to_hash)
max_number_to_put_in_list = min(item_still_to_be_put, item_put_in_list)
hashed = [(0, max_number_to_put_in_list, numb_in_list + max_number_to_put_in_list)]
if len(thing_to_hash) > item_put_in_list:
returned_list = Performance_ListDataStruct.split_list(0, stack_size, thing_to_hash[item_put_in_list:])
for k, item in enumerate(returned_list):
index_begin_next, index_end_next, max_number_to_put_in_list = item
mapped_index_begin = index_begin_next + item_put_in_list
mapped_index_end = index_end_next + item_put_in_list
returned_list[k] = (mapped_index_begin, mapped_index_end, max_number_to_put_in_list)
hashed.extend(returned_list)
return hashed
return hashed
[docs] def extract_collection_from_indices(self, indices):
"""Extract data from the collection at specific indices, and return it as new collection"""
new_collection = Performance_ListDataStruct()
new_collection.json_module_tree = self.json_module_tree
for index in indices:
new_collection.data_buffer.append(self._get_json_str_at_index(index))
return new_collection
[docs] def clone(self, filename):
self.save(filename) # Dump buffer to files
theListDataCollection = self.load_as_listdatastruct(self._mainfilename) # Recover files content
theListDataCollection.clone(filename) # Save it
[docs] def _get_str_mainfile(self):
theDict = {self._NBR_ELEMENTS: self.get_total_nbr_elements(False),
self._STACK_SIZE: self.stack_size,
self._COMPRESS_SAVE_STR: self.json_module_tree}
return json.dumps(theDict, indent=4)
[docs] def get_total_nbr_elements(self, count_unsaved=True):
nbr_elements_full_files = max(0, self.stack_size * (self.curr_number_of_files-1))
nbr_elements_last_file = self.curr_elements_in_last_file
unsaved_elements = len(self.data_buffer) if count_unsaved else 0
return nbr_elements_full_files + nbr_elements_last_file + unsaved_elements
[docs] def add_data(self, theData):
"""Add data to the collection"""
theDict = obj_to_json(theData)
theStr = json.dumps(theDict)
if self.compress_save:
theStr = remove_module_tree_from_string(theStr)
self._save_moduletree(theDict)
global theLock
with theLock:
self.data_buffer.append(theStr)
[docs] def add_json_data(self, theStr):
"""Add already deserialized data to the collection"""
if self.compress_save:
# If class does not have module tree => Get it
if not self.json_module_tree:
theDict = json.loads(theStr)
self._save_moduletree(theDict)
theStr = remove_module_tree_from_string(theStr)
global theLock
with theLock:
self.data_buffer.append(theStr)
[docs] def _save_moduletree(self, theDict):
if not self.json_module_tree:
if not theDict == "null":
self.json_module_tree = json.dumps(get_json_module_tree_from_dict(theDict))
[docs] def _map_index_to_file(self, index):
filenumber = int(floor(index / self.stack_size))
linenumber = index - filenumber * self.stack_size
return filenumber, linenumber
[docs] def _get_json_str_at_index(self, index, refresh_cache=False):
"""Internal method to return the json string at index"""
if self.get_total_nbr_elements(count_unsaved=True) < index:
raise IndexError
elif index >= self.get_total_nbr_elements(count_unsaved=False): # data in buffer
json_str = self.data_buffer[index - self.get_total_nbr_elements(count_unsaved=False)]
else: # data in files
filenumber, linenumber = self._map_index_to_file(index)
filename = self._get_path_from_filenumber(self._mainfilename, filenumber)
if refresh_cache:
linecache.checkcache(filename)
json_str = linecache.getline(filename, linenumber + 1).strip() # Open file and retrieve specific line
return json_str
[docs] def get_attribute_value_at_index_fast(self, attribute, index):
"""Experimental method to extract the value of an attribute without converting the string to an object.
It is based on regex search, and will fail if the element is the last in string.
:param attribute: attribute to search
:param index: index of the element in collection
:return:
"""
theStr = self._get_json_str_at_index(index)
regex = r"\"{}\":\s(.*?),\s\"".format(attribute) # Find XXX is between "{attribute}": XXXX,"
try:
return re.findall(regex, theStr)[0]
except IndexError:
print(theStr, index)
[docs] def reorder(self, permutations):
"""Reorder collection accordingly to permutations.
E.G, permutations [0,2,1] with collection elems [0,3,2] => collection elems = [0,2,3]
:param permutations:
:return: /
"""
nbr_elems = self.get_total_nbr_elements(count_unsaved=True)
all_data_str = [""]*nbr_elems
for index in range(nbr_elems):
all_data_str[index] = self._get_json_str_at_index(index)
indices, ordered_all_data_str = (list(t) for t in zip(*sorted(zip(permutations, all_data_str))))
self.data_buffer = ordered_all_data_str
self.curr_elements_in_last_file = 0
self.curr_number_of_files = 1
self.has_been_initialized = False
[docs] def get_data_at_index(self, index, ignore_attributes=None, none_if_error=False):
"""Same as parent, with additional kwargs
:param index:
:param ignore_attributes: ignore attributes to deserialize (list)
:param none_if_error:
:return:
"""
try:
json_str = self._get_json_str_at_index(index)
if not json_str:
json_str = self._get_json_str_at_index(index, refresh_cache=True)
try:
item = json.loads(json_str)
apply_module_tree_to_dict(json.loads(self.json_module_tree), item, raiseError=none_if_error)
if ignore_attributes is not None:
for attribute in ignore_attributes:
item.pop(attribute, None)
except json.decoder.JSONDecodeError:
printIfShown("Could not json.loads of item at index {} : \n{}".format(index, json_str), SHOW_WARNING)
if none_if_error:
return None
raise
except TypeError:
return None
return json_to_obj(item)
# LOAD/SAVES
[docs] def save(self, filename):
global theLock
with theLock:
if not len(self.data_buffer):
return
if not self.has_been_initialized:
self._initialize(filename)
# Hash data to write
splitlists = self.split_list(self.curr_elements_in_last_file, self.stack_size, self.data_buffer)
for index_begin, index_end, numberofelem in splitlists:
data_to_write = self.data_buffer[index_begin:index_end]
self.curr_elements_in_last_file = numberofelem
if data_to_write:
theStr = '\n'.join(data_to_write)
if os.path.exists(self._get_path_from_filenumber(filename, self.curr_number_of_files - 1)):
theStr = "\n" + theStr
with open(self._get_path_from_filenumber(filename, self.curr_number_of_files - 1), 'a+') as f:
f.write(theStr)
# Increment number of elements or files
if numberofelem == self.stack_size:
self.curr_number_of_files += 1
self.curr_elements_in_last_file = 0
else:
self.curr_elements_in_last_file = numberofelem
# Write main file
with open(filename, "w") as f:
f.write(self._get_str_mainfile())
self.data_buffer = list()
@staticmethod
def load(filename, **kwargs):
with open(filename, 'r', encoding="utf-8") as f:
theDict = json.load(f)
newStruct = Performance_ListDataStruct()
newStruct.stack_size = theDict[Performance_ListDataStruct._STACK_SIZE]
nbr_elements = theDict[Performance_ListDataStruct._NBR_ELEMENTS]
number_of_files = int(ceil(nbr_elements/newStruct.stack_size))
number_of_elements_last_file = nbr_elements - (number_of_files-1)*newStruct.stack_size
newStruct.stack_size = theDict[Performance_ListDataStruct._STACK_SIZE]
newStruct.curr_elements_in_last_file = number_of_elements_last_file
newStruct.curr_number_of_files = number_of_files
newStruct.has_been_initialized = True
newStruct.json_module_tree = theDict[Performance_ListDataStruct._COMPRESS_SAVE_STR]
newStruct._mainfilename = filename
return newStruct
@staticmethod
def load_as_listdatastruct(filename, **kwargs):
"""Return :class:`ListDataStruct`. Serializes all objects of collection"""
# 1) Save temporary file formated like ListDataStruct
with open(filename, 'r', encoding="utf-8", newline='\n') as f:
theDict = json.load(f)
stack_size = theDict[Performance_ListDataStruct._STACK_SIZE]
nbr_elements = theDict[Performance_ListDataStruct._NBR_ELEMENTS]
number_of_files = int(ceil(nbr_elements/stack_size))
lines = list()
for file_number in range(number_of_files):
with open(Performance_ListDataStruct._get_path_from_filenumber(filename, file_number), 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
if nbr_elements <= len(lines):
break
line = line.rstrip()
if not line:
continue
if line.isprintable():
lines.append(line)
else:
printIfShown("Removed line {}".format(line), SHOW_WARNING)
# lines += list(filter(None, (line.rstrip() for line in f))) # Non-blank lines
printIfShown("loaded {} %".format(int(file_number/number_of_files*100)), SHOW_INFO)
theStr = ''
theStr += '{\n'
# noinspection PyProtectedMember
theStr += '\t"{}": {},'.format(ListDataStruct._COMPRESS_SAVE_STR, theDict[Performance_ListDataStruct._COMPRESS_SAVE_STR])
# noinspection PyProtectedMember
theStr += '\t"{}": [\n{}\n]'.format(ListDataStruct._DATA_STR, "\n,".join(lines))
theStr += '}'
tempfile = filename + "_temp"
with open(tempfile, "w", encoding="utf-8") as f:
f.write(theStr)
# 2) Use ListDataStruct load function
theDataStruct = ListDataStruct.load(tempfile, **kwargs)
os.remove(tempfile)
printIfShown("Done loading", SHOW_INFO)
return theDataStruct
[docs] def get_data_generator(self, **kwargs):
for k in range(self.get_total_nbr_elements(count_unsaved=True)):
yield self.get_data_at_index(k, kwargs)
[docs] def set_data_at_index(self, data_in, index):
"""Replace data at specific index"""
self.set_data_at_indices([data_in], [index])
[docs] def set_data_at_indices(self, data_list, indices):
"""Replace datas at specific indices
:param data_list: list of objects to set to the collection, at specific indices
:param indices: list of indices
:return:
"""
indices, data_list = (list(t) for t in zip(*sorted(zip(indices, data_list))))
if len(data_list) != len(indices):
printIfShown("Unconsistent length in set_data_at_indices", SHOW_ERROR)
else:
if len(self.data_buffer):
printIfShown("Buffer of collection is not empty. Please use save function first !", SHOW_ERROR)
# First, convert data to something readable
self.data_buffer = list()
for data in data_list:
self.add_data(data)
previous_filenumber = -1
data = list()
for i, index in enumerate(indices):
filenumber, _ = self._map_index_to_file(index)
if filenumber != previous_filenumber:
# Started new file => save previous data
if previous_filenumber >= 0:
with open(self._get_path_from_filenumber(self._mainfilename, previous_filenumber), "w") as f:
f.write('\n'.join(data))
# And load new file
data = self._get_list_from_file(filenumber)
previous_filenumber = filenumber
start_index = filenumber * self.stack_size
data[index - start_index] = self.data_buffer[i]
# Final save
with open(self._get_path_from_filenumber(self._mainfilename, previous_filenumber), "w") as f:
f.write('\n'.join(data))
self.data_buffer = list()
[docs] def delete_points_at_indices(self, indices):
if not self.has_been_initialized:
for index in sorted(indices, reverse=True):
del self.data_buffer[index]
return
if len(self.data_buffer):
printIfShown("Buffer of collection is not empty. Please use save function first !", SHOW_ERROR)
previous_filenumber = self.curr_number_of_files
first_index = min(indices)
start_filenumber, _ = self._map_index_to_file(first_index)
start_index = start_filenumber * self.stack_size
all_data = list()
for filenumber in range(start_filenumber, self.curr_number_of_files):
try:
all_data += self._get_list_from_file(filenumber)
except FileNotFoundError:
pass
delete_indices_from_list([index - start_index for index in indices], all_data)
self.data_buffer = all_data
self.curr_elements_in_last_file = 0
self.curr_number_of_files = start_filenumber+1
# Delete previous files
for filenumber in range(start_filenumber, previous_filenumber):
if os.path.exists(self._get_path_from_filenumber(self._mainfilename, filenumber)):
os.remove(self._get_path_from_filenumber(self._mainfilename, filenumber))
# Save
self.save(self._mainfilename)