1
0
Fork 0
QuaPy/quapy/method/aggregative.py

834 lines
34 KiB
Python
Raw Normal View History

from abc import abstractmethod
2021-01-15 18:32:32 +01:00
from copy import deepcopy
from typing import Union
2021-05-05 17:12:44 +02:00
2021-01-15 18:32:32 +01:00
import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import confusion_matrix
2021-01-11 12:55:06 +01:00
from sklearn.model_selection import StratifiedKFold
from tqdm import tqdm
2021-05-05 17:12:44 +02:00
2021-01-18 10:53:22 +01:00
import quapy as qp
2021-01-15 18:32:32 +01:00
import quapy.functional as F
from quapy.classification.svmperf import SVMperf
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier, BinaryQuantifier
2020-12-03 18:12:28 +01:00
# Abstract classes
# ------------------------------------
class AggregativeQuantifier(BaseQuantifier):
    """
    Base class for quantifiers that work in two stages: the instances are first classified by an
    underlying learner, and the classifier outputs are then aggregated into class prevalence
    estimates. Concrete subclasses must implement :meth:`fit` and :meth:`aggregate`.
    """

    # --- interface to be implemented by subclasses -------------------------

    @abstractmethod
    def fit(self, data: LabelledCollection, fit_learner=True):
        """
        Trains the quantifier.

        :param data: the training collection
        :param fit_learner: flag forwarded by subclasses to decide whether the learner is (re)trained
        """

    @abstractmethod
    def aggregate(self, classif_predictions: np.ndarray):
        """
        Turns the classifier outputs into class prevalence estimates.

        :param classif_predictions: the outputs of :meth:`classify` for a set of instances
        """

    # --- access to the underlying learner ----------------------------------

    @property
    def learner(self):
        # the learner is stored in the `learner_` attribute
        return self.learner_

    @learner.setter
    def learner(self, value):
        self.learner_ = value

    @property
    def classes_(self):
        # the classes are those known to the underlying learner
        return self.learner.classes_

    @property
    def n_classes(self):
        return len(self.classes_)

    def get_params(self, deep=True):
        # hyper-parameters are those of the learner (the `deep` flag is not forwarded)
        return self.learner.get_params()

    def set_params(self, **parameters):
        self.learner.set_params(**parameters)

    # --- classification and quantification ---------------------------------

    def classify(self, instances):
        """Returns the label predictions of the learner for `instances`."""
        return self.learner.predict(instances)

    def preclassify(self, instances):
        """Returns the classifier outputs that :meth:`aggregate` consumes (here: label predictions)."""
        return self.classify(instances)

    def quantify(self, instances):
        """Classifies `instances` and aggregates the predictions into prevalence estimates."""
        return self.aggregate(self.classify(instances))

    @property
    def aggregative(self):
        return True
2020-12-03 18:12:28 +01:00
class AggregativeProbabilisticQuantifier(AggregativeQuantifier):
    """
    Abstract class for quantification methods that base their estimations on the aggregation of posterior
    probabilities as returned by a probabilistic classifier. Aggregative Probabilistic Quantifiers thus extend
    Aggregative Quantifiers by implementing a _posterior_probabilities_ method returning values in [0,1] -- the
    posterior probabilities.
    """

    def preclassify(self, instances):
        """Returns the classifier outputs that `aggregate` consumes (here: posterior probabilities)."""
        return self.predict_proba(instances)

    def posterior_probabilities(self, instances):
        """Returns the output of the learner's `predict_proba` for `instances`."""
        return self.learner.predict_proba(instances)

    def predict_proba(self, instances):
        """Alias of :meth:`posterior_probabilities`."""
        return self.posterior_probabilities(instances)

    def quantify(self, instances):
        """Computes the posterior probabilities of `instances` and aggregates them into prevalence estimates."""
        classif_posteriors = self.posterior_probabilities(instances)
        return self.aggregate(classif_posteriors)

    def set_params(self, **parameters):
        """
        Sets hyper-parameters of the learner. If the learner is wrapped in a `CalibratedClassifierCV`, the
        parameters are redirected to the wrapped estimator.

        :param parameters: hyper-parameters of the (wrapped) learner
        """
        if isinstance(self.learner, CalibratedClassifierCV):
            # scikit-learn 1.2 renamed the `base_estimator` parameter of CalibratedClassifierCV to `estimator`
            # (the old name was later removed); pick whichever name this installation actually uses
            wrapped = 'estimator' if getattr(self.learner, 'estimator', None) is not None else 'base_estimator'
            parameters = {f'{wrapped}__{k}': v for k, v in parameters.items()}
        self.learner.set_params(**parameters)

    @property
    def probabilistic(self):
        return True
