adding readme to non-aggregative

This commit is contained in:
Alejandro Moreo Fernandez 2025-10-20 18:13:34 +02:00
parent 1fb8500e87
commit eafe486893
5 changed files with 163 additions and 66 deletions

View File

@ -36,7 +36,7 @@ with qp.util.temp_seed(0):
true_prev = shifted_test.prevalence() true_prev = shifted_test.prevalence()
# by calling "quantify_conf", we obtain the point estimate and the confidence intervals around it # by calling "quantify_conf", we obtain the point estimate and the confidence intervals around it
pred_prev, conf_intervals = pacc.quantify_conf(shifted_test.X) pred_prev, conf_intervals = pacc.predict_conf(shifted_test.X)
# conf_intervals is an instance of ConfidenceRegionABC, which provides some useful utilities like: # conf_intervals is an instance of ConfidenceRegionABC, which provides some useful utilities like:
# - coverage: a function which computes the fraction of true values that belong to the confidence region # - coverage: a function which computes the fraction of true values that belong to the confidence region

View File

@ -0,0 +1,23 @@
from sklearn.feature_extraction.text import CountVectorizer
import quapy as qp
from quapy.method.non_aggregative import ReadMe
import quapy.functional as F
reviews = qp.datasets.fetch_reviews('imdb').reduce(n_train=1000, random_state=0)
encode_0_1 = CountVectorizer(min_df=5, binary=True)
train, test = qp.data.preprocessing.instance_transformation(reviews, encode_0_1, inplace=True).train_test
readme = ReadMe(bootstrap_trials=100, bagging_trials=100, bagging_range=100, random_state=0, verbose=True)
readme.fit(*train.Xy)
for test_prev in [[0.25, 0.75], [0.5, 0.5], [0.75, 0.25]]:
sample = reviews.test.sampling(500, *test_prev, random_state=0)
prev_estim, conf = readme.predict_conf(sample.X)
err = qp.error.mae(sample.prevalence(), prev_estim)
print(f'true-prevalence={F.strprev(sample.prevalence())},\n'
f'predicted-prevalence={F.strprev(prev_estim)},\n'
f'MAE={err:.4f}')
print(conf)

View File

