merged from devel

This commit is contained in:
Alejandro Moreo Fernandez 2025-12-10 12:06:03 +01:00
commit 8c063afd1e
18 changed files with 1773 additions and 134 deletions

7
BayesianKDEy/TODO.txt Normal file
View File

@ -0,0 +1,7 @@
- Add other methods that natively provide uncertainty quantification methods?
- MPIW (Mean Prediction Interval Width): is the average of the amplitudes (w/o aggregating coverage whatsoever)
- Implement Interval Score or Winkler Score
- analyze across shift
- add Bayesian EM
- optimize also C and class_weight?

View File

@ -0,0 +1,215 @@
from numpy.ma.core import shape
from sklearn.base import BaseEstimator
import numpy as np
import quapy.util
from quapy.method._kdey import KDEBase
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.functional import CLRtransformation, ILRtransformation
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
import quapy.functional as F
#import emcee
class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
"""
`Bayesian version of KDEy.
:param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
the one indicated in `qp.environ['DEFAULT_CLS']`
:param val_split: specifies the data used for generating classifier predictions. This specification
can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
be extracted from the training set; or as an integer (default 5), indicating that the predictions
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a tuple `(X,y)` defining the specific set of data to use for validation. Set to
None when the method does not require any validation data, in order to avoid that some portion of
the training data be wasted.
:param num_warmup: number of warmup iterations for the MCMC sampler (default 500)
:param num_samples: number of samples to draw from the posterior (default 1000)
:param mcmc_seed: random seed for the MCMC sampler (default 0)
:param confidence_level: float in [0,1] to construct a confidence region around the point estimate (default 0.95)
:param region: string, set to `intervals` for constructing confidence intervals (default), or to
`ellipse` for constructing an ellipse in the probability simplex, or to `ellipse-clr` for
constructing an ellipse in the Centered-Log Ratio (CLR) unconstrained space.
:param verbose: bool, whether to display progress bar
"""
def __init__(self,
classifier: BaseEstimator=None,
fit_classifier=True,
val_split: int = 5,
kernel='gaussian',
bandwidth=0.1,
num_warmup: int = 500,
num_samples: int = 1_000,
mcmc_seed: int = 0,
confidence_level: float = 0.95,
region: str = 'intervals',
explore='simplex',
step_size=0.05,
temperature=1.,
verbose: bool = False):
if num_warmup <= 0:
raise ValueError(f'parameter {num_warmup=} must be a positive integer')
if num_samples <= 0:
raise ValueError(f'parameter {num_samples=} must be a positive integer')
assert explore in ['simplex', 'clr', 'ilr'], \
f'unexpected value for param {explore=}; valid ones are "simplex", "clr", and "ilr"'
assert temperature>0., f'temperature must be >0'
super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel)
self.kernel = self._check_kernel(kernel)
self.num_warmup = num_warmup
self.num_samples = num_samples
self.mcmc_seed = mcmc_seed
self.confidence_level = confidence_level
self.region = region
self.explore = explore
self.step_size = step_size
self.temperature = temperature
self.verbose = verbose
def aggregation_fit(self, classif_predictions, labels):
self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
return self
def aggregate(self, classif_predictions):
# self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
self.prevalence_samples = self._bayesian_emcee(classif_predictions)
return self.prevalence_samples.mean(axis=0)
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
if confidence_level is None:
confidence_level = self.confidence_level
classif_predictions = self.classify(instances)
point_estimate = self.aggregate(classif_predictions)
samples = self.prevalence_samples # available after calling "aggregate" function
region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
return point_estimate, region
def _bayesian_kde(self, X_probs, init=None, verbose=False):
"""
Bayes:
P(prev|data) = P(data|prev) P(prev) / P(data)
i.e.,
posterior = likelihood * prior / evidence
we assume the likelihood be:
prev @ [kde_i_likelihood(data) 1..i..n]
prior be uniform in simplex
"""
rng = np.random.default_rng(self.mcmc_seed)
kdes = self.mix_densities
test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])
def log_likelihood(prev, epsilon=1e-10):
test_likelihoods = prev @ test_densities
test_loglikelihood = np.log(test_likelihoods + epsilon)
return (1./self.temperature) * np.sum(test_loglikelihood)
# def log_prior(prev):
# todo: adapt to arbitrary prior knowledge (e.g., something around training prevalence)
# return 1/np.sum((prev-init)**2) # it is not 1 but we assume uniform, son anyway is an useless constant
# def log_prior(prev, alpha_scale=1000):
# alpha = np.array(init) * alpha_scale
# return dirichlet.logpdf(prev, alpha)
def log_prior(prev):
return 0
def sample_neighbour(prev, step_size):
# random-walk Metropolis-Hastings
d = len(prev)
neighbour = None
if self.explore=='simplex':
dir_noise = rng.normal(scale=step_size/np.sqrt(d), size=d)
neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
elif self.explore=='clr':
clr = CLRtransformation()
clr_point = clr(prev)
dir_noise = rng.normal(scale=step_size, size=d)
clr_neighbour = clr_point+dir_noise
neighbour = clr.inverse(clr_neighbour)
assert in_simplex(neighbour), 'wrong CLR transformation'
elif self.explore=='ilr':
ilr = ILRtransformation()
ilr_point = ilr(prev)
dir_noise = rng.normal(scale=step_size, size=d-1)
ilr_neighbour = ilr_point + dir_noise
neighbour = ilr.inverse(ilr_neighbour)
assert in_simplex(neighbour), 'wrong ILR transformation'
return neighbour
n_classes = X_probs.shape[1]
current_prev = F.uniform_prevalence(n_classes) if init is None else init
current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)
# Metropolis-Hastings with adaptive rate
step_size = self.step_size
target_acceptance = 0.3
adapt_rate = 0.05
acceptance_history = []
samples = []
total_steps = self.num_samples + self.num_warmup
for i in tqdm(range(total_steps), total=total_steps, disable=not verbose):
proposed_prev = sample_neighbour(current_prev, step_size)
# probability of acceptance
proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
acceptance = proposed_likelihood - current_likelihood
# decide acceptance
accepted = np.log(rng.random()) < acceptance
if accepted:
current_prev = proposed_prev
current_likelihood = proposed_likelihood
samples.append(current_prev)
acceptance_history.append(1. if accepted else 0.)
# if i < self.num_warmup and i%10==0 and len(acceptance_history)>=100:
if i % 10 == 0 and len(acceptance_history) >= 100:
recent_accept_rate = np.mean(acceptance_history[-100:])
step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))
# step_size = float(np.clip(step_size, min_step, max_step))
if i %100==0:
print(f'acceptance-rate={recent_accept_rate*100:.3f}%, step-size={step_size:.5f}')
# remove "warmup" initial iterations
samples = np.asarray(samples[self.num_warmup:])
return samples
def _bayesian_emcee(self, X_probs):
ndim = X_probs.shape[1]
nwalkers = 32
f = CLRtransformation()
def log_likelihood(unconstrained, test_densities, epsilon=1e-10):
prev = f.inverse(unconstrained)
test_likelihoods = prev @ test_densities
test_loglikelihood = np.log(test_likelihoods + epsilon)
return np.sum(test_loglikelihood)
kdes = self.mix_densities
test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])
# p0 = np.random.normal(nwalkers, ndim)
p0 = F.uniform_prevalence_sampling(ndim, nwalkers)
p0 = f(p0)
sampler = emcee.EnsembleSampler(nwalkers, ndim, log_likelihood, args=[test_densities])
state = sampler.run_mcmc(p0, self.num_warmup, skip_initial_state_check=True)
sampler.reset()
sampler.run_mcmc(state, self.num_samples, skip_initial_state_check=True)
samples = sampler.get_chain(flat=True)
samples = f.inverse(samples)
return samples
def in_simplex(x):
return np.all(x >= 0) and np.isclose(x.sum(), 1)

View File