2020-12-03 18:12:28 +01:00
2020-12-03 18:12:28 +01:00
# Helper
# ------------------------------------
def training_helper(learner,
                    data: LabelledCollection,
                    fit_learner: bool = True,
                    ensure_probabilistic=False,
                    val_split: Union[LabelledCollection, float] = None):
    """
    Shared training routine of the aggregative quantifiers.

    :param learner: the learner to be trained
    :param data: the collection used for training; part of it is held out when `val_split` is a float
    :param fit_learner: if False, the learner is left untouched (no fitting takes place)
    :param ensure_probabilistic: if True, the returned learner is required to expose `predict_proba`; a learner
        lacking it is wrapped in a `CalibratedClassifierCV` (cv=5) before fitting, or rejected with an
        AssertionError when `fit_learner` is False
    :param val_split: either a float in (0,1) with the proportion of `data` to hold out for validation, or a
        LabelledCollection to be used as the validation set itself, or None for no validation data
    :return: a tuple (learner, validation) in which validation is the held-out part of `data`, the collection
        passed as `val_split`, or None
    :raises ValueError: if `val_split` is a float outside (0,1) or is of an unsupported type (only checked when
        `fit_learner` is True)
    """
    lacks_posteriors = ensure_probabilistic and not hasattr(learner, 'predict_proba')

    # nothing to train: only sanity-check the learner and hand back an explicitly given validation set
    if not fit_learner:
        if lacks_posteriors:
            raise AssertionError('error: the learner cannot be calibrated since fit_learner is set to False')
        held_out = val_split if isinstance(val_split, LabelledCollection) else None
        return learner, held_out

    if lacks_posteriors:
        print(f'The learner {learner.__class__.__name__} does not seem to be probabilistic. '
              f'The learner will be calibrated.')
        learner = CalibratedClassifierCV(learner, cv=5)

    # decide which part of the data is used for training and which one is returned for validation
    if val_split is None:
        train, held_out = data, None
    elif isinstance(val_split, float):
        if not (0 < val_split < 1):
            raise ValueError(f'train/val split {val_split} out of range, must be in (0,1)')
        train, held_out = data.split_stratified(train_prop=1 - val_split)
    elif isinstance(val_split, LabelledCollection):
        train, held_out = data, val_split
    else:
        raise ValueError(
            f'param "val_split" ({type(val_split)}) not understood; use either a float indicating the split '
            'proportion, or a LabelledCollection indicating the validation split')

    # quantifiers are fit on the collection itself, plain classifiers on the (X, y) pair
    if isinstance(learner, BaseQuantifier):
        learner.fit(train)
    else:
        learner.fit(*train.Xy)

    return learner, held_out
# Methods
# ------------------------------------
2021-01-07 17:58:48 +01:00
class CC(AggregativeQuantifier):
    """
    Classify & Count, the simplest quantification method: all instances are classified and the class
    prevalence estimates are computed from the number of instances attributed to each class.
    """

    def __init__(self, learner: BaseEstimator):
        self.learner = learner

    def fit(self, data: LabelledCollection, fit_learner=True):
        """
        Fits the underlying learner, unless `fit_learner` is False (the learner is then taken as already fit).

        :param data: the training collection
        :param fit_learner: set to False if the learner has already been trained
        :return: self
        """
        fitted_learner, _ = training_helper(self.learner, data, fit_learner)
        self.learner = fitted_learner
        return self

    def aggregate(self, classif_predictions):
        """Computes the prevalence of each class among the predicted labels."""
        known_classes = self.classes_
        return F.prevalence_from_labels(classif_predictions, known_classes)
2020-12-03 18:12:28 +01:00
2021-01-07 17:58:48 +01:00
class ACC(AggregativeQuantifier):
    """
    Adjusted Classify & Count. The prevalence estimates of a Classify & Count quantifier are corrected by
    solving a linear system built from the misclassification rates of the learner, which are estimated on
    validation data (a held-out split, an explicit validation set, or via k-fold cross-validation).
    """

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        self.learner = learner
        self.val_split = val_split

    def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
        """
        Trains a ACC quantifier

        :param data: the training set
        :param fit_learner: set to False to bypass the training (the learner is assumed to be already fit)
        :param val_split: either a float in (0,1) indicating the proportion of training instances to use for
            validation (e.g., 0.3 for using 30% of the training set as validation data), or a LabelledCollection
            indicating the validation set itself, or an int indicating the number k of folds to be used in kFCV
            to estimate the parameters. Defaults to the value given in the constructor.
        :return: self
        """
        if val_split is None:
            val_split = self.val_split
        if isinstance(val_split, int):
            assert fit_learner == True, \
                'the parameters for the adjustment cannot be estimated with kFCV with fit_learner=False'
            # kFCV estimation of parameters: true labels (y) and predictions (y_) are collected across folds
            y, y_ = [], []
            kfcv = StratifiedKFold(n_splits=val_split)
            pbar = tqdm(kfcv.split(*data.Xy), total=val_split)
            for k, (training_idx, validation_idx) in enumerate(pbar):
                pbar.set_description(f'{self.__class__.__name__} fitting fold {k}')
                training = data.sampling_from_index(training_idx)
                validation = data.sampling_from_index(validation_idx)
                learner, val_data = training_helper(self.learner, training, fit_learner, val_split=validation)
                y_.append(learner.predict(val_data.instances))
                y.append(val_data.labels)

            y = np.concatenate(y)
            y_ = np.concatenate(y_)

            # fit the learner on all data
            self.learner, _ = training_helper(self.learner, data, fit_learner, val_split=None)
        else:
            self.learner, val_data = training_helper(self.learner, data, fit_learner, val_split=val_split)
            y_ = self.learner.predict(val_data.instances)
            y = val_data.labels

        self.cc = CC(self.learner)

        # estimate the matrix with entry (i,j) being the estimate of P(yi|yj), that is, the probability that a
        # document that belongs to yj ends up being classified as belonging to yi
        self.Pte_cond_estim_ = self.getPteCondEstim(data.classes_, y, y_)

        return self

    @classmethod
    def getPteCondEstim(cls, classes, y, y_):
        """
        Estimates the matrix with entry (i,j) being the estimate of P(yi|yj), that is, the probability that a
        document that belongs to yj ends up being classified as belonging to yi.

        :param classes: the class labels, fixing the order of rows and columns
        :param y: the true labels
        :param y_: the predicted labels
        :return: a float matrix whose columns are normalized by the number of examples of the true class; for a
            class with no examples, only the diagonal entry is set (to 1)
        """
        # builtin float instead of the `np.float` alias, which no longer exists in recent NumPy releases
        conf = confusion_matrix(y, y_, labels=classes).T.astype(float)
        class_counts = conf.sum(axis=0)
        for i, _ in enumerate(classes):
            if class_counts[i] == 0:
                conf[i, i] = 1
            else:
                conf[:, i] /= class_counts[i]
        return conf

    def classify(self, data):
        """Returns the label predictions of the internal Classify & Count quantifier."""
        return self.cc.classify(data)

    def aggregate(self, classif_predictions):
        """Computes the Classify & Count estimates and adjusts them with the estimated misclassification rates."""
        prevs_estim = self.cc.aggregate(classif_predictions)
        return ACC.solve_adjustment(self.Pte_cond_estim_, prevs_estim)

    @classmethod
    def solve_adjustment(cls, PteCondEstim, prevs_estim):
        """
        Solves the linear system Ax = B with A=PteCondEstim and B=prevs_estim; the solution is clipped to [0,1]
        and rescaled to sum up to 1.

        :param PteCondEstim: the matrix of estimated misclassification rates
        :param prevs_estim: the unadjusted prevalence estimates
        :return: the adjusted prevalence estimates, or `prevs_estim` itself if the system cannot be solved
        """
        A = PteCondEstim
        B = prevs_estim
        try:
            adjusted_prevs = np.linalg.solve(A, B)
            adjusted_prevs = np.clip(adjusted_prevs, 0, 1)
            adjusted_prevs /= adjusted_prevs.sum()
        except np.linalg.LinAlgError:
            adjusted_prevs = prevs_estim  # no way to adjust them!
        return adjusted_prevs