@ -10,6 +10,36 @@ from quapy.util import map_parallel
from .base import LabelledCollection from .base import LabelledCollection
def instance_transformation(dataset:Dataset, transformer, inplace=False):
"""
Transforms a :class:`quapy.data.base.Dataset` applying the `fit_transform` and `transform` functions
of a (sklearn's) transformer.
:param dataset: a :class:`quapy.data.base.Dataset` where the instances of training and test collections are
lists of str
:param transformer: TransformerMixin implementing `fit_transform` and `transform` functions
:param inplace: whether or not to apply the transformation inplace (True), or to a new copy (False, default)
:return: a new :class:`quapy.data.base.Dataset` with transformed instances (if inplace=False) or a reference to the
current Dataset (if inplace=True) where the instances have been transformed
"""
training_transformed = transformer.fit_transform(dataset.training.instances)
test_transformed = transformer.transform(dataset.test.instances)
if inplace:
dataset.training = LabelledCollection(training_transformed, dataset.training.labels, dataset.classes_)
dataset.test = LabelledCollection(test_transformed, dataset.test.labels, dataset.classes_)
if hasattr(transformer, 'vocabulary_'):
dataset.vocabulary = transformer.vocabulary_
return dataset
else:
training = LabelledCollection(training_transformed, dataset.training.labels.copy(), dataset.classes_)
test = LabelledCollection(test_transformed, dataset.test.labels.copy(), dataset.classes_)
if hasattr(transformer, 'vocabulary_'):
return Dataset(training, test, transformer.vocabulary_)
else:
return Dataset(training, test)
def text2tfidf(dataset:Dataset, min_df=3, sublinear_tf=True, inplace=False, **kwargs): def text2tfidf(dataset:Dataset, min_df=3, sublinear_tf=True, inplace=False, **kwargs):
""" """
Transforms a :class:`quapy.data.base.Dataset` of textual instances into a :class:`quapy.data.base.Dataset` of Transforms a :class:`quapy.data.base.Dataset` of textual instances into a :class:`quapy.data.base.Dataset` of
@ -29,18 +59,7 @@ def text2tfidf(dataset:Dataset, min_df=3, sublinear_tf=True, inplace=False, **kw
__check_type(dataset.test.instances, np.ndarray, str) __check_type(dataset.test.instances, np.ndarray, str)
vectorizer = TfidfVectorizer(min_df=min_df, sublinear_tf=sublinear_tf, **kwargs) vectorizer = TfidfVectorizer(min_df=min_df, sublinear_tf=sublinear_tf, **kwargs)
training_documents = vectorizer.fit_transform(dataset.training.instances) return instance_transformation(dataset, vectorizer, inplace)
test_documents = vectorizer.transform(dataset.test.instances)
if inplace:
dataset.training = LabelledCollection(training_documents, dataset.training.labels, dataset.classes_)
dataset.test = LabelledCollection(test_documents, dataset.test.labels, dataset.classes_)
dataset.vocabulary = vectorizer.vocabulary_
return dataset
else:
training = LabelledCollection(training_documents, dataset.training.labels.copy(), dataset.classes_)
test = LabelledCollection(test_documents, dataset.test.labels.copy(), dataset.classes_)
return Dataset(training, test, vectorizer.vocabulary_)
def reduce_columns(dataset: Dataset, min_df=5, inplace=False): def reduce_columns(dataset: Dataset, min_df=5, inplace=False):

View File

@ -88,18 +88,30 @@ class WithConfidenceABC(ABC):
METHODS = ['intervals', 'ellipse', 'ellipse-clr'] METHODS = ['intervals', 'ellipse', 'ellipse-clr']
@abstractmethod @abstractmethod
def quantify_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC): def predict_conf(self, instances, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC):
""" """
Adds the method `quantify_conf` to the interface. This method returns not only the point-estimate, but Adds the method `predict_conf` to the interface. This method returns not only the point-estimate, but
also the confidence region around it. also the confidence region around it.
:param instances: a np.ndarray of shape (n_instances, n_features,) :param instances: a np.ndarray of shape (n_instances, n_features,)
:confidence_level: float in (0, 1) :param confidence_level: float in (0, 1), default is 0.95
:return: a tuple (`point_estimate`, `conf_region`), where `point_estimate` is a np.ndarray of shape :return: a tuple (`point_estimate`, `conf_region`), where `point_estimate` is a np.ndarray of shape
(n_classes,) and `conf_region` is an object from :class:`ConfidenceRegionABC` (n_classes,) and `conf_region` is an object from :class:`ConfidenceRegionABC`
""" """
... ...
def quantify_conf(self, instances, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC):
"""
Alias to `predict_conf`. This method returns not only the point-estimate, but
also the confidence region around it.
:param instances: a np.ndarray of shape (n_instances, n_features,)
:param confidence_level: float in (0, 1), default is 0.95
:return: a tuple (`point_estimate`, `conf_region`), where `point_estimate` is a np.ndarray of shape
(n_classes,) and `conf_region` is an object from :class:`ConfidenceRegionABC`
"""
return self.predict_conf(instances=instances, confidence_level=confidence_level)
@classmethod @classmethod
def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals'): def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals'):
""" """
@ -227,6 +239,7 @@ class ConfidenceEllipseCLR(ConfidenceRegionABC):
""" """
def __init__(self, X, confidence_level=0.95): def __init__(self, X, confidence_level=0.95):
X = np.asarray(X)
self.clr = CLRtransformation() self.clr = CLRtransformation()
Z = self.clr(X) Z = self.clr(X)
self.mean_ = np.mean(X, axis=0) self.mean_ = np.mean(X, axis=0)
@ -297,6 +310,9 @@ class ConfidenceIntervals(ConfidenceRegionABC):
return proportion return proportion
def __repr__(self):
return '['+', '.join(f'({low:.4f}, {high:.4f})' for (low,high) in zip(self.I_low, self.I_high))+']'
class CLRtransformation: class CLRtransformation:
""" """
@ -429,7 +445,7 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
self.aggregation_fit(classif_predictions, labels) self.aggregation_fit(classif_predictions, labels)
return self return self
def quantify_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC): def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
predictions = self.quantifier.classify(instances) predictions = self.quantifier.classify(instances)
return self.aggregate_conf(predictions, confidence_level=confidence_level) return self.aggregate_conf(predictions, confidence_level=confidence_level)
@ -549,7 +565,7 @@ class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC):
samples = self.sample_from_posterior(classif_predictions)[_bayesian.P_TEST_Y] samples = self.sample_from_posterior(classif_predictions)[_bayesian.P_TEST_Y]
return np.asarray(samples.mean(axis=0), dtype=float) return np.asarray(samples.mean(axis=0), dtype=float)
def quantify_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC): def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
classif_predictions = self.classify(instances) classif_predictions = self.classify(instances)
point_estimate = self.aggregate(classif_predictions) point_estimate = self.aggregate(classif_predictions)
samples = self.get_prevalence_samples() # available after calling "aggregate" function samples = self.get_prevalence_samples() # available after calling "aggregate" function

View File