@ -0,0 +1,196 @@
import os
import warnings
from os.path import join
from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from build.lib.quapy.data import LabelledCollection
from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier, EMQ
from quapy.method.base import BinaryQuantifier, BaseQuantifier
from quapy.model_selection import GridSearchQ
from quapy.data import Dataset
# from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC, AggregativeBootstrap
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML, ACC
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
from collections import defaultdict
from time import time
from sklearn.base import clone, BaseEstimator
class KDEyCLR(KDEyML):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
super().__init__(
classifier=classifier, fit_classifier=fit_classifier, val_split=val_split, bandwidth=bandwidth,
random_state=random_state, kernel='aitchison'
)
class KDEyILR(KDEyML):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
super().__init__(
classifier=classifier, fit_classifier=fit_classifier, val_split=val_split, bandwidth=bandwidth,
random_state=random_state, kernel='ilr'
)
def methods():
"""
Returns a tuple (name, quantifier, hyperparams, bayesian/bootstrap_constructor), where:
- name: is a str representing the name of the method (e.g., 'BayesianKDEy')
- quantifier: is the base model (e.g., KDEyML())
- hyperparams: is a dictionary for the quantifier (e.g., {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]})
- bayesian/bootstrap_constructor: is a function that instantiates the bayesian o bootstrap method with the
quantifier with optimized hyperparameters
"""
acc_hyper = {}
emq_hyper = {'calib': ['nbvs', 'bcts', 'ts', 'vs']}
hdy_hyper = {'nbins': [3,4,5,8,16,32]}
kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
kdey_hyper_clr = {'bandwidth': [0.05, 0.1, 0.5, 1., 2., 5.]}
multiclass_method = 'multiclass'
only_binary = 'only_binary'
only_multiclass = 'only_multiclass'
# yield 'BootstrapACC', ACC(LR()), acc_hyper, lambda hyper: AggregativeBootstrap(ACC(LR()), n_test_samples=1000, random_state=0), multiclass_method
# yield 'BayesianACC', ACC(LR()), acc_hyper, lambda hyper: BayesianCC(LR(), mcmc_seed=0), multiclass_method
yield 'BootstrapEMQ', EMQ(LR(), on_calib_error='backup', val_split=5), emq_hyper, lambda hyper: AggregativeBootstrap(EMQ(LR(), on_calib_error='backup', calib=hyper['calib'], val_split=5), n_test_samples=1000, random_state=0), multiclass_method
# yield 'BootstrapHDy', DMy(LR()), hdy_hyper, lambda hyper: AggregativeBootstrap(DMy(LR(), **hyper), n_test_samples=1000, random_state=0), multiclass_method
# yield 'BayesianHDy', DMy(LR()), hdy_hyper, lambda hyper: PQ(LR(), stan_seed=0, **hyper), only_binary
#
# yield 'BootstrapKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: AggregativeBootstrap(KDEyML(LR(), **hyper), n_test_samples=1000, random_state=0, verbose=True), multiclass_method
# yield 'BayesianKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, **hyper), multiclass_method
# yield 'BayesianKDEy*', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0, **hyper), multiclass_method
# yield 'BayKDEy*CLR', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0, explore='clr', step_size=.15, **hyper), multiclass_method
# yield 'BayKDEy*CLR2', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0, explore='clr', step_size=.05, **hyper), multiclass_method
# yield 'BayKDEy*ILR', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0, explore='ilr', step_size=.15, **hyper), only_multiclass
# yield 'BayKDEy*ILR2', KDEyILR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='ilr', mcmc_seed=0, explore='ilr', step_size=.1, **hyper), only_multiclass
def model_selection(train: LabelledCollection, point_quantifier: AggregativeQuantifier, grid: dict):
with qp.util.temp_seed(0):
print(f'performing model selection for {point_quantifier.__class__.__name__} with grid {grid}')
# model selection
if len(grid)>0:
train, val = train.split_stratified(train_prop=0.6, random_state=0)
mod_sel = GridSearchQ(
model=point_quantifier,
param_grid=grid,
protocol=qp.protocol.UPP(val, repeats=250, random_state=0),
refit=False,
n_jobs=-1,
verbose=True
).fit(*train.Xy)
best_params = mod_sel.best_params_
else:
best_params = {}
return best_params
def experiment(dataset: Dataset, point_quantifier: AggregativeQuantifier, method_name:str, grid: dict, withconf_constructor, hyper_choice_path: Path):
with qp.util.temp_seed(0):
training, test = dataset.train_test
# model selection
best_hyperparams = qp.util.pickled_resource(
hyper_choice_path, model_selection, training, cp(point_quantifier), grid
)
t_init = time()
withconf_quantifier = withconf_constructor(best_hyperparams).fit(*training.Xy)
tr_time = time() - t_init
# test
train_prevalence = training.prevalence()
results = defaultdict(list)
test_generator = UPP(test, repeats=100, random_state=0)
for i, (sample_X, true_prevalence) in tqdm(enumerate(test_generator()), total=test_generator.total(), desc=f'{method_name} predictions'):
t_init = time()
point_estimate, region = withconf_quantifier.predict_conf(sample_X)
ttime = time()-t_init
results['true-prevs'].append(true_prevalence)
results['point-estim'].append(point_estimate)
results['shift'].append(qp.error.ae(true_prevalence, train_prevalence))
results['ae'].append(qp.error.ae(prevs_true=true_prevalence, prevs_hat=point_estimate))
results['rae'].append(qp.error.rae(prevs_true=true_prevalence, prevs_hat=point_estimate))
results['coverage'].append(region.coverage(true_prevalence))
results['amplitude'].append(region.montecarlo_proportion(n_trials=50_000))
results['test-time'].append(ttime)
results['samples'].append(region.samples)
report = {
'optim_hyper': best_hyperparams,
'train_time': tr_time,
'train-prev': train_prevalence,
'results': {k:np.asarray(v) for k,v in results.items()}
}
return report
def experiment_path(dir:Path, dataset_name:str, method_name:str):
os.makedirs(dir, exist_ok=True)
return dir/f'{dataset_name}__{method_name}.pkl'
if __name__ == '__main__':
binary = {
'datasets': qp.datasets.UCI_BINARY_DATASETS,
'fetch_fn': qp.datasets.fetch_UCIBinaryDataset,
'sample_size': 500
}
multiclass = {
'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
'fetch_fn': qp.datasets.fetch_UCIMulticlassDataset,
'sample_size': 1000
}
result_dir = Path('./results')
for setup in [binary, multiclass]: # [binary, multiclass]:
qp.environ['SAMPLE_SIZE'] = setup['sample_size']
for data_name in setup['datasets']:
print(f'dataset={data_name}')
# if data_name=='breast-cancer' or data_name.startswith("cmc") or data_name.startswith("ctg"):
# print(f'skipping dataset: {data_name}')
# continue
data = setup['fetch_fn'](data_name)
is_binary = data.n_classes==2
result_subdir = result_dir / ('binary' if is_binary else 'multiclass')
hyper_subdir = result_dir / 'hyperparams' / ('binary' if is_binary else 'multiclass')
for method_name, method, hyper_params, withconf_constructor, method_scope in methods():
if method_scope == 'only_binary' and not is_binary:
continue
if method_scope == 'only_multiclass' and is_binary:
continue
result_path = experiment_path(result_subdir, data_name, method_name)
hyper_path = experiment_path(hyper_subdir, data_name, method.__class__.__name__)
report = qp.util.pickled_resource(
result_path, experiment, data, method, method_name, hyper_params, withconf_constructor, hyper_path
)
print(f'dataset={data_name}, '
f'method={method_name}: '
f'mae={report["results"]["ae"].mean():.3f}, '
f'coverage={report["results"]["coverage"].mean():.5f}, '
f'amplitude={report["results"]["amplitude"].mean():.5f}, ')

View File