2021-01-07 17:58:48 +01:00
class PCC(AggregativeProbabilisticQuantifier):
    """
    Probabilistic Classify & Count: the prevalence estimates are derived from the posterior probabilities
    returned by the learner (without binarizing them).
    """

    def __init__(self, learner: BaseEstimator):
        self.learner = learner

    def fit(self, data: LabelledCollection, fit_learner=True):
        """
        Fits the learner, requesting that it exposes posterior probabilities.

        :param data: the training collection
        :param fit_learner: set to False if the learner has already been trained
        :return: self
        """
        fitted_learner, _ = training_helper(self.learner, data, fit_learner, ensure_probabilistic=True)
        self.learner = fitted_learner
        return self

    def aggregate(self, classif_posteriors):
        """Computes the prevalence estimates from the (non-binarized) posterior probabilities."""
        return F.prevalence_from_probabilities(classif_posteriors, binarize=False)
2020-12-03 18:12:28 +01:00
2021-01-07 17:58:48 +01:00
class PACC(AggregativeProbabilisticQuantifier):
    """
    Probabilistic Adjusted Classify & Count: the estimates of a Probabilistic Classify & Count quantifier are
    corrected by solving a linear system built from the average posterior probabilities that the learner
    assigns to the validation examples of each class.
    """

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        self.learner = learner
        self.val_split = val_split

    def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
        """
        Trains a PACC quantifier

        :param data: the training set
        :param fit_learner: set to False to bypass the training (the learner is assumed to be already fit)
        :param val_split: a float in (0,1) with the proportion of training instances to hold out for validation
            (e.g., 0.3 for 30%), or a LabelledCollection acting as the validation set, or an int with the
            number k of folds of the kFCV used to estimate the parameters. If None, the value given in the
            constructor is used.
        :return: self
        """
        split = self.val_split if val_split is None else val_split

        if not isinstance(split, int):
            # the parameters are estimated on a single validation set
            self.learner, held_out = training_helper(
                self.learner, data, fit_learner, ensure_probabilistic=True, val_split=split)
            posteriors = self.learner.predict_proba(held_out.instances)
            true_labels = held_out.labels
            classes = held_out.classes_
        else:
            assert fit_learner == True, \
                'the parameters for the adjustment cannot be estimated with kFCV with fit_learner=False'
            # the parameters are estimated on the out-of-fold predictions of a stratified kFCV
            fold_labels, fold_posteriors = [], []
            splitter = StratifiedKFold(n_splits=split)
            folds = tqdm(splitter.split(*data.Xy), total=split)
            for fold_no, (train_idx, val_idx) in enumerate(folds):
                folds.set_description(f'{self.__class__.__name__} fitting fold {fold_no}')
                fold_train = data.sampling_from_index(train_idx)
                fold_val = data.sampling_from_index(val_idx)
                fold_learner, fold_val = training_helper(
                    self.learner, fold_train, fit_learner, ensure_probabilistic=True, val_split=fold_val)
                fold_posteriors.append(fold_learner.predict_proba(fold_val.instances))
                fold_labels.append(fold_val.labels)

            true_labels = np.concatenate(fold_labels)
            posteriors = np.vstack(fold_posteriors)

            # the final learner is trained on the whole training set
            self.learner, _ = training_helper(self.learner, data, fit_learner, ensure_probabilistic=True,
                                              val_split=None)
            classes = data.classes_

        self.pcc = PCC(self.learner)
        self.Pte_cond_estim_ = self.getPteCondEstim(classes, true_labels, posteriors)
        return self

    @classmethod
    def getPteCondEstim(cls, classes, y, y_):
        """
        Estimates the matrix with entry (i,j) being the estimate of P(yi|yj), i.e., the probability that a
        document that belongs to yj ends up being classified as belonging to yi.

        :param classes: the class labels, fixing the order of rows and columns
        :param y: the true labels
        :param y_: the posterior probabilities, one row per example
        :return: the matrix whose column j is the mean posterior vector of the examples of class j; a class
            without examples keeps the corresponding column of the identity matrix
        """
        per_class_means = np.eye(len(classes))
        for row, label in enumerate(classes):
            belongs = (y == label)
            if belongs.any():
                per_class_means[row] = y_[belongs].mean(axis=0)
        return per_class_means.T

    def aggregate(self, classif_posteriors):
        """Computes the PCC estimates and adjusts them with the estimated misclassification rates."""
        unadjusted = self.pcc.aggregate(classif_posteriors)
        return ACC.solve_adjustment(self.Pte_cond_estim_, unadjusted)

    def classify(self, data):
        """Returns the label predictions of the internal PCC quantifier."""
        return self.pcc.classify(data)
