import os
import re
import shutil
import csv
import pickle
import logging
import multiprocessing
from timeit import default_timer as timer
import numpy as np
import sklearn.metrics
import yaml
from ctlearn.run_model import run_model
# Set a dummy authentication key for the current process (multiprocessing).
# NOTE(review): the key is hard-coded, hence not a secret; presumably it only
# exists so that every process of the optimizer shares the same key -- confirm
# before relying on it for real authentication.
multiprocessing.current_process().authkey = b'1234'
def set_value(dictionary, value, *keys):
    """Set the value that a sequence of keys points to in a nested dictionary.

    The dictionary can be a nested dictionary containing lists, these lists
    can also contain nested dictionaries, and so on. ``keys`` can contain
    strings (which refer to dictionary keys) and integers (which refer to
    list indices). Every container on the path except the last item must
    already exist; the dictionary is modified in place.

    Parameters:
        dictionary (dict): dictionary that contains the key-value pair the
            user wishes to modify.
        value: value to set.
        keys: keys (strings and integers) leading to the item to set.

    Returns:
        dict: the same dictionary object, modified.

    Raises:
        TypeError: if ``dictionary`` is not a ``dict``.
        IndexError: if no key is given, or a list index on the path does not
            exist.
        KeyError: if a dictionary key on the path does not exist.

    Example::

        dictionary = {'a': [0, {'b': 1}, 0]}
        set_value(dictionary, 2, 'a', 1, 'b') == {'a': [0, {'b': 2}, 0]}
    """
    if not isinstance(dictionary, dict):
        raise TypeError('set_value expects dict as first argument')
    # the last key addresses the item to overwrite, the others the path to it
    last_key = keys[-1]
    container = dictionary
    for key in keys[:-1]:
        container = container[key]
    container[last_key] = value
    return dictionary
def _container_for(next_key):
    """Return a new empty container indexable by ``next_key``, else None."""
    if isinstance(next_key, str):
        return {}
    if isinstance(next_key, int):
        return []
    return None


def _accepts(item, next_key):
    """Tell whether ``item`` can already be indexed by ``next_key``."""
    if isinstance(next_key, str):
        return isinstance(item, dict)
    if isinstance(next_key, int):
        return isinstance(item, list)
    # no further key (or a key of an unsupported type): nothing to enforce
    return True


def create_nested_item(dictionary, *keys):
    """Create an empty item with specific keys and positions in a dictionary.

    The dictionary can be a nested dictionary containing lists, these lists
    can also contain nested dictionaries, and so on. ``keys`` can contain
    strings (which refer to dictionary keys) and integers (which refer to
    list indices). The dictionary may or may not be empty and is modified in
    place.

    Missing dictionary keys are created (a dict when the following key is a
    string or when there is no following key, a list when it is an integer).
    Lists that are too short are padded with ``0``. An existing item found on
    the path that cannot be indexed by the following key (e.g. a ``None`` or
    a number where a dict is needed) is replaced by an empty container of the
    right type, for dictionaries as well as for lists. An existing item at
    the end of the path is left untouched.

    Parameters:
        dictionary (dict): dictionary to modify.
        keys: keys (strings and integers) describing the path to create.

    Returns:
        dict: the same dictionary object, modified.

    Raises:
        TypeError: if ``dictionary`` is not a ``dict``.

    Example::

        create_nested_item({}, 'a', 'b', 1, 'c', 2, 'd') ==
            {'a': {'b': [0, {'c': [0, 0, {'d': {}}]}]}}
    """
    if not isinstance(dictionary, dict):
        raise TypeError('create_nested_item expects dict as first argument')
    element = dictionary
    for position, key in enumerate(keys):
        # the type of the following key decides which container is needed
        if position + 1 < len(keys):
            next_key = keys[position + 1]
        else:
            next_key = None
        if isinstance(key, str):
            # a string key can only address a dict; otherwise skip the key
            if not isinstance(element, dict):
                continue
            if key not in element:
                new_item = {} if next_key is None else _container_for(next_key)
                if new_item is None:
                    # following key of unsupported type: nothing is created
                    continue
                element[key] = new_item
            elif not _accepts(element[key], next_key):
                # existing item cannot hold the following key: replace it
                element[key] = _container_for(next_key)
            element = element[key]
        elif isinstance(key, int) and isinstance(element, list):
            if len(element) <= key:
                # pad the list so that position ``key`` exists
                element.extend([0] * (key + 1 - len(element)))
            if not _accepts(element[key], next_key):
                element[key] = _container_for(next_key)
            element = element[key]
    return dictionary