@ -0,0 +1,170 @@
import pickle
from collections import defaultdict
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd
from glob import glob
from pathlib import Path
import quapy as qp
from error import dist_aitchison
from quapy.method.confidence import ConfidenceIntervals
from quapy.method.confidence import ConfidenceEllipseSimplex, ConfidenceEllipseCLR, ConfidenceEllipseILR, ConfidenceIntervals, ConfidenceRegionABC
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 2000)
pd.set_option('display.max_rows', None)
pd.set_option("display.expand_frame_repr", False)
pd.set_option("display.precision", 4)
pd.set_option("display.float_format", "{:.4f}".format)
def region_score(true_prev, region: ConfidenceRegionABC):
amp = region.montecarlo_proportion(50_000)
if true_prev in region:
cost = 0
else:
scale_cost = 1/region.alpha
cost = scale_cost * dist_aitchison(true_prev, region.closest_point_in_region(true_prev))
return amp + cost
def compute_coverage_amplitude(region_constructor, **kwargs):
all_samples = results['samples']
all_true_prevs = results['true-prevs']
def process_one(samples, true_prevs):
region = region_constructor(samples, **kwargs)
if isinstance(region, ConfidenceIntervals):
winkler = region.mean_winkler_score(true_prevs)
else:
winkler = None
return region.coverage(true_prevs), region.montecarlo_proportion(), winkler
out = Parallel(n_jobs=3)(
delayed(process_one)(samples, true_prevs)
for samples, true_prevs in tqdm(
zip(all_samples, all_true_prevs),
total=len(all_samples),
desc='constructing ellipses'
)
)
# unzip results
coverage, amplitude, winkler = zip(*out)
return list(coverage), list(amplitude), list(winkler)
def update_pickle(report, pickle_path, updated_dict:dict):
for k,v in updated_dict.items():
report[k]=v
pickle.dump(report, open(pickle_path, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
def update_pickle_with_region(report, file, conf_name, conf_region_class, **kwargs):
if f'coverage-{conf_name}' not in report:
covs, amps, winkler = compute_coverage_amplitude(conf_region_class, **kwargs)
update_fields = {
f'coverage-{conf_name}': covs,
f'amplitude-{conf_name}': amps,
f'winkler-{conf_name}': winkler
}
update_pickle(report, file, update_fields)
methods = None # show all methods
# methods = ['BayesianACC', 'BayesianKDEy']
for setup in ['multiclass']:
path = f'./results/{setup}/*.pkl'
table = defaultdict(list)
for file in tqdm(glob(path), desc='processing results', total=len(glob(path))):
file = Path(file)
dataset, method = file.name.replace('.pkl', '').split('__')
if methods is not None and method not in methods:
continue
report = pickle.load(open(file, 'rb'))
results = report['results']
n_samples = len(results['ae'])
table['method'].extend([method.replace('Bayesian','Ba').replace('Bootstrap', 'Bo')] * n_samples)
table['dataset'].extend([dataset] * n_samples)
table['ae'].extend(results['ae'])
table['rae'].extend(results['rae'])
# table['c-CI'].extend(results['coverage'])
# table['a-CI'].extend(results['amplitude'])
update_pickle_with_region(report, file, conf_name='CI', conf_region_class=ConfidenceIntervals, bonferroni_correction=True)
update_pickle_with_region(report, file, conf_name='CE', conf_region_class=ConfidenceEllipseSimplex)
update_pickle_with_region(report, file, conf_name='CLR', conf_region_class=ConfidenceEllipseCLR)
update_pickle_with_region(report, file, conf_name='ILR', conf_region_class=ConfidenceEllipseILR)
table['c-CI'].extend(report['coverage-CI'])
table['a-CI'].extend(report['amplitude-CI'])
table['w-CI'].extend(report['winkler-CI'])
table['c-CE'].extend(report['coverage-CE'])
table['a-CE'].extend(report['amplitude-CE'])
table['c-CLR'].extend(report['coverage-CLR'])
table['a-CLR'].extend(report['amplitude-CLR'])
table['c-ILR'].extend(report['coverage-ILR'])
table['a-ILR'].extend(report['amplitude-ILR'])
table['aitch'].extend(qp.error.dist_aitchison(results['true-prevs'], results['point-estim']))
# table['aitch-well'].extend(qp.error.dist_aitchison(results['true-prevs'], [ConfidenceEllipseILR(samples).mean_ for samples in results['samples']]))
# table['aitch'].extend()
table['reg-score-ILR'].extend(
[region_score(true_prev, ConfidenceEllipseILR(samples)) for true_prev, samples in zip(results['true-prevs'], results['samples'])]
)
df = pd.DataFrame(table)
n_classes = {}
tr_size = {}
for dataset in df['dataset'].unique():
fetch_fn = {
'binary': qp.datasets.fetch_UCIBinaryDataset,
'multiclass': qp.datasets.fetch_UCIMulticlassDataset
}[setup]
data = fetch_fn(dataset)
n_classes[dataset] = data.n_classes
tr_size[dataset] = len(data.training)
# remove datasets with more than max_classes classes
# max_classes = 30
# min_train = 1000
# for data_name, n in n_classes.items():
# if n > max_classes:
# df = df[df["dataset"] != data_name]
# for data_name, n in tr_size.items():
# if n < min_train:
# df = df[df["dataset"] != data_name]
for region in ['ILR']: # , 'CI', 'CE', 'CLR', 'ILR']:
if setup == 'binary' and region=='ILR':
continue
# pv = pd.pivot_table(
# df, index='dataset', columns='method', values=['ae', f'c-{region}', f'a-{region}'], margins=True
# )
pv = pd.pivot_table(
df, index='dataset', columns='method', values=[
#f'w-{region}',
# 'ae',
# 'rae',
# f'aitch',
# f'aitch-well'
'reg-score-ILR',
], margins=True
)
pv['n_classes'] = pv.index.map(n_classes).astype('Int64')
pv['tr_size'] = pv.index.map(tr_size).astype('Int64')
pv = pv.drop(columns=[col for col in pv.columns if col[-1] == "All"])
print(f'{setup=}')
print(pv)
print('-'*80)

View File

@ -0,0 +1,258 @@
import os
import pickle
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from scipy.stats import gaussian_kde
from method.confidence import (ConfidenceIntervals as CI,
ConfidenceEllipseSimplex as CE,
ConfidenceEllipseCLR as CLR,
ConfidenceEllipseILR as ILR)
def get_region_colormap(name="blue", alpha=0.40):
name = name.lower()
if name == "blue":
base = (76/255, 114/255, 176/255)
elif name == "orange":
base = (221/255, 132/255, 82/255)
elif name == "violet":
base = (129/255, 114/255, 178/255)
else:
raise ValueError(f"Unknown palette name: {name}")
cmap = ListedColormap([
(1, 1, 1, 0), # 0: transparent white
(base[0], base[1], base[2], alpha) # 1: color
])
return cmap
def plot_prev_points(samples=None,
show_samples=True,
true_prev=None,
point_estim=None, train_prev=None, show_mean=True, show_legend=True,
region=None,
region_resolution=1000,
confine_region_in_simplex=False,
color='blue',
save_path=None):
plt.rcParams.update({
'font.size': 10, # tamaño base de todo el texto
'axes.titlesize': 12, # título del eje
'axes.labelsize': 10, # etiquetas de ejes
'xtick.labelsize': 8, # etiquetas de ticks
'ytick.labelsize': 8,
'legend.fontsize': 9, # leyenda
})
def cartesian(p):
dim = p.shape[-1]
p = p.reshape(-1,dim)
x = p[:, 1] + p[:, 2] * 0.5
y = p[:, 2] * np.sqrt(3) / 2
return x, y
def barycentric_from_xy(x, y):
"""
Given cartesian (x,y) in simplex returns baricentric coordinates (p1,p2,p3).
"""
p3 = 2 * y / np.sqrt(3)
p2 = x - 0.5 * p3
p1 = 1 - p2 - p3
return np.stack([p1, p2, p3], axis=-1)
# simplex coordinates
v1 = np.array([0, 0])
v2 = np.array([1, 0])
v3 = np.array([0.5, np.sqrt(3)/2])
# Plot
fig, ax = plt.subplots(figsize=(6, 6))
if region is not None:
if callable(region):
region_list = [("region", region)]
else:
region_list = region # lista de (name, fn)
if region is not None:
# rectangular mesh
x_min, x_max = -0.2, 1.2
y_min, y_max = -0.2, np.sqrt(3) / 2 + 0.2
xs = np.linspace(x_min, x_max, region_resolution)
ys = np.linspace(y_min, y_max, region_resolution)
grid_x, grid_y = np.meshgrid(xs, ys)
# barycentric
pts_bary = barycentric_from_xy(grid_x, grid_y)
# mask within simplex
if confine_region_in_simplex:
in_simplex = np.all(pts_bary >= 0, axis=-1)
else:
in_simplex = np.full(shape=(region_resolution, region_resolution), fill_value=True, dtype=bool)
# --- Colormap 0 → blanco, 1 → rojo semitransparente ---
# iterar sobre todas las regiones
for (rname, rfun) in region_list:
mask = np.zeros_like(in_simplex, dtype=float)
valid_pts = pts_bary[in_simplex]
mask_vals = np.array([float(rfun(p)) for p in valid_pts])
mask[in_simplex] = mask_vals
ax.pcolormesh(
xs, ys, mask,
shading='auto',
cmap=get_region_colormap(color),
alpha=0.3,
)
if samples is not None:
if show_samples:
ax.scatter(*cartesian(samples), s=15, alpha=0.5, edgecolors='none', label='samples', color='black', linewidth=0.5)
if show_mean is not None:
if isinstance(show_mean, np.ndarray):
ax.scatter(*cartesian(show_mean), s=10, alpha=1, label='sample-mean', edgecolors='black')
elif show_mean==True and samples is not None:
ax.scatter(*cartesian(samples.mean(axis=0)), s=10, alpha=1, label='sample-mean', edgecolors='black')
else:
raise ValueError(f'show_mean should either be a boolean (if True, then samples must be provided) or '
f'the mean point itself')
if train_prev is not None:
ax.scatter(*cartesian(true_prev), s=10, alpha=1, label='true-prev', edgecolors='black')
if point_estim is not None:
ax.scatter(*cartesian(point_estim), s=10, alpha=1, label='KDEy-estim', edgecolors='black')
if train_prev is not None:
ax.scatter(*cartesian(train_prev), s=10, alpha=1, label='train-prev', edgecolors='black')
# edges
triangle = np.array([v1, v2, v3, v1])
ax.plot(triangle[:, 0], triangle[:, 1], color='black')
# vertex labels
ax.text(-0.05, -0.05, "Y=1", ha='right', va='top')
ax.text(1.05, -0.05, "Y=2", ha='left', va='top')
ax.text(0.5, np.sqrt(3)/2 + 0.05, "Y=3", ha='center', va='bottom')
ax.set_aspect('equal')
ax.axis('off')
if show_legend:
plt.legend(
loc='center left',
bbox_to_anchor=(1.05, 0.5),
)
plt.tight_layout()
if save_path is None:
plt.show()
else:
os.makedirs(Path(save_path).parent, exist_ok=True)
plt.savefig(save_path)
def plot_prev_points_matplot(points):
# project 2D
v1 = np.array([0, 0])
v2 = np.array([1, 0])
v3 = np.array([0.5, np.sqrt(3) / 2])
x = points[:, 1] + points[:, 2] * 0.5
y = points[:, 2] * np.sqrt(3) / 2
# kde
xy = np.vstack([x, y])
kde = gaussian_kde(xy, bw_method=0.25)
xmin, xmax = 0, 1
ymin, ymax = 0, np.sqrt(3) / 2
# grid
xx, yy = np.mgrid[xmin:xmax:200j, ymin:ymax:200j]
positions = np.vstack([xx.ravel(), yy.ravel()])
zz = np.reshape(kde(positions).T, xx.shape)
# mask points in simplex
def in_triangle(x, y):
return (y >= 0) & (y <= np.sqrt(3) * np.minimum(x, 1 - x))
mask = in_triangle(xx, yy)
zz_masked = np.ma.array(zz, mask=~mask)
# plot
fig, ax = plt.subplots(figsize=(6, 6))
ax.imshow(
np.rot90(zz_masked),
cmap=plt.cm.viridis,
extent=[xmin, xmax, ymin, ymax],
alpha=0.8,
)
# Bordes del triángulo
triangle = np.array([v1, v2, v3, v1])
ax.plot(triangle[:, 0], triangle[:, 1], color='black', lw=2)
# Puntos (opcional)
ax.scatter(x, y, s=5, c='white', alpha=0.3)
# Etiquetas
ax.text(-0.05, -0.05, "A (1,0,0)", ha='right', va='top')
ax.text(1.05, -0.05, "B (0,1,0)", ha='left', va='top')
ax.text(0.5, np.sqrt(3) / 2 + 0.05, "C (0,0,1)", ha='center', va='bottom')
ax.set_aspect('equal')
ax.axis('off')
plt.show()
if __name__ == '__main__':
np.random.seed(1)
n = 1000
# alpha = [3,5,10]
alpha = [10,1,1]
prevs = np.random.dirichlet(alpha, size=n)
def regions():
confs = [0.99, 0.95, 0.90]
yield 'CI', [(f'{int(c*100)}%', CI(prevs, confidence_level=c).coverage) for c in confs]
# yield 'CI-b', [(f'{int(c * 100)}%', CI(prevs, confidence_level=c, bonferroni_correction=True).coverage) for c in confs]
# yield 'CE', [(f'{int(c*100)}%', CE(prevs, confidence_level=c).coverage) for c in confs]
# yield 'CLR', [(f'{int(c*100)}%', CLR(prevs, confidence_level=c).coverage) for c in confs]
# yield 'ILR', [(f'{int(c*100)}%', ILR(prevs, confidence_level=c).coverage) for c in confs]
# resolution = 1000
# alpha_str = ','.join([f'{str(i)}' for i in alpha])
# for crname, cr in regions():
# plot_prev_points(prevs, show_mean=True, show_legend=False, region=cr, region_resolution=resolution,
# color='blue',
# save_path=f'./plots/simplex_{crname}_alpha{alpha_str}_res{resolution}.png',
# )
def regions():
confs = [0.99, 0.95, 0.90]
yield 'CI', [(f'{int(c*100)}%', CI(prevs, confidence_level=c).coverage) for c in confs]
# yield 'CI-b', [(f'{int(c * 100)}%', CI(prevs, confidence_level=c, bonferroni_correction=True).coverage) for c in confs]
# yield 'CE', [(f'{int(c*100)}%', CE(prevs, confidence_level=c).coverage) for c in confs]
# yield 'CLR', [(f'{int(c*100)}%', CLR(prevs, confidence_level=c).coverage) for c in confs]
# yield 'ILR', [(f'{int(c*100)}%', ILR(prevs, confidence_level=c).coverage) for c in confs]
resolution = 1000
alpha_str = ','.join([f'{str(i)}' for i in alpha])
region = ILR(prevs, confidence_level=.99)
p = np.asarray([0.1, 0.8, 0.1])
plot_prev_points(prevs, show_samples=False,
show_mean=region.mean_,
# show_mean=prevs.mean(axis=0),
show_legend=False, region=[('', region.coverage)], region_resolution=resolution,
color='blue',
true_prev=p,
train_prev=region.closest_point_in_region(p),
save_path=f'./plots3/simplex_ilr.png',
)

View File

@ -0,0 +1,56 @@
import warnings
from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from method.confidence import ConfidenceIntervals
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
if __name__ == '__main__':
qp.environ["SAMPLE_SIZE"] = 500
cls = LogisticRegression()
bayes_kdey = BayesianKDEy(cls, bandwidth=.3, kernel='aitchison', mcmc_seed=0)
datasets = qp.datasets.UCI_BINARY_DATASETS
train, test = qp.datasets.fetch_UCIBinaryDataset(datasets[0]).train_test
# train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test
with qp.util.temp_seed(0):
print('fitting KDEy')
bayes_kdey.fit(*train.Xy)
shifted = test.sampling(500, *[0.2, 0.8])
# shifted = test.sampling(500, *test.prevalence()[::-1])
# shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes))
prev_hat = bayes_kdey.predict(shifted.X)
mae = qp.error.mae(shifted.prevalence(), prev_hat)
print(f'true_prev={strprev(shifted.prevalence())}')
print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}')
prev_hat, conf_interval = bayes_kdey.predict_conf(shifted.X)
mae = qp.error.mae(shifted.prevalence(), prev_hat)
print(f'mean posterior {strprev(prev_hat)}, {mae=:.4f}')
print(f'CI={conf_interval}')
print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}')
print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.3f}%')
if train.n_classes == 3:
plot_prev_points(bayes_kdey.prevalence_samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence())
# plot_prev_points_matplot(samples)
# report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
# print(report.mean(numeric_only=True))

View File