2021-01-07 17:58:48 +01:00
class EMQ(AggregativeProbabilisticQuantifier):
    """
    Expectation Maximization for quantification. The method is described in:
    Saerens, M., Latinne, P., and Decaestecker, C. (2002).
    Adjusting the outputs of a classifier to new a priori probabilities: A simple procedure.
    Neural Computation, 14(1): 21-41.
    """

    MAX_ITER = 1000
    EPSILON = 1e-4

    def __init__(self, learner: BaseEstimator):
        self.learner = learner

    def fit(self, data: LabelledCollection, fit_learner=True):
        """
        Fits the (probabilistic) learner and records the class prevalence of the training labels.

        :param data: the training collection
        :param fit_learner: set to False if the learner has already been trained
        :return: self
        """
        fitted_learner, _ = training_helper(self.learner, data, fit_learner, ensure_probabilistic=True)
        self.learner = fitted_learner
        self.train_prevalence = F.prevalence_from_labels(data.labels, self.classes_)
        return self

    def aggregate(self, classif_posteriors, epsilon=EPSILON):
        """Runs EM on the posteriors and returns the resulting prior (prevalence) estimates."""
        estimated_priors, _ = self.EM(self.train_prevalence, classif_posteriors, epsilon)
        return estimated_priors

    def predict_proba(self, instances, epsilon=EPSILON):
        """Runs EM on the learner's posteriors for `instances` and returns the updated posteriors."""
        raw_posteriors = self.learner.predict_proba(instances)
        _, updated_posteriors = self.EM(self.train_prevalence, raw_posteriors, epsilon)
        return updated_posteriors

    @classmethod
    def EM(cls, tr_prev, posterior_probabilities, epsilon=EPSILON):
        """
        Iteratively re-estimates the priors and the posteriors.

        :param tr_prev: the training prevalence, used as the initial estimate of the priors
        :param posterior_probabilities: the posteriors returned by the classifier, one row per instance
        :param epsilon: the iterations stop once the `qp.error.mae` between two consecutive prior estimates
            falls below this value (and more than 10 iterations have been completed)
        :return: a tuple (priors, posteriors) with the last prior estimates and the posteriors computed in the
            last E-step
        """
        Px = posterior_probabilities
        Ptr = np.copy(tr_prev)
        priors = np.copy(Ptr)  # the running estimate starts from the training prevalence
        previous_priors = None

        for iteration in range(EMQ.MAX_ITER):
            # E-step: posteriors rescaled by the ratio between current and training priors, then row-normalized
            rescaled = (priors / Ptr) * Px
            ps = rescaled / rescaled.sum(axis=1, keepdims=True)

            # M-step: the new priors are the column means of the posteriors
            priors = ps.mean(axis=0)

            if previous_priors is not None and qp.error.mae(priors, previous_priors) < epsilon and iteration > 10:
                break
            previous_priors = priors
        else:
            # the loop ran out of iterations without meeting the stopping criterion
            print('[warning] the method has reached the maximum number of iterations; it might have not converged')

        return priors, ps