def auxiliar_modify_params(self, hyperparams):
    """Modify the values of the hyperparameters in CTLearn configuration file.

    The configuration file at ``self.ctlearn_config_path`` is loaded, updated
    and written back. Besides the hyperparameters, this function also sets
    the logging ``model_directory`` of CTLearn and the
    ``prediction_file_path`` to the folder of the current iteration
    (``<working_directory>/run<iteration>``).

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.
        hyperparams (dict): dictionary containing values of the
            hyperparameters. Entries whose name is not a key of
            ``self.hyperparameters_config`` are ignored.
    """
    # load ctlearn config file
    # An explicit Loader is required by PyYAML >= 6. ``yaml.Loader`` keeps the
    # behaviour the bare ``yaml.load(config)`` call historically had.
    # NOTE(review): this loader is not safe for untrusted files; the config
    # file is assumed to be trusted (it is rewritten by this module).
    with open(self.ctlearn_config_path, 'r') as config:
        myconfig = yaml.load(config, Loader=yaml.Loader)
    # empty layers list in myconfig in order to get rid of previous
    # configurations
    myconfig['Model']['Model Parameters']['basic']['conv_block']['layers'] = []
    # modify values of the hyperparameters in myconfig
    for param, value in hyperparams.items():
        if param in self.hyperparameters_config:
            keys = self.hyperparameters_config[param]
            # make sure the path exists, then set the hyperparameter value
            create_nested_item(myconfig, *keys)
            set_value(myconfig, value, *keys)
    # set model_directory and prediction_file_path
    run_directory = os.path.join(
        self.working_directory, 'run' + str(self.iteration.value))
    myconfig['Logging']['model_directory'] = run_directory
    myconfig['Prediction']['prediction_file_path'] = os.path.join(
        run_directory, 'predictions_run{}.csv'.format(self.iteration.value))
    # dump ctlearn configuration
    with open(self.ctlearn_config_path, 'w') as config:
        yaml.dump(myconfig, config)
def get_pred_metrics(self):
    """Get CTLearn prediction metrics from the current CTLearn logging folder.

    The metrics are computed from the prediction file
    ``<working_directory>/run<iteration>/predictions_run<iteration>.csv``,
    which must provide the columns ``gamma_hadron_label``, ``gamma`` and
    ``predicted_class``.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.

    Returns:
        dict: dictionary containing prediction set metrics to log to the
        ``optimization_results.csv`` file. Keys are the names in
        ``self.list_metrics_pred_to_log`` with ``_pred`` appended; a metric
        that was not computed gets the value ``None``.
    """
    # path to the prediction file of the current iteration
    predictions_path = os.path.join(
        self.working_directory, 'run' + str(self.iteration.value),
        'predictions_run{}.csv'.format(self.iteration.value))
    # load prediction data
    predictions = np.genfromtxt(predictions_path, delimiter=',', names=True)
    labels = predictions['gamma_hadron_label'].astype(int)
    gamma_classifier_values = predictions['gamma']
    predicted_class = predictions['predicted_class'].astype(int)
    # compute metrics
    # NOTE: the local names below must not be renamed, the user defined
    # expression evaluated further down can refer to them.
    fpr, tpr, _thresholds = sklearn.metrics.roc_curve(
        labels, gamma_classifier_values, pos_label=0)
    auc = sklearn.metrics.auc(fpr, tpr)
    f1 = sklearn.metrics.f1_score(labels, predicted_class)
    acc = sklearn.metrics.accuracy_score(labels, predicted_class)
    bacc = sklearn.metrics.balanced_accuracy_score(labels, predicted_class)
    prec = sklearn.metrics.precision_score(labels, predicted_class)
    rec = sklearn.metrics.recall_score(labels, predicted_class)
    # NOTE(review): log_loss is fed with predicted classes, not with
    # probabilities -- confirm this is intended.
    log_loss = sklearn.metrics.log_loss(labels, predicted_class)
    metrics_pred = {'auc': auc, 'accuracy': acc, 'balanced_accuracy': bacc,
                    'f1': f1, 'precision': prec, 'recall': rec,
                    'log_loss': log_loss}
    # compute prediction user defined metric if required
    if self.user_defined_metric_pred is not None:
        # NOTE(review): eval() runs the configured expression with access to
        # this function's local and global names; the optimizer configuration
        # must therefore be trusted.
        user_defined = eval(self.user_defined_metric_pred['expression'])
        metrics_pred[self.user_defined_metric_pred['label']] = user_defined
    # keep only the metrics the user wishes to log
    return {metric + '_pred': metrics_pred.get(metric)
            for metric in self.list_metrics_pred_to_log}
