import random
import time
import warnings
import numpy as np
from deap import base, creator, tools
from sklearn.base import clone
from sklearn.model_selection import cross_validate
from sklearn.base import is_classifier, is_regressor, BaseEstimator, MetaEstimatorMixin
try:
from sklearn.base import is_outlier_detector
except ImportError:
# Fallback for older sklearn versions
def is_outlier_detector(estimator):
return hasattr(estimator, "fit_predict") and hasattr(estimator, "decision_function")
def _safe_estimator_check(check, estimator):
try:
return check(estimator)
except AttributeError:
return False
def _is_classifier(estimator):
return _safe_estimator_check(is_classifier, estimator)
def _is_regressor(estimator):
return _safe_estimator_check(is_regressor, estimator)
def _is_outlier_detector(estimator):
return _safe_estimator_check(is_outlier_detector, estimator)
from sklearn.feature_selection import SelectorMixin
from sklearn.utils import check_X_y
from sklearn.utils.metaestimators import available_if
from sklearn.feature_selection._from_model import _estimator_has
from sklearn.metrics import check_scoring
from sklearn.exceptions import NotFittedError
from sklearn.model_selection._search import BaseSearchCV
from sklearn.model_selection._split import check_cv
from sklearn.metrics._scorer import _check_multimetric_scoring
from .parameters import Criteria
from .space import Categorical, Continuous, Integer, Space
from ._base import GeneticEstimatorMixin, reset_adapters as _reset_adapters
from .callbacks.validations import check_callback
from .schedules.validations import check_adapter
from .utils.cv_scores import (
create_gasearch_cv_results_,
create_feature_selection_cv_results_,
)
from .utils.random import weighted_bool_individual
from .utils.tools import cxUniform, mutFlipBit
from .evaluation import (
create_fit_stats as _create_fit_stats,
evaluate_population as _evaluate_population_batch,
logbook_record as _logbook_record,
record_fit_stats as _record_fit_stats,
validate_parallel_backend as _validate_parallel_backend,
)
from .population import (
initialize_feature_population,
initialize_search_population,
validate_population_initializer as _validate_population_initializer,
)
from .optimizer_control import (
adaptive_tournament_size,
validate_optimizer_control as _validate_optimizer_control,
)
import os
from .callbacks.model_checkpoint import ModelCheckpoint
from .config import EvolutionConfig, OptimizationConfig, PopulationConfig, RuntimeConfig
def _resolve_config_value(config, field_name, fallback):
if config is None:
return fallback
return getattr(config, field_name, fallback)
[docs]
class GASearchCV(GeneticEstimatorMixin, BaseSearchCV):
"""
Evolutionary optimization over hyperparameters.
GASearchCV implements a "fit" and a "score" method.
It also implements "predict", "predict_proba", "decision_function",
"predict_log_proba" if they are implemented in the
estimator used.
The parameters of the estimator used to apply these methods are optimized
by cross-validated search over parameter settings.
Parameters
----------
estimator : estimator object, default=None
estimator object implementing 'fit'
The object to use to fit the data.
cv : int, cross-validation generator or an iterable, default=None
Determines the cross-validation splitting strategy.
Possible inputs for cv are:
- None, to use the default 5-fold cross validation,
- int, to specify the number of folds in a `(Stratified)KFold`,
- CV splitter,
- An iterable yielding (train, test) splits as arrays of indices.
For int/None inputs, if the estimator is a classifier and ``y`` is
either binary or multiclass, :class:`StratifiedKFold` is used. In all
other cases, :class:`KFold` is used. These splitters are instantiated
with `shuffle=False` so the splits will be the same across calls.
param_grid : dict, default=None
Grid with the parameters to tune, expects keys a valid name
of hyperparameter based on the estimator selected and as values
one of :class:`~sklearn_genetic.space.Integer` ,
:class:`~sklearn_genetic.space.Categorical`
:class:`~sklearn_genetic.space.Continuous` classes.
At least two parameters are advised to be provided in order to successfully make
an optimization routine.
population_size : int, default=10
Size of the initial population to sample generated individuals.
evolution_config : :class:`~sklearn_genetic.config.EvolutionConfig`, default=None
Optional grouped configuration for core genetic algorithm controls such
as population size, generation count, crossover, mutation, tournament
size, elitism, hall-of-fame size, criteria, and algorithm.
population_config : :class:`~sklearn_genetic.config.PopulationConfig`, default=None
Optional grouped configuration for initial population behavior,
including ``initializer`` and ``warm_start_configs``.
runtime_config : :class:`~sklearn_genetic.config.RuntimeConfig`, default=None
Optional grouped configuration for parallelism, caching, train-score
collection, error handling, and verbose output.
optimization_config : :class:`~sklearn_genetic.config.OptimizationConfig`, default=None
Optional grouped configuration for local refinement, diversity control,
adaptive selection, fitness sharing, and robust final selection.
population_initializer : {'smart', 'random'}, default='smart'
Strategy used to generate the initial population. ``'smart'`` combines
valid warm-start configurations, valid estimator defaults, Latin
hypercube sampling for numeric dimensions, stratified categorical
values, and duplicate avoidance. ``'random'`` uses the previous random
sampling behavior.
generations : int, default=40
Number of generations or iterations to run the evolutionary algorithm.
crossover_probability : float or a Scheduler, default=0.8
Probability of crossover operation between two individuals.
mutation_probability : float or a Scheduler, default=0.1
Probability of child mutation.
tournament_size : int, default=3
Number of individuals to perform tournament selection.
elitism : bool, default=True
If True takes the *tournament_size* best solution to the next generation.
scoring : str, callable, list, tuple or dict, default=None
Strategy to evaluate the performance of the cross-validated model on
the test set.
If `scoring` represents a single score, one can use:
- a single string;
- a callable that returns a single value.
If `scoring` represents multiple scores, one can use:
- a list or tuple of unique strings;
- a callable returning a dictionary where the keys are the metric
names and the values are the metric scores;
- a dictionary with metric names as keys and callables a values.
n_jobs : int, default=None
Number of jobs to run in parallel. Candidate evaluations in each
generation are parallelized when possible; each candidate then runs
cross-validation sequentially to avoid nested parallelism.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors.
parallel_backend : {'auto', 'population', 'cv'}, default='auto'
Controls where ``n_jobs`` parallelism is applied during ``fit``.
``'auto'`` and ``'population'`` evaluate unique candidates in each
generation in parallel when possible. ``'cv'`` keeps candidate
evaluation serial and passes ``n_jobs`` to each candidate's
cross-validation call.
local_search : bool, default=False
If ``True``, run a short local refinement phase around the current
hall-of-fame individuals after the genetic search finishes.
local_search_top_k : int, default=1
Number of hall-of-fame individuals used as local-search seeds.
local_search_steps : int, default=1
Number of neighbor candidates generated per local-search seed.
local_search_radius : float, default=0.1
Fraction of the search range used to sample local numeric neighbors.
For categorical parameters, a different category is sampled.
diversity_control : bool, default=True
If ``True``, monitor diversity and stagnation to boost mutation,
replace duplicate candidates, and inject random immigrants.
adaptive_selection : bool, default=False
If ``True``, adapt tournament size from generation telemetry. Selection
pressure is reduced when diversity is low or the search is stagnant,
and slightly increased when the population is improving with enough
diversity.
selection_pressure_min : int, default=2
Minimum tournament size used by adaptive selection.
selection_pressure_max : int, default=None
Maximum tournament size used by adaptive selection. If ``None``, the
maximum is one larger than ``tournament_size``.
offspring_diversity_retries : int, default=0
Number of retries used when replacing duplicate or parent-matching
offspring with new random candidates.
diversity_threshold : float, default=0.25
Diversity value below which diversity control can trigger.
diversity_stagnation_generations : int, default=5
Number of stagnant generations after which diversity control can
inject random immigrants.
diversity_mutation_boost : float, default=2.0
Multiplicative boost applied to mutation probability when diversity
control triggers. The boosted value is capped to DEAP's valid range.
random_immigrants_fraction : float, default=0.1
Fraction of offspring replaced by random immigrants when diversity
control triggers.
fitness_sharing : bool, default=False
If ``True``, temporarily penalize candidates in crowded niches during
selection. Raw cross-validation scores and ``cv_results_`` are not
modified.
sharing_radius : float, default=0.2
Normalized distance below which two individuals are considered part of
the same niche for fitness sharing.
sharing_alpha : float, default=1.0
Shape parameter that controls how quickly sharing pressure decreases
with distance inside ``sharing_radius``.
final_selection : bool, default=False
If ``True``, re-evaluate the top ``final_selection_top_k`` candidates
after the GA finishes and select ``best_params_`` from those robust
final scores before refitting.
final_selection_top_k : int, default=3
Number of top candidates from the original GA ``cv_results_`` to
re-evaluate during final selection.
final_selection_cv : int, cross-validation splitter or iterable, default=None
Cross-validation strategy used for final selection. If ``None``, the
same CV splits used during the GA are reused.
verbose : bool, default=True
If ``True``, shows the metrics on the optimization routine.
keep_top_k : int, default=1
Number of best solutions to keep in the hof object. If a callback stops the algorithm before k iterations,
it will return only one set of parameters per iteration.
criteria : {'max', 'min'} , default='max'
``max`` if a higher scoring metric is better, ``min`` otherwise.
algorithm : {'eaMuPlusLambda', 'eaMuCommaLambda', 'eaSimple'}, default='eaMuPlusLambda'
Evolutionary algorithm to use.
See more details in the deap algorithms documentation.
refit : bool, str, or callable, default=True
Refit an estimator using the best found parameters on the whole
dataset.
For multiple metric evaluation, this needs to be a `str` denoting the
scorer that would be used to find the best parameters for refitting
the estimator at the end.
The refitted estimator is made available at the ``best_estimator_``
attribute and permits using ``predict`` directly on this
``GASearchCV`` instance.
Also for multiple metric evaluation, the attributes ``best_index_``,
``best_score_`` and ``best_params_`` will only be available if
``refit`` is set and all of them will be determined w.r.t this specific
scorer.
See ``scoring`` parameter to know more about multiple metric
evaluation.
If ``False``, it is not possible to make predictions
using this GASearchCV instance after fitting.
pre_dispatch : int or str, default='2*n_jobs'
Controls the number of jobs that get dispatched during parallel
execution. Reducing this number can be useful to avoid an
explosion of memory consumption when more jobs get dispatched
than CPUs can process. This parameter can be ``None`` to dispatch all
jobs immediately, an integer number of total jobs to spawn, or a string
expression as a function of ``n_jobs``, such as ``'2*n_jobs'``.
error_score : 'raise' or numeric, default=np.nan
Value to assign to the score if an error occurs in estimator fitting.
If set to ``'raise'``, the error is raised.
If a numeric value is given, FitFailedWarning is raised.
return_train_score: bool, default=False
If ``False``, the ``cv_results_`` attribute will not include training
scores.
Computing training scores is used to get insights on how different
parameter settings impact the overfitting/underfitting trade-off.
However computing the scores on the training set can be computationally
expensive and is not strictly required to select the parameters that
yield the best generalization performance.
log_config : :class:`~sklearn_genetic.mlflow.MLflowConfig`, default = None
Configuration to log metrics and models to mlflow, of None,
no mlflow logging will be performed
use_cache: bool, default=True
If set to true it will avoid to re-evaluating solutions that have already seen,
otherwise it will always evaluate the solutions to get the performance metrics
Attributes
----------
logbook : :class:`DEAP.tools.Logbook`
Contains the logs of every set of hyperparameters fitted with its average scoring metric.
history : dict
Dictionary with one list per generation. It includes ``gen``,
``fitness``, ``fitness_std``, ``fitness_best``, ``fitness_max``, ``fitness_min``,
population diversity fields, stagnation fields, optimizer-control
telemetry, and local-refinement telemetry.
cv_results_ : dict of numpy (masked) ndarrays
A dict with keys as column headers and values as columns, that can be
imported into a pandas ``DataFrame``.
best_estimator_ : estimator
Estimator that was chosen by the search, i.e. estimator
which gave highest score
on the left out data. Not available if ``refit=False``.
best_params_ : dict
Parameter setting that gave the best results on the hold out data.
best_index_ : int
The index (of the ``cv_results_`` arrays) which corresponds to the best
candidate parameter setting.
The dict at ``search.cv_results_['params'][search.best_index_]`` gives
the parameter setting for the best model, that gives the highest
mean score (``search.best_score_``).
scorer_ : function or a dict
Scorer function used on the held out data to choose the best
parameters for the model.
n_splits_ : int
The number of cross-validation splits (folds/iterations).
refit_time_ : float
Seconds used for refitting the best model on the whole dataset.
This is present only if ``refit`` is not False.
fit_stats_ : dict
Counters collected during the last ``fit`` call. Includes evaluated
candidates, unique candidates, cross-validation calls, cache hits,
duplicate candidates, skipped invalid candidates, and population-level
parallel/serial batch counts.
"""
def __init__(
self,
estimator,
cv=3,
param_grid=None,
scoring=None,
population_size=50,
generations=80,
crossover_probability=0.8,
mutation_probability=0.1,
tournament_size=3,
elitism=True,
verbose=True,
keep_top_k=1,
criteria="max",
algorithm="eaMuPlusLambda",
refit=True,
n_jobs=None,
pre_dispatch="2*n_jobs",
error_score=np.nan,
return_train_score=False,
log_config=None,
use_cache=True,
warm_start_configs=None,
evolution_config=None,
population_config=None,
runtime_config=None,
optimization_config=None,
parallel_backend="auto",
population_initializer="smart",
local_search=False,
local_search_top_k=1,
local_search_steps=1,
local_search_radius=0.1,
diversity_control=True,
diversity_threshold=0.25,
diversity_stagnation_generations=5,
diversity_mutation_boost=2.0,
random_immigrants_fraction=0.1,
adaptive_selection=False,
selection_pressure_min=2,
selection_pressure_max=None,
offspring_diversity_retries=0,
fitness_sharing=False,
sharing_radius=0.2,
sharing_alpha=1.0,
final_selection=False,
final_selection_top_k=3,
final_selection_cv=None,
):
legacy_warm_start_configs = warm_start_configs
population_size = _resolve_config_value(
evolution_config, "population_size", population_size
)
generations = _resolve_config_value(evolution_config, "generations", generations)
crossover_probability = _resolve_config_value(
evolution_config, "crossover_probability", crossover_probability
)
mutation_probability = _resolve_config_value(
evolution_config, "mutation_probability", mutation_probability
)
tournament_size = _resolve_config_value(
evolution_config, "tournament_size", tournament_size
)
elitism = _resolve_config_value(evolution_config, "elitism", elitism)
keep_top_k = _resolve_config_value(evolution_config, "keep_top_k", keep_top_k)
criteria = _resolve_config_value(evolution_config, "criteria", criteria)
algorithm = _resolve_config_value(evolution_config, "algorithm", algorithm)
population_initializer = _resolve_config_value(
population_config, "initializer", population_initializer
)
warm_start_configs = _resolve_config_value(
population_config, "warm_start_configs", warm_start_configs
)
n_jobs = _resolve_config_value(runtime_config, "n_jobs", n_jobs)
pre_dispatch = _resolve_config_value(runtime_config, "pre_dispatch", pre_dispatch)
error_score = _resolve_config_value(runtime_config, "error_score", error_score)
return_train_score = _resolve_config_value(
runtime_config, "return_train_score", return_train_score
)
use_cache = _resolve_config_value(runtime_config, "use_cache", use_cache)
parallel_backend = _resolve_config_value(
runtime_config, "parallel_backend", parallel_backend
)
verbose = _resolve_config_value(runtime_config, "verbose", verbose)
local_search = _resolve_config_value(optimization_config, "local_search", local_search)
local_search_top_k = _resolve_config_value(
optimization_config, "local_search_top_k", local_search_top_k
)
local_search_steps = _resolve_config_value(
optimization_config, "local_search_steps", local_search_steps
)
local_search_radius = _resolve_config_value(
optimization_config, "local_search_radius", local_search_radius
)
diversity_control = _resolve_config_value(
optimization_config, "diversity_control", diversity_control
)
diversity_threshold = _resolve_config_value(
optimization_config, "diversity_threshold", diversity_threshold
)
diversity_stagnation_generations = _resolve_config_value(
optimization_config,
"diversity_stagnation_generations",
diversity_stagnation_generations,
)
diversity_mutation_boost = _resolve_config_value(
optimization_config, "diversity_mutation_boost", diversity_mutation_boost
)
random_immigrants_fraction = _resolve_config_value(
optimization_config, "random_immigrants_fraction", random_immigrants_fraction
)
adaptive_selection = _resolve_config_value(
optimization_config, "adaptive_selection", adaptive_selection
)
selection_pressure_min = _resolve_config_value(
optimization_config, "selection_pressure_min", selection_pressure_min
)
selection_pressure_max = _resolve_config_value(
optimization_config, "selection_pressure_max", selection_pressure_max
)
offspring_diversity_retries = _resolve_config_value(
optimization_config, "offspring_diversity_retries", offspring_diversity_retries
)
fitness_sharing = _resolve_config_value(
optimization_config, "fitness_sharing", fitness_sharing
)
sharing_radius = _resolve_config_value(
optimization_config, "sharing_radius", sharing_radius
)
sharing_alpha = _resolve_config_value(optimization_config, "sharing_alpha", sharing_alpha)
final_selection = _resolve_config_value(
optimization_config, "final_selection", final_selection
)
final_selection_top_k = _resolve_config_value(
optimization_config, "final_selection_top_k", final_selection_top_k
)
final_selection_cv = _resolve_config_value(
optimization_config, "final_selection_cv", final_selection_cv
)
self.evolution_config = evolution_config
self.population_config = population_config
self.runtime_config = runtime_config
self.optimization_config = optimization_config
self.estimator = estimator
self.cv = cv
self.scoring = scoring
self.population_size = population_size
self.generations = generations
self.crossover_probability = crossover_probability
self.mutation_probability = mutation_probability
self.crossover_adapter = check_adapter(self.crossover_probability)
self.mutation_adapter = check_adapter(self.mutation_probability)
self.tournament_size = tournament_size
self.elitism = elitism
self.verbose = verbose
self.keep_top_k = keep_top_k
self.criteria = criteria
self.param_grid = param_grid
self.algorithm = algorithm
self.refit = refit
self.n_jobs = n_jobs
self.pre_dispatch = pre_dispatch
self.error_score = error_score
self.return_train_score = return_train_score
# self.creator = creator
self.log_config = log_config
self.use_cache = use_cache
self.fitness_cache = {}
self.warm_start_configs = legacy_warm_start_configs
self._warm_start_configs = warm_start_configs
self.parallel_backend = parallel_backend
self.population_initializer = population_initializer
self.local_search = local_search
self.local_search_top_k = local_search_top_k
self.local_search_steps = local_search_steps
self.local_search_radius = local_search_radius
self.diversity_control = diversity_control
self.diversity_threshold = diversity_threshold
self.diversity_stagnation_generations = diversity_stagnation_generations
self.diversity_mutation_boost = diversity_mutation_boost
self.random_immigrants_fraction = random_immigrants_fraction
self.adaptive_selection = adaptive_selection
self.selection_pressure_min = selection_pressure_min
self.selection_pressure_max = selection_pressure_max
self.offspring_diversity_retries = offspring_diversity_retries
self.fitness_sharing = fitness_sharing
self.sharing_radius = sharing_radius
self.sharing_alpha = sharing_alpha
self.final_selection = final_selection
self.final_selection_top_k = final_selection_top_k
self.final_selection_cv = final_selection_cv
_validate_parallel_backend(self.parallel_backend)
_validate_population_initializer(self.population_initializer)
if self.final_selection_top_k < 1:
raise ValueError("final_selection_top_k must be greater than or equal to 1")
_validate_optimizer_control(
self.local_search_top_k,
self.local_search_steps,
self.local_search_radius,
self.diversity_threshold,
self.diversity_stagnation_generations,
self.diversity_mutation_boost,
self.random_immigrants_fraction,
self.sharing_radius,
self.sharing_alpha,
self.selection_pressure_min,
self.selection_pressure_max,
self.offspring_diversity_retries,
)
# Check that the estimator is compatible with scikit-learn
if not (
_is_classifier(self.estimator)
or _is_regressor(self.estimator)
or _is_outlier_detector(self.estimator)
):
raise ValueError(
f"{self.estimator} is not a valid Sklearn classifier, regressor, or outlier detector"
)
if criteria not in Criteria.list():
raise ValueError(f"Criteria must be one of {Criteria.list()}, got {criteria} instead")
# Minimization is handle like an optimization problem with a change in the score sign
elif criteria == Criteria.max.value:
self.criteria_sign = 1.0
elif criteria == Criteria.min.value:
self.criteria_sign = -1.0
# Saves the param_grid and computes some extra properties in the same object
self.space = Space(param_grid)
if len(self.space) == 1: # pragma: no cover
warnings.warn(
"Warning, only one parameter was provided to the param_grid, the optimization routine "
"might not have effect or it could lead to errors, it's advised to use at least 2 parameters"
)
super(GASearchCV, self).__init__(
estimator=estimator,
scoring=scoring,
n_jobs=n_jobs,
refit=refit,
cv=cv,
verbose=verbose,
pre_dispatch=pre_dispatch,
error_score=error_score,
)
def _register(self):
"""
This function is the responsible for registering the DEAPs necessary methods
and create other objects to hold the hof, logbook and stats.
"""
self.toolbox = base.Toolbox()
creator.create("FitnessMax", base.Fitness, weights=(self.criteria_sign,))
creator.create("Individual", list, fitness=creator.FitnessMax)
attributes = []
# Assign all the parameters defined in the param_grid
# It uses the distribution parameter to set the sampling function
for parameter, dimension in self.space.param_grid.items():
self.toolbox.register(f"{parameter}", dimension.sample)
attributes.append(getattr(self.toolbox, parameter))
IND_SIZE = 1
self.toolbox.register(
"individual",
tools.initCycle,
creator.Individual,
tuple(attributes),
n=IND_SIZE,
)
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
if len(self.space) == 1 and hasattr(list(self.space.param_grid.values())[0], "lower"):
sampler = list(self.space.param_grid.values())[0]
lower, upper = sampler.lower, sampler.upper
self.toolbox.register(
"mate_raw", tools.cxSimulatedBinaryBounded, low=lower, up=upper, eta=10
)
else:
self.toolbox.register("mate_raw", tools.cxUniform, indpb=0.5)
self.toolbox.register("mate", self.mate)
self.toolbox.register("mutate", self.mutate)
self.toolbox.register("select", self.select)
self.toolbox.register("evaluate", self.evaluate)
self.toolbox.register("evaluate_population", self.evaluate_population)
self._pop = self._initialize_population()
self._hof = tools.HallOfFame(self.keep_top_k)
self._stats = tools.Statistics(lambda ind: ind.fitness.values)
self._stats.register("fitness", np.mean, axis=0)
self._stats.register("fitness_std", np.std, axis=0)
self._stats.register("fitness_max", np.max, axis=0)
self._stats.register("fitness_min", np.min, axis=0)
self.logbook = tools.Logbook()
def _initialize_population(self):
"""
Initialize the population, using warm-start configurations if provided.
"""
population = initialize_search_population(self, self.toolbox, creator.Individual)
for individual in population:
self._repair_individual(individual)
return population
[docs]
def select(self, population, k):
if not self.elitism:
self._selection_pressure_ = None
return tools.selRoulette(population, k)
tournament_size = adaptive_tournament_size(
self,
getattr(self, "_last_generation_record", None),
len(population),
)
self._selection_pressure_ = tournament_size
return tools.selTournament(population, k, tournsize=tournament_size)
def _repair_value(self, dimension, value):
if isinstance(dimension, Integer):
if value is None:
return dimension.sample()
repaired = int(round(float(value)))
return int(np.clip(repaired, dimension.lower, dimension.upper))
if isinstance(dimension, Continuous):
if value is None:
return dimension.sample()
repaired = float(value)
return float(np.clip(repaired, dimension.lower, dimension.upper))
if isinstance(dimension, Categorical):
return value if value in dimension.choices else dimension.sample()
return value
def _repair_individual(self, individual):
if not hasattr(self, "space"):
return individual
for index, parameter in enumerate(self.space.parameters):
individual[index] = self._repair_value(self.space[parameter], individual[index])
return individual
[docs]
def mate(self, individual_1, individual_2):
offspring_1, offspring_2 = self.toolbox.mate_raw(individual_1, individual_2)
self._repair_individual(offspring_1)
self._repair_individual(offspring_2)
return offspring_1, offspring_2
def mutate(self, individual):
"""
This function is responsible for change a randomly selected parameter from an individual
Parameters
----------
individual: Individual object
The individual (set of hyperparameters) that is being generated
Returns
-------
Mutated individual
"""
# Randomly select one of the hyperparameters
gen = random.randrange(0, len(self.space))
parameter_idx = self.space.parameters[gen]
parameter = self.space[parameter_idx]
# Using the defined distribution from the para_grid value
# Make a random sample of the parameter
individual[gen] = parameter.sample()
self._repair_individual(individual)
return [individual]
def _individual_key(self, individual):
current_generation_params = {
key: individual[n] for n, key in enumerate(self.space.parameters)
}
return tuple(sorted(current_generation_params.items()))
[docs]
def evaluate_population(self, individuals):
for individual in individuals:
self._repair_individual(individual)
return _evaluate_population_batch(self, individuals, "current_generation_params")
def _evaluate_individual(self, individual, n_jobs=None):
self._repair_individual(individual)
# Dictionary representation of the individual with key-> hyperparameter name, value -> value
current_generation_params = {
key: individual[n] for n, key in enumerate(self.space.parameters)
}
local_estimator = clone(self.estimator)
local_estimator.set_params(**current_generation_params)
# standard cross_validate for all estimator types is used
cv_results = cross_validate(
local_estimator,
self.X_,
self.y_,
cv=self._cv_splits,
scoring=self.scorer_,
n_jobs=n_jobs,
pre_dispatch=self.pre_dispatch,
error_score=self.error_score,
return_train_score=self.return_train_score,
)
cv_scores = cv_results[f"test_{self.refit_metric}"]
score = np.mean(cv_scores)
# Uses the log config to save in remote log server (e.g MLflow)
if self.log_config is not None:
self.log_config.create_run(
parameters=current_generation_params,
score=score,
estimator=local_estimator,
)
# These values are used to compute cv_results_ property
current_generation_params["score"] = score
current_generation_params["cv_scores"] = cv_scores
current_generation_params["fit_time"] = cv_results["fit_time"]
current_generation_params["score_time"] = cv_results["score_time"]
for metric in self.metrics_list:
current_generation_params[f"test_{metric}"] = cv_results[f"test_{metric}"]
if self.return_train_score:
current_generation_params[f"train_{metric}"] = cv_results[f"train_{metric}"]
fitness_result = (score,)
return fitness_result, current_generation_params, True, False
def evaluate(self, individual):
"""
Compute the cross-validation scores and record the logbook and mlflow (if specified)
Parameters
----------
individual: Individual object
The individual (set of hyperparameters) that is being evaluated
Returns
-------
The fitness value of the estimator candidate, corresponding to the cv-score
"""
# Convert hyperparameters to a tuple to use as a key in the cache
self._repair_individual(individual)
individual_key = self._individual_key(individual)
# Check if the individual has already been evaluated
if individual_key in self.fitness_cache and self.use_cache:
# Retrieve cached result
cached_result = self.fitness_cache[individual_key]
# Ensure the logbook is updated even if the individual is cached
self.logbook.record(parameters=cached_result["current_generation_params"])
_record_fit_stats(self, evaluated=1, cache_hits=1)
return cached_result["fitness"]
candidate_n_jobs = self.n_jobs if self.parallel_backend == "cv" else 1
(
fitness_result,
current_generation_params,
used_cv,
skipped_invalid,
) = self._evaluate_individual(
individual,
n_jobs=candidate_n_jobs,
)
current_generation_params = _logbook_record(
self.logbook,
"parameters",
current_generation_params,
)
if self.use_cache:
# Store the fitness result and the current generation parameters in the cache
self.fitness_cache[individual_key] = {
"fitness": fitness_result,
"current_generation_params": current_generation_params,
}
_record_fit_stats(
self,
evaluated=1,
unique=1,
cv_calls=int(used_cv),
skipped=int(skipped_invalid),
)
return fitness_result
def _candidate_params_from_index(self, index):
return self.cv_results_["params"][index]
def _top_candidate_indices(self):
ranks = np.asarray(self.cv_results_[f"rank_test_{self.refit_metric}"])
return list(np.argsort(ranks)[: self.final_selection_top_k])
def _final_selection_splits(self):
if self.final_selection_cv is None:
return self._cv_splits
cv = check_cv(self.final_selection_cv, self.y_, classifier=_is_classifier(self.estimator))
return list(cv.split(self.X_, self.y_))
def _score_final_candidate(self, params, cv_splits):
local_estimator = clone(self.estimator)
local_estimator.set_params(**params)
cv_results = cross_validate(
local_estimator,
self.X_,
self.y_,
cv=cv_splits,
scoring=self.scorer_,
n_jobs=self.n_jobs,
pre_dispatch=self.pre_dispatch,
error_score=self.error_score,
return_train_score=False,
)
cv_scores = cv_results[f"test_{self.refit_metric}"]
return float(np.mean(cv_scores)), cv_scores
def _select_final_candidate(self):
original_best_index = int(self.cv_results_[f"rank_test_{self.refit_metric}"].argmin())
original_best_score = float(
self.cv_results_[f"mean_test_{self.refit_metric}"][original_best_index]
)
original_best_params = self._candidate_params_from_index(original_best_index)
self.final_selection_results_ = {
"enabled": bool(self.final_selection),
"top_k": 1,
"cv": self.final_selection_cv,
"original_best_index": original_best_index,
"original_best_score": original_best_score,
"original_best_params": original_best_params,
"selected_index": original_best_index,
"selected_score": original_best_score,
"selected_params": original_best_params,
"changed": False,
"candidates": [],
"time_seconds": 0.0,
}
if not self.final_selection:
return original_best_index, original_best_score, original_best_params
started_at = time.time()
cv_splits = self._final_selection_splits()
candidate_results = []
seen_params = set()
for index in self._top_candidate_indices():
params = self._candidate_params_from_index(index)
params_key = tuple(sorted(params.items()))
if params_key in seen_params:
continue
seen_params.add(params_key)
score, cv_scores = self._score_final_candidate(params, cv_splits)
candidate_results.append(
{
"index": int(index),
"original_score": float(
self.cv_results_[f"mean_test_{self.refit_metric}"][index]
),
"score": score,
"cv_scores": cv_scores.tolist(),
"params": params,
}
)
if candidate_results:
selected = max(candidate_results, key=lambda item: item["score"])
selected_index = selected["index"]
selected_score = selected["score"]
selected_params = selected["params"]
else: # pragma: no cover
selected_index = original_best_index
selected_score = original_best_score
selected_params = original_best_params
self.final_selection_results_.update(
{
"top_k": self.final_selection_top_k,
"selected_index": selected_index,
"selected_score": selected_score,
"selected_params": selected_params,
"changed": selected_index != original_best_index,
"candidates": candidate_results,
"time_seconds": time.time() - started_at,
}
)
return selected_index, selected_score, selected_params
[docs]
def fit(self, X, y=None, callbacks=None):
"""
Main method of GASearchCV, starts the optimization
procedure with the hyperparameters of the given estimator
Parameters
----------
X : array-like of shape (n_samples, n_features)
The data to fit. Can be for example a list, or an array.
y : array-like of shape (n_samples,) or (n_samples, n_outputs), \
default=None
The target variable to try to predict in the case of
supervised learning. For outlier detection, y can be None.
callbacks: list or callable
One or a list of the callbacks methods available in
:class:`~sklearn_genetic.callbacks`.
The callback is evaluated after fitting the estimators from the generation 1.
"""
self.X_ = X
self.y_ = y
self._n_iterations = self.generations + 1
self.refit_metric = "score"
self.multimetric_ = False
# added a handle outlier detection jussst in case where y might be None
if _is_outlier_detector(self.estimator) and y is None:
# and for unsupervised outlier detection, it will create dummy y for cv compatibility :)
self.y_ = np.zeros(X.shape[0])
# Make sure the callbacks are valid
self.callbacks = check_callback(callbacks)
checkpoint_loaded = False
# Load state if a checkpoint exists
for callback in self.callbacks:
if isinstance(callback, ModelCheckpoint):
if os.path.exists(callback.checkpoint_path):
checkpoint_data = callback.load()
if checkpoint_data:
self.__dict__.update(checkpoint_data["estimator_state"]) # noqa
self.logbook = checkpoint_data["logbook"]
checkpoint_loaded = True
break
if not checkpoint_loaded:
_reset_adapters(self)
self.fit_stats_ = _create_fit_stats()
if callable(self.scoring):
self.scorer_ = self.scoring
self.metrics_list = [self.refit_metric]
elif self.scoring is None or isinstance(self.scoring, str):
# it will handle outlier detectors that don't have a score method
if _is_outlier_detector(self.estimator) and self.scoring is None:
# this function creates a default scorer for outlier detection
def default_outlier_scorer(estimator, X, y=None):
if hasattr(estimator, "score_samples"):
return np.mean(estimator.score_samples(X))
elif hasattr(estimator, "decision_function"):
return np.mean(estimator.decision_function(X))
else:
predictions = estimator.fit_predict(X)
return np.mean(predictions == 1)
self.scorer_ = default_outlier_scorer
self.metrics_list = [self.refit_metric]
else:
self.scorer_ = check_scoring(self.estimator, self.scoring)
self.metrics_list = [self.refit_metric]
else:
self.scorer_ = _check_multimetric_scoring(self.estimator, self.scoring)
self._check_refit_for_multimetric(self.scorer_)
self.refit_metric = self.refit
self.metrics_list = self.scorer_.keys()
self.multimetric_ = True
# Check cv and get the n_splits
if _is_outlier_detector(self.estimator):
# For outlier detectors, better to use KFold instead of classifier-based CV
from sklearn.model_selection import KFold
cv_orig = KFold(n_splits=self.cv if isinstance(self.cv, int) else 5)
self.n_splits_ = cv_orig.get_n_splits(X, self.y_)
else:
cv_orig = check_cv(self.cv, self.y_, classifier=_is_classifier(self.estimator))
self.n_splits_ = cv_orig.get_n_splits(X, self.y_)
self._cv_splits = list(cv_orig.split(self.X_, self.y_))
# Set the DEAPs necessary methods
self._register()
# Optimization routine from the selected evolutionary algorithm
pop, log, n_gen = self._select_algorithm(pop=self._pop, stats=self._stats, hof=self._hof)
# Update the _n_iterations value as the algorithm could stop earlier due a callback
self._n_iterations = n_gen
self.cv_results_ = create_gasearch_cv_results_(
logbook=self.logbook,
space=self.space,
return_train_score=self.return_train_score,
metrics=self.metrics_list,
)
self.history = {
"gen": log.select("gen"),
"fitness": log.select("fitness"),
"fitness_std": log.select("fitness_std"),
"fitness_best": log.select("fitness_best"),
"fitness_max": log.select("fitness_max"),
"fitness_min": log.select("fitness_min"),
"population_size": log.select("population_size"),
"unique_individuals": log.select("unique_individuals"),
"unique_individual_ratio": log.select("unique_individual_ratio"),
"genotype_diversity": log.select("genotype_diversity"),
"fitness_improvement": log.select("fitness_improvement"),
"fitness_improved": log.select("fitness_improved"),
"stagnation_generations": log.select("stagnation_generations"),
"best_generation": log.select("best_generation"),
"mutation_probability": log.select("mutation_probability"),
"selection_pressure": log.select("selection_pressure"),
"diversity_control_triggered": log.select("diversity_control_triggered"),
"random_immigrants": log.select("random_immigrants"),
"duplicate_replacements": log.select("duplicate_replacements"),
"local_refinements": log.select("local_refinements"),
"fitness_sharing_applied": log.select("fitness_sharing_applied"),
"mean_niche_count": log.select("mean_niche_count"),
"max_niche_count": log.select("max_niche_count"),
}
# Imitate the logic of scikit-learn refit parameter
if self.refit:
self.best_index_, self.best_score_, self.best_params_ = self._select_final_candidate()
self.estimator.set_params(**self.best_params_)
refit_start_time = time.time()
self.estimator.fit(
self.X_,
self.y_,
)
refit_end_time = time.time()
self.refit_time_ = refit_end_time - refit_start_time
self.best_estimator_ = self.estimator
self.estimator_ = self.best_estimator_
# hof keeps the best params according to the fitness value
# To be consistent with self.best_estimator_, if more than 1 model gets the
# same score, it could lead to differences between hof and self.best_estimator_
self._hof.remove(0)
self._hof.items.insert(0, list(self.best_params_.values()))
self._hof.keys.insert(0, self.best_score_)
self.hof = {
k: {key: self._hof[k][n] for n, key in enumerate(self.space.parameters)}
for k in range(len(self._hof))
}
del creator.FitnessMax
del creator.Individual
return self
[docs]
class GAFeatureSelectionCV(GeneticEstimatorMixin, MetaEstimatorMixin, SelectorMixin, BaseEstimator):
"""
Evolutionary optimization for feature selection.
GAFeatureSelectionCV implements a "fit" and a "score" method.
It also implements "predict", "predict_proba", "decision_function",
"predict_log_proba" if they are implemented in the
estimator used.
The features (variables) used by the estimator are found by optimizing
the cv-scores and by minimizing the number of features
Parameters
----------
estimator : estimator object, default=None
estimator object implementing 'fit'
The object to use to fit the data.
cv : int, cross-validation generator or an iterable, default=None
Determines the cross-validation splitting strategy.
Possible inputs for cv are:
- None, to use the default 5-fold cross validation,
- int, to specify the number of folds in a `(Stratified)KFold`,
- CV splitter,
- An iterable yielding (train, test) splits as arrays of indices.
For int/None inputs, if the estimator is a classifier and ``y`` is
either binary or multiclass, :class:`StratifiedKFold` is used. In all
other cases, :class:`KFold` is used. These splitters are instantiated
with `shuffle=False` so the splits will be the same across calls.
population_size : int, default=10
Size of the initial population to sample generated individuals.
evolution_config : :class:`~sklearn_genetic.config.EvolutionConfig`, default=None
Optional grouped configuration for core genetic algorithm controls such
as population size, generation count, crossover, mutation, tournament
size, elitism, hall-of-fame size, criteria, and algorithm.
population_config : :class:`~sklearn_genetic.config.PopulationConfig`, default=None
Optional grouped configuration for the initial feature-mask population.
runtime_config : :class:`~sklearn_genetic.config.RuntimeConfig`, default=None
Optional grouped configuration for parallelism, caching, train-score
collection, error handling, and verbose output.
optimization_config : :class:`~sklearn_genetic.config.OptimizationConfig`, default=None
Optional grouped configuration for local refinement, diversity control,
adaptive selection, and fitness sharing. Final-selection fields are
ignored by :class:`~sklearn_genetic.GAFeatureSelectionCV`.
population_initializer : {'smart', 'random'}, default='smart'
Strategy used to generate the initial population. ``'smart'`` creates
duplicate-aware feature masks with a spread of selected-feature counts.
``'random'`` uses the previous weighted random feature-mask sampling.
local_search : bool, default=False
If ``True``, run a short local refinement phase around the current
hall-of-fame feature masks after the genetic search finishes.
local_search_top_k : int, default=1
Number of hall-of-fame feature masks used as local-search seeds.
local_search_steps : int, default=1
Number of neighbor feature masks generated per local-search seed.
local_search_radius : float, default=0.1
Fraction of features to flip when sampling a local neighbor.
diversity_control : bool, default=True
If ``True``, monitor diversity and stagnation to boost mutation,
replace duplicate candidates, and inject random immigrants.
diversity_threshold : float, default=0.25
Diversity value below which diversity control can trigger.
diversity_stagnation_generations : int, default=5
Number of stagnant generations after which diversity control can
inject random immigrants.
diversity_mutation_boost : float, default=2.0
Multiplicative boost applied to mutation probability when diversity
control triggers. The boosted value is capped to DEAP's valid range.
random_immigrants_fraction : float, default=0.1
Fraction of offspring replaced by random immigrants when diversity
control triggers.
adaptive_selection : bool, default=False
If ``True``, adapt tournament size from generation telemetry. Selection
pressure is reduced when diversity is low or the search is stagnant,
and slightly increased when the population is improving with enough
diversity.
selection_pressure_min : int, default=2
Minimum tournament size used by adaptive selection.
selection_pressure_max : int, default=None
Maximum tournament size used by adaptive selection. If ``None``, the
maximum is one larger than ``tournament_size``.
offspring_diversity_retries : int, default=0
Number of retries used when replacing duplicate or parent-matching
offspring with new random feature masks.
fitness_sharing : bool, default=False
If ``True``, temporarily penalize candidates in crowded niches during
selection. Raw cross-validation scores and ``cv_results_`` are not
modified.
sharing_radius : float, default=0.2
Normalized distance below which two individuals are considered part of
the same niche for fitness sharing.
sharing_alpha : float, default=1.0
Shape parameter that controls how quickly sharing pressure decreases
with distance inside ``sharing_radius``.
generations : int, default=40
Number of generations or iterations to run the evolutionary algorithm.
crossover_probability : float or a Scheduler, default=0.2
Probability of crossover operation between two individuals.
mutation_probability : float or a Scheduler, default=0.8
Probability of child mutation.
tournament_size : int, default=3
Number of individuals to perform tournament selection.
elitism : bool, default=True
If True takes the *tournament_size* best solution to the next generation.
max_features : int, default=None
The upper bound number of features to be selected.
scoring : str, callable, list, tuple or dict, default=None
Strategy to evaluate the performance of the cross-validated model on
the test set.
If `scoring` represents a single score, one can use:
- a single string;
- a callable that returns a single value.
If `scoring` represents multiple scores, one can use:
- a list or tuple of unique strings;
- a callable returning a dictionary where the keys are the metric
names and the values are the metric scores;
- a dictionary with metric names as keys and callables a values.
n_jobs : int, default=None
Number of jobs to run in parallel. Candidate evaluations in each
generation are parallelized when possible; each candidate then runs
cross-validation sequentially to avoid nested parallelism.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors.
verbose : bool, default=True
If ``True``, shows the metrics on the optimization routine.
keep_top_k : int, default=1
Number of best solutions to keep in the hof object. If a callback stops the algorithm before k iterations,
it will return only one set of parameters per iteration.
criteria : {'max', 'min'} , default='max'
``max`` if a higher scoring metric is better, ``min`` otherwise.
algorithm : {'eaMuPlusLambda', 'eaMuCommaLambda', 'eaSimple'}, default='eaMuPlusLambda'
Evolutionary algorithm to use.
See more details in the deap algorithms documentation.
refit : bool, str, or callable, default=True
Refit an estimator using the best found parameters on the whole
dataset.
For multiple metric evaluation, this needs to be a `str` denoting the
scorer that would be used to find the best parameters for refitting
the estimator at the end.
The refitted estimator is made available at the ``best_estimator_``
attribute and permits using ``predict`` directly on this
``FeatureSelectionCV`` instance.
Also for multiple metric evaluation, the attributes ``best_index_``,
``best_score_`` and ``best_params_`` will only be available if
``refit`` is set and all of them will be determined w.r.t this specific
scorer.
See ``scoring`` parameter to know more about multiple metric
evaluation.
If ``False``, it is not possible to make predictions
using this GASearchCV instance after fitting.
pre_dispatch : int or str, default='2*n_jobs'
Controls the number of jobs that get dispatched during parallel
execution. Reducing this number can be useful to avoid an
explosion of memory consumption when more jobs get dispatched
than CPUs can process. This parameter can be ``None`` to dispatch all
jobs immediately, an integer number of total jobs to spawn, or a string
expression as a function of ``n_jobs``, such as ``'2*n_jobs'``.
error_score : 'raise' or numeric, default=np.nan
Value to assign to the score if an error occurs in estimator fitting.
If set to ``'raise'``, the error is raised.
If a numeric value is given, FitFailedWarning is raised.
return_train_score: bool, default=False
If ``False``, the ``cv_results_`` attribute will not include training
scores.
Computing training scores is used to get insights on how different
parameter settings impact the overfitting/underfitting trade-off.
However computing the scores on the training set can be computationally
expensive and is not strictly required to select the parameters that
yield the best generalization performance.
log_config : :class:`~sklearn_genetic.mlflow.MLflowConfig`, default = None
Configuration to log metrics and models to mlflow, of None,
no mlflow logging will be performed
use_cache: bool, default=True
If set to true it will avoid to re-evaluating solutions that have already seen,
otherwise it will always evaluate the solutions to get the performance metrics
Attributes
----------
logbook : :class:`DEAP.tools.Logbook`
Contains the logs of every set of hyperparameters fitted with its average scoring metric.
history : dict
Dictionary with one list per generation. It includes ``gen``,
``fitness``, ``fitness_std``, ``fitness_best``, ``fitness_max``, ``fitness_min``,
population diversity fields, stagnation fields, optimizer-control
telemetry, and local-refinement telemetry.
cv_results_ : dict of numpy (masked) ndarrays
A dict with keys as column headers and values as columns, that can be
imported into a pandas ``DataFrame``.
best_estimator_ : estimator
Estimator that was chosen by the search, i.e. estimator
which gave highest score
on the left out data. Not available if ``refit=False``.
best_features_ : list
List of bool, each index represents one feature in the same order the data was fed.
1 means the feature was selected, 0 means the features was discarded.
support_ : list
The mask of selected features.
scorer_ : function or a dict
Scorer function used on the held out data to choose the best
parameters for the model.
n_splits_ : int
The number of cross-validation splits (folds/iterations).
n_features_in_ : int
Number of features seen (selected) during fit.
refit_time_ : float
Seconds used for refitting the best model on the whole dataset.
This is present only if ``refit`` is not False.
fit_stats_ : dict
Counters collected during the last ``fit`` call. Includes evaluated
candidates, unique candidates, cross-validation calls, cache hits,
duplicate candidates, skipped invalid candidates, and population-level
parallel/serial batch counts.
"""
def __init__(
self,
estimator,
cv=3,
scoring=None,
population_size=50,
generations=80,
crossover_probability=0.8,
mutation_probability=0.1,
tournament_size=3,
elitism=True,
max_features=None,
verbose=True,
keep_top_k=1,
criteria="max",
algorithm="eaMuPlusLambda",
refit=True,
n_jobs=None,
pre_dispatch="2*n_jobs",
error_score=np.nan,
return_train_score=False,
log_config=None,
use_cache=True,
evolution_config=None,
population_config=None,
runtime_config=None,
optimization_config=None,
parallel_backend="auto",
population_initializer="smart",
local_search=False,
local_search_top_k=1,
local_search_steps=1,
local_search_radius=0.1,
diversity_control=True,
diversity_threshold=0.25,
diversity_stagnation_generations=5,
diversity_mutation_boost=2.0,
random_immigrants_fraction=0.1,
adaptive_selection=False,
selection_pressure_min=2,
selection_pressure_max=None,
offspring_diversity_retries=0,
fitness_sharing=False,
sharing_radius=0.2,
sharing_alpha=1.0,
):
population_size = _resolve_config_value(
evolution_config, "population_size", population_size
)
generations = _resolve_config_value(evolution_config, "generations", generations)
crossover_probability = _resolve_config_value(
evolution_config, "crossover_probability", crossover_probability
)
mutation_probability = _resolve_config_value(
evolution_config, "mutation_probability", mutation_probability
)
tournament_size = _resolve_config_value(
evolution_config, "tournament_size", tournament_size
)
elitism = _resolve_config_value(evolution_config, "elitism", elitism)
keep_top_k = _resolve_config_value(evolution_config, "keep_top_k", keep_top_k)
criteria = _resolve_config_value(evolution_config, "criteria", criteria)
algorithm = _resolve_config_value(evolution_config, "algorithm", algorithm)
population_initializer = _resolve_config_value(
population_config, "initializer", population_initializer
)
n_jobs = _resolve_config_value(runtime_config, "n_jobs", n_jobs)
pre_dispatch = _resolve_config_value(runtime_config, "pre_dispatch", pre_dispatch)
error_score = _resolve_config_value(runtime_config, "error_score", error_score)
return_train_score = _resolve_config_value(
runtime_config, "return_train_score", return_train_score
)
use_cache = _resolve_config_value(runtime_config, "use_cache", use_cache)
parallel_backend = _resolve_config_value(
runtime_config, "parallel_backend", parallel_backend
)
verbose = _resolve_config_value(runtime_config, "verbose", verbose)
local_search = _resolve_config_value(optimization_config, "local_search", local_search)
local_search_top_k = _resolve_config_value(
optimization_config, "local_search_top_k", local_search_top_k
)
local_search_steps = _resolve_config_value(
optimization_config, "local_search_steps", local_search_steps
)
local_search_radius = _resolve_config_value(
optimization_config, "local_search_radius", local_search_radius
)
diversity_control = _resolve_config_value(
optimization_config, "diversity_control", diversity_control
)
diversity_threshold = _resolve_config_value(
optimization_config, "diversity_threshold", diversity_threshold
)
diversity_stagnation_generations = _resolve_config_value(
optimization_config,
"diversity_stagnation_generations",
diversity_stagnation_generations,
)
diversity_mutation_boost = _resolve_config_value(
optimization_config, "diversity_mutation_boost", diversity_mutation_boost
)
random_immigrants_fraction = _resolve_config_value(
optimization_config, "random_immigrants_fraction", random_immigrants_fraction
)
adaptive_selection = _resolve_config_value(
optimization_config, "adaptive_selection", adaptive_selection
)
selection_pressure_min = _resolve_config_value(
optimization_config, "selection_pressure_min", selection_pressure_min
)
selection_pressure_max = _resolve_config_value(
optimization_config, "selection_pressure_max", selection_pressure_max
)
offspring_diversity_retries = _resolve_config_value(
optimization_config, "offspring_diversity_retries", offspring_diversity_retries
)
fitness_sharing = _resolve_config_value(
optimization_config, "fitness_sharing", fitness_sharing
)
sharing_radius = _resolve_config_value(
optimization_config, "sharing_radius", sharing_radius
)
sharing_alpha = _resolve_config_value(optimization_config, "sharing_alpha", sharing_alpha)
self.evolution_config = evolution_config
self.population_config = population_config
self.runtime_config = runtime_config
self.optimization_config = optimization_config
self.estimator = estimator
self.cv = cv
self.scoring = scoring
self.population_size = population_size
self.generations = generations
self.crossover_probability = crossover_probability
self.mutation_probability = mutation_probability
self.crossover_adapter = check_adapter(self.crossover_probability)
self.mutation_adapter = check_adapter(self.mutation_probability)
self.tournament_size = tournament_size
self.elitism = elitism
self.max_features = max_features
self.verbose = verbose
self.keep_top_k = keep_top_k
self.criteria = criteria
self.algorithm = algorithm
self.refit = refit
self.n_jobs = n_jobs
self.pre_dispatch = pre_dispatch
self.error_score = error_score
self.return_train_score = return_train_score
# self.creator = creator
self.log_config = log_config
self.use_cache = use_cache
self.fitness_cache = {}
self.parallel_backend = parallel_backend
self.population_initializer = population_initializer
self.local_search = local_search
self.local_search_top_k = local_search_top_k
self.local_search_steps = local_search_steps
self.local_search_radius = local_search_radius
self.diversity_control = diversity_control
self.diversity_threshold = diversity_threshold
self.diversity_stagnation_generations = diversity_stagnation_generations
self.diversity_mutation_boost = diversity_mutation_boost
self.random_immigrants_fraction = random_immigrants_fraction
self.adaptive_selection = adaptive_selection
self.selection_pressure_min = selection_pressure_min
self.selection_pressure_max = selection_pressure_max
self.offspring_diversity_retries = offspring_diversity_retries
self.fitness_sharing = fitness_sharing
self.sharing_radius = sharing_radius
self.sharing_alpha = sharing_alpha
_validate_parallel_backend(self.parallel_backend)
_validate_population_initializer(self.population_initializer)
_validate_optimizer_control(
self.local_search_top_k,
self.local_search_steps,
self.local_search_radius,
self.diversity_threshold,
self.diversity_stagnation_generations,
self.diversity_mutation_boost,
self.random_immigrants_fraction,
self.sharing_radius,
self.sharing_alpha,
self.selection_pressure_min,
self.selection_pressure_max,
self.offspring_diversity_retries,
)
# added new check for whether the estimator is compatible with scikit-learn
if not (
_is_classifier(self.estimator)
or _is_regressor(self.estimator)
or _is_outlier_detector(self.estimator)
):
raise ValueError(
f"{self.estimator} is not a valid Sklearn classifier, regressor, or outlier detector"
)
if criteria not in Criteria.list():
raise ValueError(f"Criteria must be one of {Criteria.list()}, got {criteria} instead")
# Minimization is handle like an optimization problem with a change in the score sign
elif criteria == Criteria.max.value:
self.criteria_sign = 1.0
elif criteria == Criteria.min.value:
self.criteria_sign = -1.0
def _register(self):
"""
This function is the responsible for registering the DEAPs necessary methods
and create other objects to hold the hof, logbook and stats.
"""
self.toolbox = base.Toolbox()
# Criteria sign to set max or min problem
# And -1.0 as second weight to minimize number of features
creator.create("FitnessMax", base.Fitness, weights=[self.criteria_sign, -1.0])
creator.create("Individual", list, fitness=creator.FitnessMax)
# Register the array to choose the features
# Each binary value represents if the feature is selected or not
self.toolbox.register(
"individual_raw",
weighted_bool_individual,
creator.Individual,
weight=self.features_proportion,
size=self.n_features,
)
self.toolbox.register("individual", self._new_feature_individual)
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register("mate_raw", cxUniform, indpb=self.crossover_adapter.current_value)
self.toolbox.register("mutate_raw", mutFlipBit, indpb=self.mutation_adapter.current_value)
self.toolbox.register("mate", self.mate)
self.toolbox.register("mutate", self.mutate)
self.toolbox.register("select", self.select)
self.toolbox.register("evaluate", self.evaluate)
self.toolbox.register("evaluate_population", self.evaluate_population)
self._pop = self._initialize_population()
self._hof = tools.HallOfFame(self.keep_top_k)
# Stats among axis 0 to get two values:
# One based on the score and the other in the number of features
self._stats = tools.Statistics(ind_fitness_values)
self._stats.register("fitness", np.mean, axis=0)
self._stats.register("fitness_std", np.std, axis=0)
self._stats.register("fitness_max", np.max, axis=0)
self._stats.register("fitness_min", np.min, axis=0)
self.logbook = tools.Logbook()
def _initialize_population(self):
population = initialize_feature_population(self, self.toolbox, creator.Individual)
for individual in population:
self._repair_individual(individual)
return population
[docs]
def select(self, population, k):
if not self.elitism:
self._selection_pressure_ = None
return tools.selRoulette(population, k)
tournament_size = adaptive_tournament_size(
self,
getattr(self, "_last_generation_record", None),
len(population),
)
self._selection_pressure_ = tournament_size
return tools.selTournament(population, k, tournsize=tournament_size)
def _repair_individual(self, individual):
for index, value in enumerate(individual):
individual[index] = 1 if value else 0
max_features = getattr(self, "max_features", None)
if max_features and sum(individual) > max_features:
selected = [index for index, value in enumerate(individual) if value]
random.shuffle(selected)
for index in selected[max_features:]:
individual[index] = 0
if sum(individual) == 0:
individual[random.randrange(0, len(individual))] = 1
return individual
def _new_feature_individual(self):
return self._repair_individual(self.toolbox.individual_raw())
[docs]
def mate(self, individual_1, individual_2):
offspring_1, offspring_2 = self.toolbox.mate_raw(individual_1, individual_2)
self._repair_individual(offspring_1)
self._repair_individual(offspring_2)
return offspring_1, offspring_2
def mutate(self, individual):
(mutated,) = self.toolbox.mutate_raw(individual)
self._repair_individual(mutated)
return (mutated,)
def _individual_key(self, individual):
return tuple(individual)
[docs]
def evaluate_population(self, individuals):
for individual in individuals:
self._repair_individual(individual)
return _evaluate_population_batch(self, individuals, "current_generation_features")
def _build_feature_evaluation_record(self, current_generation_params, cv_results):
cv_scores = cv_results[f"test_{self.refit_metric}"]
score = np.mean(cv_scores)
current_generation_params["score"] = score
current_generation_params["cv_scores"] = cv_scores
current_generation_params["fit_time"] = cv_results["fit_time"]
current_generation_params["score_time"] = cv_results["score_time"]
for metric in self.metrics_list:
current_generation_params[f"test_{metric}"] = cv_results[f"test_{metric}"]
if self.return_train_score:
current_generation_params[f"train_{metric}"] = cv_results[f"train_{metric}"]
return score, current_generation_params
def _penalized_feature_cv_results(self, score):
cv_results = {
"fit_time": np.zeros(self.n_splits_),
"score_time": np.zeros(self.n_splits_),
}
for metric in self.metrics_list:
cv_results[f"test_{metric}"] = np.full(self.n_splits_, score)
if self.return_train_score:
cv_results[f"train_{metric}"] = np.full(self.n_splits_, score)
return cv_results
def _evaluate_individual(self, individual, n_jobs=None):
self._repair_individual(individual)
bool_individual = np.array(individual, dtype=bool)
current_generation_params = {"features": bool_individual}
n_selected_features = np.sum(individual)
max_features = getattr(self, "max_features", None)
if max_features and (n_selected_features > max_features or n_selected_features == 0):
score = -self.criteria_sign * 100000
cv_results = self._penalized_feature_cv_results(score)
_, current_generation_params = self._build_feature_evaluation_record(
current_generation_params, cv_results
)
fitness_result = [score, n_selected_features]
return fitness_result, current_generation_params, False, True
local_estimator = clone(self.estimator)
# Use standard cross_validate for all estimator types
cv_results = cross_validate(
local_estimator,
self.X_[:, bool_individual],
self.y_,
cv=self._cv_splits,
scoring=self.scorer_,
n_jobs=n_jobs,
pre_dispatch=self.pre_dispatch,
error_score=self.error_score,
return_train_score=self.return_train_score,
)
score, current_generation_params = self._build_feature_evaluation_record(
current_generation_params, cv_results
)
# Uses the log config to save in remote log server (e.g MLflow)
if self.log_config is not None:
self.log_config.create_run(
parameters=current_generation_params,
score=score,
estimator=local_estimator,
)
fitness_result = [score, n_selected_features]
return fitness_result, current_generation_params, True, False
def evaluate(self, individual):
"""
Compute the cross-validation scores and record the logbook and mlflow (if specified)
Parameters
----------
individual: Individual object
The individual (set of features) that is being evaluated
Returns
-------
fitness: List
Returns a list with two values.
The first one is the corresponding to the cv-score
The second one is the number of features selected
"""
# Convert the individual to a tuple to use as a key in the cache
self._repair_individual(individual)
individual_key = self._individual_key(individual)
# Check if the individual has already been evaluated
if individual_key in self.fitness_cache and self.use_cache:
cached_result = self.fitness_cache[individual_key]
# Ensure the logbook is updated even if the individual is cached
self.logbook.record(parameters=cached_result["current_generation_features"])
_record_fit_stats(self, evaluated=1, cache_hits=1)
return cached_result["fitness"]
candidate_n_jobs = self.n_jobs if self.parallel_backend == "cv" else 1
(
fitness_result,
current_generation_params,
used_cv,
skipped_invalid,
) = self._evaluate_individual(
individual,
n_jobs=candidate_n_jobs,
)
current_generation_params = _logbook_record(
self.logbook,
"parameters",
current_generation_params,
)
if self.use_cache:
# Store the fitness result and the current generation features in the cache
self.fitness_cache[individual_key] = {
"fitness": fitness_result,
"current_generation_features": current_generation_params,
}
_record_fit_stats(
self,
evaluated=1,
unique=1,
cv_calls=int(used_cv),
skipped=int(skipped_invalid),
)
return fitness_result
[docs]
def fit(self, X, y=None, callbacks=None):
"""
Main method of GAFeatureSelectionCV, starts the optimization
procedure with to find the best features set
Parameters
----------
X : array-like of shape (n_samples, n_features)
The data to fit. Can be for example a list, or an array.
y : array-like of shape (n_samples,) or (n_samples, n_outputs), \
default=None
The target variable to try to predict in the case of
supervised learning. For outlier detection, y can be None.
callbacks: list or callable
One or a list of the callbacks methods available in
:class:`~sklearn_genetic.callbacks`.
The callback is evaluated after fitting the estimators from the generation 1.
"""
self.X_, self.y_ = check_X_y(X, y, accept_sparse=True) if y is not None else (X, None)
# Handle outlier detection case if y is none
if _is_outlier_detector(self.estimator) and y is None:
self.X_ = X
self.y_ = np.zeros(X.shape[0])
self.n_features = X.shape[1]
self._n_iterations = self.generations + 1
self.refit_metric = "score"
self.multimetric_ = False
self.features_proportion = None
max_features = getattr(self, "max_features", None)
if max_features:
self.features_proportion = max_features / self.n_features
# Make sure the callbacks are valid
self.callbacks = check_callback(callbacks)
checkpoint_loaded = False
# Load state if a checkpoint exists
for callback in self.callbacks:
if isinstance(callback, ModelCheckpoint):
if os.path.exists(callback.checkpoint_path):
checkpoint_data = callback.load()
if checkpoint_data:
self.__dict__.update(checkpoint_data["estimator_state"]) # noqa
self.logbook = checkpoint_data["logbook"]
checkpoint_loaded = True
break
if not checkpoint_loaded:
_reset_adapters(self)
self.fit_stats_ = _create_fit_stats()
if callable(self.scoring):
self.scorer_ = self.scoring
self.metrics_list = [self.refit_metric]
elif self.scoring is None or isinstance(self.scoring, str):
# Handle outlier detectors that don't have a score method
if _is_outlier_detector(self.estimator) and self.scoring is None:
# this function creates a default scorer for outlier detection
def default_outlier_scorer(estimator, X, y=None):
if hasattr(estimator, "score_samples"):
return np.mean(estimator.score_samples(X))
elif hasattr(estimator, "decision_function"):
return np.mean(estimator.decision_function(X))
else:
predictions = estimator.fit_predict(X)
return np.mean(predictions == 1)
self.scorer_ = default_outlier_scorer
self.metrics_list = [self.refit_metric]
else:
self.scorer_ = check_scoring(self.estimator, self.scoring)
self.metrics_list = [self.refit_metric]
else:
self.scorer_ = _check_multimetric_scoring(self.estimator, self.scoring)
self._check_refit_for_multimetric(self.scorer_)
self.refit_metric = self.refit
self.metrics_list = self.scorer_.keys()
self.multimetric_ = True
# Check cv and get the n_splits
if _is_outlier_detector(self.estimator):
from sklearn.model_selection import KFold
cv_orig = KFold(n_splits=self.cv if isinstance(self.cv, int) else 5)
self.n_splits_ = cv_orig.get_n_splits(X, self.y_)
else:
cv_orig = check_cv(self.cv, self.y_, classifier=_is_classifier(self.estimator))
self.n_splits_ = cv_orig.get_n_splits(X, self.y_)
self._cv_splits = list(cv_orig.split(self.X_, self.y_))
# Set the DEAPs necessary methods
self._register()
# Optimization routine from the selected evolutionary algorithm
pop, log, n_gen = self._select_algorithm(pop=self._pop, stats=self._stats, hof=self._hof)
# Update the _n_iterations value as the algorithm could stop earlier due a callback
self._n_iterations = n_gen
self.best_features_ = np.array(self._hof[0], dtype=bool)
self.support_ = self.best_features_
self.cv_results_ = create_feature_selection_cv_results_(
logbook=self.logbook,
return_train_score=self.return_train_score,
metrics=self.metrics_list,
)
self.history = {
"gen": log.select("gen"),
"fitness": log.select("fitness"),
"fitness_std": log.select("fitness_std"),
"fitness_best": log.select("fitness_best"),
"fitness_max": log.select("fitness_max"),
"fitness_min": log.select("fitness_min"),
"population_size": log.select("population_size"),
"unique_individuals": log.select("unique_individuals"),
"unique_individual_ratio": log.select("unique_individual_ratio"),
"genotype_diversity": log.select("genotype_diversity"),
"fitness_improvement": log.select("fitness_improvement"),
"fitness_improved": log.select("fitness_improved"),
"stagnation_generations": log.select("stagnation_generations"),
"best_generation": log.select("best_generation"),
"mutation_probability": log.select("mutation_probability"),
"selection_pressure": log.select("selection_pressure"),
"diversity_control_triggered": log.select("diversity_control_triggered"),
"random_immigrants": log.select("random_immigrants"),
"duplicate_replacements": log.select("duplicate_replacements"),
"local_refinements": log.select("local_refinements"),
"fitness_sharing_applied": log.select("fitness_sharing_applied"),
"mean_niche_count": log.select("mean_niche_count"),
"max_niche_count": log.select("max_niche_count"),
}
if self.refit:
bool_individual = np.array(self.best_features_, dtype=bool)
refit_start_time = time.time()
self.estimator.fit(self.X_[:, bool_individual], self.y_)
refit_end_time = time.time()
self.refit_time_ = refit_end_time - refit_start_time
self.best_estimator_ = self.estimator
self.estimator_ = self.best_estimator_
self.hof = self._hof
del creator.FitnessMax
del creator.Individual
return self
def _check_refit_for_multimetric(self, scores): # pragma: no cover
"""Check `refit` is compatible with `scores` is valid"""
multimetric_refit_msg = (
"For multi-metric scoring, the parameter refit must be set to a "
"scorer key or a callable to refit an estimator with the best "
"parameter setting on the whole data and make the best_* "
"attributes available for that metric. If this is not needed, "
f"refit should be set to False explicitly. {self.refit!r} was "
"passed."
)
valid_refit_dict = isinstance(self.refit, str) and self.refit in scores
if self.refit is not False and not valid_refit_dict and not callable(self.refit):
raise ValueError(multimetric_refit_msg)
@property
def n_features_in_(self): # pragma: no cover
"""Number of features seen during `fit`."""
# For consistency with other estimators we raise a AttributeError so
# that hasattr() fails if the estimator isn't fitted.
if not self._fitted:
raise AttributeError(
"{} object has no n_features_in_ attribute.".format(self.__class__.__name__)
)
return self.n_features
def _get_support_mask(self):
if not self._fitted:
raise NotFittedError(
f"This GAFeatureSelectionCV instance is not fitted yet "
f"or used refit=False. Call 'fit' with appropriate "
f"arguments before using this estimator."
)
return self.best_features_
[docs]
@available_if(_estimator_has("decision_function"))
def decision_function(self, X):
"""Call decision_function on the estimator with the best found features.
Only available if ``refit=True`` and the underlying estimator supports
``decision_function``.
Parameters
----------
X : indexable, length n_samples
Must fulfill the input assumptions of the
underlying estimator.
Returns
-------
y_score : ndarray of shape (n_samples,) or (n_samples, n_classes) \
or (n_samples, n_classes * (n_classes-1) / 2)
Result of the decision function for `X` based on the estimator with
the best found parameters.
"""
return self.estimator.decision_function(self.transform(X))
[docs]
@available_if(_estimator_has("predict"))
def predict(self, X):
"""Call predict on the estimator with the best found features.
Only available if ``refit=True`` and the underlying estimator supports
``predict``.
Parameters
----------
X : indexable, length n_samples
Must fulfill the input assumptions of the
underlying estimator.
Returns
-------
y_pred : ndarray of shape (n_samples,)
The predicted labels or values for `X` based on the estimator with
the best found parameters.
"""
return self.estimator.predict(self.transform(X))
[docs]
@available_if(_estimator_has("predict_log_proba"))
def predict_log_proba(self, X):
"""Call predict_log_proba on the estimator with the best found features.
Only available if ``refit=True`` and the underlying estimator supports
``predict_log_proba``.
Parameters
----------
X : indexable, length n_samples
Must fulfill the input assumptions of the
underlying estimator.
Returns
-------
y_pred : ndarray of shape (n_samples,) or (n_samples, n_classes)
Predicted class log-probabilities for `X` based on the estimator
with the best found parameters. The order of the classes
corresponds to that in the fitted attribute :term:`classes_`.
"""
return self.estimator.predict_log_proba(self.transform(X))
[docs]
@available_if(_estimator_has("predict_proba"))
def predict_proba(self, X):
"""Call predict_proba on the estimator with the best found features.
Only available if ``refit=True`` and the underlying estimator supports
``predict_proba``.
Parameters
----------
X : indexable, length n_samples
Must fulfill the input assumptions of the
underlying estimator.
Returns
-------
y_pred : ndarray of shape (n_samples,) or (n_samples, n_classes)
Predicted class probabilities for `X` based on the estimator with
the best found parameters. The order of the classes corresponds
to that in the fitted attribute :term:`classes_`.
"""
return self.estimator.predict_proba(self.transform(X))
[docs]
@available_if(_estimator_has("score"))
def score(self, X, y):
"""Return the score on the given data, if the estimator has been refit.
This uses the score defined by ``scoring`` where provided, and the
``best_estimator_.score`` method otherwise.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input data, where `n_samples` is the number of samples and
`n_features` is the number of features.
y : array-like of shape (n_samples, n_output) \
or (n_samples,), default=None
Target relative to X for classification or regression;
None for unsupervised learning.
Returns
-------
score : float
The score defined by ``scoring`` if provided, and the
``best_estimator_.score`` method otherwise.
"""
return self.estimator.score(self.transform(X), y)
# helpers
def ind_fitness_values(ind):
return ind.fitness.values