2020-12-03 18:12:28 +01:00
2021-01-07 17:58:48 +01:00
class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier):
    """
    Implementation of the method based on the Hellinger Distance y (HDy) proposed by
    González-Castro, V., Alaiz-Rodríguez, R., and Alegre, E. (2013). Class distribution
    estimation based on the Hellinger distance. Information Sciences, 218:146-164.
    """

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        self.learner = learner
        self.val_split = val_split

    def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, LabelledCollection] = None):
        """
        Trains a HDy quantifier

        :param data: the training set
        :param fit_learner: set to False to bypass the training (the learner is assumed to be already fit)
        :param val_split: a float in (0,1) with the proportion of training instances to hold out for validation
            (e.g., 0.3 for 30%), or a LabelledCollection acting as the validation set. If None, the value given
            in the constructor is used.
        :return: self
        """
        if val_split is None:
            val_split = self.val_split

        self._check_binary(data, self.__class__.__name__)
        self.learner, validation = training_helper(
            self.learner, data, fit_learner, ensure_probabilistic=True, val_split=val_split)

        # only the second column of the posteriors, i.e., P(y=+1|x), is used
        positive_posteriors = self.posterior_probabilities(validation.instances)[:, 1]
        negative_label, positive_label = self.learner.classes_[0], self.learner.classes_[1]
        self.Pxy1 = positive_posteriors[validation.labels == positive_label]
        self.Pxy0 = positive_posteriors[validation.labels == negative_label]

        # the histograms of the validation posteriors are pre-computed for every number of bins
        self.bins = np.linspace(10, 110, 11, dtype=int)  # [10, 20, 30, ..., 100, 110]

        def densities(sample):
            return {nbins: np.histogram(sample, bins=nbins, range=(0, 1), density=True)[0] for nbins in self.bins}

        self.Pxy1_density = densities(self.Pxy1)
        self.Pxy0_density = densities(self.Pxy0)
        return self

    def aggregate(self, classif_posteriors):
        """
        Estimates the prevalence of the positive class as the median, across the numbers of bins, of the
        prevalence whose mixture of validation histograms is closest (in Hellinger Distance) to the histogram
        of the test posteriors.

        :param classif_posteriors: the posterior probabilities of the test instances
        :return: the array [1 - p, p] with p the estimated prevalence of the positive class
        """
        # "In this work, the number of bins b used in HDx and HDy was chosen from 10 to 110 in steps of 10,
        # and the final estimated a priori probability was taken as the median of these 11 estimates."
        # (González-Castro, et al., 2013).

        test_posteriors = classif_posteriors[:, 1]  # takes only the P(y=+1|x)

        per_bin_estimates = []
        for nbins in self.bins:
            negative_density = self.Pxy0_density[nbins]
            positive_density = self.Pxy1_density[nbins]
            test_density, _ = np.histogram(test_posteriors, bins=nbins, range=(0, 1), density=True)

            # keep the first prevalence attaining the smallest distance
            best_prev, best_distance = None, None
            for prev in F.prevalence_linspace(n_prevalences=100, repeat=1, smooth_limits_epsilon=0.0):
                mixture = prev * positive_density + (1 - prev) * negative_density
                distance = F.HellingerDistance(mixture, test_density)
                if best_prev is None or distance < best_distance:
                    best_prev, best_distance = prev, distance
            per_bin_estimates.append(best_prev)

        positive_prevalence = np.median(per_bin_estimates)
        return np.asarray([1 - positive_prevalence, positive_prevalence])
2021-01-07 17:58:48 +01:00
class ELM(AggregativeQuantifier, BinaryQuantifier):
    """
    Binary quantifier that classifies and counts with an `SVMperf` learner configured with a given loss.
    """

    def __init__(self, svmperf_base=None, loss='01', **kwargs):
        # the location of svmperf falls back to the 'SVMPERF_HOME' entry of the quapy environment
        if svmperf_base is None:
            svmperf_base = qp.environ['SVMPERF_HOME']
        self.svmperf_base = svmperf_base
        self.loss = loss
        self.kwargs = kwargs
        self.learner = SVMperf(self.svmperf_base, loss=self.loss, **self.kwargs)

    def fit(self, data: LabelledCollection, fit_learner=True):
        """
        Fits the SVMperf learner on binary data.

        :param data: the (binary) training collection
        :param fit_learner: must be True
        :return: self
        """
        self._check_binary(data, self.__class__.__name__)
        assert fit_learner, 'the method requires that fit_learner=True'
        X, labels = data.instances, data.labels
        self.learner.fit(X, labels)
        return self

    def aggregate(self, classif_predictions: np.ndarray):
        """Computes the prevalence of each class among the predicted labels."""
        known_classes = self.classes_
        return F.prevalence_from_labels(classif_predictions, known_classes)

    def classify(self, X, y=None):
        """Returns the label predictions of the learner for `X` (the argument `y` is not used)."""
        return self.learner.predict(X)
2021-01-07 17:58:48 +01:00
class SVMQ(ELM):
    """
    ELM quantifier configured with the loss 'q'. Reference:
    Barranquero, J., Díez, J., and del Coz, J. J. (2015).
    Quantification-oriented learning based on reliable classifiers.
    Pattern Recognition, 48(2):591-604.
    """

    def __init__(self, svmperf_base=None, **kwargs):
        super().__init__(svmperf_base, loss='q', **kwargs)
2021-01-07 17:58:48 +01:00
class SVMKLD(ELM):
    """
    ELM quantifier configured with the loss 'kld'. Reference:
    Esuli, A. and Sebastiani, F. (2015).
    Optimizing text quantifiers for multivariate loss functions.
    ACM Transactions on Knowledge Discovery and Data, 9(4):Article 27.
    """

    def __init__(self, svmperf_base=None, **kwargs):
        super().__init__(svmperf_base, loss='kld', **kwargs)
2021-01-07 17:58:48 +01:00
class SVMNKLD(ELM):
    """
    ELM quantifier configured with the loss 'nkld'. Reference:
    Esuli, A. and Sebastiani, F. (2015).
    Optimizing text quantifiers for multivariate loss functions.
    ACM Transactions on Knowledge Discovery and Data, 9(4):Article 27.
    """

    def __init__(self, svmperf_base=None, **kwargs):
        super().__init__(svmperf_base, loss='nkld', **kwargs)
2021-01-07 17:58:48 +01:00
class SVMAE(ELM):
    """ELM quantifier configured with the loss 'mae'."""

    def __init__(self, svmperf_base=None, **kwargs):
        super().__init__(svmperf_base, loss='mae', **kwargs)