def _extract_metric(name, text):
    """Return the number logged as ``<name> = <number>`` in ``text``."""
    # \b keeps e.g. ``loss`` from matching inside ``log_loss``; the optional
    # exponent keeps values such as ``9.5e-05`` from being read as ``9.5``
    pattern = r'\b{} = ([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)'.format(
        re.escape(name))
    found = re.search(pattern, text)
    if found is None:
        raise ValueError(
            'metric {} not found in validation log line'.format(name))
    return float(found.group(1))


def get_val_metrics(self):
    """Get CTLearn validation metrics from the current CTLearn logging folder.

    The training log file is searched in
    ``<working_directory>/run<iteration>``: it is the file whose name ends
    with ``logfile.log`` and whose content includes the word ``Training``.
    The metrics are read from the last line of that file starting with
    ``INFO:Saving dict for global step``.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.

    Returns:
        dict: dictionary containing validation set metrics to log to the
        ``optimization_results.csv`` file. Keys are the names in
        ``self.list_metrics_val_to_log`` with ``_val`` appended; a metric
        that is not available gets the value ``None``.

    Raises:
        FileNotFoundError: if no training log file is found.
        ValueError: if the log file has no validation line or the line lacks
            one of the expected metrics.
    """
    # find training log file
    run_folder = os.path.join(self.working_directory,
                              'run' + str(self.iteration.value))
    train_logfile = None
    for file_name in os.listdir(run_folder):
        if not file_name.endswith('logfile.log'):
            continue
        with open(os.path.join(run_folder, file_name)) as log_file:
            # ensure that prediction log file is not loaded
            if 'Training' in log_file.read():
                train_logfile = file_name
    if train_logfile is None:
        raise FileNotFoundError(
            'no training log file found in {}'.format(run_folder))
    # find required data: the last validation summary written to the log
    saving_line = re.compile('INFO:Saving dict for global step .*')
    with open(os.path.join(run_folder, train_logfile), 'r') as stream:
        matches = [line for line in stream if saving_line.match(line)]
    if not matches:
        raise ValueError(
            'no validation metrics found in {}'.format(train_logfile))
    val_info = matches[-1]
    # extract validation metrics
    metric_names = ('auc', 'accuracy', 'accuracy_gamma', 'accuracy_proton',
                    'loss')
    metrics_val = {name: _extract_metric(name, val_info)
                   for name in metric_names}
    # compute validation user defined metric if required
    if self.user_defined_metric_val is not None:
        # The expression can use the metric names above as variables. A copy
        # is passed because eval() inserts ``__builtins__`` into the globals
        # dictionary it is given.
        # NOTE(review): eval() executes the configured expression; the
        # optimizer configuration must therefore be trusted.
        user_defined = eval(
            self.user_defined_metric_val['expression'], dict(metrics_val))
        metrics_val[self.user_defined_metric_val['label']] = user_defined
    # keep only the metrics the user wishes to log
    return {metric + '_val': metrics_val.get(metric)
            for metric in self.list_metrics_val_to_log}
