"""
Collection of (optimization) algorithms. All entries share a common signature with
optional arguments.
"""
import ast
from scipy.optimize import minimize as minimize
import cma.evolution_strategy as cma
import numpy as np
from typing import Callable
import adaptive
import copy
import warnings
from scipy.optimize import OptimizeResult
import tensorflow as tf
algorithms = dict()
[docs]def algo_reg_deco(func):
"""
Decorator for making registry of functions
"""
algorithms[str(func.__name__)] = func
return func
[docs]@algo_reg_deco
def single_eval(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
Return the function value at given point.
Parameters
----------
x_init : float
Initial point
fun : callable
Goal function
fun_grad : callable
Function that computes the gradient of the goal function
grad_lookup : callable
Lookup a previously computed gradient
options : dict
Algorithm specific options
"""
fun(x_init)
[docs]@algo_reg_deco
def grid2D(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
Two dimensional scan of the function values around the initial point.
Parameters
----------
x_init : float
Initial point
fun : callable
Goal function
fun_grad : callable
Function that computes the gradient of the goal function
grad_lookup : callable
Lookup a previously computed gradient
options : dict
Options include
points : int
The number of samples
bounds : list
Range of the scan for both dimensions
"""
# TODO generalize grid to take any number of dimensions
if "points" in options:
points = options["points"]
else:
points = 100
bounds = options["bounds"][0]
bound_min = bounds[0]
bound_max = bounds[1]
xs = np.linspace(bound_min, bound_max, points)
bounds = options["bounds"][1]
bound_min = bounds[0]
bound_max = bounds[1]
ys = np.linspace(bound_min, bound_max, points)
for x in xs:
for y in ys:
if "wrapper" in options:
val = copy.deepcopy(options["wrapper"])
val[val.index("x")] = x
val[val.index("y")] = y
fun([val])
else:
fun([x, y])
[docs]@algo_reg_deco
def sweep(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
One dimensional scan of the function values around the initial point.
Parameters
----------
x_init : float
Initial point
fun : callable
Goal function
fun_grad : callable
Function that computes the gradient of the goal function
grad_lookup : callable
Lookup a previously computed gradient
options : dict
Options include
points : int
The number of samples
bounds : list
Range of the scan
"""
if "points" in options:
points = options["points"]
else:
points = 100
if "init_point" in options:
init_point = bool(options["init_point"])
if init_point:
fun([x_init[0]])
bounds = options["bounds"][0]
bound_min = bounds[0]
bound_max = bounds[1]
probe_list = []
if "probe_list" in options:
for x in options["probe_list"]:
probe_list.append(x)
probe_list_min = min(probe_list)
probe_list_max = max(probe_list)
bound_min = min(bound_min, probe_list_min)
bound_max = max(bound_max, probe_list_max)
for p in probe_list:
if "wrapper" in options:
val = copy.deepcopy(options["wrapper"])
val[val.index("x")] = p
fun([val])
else:
fun([p])
xs = np.linspace(bound_min, bound_max, points)
for x in xs:
if "wrapper" in options:
val = copy.deepcopy(options["wrapper"])
val[val.index("x")] = x
fun([val])
else:
fun([x])
[docs]@algo_reg_deco
def adaptive_scan(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
One dimensional scan of the function values around the initial point, using
adaptive sampling
Parameters
----------
x_init : float
Initial point
fun : callable
Goal function
fun_grad : callable
Function that computes the gradient of the goal function
grad_lookup : callable
Lookup a previously computed gradient
options : dict
Options include
accuracy_goal: float
Targeted accuracy for the sampling algorithm
probe_list : list
Points to definitely include in the sampling
init_point : boolean
Include the initial point in the sampling
"""
warnings.warn(
"The Adaptive Scan algorithm is not thoroughly tested and might contain bugs"
)
if "accuracy_goal" in options:
accuracy_goal = options["accuracy_goal"]
else:
accuracy_goal = 0.5
print("accuracy_goal: " + str(accuracy_goal))
probe_list = []
if "probe_list" in options:
for x in options["probe_list"]:
probe_list.append(ast.literal_eval(x))
if "init_point" in options:
init_point = bool(options.pop("init_point"))
if init_point:
probe_list.append(x_init)
# TODO make adaptive scan be able to do multidimensional scan
bounds = options["bounds"][0]
bound_min = bounds[0]
bound_max = bounds[1]
probe_list_min = min(probe_list)
probe_list_max = max(probe_list)
bound_min = min(bound_min, probe_list_min)
bound_max = max(bound_max, probe_list_max)
print(" ")
print("bound_min: " + str((bound_min) / (2e9 * np.pi)))
print("bound_max: " + str((bound_max) / (2e9 * np.pi)))
print(" ")
def fun1d(x):
return fun([x])
learner = adaptive.Learner1D(fun1d, bounds=(bound_min, bound_max))
if probe_list:
for x in probe_list:
print("from probe_list: " + str(x))
tmp = learner.function(x)
print("done\n")
learner.tell(x, tmp)
adaptive.runner.simple(
learner, goal=lambda learner_: learner_.loss() < accuracy_goal
)
[docs]@algo_reg_deco
def tf_sgd(
x_init: np.ndarray,
fun: Callable = None,
fun_grad: Callable = None,
grad_lookup: Callable = None,
options: dict = {},
) -> OptimizeResult:
"""Optimize using TensorFlow Stochastic Gradient Descent with Momentum
https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/SGD
Parameters
----------
x_init : np.ndarray
starting value of parameter(s)
fun : Callable, optional
function to minimize, by default None
fun_grad : Callable, optional
gradient of function to minimize, by default None
grad_lookup : Callable, optional
lookup stored gradients, by default None
options : dict, optional
optional parameters for optimizer, by default {}
Returns
-------
OptimizeResult
SciPy OptimizeResult type object with final parameters
"""
if "maxfun" in options.keys():
raise KeyError("Tensorflow Optimizers require a maxiters")
iters = options["maxiters"] # TF based optimizers use algo iters not fevals
var = tf.Variable(x_init)
def tf_fun():
return fun(var)
learning_rate = 0.1
momentum = 0.9
if "learning_rate" in options.keys():
learning_rate = options["learning_rate"]
if "momentum" in options.keys():
momentum = options["momentum"]
opt_sgd = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=momentum)
for _ in range(iters):
step_count = opt_sgd.minimize(tf_fun, [var])
print(f"epoch {step_count.numpy()}: func_value: {tf_fun()}")
result = OptimizeResult(x=var.numpy(), success=True)
return result
[docs]def tf_adam(
x_init: np.ndarray,
fun: Callable = None,
fun_grad: Callable = None,
grad_lookup: Callable = None,
options: dict = {},
) -> OptimizeResult:
"""Optimize using TensorFlow ADAM
https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Adam
Parameters
----------
x_init : np.ndarray
starting value of parameter(s)
fun : Callable, optional
function to minimize, by default None
fun_grad : Callable, optional
gradient of function to minimize, by default None
grad_lookup : Callable, optional
lookup stored gradients, by default None
options : dict, optional
optional parameters for optimizer, by default {}
Returns
-------
OptimizeResult
SciPy OptimizeResult type object with final parameters
"""
# TODO Update maxfun->maxiters, default hyperparameters and error handling
raise NotImplementedError("This algorithm is not yet implemented.")
[docs]def tf_rmsprop(
x_init: np.ndarray,
fun: Callable = None,
fun_grad: Callable = None,
grad_lookup: Callable = None,
options: dict = {},
) -> OptimizeResult:
"""Optimize using TensorFlow RMSProp
https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/RMSprop
Parameters
----------
x_init : np.ndarray
starting value of parameter(s)
fun : Callable, optional
function to minimize, by default None
fun_grad : Callable, optional
gradient of function to minimize, by default None
grad_lookup : Callable, optional
lookup stored gradients, by default None
options : dict, optional
optional parameters for optimizer, by default {}
Returns
-------
OptimizeResult
SciPy OptimizeResult type object with final parameters
"""
# TODO Update maxfun->maxiters, default hyperparameters and error handling
raise NotImplementedError("This algorithm is not yet implemented.")
[docs]def tf_adadelta(
x_init: np.ndarray,
fun: Callable = None,
fun_grad: Callable = None,
grad_lookup: Callable = None,
options: dict = {},
) -> OptimizeResult:
"""Optimize using TensorFlow Adadelta
https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Adadelta
Parameters
----------
x_init : np.ndarray
starting value of parameter(s)
fun : Callable, optional
function to minimize, by default None
fun_grad : Callable, optional
gradient of function to minimize, by default None
grad_lookup : Callable, optional
lookup stored gradients, by default None
options : dict, optional
optional parameters for optimizer, by default {}
Returns
-------
OptimizeResult
SciPy OptimizeResult type object with final parameters
"""
# TODO Update maxfun->maxiters, default hyperparameters and error handling
raise NotImplementedError("This algorithm is not yet implemented.")
[docs]@algo_reg_deco
def lbfgs(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
Wrapper for the scipy.optimize.minimize implementation of LBFG-S. See also:
https://docs.scipy.org/doc/scipy/reference/optimize.minimize-lbfgsb.html
Parameters
----------
x_init : float
Initial point
fun : callable
Goal function
fun_grad : callable
Function that computes the gradient of the goal function
grad_lookup : callable
Lookup a previously computed gradient
options : dict
Options of scipy.optimize.minimize
Returns
-------
Result
Scipy result object.
"""
# TODO print from the log not from here
# options.update({"disp": True})
return minimize(
fun_grad, x_init, jac=grad_lookup, method="L-BFGS-B", options=options
)
[docs]@algo_reg_deco
def lbfgs_grad_free(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
Wrapper for the scipy.optimize.minimize implementation of LBFG-S.
We let the algorithm determine the gradient by its own.
See also:
https://docs.scipy.org/doc/scipy/reference/optimize.minimize-lbfgsb.html
Parameters
----------
x_init : float
Initial point
fun : callable
Goal function
fun_grad : callable
Function that computes the gradient of the goal function
grad_lookup : callable
Lookup a previously computed gradient
options : dict
Options of scipy.optimize.minimize
Returns
-------
Result
Scipy result object.
"""
return minimize(fun=fun, x0=x_init, options=options)
[docs]@algo_reg_deco
def cmaes(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
Wrapper for the pycma implementation of CMA-Es. See also:
http://cma.gforge.inria.fr/apidocs-pycma/
Parameters
----------
x_init : float
Initial point.
fun : callable
Goal function.
fun_grad : callable
Function that computes the gradient of the goal function.
grad_lookup : callable
Lookup a previously computed gradient.
options : dict
Options of pycma and the following custom options.
noise : float
Artificial noise added to a function evaluation.
init_point : boolean
Force the use of the initial point in the first generation.
spread : float
Adjust the parameter spread of the first generation cloud.
stop_at_convergence : int
Custom stopping condition. Stop if the cloud shrunk for this number of
generations.
stop_at_sigma : float
Custom stopping condition. Stop if the cloud shrunk to this standard
deviation.
Returns
-------
np.ndarray
Parameters of the best point.
"""
if "noise" in options:
noise = float(options.pop("noise"))
else:
noise = 0
if "batch_noise" in options:
batch_noise = float(options.pop("batch_noise"))
else:
batch_noise = 0
if "init_point" in options:
init_point = bool(options.pop("init_point"))
else:
init_point = False
if "spread" in options:
spread = float(options.pop("spread"))
else:
spread = 0.1
shrunk_check = False
if "stop_at_convergence" in options:
sigma_conv = int(options.pop("stop_at_convergence"))
sigmas = []
shrunk_check = True
sigma_check = False
if "stop_at_sigma" in options:
stop_sigma = int(options.pop("stop_at_sigma"))
sigma_check = True
settings = options
es = cma.CMAEvolutionStrategy(x_init, spread, settings)
iter = 0
while not es.stop():
if shrunk_check:
sigmas.append(es.sigma)
if iter > sigma_conv:
if all(
sigmas[-(i + 1)] < sigmas[-(i + 2)] for i in range(sigma_conv - 1)
):
print(
f"C3:STATUS:Shrunk cloud for {sigma_conv} steps. "
"Switching to gradients."
)
break
if sigma_check:
if es.sigma < stop_sigma:
print("C3:STATUS:Goal sigma reached. Stopping CMA.")
break
samples = es.ask()
if init_point and iter == 0:
# convert x_init to numpy array before appending
x_init_numpy = tf.concat(x_init, axis=0).numpy()
samples.insert(0, x_init_numpy)
print("C3:STATUS:Adding initial point to CMA sample.")
solutions = []
if batch_noise:
error = np.random.randn() * noise
for sample in samples:
goal = fun(sample)
if noise:
error = np.random.randn() * noise
if batch_noise or noise:
goal = goal + error
solutions.append(goal)
es.tell(samples, solutions)
es.disp()
iter += 1
es.result_pretty()
return es.result.xbest
[docs]@algo_reg_deco
def cma_pre_lbfgs(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
Performs a CMA-Es optimization and feeds the result into LBFG-S for further
refinement.
"""
if "cmaes" not in options:
options["cmaes"] = {}
if "lbfgs" not in options:
options["lbfgs"] = {}
for k in options:
if k == "cmaes" or k == "lbfgs":
continue
else:
if k not in options["cmaes"]:
options["cmaes"][k] = options[k]
if k not in options["lbfgs"]:
options["lbfgs"][k] = options[k]
x1 = cmaes(x_init, fun, options=options["cmaes"])
lbfgs(x1, fun_grad=fun_grad, grad_lookup=grad_lookup, options=options["lbfgs"])
[docs]@algo_reg_deco
def gcmaes(x_init, fun=None, fun_grad=None, grad_lookup=None, options={}):
"""
EXPERIMENTAL CMA-Es where every point in the cloud is optimized with LBFG-S and the
resulting cloud and results are used for the CMA update.
"""
warnings.warn(
"The GCMA-ES algorithm is not thoroughly tested and might contain bugs"
)
options_cma = options["cmaes"]
if "init_point" in options_cma:
init_point = bool(options_cma.pop("init_point"))
else:
init_point = False
if "spread" in options_cma:
spread = float(options_cma.pop("spread"))
else:
spread = 0.1
shrinked_check = False
if "stop_at_convergence" in options_cma:
sigma_conv = int(options_cma.pop("stop_at_convergence"))
sigmas = []
shrinked_check = True
sigma_check = False
if "stop_at_sigma" in options_cma:
stop_sigma = int(options_cma.pop("stop_at_sigma"))
sigma_check = True
settings = options_cma
es = cma.CMAEvolutionStrategy(x_init, spread, settings)
iter = 0
while not es.stop():
if shrinked_check:
sigmas.append(es.sigma)
if iter > sigma_conv:
if all(
sigmas[-(i + 1)] < sigmas[-(i + 2)] for i in range(sigma_conv - 1)
):
print(
f"C3:STATUS:Shrinked cloud for {sigma_conv} steps. "
"Switching to gradients."
)
break
if sigma_check:
if es.sigma < stop_sigma:
print("C3:STATUS:Goal sigma reached. Stopping CMA.")
break
samples = es.ask()
if init_point and iter == 0:
samples.insert(0, x_init)
print("C3:STATUS:Adding initial point to CMA sample.")
solutions = []
points = []
for sample in samples:
r = lbfgs(
sample,
fun_grad=fun_grad,
grad_lookup=grad_lookup,
options=options["lbfgs"],
)
solutions.append(r.fun)
points.append(r.x)
es.tell(points, solutions)
es.disp()
iter += 1
return es.result.xbest
[docs]@algo_reg_deco
def oneplusone(x_init, goal_fun):
raise NotImplementedError("This algorithm is not yet implemented.")