updated unit tests

Alejandro Moreo Fernandez 2024-04-16 15:12:22 +02:00
parent 99bc8508ac
commit 561b672200
14 changed files with 82 additions and 427 deletions

View File

@@ -12,12 +12,11 @@ from time import time
In this example, we show how to perform model selection on a DistributionMatching quantifier.
"""
model = KDEyML(LogisticRegression())
model = DMy(LogisticRegression())
qp.environ['SAMPLE_SIZE'] = 100
qp.environ['N_JOBS'] = -1
# training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
training, test = qp.datasets.fetch_UCIMulticlassDataset('letter').train_test
with qp.util.temp_seed(0):
@@ -34,19 +33,21 @@ with qp.util.temp_seed(0):
# We will explore a classification-dependent hyper-parameter (e.g., the 'C'
# hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter
# (e.g., the number of bins in a DistributionMatching quantifier.
# (e.g., the number of bins in a DistributionMatching quantifier).
# Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__"
# in order to let the quantifier know this hyper-parameter belongs to its underlying
# classifier.
# We consider 7 values for the classifier and 7 values for the quantifier.
# QuaPy is optimized so that only 7 classifiers are trained, and then reused to test the
# different configurations of the quantifier. In other words, QuaPy avoids training
# the classifier 7x7 times.
param_grid = {
'classifier__C': np.logspace(-3,3,7),
'classifier__class_weight': ['balanced', None],
'bandwidth': np.linspace(0.01, 0.2, 20),
'nbins': [2, 3, 4, 5, 10, 15, 20]
}
tinit = time()
# model = OLD_GridSearchQ(
model = qp.model_selection.GridSearchQ(
model=model,
param_grid=param_grid,
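For reference, a minimal end-to-end sketch of the model selection loop this example sets up, assembled from the pieces visible in this diff (the exact protocol arguments and the choice of DMy rather than KDEyML are assumptions, not necessarily the file's final state):

import numpy as np
from sklearn.linear_model import LogisticRegression
import quapy as qp
from quapy.method.aggregative import DMy
from quapy.protocol import APP

qp.environ['SAMPLE_SIZE'] = 100

# fetch a multiclass UCI dataset and hold out part of the training set for model selection
training, test = qp.datasets.fetch_UCIMulticlassDataset('letter').train_test
train, val = training.split_stratified(0.7, random_state=0)

model = DMy(LogisticRegression())
param_grid = {
    'classifier__C': np.logspace(-3, 3, 7),  # classifier-dependent (note the "classifier__" prefix)
    'nbins': [2, 3, 4, 5, 10, 15, 20],       # quantifier-dependent
}

model = qp.model_selection.GridSearchQ(
    model=model,
    param_grid=param_grid,
    protocol=APP(val, sample_size=100, random_state=0),  # artificial-prevalence samples drawn from the validation split
    error='mae',
    refit=True,
    verbose=True,
).fit(train)

print('best hyper-parameters:', model.best_params_)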

View File

@@ -123,7 +123,7 @@ class LabelledCollection:
if len(prevs) == self.n_classes - 1:
prevs = prevs + (1 - sum(prevs),)
assert len(prevs) == self.n_classes, 'unexpected number of prevalences'
assert sum(prevs) == 1, f'prevalences ({prevs}) wrong range (sum={sum(prevs)})'
assert np.isclose(sum(prevs), 1), f'prevalences ({prevs}) wrong range (sum={sum(prevs)})'
# Decide how many instances should be taken for each class in order to satisfy the requested prevalence
# accurately, and the number of instances in the sample (exactly). If int(size * prevs[i]) (which is
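The switch from exact equality to np.isclose matters because prevalence vectors assembled from floating point values often fail a strict == 1 test even when they are valid. A minimal illustration in plain Python/NumPy (not QuaPy code):

import numpy as np

prevs = (0.1,) * 10                # ten classes at a uniform prevalence of 0.1
print(sum(prevs))                  # 0.9999999999999999 due to accumulated rounding error
print(sum(prevs) == 1)             # False: the old assert would reject a valid vector
print(np.isclose(sum(prevs), 1))   # True: tolerates the rounding error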

View File

@@ -50,7 +50,9 @@ UCI_MULTICLASS_DATASETS = ['dry-bean',
'digits',
'letter']
LEQUA2022_TASKS = ['T1A', 'T1B', 'T2A', 'T2B']
LEQUA2022_VECTOR_TASKS = ['T1A', 'T1B']
LEQUA2022_TEXT_TASKS = ['T2A', 'T2B']
LEQUA2022_TASKS = LEQUA2022_VECTOR_TASKS + LEQUA2022_TEXT_TASKS
_TXA_SAMPLE_SIZE = 250
_TXB_SAMPLE_SIZE = 1000
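As a quick orientation on the new task split (a hedged sketch; the return signature is taken from the dataset test removed later in this commit): the T1* tasks ship pre-vectorized documents, the T2* tasks ship raw text, and all four are fetched the same way.

import quapy as qp

# 'T1A'/'T1B' are the vector tasks; 'T2A'/'T2B' are their raw-text counterparts
train, gen_val, gen_test = qp.datasets.fetch_lequa2022('T1A')
print(train.stats())
print('validation samples:', gen_val.total())
print('test samples:', gen_test.total())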
@@ -209,7 +211,7 @@ def fetch_UCIBinaryDataset(dataset_name, data_home=None, test_split=0.3, verbose
:return: a :class:`quapy.data.base.Dataset` instance
"""
data = fetch_UCIBinaryLabelledCollection(dataset_name, data_home, verbose)
return Dataset(*data.split_stratified(1 - test_split, random_state=0))
return Dataset(*data.split_stratified(1 - test_split, random_state=0), name=dataset_name)
def fetch_UCIBinaryLabelledCollection(dataset_name, data_home=None, verbose=False) -> LabelledCollection:
@@ -583,7 +585,7 @@ def fetch_UCIMulticlassDataset(dataset_name, data_home=None, test_split=0.3, ver
:return: a :class:`quapy.data.base.Dataset` instance
"""
data = fetch_UCIMulticlassLabelledCollection(dataset_name, data_home, verbose)
return Dataset(*data.split_stratified(1 - test_split, random_state=0))
return Dataset(*data.split_stratified(1 - test_split, random_state=0), name=dataset_name)
def fetch_UCIMulticlassLabelledCollection(dataset_name, data_home=None, verbose=False) -> LabelledCollection:

View File

@@ -189,6 +189,19 @@ def check_prevalence_vector(prevalences: ArrayLike, raise_exception: bool=False,
return valid
def uniform_prevalence(n_classes):
"""
Returns a vector representing the uniform distribution for `n_classes`
:param n_classes: number of classes
:return: np.ndarray with all values 1/n_classes
"""
assert isinstance(n_classes, int) and n_classes>0, \
(f'param {n_classes} not understood; must be a positive integer representing the '
f'number of classes ')
return np.full(shape=n_classes, fill_value=1./n_classes)
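A small usage sketch of the new helper, assuming it is exposed through quapy.functional (the module this hunk modifies, imported as F elsewhere in this commit):

import numpy as np
import quapy.functional as F

prior = F.uniform_prevalence(4)   # array([0.25, 0.25, 0.25, 0.25])
assert np.isclose(prior.sum(), 1)

# a non-positive or non-integer argument trips the assert above
# F.uniform_prevalence(0)         # raises AssertionError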
def normalize_prevalence(prevalences: ArrayLike, method='l1'):
"""
Normalizes a vector or matrix of prevalence values. The normalization consists of applying an L1 normalization in
@@ -606,3 +619,5 @@ def solve_adjustment(
raise ValueError(f"Solver {solver} not known.")
else:
raise ValueError(f'unknown {solver=}')

View File

@@ -3,6 +3,7 @@ from . import aggregative
from . import non_aggregative
from . import meta
AGGREGATIVE_METHODS = {
aggregative.CC,
aggregative.ACC,
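The registry makes it easy to iterate over every aggregative method, which is essentially what the (now removed) hierarchy test below did; a condensed sketch of that check:

from sklearn.linear_model import LogisticRegression
from quapy.method import AGGREGATIVE_METHODS
from quapy.method.aggregative import AggregativeQuantifier

# every registered class should instantiate to an AggregativeQuantifier
for cls in AGGREGATIVE_METHODS:
    assert isinstance(cls(LogisticRegression()), AggregativeQuantifier)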

View File

@@ -27,7 +27,7 @@ class ThresholdOptimization(BinaryAggregativeQuantifier):
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5, n_jobs=None):
def __init__(self, classifier: BaseEstimator, val_split=None, n_jobs=None):
self.classifier = classifier
self.val_split = val_split
self.n_jobs = qp._get_njobs(n_jobs)

View File

@@ -82,6 +82,13 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param fit_classifier: whether to train the learner (default is True). Set to False if the
learner has been trained outside the quantifier.
:param val_split: specifies the data used for generating classifier predictions. This specification
can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
be extracted from the training set; or as an integer (default 5), indicating that the predictions
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a collection defining the specific set of data to use for validation.
Alternatively, this set can be specified at fit time by indicating the exact set of data
on which the predictions are to be generated.
:return: self
"""
self._check_init_parameters()
@@ -111,6 +118,12 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
if fit_classifier:
self._check_non_empty_classes(data)
if predict_on is None:
if not fit_classifier:
predict_on = data
if isinstance(self.val_split, LabelledCollection) and self.val_split!=predict_on:
raise ValueError(f'{fit_classifier=} but a LabelledCollection was provided as val_split '
f'in __init__ that is not the same as the LabelledCollection provided in fit.')
if predict_on is None:
predict_on = self.val_split
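A hedged sketch of the three ways the val_split specification documented above can be given, using ACC as the aggregative quantifier (its constructor defaults are an assumption):

from sklearn.linear_model import LogisticRegression
import quapy as qp
from quapy.method.aggregative import ACC

training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).train_test

# (1) integer: classifier predictions generated via k-fold cross-validation (k=5)
acc_kfcv = ACC(LogisticRegression(), val_split=5).fit(training)

# (2) float in (0, 1): a stratified held-out fraction of the training set
acc_heldout = ACC(LogisticRegression(), val_split=0.4).fit(training)

# (3) an explicit collection used only for generating the classifier predictions
train_part, val_part = training.split_stratified(0.7, random_state=0)
acc_explicit = ACC(LogisticRegression(), val_split=val_part).fit(train_part)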
@@ -467,7 +480,7 @@ class ACC(AggregativeCrispQuantifier):
if self.method not in ACC.METHODS:
raise ValueError(f"unknown method; valid ones are {ACC.METHODS}")
if self.norm not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown clipping; valid ones are {ACC.NORMALIZATIONS}")
raise ValueError(f"unknown normalization; valid ones are {ACC.NORMALIZATIONS}")
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
"""
@@ -577,8 +590,8 @@ class PACC(AggregativeSoftQuantifier):
raise ValueError(f"unknown solver; valid ones are {ACC.SOLVERS}")
if self.method not in ACC.METHODS:
raise ValueError(f"unknown method; valid ones are {ACC.METHODS}")
if self.clipping not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown clipping; valid ones are {ACC.NORMALIZATIONS}")
if self.norm not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown normalization; valid ones are {ACC.NORMALIZATIONS}")
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
"""

View File

@@ -54,7 +54,7 @@ class OneVsAll:
pass
def newOneVsAll(binary_quantifier, n_jobs=None):
def newOneVsAll(binary_quantifier: BaseQuantifier, n_jobs=None):
assert isinstance(binary_quantifier, BaseQuantifier), \
f'{binary_quantifier} does not seem to be a Quantifier'
if isinstance(binary_quantifier, qp.method.aggregative.AggregativeQuantifier):
@@ -69,7 +69,7 @@ class OneVsAllGeneric(OneVsAll, BaseQuantifier):
quantifier for each class, and then l1-normalizes the outputs so that the class prevalence values sum up to 1.
"""
def __init__(self, binary_quantifier, n_jobs=None):
def __init__(self, binary_quantifier: BaseQuantifier, n_jobs=None):
assert isinstance(binary_quantifier, BaseQuantifier), \
f'{binary_quantifier} does not seem to be a Quantifier'
if isinstance(binary_quantifier, qp.method.aggregative.AggregativeQuantifier):
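A hedged usage sketch of newOneVsAll based on the signatures above (the import path of newOneVsAll, the use of HDy as a binary-only quantifier, and the dataset choice are assumptions):

from sklearn.linear_model import LogisticRegression
import quapy as qp
from quapy.method.base import newOneVsAll
from quapy.method.aggregative import HDy

# one binary quantifier is trained per class; the per-class outputs are l1-normalized
training, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').train_test
ova = newOneVsAll(HDy(LogisticRegression()), n_jobs=-1)
ova.fit(training)
estim_prevalence = ova.quantify(test.instances)
print(estim_prevalence)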

View File

@@ -1,5 +0,0 @@
import pytest
def test_import():
import quapy as qp
assert qp.__version__ is not None

View File

@@ -1,61 +0,0 @@
import pytest
from quapy.data.datasets import REVIEWS_SENTIMENT_DATASETS, TWITTER_SENTIMENT_DATASETS_TEST, \
TWITTER_SENTIMENT_DATASETS_TRAIN, UCI_BINARY_DATASETS, LEQUA2022_TASKS, UCI_MULTICLASS_DATASETS,\
fetch_reviews, fetch_twitter, fetch_UCIBinaryDataset, fetch_lequa2022, fetch_UCIMulticlassLabelledCollection
@pytest.mark.parametrize('dataset_name', REVIEWS_SENTIMENT_DATASETS)
def test_fetch_reviews(dataset_name):
dataset = fetch_reviews(dataset_name)
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.training.stats()
print('Test set stats')
dataset.test.stats()
@pytest.mark.parametrize('dataset_name', TWITTER_SENTIMENT_DATASETS_TEST + TWITTER_SENTIMENT_DATASETS_TRAIN)
def test_fetch_twitter(dataset_name):
try:
dataset = fetch_twitter(dataset_name)
except ValueError as ve:
if dataset_name == 'semeval' and ve.args[0].startswith(
'dataset "semeval" can only be used for model selection.'):
dataset = fetch_twitter(dataset_name, for_model_selection=True)
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.training.stats()
print('Test set stats')
@pytest.mark.parametrize('dataset_name', UCI_BINARY_DATASETS)
def test_fetch_UCIDataset(dataset_name):
try:
dataset = fetch_UCIBinaryDataset(dataset_name)
except FileNotFoundError as fnfe:
if dataset_name == 'pageblocks.5' and fnfe.args[0].find(
'If this is the first time you attempt to load this dataset') > 0:
print('The pageblocks.5 dataset requires some hand processing to be usable, skipping this test.')
return
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.training.stats()
print('Test set stats')
@pytest.mark.parametrize('dataset_name', UCI_MULTICLASS_DATASETS)
def test_fetch_UCIMultiDataset(dataset_name):
dataset = fetch_UCIMulticlassLabelledCollection(dataset_name)
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.stats()
print('Test set stats')
@pytest.mark.parametrize('dataset_name', LEQUA2022_TASKS)
def test_fetch_lequa2022(dataset_name):
train, gen_val, gen_test = fetch_lequa2022(dataset_name)
print(train.stats())
print('Val:', gen_val.total())
print('Test:', gen_test.total())

View File

@@ -1,65 +0,0 @@
import unittest
from sklearn.linear_model import LogisticRegression
from quapy.method import AGGREGATIVE_METHODS, BINARY_METHODS
from quapy.method.aggregative import *
import inspect
class HierarchyTestCase(unittest.TestCase):
def test_aggregative(self):
lr = LogisticRegression()
for m in AGGREGATIVE_METHODS:
self.assertEqual(isinstance(m(lr), AggregativeQuantifier), True)
def test_inspect_aggregative(self):
import quapy.method.aggregative as aggregative
members = inspect.getmembers(aggregative)
classes = set([cls for name, cls in members if inspect.isclass(cls)])
quantifiers = [cls for cls in classes if issubclass(cls, BaseQuantifier)]
quantifiers = [cls for cls in quantifiers if issubclass(cls, AggregativeQuantifier)]
quantifiers = [cls for cls in quantifiers if not inspect.isabstract(cls) ]
for cls in quantifiers:
self.assertIn(cls, AGGREGATIVE_METHODS)
def test_binary(self):
lr = LogisticRegression()
for m in BINARY_METHODS:
self.assertEqual(isinstance(m(lr), BinaryQuantifier), True)
def test_inspect_binary(self):
import quapy.method.base as base
import quapy.method.aggregative as aggregative
import quapy.method.non_aggregative as non_aggregative
import quapy.method.meta as meta
members = inspect.getmembers(base)
members+= inspect.getmembers(aggregative)
members += inspect.getmembers(non_aggregative)
members += inspect.getmembers(meta)
classes = set([cls for name, cls in members if inspect.isclass(cls)])
quantifiers = [cls for cls in classes if issubclass(cls, BaseQuantifier)]
quantifiers = [cls for cls in quantifiers if issubclass(cls, BinaryQuantifier)]
quantifiers = [cls for cls in quantifiers if not inspect.isabstract(cls) ]
for cls in quantifiers:
self.assertIn(cls, BINARY_METHODS)
def test_probabilistic(self):
lr = LogisticRegression()
for m in [CC(lr), ACC(lr)]:
self.assertEqual(isinstance(m, AggregativeCrispQuantifier), True)
self.assertEqual(isinstance(m, AggregativeSoftQuantifier), False)
for m in [PCC(lr), PACC(lr)]:
self.assertEqual(isinstance(m, AggregativeCrispQuantifier), False)
self.assertEqual(isinstance(m, AggregativeSoftQuantifier), True)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,234 +0,0 @@
import numpy as np
import pytest
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
import method.aggregative
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.method.base import BinaryQuantifier
from quapy.data import Dataset, LabelledCollection
from quapy.method import AGGREGATIVE_METHODS, NON_AGGREGATIVE_METHODS
from quapy.method.meta import Ensemble
from quapy.protocol import APP
from quapy.method.aggregative import DMy
from quapy.method.meta import MedianEstimator
# datasets = [pytest.param(qp.datasets.fetch_twitter('hcr', pickle=True), id='hcr'),
# pytest.param(qp.datasets.fetch_UCIDataset('ionosphere'), id='ionosphere')]
tinydatasets = [pytest.param(qp.datasets.fetch_twitter('hcr', pickle=True).reduce(), id='tiny_hcr'),
pytest.param(qp.datasets.fetch_UCIBinaryDataset('ionosphere').reduce(), id='tiny_ionosphere')]
learners = [LogisticRegression, LinearSVC]
@pytest.mark.parametrize('dataset', tinydatasets)
@pytest.mark.parametrize('aggregative_method', AGGREGATIVE_METHODS)
@pytest.mark.parametrize('learner', learners)
def test_aggregative_methods(dataset: Dataset, aggregative_method, learner):
model = aggregative_method(learner())
if isinstance(model, BinaryQuantifier) and not dataset.binary:
print(f'skipping the test of binary model {type(model)} on non-binary dataset {dataset}')
return
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
assert type(error) == np.float64
@pytest.mark.parametrize('dataset', tinydatasets)
@pytest.mark.parametrize('non_aggregative_method', NON_AGGREGATIVE_METHODS)
def test_non_aggregative_methods(dataset: Dataset, non_aggregative_method):
model = non_aggregative_method()
if isinstance(model, BinaryQuantifier) and not dataset.binary:
print(f'skipping the test of binary model {model} on non-binary dataset {dataset}')
return
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
assert type(error) == np.float64
@pytest.mark.parametrize('base_method', [method.aggregative.ACC, method.aggregative.PACC])
@pytest.mark.parametrize('learner', [LogisticRegression])
@pytest.mark.parametrize('dataset', tinydatasets)
@pytest.mark.parametrize('policy', Ensemble.VALID_POLICIES)
def test_ensemble_method(base_method, learner, dataset: Dataset, policy):
qp.environ['SAMPLE_SIZE'] = 20
base_quantifier=base_method(learner())
if not dataset.binary and policy=='ds':
print(f'skipping the test of binary policy ds on non-binary dataset {dataset}')
return
model = Ensemble(quantifier=base_quantifier, size=3, policy=policy, n_jobs=-1)
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
assert type(error) == np.float64
def test_quanet_method():
try:
import quapy.classification.neural
except ModuleNotFoundError:
print('skipping QuaNet test due to missing torch package')
return
qp.environ['SAMPLE_SIZE'] = 100
# load the kindle dataset as text, and convert words to numerical indexes
dataset = qp.datasets.fetch_reviews('kindle', pickle=True).reduce(200, 200)
qp.data.preprocessing.index(dataset, min_df=5, inplace=True)
from quapy.classification.neural import CNNnet
cnn = CNNnet(dataset.vocabulary_size, dataset.n_classes)
from quapy.classification.neural import NeuralClassifierTrainer
learner = NeuralClassifierTrainer(cnn, device='cuda')
from quapy.method.meta import QuaNet
model = QuaNet(learner, device='cuda')
if isinstance(model, BinaryQuantifier) and not dataset.binary:
print(f'skipping the test of binary model {model} on non-binary dataset {dataset}')
return
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
assert type(error) == np.float64
def test_str_label_names():
model = qp.method.aggregative.CC(LogisticRegression())
dataset = qp.datasets.fetch_reviews('imdb', pickle=True)
dataset = Dataset(dataset.training.sampling(1000, *dataset.training.prevalence()),
dataset.test.sampling(1000, 0.25, 0.75))
qp.data.preprocessing.text2tfidf(dataset, min_df=5, inplace=True)
np.random.seed(0)
model.fit(dataset.training)
int_estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, int_estim_prevalences)
assert type(error) == np.float64
dataset_str = Dataset(LabelledCollection(dataset.training.instances,
['one' if label == 1 else 'zero' for label in dataset.training.labels]),
LabelledCollection(dataset.test.instances,
['one' if label == 1 else 'zero' for label in dataset.test.labels]))
assert all(dataset_str.training.classes_ == dataset_str.test.classes_), 'wrong indexation'
np.random.seed(0)
model.fit(dataset_str.training)
str_estim_prevalences = model.quantify(dataset_str.test.instances)
true_prevalences = dataset_str.test.prevalence()
error = qp.error.mae(true_prevalences, str_estim_prevalences)
assert type(error) == np.float64
print(true_prevalences)
print(int_estim_prevalences)
print(str_estim_prevalences)
np.testing.assert_almost_equal(int_estim_prevalences[1],
str_estim_prevalences[list(model.classes_).index('one')])
# helper
def __fit_test(quantifier, train, test):
quantifier.fit(train)
test_samples = APP(test)
true_prevs, estim_prevs = qp.evaluation.prediction(quantifier, test_samples)
return qp.error.mae(true_prevs, estim_prevs), estim_prevs
def test_median_meta():
"""
This test compares the performance of the MedianQuantifier with respect to computing the median of the predictions
of a differently parameterized quantifier. We use the DistributionMatching base quantifier and the median is
computed across different values of nbins
"""
qp.environ['SAMPLE_SIZE'] = 100
# grid of values
nbins_grid = list(range(2, 11))
dataset = 'kindle'
train, test = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=10).train_test
prevs = []
errors = []
for nbins in nbins_grid:
with qp.util.temp_seed(0):
q = DMy(LogisticRegression(), nbins=nbins)
mae, estim_prevs = __fit_test(q, train, test)
prevs.append(estim_prevs)
errors.append(mae)
print(f'{dataset} DistributionMatching(nbins={nbins}) got MAE {mae:.4f}')
prevs = np.asarray(prevs)
mae = np.mean(errors)
print(f'\tMAE={mae:.4f}')
q = DMy(LogisticRegression())
q = MedianEstimator(q, param_grid={'nbins': nbins_grid}, random_state=0, n_jobs=-1)
median_mae, prev = __fit_test(q, train, test)
print(f'\tMAE={median_mae:.4f}')
np.testing.assert_almost_equal(np.median(prevs, axis=0), prev)
assert median_mae < mae, 'the median-based quantifier provided a higher error...'
def test_median_meta_modsel():
"""
This test checks the median-meta quantifier with model selection
"""
qp.environ['SAMPLE_SIZE'] = 100
dataset = 'kindle'
train, test = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=10).train_test
train, val = train.split_stratified(random_state=0)
nbins_grid = [2, 4, 5, 10, 15]
q = DMy(LogisticRegression())
q = MedianEstimator(q, param_grid={'nbins': nbins_grid}, random_state=0, n_jobs=-1)
median_mae, _ = __fit_test(q, train, test)
print(f'\tMAE={median_mae:.4f}')
q = DMy(LogisticRegression())
lr_params = {'classifier__C': np.logspace(-1, 1, 3)}
q = MedianEstimator(q, param_grid={'nbins': nbins_grid}, random_state=0, n_jobs=-1)
q = GridSearchQ(q, param_grid=lr_params, protocol=APP(val), n_jobs=-1)
optimized_median_ave, _ = __fit_test(q, train, test)
print(f'\tMAE={optimized_median_ave:.4f}')
assert optimized_median_ave < median_mae, "the optimized method yielded worse performance..."

View File

@@ -2,9 +2,9 @@ import unittest
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
import quapy as qp
import util
from quapy.method.aggregative import PACC
from quapy.model_selection import GridSearchQ
from quapy.protocol import APP
@@ -14,13 +14,16 @@ import time
class ModselTestCase(unittest.TestCase):
def test_modsel(self):
"""
Checks whether a model selection exploration selects a good hyperparameter
"""
q = PACC(LogisticRegression(random_state=1, max_iter=5000))
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).reduce()
training, validation = data.training.split_stratified(0.7, random_state=1)
param_grid = {'classifier__C': np.logspace(-3,3,7)}
param_grid = {'classifier__C': [0.000001, 10.]}
app = APP(validation, sample_size=100, random_state=1)
q = GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=True, timeout=-1, verbose=True
@@ -32,54 +35,40 @@ class ModselTestCase(unittest.TestCase):
self.assertEqual(q.best_model().get_params()['classifier__C'], 10.0)
def test_modsel_parallel(self):
"""
Checks whether a parallelized model selection is actually faster than a sequential exploration while
obtaining the same optimal parameters
"""
q = PACC(LogisticRegression(random_state=1, max_iter=5000))
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).reduce(n_train=500)
training, validation = data.training.split_stratified(0.7, random_state=1)
# test = data.test
param_grid = {'classifier__C': np.logspace(-3,3,7)}
app = APP(validation, sample_size=100, random_state=1)
q = GridSearchQ(
print('starting model selection in sequential exploration')
tinit = time.time()
modsel = GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=True, timeout=-1, n_jobs=1, verbose=True
).fit(training)
tend_seq = time.time()-tinit
best_c_seq = modsel.best_params_['classifier__C']
print(f'[done] took {tend_seq:.2f}s best C = {best_c_seq}')
print('starting model selection in parallel exploration')
tinit = time.time()
modsel = GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=True, timeout=-1, n_jobs=-1, verbose=True
).fit(training)
print('best params', q.best_params_)
print('best score', q.best_score_)
tend_par = time.time() - tinit
best_c_par = modsel.best_params_['classifier__C']
print(f'[done] took {tend_par:.2f}s best C = {best_c_par}')
self.assertEqual(q.best_params_['classifier__C'], 10.0)
self.assertEqual(q.best_model().get_params()['classifier__C'], 10.0)
self.assertEqual(best_c_seq, best_c_par)
self.assertLess(tend_par, tend_seq)
def test_modsel_parallel_speedup(self):
class SlowLR(LogisticRegression):
def fit(self, X, y, sample_weight=None):
time.sleep(1)
return super(SlowLR, self).fit(X, y, sample_weight)
q = PACC(SlowLR(random_state=1, max_iter=5000))
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
training, validation = data.training.split_stratified(0.7, random_state=1)
param_grid = {'classifier__C': np.logspace(-3, 3, 7)}
app = APP(validation, sample_size=100, random_state=1)
tinit = time.time()
GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=False, timeout=-1, n_jobs=1, verbose=True
).fit(training)
tend_nooptim = time.time()-tinit
tinit = time.time()
GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=False, timeout=-1, n_jobs=-1, verbose=True
).fit(training)
tend_optim = time.time() - tinit
print(f'parallel training took {tend_optim:.4f}s')
print(f'sequential training took {tend_nooptim:.4f}s')
self.assertEqual(tend_optim < (0.5*tend_nooptim), True)
def test_modsel_timeout(self):
@@ -91,11 +80,10 @@ class ModselTestCase(unittest.TestCase):
q = PACC(SlowLR())
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).reduce()
training, validation = data.training.split_stratified(0.7, random_state=1)
# test = data.test
param_grid = {'classifier__C': np.logspace(-3,3,7)}
param_grid = {'classifier__C': np.logspace(-1,1,3)}
app = APP(validation, sample_size=100, random_state=1)
print('Expecting TimeoutError to be raised')

View File

@@ -8,7 +8,7 @@ from quapy.method.aggregative import PACC
import quapy.functional as F
class MyTestCase(unittest.TestCase):
class TestReplicability(unittest.TestCase):
def test_prediction_replicability(self):
@@ -26,7 +26,7 @@ class MyTestCase(unittest.TestCase):
prev2 = pacc.fit(dataset.training).quantify(dataset.test.X)
str_prev2 = strprev(prev2, prec=5)
self.assertEqual(str_prev1, str_prev2) # add assertion here
self.assertEqual(str_prev1, str_prev2)
def test_samping_replicability(self):
@@ -78,7 +78,7 @@ class MyTestCase(unittest.TestCase):
def test_parallel_replicability(self):
train, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').train_test
train, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').reduce().train_test
test = test.sampling(500, *[0.1, 0.0, 0.1, 0.1, 0.2, 0.5, 0.0])