def set_basic_config(self):
    """Set basic config and fixed hyperparameters in CTLearn config file.

    The configuration file at ``self.ctlearn_config_path`` is loaded, updated
    with the values of ``self.basic_config`` and
    ``self.fixed_hyperparameters`` and written back.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.

    Raises:
        ValueError: if the ``example_type`` of the basic configuration does
            not match the selected ``model`` (``array`` for ``cnn_rnn``,
            ``single_tel`` for ``single_tel``).
    """
    # load ctlearn config file
    # An explicit Loader is required by PyYAML >= 6. ``yaml.Loader`` keeps the
    # behaviour the bare ``yaml.load(config)`` call historically had.
    # NOTE(review): this loader is not safe for untrusted files; the config
    # file is assumed to be trusted (it is rewritten by this module).
    with open(self.ctlearn_config_path, 'r') as config:
        myconfig = yaml.load(config, Loader=yaml.Loader)
    basic_config = self.basic_config
    # set basic configuration
    myconfig['Training']['num_validations'] = basic_config['num_validations']
    myconfig['Training']['num_training_steps_per_validation'] = (
        basic_config['num_training_steps_per_validation'])
    myconfig['Data']['Input']['batch_size'] = basic_config['batch_size']
    # NOTE(review): the fallbacks for model_directory and sorting are the
    # string 'null', not None -- confirm that CTLearn expects a string here.
    myconfig['Model']['model_directory'] = basic_config.get(
        'model_directory', 'null')
    myconfig['Data']['Loading']['validation_split'] = basic_config.get(
        'validation_split', 0.1)
    myconfig['Data']['Processing']['sorting'] = basic_config.get(
        'sorting', 'null')
    myconfig['Data']['Loading']['min_num_tels'] = basic_config.get(
        'min_num_tels', 1)
    myconfig['Data']['Loading']['example_type'] = basic_config['example_type']
    myconfig['Data']['Loading']['seed'] = basic_config.get('seed', None)
    # select the model; each model only works with one example type
    required_example_type = {'cnn_rnn': 'array', 'single_tel': 'single_tel'}
    model = basic_config['model']
    if model in required_example_type:
        if basic_config['example_type'] != required_example_type[model]:
            raise ValueError(
                'model {} requires example_type {}, got {}'.format(
                    model, required_example_type[model],
                    basic_config['example_type']))
        myconfig['Model']['model']['module'] = model
        myconfig['Model']['model']['function'] = model + '_model'
    myconfig['Data']['Loading']['selected_tel_types'] = (
        basic_config['selected_tel_types'])
    # camera type and interpolation image shape of each telescope type
    aux_dict = {'SST:ASTRICam': {'camera_types': 'ASTRICam',
                                 'interpolation_image_shape': [56, 56, 1]},
                'SST:CHEC': {'camera_types': 'CHEC',
                             'interpolation_image_shape': [48, 48, 1]},
                'SST:DigiCam': {'camera_types': 'DigiCam',
                                'interpolation_image_shape': [96, 96, 1]},
                'MST:FlashCam': {'camera_types': 'FlashCam',
                                 'interpolation_image_shape': [112, 112, 1]},
                'LST:LSTCam': {'camera_types': 'LSTCam',
                               'interpolation_image_shape': [110, 110, 1]},
                'MST:NectarCam': {'camera_types': 'NectarCam',
                                  'interpolation_image_shape': [110, 110, 1]},
                'SCT:SCTCam': {'camera_types': 'SCTCam',
                               'interpolation_image_shape': [120, 120, 1]}}
    myconfig['Image Mapping']['camera_types'] = []
    myconfig['Image Mapping']['interpolation_image_shape'] = {}
    for tel_type in basic_config['selected_tel_types']:
        element = aux_dict[tel_type]
        camera = element['camera_types']
        myconfig['Image Mapping']['camera_types'].append(camera)
        myconfig['Image Mapping']['interpolation_image_shape'][camera] = (
            element['interpolation_image_shape'])
    # set values of the fixed hyperparameters
    if self.fixed_hyperparameters is not None:
        for param, value in self.fixed_hyperparameters.items():
            keys = self.hyperparameters_config[param]
            create_nested_item(myconfig, *keys)
            set_value(myconfig, value, *keys)
    # dump ctlearn configuration
    with open(self.ctlearn_config_path, 'w') as config:
        yaml.dump(myconfig, config)
