From 45642ad7789d37b96262102da3b3f3a9dbfb9d97 Mon Sep 17 00:00:00 2001
From: Alejandro Moreo <alejandro.moreo@isti.cnr.it>
Date: Wed, 1 Jun 2022 18:28:59 +0200
Subject: [PATCH] LeQua 2022 as dataset

---
 quapy/CHANGE_LOG.txt           | 16 ++++++++-
 quapy/data/datasets.py         | 53 ++++++++++++++++++++++++++++-
 quapy/evaluation.py            | 12 +++----
 quapy/method/meta.py           |  2 +-
 quapy/model_selection.py       |  7 +++-
 quapy/protocol.py              | 62 ++++++++++++++++++----------------
 quapy/tests/test_datasets.py   | 13 ++++++-
 quapy/tests/test_evaluation.py | 10 +++---
 quapy/tests/test_modsel.py     | 33 +++++++++++++++++-
 quapy/tests/test_protocols.py  |  5 ++-
 10 files changed, 162 insertions(+), 51 deletions(-)

diff --git a/quapy/CHANGE_LOG.txt b/quapy/CHANGE_LOG.txt
index fe39fc3..ab03b01 100644
--- a/quapy/CHANGE_LOG.txt
+++ b/quapy/CHANGE_LOG.txt
@@ -9,9 +9,19 @@
 
 - ACC, PACC, Forman's threshold variants have been parallelized.
 
+- The exploration of hyperparameters in model selection can now be run in parallel (QuaPy 0.1.6 already exposed
+    an n_jobs argument, but only the evaluation of one specific hyperparameter combination was run in parallel).
+
+- The prediction function has been refactored so that the optimization for aggregative quantifiers (which
+    consists of pre-classifying all instances once, and then only invoking aggregate on each sample) is applied
+    only when the total number of classifications it requires is smaller than the number of classifications
+    required by the standard procedure. The user can now specify "force", "auto", True, or False, in order to
+    actively decide whether to apply it; see the sketch below.
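+
+    A minimal usage sketch (`model` stands for any fitted quantifier and `protocol` for any evaluation
+    protocol; both names are illustrative):
+
+        import quapy as qp
+        true_prevs, estim_prevs = qp.evaluation.prediction(model, protocol, aggr_speedup='auto')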
 
 Things to fix:
-- clean functions like binary, aggregative, probabilistic, etc; those should be resolved via isinstance()
+- clean functions like binary, aggregative, probabilistic, etc.; those should be resolved via isinstance():
+    this is currently not working, and I do not know how to make isinstance() behave. It looks like there is
+    some problem with the path of the imported class w.r.t. the path of the class that arrives from another
+    module...
 - clean classes_ and n_classes from methods (maybe not from aggregative ones, but those have to be used only
     internally and not imposed in any abstract class)
 - optimize "qp.evaluation.prediction" for aggregative methods (pre-classification)
@@ -33,6 +43,10 @@ Things to fix:
     stuff).
 - Check method  def __parallel(self, func, *args, **kwargs) in aggregative.OneVsAll
 
+New features:
+- Add LeQua2022 to datasets (download and preparation are fully automatic, and the samples are served through
+    proper generator-based protocols); see the sketch below.
+- Add an "experimental room", with scripts to quickly test new ideas and see results.
+
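+    A minimal usage sketch for the LeQua2022 loader (the choice of quantifier is illustrative; any method and
+    error metric would do):
+
+        import quapy as qp
+        from sklearn.linear_model import LogisticRegression
+        from quapy.method.aggregative import PACC
+
+        train, val_gen, test_gen = qp.datasets.fetch_lequa2022('T1A')
+        model = PACC(LogisticRegression()).fit(train)
+        mae = qp.evaluation.evaluate(model, test_gen, error_metric='mae')
+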
 # 0.1.7
 # change the LabelledCollection API (removing protocol-related samplings)
 # need to change the two references to the above in the wiki / doc, and code examples...
diff --git a/quapy/data/datasets.py b/quapy/data/datasets.py
index 74e2a3e..06ba3d0 100644
--- a/quapy/data/datasets.py
+++ b/quapy/data/datasets.py
@@ -43,6 +43,8 @@ UCI_DATASETS = ['acute.a', 'acute.b',
                 'wine-q-red', 'wine-q-white',
                 'yeast']
 
+LEQUA2022_TASKS = ['T1A', 'T1B', 'T2A', 'T2B']
+
 
 def fetch_reviews(dataset_name, tfidf=False, min_df=None, data_home=None, pickle=False) -> Dataset:
     """
@@ -532,4 +534,53 @@ def fetch_UCILabelledCollection(dataset_name, data_home=None, verbose=False) ->
 
 
 def _df_replace(df, col, repl={'yes': 1, 'no':0}, astype=float):
-    df[col] = df[col].apply(lambda x:repl[x]).astype(astype, copy=False)
\ No newline at end of file
+    df[col] = df[col].apply(lambda x:repl[x]).astype(astype, copy=False)
+
+
+def fetch_lequa2022(task, data_home=None):
+    """
+    """
+    from quapy.data._lequa2022 import load_raw_documents, load_vector_documents, SamplesFromDir
+
+    assert task in LEQUA2022_TASKS, \
+        f'Unknown task {task}. Valid ones are {LEQUA2022_TASKS}'
+    if data_home is None:
+        data_home = get_quapy_home()
+
+    URL_TRAINDEV = f'https://zenodo.org/record/6546188/files/{task}.train_dev.zip'
+    URL_TEST = f'https://zenodo.org/record/6546188/files/{task}.test.zip'
+    URL_TEST_PREV = f'https://zenodo.org/record/6546188/files/{task}.test_prevalences.zip'
+
+    lequa_dir = join(data_home, 'lequa2022')
+    os.makedirs(lequa_dir, exist_ok=True)
+
+    def download_unzip_and_remove(unzipped_path, url):
+        tmp_path = join(lequa_dir, task + '_tmp.zip')
+        download_file_if_not_exists(url, tmp_path)
+        with zipfile.ZipFile(tmp_path) as file:
+            file.extractall(unzipped_path)
+        os.remove(tmp_path)
+
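+    # download and extract the training/dev/test bundles only once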
+    if not os.path.exists(join(lequa_dir, task)):
+        download_unzip_and_remove(lequa_dir, URL_TRAINDEV)
+        download_unzip_and_remove(lequa_dir, URL_TEST)
+        download_unzip_and_remove(lequa_dir, URL_TEST_PREV)
+
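+    # T1 tasks distribute pre-vectorized documents, T2 tasks distribute raw documents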
+    if task in ['T1A', 'T1B']:
+        load_fn = load_vector_documents
+    elif task in ['T2A', 'T2B']:
+        load_fn = load_raw_documents
+
+    tr_path = join(lequa_dir, task, 'public', 'training_data.txt')
+    train = LabelledCollection.load(tr_path, loader_func=load_fn)
+
+    val_samples_path = join(lequa_dir, task, 'public', 'dev_samples')
+    val_true_prev_path = join(lequa_dir, task, 'public', 'dev_prevalences.txt')
+    val_gen = SamplesFromDir(val_samples_path, val_true_prev_path, load_fn=load_fn)
+
+    test_samples_path = join(lequa_dir, task, 'public', 'test_samples')
+    test_true_prev_path = join(lequa_dir, task, 'public', 'test_prevalences.txt')
+    test_gen = SamplesFromDir(test_samples_path, test_true_prev_path, load_fn=load_fn)
+
+    return train, val_gen, test_gen
+
diff --git a/quapy/evaluation.py b/quapy/evaluation.py
index d32cfb7..57c2ed1 100644
--- a/quapy/evaluation.py
+++ b/quapy/evaluation.py
@@ -1,13 +1,9 @@
 from typing import Union, Callable, Iterable
 import numpy as np
 from tqdm import tqdm
-import inspect
 import quapy as qp
 from quapy.protocol import AbstractProtocol, OnLabelledCollectionProtocol
-from quapy.data import LabelledCollection
 from quapy.method.base import BaseQuantifier
-from quapy.util import temp_seed
-import quapy.functional as F
 import pandas as pd
 
 
@@ -22,7 +18,7 @@ def prediction(model: BaseQuantifier, protocol: AbstractProtocol, aggr_speedup='
         # checks whether the prediction can be made more efficiently; this check consists in verifying if the model is
         # of type aggregative, if the protocol is based on LabelledCollection, and if the total number of documents to
         # classify using the protocol would exceed the number of test documents in the original collection
-        from method.aggregative import AggregativeQuantifier
+        from quapy.method.aggregative import AggregativeQuantifier
         if isinstance(model, AggregativeQuantifier) and isinstance(protocol, OnLabelledCollectionProtocol):
             if aggr_speedup == 'force':
                 apply_optimization = True
@@ -45,9 +41,9 @@ def prediction(model: BaseQuantifier, protocol: AbstractProtocol, aggr_speedup='
 
 def __prediction_helper(quantification_fn, protocol: AbstractProtocol, verbose=False):
     true_prevs, estim_prevs = [], []
-    for sample in tqdm(protocol(), total=protocol.total()) if verbose else protocol():
-        estim_prevs.append(quantification_fn(sample.instances))
-        true_prevs.append(sample.prevalence())
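+    # protocols now yield (instances, prevalence) pairs directly (see the collators in quapy.protocol)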
+    for sample_instances, sample_prev in tqdm(protocol(), total=protocol.total()) if verbose else protocol():
+        estim_prevs.append(quantification_fn(sample_instances))
+        true_prevs.append(sample_prev)
 
     true_prevs = np.asarray(true_prevs)
     estim_prevs = np.asarray(estim_prevs)
diff --git a/quapy/method/meta.py b/quapy/method/meta.py
index 3e57652..d5e8c2a 100644
--- a/quapy/method/meta.py
+++ b/quapy/method/meta.py
@@ -9,7 +9,6 @@ from tqdm import tqdm
 import quapy as qp
 from quapy import functional as F
 from quapy.data import LabelledCollection
-from quapy.evaluation import evaluate
 from quapy.model_selection import GridSearchQ
 
 try:
@@ -176,6 +175,7 @@ class Ensemble(BaseQuantifier):
         For each model in the ensemble, the performance is measured in terms of _error_name_ on the quantification of
         the samples used for training the rest of the models in the ensemble.
         """
+        from quapy.evaluation import evaluate
         error = qp.error.from_name(error_name)
         tests = [m[3] for m in self.ensemble]
         scores = []
diff --git a/quapy/model_selection.py b/quapy/model_selection.py
index c1fa817..7d71023 100644
--- a/quapy/model_selection.py
+++ b/quapy/model_selection.py
@@ -81,6 +81,8 @@ class GridSearchQ(BaseQuantifier):
         self.param_scores_ = {}
         self.best_score_ = None
 
+        tinit = time()
+
         hyper = [dict({k: values[i] for i, k in enumerate(params_keys)}) for values in itertools.product(*params_values)]
         scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), n_jobs=n_jobs)
 
@@ -94,10 +96,13 @@ class GridSearchQ(BaseQuantifier):
             else:
                 self.param_scores_[str(params)] = 'timeout'
 
+        tend = time()-tinit
+
         if self.best_score_ is None:
             raise TimeoutError('all jobs took more than the timeout time to end')
 
-        self._sout(f'optimization finished: best params {self.best_params_} (score={self.best_score_:.5f})')
+        self._sout(f'optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) '
+                   f'[took {tend:.4f}s]')
 
         if self.refit:
             if isinstance(protocol, OnLabelledCollectionProtocol):
diff --git a/quapy/protocol.py b/quapy/protocol.py
index d74e797..f539830 100644
--- a/quapy/protocol.py
+++ b/quapy/protocol.py
@@ -1,14 +1,11 @@
 from copy import deepcopy
-
 import quapy as qp
 import numpy as np
 import itertools
-from collections.abc import Generator
 from contextlib import ExitStack
 from abc import ABCMeta, abstractmethod
 from quapy.data import LabelledCollection
 import quapy.functional as F
-from tqdm import tqdm
 from os.path import exists
 from glob import glob
 
@@ -87,10 +84,14 @@ class AbstractStochasticSeededProtocol(AbstractProtocol):
             if self.random_seed is not None:
                 stack.enter_context(qp.util.temp_seed(self.random_seed))
             for params in self.samples_parameters():
-                yield self.sample(params)
+                sample = self.sample(params)
+                yield self.collator_fn(sample) if hasattr(self, 'collator_fn') else sample
+
+    def set_collator(self, collator_fn):
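+        """
+        Sets a transformation that is applied to each sample before it is yielded by :meth:`__call__`; e.g.,
+        APP, NPP, and USimplexPP install a collator that turns each sampled LabelledCollection into an
+        (instances, prevalence) pair.
+
+        :param collator_fn: a callable applied to the output of :meth:`sample`
+        """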
+        self.collator_fn = collator_fn
 
 
 class OnLabelledCollectionProtocol:
+
     def get_labelled_collection(self):
         return self.data
 
@@ -106,31 +107,6 @@ class OnLabelledCollectionProtocol:
             return new.on_preclassified_instances(pre_classifications, in_place=True)
 
 
-class LoadSamplesFromDirectory(AbstractProtocol):
-
-    def __init__(self, folder_path, loader_fn, classes=None, **loader_kwargs):
-        assert exists(folder_path), f'folder {folder_path} does not exist'
-        assert callable(loader_fn), f'the passed load_fn does not seem to be callable'
-        self.folder_path = folder_path
-        self.loader_fn = loader_fn
-        self.classes = classes
-        self.loader_kwargs = loader_kwargs
-        self._list_files = None
-
-    def __call__(self):
-        for file in self.list_files:
-            yield LabelledCollection.load(file, loader_func=self.loader_fn, classes=self.classes, **self.loader_kwargs)
-
-    @property
-    def list_files(self):
-        if self._list_files is None:
-            self._list_files = sorted(glob(self.folder_path, '*'))
-        return self._list_files
-
-    def total(self):
-        return len(self.list_files)
-
-
 class APP(AbstractStochasticSeededProtocol, OnLabelledCollectionProtocol):
     """
     Implementation of the artificial prevalence protocol (APP).
@@ -154,6 +130,7 @@ class APP(AbstractStochasticSeededProtocol, OnLabelledCollectionProtocol):
         self.sample_size = sample_size
         self.n_prevalences = n_prevalences
         self.repeats = repeats
+        self.set_collator(collator_fn=lambda x: (x.instances, x.prevalence()))
 
     def prevalence_grid(self):
         """
@@ -210,6 +187,7 @@ class NPP(AbstractStochasticSeededProtocol, OnLabelledCollectionProtocol):
         self.sample_size = sample_size
         self.repeats = repeats
         self.random_seed = random_seed
+        self.set_collator(collator_fn=lambda x: (x.instances, x.prevalence()))
 
     def samples_parameters(self):
         indexes = []
@@ -246,6 +224,7 @@ class USimplexPP(AbstractStochasticSeededProtocol, OnLabelledCollectionProtocol)
         self.sample_size = sample_size
         self.repeats = repeats
         self.random_seed = random_seed
+        self.set_collator(collator_fn=lambda x: (x.instances, x.prevalence()))
 
     def samples_parameters(self):
         indexes = []
diff --git a/quapy/tests/test_datasets.py b/quapy/tests/test_datasets.py
index 88209e8..8d70fe9 100644
--- a/quapy/tests/test_datasets.py
+++ b/quapy/tests/test_datasets.py
@@ -1,7 +1,8 @@
 import pytest
 
 from quapy.data.datasets import REVIEWS_SENTIMENT_DATASETS, TWITTER_SENTIMENT_DATASETS_TEST, \
-    TWITTER_SENTIMENT_DATASETS_TRAIN, UCI_DATASETS, fetch_reviews, fetch_twitter, fetch_UCIDataset
+    TWITTER_SENTIMENT_DATASETS_TRAIN, UCI_DATASETS, LEQUA2022_TASKS, \
+    fetch_reviews, fetch_twitter, fetch_UCIDataset, fetch_lequa2022
 
 
 @pytest.mark.parametrize('dataset_name', REVIEWS_SENTIMENT_DATASETS)
@@ -41,3 +42,13 @@ def test_fetch_UCIDataset(dataset_name):
     print('Training set stats')
     dataset.training.stats()
     print('Test set stats')
+
+
+@pytest.mark.parametrize('dataset_name', LEQUA2022_TASKS)
+def test_fetch_lequa2022(dataset_name):
+    train, gen_val, gen_test = fetch_lequa2022(dataset_name)
+    print(f'Dataset {dataset_name}')
+    print('Training set stats')
+    train.stats()
diff --git a/quapy/tests/test_evaluation.py b/quapy/tests/test_evaluation.py
index de6603b..73dc485 100644
--- a/quapy/tests/test_evaluation.py
+++ b/quapy/tests/test_evaluation.py
@@ -2,8 +2,8 @@ import unittest
 import quapy as qp
 from sklearn.linear_model import LogisticRegression
 from time import time
-from method.aggregative import EMQ
-from method.base import BaseQuantifier
+from quapy.method.aggregative import EMQ
+from quapy.method.base import BaseQuantifier
 
 
 class EvalTestCase(unittest.TestCase):
@@ -12,7 +12,7 @@ class EvalTestCase(unittest.TestCase):
         data = qp.datasets.fetch_reviews('hp', tfidf=True, min_df=10, pickle=True)
         train, test = data.training, data.test
 
-        protocol = qp.protocol.APP(test, sample_size=1000, n_prevalences=21, repeats=1, random_seed=1)
+        protocol = qp.protocol.APP(test, sample_size=1000, n_prevalences=11, repeats=1, random_seed=1)
 
         class SlowLR(LogisticRegression):
             def predict_proba(self, X):
@@ -23,7 +23,7 @@ class EvalTestCase(unittest.TestCase):
         emq = EMQ(SlowLR()).fit(train)
 
         tinit = time()
-        score = qp.evaluation.evaluate(emq, protocol, error_metric='mae', verbose=True)
+        score = qp.evaluation.evaluate(emq, protocol, error_metric='mae', verbose=True, aggr_speedup='force')
         tend_optim = time()-tinit
         print(f'evaluation (with optimization) took {tend_optim}s [MAE={score:.4f}]')
 
@@ -50,7 +50,7 @@ class EvalTestCase(unittest.TestCase):
         tend_no_optim = time() - tinit
         print(f'evaluation (w/o optimization) took {tend_no_optim}s [MAE={score:.4f}]')
 
-        self.assertEqual(tend_no_optim>tend_optim, True)
+        self.assertTrue(tend_no_optim > (tend_optim/2))
 
 
 if __name__ == '__main__':
diff --git a/quapy/tests/test_modsel.py b/quapy/tests/test_modsel.py
index 637f831..9c6604a 100644
--- a/quapy/tests/test_modsel.py
+++ b/quapy/tests/test_modsel.py
@@ -8,6 +8,7 @@ import quapy as qp
-from method.aggregative import PACC
-from model_selection import GridSearchQ
-from protocol import APP
+from quapy.method.aggregative import PACC
+from quapy.model_selection import GridSearchQ
+from quapy.protocol import APP
+import time
 
 
 class ModselTestCase(unittest.TestCase):
@@ -18,7 +19,6 @@ class ModselTestCase(unittest.TestCase):
 
         data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
         training, validation = data.training.split_stratified(0.7, random_state=1)
-        # test = data.test
 
         param_grid = {'C': np.logspace(-3,3,7)}
         app = APP(validation, sample_size=100, random_seed=1)
@@ -50,6 +50,37 @@ class ModselTestCase(unittest.TestCase):
         self.assertEqual(q.best_params_['C'], 10.0)
         self.assertEqual(q.best_model().get_params()['C'], 10.0)
 
+    def test_modsel_parallel_speedup(self):
+        class SlowLR(LogisticRegression):
+            def fit(self, X, y, sample_weight=None):
+                time.sleep(1)
+                return super(SlowLR, self).fit(X, y, sample_weight)
+
+        q = PACC(SlowLR(random_state=1, max_iter=5000))
+
+        data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
+        training, validation = data.training.split_stratified(0.7, random_state=1)
+
+        param_grid = {'C': np.logspace(-3, 3, 7)}
+        app = APP(validation, sample_size=100, random_seed=1)
+
+        tinit = time.time()
+        GridSearchQ(
+            q, param_grid, protocol=app, error='mae', refit=False, timeout=-1, n_jobs=1, verbose=True
+        ).fit(training)
+        tend_nooptim = time.time()-tinit
+
+        tinit = time.time()
+        GridSearchQ(
+            q, param_grid, protocol=app, error='mae', refit=False, timeout=-1, n_jobs=-1, verbose=True
+        ).fit(training)
+        tend_optim = time.time() - tinit
+
+        print(f'parallel training took {tend_optim:.4f}s')
+        print(f'sequential training took {tend_nooptim:.4f}s')
+
+        self.assertTrue(tend_optim < (0.5 * tend_nooptim))
+
     def test_modsel_timeout(self):
 
         class SlowLR(LogisticRegression):
diff --git a/quapy/tests/test_protocols.py b/quapy/tests/test_protocols.py
index bf92ce5..b68567b 100644
--- a/quapy/tests/test_protocols.py
+++ b/quapy/tests/test_protocols.py
@@ -1,7 +1,7 @@
 import unittest
 import numpy as np
-from data import LabelledCollection
-from protocol import APP, NPP, USimplexPP, CovariateShiftPP, AbstractStochasticSeededProtocol
+from quapy.data import LabelledCollection
+from quapy.protocol import APP, NPP, USimplexPP, CovariateShiftPP, AbstractStochasticSeededProtocol
 
 
 def mock_labelled_collection(prefix=''):
@@ -134,6 +134,5 @@ class TestProtocols(unittest.TestCase):
             print('done')
 
 
-
 if __name__ == '__main__':
     unittest.main()