Source code for sklearn_genetic.genetic_search

import random
import time
import warnings

import numpy as np
from deap import base, creator, tools
from sklearn.base import clone
from sklearn.model_selection import cross_validate
from sklearn.base import is_classifier, is_regressor, BaseEstimator, MetaEstimatorMixin
from sklearn.feature_selection import SelectorMixin
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.metaestimators import available_if
from sklearn.feature_selection._from_model import _estimator_has
from sklearn.metrics import check_scoring
from sklearn.exceptions import NotFittedError
from sklearn.model_selection._search import BaseSearchCV
from sklearn.model_selection._split import check_cv
from sklearn.metrics._scorer import _check_multimetric_scoring

from .parameters import Algorithms, Criteria
from .space import Space
from .algorithms import algorithms_factory
from .callbacks.validations import check_callback
from .schedules.validations import check_adapter
from .utils.cv_scores import (
    create_gasearch_cv_results_,
    create_feature_selection_cv_results_,
)
from .utils.random import weighted_bool_individual
from .utils.tools import cxUniform, mutFlipBit


[docs]class GASearchCV(BaseSearchCV): """ Evolutionary optimization over hyperparameters. GASearchCV implements a "fit" and a "score" method. It also implements "predict", "predict_proba", "decision_function", "predict_log_proba" if they are implemented in the estimator used. The parameters of the estimator used to apply these methods are optimized by cross-validated search over parameter settings. Parameters ---------- estimator : estimator object, default=None estimator object implementing 'fit' The object to use to fit the data. cv : int, cross-validation generator or an iterable, default=None Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 5-fold cross validation, - int, to specify the number of folds in a `(Stratified)KFold`, - CV splitter, - An iterable yielding (train, test) splits as arrays of indices. For int/None inputs, if the estimator is a classifier and ``y`` is either binary or multiclass, :class:`StratifiedKFold` is used. In all other cases, :class:`KFold` is used. These splitters are instantiated with `shuffle=False` so the splits will be the same across calls. param_grid : dict, default=None Grid with the parameters to tune, expects keys a valid name of hyperparameter based on the estimator selected and as values one of :class:`~sklearn_genetic.space.Integer` , :class:`~sklearn_genetic.space.Categorical` :class:`~sklearn_genetic.space.Continuous` classes. At least two parameters are advised to be provided in order to successfully make an optimization routine. population_size : int, default=10 Size of the initial population to sample randomly generated individuals. generations : int, default=40 Number of generations or iterations to run the evolutionary algorithm. crossover_probability : float or a Scheduler, default=0.8 Probability of crossover operation between two individuals. mutation_probability : float or a Scheduler, default=0.1 Probability of child mutation. tournament_size : int, default=3 Number of individuals to perform tournament selection. elitism : bool, default=True If True takes the *tournament_size* best solution to the next generation. scoring : str, callable, list, tuple or dict, default=None Strategy to evaluate the performance of the cross-validated model on the test set. If `scoring` represents a single score, one can use: - a single string; - a callable that returns a single value. If `scoring` represents multiple scores, one can use: - a list or tuple of unique strings; - a callable returning a dictionary where the keys are the metric names and the values are the metric scores; - a dictionary with metric names as keys and callables a values. n_jobs : int, default=None Number of jobs to run in parallel. Training the estimator and computing the score are parallelized over the cross-validation splits. ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all processors. verbose : bool, default=True If ``True``, shows the metrics on the optimization routine. keep_top_k : int, default=1 Number of best solutions to keep in the hof object. If a callback stops the algorithm before k iterations, it will return only one set of parameters per iteration. criteria : {'max', 'min'} , default='max' ``max`` if a higher scoring metric is better, ``min`` otherwise. algorithm : {'eaMuPlusLambda', 'eaMuCommaLambda', 'eaSimple'}, default='eaMuPlusLambda' Evolutionary algorithm to use. See more details in the deap algorithms documentation. refit : bool, str, or callable, default=True Refit an estimator using the best found parameters on the whole dataset. For multiple metric evaluation, this needs to be a `str` denoting the scorer that would be used to find the best parameters for refitting the estimator at the end. The refitted estimator is made available at the ``best_estimator_`` attribute and permits using ``predict`` directly on this ``GASearchCV`` instance. Also for multiple metric evaluation, the attributes ``best_index_``, ``best_score_`` and ``best_params_`` will only be available if ``refit`` is set and all of them will be determined w.r.t this specific scorer. See ``scoring`` parameter to know more about multiple metric evaluation. If ``False``, it is not possible to make predictions using this GASearchCV instance after fitting. pre_dispatch : int or str, default='2*n_jobs' Controls the number of jobs that get dispatched during parallel execution. Reducing this number can be useful to avoid an explosion of memory consumption when more jobs get dispatched than CPUs can process. This parameter can be: - None, in which case all the jobs are immediately created and spawned. Use this for lightweight and fast-running jobs, to avoid delays due to on-demand spawning of the jobs - An int, giving the exact number of total jobs that are spawned - A str, giving an expression as a function of n_jobs, as in '2*n_jobs' error_score : 'raise' or numeric, default=np.nan Value to assign to the score if an error occurs in estimator fitting. If set to ``'raise'``, the error is raised. If a numeric value is given, FitFailedWarning is raised. return_train_score: bool, default=False If ``False``, the ``cv_results_`` attribute will not include training scores. Computing training scores is used to get insights on how different parameter settings impact the overfitting/underfitting trade-off. However computing the scores on the training set can be computationally expensive and is not strictly required to select the parameters that yield the best generalization performance. log_config : :class:`~sklearn_genetic.mlflow.MLflowConfig`, default = None Configuration to log metrics and models to mlflow, of None, no mlflow logging will be performed Attributes ---------- logbook : :class:`DEAP.tools.Logbook` Contains the logs of every set of hyperparameters fitted with its average scoring metric. history : dict Dictionary of the form: {"gen": [], "fitness": [], "fitness_std": [], "fitness_max": [], "fitness_min": []} *gen* returns the index of the evaluated generations. Each entry on the others lists, represent the average metric in each generation. cv_results_ : dict of numpy (masked) ndarrays A dict with keys as column headers and values as columns, that can be imported into a pandas ``DataFrame``. best_estimator_ : estimator Estimator that was chosen by the search, i.e. estimator which gave highest score on the left out data. Not available if ``refit=False``. best_params_ : dict Parameter setting that gave the best results on the hold out data. best_index_ : int The index (of the ``cv_results_`` arrays) which corresponds to the best candidate parameter setting. The dict at ``search.cv_results_['params'][search.best_index_]`` gives the parameter setting for the best model, that gives the highest mean score (``search.best_score_``). scorer_ : function or a dict Scorer function used on the held out data to choose the best parameters for the model. n_splits_ : int The number of cross-validation splits (folds/iterations). refit_time_ : float Seconds used for refitting the best model on the whole dataset. This is present only if ``refit`` is not False. """ def __init__( self, estimator, cv=3, param_grid=None, scoring=None, population_size=50, generations=80, crossover_probability=0.2, mutation_probability=0.8, tournament_size=3, elitism=True, verbose=True, keep_top_k=1, criteria="max", algorithm="eaMuPlusLambda", refit=True, n_jobs=1, pre_dispatch="2*n_jobs", error_score=np.nan, return_train_score=False, log_config=None, ): self.estimator = estimator self.cv = cv self.scoring = scoring self.population_size = population_size self.generations = generations self.crossover_probability = crossover_probability self.mutation_probability = mutation_probability self.crossover_adapter = check_adapter(self.crossover_probability) self.mutation_adapter = check_adapter(self.mutation_probability) self.tournament_size = tournament_size self.elitism = elitism self.verbose = verbose self.keep_top_k = keep_top_k self.criteria = criteria self.param_grid = param_grid self.algorithm = algorithm self.refit = refit self.n_jobs = n_jobs self.pre_dispatch = pre_dispatch self.error_score = error_score self.return_train_score = return_train_score self.creator = creator self.log_config = log_config # Check that the estimator is compatible with scikit-learn if not is_classifier(self.estimator) and not is_regressor(self.estimator): raise ValueError(f"{self.estimator} is not a valid Sklearn classifier or regressor") if criteria not in Criteria.list(): raise ValueError(f"Criteria must be one of {Criteria.list()}, got {criteria} instead") # Minimization is handle like an optimization problem with a change in the score sign elif criteria == Criteria.max.value: self.criteria_sign = 1.0 elif criteria == Criteria.min.value: self.criteria_sign = -1.0 # Saves the param_grid and computes some extra properties in the same object self.space = Space(param_grid) if len(self.space) == 1: # pragma: no cover warnings.warn( "Warning, only one parameter was provided to the param_grid, the optimization routine " "might not have effect or it could lead to errors, it's advised to use at least 2 parameters" ) super(GASearchCV, self).__init__( estimator=estimator, scoring=scoring, n_jobs=n_jobs, refit=refit, cv=cv, verbose=verbose, pre_dispatch=pre_dispatch, error_score=error_score, ) def _register(self): """ This function is the responsible for registering the DEAPs necessary methods and create other objects to hold the hof, logbook and stats. """ self.toolbox = base.Toolbox() self.creator.create("FitnessMax", base.Fitness, weights=[self.criteria_sign]) self.creator.create("Individual", list, fitness=creator.FitnessMax) attributes = [] # Assign all the parameters defined in the param_grid # It uses the distribution parameter to set the sampling function for parameter, dimension in self.space.param_grid.items(): self.toolbox.register(f"{parameter}", dimension.sample) attributes.append(getattr(self.toolbox, parameter)) IND_SIZE = 1 self.toolbox.register( "individual", tools.initCycle, creator.Individual, tuple(attributes), n=IND_SIZE, ) self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual) if len(self.space) == 1: sampler = list(self.space.param_grid.values())[0] lower, upper = sampler.lower, sampler.upper self.toolbox.register( "mate", tools.cxSimulatedBinaryBounded, low=lower, up=upper, eta=10 ) else: self.toolbox.register("mate", tools.cxTwoPoint) self.toolbox.register("mutate", self.mutate) if self.elitism: self.toolbox.register("select", tools.selTournament, tournsize=self.tournament_size) else: self.toolbox.register("select", tools.selRoulette) self.toolbox.register("evaluate", self.evaluate) self._pop = self.toolbox.population(n=self.population_size) self._hof = tools.HallOfFame(self.keep_top_k) self._stats = tools.Statistics(lambda ind: ind.fitness.values) self._stats.register("fitness", np.mean) self._stats.register("fitness_std", np.std) self._stats.register("fitness_max", np.max) self._stats.register("fitness_min", np.min) self.logbook = tools.Logbook() def mutate(self, individual): """ This function is responsible for change a randomly selected parameter from an individual Parameters ---------- individual: Individual object The individual (set of hyperparameters) that is being generated Returns ------- Mutated individual """ # Randomly select one of the hyperparameters gen = random.randrange(0, len(self.space)) parameter_idx = self.space.parameters[gen] parameter = self.space[parameter_idx] # Using the defined distribution from the para_grid value # Make a random sample of the parameter individual[gen] = parameter.sample() return [individual] def evaluate(self, individual): """ Compute the cross-validation scores and record the logbook and mlflow (if specified) Parameters ---------- individual: Individual object The individual (set of hyperparameters) that is being evaluated Returns ------- The fitness value of the estimator candidate, corresponding to the cv-score """ # Dictionary representation of the individual with key-> hyperparameter name, value -> value current_generation_params = { key: individual[n] for n, key in enumerate(self.space.parameters) } local_estimator = clone(self.estimator) local_estimator.set_params(**current_generation_params) # Compute the cv-metrics cv_results = cross_validate( local_estimator, self.X_, self.y_, cv=self.cv, scoring=self.scoring, n_jobs=self.n_jobs, pre_dispatch=self.pre_dispatch, error_score=self.error_score, return_train_score=self.return_train_score, ) cv_scores = cv_results[f"test_{self.refit_metric}"] score = np.mean(cv_scores) # Uses the log config to save in remote log server (e.g MLflow) if self.log_config is not None: self.log_config.create_run( parameters=current_generation_params, score=score, estimator=local_estimator, ) # These values are used to compute cv_results_ property current_generation_params["score"] = score current_generation_params["cv_scores"] = cv_scores current_generation_params["fit_time"] = cv_results["fit_time"] current_generation_params["score_time"] = cv_results["score_time"] for metric in self.metrics_list: current_generation_params[f"test_{metric}"] = cv_results[f"test_{metric}"] if self.return_train_score: current_generation_params[f"train_{metric}"] = cv_results[f"train_{metric}"] index = len(self.logbook.chapters["parameters"]) current_generation_params = {"index": index, **current_generation_params} # Log the hyperparameters and the cv-score self.logbook.record(parameters=current_generation_params) return [score]
[docs] def fit(self, X, y, callbacks=None): """ Main method of GASearchCV, starts the optimization procedure with the hyperparameters of the given estimator Parameters ---------- X : array-like of shape (n_samples, n_features) The data to fit. Can be for example a list, or an array. y : array-like of shape (n_samples,) or (n_samples, n_outputs), \ default=None The target variable to try to predict in the case of supervised learning. callbacks: list or callable One or a list of the callbacks methods available in :class:`~sklearn_genetic.callbacks`. The callback is evaluated after fitting the estimators from the generation 1. """ self.X_ = X self.y_ = y self._n_iterations = self.generations + 1 self.refit_metric = "score" self.multimetric_ = False # Make sure the callbacks are valid self.callbacks = check_callback(callbacks) if callable(self.scoring): self.scorer_ = self.scoring self.metrics_list = [self.refit_metric] elif self.scoring is None or isinstance(self.scoring, str): self.scorer_ = check_scoring(self.estimator, self.scoring) self.metrics_list = [self.refit_metric] else: self.scorer_ = _check_multimetric_scoring(self.estimator, self.scoring) self._check_refit_for_multimetric(self.scorer_) self.refit_metric = self.refit self.metrics_list = self.scorer_.keys() self.multimetric_ = True # Check cv and get the n_splits cv_orig = check_cv(self.cv, y, classifier=is_classifier(self.estimator)) self.n_splits_ = cv_orig.get_n_splits(X, y) # Set the DEAPs necessary methods self._register() # Optimization routine from the selected evolutionary algorithm pop, log, n_gen = self._select_algorithm(pop=self._pop, stats=self._stats, hof=self._hof) # Update the _n_iterations value as the algorithm could stop earlier due a callback self._n_iterations = n_gen self.cv_results_ = create_gasearch_cv_results_( logbook=self.logbook, space=self.space, return_train_score=self.return_train_score, metrics=self.metrics_list, ) self.history = { "gen": log.select("gen"), "fitness": log.select("fitness"), "fitness_std": log.select("fitness_std"), "fitness_max": log.select("fitness_max"), "fitness_min": log.select("fitness_min"), } # Imitate the logic of scikit-learn refit parameter if self.refit: self.best_index_ = self.cv_results_[f"rank_test_{self.refit_metric}"].argmin() self.best_score_ = self.cv_results_[f"mean_test_{self.refit_metric}"][self.best_index_] self.best_params_ = self.cv_results_["params"][self.best_index_] self.estimator.set_params(**self.best_params_) refit_start_time = time.time() self.estimator.fit( self.X_, self.y_, ) refit_end_time = time.time() self.refit_time_ = refit_end_time - refit_start_time self.best_estimator_ = self.estimator self.estimator_ = self.best_estimator_ # hof keeps the best params according to the fitness value # To be consistent with self.best_estimator_, if more than 1 model gets the # same score, it could lead to differences between hof and self.best_estimator_ self._hof.remove(0) self._hof.items.insert(0, list(self.best_params_.values())) self._hof.keys.insert(0, self.best_score_) self.hof = { k: {key: self._hof[k][n] for n, key in enumerate(self.space.parameters)} for k in range(len(self._hof)) } del self.creator.FitnessMax del self.creator.Individual return self
def _select_algorithm(self, pop, stats, hof): """ It selects the algorithm to run from the sklearn_genetic.algorithms module based in the parameter self.algorithm. Parameters ---------- pop: pop object from DEAP stats: stats object from DEAP hof: hof object from DEAP Returns ------- pop: pop object The last evaluated population log: Logbook object It contains the calculated metrics {'fitness', 'fitness_std', 'fitness_max', 'fitness_min'} the number of generations and the number of evaluated individuals per generation n_gen: int The number of generations that the evolutionary algorithm ran """ selected_algorithm = algorithms_factory.get(self.algorithm, None) if selected_algorithm: pop, log, gen = selected_algorithm( pop, self.toolbox, mu=self.population_size, lambda_=2 * self.population_size, cxpb=self.crossover_adapter, stats=stats, mutpb=self.mutation_adapter, ngen=self.generations, halloffame=hof, callbacks=self.callbacks, verbose=self.verbose, estimator=self, ) else: raise ValueError( f"The algorithm {self.algorithm} is not supported, " f"please select one from {Algorithms.list()}" ) return pop, log, gen def _run_search(self, evaluate_candidates): pass # noqa @property def _fitted(self): try: check_is_fitted(self.estimator) is_fitted = True except Exception as e: is_fitted = False has_history = hasattr(self, "history") and bool(self.history) return all([is_fitted, has_history, self.refit]) def __getitem__(self, index): """ Parameters ---------- index: slice required to get Returns ------- Best solution of the iteration corresponding to the index number """ if not self._fitted: raise NotFittedError( f"This GASearchCV instance is not fitted yet " f"or used refit=False. Call 'fit' with appropriate " f"arguments before using this estimator." ) return { "gen": self.history["gen"][index], "fitness": self.history["fitness"][index], "fitness_std": self.history["fitness_std"][index], "fitness_max": self.history["fitness_max"][index], "fitness_min": self.history["fitness_min"][index], } def __iter__(self): self.n = 0 return self def __next__(self): """ Returns ------- Iteration over the statistics found in each generation """ if self.n < self._n_iterations + 1: result = self.__getitem__(self.n) self.n += 1 return result else: raise StopIteration # pragma: no cover def __len__(self): """ Returns ------- Number of generations fitted if .fit method has been called, self.generations otherwise """ return self._n_iterations
[docs]class GAFeatureSelectionCV(MetaEstimatorMixin, SelectorMixin, BaseEstimator): """ Evolutionary optimization for feature selection. GAFeatureSelectionCV implements a "fit" and a "score" method. It also implements "predict", "predict_proba", "decision_function", "predict_log_proba" if they are implemented in the estimator used. The features (variables) used by the estimator are found by optimizing the cv-scores and by minimizing the number of features Parameters ---------- estimator : estimator object, default=None estimator object implementing 'fit' The object to use to fit the data. cv : int, cross-validation generator or an iterable, default=None Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 5-fold cross validation, - int, to specify the number of folds in a `(Stratified)KFold`, - CV splitter, - An iterable yielding (train, test) splits as arrays of indices. For int/None inputs, if the estimator is a classifier and ``y`` is either binary or multiclass, :class:`StratifiedKFold` is used. In all other cases, :class:`KFold` is used. These splitters are instantiated with `shuffle=False` so the splits will be the same across calls. population_size : int, default=10 Size of the initial population to sample randomly generated individuals. generations : int, default=40 Number of generations or iterations to run the evolutionary algorithm. crossover_probability : float or a Scheduler, default=0.2 Probability of crossover operation between two individuals. mutation_probability : float or a Scheduler, default=0.8 Probability of child mutation. tournament_size : int, default=3 Number of individuals to perform tournament selection. elitism : bool, default=True If True takes the *tournament_size* best solution to the next generation. max_features : int, default=None The upper bound number of features to be selected. scoring : str, callable, list, tuple or dict, default=None Strategy to evaluate the performance of the cross-validated model on the test set. If `scoring` represents a single score, one can use: - a single string; - a callable that returns a single value. If `scoring` represents multiple scores, one can use: - a list or tuple of unique strings; - a callable returning a dictionary where the keys are the metric names and the values are the metric scores; - a dictionary with metric names as keys and callables a values. n_jobs : int, default=None Number of jobs to run in parallel. Training the estimator and computing the score are parallelized over the cross-validation splits. ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all processors. verbose : bool, default=True If ``True``, shows the metrics on the optimization routine. keep_top_k : int, default=1 Number of best solutions to keep in the hof object. If a callback stops the algorithm before k iterations, it will return only one set of parameters per iteration. criteria : {'max', 'min'} , default='max' ``max`` if a higher scoring metric is better, ``min`` otherwise. algorithm : {'eaMuPlusLambda', 'eaMuCommaLambda', 'eaSimple'}, default='eaMuPlusLambda' Evolutionary algorithm to use. See more details in the deap algorithms documentation. refit : bool, str, or callable, default=True Refit an estimator using the best found parameters on the whole dataset. For multiple metric evaluation, this needs to be a `str` denoting the scorer that would be used to find the best parameters for refitting the estimator at the end. The refitted estimator is made available at the ``best_estimator_`` attribute and permits using ``predict`` directly on this ``FeatureSelectionCV`` instance. Also for multiple metric evaluation, the attributes ``best_index_``, ``best_score_`` and ``best_params_`` will only be available if ``refit`` is set and all of them will be determined w.r.t this specific scorer. See ``scoring`` parameter to know more about multiple metric evaluation. If ``False``, it is not possible to make predictions using this GASearchCV instance after fitting. pre_dispatch : int or str, default='2*n_jobs' Controls the number of jobs that get dispatched during parallel execution. Reducing this number can be useful to avoid an explosion of memory consumption when more jobs get dispatched than CPUs can process. This parameter can be: - None, in which case all the jobs are immediately created and spawned. Use this for lightweight and fast-running jobs, to avoid delays due to on-demand spawning of the jobs - An int, giving the exact number of total jobs that are spawned - A str, giving an expression as a function of n_jobs, as in '2*n_jobs' error_score : 'raise' or numeric, default=np.nan Value to assign to the score if an error occurs in estimator fitting. If set to ``'raise'``, the error is raised. If a numeric value is given, FitFailedWarning is raised. return_train_score: bool, default=False If ``False``, the ``cv_results_`` attribute will not include training scores. Computing training scores is used to get insights on how different parameter settings impact the overfitting/underfitting trade-off. However computing the scores on the training set can be computationally expensive and is not strictly required to select the parameters that yield the best generalization performance. log_config : :class:`~sklearn_genetic.mlflow.MLflowConfig`, default = None Configuration to log metrics and models to mlflow, of None, no mlflow logging will be performed Attributes ---------- logbook : :class:`DEAP.tools.Logbook` Contains the logs of every set of hyperparameters fitted with its average scoring metric. history : dict Dictionary of the form: {"gen": [], "fitness": [], "fitness_std": [], "fitness_max": [], "fitness_min": []} *gen* returns the index of the evaluated generations. Each entry on the others lists, represent the average metric in each generation. cv_results_ : dict of numpy (masked) ndarrays A dict with keys as column headers and values as columns, that can be imported into a pandas ``DataFrame``. best_estimator_ : estimator Estimator that was chosen by the search, i.e. estimator which gave highest score on the left out data. Not available if ``refit=False``. best_features_ : list List of bool, each index represents one feature in the same order the data was fed. 1 means the feature was selected, 0 means the features was discarded. support_ : list The mask of selected features. scorer_ : function or a dict Scorer function used on the held out data to choose the best parameters for the model. n_splits_ : int The number of cross-validation splits (folds/iterations). n_features_in_ : int Number of features seen (selected) during fit. refit_time_ : float Seconds used for refitting the best model on the whole dataset. This is present only if ``refit`` is not False. """ def __init__( self, estimator, cv=3, scoring=None, population_size=50, generations=80, crossover_probability=0.2, mutation_probability=0.8, tournament_size=3, elitism=True, max_features=None, verbose=True, keep_top_k=1, criteria="max", algorithm="eaMuPlusLambda", refit=True, n_jobs=1, pre_dispatch="2*n_jobs", error_score=np.nan, return_train_score=False, log_config=None, ): self.estimator = estimator self.cv = cv self.scoring = scoring self.population_size = population_size self.generations = generations self.crossover_probability = crossover_probability self.mutation_probability = mutation_probability self.crossover_adapter = check_adapter(self.crossover_probability) self.mutation_adapter = check_adapter(self.mutation_probability) self.tournament_size = tournament_size self.elitism = elitism self.max_features = max_features self.verbose = verbose self.keep_top_k = keep_top_k self.criteria = criteria self.algorithm = algorithm self.refit = refit self.n_jobs = n_jobs self.pre_dispatch = pre_dispatch self.error_score = error_score self.return_train_score = return_train_score self.creator = creator self.log_config = log_config # Check that the estimator is compatible with scikit-learn if not is_classifier(self.estimator) and not is_regressor(self.estimator): raise ValueError(f"{self.estimator} is not a valid Sklearn classifier or regressor") if criteria not in Criteria.list(): raise ValueError(f"Criteria must be one of {Criteria.list()}, got {criteria} instead") # Minimization is handle like an optimization problem with a change in the score sign elif criteria == Criteria.max.value: self.criteria_sign = 1.0 elif criteria == Criteria.min.value: self.criteria_sign = -1.0 def _register(self): """ This function is the responsible for registering the DEAPs necessary methods and create other objects to hold the hof, logbook and stats. """ self.toolbox = base.Toolbox() # Criteria sign to set max or min problem # And -1.0 as second weight to minimize number of features self.creator.create("FitnessMax", base.Fitness, weights=[self.criteria_sign, -1.0]) self.creator.create("Individual", list, fitness=creator.FitnessMax) # Register the array to choose the features # Each binary value represents if the feature is selected or not self.toolbox.register( "individual", weighted_bool_individual, creator.Individual, weight=self.features_proportion, size=self.n_features, ) self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual) self.toolbox.register("mate", cxUniform, indpb=self.crossover_adapter.current_value) self.toolbox.register("mutate", mutFlipBit, indpb=self.mutation_adapter.current_value) if self.elitism: self.toolbox.register("select", tools.selTournament, tournsize=self.tournament_size) else: self.toolbox.register("select", tools.selRoulette) self.toolbox.register("evaluate", self.evaluate) self._pop = self.toolbox.population(n=self.population_size) self._hof = tools.HallOfFame(self.keep_top_k) # Stats among axis 0 to get two values: # One based on the score and the other in the number of features self._stats = tools.Statistics(lambda ind: ind.fitness.values) self._stats.register("fitness", np.mean, axis=0) self._stats.register("fitness_std", np.std, axis=0) self._stats.register("fitness_max", np.max, axis=0) self._stats.register("fitness_min", np.min, axis=0) self.logbook = tools.Logbook() def evaluate(self, individual): """ Compute the cross-validation scores and record the logbook and mlflow (if specified) Parameters ---------- individual: Individual object The individual (set of features) that is being evaluated Returns ------- fitness: List Returns a list with two values. The first one is the corresponding to the cv-score The second one is the number of features selected """ bool_individual = np.array(individual, dtype=bool) current_generation_params = {"features": bool_individual} local_estimator = clone(self.estimator) n_selected_features = np.sum(individual) # Compute the cv-metrics using only the selected features cv_results = cross_validate( local_estimator, self.X_[:, bool_individual], self.y_, cv=self.cv, scoring=self.scoring, n_jobs=self.n_jobs, pre_dispatch=self.pre_dispatch, error_score=self.error_score, return_train_score=self.return_train_score, ) cv_scores = cv_results[f"test_{self.refit_metric}"] score = np.mean(cv_scores) # Uses the log config to save in remote log server (e.g MLflow) if self.log_config is not None: self.log_config.create_run( parameters=current_generation_params, score=score, estimator=local_estimator, ) # These values are used to compute cv_results_ property current_generation_params["score"] = score current_generation_params["cv_scores"] = cv_scores current_generation_params["fit_time"] = cv_results["fit_time"] current_generation_params["score_time"] = cv_results["score_time"] for metric in self.metrics_list: current_generation_params[f"test_{metric}"] = cv_results[f"test_{metric}"] if self.return_train_score: current_generation_params[f"train_{metric}"] = cv_results[f"train_{metric}"] index = len(self.logbook.chapters["parameters"]) current_generation_features = {"index": index, **current_generation_params} # Log the features and the cv-score self.logbook.record(parameters=current_generation_features) # Penalize individuals with more features than the max_features parameter if self.max_features and ( n_selected_features > self.max_features or n_selected_features == 0 ): score = -self.criteria_sign * 100000 return [score, n_selected_features]
[docs] def fit(self, X, y, callbacks=None): """ Main method of GAFeatureSelectionCV, starts the optimization procedure with to find the best features set Parameters ---------- X : array-like of shape (n_samples, n_features) The data to fit. Can be for example a list, or an array. y : array-like of shape (n_samples,) or (n_samples, n_outputs), \ default=None The target variable to try to predict in the case of supervised learning. callbacks: list or callable One or a list of the callbacks methods available in :class:`~sklearn_genetic.callbacks`. The callback is evaluated after fitting the estimators from the generation 1. """ self.X_, self.y_ = check_X_y(X, y) self.n_features = X.shape[1] self._n_iterations = self.generations + 1 self.refit_metric = "score" self.multimetric_ = False self.features_proportion = None if self.max_features: self.features_proportion = self.max_features / self.n_features # Make sure the callbacks are valid self.callbacks = check_callback(callbacks) if callable(self.scoring): self.scorer_ = self.scoring self.metrics_list = [self.refit_metric] elif self.scoring is None or isinstance(self.scoring, str): self.scorer_ = check_scoring(self.estimator, self.scoring) self.metrics_list = [self.refit_metric] else: self.scorer_ = _check_multimetric_scoring(self.estimator, self.scoring) self._check_refit_for_multimetric(self.scorer_) self.refit_metric = self.refit self.metrics_list = self.scorer_.keys() self.multimetric_ = True # Check cv and get the n_splits cv_orig = check_cv(self.cv, y, classifier=is_classifier(self.estimator)) self.n_splits_ = cv_orig.get_n_splits(X, y) # Set the DEAPs necessary methods self._register() # Optimization routine from the selected evolutionary algorithm pop, log, n_gen = self._select_algorithm(pop=self._pop, stats=self._stats, hof=self._hof) # Update the _n_iterations value as the algorithm could stop earlier due a callback self._n_iterations = n_gen self.best_features_ = np.array(self._hof[0], dtype=bool) self.support_ = self.best_features_ self.cv_results_ = create_feature_selection_cv_results_( logbook=self.logbook, return_train_score=self.return_train_score, metrics=self.metrics_list, ) self.history = { "gen": log.select("gen"), "fitness": log.select("fitness"), "fitness_std": log.select("fitness_std"), "fitness_max": log.select("fitness_max"), "fitness_min": log.select("fitness_min"), } if self.refit: bool_individual = np.array(self.best_features_, dtype=bool) refit_start_time = time.time() self.estimator.fit(self.X_[:, bool_individual], self.y_) refit_end_time = time.time() self.refit_time_ = refit_end_time - refit_start_time self.best_estimator_ = self.estimator self.estimator_ = self.best_estimator_ self.hof = self._hof del self.creator.FitnessMax del self.creator.Individual return self
def _select_algorithm(self, pop, stats, hof): """ It selects the algorithm to run from the sklearn_genetic.algorithms module based in the parameter self.algorithm. Parameters ---------- pop: pop object from DEAP stats: stats object from DEAP hof: hof object from DEAP Returns ------- pop: pop object The last evaluated population log: Logbook object It contains the calculated metrics {'fitness', 'fitness_std', 'fitness_max', 'fitness_min'} the number of generations and the number of evaluated individuals per generation n_gen: int The number of generations that the evolutionary algorithm ran """ selected_algorithm = algorithms_factory.get(self.algorithm, None) if selected_algorithm: pop, log, gen = selected_algorithm( pop, self.toolbox, mu=self.population_size, lambda_=2 * self.population_size, cxpb=self.crossover_adapter, stats=stats, mutpb=self.mutation_adapter, ngen=self.generations, halloffame=hof, callbacks=self.callbacks, verbose=self.verbose, estimator=self, ) else: raise ValueError( f"The algorithm {self.algorithm} is not supported, " f"please select one from {Algorithms.list()}" ) return pop, log, gen def _run_search(self, evaluate_candidates): pass # noqa @property def _fitted(self): try: check_is_fitted(self.estimator) is_fitted = True except Exception as e: is_fitted = False has_history = hasattr(self, "history") and bool(self.history) return all([is_fitted, has_history, self.refit]) def __getitem__(self, index): """ Parameters ---------- index: slice required to get Returns ------- Best solution of the iteration corresponding to the index number """ if not self._fitted: raise NotFittedError( f"This GAFeatureSelectionCV instance is not fitted yet " f"or used refit=False. Call 'fit' with appropriate " f"arguments before using this estimator." ) return { "gen": self.history["gen"][index], "fitness": self.history["fitness"][index], "fitness_std": self.history["fitness_std"][index], "fitness_max": self.history["fitness_max"][index], "fitness_min": self.history["fitness_min"][index], } def __iter__(self): self.n = 0 return self def __next__(self): """ Returns ------- Iteration over the statistics found in each generation """ if self.n < self._n_iterations + 1: result = self.__getitem__(self.n) self.n += 1 return result else: raise StopIteration # pragma: no cover def __len__(self): """ Returns ------- Number of generations fitted if .fit method has been called, self.generations otherwise """ return self._n_iterations def _check_refit_for_multimetric(self, scores): # pragma: no cover """Check `refit` is compatible with `scores` is valid""" multimetric_refit_msg = ( "For multi-metric scoring, the parameter refit must be set to a " "scorer key or a callable to refit an estimator with the best " "parameter setting on the whole data and make the best_* " "attributes available for that metric. If this is not needed, " f"refit should be set to False explicitly. {self.refit!r} was " "passed." ) valid_refit_dict = isinstance(self.refit, str) and self.refit in scores if self.refit is not False and not valid_refit_dict and not callable(self.refit): raise ValueError(multimetric_refit_msg) @property def n_features_in_(self): # pragma: no cover """Number of features seen during `fit`.""" # For consistency with other estimators we raise a AttributeError so # that hasattr() fails if the estimator isn't fitted. if not self._fitted: raise AttributeError( "{} object has no n_features_in_ attribute.".format(self.__class__.__name__) ) return self.n_features def _get_support_mask(self): if not self._fitted: raise NotFittedError( f"This GAFeatureSelectionCV instance is not fitted yet " f"or used refit=False. Call 'fit' with appropriate " f"arguments before using this estimator." ) return self.best_features_
[docs] @available_if(_estimator_has("decision_function")) def decision_function(self, X): """Call decision_function on the estimator with the best found features. Only available if ``refit=True`` and the underlying estimator supports ``decision_function``. Parameters ---------- X : indexable, length n_samples Must fulfill the input assumptions of the underlying estimator. Returns ------- y_score : ndarray of shape (n_samples,) or (n_samples, n_classes) \ or (n_samples, n_classes * (n_classes-1) / 2) Result of the decision function for `X` based on the estimator with the best found parameters. """ return self.estimator.decision_function(self.transform(X))
[docs] @available_if(_estimator_has("predict")) def predict(self, X): """Call predict on the estimator with the best found features. Only available if ``refit=True`` and the underlying estimator supports ``predict``. Parameters ---------- X : indexable, length n_samples Must fulfill the input assumptions of the underlying estimator. Returns ------- y_pred : ndarray of shape (n_samples,) The predicted labels or values for `X` based on the estimator with the best found parameters. """ return self.estimator.predict(self.transform(X))
[docs] @available_if(_estimator_has("predict_log_proba")) def predict_log_proba(self, X): """Call predict_log_proba on the estimator with the best found features. Only available if ``refit=True`` and the underlying estimator supports ``predict_log_proba``. Parameters ---------- X : indexable, length n_samples Must fulfill the input assumptions of the underlying estimator. Returns ------- y_pred : ndarray of shape (n_samples,) or (n_samples, n_classes) Predicted class log-probabilities for `X` based on the estimator with the best found parameters. The order of the classes corresponds to that in the fitted attribute :term:`classes_`. """ return self.estimator.predict_log_proba(self.transform(X))
[docs] @available_if(_estimator_has("predict_proba")) def predict_proba(self, X): """Call predict_proba on the estimator with the best found features. Only available if ``refit=True`` and the underlying estimator supports ``predict_proba``. Parameters ---------- X : indexable, length n_samples Must fulfill the input assumptions of the underlying estimator. Returns ------- y_pred : ndarray of shape (n_samples,) or (n_samples, n_classes) Predicted class probabilities for `X` based on the estimator with the best found parameters. The order of the classes corresponds to that in the fitted attribute :term:`classes_`. """ return self.estimator.predict_proba(self.transform(X))
[docs] @available_if(_estimator_has("score")) def score(self, X, y): """Return the score on the given data, if the estimator has been refit. This uses the score defined by ``scoring`` where provided, and the ``best_estimator_.score`` method otherwise. Parameters ---------- X : array-like of shape (n_samples, n_features) Input data, where `n_samples` is the number of samples and `n_features` is the number of features. y : array-like of shape (n_samples, n_output) \ or (n_samples,), default=None Target relative to X for classification or regression; None for unsupervised learning. Returns ------- score : float The score defined by ``scoring`` if provided, and the ``best_estimator_.score`` method otherwise. """ return self.estimator.score(self.transform(X), y)