292 lines
10 KiB
Python
292 lines
10 KiB
Python
from functools import cached_property
|
|
import numpy as np
|
|
import quapy as qp
|
|
import quapy.functional as F
|
|
from quapy.data import LabelledCollection
|
|
from quapy.method.aggregative import AggregativeQuantifier
|
|
from scipy.stats import chi2
|
|
from scipy.special import gamma
|
|
from sklearn.utils import resample
|
|
from abc import ABC, abstractmethod
|
|
from scipy.special import softmax, factorial
|
|
import copy
|
|
from functools import lru_cache
|
|
|
|
|
|
|
|
class ConfidenceRegionABC(ABC):
    """
    Abstract base class for confidence regions around a point estimate of
    class-prevalence values (vectors on the probability simplex).
    """

    @abstractmethod
    def point_estimate(self) -> np.ndarray:
        """Returns the point estimate (the center) of the region."""
        ...

    def ndim(self):
        """Number of dimensions (classes) of the point estimate."""
        return len(self.point_estimate())

    @abstractmethod
    def coverage(self, true_value):
        """
        Returns the fraction of `true_value` vectors contained in the region
        (or 0/1 when a single vector is passed).
        """
        ...

    def simplex_portion(self):
        """
        Monte Carlo estimate of the fraction of the probability simplex
        covered by this confidence region.
        """
        return self.montecarlo_proportion()

    def montecarlo_proportion(self, n_trials=10_000):
        """
        Estimates, by uniform sampling on the simplex, the proportion of the
        simplex that falls within this confidence region.

        :param n_trials: number of points sampled uniformly at random on the simplex
        :return: the estimated proportion, clipped to [0, 1]
        """
        # Cache results per instance, keyed by n_trials. The original used
        # @lru_cache on the bound method, which keys a global cache on `self`
        # and therefore keeps every instance alive for the cache's lifetime
        # (ruff B019); a per-instance dict avoids the leak while preserving
        # the memoization behavior.
        cache = getattr(self, '_montecarlo_cache', None)
        if cache is None:
            cache = self._montecarlo_cache = {}
        if n_trials not in cache:
            # fixed seed so that repeated estimates are reproducible
            with qp.util.temp_seed(0):
                uniform_simplex = F.uniform_simplex_sampling(n_classes=self.ndim(), size=n_trials)
                cache[n_trials] = np.clip(self.coverage(uniform_simplex), 0., 1.)
        return cache[n_trials]
|
|
|
|
|
|
class WithConfidenceABC(ABC):
    """
    Interface for quantifiers able to return, along with the prevalence
    estimate, a confidence region around it.
    """

    @abstractmethod
    def quantify_conf(self, instances, confidence_level=None) -> 'tuple[np.ndarray, ConfidenceRegionABC]':
        """
        Returns the prevalence estimate and a confidence region around it.

        :param instances: the test instances to quantify
        :param confidence_level: float in (0, 1), or None to use the quantifier's default
        :return: a tuple (prevalence estimate, confidence region)
        """
        # NOTE: the annotation is a lazy string; the original `(np.ndarray,
        # ConfidenceRegionABC)` was a tuple literal (not a valid type) and was
        # eagerly evaluated at class-creation time.
        ...
|
|
|
|
|
|
def simplex_volume(n):
    """
    Returns the volume of the standard n-dimensional simplex, i.e., 1/n!.

    :param n: the dimensionality
    :return: the volume, as a float
    """
    n_factorial = factorial(n)
    return 1. / n_factorial
|
|
|
|
|
|
def within_ellipse_prop(values, mean, prec_matrix, chi2_critical):
    """
    Checks the proportion of values that belong to the ellipse with center `mean` and precision matrix `prec_matrix`
    at a distance `chi2_critical`.

    :param values: a np.ndarray with shape (ndim,) or (n_values,ndim,)
    :param mean: a np.ndarray with the mean of the sample
    :param prec_matrix: a np.ndarray with the precision matrix (inverse of the
        covariance matrix) of the sample. If this inverse cannot be computed
        then None must be passed
    :param chi2_critical: the chi2 critical value

    :return: the fraction of values that are contained in the ellipse
        defined by the mean, the precision matrix, and the chi2_critical.
        If values is only one value, then either 0 (not contained) or
        1 (contained) is returned.
    """
    # a singular covariance matrix means no ellipse can be defined
    if prec_matrix is None:
        return 0.

    diff = values - mean

    # squared Mahalanobis distance d_M^2 for each value; einsum computes only
    # the per-row distances. The original `diff @ prec_matrix @ diff.T` built
    # an (n, n) matrix and discarded everything but its diagonal -- O(n^2)
    # memory (e.g., ~800MB for the 10,000 Monte Carlo trials).
    d_M_squared = np.einsum('...i,ij,...j->...', diff, prec_matrix, diff)

    within_ellipse = (d_M_squared <= chi2_critical)

    # several values passed: return the fraction inside the ellipse
    if isinstance(within_ellipse, np.ndarray):
        within_ellipse = np.mean(within_ellipse)

    return within_ellipse * 1.0
