diff --git a/quapy/CHANGE_LOG.txt b/quapy/CHANGE_LOG.txt
index ab03b01..095bb76 100644
--- a/quapy/CHANGE_LOG.txt
+++ b/quapy/CHANGE_LOG.txt
@@ -18,6 +18,8 @@
 procedure. The user can now specify "force", "auto", True or False, in order to actively decide whether to
 apply it or not.
 
+- n_jobs is now taken from the environment if set to None
+
 Things to fix:
 - clean functions like binary, aggregative, probabilistic, etc.; those should be resolved via isinstance(): this is
   not working; I don't know how to make the isinstance work. Looks like there is some problem with the
@@ -29,7 +31,7 @@ Things to fix:
 - Policies should be able to set their output to "labelled_collection" or "instances_prevalence" or something similar.
 - Policies should implement the "gen()" method, taking a reader function as an input, and maybe a folder path
 - Review all documentation, redo the Sphinx doc, update Wikis...
-- Resolve the OneVsAll thing (it is in base.py and in aggregative.py
+- Resolve the OneVsAll thing (it is in base.py and in aggregative.py)
 - Better handle the environment (e.g., with n_jobs)
 - test cross_generate_predictions and cancel cross_generate_predictions_depr
 - Add a proper log?
diff --git a/quapy/__init__.py b/quapy/__init__.py
index 2ef4c5c..54b1603 100644
--- a/quapy/__init__.py
+++ b/quapy/__init__.py
@@ -18,7 +18,15 @@ environ = {
     'UNK_INDEX': 0,
     'PAD_TOKEN': '[PAD]',
     'PAD_INDEX': 1,
-    'SVMPERF_HOME': './svm_perf_quantification'
+    'SVMPERF_HOME': './svm_perf_quantification',
+    'N_JOBS': 1
 }
 
 
+def get_njobs(n_jobs):
+    """Returns n_jobs if it is not None; otherwise, returns the environment default environ['N_JOBS']"""
+    return environ['N_JOBS'] if n_jobs is None else n_jobs
+
+
+
+
diff --git a/quapy/data/preprocessing.py b/quapy/data/preprocessing.py
index f04f010..a987900 100644
--- a/quapy/data/preprocessing.py
+++ b/quapy/data/preprocessing.py
@@ -169,7 +169,7 @@ class IndexTransformer:
         self.pad = self.add_word(qp.environ['PAD_TOKEN'], qp.environ['PAD_INDEX'])
         return self
 
-    def transform(self, X, n_jobs=-1):
+    def transform(self, X, n_jobs=None):
         """
         Transforms the strings in `X` into lists of numerical ids
 
@@ -179,6 +179,7 @@ class IndexTransformer:
         """
         # given the number of tasks and the number of jobs, generates the slices for the parallel processes
         assert self.unk != -1, 'transform called before fit'
+        n_jobs = qp.get_njobs(n_jobs)
         indexed = map_parallel(func=self._index, args=X, n_jobs=n_jobs)
         return np.asarray(indexed)
 
@@ -186,7 +187,7 @@ class IndexTransformer:
         vocab = self.vocabulary_.copy()
         return [[vocab.get(word, self.unk) for word in self.analyzer(doc)] for doc in tqdm(documents, 'indexing')]
 
-    def fit_transform(self, X, n_jobs=-1):
+    def fit_transform(self, X, n_jobs=None):
         """
         Fits the transform on `X` and transforms it.
 
diff --git a/quapy/method/aggregative.py b/quapy/method/aggregative.py
index ca4b25c..c2f4717 100644
--- a/quapy/method/aggregative.py
+++ b/quapy/method/aggregative.py
@@ -207,6 +207,8 @@ def cross_generate_predictions(
         n_jobs
 ):
 
+    n_jobs = qp.get_njobs(n_jobs)
+
     if isinstance(val_split, int):
         assert fit_learner == True, \
             'the parameters for the adjustment cannot be estimated with kFCV with fit_learner=False'
@@ -331,10 +333,10 @@ class ACC(AggregativeQuantifier):
         :class:`quapy.data.base.LabelledCollection` (the split itself).
""" - def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=1): + def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=None): self.learner = learner self.val_split = val_split - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None): """ @@ -437,10 +439,10 @@ class PACC(AggregativeProbabilisticQuantifier): :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=1): + def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=None): self.learner = learner self.val_split = val_split - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None): """ @@ -769,10 +771,10 @@ class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier): :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=1): + def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=None): self.learner = learner self.val_split = val_split - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None): self._check_binary(data, "Threshold Optimization") @@ -1022,13 +1024,13 @@ class OneVsAll(AggregativeQuantifier): :param n_jobs: number of parallel workers """ - def __init__(self, binary_quantifier, n_jobs=-1): + def __init__(self, binary_quantifier, n_jobs=None): assert isinstance(self.binary_quantifier, BaseQuantifier), \ f'{self.binary_quantifier} does not seem to be a Quantifier' assert isinstance(self.binary_quantifier, AggregativeQuantifier), \ f'{self.binary_quantifier} does not seem to be of type Aggregative' self.binary_quantifier = binary_quantifier - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) def fit(self, data: LabelledCollection, fit_learner=True): assert not data.binary, \ diff --git a/quapy/method/base.py b/quapy/method/base.py index 6c2a0c5..c935735 100644 --- a/quapy/method/base.py +++ b/quapy/method/base.py @@ -1,6 +1,6 @@ from abc import ABCMeta, abstractmethod from copy import deepcopy - +import quapy as qp from quapy.data import LabelledCollection @@ -63,17 +63,18 @@ class BinaryQuantifier(BaseQuantifier): assert data.binary, f'{quantifier_name} works only on problems of binary classification. ' \ f'Use the class OneVsAll to enable {quantifier_name} work on single-label data.' + class OneVsAllGeneric: """ Allows any binary quantifier to perform quantification on single-label datasets. The method maintains one binary quantifier for each class, and then l1-normalizes the outputs so that the class prevelences sum up to 1. 
""" - def __init__(self, binary_quantifier, n_jobs=1): + def __init__(self, binary_quantifier, n_jobs=None): assert isinstance(binary_quantifier, BaseQuantifier), \ f'{binary_quantifier} does not seem to be a Quantifier' self.binary_quantifier = binary_quantifier - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) def fit(self, data: LabelledCollection, **kwargs): assert not data.binary, \ diff --git a/quapy/method/meta.py b/quapy/method/meta.py index d5e8c2a..5e084e5 100644 --- a/quapy/method/meta.py +++ b/quapy/method/meta.py @@ -72,7 +72,7 @@ class Ensemble(BaseQuantifier): policy='ave', max_sample_size=None, val_split:Union[qp.data.LabelledCollection, float]=None, - n_jobs=1, + n_jobs=None, verbose=False): assert policy in Ensemble.VALID_POLICIES, \ f'unknown policy={policy}; valid are {Ensemble.VALID_POLICIES}' @@ -84,7 +84,7 @@ class Ensemble(BaseQuantifier): self.red_size = red_size self.policy = policy self.val_split = val_split - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) self.post_proba_fn = None self.verbose = verbose self.max_sample_size = max_sample_size diff --git a/quapy/model_selection.py b/quapy/model_selection.py index 7d71023..c227db8 100644 --- a/quapy/model_selection.py +++ b/quapy/model_selection.py @@ -37,7 +37,7 @@ class GridSearchQ(BaseQuantifier): error: Union[Callable, str] = qp.error.mae, refit=True, timeout=-1, - n_jobs=1, + n_jobs=None, verbose=False): self.model = model @@ -45,7 +45,7 @@ class GridSearchQ(BaseQuantifier): self.protocol = protocol self.refit = refit self.timeout = timeout - self.n_jobs = n_jobs + self.n_jobs = qp.get_njobs(n_jobs) self.verbose = verbose self.__check_error(error) assert isinstance(protocol, AbstractProtocol), 'unknown protocol' @@ -76,7 +76,6 @@ class GridSearchQ(BaseQuantifier): params_values = list(self.param_grid.values()) protocol = self.protocol - n_jobs = self.n_jobs self.param_scores_ = {} self.best_score_ = None @@ -84,7 +83,7 @@ class GridSearchQ(BaseQuantifier): tinit = time() hyper = [dict({k: values[i] for i, k in enumerate(params_keys)}) for values in itertools.product(*params_values)] - scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), n_jobs=n_jobs) + scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), n_jobs=self.n_jobs) for params, score, model in scores: if score is not None: diff --git a/quapy/util.py b/quapy/util.py index 952c2da..259178e 100644 --- a/quapy/util.py +++ b/quapy/util.py @@ -11,7 +11,7 @@ import numpy as np from joblib import Parallel, delayed -def _get_parallel_slices(n_tasks, n_jobs=-1): +def _get_parallel_slices(n_tasks, n_jobs): if n_jobs == -1: n_jobs = multiprocessing.cpu_count() batch = int(n_tasks / n_jobs) @@ -48,7 +48,9 @@ def parallel(func, args, n_jobs): """ print('n_jobs',n_jobs) def func_dec(environ, *args): - qp.environ = environ + qp.environ = environ.copy() + qp.environ['N_JOBS'] = 1 + print(f'setting n_jobs from {environ["N_JOBS"]} to 1') return func(*args) return Parallel(n_jobs=n_jobs)( delayed(func_dec)(qp.environ, args_i) for args_i in args