2021-01-07 17:58:48 +01:00
class SVMRAE(ELM):
    """ELM quantifier configured with the loss 'mrae'."""

    def __init__(self, svmperf_base=None, **kwargs):
        super().__init__(svmperf_base, loss='mrae', **kwargs)
class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier):
    """
    Base class for binary quantifiers that inspect the candidate decision thresholds of a probabilistic
    learner on validation data, keep the true positive rate (tpr) and false positive rate (fpr) of the
    threshold that minimizes the score returned by :meth:`_condition`, and use these rates to adjust the
    Classify & Count estimate of the positive class.
    """

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        self.learner = learner
        self.val_split = val_split

    def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
        """
        Trains the learner and estimates the tpr and fpr used for the adjustment.

        :param data: the (binary) training set
        :param fit_learner: set to False to bypass the training (the learner is assumed to be already fit)
        :param val_split: either a float in (0,1) indicating the proportion of training instances to use for
            validation, or a LabelledCollection indicating the validation set itself, or an int indicating the
            number k of folds to be used in kFCV. Defaults to the value given in the constructor.
        :return: self
        """
        self._check_binary(data, "Threshold Optimization")

        if val_split is None:
            val_split = self.val_split
        if isinstance(val_split, int):
            assert fit_learner == True, \
                'the parameters for the adjustment cannot be estimated with kFCV with fit_learner=False'
            # kFCV estimation of parameters
            y, probabilities = [], []
            kfcv = StratifiedKFold(n_splits=val_split)
            pbar = tqdm(kfcv.split(*data.Xy), total=val_split)
            for k, (training_idx, validation_idx) in enumerate(pbar):
                pbar.set_description(f'{self.__class__.__name__} fitting fold {k}')
                training = data.sampling_from_index(training_idx)
                validation = data.sampling_from_index(validation_idx)
                learner, val_data = training_helper(self.learner, training, fit_learner, val_split=validation)
                probabilities.append(learner.predict_proba(val_data.instances))
                y.append(val_data.labels)

            y = np.concatenate(y)
            probabilities = np.concatenate(probabilities)

            # fit the learner on all data
            self.learner, _ = training_helper(self.learner, data, fit_learner, val_split=None)
        else:
            self.learner, val_data = training_helper(self.learner, data, fit_learner, val_split=val_split)
            probabilities = self.learner.predict_proba(val_data.instances)
            y = val_data.labels

        self.cc = CC(self.learner)
        self.tpr, self.fpr = self.optimize_threshold(y, probabilities)
        return self

    @abstractmethod
    def _condition(self, tpr, fpr) -> float:
        """
        Implements the criterion according to which the threshold should be selected.
        This function should return a (float) score to be minimized.
        """
        ...

    def optimize_threshold(self, y, probabilities):
        """
        Scans every distinct value of the positive-class posteriors as a candidate threshold and returns the
        rates of the candidate with the lowest :meth:`_condition` score (the first one in case of ties).

        :param y: the true labels of the validation examples
        :param probabilities: the posterior probabilities of the validation examples; column 1 is taken as
            the posterior of the positive class `self.classes_[1]`
        :return: the tuple (tpr, fpr) of the selected threshold
        """
        best_candidate_threshold_score = None
        best_tpr = 0
        best_fpr = 0
        positive_posteriors = probabilities[:, 1]
        negative_label, positive_label = self.classes_[0], self.classes_[1]
        for candidate_threshold in np.unique(positive_posteriors):
            # an example is predicted positive only if its posterior strictly exceeds the threshold
            y_ = np.where(positive_posteriors > candidate_threshold, positive_label, negative_label)
            TP, FP, FN, TN = self.compute_table(y, y_)
            tpr = self.compute_tpr(TP, FN)
            fpr = self.compute_fpr(FP, TN)
            condition_score = self._condition(tpr, fpr)
            if best_candidate_threshold_score is None or condition_score < best_candidate_threshold_score:
                best_candidate_threshold_score = condition_score
                best_tpr = tpr
                best_fpr = fpr

        return best_tpr, best_fpr

    def aggregate(self, classif_predictions):
        """
        Adjusts the Classify & Count estimate p of the positive class as (p - fpr) / (tpr - fpr), clipped to
        [0,1]. The unadjusted estimates are returned when tpr equals fpr.
        """
        prevs_estim = self.cc.aggregate(classif_predictions)
        if self.tpr - self.fpr == 0:
            return prevs_estim
        adjusted_prevs_estim = np.clip((prevs_estim[1] - self.fpr) / (self.tpr - self.fpr), 0, 1)
        adjusted_prevs_estim = np.array((1 - adjusted_prevs_estim, adjusted_prevs_estim))
        return adjusted_prevs_estim

    def compute_table(self, y, y_):
        """
        Counts the entries of the binary contingency table, with `self.classes_[1]` as the positive class.

        :param y: the true labels
        :param y_: the predicted labels
        :return: the tuple (TP, FP, FN, TN)
        """
        TP = np.logical_and(y == y_, y == self.classes_[1]).sum()
        FP = np.logical_and(y != y_, y == self.classes_[0]).sum()
        FN = np.logical_and(y != y_, y == self.classes_[1]).sum()
        TN = np.logical_and(y == y_, y == self.classes_[0]).sum()
        return TP, FP, FN, TN

    def compute_tpr(self, TP, FN):
        """
        True positive rate: the fraction of truly positive examples that are predicted as positive.

        :param TP: number of true positives
        :param FN: number of false negatives
        :return: TP / (TP + FN), or 0 if there are no positive examples
        """
        if TP + FN == 0:
            return 0
        return TP / (TP + FN)

    def compute_fpr(self, FP, TN):
        """
        False positive rate: the fraction of truly negative examples that are predicted as positive.

        :param FP: number of false positives
        :param TN: number of true negatives
        :return: FP / (FP + TN), or 0 if there are no negative examples
        """
        if FP + TN == 0:
            return 0
        return FP / (FP + TN)