@ -0,0 +1,91 @@
import os
import warnings
from os.path import join
from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.full_experiments import experiment, experiment_path, KDEyCLR
from build.lib.quapy.data import LabelledCollection
from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier
from quapy.method.base import BinaryQuantifier, BaseQuantifier
from quapy.model_selection import GridSearchQ
from quapy.data import Dataset
# from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC, AggregativeBootstrap
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML, ACC
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
from collections import defaultdict
from time import time
from sklearn.base import clone, BaseEstimator
def methods():
"""
Returns a tuple (name, quantifier, hyperparams, bayesian/bootstrap_constructor), where:
- name: is a str representing the name of the method (e.g., 'BayesianKDEy')
- quantifier: is the base model (e.g., KDEyML())
- hyperparams: is a dictionary for the quantifier (e.g., {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]})
- bayesian/bootstrap_constructor: is a function that instantiates the bayesian o bootstrap method with the
quantifier with optimized hyperparameters
"""
acc_hyper = {}
hdy_hyper = {'nbins': [3,4,5,8,16,32]}
kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
kdey_hyper_clr = {'bandwidth': [0.05, 0.1, 0.5, 1., 2., 5.]}
wrap_hyper = lambda dic: {f'quantifier__{k}':v for k,v in dic.items()}
# yield 'BootstrapKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: AggregativeBootstrap(KDEyML(LR(), **hyper), n_test_samples=1000, random_state=0, verbose=True),
# yield 'BayesianKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, **hyper),
# for T in [10, 20, 50, 100., 500]:
# yield f'BaKDE-CLR-T{T}', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', explore='ilr', mcmc_seed=0, temperature=T, num_warmup=3000, num_samples=1000, step_size=.1, **hyper),
yield f'BaKDE-emcee', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, num_warmup=100, num_samples=100, step_size=.1, **hyper),
if __name__ == '__main__':
binary = {
'fetch_fn': qp.datasets.fetch_UCIBinaryDataset,
'sample_size': 500
}
multiclass = {
'fetch_fn': qp.datasets.fetch_UCIMulticlassDataset,
'sample_size': 1000
}
setup = multiclass
# data_name = 'isolet'
# data_name = 'cmc'
data_name = 'abalone'
qp.environ['SAMPLE_SIZE'] = setup['sample_size']
print(f'dataset={data_name}')
data = setup['fetch_fn'](data_name)
is_binary = data.n_classes==2
hyper_subdir = Path('./results') / 'hyperparams' / ('binary' if is_binary else 'multiclass')
for method_name, method, hyper_params, withconf_constructor in methods():
hyper_path = experiment_path(hyper_subdir, data_name, method.__class__.__name__)
report = experiment(data, method, method_name, hyper_params, withconf_constructor, hyper_path)
print(f'dataset={data_name}, '
f'method={method_name}: '
f'mae={report["results"]["ae"].mean():.3f}, '
f'coverage={report["results"]["coverage"].mean():.5f}, '
f'amplitude={report["results"]["amplitude"].mean():.5f}, ')

View File

@ -1,9 +1,13 @@
Change Log 0.2.1 Change Log 0.2.1
----------------- -----------------
- Added squared ratio error.
- Improved efficiency of confidence regions coverage functions
- Added Precise Quantifier to WithConfidence methods (a Bayesian adaptation of HDy)
- Improved documentation of confidence regions. - Improved documentation of confidence regions.
- Added ReadMe method by Daniel Hopkins and Gary King - Added ReadMe method by Daniel Hopkins and Gary King
- Internal index in LabelledCollection is now "lazy", and is only constructed if required. - Internal index in LabelledCollection is now "lazy", and is only constructed if required.
- I have added dist_aitchison and mean_dist_aitchison as a new error evaluation metric
Change Log 0.2.0 Change Log 0.2.0
----------------- -----------------

56
KDEyAitchison/commons.py Normal file
View File

@ -0,0 +1,56 @@
import numpy as np
import pandas as pd
from quapy.method.aggregative import EMQ, KDEyML, PACC
from sklearn.linear_model import LogisticRegression
METHODS = ['PACC',
'EMQ',
'KDEy-ML',
'KDEy-MLA'
]
# common hyperparameterss
hyper_LR = {
'classifier__C': np.logspace(-3, 3, 7),
'classifier__class_weight': ['balanced', None]
}
hyper_kde = {
'bandwidth': np.linspace(0.001, 0.5, 100)
}
hyper_kde_aitchison = {
'bandwidth': np.linspace(0.01, 2, 100)
}
# instances a new quantifier based on a string name
def new_method(method, **lr_kwargs):
lr = LogisticRegression(**lr_kwargs)
if method == 'KDEy-ML':
param_grid = {**hyper_kde, **hyper_LR}
quantifier = KDEyML(lr, kernel='gaussian')
elif method == 'KDEy-MLA':
param_grid = {**hyper_kde_aitchison, **hyper_LR}
quantifier = KDEyML(lr, kernel='aitchison')
elif method == 'EMQ':
param_grid = hyper_LR
quantifier = EMQ(lr)
elif method == 'PACC':
param_grid = hyper_LR
quantifier = PACC(lr)
else:
raise NotImplementedError('unknown method', method)
return param_grid, quantifier
def show_results(result_path):
df = pd.read_csv(result_path+'.csv', sep='\t')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pv = df.pivot_table(index='Dataset', columns="Method", values=["MAE", "MRAE"])
print(pv)

View File

@ -0,0 +1,38 @@
import pickle
import os
import sys
import pandas as pd
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.protocol import UPP
from commons import METHODS, new_method, show_results
from new_table import LatexTable
SEED = 1
if __name__ == '__main__':
print(qp.datasets.UCI_MULTICLASS_DATASETS)
for optim in ['mae', 'mrae']:
table = LatexTable()
result_dir = f'results/ucimulti/{optim}'
for method in METHODS:
print()
global_result_path = f'{result_dir}/{method}'
print(f'Method\tDataset\tMAE\tMRAE\tKLD')
for dataset in qp.datasets.UCI_MULTICLASS_DATASETS:
# print(dataset)
local_result_path = global_result_path + '_' + dataset
if os.path.exists(local_result_path + '.dataframe'):
report = pd.read_csv(local_result_path+'.dataframe')
print(f'{method}\t{dataset}\t{report[optim].mean():.5f}')
table.add(benchmark=dataset, method=method, v=report[optim].values)
else:
print(dataset, 'not found for method', method)
table.latexPDF(f'./tables/{optim}.pdf', landscape=False)

View File

@ -0,0 +1,94 @@
import pickle
import os
import sys
import pandas as pd
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.protocol import UPP
from commons import METHODS, new_method, show_results
SEED = 1
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 500
qp.environ['N_JOBS'] = -1
n_bags_val = 250
n_bags_test = 1000
for optim in ['mae', 'mrae']:
result_dir = f'results/ucimulti/{optim}'
os.makedirs(result_dir, exist_ok=True)
for method in METHODS:
print('Init method', method)
global_result_path = f'{result_dir}/{method}'
# show_results(global_result_path)
# sys.exit(0)
if not os.path.exists(global_result_path + '.csv'):
with open(global_result_path + '.csv', 'wt') as csv:
csv.write(f'Method\tDataset\tMAE\tMRAE\tKLD\n')
with open(global_result_path + '.csv', 'at') as csv:
for dataset in qp.datasets.UCI_MULTICLASS_DATASETS:
print('init', dataset)
local_result_path = global_result_path + '_' + dataset
if os.path.exists(local_result_path + '.dataframe'):
print(f'result file {local_result_path}.dataframe already exist; skipping')
report = pd.read_csv(local_result_path+'.dataframe')
print(report["mae"].mean())
# data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
# csv.write(f'{method}\t{data.name}\t{report["mae"].mean():.5f}\t{report["mrae"].mean():.5f}\t{report["kld"].mean():.5f}\n')
continue
with qp.util.temp_seed(SEED):
param_grid, quantifier = new_method(method, max_iter=3000)
data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
# model selection
train, test = data.train_test
train, val = train.split_stratified(random_state=SEED)
protocol = UPP(val, repeats=n_bags_val)
modsel = GridSearchQ(
quantifier, param_grid, protocol, refit=True, n_jobs=-1, verbose=True, error=optim
)
try:
modsel.fit(*train.Xy)
print(f'best params {modsel.best_params_}')
print(f'best score {modsel.best_score_}')
pickle.dump(
(modsel.best_params_, modsel.best_score_,),
open(f'{local_result_path}.hyper.pkl', 'wb'), pickle.HIGHEST_PROTOCOL)
quantifier = modsel.best_model()
except:
print('something went wrong... trying to fit the default model')
quantifier.fit(*train.Xy)
protocol = UPP(test, repeats=n_bags_test)
report = qp.evaluation.evaluation_report(
quantifier, protocol, error_metrics=['mae', 'mrae', 'kld'], verbose=True
)
report.to_csv(f'{local_result_path}.dataframe')
print(f'{method}\t{data.name}\t{report["mae"].mean():.5f}\t{report["mrae"].mean():.5f}\t{report["kld"].mean():.5f}\n')
csv.write(f'{method}\t{data.name}\t{report["mae"].mean():.5f}\t{report["mrae"].mean():.5f}\t{report["kld"].mean():.5f}\n')
csv.flush()
show_results(global_result_path)

View File

