Source code for pypunisher.selection_engines.selection

#!/usr/bin/env python

"""
Forward and Backward Selection Algorithms
=========================================
"""
# NOTE: `Selection()`, `Selection().forward()` and
# `Selection.backward()` have docstrings formatted
# for sphinx's auto-documentation engine.

from pypunisher.metrics.criterion import aic, bic
from pypunisher._checks import model_check, array_check, input_checks
from pypunisher.selection_engines._utils import (get_n_features,
                                                 enforce_use_of_all_cpus,
                                                 parse_n_features)


[docs]class Selection(object): """Forward and Backward Selection Algorithms. Args: model (sklearn model) any sklearn model with `.fit()`, `.predict()` and `.score()` methods. X_train (2d ndarray) a 2D numpy array of (observations, features). y_train (1d ndarray) a 1D array of target classes for X_train. X_val (2d ndarray) a 2D numpy array of (observations, features). y_val (1d ndarray) a 1D array of target classes for X_validate. criterion (str or None) model selection criterion. * 'aic': use Akaike Information Criterion. * 'bic': use Bayesian Information Criterion. * None: Use the model's default (i.e., call ``.score()``). verbose (bool) if True, print additional information as selection occurs. Defaults to True. """ def __init__(self, model, X_train, y_train, X_val, y_val, criterion=None, verbose=True): model_check(model) self._model = enforce_use_of_all_cpus(model) self._X_train = X_train self._y_train = y_train self._X_val = X_val self._y_val = y_val array_check(self) if criterion not in (None, 'aic', 'bic'): raise ValueError("`criterion` must be one of: None, 'aic', 'bic'.") self._criterion = criterion self._verbose = verbose self._total_number_of_features = get_n_features(X_train) def _fit_and_score(self, S, feature, algorithm): """Fit and score the model Args: S : list The list of features as found in `forward` and `backward()` feature : int The feature to add or drop. algorithm : str One of: 'forward', 'backward'. Returns : float The score of the model. """ if algorithm == 'forward': features = S + [feature] else: features = [f for f in S if f != feature] if feature else S self._model.fit(self._X_train[:, features], self._y_train) X_val, y_val = self._X_val[:, features], self._y_val if self._criterion == 'aic': # Note: We want to do selection against the validation # data, hence `X_train=X_val` and `y_train=y_val`. score = aic(self._model, X_train=X_val, y_train=y_val) elif self._criterion == 'bic': score = bic(self._model, X_train=X_val, y_train=y_val) else: score = self._model.score(X_val, y_val) return score @staticmethod def _do_not_skip(kwargs): """Check for skipping override by looking for `_do_not_skip` in keyword arguments If it is present, the loops in the algorithms will be run to exhaustion. Args: kwargs : dict Keyword Args Returns: Bool If `_do_not_skip` is not present or `_do_not_skip` is present and is True. Otherwise, the value of `do_not_skip` is returned. """ return kwargs.get('_do_not_skip', True) def _forward_break_criteria(self, S, min_change, best_j_score, j_score_dict, n_features): """Check if `forward()` should break Args: S : list The list of features as found in `forward` and `backward()` min_change : int or float, optional The smallest change to be considered significant. best_j_score : float The best score for a given iteration as evolved inside `forward()`. j_score_dict : dict A dictionary of scores in step 1. of `forward()`. n_features : int The `n_features` object as developed inide `forward()`. Returns: bool Whether or not `forward()` should halt. """ # a. Check if the algorithm should halt b/c of features themselves if not len(j_score_dict) or len(S) == self._total_number_of_features: return True # b. Break if the change was too small if isinstance(min_change, (int, float)) and best_j_score < min_change: return True # c. Break if the number of features in S > n_features. elif isinstance(n_features, int) and n_features > len(S): return True else: return False
[docs] def forward(self, n_features=0.5, min_change=None, **kwargs): """Perform Forward Selection on a Sklearn model. Args: n_features (int) the max. number of features to allow. Note: ``min_change`` must be None in order for ``n_features`` to operate. Floats will be regarded as proportions of the total that must lie on (0, 1). min_change (int or float) The smallest change to be considered significant. Note: `n_features` must be None in order for ``min_change`` to operate. kwargs (Keyword Args) * `_do_not_skip`: Explore loop exhaustion. **For internal use only**. Not intended for outside use. Returns: S (list) The column indices of ``X_train`` (and ``X_val``) that denote the chosen features. Raises: if ``n_features`` and ``min_change`` are both non-None. """ input_checks(locals()) S = list() best_score = None itera = list(range(self._total_number_of_features)) do_not_skip = self._do_not_skip(kwargs) if n_features and do_not_skip: n_features = parse_n_features(n_features, total=len(itera)) for i in range(self._total_number_of_features): if self._verbose: print("Iteration: {}".format(i)) if not do_not_skip: continue # 1. Find best feature, j, to add. j_score_dict = dict() for j in itera: j_model_score = self._fit_and_score(S, feature=j, algorithm='forward') if best_score is None or (j_model_score > best_score): j_score_dict[j] = j_model_score # 2. Save the best j to S, if possible. if len(j_score_dict): best_j = max(j_score_dict, key=j_score_dict.get) best_j_score = j_score_dict[best_j] # Update S, the best score and score history --- best_score = best_j_score # update the score to beat S.append(best_j) # add feature itera.remove(best_j) # no longer search over this feature. if self._forward_break_criteria(S, min_change=min_change, best_j_score=best_j_score, j_score_dict=j_score_dict, n_features=n_features): break
return S
[docs] def backward(self, n_features=0.5, min_change=None, **kwargs): """Perform Backward Selection on a Sklearn model. Args: n_features (int or float) The number of features to select. Floats will be regarded as proportions of the total that must lie on (0, 1). ``min_change`` must be None for ``n_features`` to operate. min_change (int or float) The smallest change to be considered significant. `n_features` must be None for ``min_change`` to operate. kwargs (Keyword Args) * `_do_not_skip` (bool): Explore loop exhaustion. **For internal use only**. Not intended for outside use. * `_last_score_punt` (bool): Relax `defeated_last_iter_score` decision boundary. **For internal use only**. Not intended for outside use. Returns: S (list) The column indices of `X_train` (and `X_val`) that denote the chosen features. Raises: if ``n_features`` and ``min_change`` are both non-None. """ input_checks(locals()) S = list(range(self._total_number_of_features)) # start with all features do_not_skip = self._do_not_skip(kwargs) last_score_punt = kwargs.get('_last_score_punt', False) if n_features and do_not_skip: n_features = parse_n_features(n_features, total=len(S)) last_iter_score = self._fit_and_score(S, feature=None, algorithm='backward') for i in range(self._total_number_of_features): if self._verbose: print("Iteration: {}".format(i)) if not do_not_skip: continue # 1. Hunt for the least predictive feature. best = {'feature': None, 'score': None, 'defeated_last_iter_score': True} for j in S: score = self._fit_and_score(S, feature=j, algorithm='backward') if best['score'] is None or score > best['score']: best = {'feature': j, 'score': score, 'defeated_last_iter_score': score > last_iter_score} to_drop, best_new_score = best['feature'], best['score'] # 2a. Halting Blindly Based on `n_features`. if isinstance(n_features, int): S.remove(to_drop) # blindly drop. last_iter_score = best_new_score if not len(S) == n_features: continue # i.e., ignore criteria below. else: break # 2b. Halt if the change is not longer considered significant. else: if best['defeated_last_iter_score'] or last_score_punt: if (best_new_score - last_iter_score) < min_change: break # there was a change, but it was not large enough. else: S.remove(to_drop) last_iter_score = best_new_score else: break
return S