class T50(ThresholdOptimization):
    """Threshold optimization variant whose score is the distance between the tpr and 0.5."""

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        super().__init__(learner, val_split=val_split)

    def _condition(self, tpr, fpr) -> float:
        # the lower the score the better: how far the tpr is from 0.5
        distance_to_half = abs(tpr - 0.5)
        return distance_to_half
class MAX(ThresholdOptimization):
    """Threshold optimization variant that looks for the largest difference between tpr and fpr."""

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        super().__init__(learner, val_split=val_split)

    def _condition(self, tpr, fpr) -> float:
        # scores are minimized, hence maximizing (tpr - fpr) amounts to minimizing (fpr - tpr)
        negated_gap = fpr - tpr
        return negated_gap
class X(ThresholdOptimization):
    """Threshold optimization variant whose score is the distance between (tpr + fpr) and 1."""

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        super().__init__(learner, val_split=val_split)

    def _condition(self, tpr, fpr) -> float:
        # the lower the score the better: how far tpr + fpr is from 1
        rates_sum = tpr + fpr
        return abs(1 - rates_sum)
class MS(ThresholdOptimization):
    """
    Median Sweep: instead of selecting a single threshold, the tpr and fpr used for the adjustment are the
    medians of the rates observed across all candidate thresholds.
    """

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        super().__init__(learner, val_split)

    def _condition(self, tpr, fpr) -> float:
        # not used: no threshold is selected, since optimize_threshold is overridden below
        pass

    def optimize_threshold(self, y, probabilities):
        """
        Computes the tpr and fpr of every distinct value of the positive-class posteriors, taken as a
        candidate threshold.

        :param y: the true labels of the validation examples
        :param probabilities: the posterior probabilities of the validation examples; column 1 is taken as
            the posterior of the positive class `self.classes_[1]`
        :return: the tuple (median of the tprs, median of the fprs)
        """
        tprs = []
        fprs = []
        positive_posteriors = probabilities[:, 1]
        for candidate_threshold in np.unique(positive_posteriors):
            y_ = [self.classes_[1] if p > candidate_threshold else self.classes_[0] for p in positive_posteriors]
            TP, FP, FN, TN = self.compute_table(y, y_)
            # compute_tpr divides its first argument by the sum of both, so the true positive rate
            # TP / (TP + FN) requires passing the false negatives (passing FP would yield the precision)
            tprs.append(self.compute_tpr(TP, FN))
            fprs.append(self.compute_fpr(FP, TN))
        return np.median(tprs), np.median(fprs)
class MS2(MS):
    """
    Variant of Median Sweep in which only the candidate thresholds with (tpr - fpr) > 0.25 contribute to the
    medians.
    """

    def __init__(self, learner: BaseEstimator, val_split=0.4):
        super().__init__(learner, val_split)

    def optimize_threshold(self, y, probabilities):
        """
        Computes the tpr and fpr of every distinct value of the positive-class posteriors, taken as a
        candidate threshold, and keeps those with (tpr - fpr) > 0.25.

        :param y: the true labels of the validation examples
        :param probabilities: the posterior probabilities of the validation examples; column 1 is taken as
            the posterior of the positive class `self.classes_[1]`
        :return: the tuple (median of the tprs, median of the fprs); both lists are seeded with the values
            0 and 1, so the medians are defined even if no candidate is kept
        """
        tprs = [0, 1]
        fprs = [0, 1]
        positive_posteriors = probabilities[:, 1]
        for candidate_threshold in np.unique(positive_posteriors):
            y_ = [self.classes_[1] if p > candidate_threshold else self.classes_[0] for p in positive_posteriors]
            TP, FP, FN, TN = self.compute_table(y, y_)
            # compute_tpr divides its first argument by the sum of both, so the true positive rate
            # TP / (TP + FN) requires passing the false negatives (passing FP would yield the precision)
            tpr = self.compute_tpr(TP, FN)
            fpr = self.compute_fpr(FP, TN)
            if (tpr - fpr) > 0.25:
                tprs.append(tpr)
                fprs.append(fpr)
        return np.median(tprs), np.median(fprs)
