import pickle
import argparse
import os
import csv
import multiprocessing
import random
from time import sleep
import pandas as pd
import yaml
import skopt
import ray
from ray.tune import run
from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.suggest.skopt import SkOptSearch
from ray.tune.automl import GeneticSearch
import hyperopt as hpo
import ctlearn_optimizer.bayesian_tpe as bayesian_tpe
import ctlearn_optimizer.bayesian_gp as bayesian_gp
import ctlearn_optimizer.genetic_algorithm as genetic_algorithm
import ctlearn_optimizer.common as common
# set dummy authentication key for multiprocessing
[docs]multiprocessing.current_process().authkey = b'1234'
[docs]class Optimizer:
""" Basic class for an optimizer.
Currently, only tree parzen estimators, random search, gaussian processes
and genetic algorithm based optimization using Ray Tune is supported.
"""
[docs] def __init__(self, opt_config):
""" Initialize the optimizer:
- Set optimizer attributes.
- Set logger writing to both ``optimization.log`` and ``stdout``.
- Load trials file from ``working_directory/trials.pkl``
if required, thus allowing to resume a past optimization run.
- Load optimization results file from
``working_directory/optimization_results.csv`` or create one at
the same path as required. The results of the optimization run
(loss, iteration, metrics, hyperparameters, time) are logged to
this file for further analysis.
Parameters:
opt_config (dict): loaded optimization configuration file.
Raises:
NotImplementedError: if ``self.optimization_type`` is
``genetic_algorithm`` and ``self.reload_trials`` is ``True``.
"""
# set working directory path
gen_settings = opt_config['General_settings']
self.working_directory = gen_settings.get('working_directory',
os.getcwd())
# create new logger
self.log_path = os.path.join(self.working_directory,
'optimization.log')
self.logger = common.set_logger(self.log_path)
self.logger.info('Starting optimization run')
# ray tune optimizator runs the objective function on a different
# Python process, so we have to use multiprocessing manager to share
# variables between processes.
manager = multiprocessing.Manager()
# set global run iteration (for taking into account reloaded trials)
self.iteration = manager.Value('i', 0)
# set current run iteration (always starts from 0)
self.counter = manager.Value('i', 0)
# set random state seed for reproducible results
self.random_state = gen_settings.get('random_state', None)
# set hardware resources
self.num_cpus = gen_settings['num_cpus']
self.num_cpus_per_trial = gen_settings['num_cpus_per_trial']
self.num_gpus = gen_settings['num_gpus']
self.num_gpus_per_trial = gen_settings['num_gpus_per_trial']
# set file paths
self.ctlearn_config_path = os.path.join(self.working_directory,
gen_settings['ctlearn_config'])
self.trials_file_path = os.path.join(self.working_directory,
'trials.pkl')
self.optim_results_path = os.path.join(self.working_directory,
'optimization_results.csv')
self.remove_training_folders = gen_settings.get(
'remove_training_folders', True)
# set optimizer configuration
self.num_parallel_trials = gen_settings.get('num_parallel_trials', 1)
self.mode = gen_settings.get('mode', 'max')
self.n_initial_points = gen_settings.get('n_initial_points', 30)
self.optimization_type = gen_settings['optimization_type']
self.num_max_evals = gen_settings['num_max_evals']
self.logger.info('Optimization algorithm:{}'.format(
self.optimization_type))
self.metric_to_optimize = gen_settings['metric_to_optimize']
self.predict = gen_settings.get('predict', False)
self.data_set_to_optimize = gen_settings.get('data_set_to_optimize',
'validation')
if self.data_set_to_optimize == 'prediction':
assert self.predict is True
if self.optimization_type == 'genetic_algorithm':
optimizer_info = opt_config['Optimizer_settings']
self.ga_config = optimizer_info['genetic_algorithm_config']
else:
optimizer_info = opt_config.get('Optimizer_settings', None)
self.ga_config = optimizer_info.get('genetic_algorithm_config',
None)
self.tpe_config = optimizer_info.get('tree_parzen_estimators_config',
None)
self.gp_config = optimizer_info.get('gaussian_processes_config', None)
# set ctlearn basic configuration
self.basic_config = opt_config['CTLearn_settings']
# set hyperparameters logging and configuration
hyperparameters_info = opt_config['Hyperparameters_settings']
self.hyperparams_to_log = (hyperparameters_info
['hyperparameters_to_log'])
self.hyperparameters_config = (hyperparameters_info['config'])
self.hyperparams_to_optimize = (hyperparameters_info
['hyperparameters_to_optimize'])
self.fixed_hyperparameters = hyperparameters_info.get(
'fixed_hyperparameters', None)
self.dependent_hyperparameters = hyperparameters_info.get(
'dependent_hyperparameters', None)
# set metrics logging and configuration
self.list_metrics_val_to_log = gen_settings.get('metrics_val_to_log',
[])
self.list_metrics_pred_to_log = gen_settings.get('metrics_pred_to_log',
[])
if self.list_metrics_pred_to_log:
assert self.predict is True
else:
assert self.data_set_to_optimize != 'prediction'
self.user_defined_metric_val = gen_settings.get(
'user_defined_metric_val', None)
self.user_defined_metric_pred = gen_settings.get(
'user_defined_metric_pred', None)
# create hyperparameters space, used for trial reloading
hyperparameter_space = self.create_space_hyperparams()
# set trial reloading configuration and create optimization algorithm
self.reload_trials = gen_settings.get('reload_trials', False)
if self.reload_trials:
if self.optimization_type == 'genetic_algorithm':
raise NotImplementedError('Trial reloading is not currently \
supported by the genetic algorithm optimization')
else:
# reload trials file
assert os.path.isfile(self.trials_file_path)
with open(self.trials_file_path, 'rb') as input_file:
trials = pickle.load(input_file)
if self.optimization_type == 'gaussian_processes':
# the gaussian processes based optimization needs an
# extra parameter self.gp_opt
self.gp_opt = common.restore(self)
# create optimization algorithm with reloaded trials
self.optimization_algorithm = \
self.create_optimization_algorithm(
hyperparameter_space)
# set global run iteration value to start from the
# number of trials of the reloaded run
self.iteration.value = len(trials.Xi)
else: # optimization type == tpe or random_search
# create optimization algorithm with reloaded trials
self.optimization_algorithm = \
self.create_optimization_algorithm(
hyperparameter_space)
common.restore(self)
if self.optimization_type == 'random_search':
self.optimization_algorithm.algo = hpo.rand.suggest
# set global run iteration value to start from the
# number of trials of the reloaded run
self.iteration.value = len(trials[0].trials)
self.logger.info(
'A trials file with {} saved trials has been reloaded, \
new trials will be added'.format(self.iteration.value))
else:
self.logger.info('No trials file loaded, starting from scratch')
self.gp_opt = None
# create optimization algorithm
self.optimization_algorithm = self.create_optimization_algorithm(
hyperparameter_space)
self.iteration.value = 0
# set optimization results configuration
reload_optimization_results = gen_settings.get(
'reload_optimization_results', False)
if reload_optimization_results:
# reload optimization_results file
assert os.path.isfile(self.optim_results_path)
with open(self.optim_results_path, 'r') as file:
existing_iters_csv = len(file.readlines()) - 1
self.logger.info(
'An optimization_results file with {} saved trials has been \
reloaded, new trials will be added'.format(existing_iters_csv))
if existing_iters_csv != self.iteration.value:
self.logger.WARNING(
'Caution: the number of trials stored in the trials file \
and the number of trials stored in the \
optimization_results file does not match')
else: # create optimization_results.csv
self.logger.info(
'No optimization_results file loaded, starting from scratch')
with open(self.optim_results_path, 'w') as file:
writer = csv.writer(file)
header = ['loss', 'iteration'] + self.hyperparams_to_log + \
[elem + '_val' for elem in self.list_metrics_val_to_log] + \
[elem + '_pred' for elem in self.list_metrics_pred_to_log] + \
['run_time']
writer.writerow(header)
[docs] def set_basic_config(self):
"""Set basic config and fixed hyperparameters in CTLearn config file.
"""
common.set_basic_config(self)
[docs] def create_space_hyperparams(self):
""" Create space of hyperparameters following required syntax.
Currently, only tree parzen estimators and random search spaces based
on hyperopt, gaussian processes space based on skopt and
genetic algorithm space based on ray.tune.automl are supported.
Returns:
space of hyperparameters following the syntax required by the
optimization algorithm.
Raises:
NotImplementedError: if ``self.optimization_type`` is other than
``tree_parzen_estimators``, ``random_search``,
``gaussian_processes`` or ``genetic_algorithm``.
"""
hyper_to_opt = self.hyperparams_to_optimize
if self.optimization_type == 'tree_parzen_estimators':
hyperparameter_space = bayesian_tpe.hyperopt_space(hyper_to_opt)
elif self.optimization_type == 'random_search':
hyperparameter_space = bayesian_tpe.hyperopt_space(hyper_to_opt)
elif self.optimization_type == 'gaussian_processes':
hyperparameter_space = bayesian_gp.skopt_space(hyper_to_opt)
elif self.optimization_type == 'genetic_algorithm':
hyperparameter_space = genetic_algorithm.gen_al_space(hyper_to_opt)
else:
raise NotImplementedError(
'Other optimization types are not supported yet')
return hyperparameter_space
[docs] def create_optimization_algorithm(self, hyperparameter_space):
""" Create optimization algorithm for Ray Tune.
Currently, only tree parzen estimators, random search,
gaussian processes and genetic algorithm based optimization using Ray
Tune is supported.
Parameters:
space (dict, list or ray.tune.automl.search_space.SearchSpace):
space of hyperparameters following the syntax required by
the optimization algorithm.
Returns:
Optimization algorithm for Ray Tune.
Raises:
NotImplementedError: if self.optimization_type is other than
``tree_parzen_estimators``, ``random_search``,
``gaussian_processes`` or ``genetic_algorithm``.
"""
if self.optimization_type == 'tree_parzen_estimators':
algorithm = HyperOptSearch(
hyperparameter_space,
max_concurrent=self.num_parallel_trials,
metric='loss',
mode=self.mode,
n_initial_points=self. n_initial_points,
random_state_seed=self.random_state,
gamma=self.tpe_config.get('gamma', 0.25))
elif self.optimization_type == 'random_search':
algorithm = HyperOptSearch(
hyperparameter_space,
max_concurrent=self.num_parallel_trials,
metric='loss',
mode=self.mode,
random_state_seed=self.random_state)
algorithm.algo = hpo.rand.suggest
elif self.optimization_type == 'gaussian_processes':
if not self.reload_trials:
# the gaussian processes based optimization needs an
# extra parameter self.gp_opt
self.gp_opt = skopt.Optimizer(
hyperparameter_space,
n_initial_points=self.n_initial_points,
base_estimator=self.gp_config.get(
'base_estimator', 'GP'),
acq_func=self.gp_config.get(
'acq_function', 'gp_hedge'),
acq_optimizer=self.gp_config.get(
'acq_optimizer', 'auto'),
random_state=self.random_state,
acq_func_kwargs={'xi': self.gp_config.get('xi',
0.01),
'kappa': self.gp_config.get('kappa',
1.96)})
hyperparams_names = [key for key in self.hyperparams_to_optimize]
algorithm = SkOptSearch(self.gp_opt,
hyperparams_names,
max_concurrent=self.num_parallel_trials,
metric='loss',
mode=self.mode)
elif self.optimization_type == 'genetic_algorithm':
algorithm = GeneticSearch(
hyperparameter_space,
reward_attr='loss',
max_generation=self.ga_config['max_generation'],
population_size=self.ga_config['population_size'],
population_decay=self.ga_config.get('population_decay', 0.95),
keep_top_ratio=self.ga_config.get('keep_top_ratio', 0.2),
selection_bound=self.ga_config.get('selection_bound', 0.4),
crossover_bound=self.ga_config.get('crossover_bound', 0.4))
else:
raise NotImplementedError(
'Other optimization types are not supported yet')
return algorithm
[docs] def get_ctlearn_metric_to_optimize(self, hyperparams):
""" Evaluate a CTLearn model and return metric to optimize.
Parameters:
hyperparams (dict): set of hyperparameters to evaluate provided by
the optimizer.
Returns:
float: metric to optimize.
"""
# if more than one CTLearn model is going to be evaluated at the same
# time, delay the execution of each model for a small random number of
# seconds in order to avoid the simultaneus creation of CTLearn working
# folders that have the same name, which would lead to errors
if self.num_parallel_trials > 1:
sleep(random.randint(1, 20))
# evaluate a CTLearn model and return metric
metric = common.ctlearn_objective(self, hyperparams)
if self.optimization_type == 'genetic_algorithm':
# the genetic algorithm internally maximizes the loss, so:
if self.mode == 'min':
metric *= -1
return metric
[docs] def optimize(self, objective_function):
""" Start the optimization of ``objective_function`` using Ray Tune.
Currently, only tree parzen estimators, random search,
gaussian processes and genetic algorithm based optimization using Ray
Tune is supported.
Parameters:
objective_function: function to optimize following the syntax:
.. code:: python
def(hyperparams, reporter):
...
# compute loss to optimize
...
reporter(loss=loss)
Returns:
ExperimentAnalysis: object used for analyzing results from Tune
``run()``.
"""
# start a Ray cluster with given resources and connect to it
ray.init(num_cpus=self.num_cpus,
num_gpus=self.num_gpus)
# set CTLearn basic config and fixed hyperparameters
self.set_basic_config()
# create custom trial names for Ray Tune
def trial_str_creator(trial):
return 'Iteration_{}: {}'.format(self.iteration.value + 1,
trial.config)
# execute optimization
ray_result = run(
objective_function,
name='ray_exp',
resources_per_trial={'cpu': self.num_cpus_per_trial,
'gpu': self.num_gpus_per_trial},
num_samples=self.num_max_evals,
local_dir=os.path.join(self.working_directory,
'ray_optimization_results'),
search_alg=self.optimization_algorithm,
verbose=1,
queue_trials=True,
trial_name_creator=ray.tune.function(trial_str_creator))
# get best metric and hyperparameters found
results_dataframe = pd.read_csv(self.optim_results_path)
if self.data_set_to_optimize == 'validation':
best_metric_label = self.metric_to_optimize + '_val'
else:
best_metric_label = self.metric_to_optimize + '_pred'
if self.mode == 'max':
best_metric = results_dataframe[best_metric_label].max()
else:
best_metric = results_dataframe[best_metric_label].max()
best_hyperparameters = ray_result.get_best_config(metric='loss',
mode=self.mode)
# log best metric and hyperparameters found
self.logger.info(
'Best metric found: {} = {:.4f}, with hyperparameters: {}'
.format(best_metric_label, best_metric, best_hyperparameters))
# create and save gaussian_processes model for further analysis
if self.optimization_type == 'gaussian_processes':
gp_model = skopt.utils.create_result(self.gp_opt.Xi,
self.gp_opt.yi,
self.gp_opt.space,
self.gp_opt.rng,
models=self.gp_opt.models)
gp_mod_path = os.path.join(self.working_directory, 'gp_model.pkl')
with open(gp_mod_path, 'wb') as output_file:
pickle.dump(gp_model, output_file)
self.logger.info('Gaussian_processes model saved')
# save trials file
if self.optimization_type != 'genetic_algorithm':
common.save(self)
self.logger.info('Trials file saved')
self.logger.info('Optimization run finished')
# terminate processes started by ray.init()
ray.shutdown()
return ray_result
[docs] def optimize_ctlearn_model(self):
""" Start the optimization of a CTLearn model using Ray Tune.
Currently, only tree parzen estimators, random search,
gaussian processes and genetic algorithm based optimization using Ray
Tune is supported.
Returns:
ExperimentAnalysis: object used for analyzing results from Tune
``run()``.
"""
def ctlearn_objective(hyperparams, reporter):
loss = self.get_ctlearn_metric_to_optimize(hyperparams)
reporter(loss=loss)
result = self.optimize(ctlearn_objective)
return result
#####################
# launch optimization
#####################
if __name__ == "__main__":
[docs] PARSER = argparse.ArgumentParser(
description=('Run CTLearn model optimization'))
PARSER.add_argument(
'opt_config',
help='path to YAML file containing ctlearn_optimizer configuration')
ARGS = PARSER.parse_args()
with open(ARGS.opt_config, 'r') as opt_conf:
OPT_CONF = yaml.load(opt_conf)
OPT = Optimizer(OPT_CONF)
OPT_RESULT = OPT.optimize_ctlearn_model()