diff --git a/examples/model_selection.py b/examples/model_selection.py
index df051a0..141cf91 100644
--- a/examples/model_selection.py
+++ b/examples/model_selection.py
@@ -1,4 +1,5 @@
 import quapy as qp
+from quapy.method.non_aggregative import DMx
 from quapy.protocol import APP
 from quapy.method.aggregative import DMy
 from sklearn.linear_model import LogisticRegression
@@ -38,7 +39,7 @@ with qp.util.temp_seed(0):
     param_grid = {
         'classifier__C': np.logspace(-3,3,7),
         'classifier__class_weight': ['balanced', None],
-        'nbins': [8, 16, 32, 64],
+        'nbins': [8, 16, 32, 64, 'poooo'],
     }
 
     tinit = time()
diff --git a/quapy/model_selection.py b/quapy/model_selection.py
index 5448d4d..9bd0985 100644
--- a/quapy/model_selection.py
+++ b/quapy/model_selection.py
@@ -3,6 +3,7 @@ import signal
 from copy import deepcopy
 from enum import Enum
 from typing import Union, Callable
+from functools import wraps
 
 import numpy as np
 from sklearn import clone
@@ -21,6 +22,75 @@ class Status(Enum):
     INVALID = 3
     ERROR = 4
 
+def check_status(func):
+    """
+    Decorator for the job methods of :class:`GridSearchQ`. Runs the job under the timeout of the
+    owner object (if any) and turns exceptions raised by the job into an exit status.
+
+    The decorated method is invoked as `func(obj, job)`, where `job` is either a dict (optionally
+    holding the keys 'cls-params' and 'q-params') or a `(params, training)` tuple.
+
+    :param func: the method to decorate; it must return a dict describing the outcome of the job
+    :return: the wrapped method, which returns the job descriptor (a dict) extended with the keys
+        'status' (a :class:`Status`) and 'params' (the hyperparameters explored by the job)
+    """
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        obj = args[0]
+        tinit = time()
+
+        job = args[1]
+        if isinstance(job, dict):
+            job_descriptor = dict(job)
+            params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
+        else:
+            # non-aggregative jobs are described by a (params, training) tuple, not by a dict
+            params = job[0]
+            job_descriptor = {}
+
+        if obj.timeout > 0:
+            def handler(signum, frame):
+                raise TimeoutError()
+
+            signal.signal(signal.SIGALRM, handler)
+            signal.alarm(obj.timeout)
+
+        try:
+            job_descriptor = func(*args, **kwargs)
+
+            ttime = time() - tinit
+
+            score = job_descriptor.get('score', None)
+            if score is not None:
+                obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
+
+            exit_status = Status.SUCCESS
+
+        except TimeoutError:
+            obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
+            exit_status = Status.TIMEOUT
+
+        except ValueError as e:
+            obj._sout(f'the combination of hyperparameters {params} is invalid')
+            obj._sout(f'\tException: {e}')
+            exit_status = Status.INVALID
+
+        except Exception as e:
+            obj._sout(f'something went wrong for config {params}; skipping:')
+            obj._sout(f'\tException: {e}')
+            exit_status = Status.ERROR
+
+        finally:
+            # always disarm the alarm: a failed job must not leave a pending SIGALRM behind
+            if obj.timeout > 0:
+                signal.alarm(0)
+
+        job_descriptor['status'] = exit_status
+        job_descriptor['params'] = params
+        return job_descriptor
+    return wrapper
+
+
 
 class GridSearchQ(BaseQuantifier):
     """Grid Search optimization targeting a quantification-oriented metric.
@@ -76,184 +146,103 @@ class GridSearchQ(BaseQuantifier):
             raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
                              f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')
 
-    def _fit_nonaggregative(self, training):
+    @check_status
+    def _prepare_classifier(self, args):
+        """Fits the classifier of a copy of the model using `args['cls-params']`; returns model and predictions"""
+        cls_params = args['cls-params']
+        training = args['training']
+        model = deepcopy(self.model)
+        model.set_params(**cls_params)
+        predictions = model.classifier_fit_predict(training)
+        return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
+
+    @check_status
+    def _prepare_aggregation(self, args):
+        """Fits the aggregation function (using `args['q-params']`) on precomputed predictions and scores the model"""
+        # (partial_setup, q_params), training = args
+        model = args['model']
+        predictions = args['predictions']
+        cls_params = args['cls-params']
+        q_params = args['q-params']
+        training = args['training']
+
+        params = {**cls_params, **q_params}
+
+        model = deepcopy(model)
+        # overrides default parameters with the parameters being explored at this iteration
+        model.set_params(**q_params)
+        model.aggregation_fit(predictions, training)
+        score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
+
+        return {'model': model, 'cls-params':cls_params, 'q-params': q_params, 'params': params, 'score': score}
+
+    @check_status
+    def _prepare_model(self, args):
+        """Fits a copy of the model with the hyperparameters in `args` (a `(params, training)` tuple) and scores it"""
+        params, training = args
+        model = deepcopy(self.model)
+        # overrides default parameters with the parameters being explored at this iteration
+        model.set_params(**params)
+        model.fit(training)
+        score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
+        return {'model': model, 'params': params, 'score': score}
+
+
+    def _compute_scores_aggregative(self, training):
+        """Trains one classifier per classifier configuration, then evaluates every aggregation configuration on it"""
+
+        # break down the set of hyperparameters into two: classifier-specific, quantifier-specific
+        cls_configs, q_configs = group_params(self.param_grid)
+
+        # train all classifiers and get the predictions
+        partial_setups = qp.util.parallel(
+            self._prepare_classifier,
+            ({'cls-params':params, 'training':training} for params in cls_configs),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs,
+            asarray=False,
+        )
+
+        # filter out classifier configurations that yield any error
+        for setup in partial_setups:
+            if setup['status'] != Status.SUCCESS:
+                self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
+                           f'error {setup["status"]} and will be ignored')
+
+        partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
+
+        if len(partial_setups) == 0:
+            raise ValueError('No valid configuration found for the classifier.')
+
+        # explore the quantifier-specific hyperparameters for each training configuration
+        scores = qp.util.parallel(
+            self._prepare_aggregation,
+            ({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+
+        return scores
+
+    def _compute_scores_nonaggregative(self, training):
+        """Fits and evaluates, from scratch, one model for each configuration in the expanded hyperparameter grid"""
         configs = expand_grid(self.param_grid)
 
-        self._sout(f'starting model selection with {self.n_jobs =}')
-        #pass a seed to parallel so it is set in child processes
+        # pass a seed to parallel, so it is set in child processes
         scores = qp.util.parallel(
-            self._delayed_eval,
+            self._prepare_model,
             ((params, training) for params in configs),
             seed=qp.environ.get('_R_SEED', None),
             n_jobs=self.n_jobs
         )
         return scores
 
-    def _delayed_fit_classifier(self, args):
-        cls_params, training = args
-        model = deepcopy(self.model)
-        model.set_params(**cls_params)
-        predictions = model.classifier_fit_predict(training)
-        return (model, predictions, cls_params)
-
-    def _eval_aggregative(self, args):
-        ((model, predictions, cls_params), q_params), training = args
-        model = deepcopy(model)
-        # overrides default parameters with the parameters being explored at this iteration
-        model.set_params(**q_params)
-        model.aggregation_fit(predictions, training)
-        params = {**cls_params, **q_params}
-        return model, params
-
-    def _delayed_evaluation__(self, args):
-
-        exit_status = Status.SUCCESS
-
-        tinit = time()
-        if self.timeout > 0:
-            def handler(signum, frame):
-                raise TimeoutError()
-
-            signal.signal(signal.SIGALRM, handler)
-            signal.alarm(self.timeout)
-
-        try:
-            model, params = self._eval_aggregative(args)
-
-            score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
-
-            ttime = time() - tinit
-            self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} score {score:.5f} [took {ttime:.4f}s]')
-
-            if self.timeout > 0:
-                signal.alarm(0)
-
-        except TimeoutError:
-            self._sout(f'timeout ({self.timeout}s) reached for config {params}')
-            score = None
-            exit_status = Status.TIMEOUT
-
-        except ValueError as e:
-            self._sout(f'the combination of hyperparameters {params} is invalid')
-            score = None
-            exit_status = Status.INVALID
-
-        except Exception as e:
-            self._sout(f'something went wrong for config {params}; skipping:')
-            self._sout(f'\tException: {e}')
-            score = None
-            exit_status = Status.ERROR
-
-
-        return params, score, model, exit_status
-
-    # def _delayed_fit_aggregation_and_eval(self, args):
-    #
-    #     ((model, predictions, cls_params), q_params), training = args
-    #     exit_status = Status.SUCCESS
-    #
-    #     tinit = time()
-    #     if self.timeout > 0:
-    #         def handler(signum, frame):
-    #             raise TimeoutError()
-    #         signal.signal(signal.SIGALRM, handler)
-    #         signal.alarm(self.timeout)
-    #
-    #     try:
-    #         model = deepcopy(model)
-    #         # overrides default parameters with the parameters being explored at this iteration
-    #         model.set_params(**q_params)
-    #         model.aggregation_fit(predictions, training)
-    #         score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
-    #
-    #         ttime = time() - tinit
-    #         self._sout(f'hyperparams=[cls:{cls_params}, q:{q_params}]\t got {self.error.__name__} score {score:.5f} [took {ttime:.4f}s]')
-    #
-    #         if self.timeout > 0:
-    #             signal.alarm(0)
-    #     except TimeoutError:
-    #         self._sout(f'timeout ({self.timeout}s) reached for config {q_params}')
-    #         score = None
-    #         exit_status = Status.TIMEOUT
-    #     except ValueError as e:
-    #         self._sout(f'the combination of hyperparameters {q_params} is invalid')
-    #         score = None
-    #         exit_status = Status.INVALID
-    #     except Exception as e:
-    #         self._sout(f'something went wrong for config {q_params}; skipping:')
-    #         self._sout(f'\tException: {e}')
-    #         score = None
-    #         exit_status = Status.ERROR
-    #
-    #     params = {**cls_params, **q_params}
-    #     return params, score, model, exit_status
-
-    def _delayed_eval(self, args):
-        params, training = args
-
-        protocol = self.protocol
-        error = self.error
-
-        if self.timeout > 0:
-            def handler(signum, frame):
-                raise TimeoutError()
-
-            signal.signal(signal.SIGALRM, handler)
-
-        tinit = time()
-
-        if self.timeout > 0:
-            signal.alarm(self.timeout)
-
-        try:
-            model = deepcopy(self.model)
-            # overrides default parameters with the parameters being explored at this iteration
-            model.set_params(**params)
-            model.fit(training)
-            score = evaluation.evaluate(model, protocol=protocol, error_metric=error)
-
-            ttime = time()-tinit
-            self._sout(f'hyperparams={params}\t got {error.__name__} score {score:.5f} [took {ttime:.4f}s]')
-
-            if self.timeout > 0:
-                signal.alarm(0)
-        except TimeoutError:
-            self._sout(f'timeout ({self.timeout}s) reached for config {params}')
-            score = None
-        except ValueError as e:
-            self._sout(f'the combination of hyperparameters {params} is invalid')
-            raise e
-        except Exception as e:
-            self._sout(f'something went wrong for config {params}; skipping:')
-            self._sout(f'\tException: {e}')
-            score = None
-
-        return params, score, model, status
-
-    def _fit_aggregative(self, training):
-
-        # break down the set of hyperparameters into two: classifier-specific, quantifier-specific
-        cls_configs, q_configs = group_params(self.param_grid)
-
-        # train all classifiers and get the predictions
-        models_preds_clsconfigs = qp.util.parallel(
-            self._delayed_fit_classifier,
-            ((params, training) for params in cls_configs),
-            seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs,
-            asarray=False,
-        )
-
-        # explore the quantifier-specific hyperparameters for each training configuration
-        scores = qp.util.parallel(
-            self._delayed_fit_aggregation_and_eval,
-            ((setup, training) for setup in itertools.product(models_preds_clsconfigs, q_configs)),
-            seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs
-        )
-
-        return scores
-
+    def _compute_scores(self, training):
+        """Dispatches to the aggregative or the non-aggregative search strategy depending on the type of the model"""
+        if isinstance(self.model, AggregativeQuantifier):
+            return self._compute_scores_aggregative(training)
+        else:
+            return self._compute_scores_nonaggregative(training)
 
     def fit(self, training: LabelledCollection):
         """ Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