@ -3,6 +3,7 @@
import numpy as np import numpy as np
from sklearn.metrics import f1_score from sklearn.metrics import f1_score
import quapy as qp import quapy as qp
from functional import CLRtransformation
def from_name(err_name): def from_name(err_name):
@ -128,6 +129,59 @@ def se(prevs_true, prevs_hat):
return ((prevs_hat - prevs_true) ** 2).mean(axis=-1) return ((prevs_hat - prevs_true) ** 2).mean(axis=-1)
def sre(prevs_true, prevs_hat, prevs_train, eps=0.):
"""
The squared ratio error is defined as
:math:`SRE(p,\\hat{p},p^{tr})=\\frac{1}{|\\mathcal{Y}|}\\sum_{i \\in \\mathcal{Y}}(w_i-\\hat{w}_i)^2`,
where
:math:`w_i=\\frac{p_i}{p^{tr}_i}=\\frac{Q(Y=i)}{P(Y=i)}`,
and `\\hat{w}_i` is the estimate obtained by replacing the true test prior with an estimate, and
:math:`\\mathcal{Y}` are the classes of interest
:param prevs_true: array-like, true prevalence values
:param prevs_hat: array-like, estimated prevalence values
:param prevs_train: array-like, training prevalence values
:param eps: float, for smoothing the prevalence values. It is 0 by default (no smoothing), meaning the
training prevalence is expected to be >0 everywhere.
:return: the squared ratio error
"""
prevs_true = np.asarray(prevs_true)
prevs_hat = np.asarray(prevs_hat)
prevs_train = np.asarray(prevs_train)
assert prevs_true.shape == prevs_hat.shape, f'wrong shape {prevs_true.shape=} vs {prevs_hat.shape=}'
assert prevs_true.shape[-1]==prevs_train.shape[-1], 'wrong shape for training prevalence'
if eps>0:
prevs_true = smooth(prevs_true, eps)
prevs_hat = smooth(prevs_hat, eps)
prevs_train = smooth(prevs_train, eps)
N = prevs_true.shape[-1]
w = prevs_true / prevs_train
w_hat = prevs_hat / prevs_train
return (1./N) * (w - w_hat)**2.
def msre(prevs_true, prevs_hat, prevs_train, eps=0.):
"""
Returns the mean across all experiments. See :function:`sre`.
:param prevs_true: array-like, true prevalence values of shape (n_experiments, n_classes,) or (n_classes,)
:param prevs_hat: array-like, estimated prevalence values of shape equal to prevs_true
:param prevs_train: array-like, training prevalence values of (n_experiments, n_classes,) or (n_classes,)
:param eps: float, for smoothing the prevalence values. It is 0 by default (no smoothing), meaning the
training prevalence is expected to be >0 everywhere.
:return: the squared ratio error
"""
return np.mean(squared_ratio_error(prevs_true, prevs_hat, prevs_train, eps))
def dist_aitchison(prevs_true, prevs_hat):
clr = CLRtransformation()
return np.linalg.norm(clr(prevs_true) - clr(prevs_hat), axis=-1)
def mean_dist_aitchison(prevs_true, prevs_hat):
return np.mean(dist_aitchison(prevs_true, prevs_hat))
def mkld(prevs_true, prevs_hat, eps=None): def mkld(prevs_true, prevs_hat, eps=None):
"""Computes the mean Kullback-Leibler divergence (see :meth:`quapy.error.kld`) across the """Computes the mean Kullback-Leibler divergence (see :meth:`quapy.error.kld`) across the
sample pairs. The distributions are smoothed using the `eps` factor sample pairs. The distributions are smoothed using the `eps` factor
@ -374,8 +428,8 @@ def __check_eps(eps=None):
CLASSIFICATION_ERROR = {f1e, acce} CLASSIFICATION_ERROR = {f1e, acce}
QUANTIFICATION_ERROR = {mae, mnae, mrae, mnrae, mse, mkld, mnkld} QUANTIFICATION_ERROR = {mae, mnae, mrae, mnrae, mse, mkld, mnkld, msre, mean_dist_aitchison}
QUANTIFICATION_ERROR_SINGLE = {ae, nae, rae, nrae, se, kld, nkld} QUANTIFICATION_ERROR_SINGLE = {ae, nae, rae, nrae, se, kld, nkld, sre, dist_aitchison}
QUANTIFICATION_ERROR_SMOOTH = {kld, nkld, rae, nrae, mkld, mnkld, mrae} QUANTIFICATION_ERROR_SMOOTH = {kld, nkld, rae, nrae, mkld, mnkld, mrae}
CLASSIFICATION_ERROR_NAMES = {func.__name__ for func in CLASSIFICATION_ERROR} CLASSIFICATION_ERROR_NAMES = {func.__name__ for func in CLASSIFICATION_ERROR}
QUANTIFICATION_ERROR_NAMES = {func.__name__ for func in QUANTIFICATION_ERROR} QUANTIFICATION_ERROR_NAMES = {func.__name__ for func in QUANTIFICATION_ERROR}
@ -387,6 +441,7 @@ ERROR_NAMES = \
f1_error = f1e f1_error = f1e
acc_error = acce acc_error = acce
mean_absolute_error = mae mean_absolute_error = mae
squared_ratio_error = sre
absolute_error = ae absolute_error = ae
mean_relative_absolute_error = mrae mean_relative_absolute_error = mrae
relative_absolute_error = rae relative_absolute_error = rae

View File

@ -1,10 +1,15 @@
import warnings import warnings
from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from functools import lru_cache
from typing import Literal, Union, Callable from typing import Literal, Union, Callable
from numpy.typing import ArrayLike from numpy.typing import ArrayLike
import scipy import scipy
import numpy as np import numpy as np
from scipy.special import softmax
import quapy as qp
# ------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------
@ -583,8 +588,8 @@ def solve_adjustment(
""" """
Function that tries to solve for :math:`p` the equation :math:`q = M p`, where :math:`q` is the vector of Function that tries to solve for :math:`p` the equation :math:`q = M p`, where :math:`q` is the vector of
`unadjusted counts` (as estimated, e.g., via classify and count) with :math:`q_i` an estimate of `unadjusted counts` (as estimated, e.g., via classify and count) with :math:`q_i` an estimate of
:math:`P(\hat{Y}=y_i)`, and where :math:`M` is the matrix of `class-conditional rates` with :math:`M_{ij}` an :math:`P(\\hat{Y}=y_i)`, and where :math:`M` is the matrix of `class-conditional rates` with :math:`M_{ij}` an
estimate of :math:`P(\hat{Y}=y_i|Y=y_j)`. estimate of :math:`P(\\hat{Y}=y_i|Y=y_j)`.
:param class_conditional_rates: array of shape `(n_classes, n_classes,)` with entry `(i,j)` being the estimate :param class_conditional_rates: array of shape `(n_classes, n_classes,)` with entry `(i,j)` being the estimate
of :math:`P(\hat{Y}=y_i|Y=y_j)`, that is, the probability that an instance that belongs to class :math:`y_j` of :math:`P(\hat{Y}=y_i|Y=y_j)`, that is, the probability that an instance that belongs to class :math:`y_j`
@ -649,3 +654,96 @@ def solve_adjustment(
raise ValueError(f'unknown {solver=}') raise ValueError(f'unknown {solver=}')
# ------------------------------------------------------------------------------------------
# Transformations from Compositional analysis
# ------------------------------------------------------------------------------------------
class CompositionalTransformation(ABC):
"""
Abstract class of transformations from compositional data.
Basically, callable functions with an "inverse" function.
"""
@abstractmethod
def __call__(self, X): ...
@abstractmethod
def inverse(self, Z): ...
EPSILON=1e-6
class CLRtransformation(CompositionalTransformation):
"""
Centered log-ratio (CLR), from compositional analysis
"""
def __call__(self, X):
"""
Applies the CLR function to X thus mapping the instances, which are contained in `\\mathcal{R}^{n}` but
actually lie on a `\\mathcal{R}^{n-1}` simplex, onto an unrestricted space in :math:`\\mathcal{R}^{n}`
:param X: np.ndarray of (n_instances, n_dimensions) to be transformed
:param epsilon: small float for prevalence smoothing
:return: np.ndarray of (n_instances, n_dimensions), the CLR-transformed points
"""
X = np.asarray(X)
X = qp.error.smooth(X, self.EPSILON)
G = np.exp(np.mean(np.log(X), axis=-1, keepdims=True)) # geometric mean
return np.log(X / G)
def inverse(self, Z):
"""
Inverse function. However, clr.inverse(clr(X)) does not exactly coincide with X due to smoothing.
:param Z: np.ndarray of (n_instances, n_dimensions) to be transformed
:return: np.ndarray of (n_instances, n_dimensions), the CLR-transformed points
"""
return scipy.special.softmax(Z, axis=-1)
class ILRtransformation(CompositionalTransformation):
"""
Isometric log-ratio (ILR), from compositional analysis
"""
def __call__(self, X):
X = np.asarray(X)
X = qp.error.smooth(X, self.EPSILON)
k = X.shape[-1]
V = self.get_V(k) # (k-1, k)
logp = np.log(X)
return logp @ V.T
def inverse(self, Z):
Z = np.asarray(Z)
# get dimension
k_minus_1 = Z.shape[-1]
k = k_minus_1 + 1
V = self.get_V(k) # (k-1, k)
logp = Z @ V
p = np.exp(logp)
p = p / np.sum(p, axis=-1, keepdims=True)
return p
@lru_cache(maxsize=None)
def get_V(self, k):
def helmert_matrix(k):
"""
Returns the (k x k) Helmert matrix.
"""
H = np.zeros((k, k))
for i in range(1, k):
H[i, :i] = 1
H[i, i] = -(i)
H[i] = H[i] / np.sqrt(i * (i + 1))
# row 0 stays zeros; will be discarded
return H
def ilr_basis(k):
"""
Constructs an orthonormal ILR basis using the Helmert submatrix.
Output shape: (k-1, k)
"""
H = helmert_matrix(k)
V = H[1:, :] # remove first row of zeros
return V
return ilr_basis(k)

View File

@ -1,6 +1,10 @@
""" """
Utility functions for `Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ methods. Utility functions for `Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ methods.
""" """
import contextlib
import os
import sys
import numpy as np import numpy as np
import importlib.resources import importlib.resources
@ -10,6 +14,8 @@ try:
import numpyro import numpyro
import numpyro.distributions as dist import numpyro.distributions as dist
import stan import stan
import logging
import stan.common
DEPENDENCIES_INSTALLED = True DEPENDENCIES_INSTALLED = True
except ImportError: except ImportError:
@ -86,6 +92,18 @@ def sample_posterior(
def load_stan_file(): def load_stan_file():
return importlib.resources.files('quapy.method').joinpath('stan/pq.stan').read_text(encoding='utf-8') return importlib.resources.files('quapy.method').joinpath('stan/pq.stan').read_text(encoding='utf-8')
@contextlib.contextmanager
def _suppress_stan_logging():
with open(os.devnull, "w") as devnull:
old_stderr = sys.stderr
sys.stderr = devnull
try:
yield
finally:
sys.stderr = old_stderr
def pq_stan(stan_code, n_bins, pos_hist, neg_hist, test_hist, number_of_samples, num_warmup, stan_seed): def pq_stan(stan_code, n_bins, pos_hist, neg_hist, test_hist, number_of_samples, num_warmup, stan_seed):
""" """
Perform Bayesian prevalence estimation using a Stan model for probabilistic quantification. Perform Bayesian prevalence estimation using a Stan model for probabilistic quantification.
@ -121,6 +139,8 @@ def pq_stan(stan_code, n_bins, pos_hist, neg_hist, test_hist, number_of_samples,
Each element corresponds to one draw from the posterior distribution. Each element corresponds to one draw from the posterior distribution.
""" """
logging.getLogger("stan.common").setLevel(logging.ERROR)
stan_data = { stan_data = {
'n_bucket': n_bins, 'n_bucket': n_bins,
'train_neg': neg_hist.tolist(), 'train_neg': neg_hist.tolist(),
@ -129,7 +149,8 @@ def pq_stan(stan_code, n_bins, pos_hist, neg_hist, test_hist, number_of_samples,
'posterior': 1 'posterior': 1
} }
stan_model = stan.build(stan_code, data=stan_data, random_seed=stan_seed) with _suppress_stan_logging():
fit = stan_model.sample(num_chains=1, num_samples=number_of_samples,num_warmup=num_warmup) stan_model = stan.build(stan_code, data=stan_data, random_seed=stan_seed)
fit = stan_model.sample(num_chains=1, num_samples=number_of_samples,num_warmup=num_warmup)
return fit['prev'] return fit['prev']

View File

@ -15,9 +15,11 @@ class KDEBase:
""" """
BANDWIDTH_METHOD = ['scott', 'silverman'] BANDWIDTH_METHOD = ['scott', 'silverman']
KERNELS = ['gaussian', 'aitchison', 'ilr']
@classmethod @classmethod
def _check_bandwidth(cls, bandwidth): def _check_bandwidth(cls, bandwidth, kernel):
""" """
Checks that the bandwidth parameter is correct Checks that the bandwidth parameter is correct
@ -27,32 +29,56 @@ class KDEBase:
assert bandwidth in KDEBase.BANDWIDTH_METHOD or isinstance(bandwidth, float), \ assert bandwidth in KDEBase.BANDWIDTH_METHOD or isinstance(bandwidth, float), \
f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values' f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values'
if isinstance(bandwidth, float): if isinstance(bandwidth, float):
assert 0 < bandwidth < 1, \ assert kernel!='gaussian' or (0 < bandwidth < 1), \
"the bandwith for KDEy should be in (0,1), since this method models the unit simplex" ("the bandwidth for a Gaussian kernel in KDEy should be in (0,1), "
"since this method models the unit simplex")
return bandwidth return bandwidth
def get_kde_function(self, X, bandwidth): @classmethod
def _check_kernel(cls, kernel):
"""
Checks that the kernel parameter is correct
:param kernel: str
:return: the validated kernel
"""
assert kernel in KDEBase.KERNELS, f'unknown {kernel=}'
return kernel
def get_kde_function(self, X, bandwidth, kernel):
""" """
Wraps the KDE function from scikit-learn. Wraps the KDE function from scikit-learn.
:param X: data for which the density function is to be estimated :param X: data for which the density function is to be estimated
:param bandwidth: the bandwidth of the kernel :param bandwidth: the bandwidth of the kernel
:param kernel: the kernel (valid ones are in KDEBase.KERNELS)
:return: a scikit-learn's KernelDensity object :return: a scikit-learn's KernelDensity object
""" """
if kernel == 'aitchison':
X = self.clr_transform(X)
elif kernel == 'ilr':
X = self.ilr_transform(X)
return KernelDensity(bandwidth=bandwidth).fit(X) return KernelDensity(bandwidth=bandwidth).fit(X)
def pdf(self, kde, X): def pdf(self, kde, X, kernel):
""" """
Wraps the density evalution of scikit-learn's KDE. Scikit-learn returns log-scores (s), so this Wraps the density evalution of scikit-learn's KDE. Scikit-learn returns log-scores (s), so this
function returns :math:`e^{s}` function returns :math:`e^{s}`
:param kde: a previously fit KDE function :param kde: a previously fit KDE function
:param X: the data for which the density is to be estimated :param X: the data for which the density is to be estimated
:param kernel: the kernel (valid ones are in KDEBase.KERNELS)
:return: np.ndarray with the densities :return: np.ndarray with the densities
""" """
if kernel == 'aitchison':
X = self.clr_transform(X)
elif kernel == 'ilr':
X = self.ilr_transform(X)
return np.exp(kde.score_samples(X)) return np.exp(kde.score_samples(X))
def get_mixture_components(self, X, y, classes, bandwidth): def get_mixture_components(self, X, y, classes, bandwidth, kernel):
""" """
Returns an array containing the mixture components, i.e., the KDE functions for each class. Returns an array containing the mixture components, i.e., the KDE functions for each class.
@ -60,6 +86,7 @@ class KDEBase:
:param y: the class labels :param y: the class labels
:param n_classes: integer, the number of classes :param n_classes: integer, the number of classes
:param bandwidth: float, the bandwidth of the kernel :param bandwidth: float, the bandwidth of the kernel
:param kernel: the kernel (valid ones are in KDEBase.KERNELS)
:return: a list of KernelDensity objects, each fitted with the corresponding class-specific covariates :return: a list of KernelDensity objects, each fitted with the corresponding class-specific covariates
""" """
class_cond_X = [] class_cond_X = []
@ -67,8 +94,19 @@ class KDEBase:
selX = X[y==cat] selX = X[y==cat]
if selX.size==0: if selX.size==0:
selX = [F.uniform_prevalence(len(classes))] selX = [F.uniform_prevalence(len(classes))]
class_cond_X.append(selX) class_cond_X.append(selX)
return [self.get_kde_function(X_cond_yi, bandwidth) for X_cond_yi in class_cond_X] return [self.get_kde_function(X_cond_yi, bandwidth, kernel) for X_cond_yi in class_cond_X]
def clr_transform(self, X):
if not hasattr(self, 'clr'):
self.clr = F.CLRtransformation()
return self.clr(X)
def ilr_transform(self, X):
if not hasattr(self, 'ilr'):
self.ilr = F.ILRtransformation()
return self.ilr(X)
class KDEyML(AggregativeSoftQuantifier, KDEBase): class KDEyML(AggregativeSoftQuantifier, KDEBase):
@ -107,17 +145,19 @@ class KDEyML(AggregativeSoftQuantifier, KDEBase):
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a tuple (X,y) defining the specific set of data to use for validation. for `k`); or as a tuple (X,y) defining the specific set of data to use for validation.
:param bandwidth: float, the bandwidth of the Kernel :param bandwidth: float, the bandwidth of the Kernel
:param kernel: kernel of KDE, valid ones are in KDEBase.KERNELS
:param random_state: a seed to be set before fitting any base quantifier (default None) :param random_state: a seed to be set before fitting any base quantifier (default None)
""" """
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1, def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1, kernel='gaussian',
random_state=None): random_state=None):
super().__init__(classifier, fit_classifier, val_split) super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth) self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel)
self.kernel = self._check_kernel(kernel)
self.random_state=random_state self.random_state=random_state
def aggregation_fit(self, classif_predictions, labels): def aggregation_fit(self, classif_predictions, labels):
self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth) self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
return self return self
def aggregate(self, posteriors: np.ndarray): def aggregate(self, posteriors: np.ndarray):
@ -131,7 +171,7 @@ class KDEyML(AggregativeSoftQuantifier, KDEBase):
with qp.util.temp_seed(self.random_state): with qp.util.temp_seed(self.random_state):
epsilon = 1e-10 epsilon = 1e-10
n_classes = len(self.mix_densities) n_classes = len(self.mix_densities)
test_densities = [self.pdf(kde_i, posteriors) for kde_i in self.mix_densities] test_densities = [self.pdf(kde_i, posteriors, self.kernel) for kde_i in self.mix_densities]
def neg_loglikelihood(prev): def neg_loglikelihood(prev):
# test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities)) # test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities))
@ -192,7 +232,7 @@ class KDEyHD(AggregativeSoftQuantifier, KDEBase):
super().__init__(classifier, fit_classifier, val_split) super().__init__(classifier, fit_classifier, val_split)
self.divergence = divergence self.divergence = divergence
self.bandwidth = KDEBase._check_bandwidth(bandwidth) self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel='gaussian')
self.random_state=random_state self.random_state=random_state
self.montecarlo_trials = montecarlo_trials self.montecarlo_trials = montecarlo_trials
@ -279,7 +319,7 @@ class KDEyCS(AggregativeSoftQuantifier):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1): def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1):
super().__init__(classifier, fit_classifier, val_split) super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth) self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel='gaussian')
def gram_matrix_mix_sum(self, X, Y=None): def gram_matrix_mix_sum(self, X, Y=None):
# this adapts the output of the rbf_kernel function (pairwise evaluations of Gaussian kernels k(x,y)) # this adapts the output of the rbf_kernel function (pairwise evaluations of Gaussian kernels k(x,y))