def train(self):
    """Run a CTLearn model training.

    The ``file_list`` entry of the CTLearn configuration file is set to the
    training file list of the basic configuration before running the model.
    ``debug`` is set to ``False`` and ``log_to_file`` is set to ``True``.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.
    """
    # update file_list in ctlearn config
    # An explicit Loader is required by PyYAML >= 6. ``yaml.Loader`` keeps the
    # behaviour the bare ``yaml.load(config)`` call historically had.
    # NOTE(review): this loader is not safe for untrusted files; the config
    # file is assumed to be trusted (it is rewritten by this module).
    with open(self.ctlearn_config_path, 'r') as config:
        myconfig = yaml.load(config, Loader=yaml.Loader)
    myconfig['Data']['file_list'] = os.path.join(
        self.working_directory, self.basic_config['training_file_list'])
    # dump ctlearn configuration
    with open(self.ctlearn_config_path, 'w') as config:
        yaml.dump(myconfig, config)
    # run training with the configuration that has just been written
    run_model(myconfig, mode='train', debug=False, log_to_file=True)
def predict(self):
    """Predict using a trained CTLearn model.

    The ``file_list`` entry of the CTLearn configuration file is set to the
    prediction file list of the basic configuration and the export of a
    prediction file with true labels is enabled before running the model.
    ``debug`` is set to ``False`` and ``log_to_file`` is set to ``True``.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.
    """
    # update file_list in ctlearn config
    # An explicit Loader is required by PyYAML >= 6. ``yaml.Loader`` keeps the
    # behaviour the bare ``yaml.load(config)`` call historically had.
    # NOTE(review): this loader is not safe for untrusted files; the config
    # file is assumed to be trusted (it is rewritten by this module).
    with open(self.ctlearn_config_path, 'r') as config:
        myconfig = yaml.load(config, Loader=yaml.Loader)
    myconfig['Data']['file_list'] = os.path.join(
        self.working_directory, self.basic_config['prediction_file_list'])
    # modify ctlearn config to make sure that a prediction file will be created
    myconfig['Prediction']['export_as_file'] = True
    myconfig['Prediction']['true_labels_given'] = True
    # dump ctlearn configuration
    with open(self.ctlearn_config_path, 'w') as config:
        yaml.dump(myconfig, config)
    # run prediction with the configuration that has just been written
    run_model(myconfig, mode='predict', debug=False, log_to_file=True)
def modify_optimizable_params(self, hyperparams):
    """Update CTLearn config file with new hyperparameters at each iteration.

    This function takes the dictionary containing the values of the
    hyperparameters to optimize suggested by the optimizer, flattens and
    corrects the dictionary if required, then adds the values of the
    dependent hyperparameters to the dictionary. Finally it calls
    ``auxiliar_modify_params()`` to modify the hyperparameters.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.
        hyperparams (dict): flat or nested dictionary containing the values of
            the hyperparameters to optimize suggested by the optimizer.

    Returns:
        dict: flat dictionary containing the values of the dependent
        hyperparameters and hyperparameters to optimize.
    """
    # flatten optimizable hyperparameters dict if required
    def aux_flat(nested):
        flat = {}
        for key, item in nested.items():
            if isinstance(item, dict):
                flat.update(aux_flat(item))
            else:
                flat[key] = item
        return flat

    flat_hyperparams = aux_flat(hyperparams)
    # correct hyperparameters_to_optimize keys (hyperopt space creator doesn't
    # support repeated labels, so a ! character is appended to each repeated
    # label)
    hyperparams = {key.rstrip('!'): value
                   for key, value in flat_hyperparams.items()}
    # add dependent hyperparameters to the hyperparameters dict
    if self.dependent_hyperparameters is not None:
        for param, expression in self.dependent_hyperparameters.items():
            # The expression can use the hyperparameter names as variables.
            # A copy is passed because eval() inserts ``__builtins__`` into
            # the globals dictionary it is given, which would otherwise end
            # up in the returned (and logged) hyperparameters.
            # NOTE(review): eval() executes the configured expression; the
            # optimizer configuration must therefore be trusted.
            hyperparams[param] = eval(expression, dict(hyperparams))
    # update myconfig with the values in hyperparams dict
    auxiliar_modify_params(self, hyperparams)
    return hyperparams