@@ -264,27 +253,29 @@ class GridSearchQ(BaseQuantifier):
         """
 
         if self.refit and not isinstance(self.protocol, OnLabelledCollectionProtocol):
-            raise RuntimeWarning(f'"refit" was requested, but the protocol does not '
-                                 f'implement the {OnLabelledCollectionProtocol.__name__} interface')
+            raise RuntimeWarning(
+                f'"refit" was requested, but the protocol does not implement '
+                f'the {OnLabelledCollectionProtocol.__name__} interface'
+            )
 
         tinit = time()
 
-        if isinstance(self.model, AggregativeQuantifier):
-            self.results = self._fit_aggregative(training)
-        else:
-            self.results = self._fit_nonaggregative(training)
+        self._sout(f'starting model selection with n_jobs={self.n_jobs}')
+        results = self._compute_scores(training)
 
         self.param_scores_ = {}
         self.best_score_ = None
-        for params, score, model in self.results:
+        for job_result in results:
+            score = job_result.get('score', None)
+            params = job_result['params']
             if score is not None:
                 if self.best_score_ is None or score < self.best_score_:
                     self.best_score_ = score
                     self.best_params_ = params
-                    self.best_model_ = model
+                    self.best_model_ = job_result['model']
                 self.param_scores_[str(params)] = score
             else:
-                self.param_scores_[str(params)] = 'timeout'
+                self.param_scores_[str(params)] = job_result['status']
 
         tend = time()-tinit
 