|
|
|
|
|
|
class ConfidenceEllipseSimplex(ConfidenceRegionABC):
    """
    Confidence ellipse for prevalence vectors, computed directly in the
    simplex space from a sample of bootstrap prevalence estimates.
    """

    def __init__(self, X, confidence_level=0.95):
        """
        :param X: np.ndarray of shape (n_samples, n_classes) with prevalence vectors
        :param confidence_level: float in (0, 1)
        """
        assert 0. < confidence_level < 1., f'{confidence_level=} must be in range(0,1)'

        X = np.asarray(X)

        self.mean_ = X.mean(axis=0)
        self.cov_ = np.cov(X, rowvar=False, ddof=1)

        # the covariance matrix may be singular (e.g., all samples coincide);
        # in that case no ellipse exists and coverage will be reported as 0.
        # Only LinAlgError is caught: the original bare `except:` would also
        # have swallowed KeyboardInterrupt/SystemExit.
        try:
            self.precision_matrix_ = np.linalg.inv(self.cov_)
        except np.linalg.LinAlgError:
            self.precision_matrix_ = None

        self.dim = X.shape[-1]
        # prevalence vectors sum to 1, so one degree of freedom is lost
        self.ddof = self.dim - 1

        # critical chi-square value
        self.confidence_level = confidence_level
        self.chi2_critical_ = chi2.ppf(confidence_level, df=self.ddof)

    def point_estimate(self):
        """Returns the sample mean of the prevalence vectors."""
        return self.mean_

    def coverage(self, true_value):
        """
        true_value can be an array (n_dimensions,) or a matrix (n_vectors, n_dimensions,)
        confidence_level None means that the confidence_level is taken from the __init__

        returns true or false depending on whether true_value is in the ellipse or not,
        or returns the proportion of true_values that are within the ellipse if more
        than one are passed
        """
        return within_ellipse_prop(true_value, self.mean_, self.precision_matrix_, self.chi2_critical_)
|
|
|
|
|
|
class ConfidenceEllipseCLR(ConfidenceRegionABC):
    """
    Confidence ellipse computed in the Centered Log-Ratio (CLR) space of the
    prevalence vectors, rather than in the simplex itself.
    """

    def __init__(self, X, confidence_level=0.95):
        """
        :param X: np.ndarray of shape (n_samples, n_classes) with prevalence vectors
        :param confidence_level: float in (0, 1)
        """
        self.clr = CLRtransformation()
        transformed = self.clr(X)
        self.mean_ = np.mean(X, axis=0)
        self.conf_region_clr = ConfidenceEllipseSimplex(transformed, confidence_level=confidence_level)

    def point_estimate(self):
        """Returns the sample mean in the original (simplex) space."""
        # The mean is taken in the simplex space rather than by inverting the
        # CLR-space mean: the inverse of the CLR does not coincide with the
        # clean mean because the geometric mean requires smoothing the
        # prevalence vectors, which affects the softmax (inverse).
        return self.mean_

    def coverage(self, true_value):
        """
        true_value can be an array (n_dimensions,) or a matrix (n_vectors, n_dimensions,)
        confidence_level None means that the confidence_level is taken from the __init__

        returns true or false depending on whether true_value is in the ellipse or not,
        or returns the proportion of true_values that are within the ellipse if more
        than one are passed
        """
        # map into CLR space and delegate to the ellipse defined there
        return self.conf_region_clr.coverage(self.clr(true_value))