2021-01-07 17:58:48 +01:00
# Long-form aliases: each name below is bound to the quantifier class on the right-hand side
ClassifyAndCount = CC
AdjustedClassifyAndCount = ACC
ProbabilisticClassifyAndCount = PCC
ProbabilisticAdjustedClassifyAndCount = PACC
ExpectationMaximizationQuantifier = EMQ
HellingerDistanceY = HDy
2021-01-11 12:55:06 +01:00
# Long-form aliases for ELM and for the two median-sweep classes (MS, MS2)
ExplicitLossMinimisation = ELM
MedianSweep = MS
MedianSweep2 = MS2
2020-12-15 15:20:35 +01:00
class OneVsAll(AggregativeQuantifier):
    """
    Allows any binary quantifier to perform quantification on single-label datasets. The method maintains one
    binary quantifier for each class, and then l1-normalizes the outputs so that the class prevalences sum up
    to 1.
    This variant was used, along with the ExplicitLossMinimization quantifier in
    Gao, W., Sebastiani, F.: From classification to quantification in tweet sentiment analysis.
    Social Network Analysis and Mining 6(19), 1-22 (2016)
    """

    def __init__(self, binary_quantifier, n_jobs=-1):
        self.binary_quantifier = binary_quantifier
        self.n_jobs = n_jobs

    def fit(self, data: LabelledCollection, fit_learner=True):
        """
        Trains one copy of the binary quantifier per class, each on the problem "class c vs. the rest".

        :param data: the (non-binary) training collection
        :param fit_learner: must be True
        :return: self
        """
        assert not data.binary, \
            f'{self.__class__.__name__} expect non-binary data'
        assert isinstance(self.binary_quantifier, BaseQuantifier), \
            f'{self.binary_quantifier} does not seem to be a Quantifier'
        assert fit_learner == True, 'fit_learner must be True'

        self.dict_binary_quantifiers = {c: deepcopy(self.binary_quantifier) for c in data.classes_}
        self.__parallel(self._delayed_binary_fit, data)
        return self

    def classify(self, instances):
        # returns a matrix of shape (n,m) with n the number of instances and m the number of classes. The entry
        # (i,j) is a binary value indicating whether instance i belongs to class j. The binary classifications are
        # independent of each other, meaning that an instance can end up be attributed to 0, 1, or more classes.
        classif_predictions_bin = self.__parallel(self._delayed_binary_classification, instances)
        return classif_predictions_bin.T

    def posterior_probabilities(self, instances):
        # returns a matrix of shape (n,m,2) with n the number of instances and m the number of classes. The entry
        # (i,j,1) (resp. (i,j,0)) is a value in [0,1] indicating the posterior probability that instance i belongs
        # (resp. does not belong) to class j.
        # The posterior probabilities are independent of each other, meaning that, in general, they do not sum
        # up to one.
        if not self.binary_quantifier.probabilistic:
            raise NotImplementedError(f'{self.__class__.__name__} does not implement posterior_probabilities because '
                                      f'the base quantifier {self.binary_quantifier.__class__.__name__} is not '
                                      f'probabilistic')
        posterior_predictions_bin = self.__parallel(self._delayed_binary_posteriors, instances)
        return np.swapaxes(posterior_predictions_bin, 0, 1)

    def aggregate(self, classif_predictions_bin):
        """
        Aggregates the per-class binary outputs into normalized class prevalence estimates.

        :param classif_predictions_bin: the output of :meth:`posterior_probabilities` if the base quantifier is
            probabilistic, or of :meth:`classify` otherwise
        :return: the positive-class estimates of the binary quantifiers, passed through
            `F.normalize_prevalence`
        """
        if self.probabilistic:
            assert classif_predictions_bin.shape[1] == self.n_classes and classif_predictions_bin.shape[2] == 2, \
                'param classif_predictions_bin does not seem to be a valid matrix (ndarray) of posterior ' \
                'probabilities (2 dimensions) for each document (row) and class (columns)'
        else:
            assert set(np.unique(classif_predictions_bin)).issubset({0, 1}), \
                'param classif_predictions_bin does not seem to be a valid matrix (ndarray) of binary ' \
                'predictions for each document (row) and class (columns)'
        prevalences = self.__parallel(self._delayed_binary_aggregate, classif_predictions_bin)
        return F.normalize_prevalence(prevalences)

    def quantify(self, X):
        """Computes the per-class binary outputs for `X` and aggregates them into prevalence estimates."""
        if self.probabilistic:
            predictions = self.posterior_probabilities(X)
        else:
            predictions = self.classify(X)
        return self.aggregate(predictions)

    def __parallel(self, func, *args, **kwargs):
        # applies `func(c, *args, **kwargs)` to every class c (in the order of `classes_`) and stacks the results
        return np.asarray(
            # some quantifiers (in particular, ELM-based ones) cannot be run with multiprocess, since the temp dir they
            # create during the fit will be removed and be no longer available for the predict...
            Parallel(n_jobs=self.n_jobs, backend='threading')(
                delayed(func)(c, *args, **kwargs) for c in self.classes_
            )
        )

    @property
    def classes_(self):
        # the classes are the (sorted) keys of the per-class binary quantifiers created in fit
        return sorted(self.dict_binary_quantifiers.keys())

    def set_params(self, **parameters):
        self.binary_quantifier.set_params(**parameters)

    def get_params(self, deep=True):
        return self.binary_quantifier.get_params()

    def _delayed_binary_classification(self, c, X):
        return self.dict_binary_quantifiers[c].preclassify(X)

    def _delayed_binary_posteriors(self, c, X):
        return self.dict_binary_quantifiers[c].posterior_probabilities(X)

    def _delayed_binary_aggregate(self, c, classif_predictions):
        # the columns of classif_predictions follow the order of `classes_`, so the column of class c is its
        # position in that list; indexing by the label itself would only work for labels 0, 1, ..., m-1
        column = self.classes_.index(c)
        # the estimation for the positive class prevalence
        return self.dict_binary_quantifiers[c].aggregate(classif_predictions[:, column])[1]

    def _delayed_binary_fit(self, c, data):
        # relabels the data as "is class c" (True) vs. "is not class c" (False)
        bindata = LabelledCollection(data.instances, data.labels == c, classes_=[False, True])
        self.dict_binary_quantifiers[c].fit(bindata)

    @property
    def binary(self):
        return False

    @property
    def probabilistic(self):
        return self.binary_quantifier.probabilistic