def save(self):
    """Save trials of the current run to ``self.trials_file_path``.

    Currently, trial saving is only supported for tree parzen estimators,
    random search and gaussian processes based optimization. For any other
    unlisted optimization type nothing is done.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.

    Raises:
        TypeError: if ``self.optimization_type`` is ``genetic_algorithm``.
    """
    optimization_type = self.optimization_type
    if optimization_type in ('tree_parzen_estimators', 'random_search'):
        # the search algorithm object knows how to store its own trials
        self.optimization_algorithm.save(self.trials_file_path)
    elif optimization_type == 'gaussian_processes':
        with open(self.trials_file_path, 'wb') as output_file:
            pickle.dump(self.gp_opt, output_file)
    elif optimization_type == 'genetic_algorithm':
        # adjacent literals instead of a backslash continuation, which would
        # embed the indentation of the second line into the message
        raise TypeError('trial saving is not currently '
                        'supported by the genetic algorithm optimization')
def restore(self):
    """Load the trials of a previous run from ``self.trials_file_path``.

    Currently, trial loading is only supported for tree parzen estimators,
    random search and gaussian processes based optimization. For any other
    unlisted optimization type nothing is done.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.

    Returns:
        The object unpickled from the trials file if
        ``self.optimization_type`` is ``gaussian_processes`` (``save``
        stores ``self.gp_opt`` there), ``None`` otherwise.

    Raises:
        TypeError: if ``self.optimization_type`` is ``genetic_algorithm``.
    """
    gp_opt_restored = None
    optimization_type = self.optimization_type
    if optimization_type in ('tree_parzen_estimators', 'random_search'):
        # the search algorithm object restores its trials in place
        self.optimization_algorithm.restore(self.trials_file_path)
    elif optimization_type == 'gaussian_processes':
        # NOTE(review): unpickling executes arbitrary code; the trials file
        # must come from a trusted previous run.
        with open(self.trials_file_path, 'rb') as input_file:
            gp_opt_restored = pickle.load(input_file)
    elif optimization_type == 'genetic_algorithm':
        # adjacent literals instead of a backslash continuation, which would
        # embed the indentation of the second line into the message
        raise TypeError('trial loading is not currently '
                        'supported by the genetic algorithm optimization')
    return gp_opt_restored