View File

@ -673,7 +673,7 @@ class PACC(AggregativeSoftQuantifier):
class EMQ(AggregativeSoftQuantifier): class EMQ(AggregativeSoftQuantifier):
""" """
`Expectation Maximization for Quantification <https://ieeexplore.ieee.org/abstract/document/6789744>`_ (EMQ), `Expectation Maximization for Quantification <https://ieeexplore.ieee.org/abstract/document/6789744>`_ (EMQ),
aka `Saerens-Latinne-Decaestecker` (SLD) algorithm. aka `Saerens-Latinne-Decaestecker` (SLD) algorithm, or `Maximum Likelihood Label Shif` (MLLS).
EMQ consists of using the well-known `Expectation Maximization algorithm` to iteratively update the posterior EMQ consists of using the well-known `Expectation Maximization algorithm` to iteratively update the posterior
probabilities generated by a probabilistic classifier and the class prevalence estimates obtained via probabilities generated by a probabilistic classifier and the class prevalence estimates obtained via
maximum-likelihood estimation, in a mutually recursive way, until convergence. maximum-likelihood estimation, in a mutually recursive way, until convergence.

View File

@ -1,18 +1,21 @@
import numpy as np import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator from sklearn.base import BaseEstimator
from sklearn.metrics import confusion_matrix from sklearn.metrics import confusion_matrix
import quapy as qp import quapy as qp
import quapy.functional as F import quapy.functional as F
from quapy.functional import CompositionalTransformation, CLRtransformation, ILRtransformation
from quapy.method import _bayesian from quapy.method import _bayesian
from quapy.data import LabelledCollection from quapy.data import LabelledCollection
from quapy.method.aggregative import AggregativeQuantifier, AggregativeCrispQuantifier, AggregativeSoftQuantifier, BinaryAggregativeQuantifier from quapy.method.aggregative import AggregativeQuantifier, AggregativeCrispQuantifier, AggregativeSoftQuantifier, BinaryAggregativeQuantifier
from scipy.stats import chi2 from scipy.stats import chi2
from sklearn.utils import resample from sklearn.utils import resample
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from scipy.special import softmax, factorial from scipy.special import factorial
import copy import copy
from functools import lru_cache from functools import lru_cache
from tqdm import tqdm
""" """
This module provides implementation of different types of confidence regions, and the implementation of Bootstrap This module provides implementation of different types of confidence regions, and the implementation of Bootstrap
@ -79,6 +82,58 @@ class ConfidenceRegionABC(ABC):
proportion = np.clip(self.coverage(uniform_simplex), 0., 1.) proportion = np.clip(self.coverage(uniform_simplex), 0., 1.)
return proportion return proportion
@property
@abstractmethod
def samples(self):
""" Returns internal samples """
...
def __contains__(self, p):
"""
Overloads in operator, checks if `p` is contained in the region
:param p: array-like
:return: boolean
"""
p = np.asarray(p)
assert p.ndim==1, f'unexpected shape for point parameter'
return self.coverage(p)==1.
def closest_point_in_region(self, p, tol=1e-6, max_iter=30):
"""
Finds the closes point to p that belongs to the region. Assumes the region is convex.
:param p: array-like, the point
:param tol: float, error tolerance
:param max_iter: int, max number of iterations
:returns: array-like, the closes point to p in the segment between p and the center of the region, that
belongs to the region
"""
p = np.asarray(p, dtype=float)
# if p in region, returns p itself
if p in self:
return p.copy()
# center of the region
c = self.point_estimate()
# binary search in [0,1], interpolation parameter
# low=closest to p, high=closest to c
low, high = 0.0, 1.0
for _ in range(max_iter):
mid = 0.5 * (low + high)
x = p*(1-mid) + c*mid
if x in self:
high = mid
else:
low = mid
if high - low < tol:
break
in_boundary = p*(1-high) + c*high
return in_boundary
class WithConfidenceABC(ABC): class WithConfidenceABC(ABC):
""" """
@ -112,7 +167,7 @@ class WithConfidenceABC(ABC):
return self.predict_conf(instances=instances, confidence_level=confidence_level) return self.predict_conf(instances=instances, confidence_level=confidence_level)
@classmethod @classmethod
def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals'): def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals')->ConfidenceRegionABC:
""" """
Construct a confidence region given many prevalence estimations. Construct a confidence region given many prevalence estimations.
@ -147,7 +202,7 @@ def simplex_volume(n):
return 1 / factorial(n) return 1 / factorial(n)
def within_ellipse_prop(values, mean, prec_matrix, chi2_critical): def within_ellipse_prop__(values, mean, prec_matrix, chi2_critical):
""" """
Checks the proportion of values that belong to the ellipse with center `mean` and precision matrix `prec_matrix` Checks the proportion of values that belong to the ellipse with center `mean` and precision matrix `prec_matrix`
at a distance `chi2_critical`. at a distance `chi2_critical`.
@ -180,111 +235,125 @@ def within_ellipse_prop(values, mean, prec_matrix, chi2_critical):
return within_elipse * 1.0 return within_elipse * 1.0
class ConfidenceEllipseSimplex(ConfidenceRegionABC): def within_ellipse_prop(values, mean, prec_matrix, chi2_critical):
""" """
Instantiates a Confidence Ellipse in the probability simplex. Checks the proportion of values that belong to the ellipse with center `mean` and precision matrix `prec_matrix`
at a distance `chi2_critical`.
:param X: np.ndarray of shape (n_bootstrap_samples, n_classes) :param values: a np.ndarray of shape (n_dim,) or (n_values, n_dim,)
:param confidence_level: float, the confidence level (default 0.95) :param mean: a np.ndarray of shape (n_dim,) with the center of the ellipse
:param prec_matrix: a np.ndarray with the precision matrix (inverse of the
covariance matrix) of the ellipse. If this inverse cannot be computed
then None must be passed
:param chi2_critical: float, the chi2 critical value
:return: float in [0,1], the fraction of values that are contained in the ellipse
defined by the mean (center), the precision matrix (shape), and the chi2_critical value (distance).
If `values` is only one value, then either 0. (not contained) or 1. (contained) is returned.
"""
if prec_matrix is None:
return 0.
values = np.atleast_2d(values)
diff = values - mean
d_M_squared = np.sum(diff @ prec_matrix * diff, axis=-1)
within_ellipse = d_M_squared <= chi2_critical
if len(within_ellipse) == 1:
return float(within_ellipse[0])
else:
return float(np.mean(within_ellipse))
def closest_point_on_ellipsoid(p, mean, cov, chi2_critical, tol=1e-9, max_iter=100):
"""
Computes the closest point on the ellipsoid defined by:
(x - mean)^T cov^{-1} (x - mean) = chi2_critical
""" """
def __init__(self, X, confidence_level=0.95): p = np.asarray(p)
mean = np.asarray(mean)
Sigma = np.asarray(cov)
assert 0. < confidence_level < 1., f'{confidence_level=} must be in range(0,1)' # Precompute precision matrix
P = np.linalg.pinv(Sigma)
d = P.shape[0]
X = np.asarray(X) # Define v = p - mean
v = p - mean
self.mean_ = X.mean(axis=0) # If p is inside the ellipsoid, return p itself
self.cov_ = np.cov(X, rowvar=False, ddof=1) M_dist = v @ P @ v
if M_dist <= chi2_critical:
return p.copy()
try: # Function to compute x(lambda)
self.precision_matrix_ = np.linalg.inv(self.cov_) def x_lambda(lmbda):
except: A = np.eye(d) + lmbda * P
self.precision_matrix_ = None return mean + np.linalg.solve(A, v)
self.dim = X.shape[-1] # Function whose root we want: f(lambda) = Mahalanobis distance - chi2
self.ddof = self.dim - 1 def f(lmbda):
x = x_lambda(lmbda)
diff = x - mean
return diff @ P @ diff - chi2_critical
# critical chi-square value # Bisection search over lambda >= 0
self.confidence_level = confidence_level l_low, l_high = 0.0, 1.0
self.chi2_critical_ = chi2.ppf(confidence_level, df=self.ddof)
def point_estimate(self): # Increase high until f(high) < 0
""" while f(l_high) > 0:
Returns the point estimate, the center of the ellipse. l_high *= 2
if l_high > 1e12:
raise RuntimeError("Failed to bracket the root.")
:return: np.ndarray of shape (n_classes,) # Bisection
""" for _ in range(max_iter):
return self.mean_ l_mid = 0.5 * (l_low + l_high)
fm = f(l_mid)
if abs(fm) < tol:
break
if fm > 0:
l_low = l_mid
else:
l_high = l_mid
def coverage(self, true_value): l_opt = l_mid
""" return x_lambda(l_opt)
Checks whether a value, or a sets of values, are contained in the confidence region. The method computes the
fraction of these that are contained in the region, if more than one value is passed. If only one value is
passed, then it either returns 1.0 or 0.0, for indicating the value is in the region or not, respectively.
:param true_value: a np.ndarray of shape (n_classes,) or shape (n_values, n_classes,)
:return: float in [0,1]
"""
return within_ellipse_prop(true_value, self.mean_, self.precision_matrix_, self.chi2_critical_)
class ConfidenceEllipseCLR(ConfidenceRegionABC):
"""
Instantiates a Confidence Ellipse in the Centered-Log Ratio (CLR) space.
:param X: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, X, confidence_level=0.95):
X = np.asarray(X)
self.clr = CLRtransformation()
Z = self.clr(X)
self.mean_ = np.mean(X, axis=0)
self.conf_region_clr = ConfidenceEllipseSimplex(Z, confidence_level=confidence_level)
def point_estimate(self):
"""
Returns the point estimate, the center of the ellipse.
:return: np.ndarray of shape (n_classes,)
"""
# The inverse of the CLR does not coincide with the true mean, because the geometric mean
# requires smoothing the prevalence vectors and this affects the softmax (inverse);
# return self.clr.inverse(self.mean_) # <- does not coincide
return self.mean_
def coverage(self, true_value):
"""
Checks whether a value, or a sets of values, are contained in the confidence region. The method computes the
fraction of these that are contained in the region, if more than one value is passed. If only one value is
passed, then it either returns 1.0 or 0.0, for indicating the value is in the region or not, respectively.
:param true_value: a np.ndarray of shape (n_classes,) or shape (n_values, n_classes,)
:return: float in [0,1]
"""
transformed_values = self.clr(true_value)
return self.conf_region_clr.coverage(transformed_values)
class ConfidenceIntervals(ConfidenceRegionABC): class ConfidenceIntervals(ConfidenceRegionABC):
""" """
Instantiates a region based on (independent) Confidence Intervals. Instantiates a region based on (independent) Confidence Intervals.
:param X: np.ndarray of shape (n_bootstrap_samples, n_classes) :param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95) :param confidence_level: float, the confidence level (default 0.95)
:param bonferroni_correction: bool (default False), if True, a Bonferroni correction
is applied to the significance level (`alpha`) before computing confidence intervals.
The correction consists of replacing `alpha` with `alpha/n_classes`. When
`n_classes=2` the correction is not applied because there is only one verification test
since the other class is constrained. This is not necessarily true for n_classes>2.
""" """
def __init__(self, X, confidence_level=0.95): def __init__(self, samples, confidence_level=0.95, bonferroni_correction=False):
assert 0 < confidence_level < 1, f'{confidence_level=} must be in range(0,1)' assert 0 < confidence_level < 1, f'{confidence_level=} must be in range(0,1)'
X = np.asarray(X) samples = np.asarray(samples)
self.means_ = X.mean(axis=0) self.means_ = samples.mean(axis=0)
alpha = 1-confidence_level alpha = 1-confidence_level
if bonferroni_correction:
n_classes = samples.shape[-1]
if n_classes>2:
alpha = alpha/n_classes
low_perc = (alpha/2.)*100 low_perc = (alpha/2.)*100
high_perc = (1-alpha/2.)*100 high_perc = (1-alpha/2.)*100
self.I_low, self.I_high = np.percentile(X, q=[low_perc, high_perc], axis=0) self.I_low, self.I_high = np.percentile(samples, q=[low_perc, high_perc], axis=0)
self._samples = samples
self.alpha = alpha
@property
def samples(self):
return self._samples
def point_estimate(self): def point_estimate(self):
""" """
@ -312,33 +381,171 @@ class ConfidenceIntervals(ConfidenceRegionABC):
def __repr__(self): def __repr__(self):
return '['+', '.join(f'({low:.4f}, {high:.4f})' for (low,high) in zip(self.I_low, self.I_high))+']' return '['+', '.join(f'({low:.4f}, {high:.4f})' for (low,high) in zip(self.I_low, self.I_high))+']'
@property
def n_dim(self):
return len(self.I_low)
class CLRtransformation: def winkler_scores(self, true_prev):
true_prev = np.asarray(true_prev)
assert true_prev.ndim == 1, 'unexpected dimensionality for true_prev'
assert len(true_prev)==self.n_dim, \
f'unexpected number of dimensions; found {true_prev.ndim}, expected {self.n_dim}'
def winkler_score(low, high, true_val, alpha):
amp = high-low
scale_cost = 1./alpha
cost = np.max([0, low-true_val], axis=0) + np.max([0, true_val-high], axis=0)
return amp + scale_cost*cost
return np.asarray(
[winkler_score(low_i, high_i, true_v, self.alpha)
for (low_i, high_i, true_v) in zip(self.I_low, self.I_high, true_prev)]
)
def mean_winkler_score(self, true_prev):
return np.mean(self.winkler_scores(true_prev))
class ConfidenceEllipseSimplex(ConfidenceRegionABC):
""" """
Centered log-ratio, from component analysis Instantiates a Confidence Ellipse in the probability simplex.
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
""" """
def __call__(self, X, epsilon=1e-6):
"""
Applies the CLR function to X thus mapping the instances, which are contained in `\\mathcal{R}^{n}` but
actually lie on a `\\mathcal{R}^{n-1}` simplex, onto an unrestricted space in :math:`\\mathcal{R}^{n}`
:param X: np.ndarray of (n_instances, n_dimensions) to be transformed def __init__(self, samples, confidence_level=0.95):
:param epsilon: small float for prevalence smoothing
:return: np.ndarray of (n_instances, n_dimensions), the CLR-transformed points
"""
X = np.asarray(X)
X = qp.error.smooth(X, epsilon)
G = np.exp(np.mean(np.log(X), axis=-1, keepdims=True)) # geometric mean
return np.log(X / G)
def inverse(self, X): assert 0. < confidence_level < 1., f'{confidence_level=} must be in range(0,1)'
"""
Inverse function. However, clr.inverse(clr(X)) does not exactly coincide with X due to smoothing.
:param X: np.ndarray of (n_instances, n_dimensions) to be transformed samples = np.asarray(samples)
:return: np.ndarray of (n_instances, n_dimensions), the CLR-transformed points
self.mean_ = samples.mean(axis=0)
self.cov_ = np.cov(samples, rowvar=False, ddof=1)
try:
self.precision_matrix_ = np.linalg.pinv(self.cov_)
except:
self.precision_matrix_ = None
self.dim = samples.shape[-1]
self.ddof = self.dim - 1
# critical chi-square value
self.confidence_level = confidence_level
self.chi2_critical_ = chi2.ppf(confidence_level, df=self.ddof)
self._samples = samples
self.alpha = 1.-confidence_level
@property
def samples(self):
return self._samples
def point_estimate(self):
""" """
return softmax(X, axis=-1) Returns the point estimate, the center of the ellipse.
:return: np.ndarray of shape (n_classes,)
"""
return self.mean_
def coverage(self, true_value):
"""
Checks whether a value, or a sets of values, are contained in the confidence region. The method computes the
fraction of these that are contained in the region, if more than one value is passed. If only one value is
passed, then it either returns 1.0 or 0.0, for indicating the value is in the region or not, respectively.
:param true_value: a np.ndarray of shape (n_classes,) or shape (n_values, n_classes,)
:return: float in [0,1]
"""
return within_ellipse_prop(true_value, self.mean_, self.precision_matrix_, self.chi2_critical_)
def closest_point_in_region(self, p, tol=1e-6, max_iter=30):
return closest_point_on_ellipsoid(
p,
mean=self.mean_,
cov=self.cov_,
chi2_critical=self.chi2_critical_
)
class ConfidenceEllipseTransformed(ConfidenceRegionABC):
"""
Instantiates a Confidence Ellipse in a transformed space.
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, samples, transformation: CompositionalTransformation, confidence_level=0.95):
samples = np.asarray(samples)
self.transformation = transformation
Z = self.transformation(samples)
# self.mean_ = np.mean(samples, axis=0)
self.mean_ = self.transformation.inverse(np.mean(Z, axis=0))
self.conf_region_z = ConfidenceEllipseSimplex(Z, confidence_level=confidence_level)
self._samples = samples
self.alpha = 1.-confidence_level
@property
def samples(self):
return self._samples
def point_estimate(self):
"""
Returns the point estimate, the center of the ellipse.
:return: np.ndarray of shape (n_classes,)
"""
# The inverse of the CLR does not coincide with the true mean, because the geometric mean
# requires smoothing the prevalence vectors and this affects the softmax (inverse);
# return self.clr.inverse(self.mean_) # <- does not coincide
return self.mean_
def coverage(self, true_value):
"""
Checks whether a value, or a sets of values, are contained in the confidence region. The method computes the
fraction of these that are contained in the region, if more than one value is passed. If only one value is
passed, then it either returns 1.0 or 0.0, for indicating the value is in the region or not, respectively.
:param true_value: a np.ndarray of shape (n_classes,) or shape (n_values, n_classes,)
:return: float in [0,1]
"""
transformed_values = self.transformation(true_value)
return self.conf_region_z.coverage(transformed_values)
def closest_point_in_region(self, p, tol=1e-6, max_iter=30):
p_prime = self.transformation(p)
b_prime = self.conf_region_z.closest_point_in_region(p_prime, tol=tol, max_iter=max_iter)
b = self.transformation.inverse(b_prime)
return b
class ConfidenceEllipseCLR(ConfidenceEllipseTransformed):
"""
Instantiates a Confidence Ellipse in the Centered-Log Ratio (CLR) space.
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, samples, confidence_level=0.95):
super().__init__(samples, CLRtransformation(), confidence_level=confidence_level)
class ConfidenceEllipseILR(ConfidenceEllipseTransformed):
"""
Instantiates a Confidence Ellipse in the Isometric-Log Ratio (CLR) space.
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, samples, confidence_level=0.95):
super().__init__(samples, ILRtransformation(), confidence_level=confidence_level)
class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier): class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
@ -378,7 +585,8 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
n_test_samples=500, n_test_samples=500,
confidence_level=0.95, confidence_level=0.95,
region='intervals', region='intervals',
random_state=None): random_state=None,
verbose=False):
assert isinstance(quantifier, AggregativeQuantifier), \ assert isinstance(quantifier, AggregativeQuantifier), \
f'base quantifier does not seem to be an instance of {AggregativeQuantifier.__name__}' f'base quantifier does not seem to be an instance of {AggregativeQuantifier.__name__}'
@ -395,6 +603,7 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
self.confidence_level = confidence_level self.confidence_level = confidence_level
self.region = region self.region = region
self.random_state = random_state self.random_state = random_state
self.verbose = verbose
def aggregation_fit(self, classif_predictions, labels): def aggregation_fit(self, classif_predictions, labels):
data = LabelledCollection(classif_predictions, labels, classes=self.classes_) data = LabelledCollection(classif_predictions, labels, classes=self.classes_)
@ -420,6 +629,24 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
prev_mean, self.confidence = self.aggregate_conf(classif_predictions) prev_mean, self.confidence = self.aggregate_conf(classif_predictions)
return prev_mean return prev_mean
def aggregate_conf_sequential__(self, classif_predictions: np.ndarray, confidence_level=None):
if confidence_level is None:
confidence_level = self.confidence_level
n_samples = classif_predictions.shape[0]
prevs = []
with qp.util.temp_seed(self.random_state):
for quantifier in self.quantifiers:
for i in tqdm(range(self.n_test_samples), desc='resampling', total=self.n_test_samples, disable=not self.verbose):
sample_i = resample(classif_predictions, n_samples=n_samples)
prev_i = quantifier.aggregate(sample_i)
prevs.append(prev_i)
conf = WithConfidenceABC.construct_region(prevs, confidence_level, method=self.region)
prev_estim = conf.point_estimate()
return prev_estim, conf
def aggregate_conf(self, classif_predictions: np.ndarray, confidence_level=None): def aggregate_conf(self, classif_predictions: np.ndarray, confidence_level=None):
if confidence_level is None: if confidence_level is None:
confidence_level = self.confidence_level confidence_level = self.confidence_level
@ -428,10 +655,15 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
prevs = [] prevs = []
with qp.util.temp_seed(self.random_state): with qp.util.temp_seed(self.random_state):
for quantifier in self.quantifiers: for quantifier in self.quantifiers:
for i in range(self.n_test_samples): results = Parallel(n_jobs=-1)(
sample_i = resample(classif_predictions, n_samples=n_samples) delayed(bootstrap_once)(i, classif_predictions, quantifier, n_samples)
prev_i = quantifier.aggregate(sample_i) for i in range(self.n_test_samples)
prevs.append(prev_i) )
prevs.extend(results)
# for i in tqdm(range(self.n_test_samples), desc='resampling', total=self.n_test_samples, disable=not self.verbose):
# sample_i = resample(classif_predictions, n_samples=n_samples)
# prev_i = quantifier.aggregate(sample_i)
# prevs.append(prev_i)
conf = WithConfidenceABC.construct_region(prevs, confidence_level, method=self.region) conf = WithConfidenceABC.construct_region(prevs, confidence_level, method=self.region)
prev_estim = conf.point_estimate() prev_estim = conf.point_estimate()
@ -456,6 +688,13 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
return self.quantifier._classifier_method() return self.quantifier._classifier_method()
def bootstrap_once(i, classif_predictions, quantifier, n_samples):
idx = np.random.randint(0, len(classif_predictions), n_samples)
sample = classif_predictions[idx]
prev = quantifier.aggregate(sample)
return prev
class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC): class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC):
""" """
`Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ method (by Albert Ziegler and Paweł Czyż), `Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ method (by Albert Ziegler and Paweł Czyż),
@ -603,7 +842,7 @@ class PQ(AggregativeSoftQuantifier, BinaryAggregativeQuantifier):
classifier: BaseEstimator=None, classifier: BaseEstimator=None,
fit_classifier=True, fit_classifier=True,
val_split: int = 5, val_split: int = 5,
n_bins: int = 4, nbins: int = 4,
fixed_bins: bool = False, fixed_bins: bool = False,
num_warmup: int = 500, num_warmup: int = 500,
num_samples: int = 1_000, num_samples: int = 1_000,
@ -621,7 +860,8 @@ class PQ(AggregativeSoftQuantifier, BinaryAggregativeQuantifier):
"Run `$ pip install quapy[bayes]` to install them.") "Run `$ pip install quapy[bayes]` to install them.")
super().__init__(classifier, fit_classifier, val_split) super().__init__(classifier, fit_classifier, val_split)
self.n_bins = n_bins
self.nbins = nbins
self.fixed_bins = fixed_bins self.fixed_bins = fixed_bins
self.num_warmup = num_warmup self.num_warmup = num_warmup
self.num_samples = num_samples self.num_samples = num_samples
@ -636,10 +876,10 @@ class PQ(AggregativeSoftQuantifier, BinaryAggregativeQuantifier):
# Compute bin limits # Compute bin limits
if self.fixed_bins: if self.fixed_bins:
# Uniform bins in [0,1] # Uniform bins in [0,1]
self.bin_limits = np.linspace(0, 1, self.n_bins + 1) self.bin_limits = np.linspace(0, 1, self.nbins + 1)
else: else:
# Quantile bins # Quantile bins
self.bin_limits = np.quantile(y_pred, np.linspace(0, 1, self.n_bins + 1)) self.bin_limits = np.quantile(y_pred, np.linspace(0, 1, self.nbins + 1))
# Assign each prediction to a bin # Assign each prediction to a bin
bin_indices = np.digitize(y_pred, self.bin_limits[1:-1], right=True) bin_indices = np.digitize(y_pred, self.bin_limits[1:-1], right=True)
@ -649,14 +889,14 @@ class PQ(AggregativeSoftQuantifier, BinaryAggregativeQuantifier):
neg_mask = ~pos_mask neg_mask = ~pos_mask
# Count positives and negatives per bin # Count positives and negatives per bin
self.pos_hist = np.bincount(bin_indices[pos_mask], minlength=self.n_bins) self.pos_hist = np.bincount(bin_indices[pos_mask], minlength=self.nbins)
self.neg_hist = np.bincount(bin_indices[neg_mask], minlength=self.n_bins) self.neg_hist = np.bincount(bin_indices[neg_mask], minlength=self.nbins)
def aggregate(self, classif_predictions): def aggregate(self, classif_predictions):
Px_test = classif_predictions[:, self.pos_label] Px_test = classif_predictions[:, self.pos_label]
test_hist, _ = np.histogram(Px_test, bins=self.bin_limits) test_hist, _ = np.histogram(Px_test, bins=self.bin_limits)
prevs = _bayesian.pq_stan( prevs = _bayesian.pq_stan(
self.stan_code, self.n_bins, self.pos_hist, self.neg_hist, test_hist, self.stan_code, self.nbins, self.pos_hist, self.neg_hist, test_hist,
self.num_samples, self.num_warmup, self.stan_seed self.num_samples, self.num_warmup, self.stan_seed
).flatten() ).flatten()
self.prev_distribution = np.vstack([1-prevs, prevs]).T self.prev_distribution = np.vstack([1-prevs, prevs]).T

View File

@ -410,7 +410,7 @@ def group_params(param_grid: dict):
""" """
classifier_params, quantifier_params = {}, {} classifier_params, quantifier_params = {}, {}
for key, values in param_grid.items(): for key, values in param_grid.items():
if key.startswith('classifier__') or key == 'val_split': if 'classifier__' in key or key == 'val_split':
classifier_params[key] = values classifier_params[key] = values
else: else:
quantifier_params[key] = values quantifier_params[key] = values