trying emcee, with no success so far...

This commit is contained in:
Alejandro Moreo Fernandez 2025-12-08 12:14:43 +01:00
parent b2750ab6ea
commit 7342e57cda
6 changed files with 103 additions and 21 deletions

View File

@@ -1,4 +1,6 @@
- Add other methods that natively provide uncertainty quantification?
- Explore the neighbourhood in the CLR space rather than in the simplex!
- CI with Bonferroni
- MPIW (Mean Prediction Interval Width): the average of the interval amplitudes (without aggregating coverage in any way); see the sketch after this list
- Implement the Interval Score or Winkler Score
- Add (w - hat_w)**2 / N measure
- analyze across shift
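A minimal sketch of how MPIW could be computed, assuming the intervals are given as arrays of per-class lower and upper bounds (the helper name mpiw is hypothetical):

import numpy as np

def mpiw(I_low, I_high):
    # MPIW: mean amplitude of the per-class intervals, ignoring coverage
    return np.mean(np.asarray(I_high) - np.asarray(I_low))

# e.g., intervals (0.2, 0.5) and (0.4, 0.7) have MPIW = 0.3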

View File

@@ -1,11 +1,16 @@
from sklearn.base import BaseEstimator
import numpy as np
import quapy.util
from quapy.method._kdey import KDEBase
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.functional import CLRtransformation, ILRtransformation
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
import quapy.functional as F
import emcee
class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
@@ -72,7 +77,8 @@ class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
return self
def aggregate(self, classif_predictions):
self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
# self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
self.prevalence_samples = self._bayesian_emcee(classif_predictions)
return self.prevalence_samples.mean(axis=0)
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
@@ -178,6 +184,32 @@ class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
samples = np.asarray(samples[self.num_warmup:])
return samples
def _bayesian_emcee(self, X_probs):
    # dimension of the unconstrained CLR space; CLR preserves the number of classes
    ndim = X_probs.shape[1]
    nwalkers = 32
    f = CLRtransformation()
    def log_likelihood(unconstrained, test_densities, epsilon=1e-10):
        # map the unconstrained point back onto the simplex
        prev = f.inverse(unconstrained)
        # mixture likelihood of each test point: sum_i prev_i * p_i(x)
        test_likelihoods = prev @ test_densities
        test_loglikelihood = np.log(test_likelihoods + epsilon)
        return np.sum(test_loglikelihood)
    kdes = self.mix_densities
    # class-conditional densities of the test points, shape (n_classes, n_instances)
    test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])
    # p0 = np.random.normal(nwalkers, ndim)
    # initialize the walkers at uniformly sampled prevalence vectors, mapped to CLR space
    p0 = F.uniform_prevalence_sampling(ndim, nwalkers)
    p0 = f(p0)
    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_likelihood, args=[test_densities])
    # burn-in, then reset the chain and draw the production samples
    state = sampler.run_mcmc(p0, self.num_warmup, skip_initial_state_check=True)
    sampler.reset()
    sampler.run_mcmc(state, self.num_samples, skip_initial_state_check=True)
    samples = sampler.get_chain(flat=True)
    # map the CLR samples back onto the simplex
    return f.inverse(samples)
def in_simplex(x):
return np.all(x >= 0) and np.isclose(x.sum(), 1)
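For reference, the burn-in/reset/production pattern used above can be exercised in isolation on a toy target; this is a minimal sketch, with a Gaussian log-density standing in for the KDE mixture likelihood:

import numpy as np
import emcee

def log_prob(theta):
    # toy target: standard Gaussian in ndim dimensions
    return -0.5 * np.sum(theta ** 2)

ndim, nwalkers = 3, 32
p0 = np.random.normal(size=(nwalkers, ndim))
sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob)
state = sampler.run_mcmc(p0, 100)   # burn-in
sampler.reset()                     # discard the burn-in samples
sampler.run_mcmc(state, 100)        # production run
samples = sampler.get_chain(flat=True)  # shape (nwalkers*100, ndim)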

View File

@@ -7,6 +7,7 @@ import pandas as pd
from glob import glob
from pathlib import Path
import quapy as qp
from quapy.method.confidence import ConfidenceIntervals
from quapy.method.confidence import ConfidenceEllipseSimplex, ConfidenceEllipseCLR, ConfidenceEllipseILR
pd.set_option('display.max_columns', None)
@@ -17,13 +18,17 @@ pd.set_option("display.precision", 4)
pd.set_option("display.float_format", "{:.4f}".format)
def compute_coverage_amplitude(region_constructor):
def compute_coverage_amplitude(region_constructor, **kwargs):
all_samples = results['samples']
all_true_prevs = results['true-prevs']
def process_one(samples, true_prevs):
ellipse = region_constructor(samples)
return ellipse.coverage(true_prevs), ellipse.montecarlo_proportion()
region = region_constructor(samples, **kwargs)
if isinstance(region, ConfidenceIntervals):
winkler = region.mean_winkler_score(true_prevs)
else:
winkler = None
return region.coverage(true_prevs), region.montecarlo_proportion(), winkler
out = Parallel(n_jobs=3)(
delayed(process_one)(samples, true_prevs)
@@ -35,8 +40,8 @@ def compute_coverage_amplitude(region_constructor):
)
# unzip results
coverage, amplitude = zip(*out)
return list(coverage), list(amplitude)
coverage, amplitude, winkler = zip(*out)
return list(coverage), list(amplitude), list(winkler)
def update_pickle(report, pickle_path, updated_dict:dict):
@@ -45,18 +50,20 @@ def update_pickle(report, pickle_path, updated_dict:dict):
pickle.dump(report, open(pickle_path, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
def update_pickle_with_region(report, file, conf_name, conf_region_class):
def update_pickle_with_region(report, file, conf_name, conf_region_class, **kwargs):
if f'coverage-{conf_name}' not in report:
cov, amp = compute_coverage_amplitude(conf_region_class)
covs, amps, winkler = compute_coverage_amplitude(conf_region_class, **kwargs)
update_fields = {
f'coverage-{conf_name}': cov,
f'amplitude-{conf_name}': amp,
f'coverage-{conf_name}': covs,
f'amplitude-{conf_name}': amps,
f'winkler-{conf_name}': winkler
}
update_pickle(report, file, update_fields)
for setup in ['binary', 'multiclass']:
for setup in ['multiclass', 'binary']:
path = f'./results/{setup}/*.pkl'
table = defaultdict(list)
for file in tqdm(glob(path), desc='processing results'):
@@ -68,13 +75,18 @@ for setup in ['binary', 'multiclass']:
table['method'].extend([method.replace('Bayesian','Ba').replace('Bootstrap', 'Bo')] * n_samples)
table['dataset'].extend([dataset] * n_samples)
table['ae'].extend(results['ae'])
table['c-CI'].extend(results['coverage'])
table['a-CI'].extend(results['amplitude'])
# table['c-CI'].extend(results['coverage'])
# table['a-CI'].extend(results['amplitude'])
update_pickle_with_region(report, file, conf_name='CI', conf_region_class=ConfidenceIntervals, bonferroni_correction=True)
update_pickle_with_region(report, file, conf_name='CE', conf_region_class=ConfidenceEllipseSimplex)
update_pickle_with_region(report, file, conf_name='CLR', conf_region_class=ConfidenceEllipseCLR)
update_pickle_with_region(report, file, conf_name='ILR', conf_region_class=ConfidenceEllipseILR)
table['c-CI'].extend(report['coverage-CI'])
table['a-CI'].extend(report['amplitude-CI'])
table['w-CI'].extend(report['winkler-CI'])
table['c-CE'].extend(report['coverage-CE'])
table['a-CE'].extend(report['amplitude-CE'])

View File

@@ -47,8 +47,10 @@ def methods():
# yield 'BootstrapKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: AggregativeBootstrap(KDEyML(LR(), **hyper), n_test_samples=1000, random_state=0, verbose=True),
# yield 'BayesianKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, **hyper),
for T in [10, 20, 50, 100., 500]:
yield f'BaKDE-CLR-T{T}', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', explore='ilr', mcmc_seed=0, temperature=T, num_warmup=3000, num_samples=1000, step_size=.1, **hyper),
# for T in [10, 20, 50, 100., 500]:
# yield f'BaKDE-CLR-T{T}', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', explore='ilr', mcmc_seed=0, temperature=T, num_warmup=3000, num_samples=1000, step_size=.1, **hyper),
yield 'BaKDE-emcee', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, num_warmup=100, num_samples=100, step_size=.1, **hyper),
@@ -65,7 +67,9 @@ if __name__ == '__main__':
}
setup = multiclass
data_name = 'isolet'
# data_name = 'isolet'
# data_name = 'cmc'
data_name = 'abalone'
qp.environ['SAMPLE_SIZE'] = setup['sample_size']
print(f'dataset={data_name}')

View File

@@ -697,7 +697,7 @@ class CLRtransformation(CompositionalTransformation):
:param Z: np.ndarray of (n_instances, n_dimensions) to be transformed
:return: np.ndarray of (n_instances, n_dimensions), the points mapped back onto the simplex (the inverse of CLR is the softmax)
"""
return softmax(Z, axis=-1)
return scipy.special.softmax(Z, axis=-1)
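As a sanity check of this pair of transformations: the forward CLR is log(x) minus the mean of log(x), and the softmax undoes it. A minimal sketch, assuming a strictly positive composition that sums to 1:

import numpy as np
import scipy.special

x = np.asarray([0.2, 0.3, 0.5])
clr = np.log(x) - np.log(x).mean()          # forward CLR
back = scipy.special.softmax(clr, axis=-1)  # inverse CLR
assert np.allclose(back, x)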
class ILRtransformation(CompositionalTransformation):

View File

@@ -345,6 +345,11 @@ class ConfidenceIntervals(ConfidenceRegionABC):
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
:param bonferroni_correction: bool (default False); if True, a Bonferroni correction
    is applied to the significance level (`alpha`) before computing the confidence intervals.
    The correction replaces `alpha` with `alpha/n_classes`. When `n_classes=2` the correction
    is not applied, since there is effectively a single test (the prevalence of the second
    class is fully determined by that of the first); this no longer holds for `n_classes>2`.
"""
def __init__(self, samples, confidence_level=0.95, bonferroni_correction=False):
assert 0 < confidence_level < 1, f'{confidence_level=} must be in range(0,1)'
@@ -355,11 +360,13 @@ class ConfidenceIntervals(ConfidenceRegionABC):
alpha = 1-confidence_level
if bonferroni_correction:
n_classes = samples.shape[-1]
alpha = alpha/n_classes
if n_classes>2:
alpha = alpha/n_classes
low_perc = (alpha/2.)*100
high_perc = (1-alpha/2.)*100
self.I_low, self.I_high = np.percentile(samples, q=[low_perc, high_perc], axis=0)
self._samples = samples
self.alpha = alpha
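As a quick numeric check of the corrected percentiles, assuming confidence_level=0.95 and n_classes=4:

alpha = (1 - 0.95) / 4                                  # Bonferroni-corrected alpha = 0.0125
low_perc, high_perc = (alpha/2)*100, (1-alpha/2)*100    # 0.625 and 99.375 percentiles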
@property
def samples(self):
@@ -391,6 +398,31 @@ class ConfidenceIntervals(ConfidenceRegionABC):
def __repr__(self):
return '['+', '.join(f'({low:.4f}, {high:.4f})' for (low,high) in zip(self.I_low, self.I_high))+']'
@property
def n_dim(self):
return len(self.I_low)
def winkler_scores(self, true_prev):
    true_prev = np.asarray(true_prev)
    assert true_prev.ndim == 1, 'unexpected dimensionality for true_prev'
    assert len(true_prev) == self.n_dim, \
        f'unexpected number of dimensions; found {len(true_prev)}, expected {self.n_dim}'
    def winkler_score(low, high, true_val, alpha):
        # interval amplitude plus a penalty, scaled by 2/alpha, per unit of violation
        # whenever the true value falls outside the interval (standard Winkler score)
        amp = high - low
        scale_cost = 2. / alpha
        cost = max(0, low - true_val) + max(0, true_val - high)
        return amp + scale_cost * cost
    return np.asarray(
        [winkler_score(low_i, high_i, true_v, self.alpha)
         for (low_i, high_i, true_v) in zip(self.I_low, self.I_high, true_prev)]
    )
def mean_winkler_score(self, true_prev):
return np.mean(self.winkler_scores(true_prev))
class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
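A usage sketch of the new scoring path, built on the ConfidenceIntervals class shown above; the sample values are made up, and the exact semantics of coverage (indicator vs proportion) follow the library:

import numpy as np
from quapy.method.confidence import ConfidenceIntervals

samples = np.random.dirichlet([1, 1, 1], size=1000)   # bootstrap-like prevalence samples
region = ConfidenceIntervals(samples, confidence_level=0.95, bonferroni_correction=True)
true_prev = np.asarray([0.2, 0.3, 0.5])
print(region.coverage(true_prev))            # whether/how much true_prev falls inside the intervals
print(region.mean_winkler_score(true_prev))  # average Winkler score across classes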