|
|
|
|
|
|
class ConfidenceIntervals(ConfidenceRegionABC):
    """
    Axis-aligned confidence region: one independent percentile interval per
    class, derived from a sample of bootstrap prevalence estimates.
    """

    def __init__(self, X, confidence_level=0.95):
        """
        :param X: np.ndarray of shape (n_samples, n_classes) with prevalence vectors
        :param confidence_level: float in (0, 1)
        """
        assert 0 < confidence_level < 1, f'{confidence_level=} must be in range(0,1)'

        X = np.asarray(X)

        self.means_ = X.mean(axis=0)
        # percentile bounds derived from the requested confidence level;
        # the original hard-coded q=[2.5, 97.5], silently ignoring any
        # confidence_level other than 0.95 (the default still yields 2.5/97.5)
        alpha = 1. - confidence_level
        q_low = 100. * (alpha / 2.)
        q_high = 100. * (1. - alpha / 2.)
        self.I_low, self.I_high = np.percentile(X, q=[q_low, q_high], axis=0)

    def point_estimate(self):
        """Returns the per-class sample means."""
        return self.means_

    def coverage(self, true_value):
        """
        true_value can be an array (n_dimensions,) or a matrix (n_vectors, n_dimensions,)

        returns true or false depending on whether true_value is in the ellipse or not,
        or returns the proportion of true_values that are within the ellipse if more
        than one are passed
        """
        # a vector is covered only if ALL of its components fall inside
        # their respective intervals
        within_intervals = np.logical_and(self.I_low <= true_value, true_value <= self.I_high)
        within_all_intervals = np.all(within_intervals, axis=-1, keepdims=True)
        proportion = within_all_intervals.mean()

        return proportion
|
|
|
|
|
|
class CLRtransformation:
    """
    Centered log-ratio (CLR) transformation, mapping compositional vectors
    (e.g., prevalences on the simplex) onto an unconstrained real space.
    """

    def __call__(self, X, epsilon=1e-6):
        """
        Applies the CLR transformation: the log of each component divided by
        the geometric mean of the (smoothed) vector.

        :param X: array of shape (ndim,) or (n_vectors, ndim)
        :param epsilon: smoothing factor that avoids taking log(0)
        :return: the CLR-transformed array, same shape as `X`
        """
        X = np.asarray(X)
        X = qp.error.smooth(X, epsilon)
        log_X = np.log(X)
        geometric_mean = np.exp(log_X.mean(axis=-1, keepdims=True))
        return np.log(X / geometric_mean)

    def inverse(self, X):
        """Maps CLR-space vectors back onto the simplex via softmax."""
        return softmax(X, axis=-1)