def set_logger(log_path):
    """Set up the root logger to write to both ``log_path`` and ``stdout``.

    Ray Tune optimizer runs the objective function on a different Python
    process, so loggers writing to the same file have to be set up again when
    necessary. The function can be called repeatedly: a handler is only added
    when the root logger has no handler for the same file (respectively for
    the current ``stdout``) yet, so that messages are not duplicated.

    Parameters:
        log_path (str): path to log file.

    Returns:
        logging.Logger: the root logger, with its level set to ``INFO``.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(levelname)s:%(message)s")
    # FileHandler stores the absolute path of its file in ``baseFilename``
    log_file = os.path.abspath(log_path)
    stdout = os.sys.stdout
    has_file_handler = False
    has_console_handler = False
    for handler in logger.handlers:
        if isinstance(handler, logging.FileHandler):
            if handler.baseFilename == log_file:
                has_file_handler = True
        elif isinstance(handler, logging.StreamHandler):
            if getattr(handler, 'stream', None) is stdout:
                has_console_handler = True
    # log to file
    if not has_file_handler:
        file_handler = logging.FileHandler(log_path)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    # log to stdout
    if not has_console_handler:
        console_handler = logging.StreamHandler(stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    return logger
def optimization_results_logger(self, loss, hyperparams_dict, metrics_val,
                                metrics_pred, run_time):
    """Write loss, hyperparameters, metrics and run_time to the results file.

    This function appends one row to the optimization results file at
    ``self.optim_results_path``. The row contains, in this order: the loss,
    the current iteration, one value per name in ``self.hyperparams_to_log``
    (``0`` for a hyperparameter missing from ``hyperparams_dict``), the
    validation metrics, the prediction metrics and the run time.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.
        loss (float): value to optimize.
        hyperparams_dict (dict): values of the hyperparameters the user wishes
            to store.
        metrics_val (dict): values of the validation set metrics the user
            wishes to store.
        metrics_pred (dict): values of the prediction set metrics the user
            wishes to store.
        run_time (float): execution time the user wishes to store.
    """
    row_hyperparams = [hyperparams_dict.get(element, 0)
                       for element in self.hyperparams_to_log]
    row = ([loss, self.iteration.value] + row_hyperparams +
           list(metrics_val.values()) + list(metrics_pred.values()) +
           [run_time])
    # newline='' lets the csv module control line endings itself; without it
    # an extra carriage return is written on platforms translating newlines
    with open(self.optim_results_path, 'a', newline='') as results_file:
        csv.writer(results_file).writerow(row)
def ctlearn_objective(self, hyperparams):
    """Evaluate a CTLearn model and return metric to optimize.

    Train a CTLearn model and predict if necessary, get the metrics and log
    them to the ``optimization_results.csv`` file. Also save trials file for
    resuming training if it has been interrupted (not done for the genetic
    algorithm) and remove the folder of the run if
    ``self.remove_training_folders`` is set.

    Parameters:
        self: ``ctlearn_optimizer.optimizer.Optimizer`` instance.
        hyperparams (dict): values of the hyperparameters to evaluate
            suggested by the optimizer.

    Returns:
        float: metric to optimize.

    Raises:
        ValueError: if ``self.data_set_to_optimize`` is neither
            ``validation`` nor ``prediction``.
    """
    # fail before training: with any other value no loss could be selected
    # once the (long) training has finished
    if self.data_set_to_optimize not in ('validation', 'prediction'):
        raise ValueError(
            'data_set_to_optimize must be validation or prediction, '
            'got {}'.format(self.data_set_to_optimize))
    # set up logger
    logger = set_logger(self.log_path)
    self.iteration.value += 1
    self.counter.value += 1
    logger.info('Current run iteration: {}' .format(self.counter.value))
    logger.info('Global iteration: {}' .format(self.iteration.value))
    # update values of the hyperparameters
    hyperparams_dict = modify_optimizable_params(self, hyperparams)
    start = timer()
    logger.info('Training')
    logger.info('Current hyperparameters: {}'. format(hyperparams_dict))
    # train ctlearn network
    train(self)
    # NOTE(review): the logger is set up again after each CTLearn run,
    # presumably because run_model reconfigures logging -- confirm.
    logger = set_logger(self.log_path)
    logger.info('Training ended')
    run_time = timer() - start
    # get validation set metrics
    metrics_val = get_val_metrics(self)
    metrics_pred = {}
    # predict if required and set loss depending on metric and data set to
    # optimize
    if self.data_set_to_optimize == 'prediction':
        logger.info('Predicting')
        predict(self)
        logger = set_logger(self.log_path)
        logger.info('Prediction ended')
        metrics_pred = get_pred_metrics(self)
        metric = self.metric_to_optimize + '_pred'
        loss = metrics_pred[metric]
    else:
        metric = self.metric_to_optimize + '_val'
        loss = metrics_val[metric]
    logger.info('{}: {:.4f}'.format(metric, loss))
    # write loss, hyperparameters, metrics and run_time to the optimization
    # results file
    optimization_results_logger(self, loss, hyperparams_dict, metrics_val,
                                metrics_pred, run_time)
    # remove training folders in order to avoid space issues in long runs
    if self.remove_training_folders:
        run_folder = os.path.join(self.working_directory,
                                  'run' + str(self.iteration.value))
        shutil.rmtree(run_folder, ignore_errors=True)
    # save trials file
    if self.optimization_type != 'genetic_algorithm':
        save(self)
    return loss