import numpy as np
from multiprocessing import Pool
[docs]def _is_feasible(theList):
return np.all(np.array(theList) <= 0)
[docs]class MyMapEvaluator:
def __init__(self, evaluation_function, callback_on_evaluation):
self.callback_on_evaluation = callback_on_evaluation
self.evaluation_function = evaluation_function
[docs] def evaluate_all(self, x):
objectives_pop = [None]*len(x)
constraints_pop = [None]*len(x)
for k, xi in enumerate(x):
returned_value = self.evaluation_function(xi)
self.callback_on_evaluation(returned_value)
objectives_pop[k] = returned_value["objectives"]
constraints_pop[k] = returned_value["constraints"]
return objectives_pop, constraints_pop
[docs]class MyMultiprocessEvaluator:
def __init__(self, evaluation_function, callback_on_evaluation, numberOfCores):
self.callback_on_evaluation = callback_on_evaluation
self.evaluation_function = evaluation_function
self.pool = Pool(numberOfCores)
[docs] def evaluate_all(self, x):
objectives_pop = [None]*len(x)
constraints_pop = [None]*len(x)
for k, returned_value in enumerate(self.pool.imap_unordered(self.evaluation_function, x)):
self.callback_on_evaluation(returned_value)
objectives_pop[k] = returned_value["objectives"]
constraints_pop[k] = returned_value["constraints"]
return objectives_pop, constraints_pop
[docs]class MyArrayEvaluator:
def __init__(self, evaluation_function):
self.evaluation_function = evaluation_function
[docs] def evaluate_all(self, list_of_x):
returned_value = self.evaluation_function(list_of_x)
return returned_value["objectives"], returned_value["constraints"]
[docs]def pso(lb, ub, initialVectorGuess, theEvaluator, terminationCondition, callback_generation=lambda objectives, constraints: None, swarmsize=100, omega=0.5, phip=0.5, phig=0.5):
"""Perform a particle swarm optimization (PSO)
:param lb: List. Lower bounds of each parameter
:param ub: List. Upper bounds of each parameter
:param initialVectorGuess: List. initial vector guess for the solution (to be included inside population)
:param theEvaluator: object define before
:param terminationCondition: Termination condition with shouldTerminated method
:param callback_generation: function lambda (objectives (as list), constraints (as list)) per step
:param swarmsize: Int. The number of particles in the swarm (Default: 100)
:param omega: Float. Particle velocity scaling factor (Default: 0.5)
:param phip: Float. Scaling factor to search away from the particle's best known position (Default: 0.5)
:param phig: Float. Scaling factor to search away from the swarm's best known position (Default: 0.5)
:return: (g) array The swarm's best known position (optimal design). (f) scalar, the objective value at (g)
"""
lb = np.array(lb)
ub = np.array(ub)
vhigh = np.abs(ub - lb)
vlow = -vhigh
# Initialize the particle swarm ############################################
S = swarmsize
D = len(lb) # the number of dimensions each particle has
x = np.random.rand(S, D) # particle positions
# v = np.zeros_like(x) # particle velocities
p = np.zeros_like(x) # best particle positions
# fx = np.zeros(S) # current particle function values
# fs = np.zeros(S, dtype=bool) # feasibility of each particle
fp = np.ones(S)*np.inf # best particle function values
# g = [] # best swarm position
fg = np.inf # best swarm position starting value
fsg = True
# Initialize the particle's position
x = lb + x*(ub - lb)
x[0, :] = initialVectorGuess[:] # inserted initial guess in population
# Calculate objective and constraints for each particle
obj, cons = theEvaluator.evaluate_all(x)
fx, fs = _format_fx_fs(obj, cons)
# Store particle's best position (if constraints are satisfied)
i_update = np.logical_and((fx < fp), fs)
p[i_update, :] = x[i_update, :].copy()
fp[i_update] = fx[i_update]
# Update swarm's best position
i_min = np.argmin(fp)
if fp[i_min] < fg:
fg = fp[i_min]
g = p[i_min, :].copy()
else:
# At the start, there may not be any feasible starting point, so just
# give it a temporary "best" point since it's likely to change
g = x[0, :].copy()
# Initialize the particle's velocity
v = vlow + np.random.rand(S, D)*(vhigh - vlow)
# Iterate until termination criterion met ##################################
while not terminationCondition.shouldTerminate():
rp = np.random.uniform(size=(S, D))
rg = np.random.uniform(size=(S, D))
# Update the particles velocities
v = omega*v + phip*rp*(p - x) + phig*rg*(g - x)
# Update the particles' positions
x = x + v
# Correct for bound violations
maskl = x < lb
masku = x > ub
x = x*(~np.logical_or(maskl, masku)) + lb*maskl + ub*masku
# Update objectives and constraints
obj, cons = theEvaluator.evaluate_all(x)
fx, fs = _format_fx_fs(obj, cons)
callback_generation(obj, cons)
# Store particle's best position (if constraints are satisfied)
i_update = np.logical_and((fx < fp), fs)
p[i_update, :] = x[i_update, :].copy()
fp[i_update] = fx[i_update]
# Compare swarm's best position with global best position
i_min = np.argmin(fp)
if fp[i_min] < fg:
p_min = p[i_min, :].copy()
g = p_min.copy()
fg = fp[i_min]
fsg = fs[i_min]
return g, fg, fsg