@ -1,11 +1,14 @@
from typing import Union, Callable from typing import Union, Callable
import numpy as np import numpy as np
from sklearn.feature_extraction.text import CountVectorizer from sklearn.feature_extraction.text import CountVectorizer
from sklearn.utils import resample
from sklearn.preprocessing import normalize
from method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.functional import get_divergence from quapy.functional import get_divergence
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier, BinaryQuantifier from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F import quapy.functional as F
from scipy.optimize import lsq_linear
class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier): class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
@ -149,53 +152,89 @@ class DMx(BaseQuantifier):
return F.argmin_prevalence(loss, n_classes, method=self.search) return F.argmin_prevalence(loss, n_classes, method=self.search)
# class ReadMe(BaseQuantifier): class ReadMe(BaseQuantifier, WithConfidenceABC):
#
# def __init__(self, bootstrap_trials=100, bootstrap_range=100, bagging_trials=100, bagging_range=25, **vectorizer_kwargs): def __init__(self,
# raise NotImplementedError('under development ...') bootstrap_trials=100,
# self.bootstrap_trials = bootstrap_trials bagging_trials=100,
# self.bootstrap_range = bootstrap_range bagging_range=250,
# self.bagging_trials = bagging_trials confidence_level=0.95,
# self.bagging_range = bagging_range region='intervals',
# self.vectorizer_kwargs = vectorizer_kwargs random_state=None,
# verbose=False):
# def fit(self, data: LabelledCollection): self.bootstrap_trials = bootstrap_trials
# X, y = data.Xy self.bagging_trials = bagging_trials
# self.vectorizer = CountVectorizer(binary=True, **self.vectorizer_kwargs) self.bagging_range = bagging_range
# X = self.vectorizer.fit_transform(X) self.confidence_level = confidence_level
# self.class_conditional_X = {i: X[y==i] for i in range(data.classes_)} self.region = region
# self.random_state = random_state
# def predict(self, X): self.verbose = verbose
# X = self.vectorizer.transform(X)
# def fit(self, X, y):
# # number of features self.rng = np.random.default_rng(self.random_state)
# num_docs, num_feats = X.shape self.classes_ = np.unique(y)
# n_features = X.shape[1]
# # bootstrap
# p_boots = [] if self.bagging_range is None:
# for _ in range(self.bootstrap_trials): self.bagging_range = int(np.sqrt(n_features))
# docs_idx = np.random.choice(num_docs, size=self.bootstra_range, replace=False)
# class_conditional_X = {i: X[docs_idx] for i, X in self.class_conditional_X.items()} Xsize = X.shape[0]
# Xboot = X[docs_idx]
# # Bootstrap loop
# # bagging self.Xboots, self.yboots = [], []
# p_bags = [] for _ in range(self.bootstrap_trials):
# for _ in range(self.bagging_trials): idx = self.rng.choice(Xsize, size=Xsize, replace=True)
# feat_idx = np.random.choice(num_feats, size=self.bagging_range, replace=False) self.Xboots.append(X[idx])
# class_conditional_Xbag = {i: X[:, feat_idx] for i, X in class_conditional_X.items()} self.yboots.append(y[idx])
# Xbag = Xboot[:,feat_idx]
# p = self.std_constrained_linear_ls(Xbag, class_conditional_Xbag) return self
# p_bags.append(p)
# p_boots.append(np.mean(p_bags, axis=0)) def predict_conf(self, X, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC):
# from tqdm import tqdm
# p_mean = np.mean(p_boots, axis=0) n_features = X.shape[1]
# p_std = np.std(p_bags, axis=0)
# boots_prevalences = []
# return p_mean
# for Xboots, yboots in tqdm(
# zip(self.Xboots, self.yboots),
# def std_constrained_linear_ls(self, X, class_cond_X: dict): desc='bootstrap predictions', total=self.bootstrap_trials, disable=not self.verbose
# pass ):
bagging_estimates = []
for _ in range(self.bagging_trials):
feat_idx = self.rng.choice(n_features, size=self.bagging_range, replace=False)
Xboots_bagging = Xboots[:, feat_idx]
X_boots_bagging = X[:, feat_idx]
bagging_prev = self._quantify_iteration(Xboots_bagging, yboots, X_boots_bagging)
bagging_estimates.append(bagging_prev)
boots_prevalences.append(np.mean(bagging_estimates, axis=0))
conf = WithConfidenceABC.construct_region(boots_prevalences, confidence_level, method=self.region)
prev_estim = conf.point_estimate()
return prev_estim, conf
def predict(self, X):
prev_estim, _ = self.predict_conf(X)
return prev_estim
def _quantify_iteration(self, Xtr, ytr, Xte):
"""Single ReadMe estimate."""
n_classes = len(self.classes_)
PX_given_Y = np.zeros((n_classes, Xtr.shape[1]))
for i, c in enumerate(self.classes_):
PX_given_Y[i] = Xtr[ytr == c].sum(axis=0)
PX_given_Y = normalize(PX_given_Y, norm='l1', axis=1)
PX = np.asarray(Xte.sum(axis=0))
PX = normalize(PX, norm='l1', axis=1)
res = lsq_linear(A=PX_given_Y.T, b=PX.ravel(), bounds=(0, 1))
pY = np.maximum(res.x, 0)
return pY / pY.sum()
def _get_features_range(X): def _get_features_range(X):