|
|
|
|
|
|
class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
    """
    Bootstrap resampling on top of an aggregative quantifier, producing a
    prevalence point estimate together with a confidence region.

    Training-side (model-based) bootstrap is applied when `n_train_samples`>1:
    the aggregation function is re-fit on resamples of the classifier
    predictions. Test-side (population-based) bootstrap resamples the test
    predictions `n_test_samples` times per trained quantifier.
    """

    # valid strategies for constructing the confidence region
    METHODS = ['intervals', 'ellipse', 'ellipse-clr']

    def __init__(self,
                 quantifier: AggregativeQuantifier,
                 n_train_samples=1,
                 n_test_samples=500,
                 confidence_level=0.95,
                 method='intervals',
                 random_state=None):
        """
        :param quantifier: the base aggregative quantifier to bootstrap
        :param n_train_samples: number of bootstrap resamples of the training
            predictions (>=1); 1 disables the training-side bootstrap
        :param n_test_samples: number of bootstrap resamples of the test
            predictions (>=1); at least one of the two counts must be >1
        :param confidence_level: float in (0, 1), default level for the region
        :param method: one of METHODS ('intervals', 'ellipse', 'ellipse-clr')
        :param random_state: seed controlling the resampling, for reproducibility
        """

        assert isinstance(quantifier, AggregativeQuantifier), \
            f'base quantifier does not seem to be an instance of {AggregativeQuantifier.__name__}'
        assert n_train_samples >= 1, \
            f'{n_train_samples=} must be >= 1'
        assert n_test_samples >= 1, \
            f'{n_test_samples=} must be >= 1'
        assert n_test_samples>1 or n_train_samples>1, \
            f'either {n_test_samples=} or {n_train_samples=} must be >1'
        assert method in self.METHODS, \
            f'unknown method; valid ones are {self.METHODS}'

        self.quantifier = quantifier
        self.n_train_samples = n_train_samples
        self.n_test_samples = n_test_samples
        self.confidence_level = confidence_level
        self.method = method
        self.random_state = random_state

    def _return_conf(self, prevs, confidence_level):
        """
        Builds the confidence region of kind `self.method` from the collected
        bootstrap prevalence estimates.

        :param prevs: list/array of prevalence vectors from the bootstrap trials
        :param confidence_level: float in (0, 1)
        :return: a ConfidenceRegionABC instance
        :raises NotImplementedError: if `self.method` is not a known method
        """
        region = None
        if self.method == 'intervals':
            region = ConfidenceIntervals(prevs, confidence_level=confidence_level)
        elif self.method == 'ellipse':
            region = ConfidenceEllipseSimplex(prevs, confidence_level=confidence_level)
        elif self.method == 'ellipse-clr':
            region = ConfidenceEllipseCLR(prevs, confidence_level=confidence_level)

        if region is None:
            raise NotImplementedError(f'unknown method {self.method}')

        return region

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        """
        Fits the aggregation function(s). With `n_train_samples`==1 the base
        quantifier is fit once on the given predictions; otherwise a deep copy
        of it is fit on each bootstrap resample of the predictions.
        """
        self.quantifiers = []
        if self.n_train_samples==1:
            self.quantifier.aggregation_fit(classif_predictions, data)
            self.quantifiers.append(self.quantifier)
        else:
            # model-based bootstrap (only on the aggregative part)
            full_index = np.arange(len(data))
            with qp.util.temp_seed(self.random_state):
                for i in range(self.n_train_samples):
                    quantifier = copy.deepcopy(self.quantifier)
                    # the same resampled index is applied to both the predictions
                    # and the labelled data so they remain aligned
                    index = resample(full_index, n_samples=len(data))
                    classif_predictions_i = classif_predictions.sampling_from_index(index)
                    data_i = data.sampling_from_index(index)
                    quantifier.aggregation_fit(classif_predictions_i, data_i)
                    self.quantifiers.append(quantifier)
        return self

    def aggregate(self, classif_predictions: np.ndarray):
        """
        Returns the bootstrap point estimate of the prevalences.
        NOTE: stores the confidence region as a side effect in `self.confidence`.
        """
        prev_mean, self.confidence = self.aggregate_conf(classif_predictions)
        return prev_mean

    def aggregate_conf(self, classif_predictions: np.ndarray, confidence_level=None):
        """
        Runs the test-side bootstrap: resamples the classifier predictions
        `n_test_samples` times under each trained quantifier, and derives the
        point estimate and confidence region from the resulting prevalences.

        :param classif_predictions: np.ndarray with the classifier outputs
        :param confidence_level: float in (0, 1), or None to use the value
            given at construction time
        :return: a tuple (point estimate, confidence region)
        """
        if confidence_level is None:
            confidence_level = self.confidence_level

        n_samples = classif_predictions.shape[0]
        prevs = []
        with qp.util.temp_seed(self.random_state):
            for quantifier in self.quantifiers:
                for i in range(self.n_test_samples):
                    sample_i = resample(classif_predictions, n_samples=n_samples)
                    prev_i = quantifier.aggregate(sample_i)
                    prevs.append(prev_i)

        conf = self._return_conf(prevs, confidence_level)
        # the point estimate is delegated to the region (e.g., the sample mean)
        prev_estim = conf.point_estimate()

        return prev_estim, conf

    def fit(self, data: LabelledCollection, fit_classifier=True, val_split=None):
        """
        Trains the underlying classifier (if `fit_classifier`) and then the
        aggregation function(s) on its validation predictions.
        """
        self.quantifier._check_init_parameters()
        classif_predictions = self.quantifier.classifier_fit_predict(data, fit_classifier, predict_on=val_split)
        self.aggregation_fit(classif_predictions, data)
        return self

    def quantify_conf(self, instances, confidence_level=None) -> 'tuple[np.ndarray, ConfidenceRegionABC]':
        """
        Classifies `instances` and returns the bootstrap prevalence estimate
        together with its confidence region.
        """
        predictions = self.quantifier.classify(instances)
        return self.aggregate_conf(predictions, confidence_level=confidence_level)

    @property
    def classifier(self):
        # delegates to the base quantifier
        return self.quantifier.classifier

    def _classifier_method(self):
        # delegates to the base quantifier
        return self.quantifier._classifier_method()