Compare commits


25 Commits

Author SHA1 Message Date
Alejandro Moreo Fernandez 23608f2038 adding exploration in CLR 2025-12-04 10:24:02 +01:00
Alejandro Moreo Fernandez 881e1033f1 lauching experiments 2025-11-26 15:19:08 +01:00
Alejandro Moreo Fernandez a3f0008a2a Merge branch 'devel' into kdeplus 2025-11-18 10:13:56 +01:00
Alejandro Moreo Fernandez 6db659e3c4 unifying n_bins in PQ and DMy 2025-11-18 10:13:34 +01:00
Alejandro Moreo Fernandez 277a2e617f added aggregative bootstrap 2025-11-18 10:12:41 +01:00
Alejandro Moreo Fernandez be465712cd Merge branch 'devel' into kdeplus 2025-11-17 17:55:15 +01:00
Alejandro Moreo Fernandez db49cd31be Merge branch 'devel' of github.com:HLT-ISTI/QuaPy into devel 2025-11-17 17:54:09 +01:00
Alejandro Moreo Fernandez 9da4fd57db added property samples to confidence regions 2025-11-17 17:53:56 +01:00
Alejandro Moreo Fernandez fdc0560ccc no optim classifier 2025-11-17 17:47:14 +01:00
Alejandro Moreo Fernandez fd62e73d2d adding bootstrap variants 2025-11-17 12:37:12 +01:00
Alejandro Moreo Fernandez 12b431ef4b scripting experiments binary and multiclass 2025-11-17 12:22:40 +01:00
Alejandro Moreo Fernandez 2f83a520c7 drafting experiments 2025-11-16 12:42:26 +01:00
Alejandro Moreo Fernandez 4255098ba7 creating class BayesianKDEy 2025-11-15 21:47:30 +01:00
Alejandro Moreo Fernandez 8a8988d2de Merge branch 'kdeplus' of gitea-s2i2s.isti.cnr.it:moreo/QuaPy into kdeplus 2025-11-15 19:01:42 +01:00
Alejandro Moreo Fernandez f324ca049e Merge branch 'devel' into kdeplus 2025-11-15 18:57:56 +01:00
Alejandro Moreo Fernandez 5f6a151263 Merge branch 'pglez82-precisequant' into devel 2025-11-15 18:55:38 +01:00
Alejandro Moreo Fernandez 047cb9e533 merged 2025-11-15 18:54:04 +01:00
Alejandro Moreo Fernandez 6388d9b549 merged 2025-11-15 18:03:06 +01:00
Alejandro Moreo Fernandez d9cf6cc11d index in labelled collection from versions restored 2025-11-15 17:56:37 +01:00
pglez82 46e7246f3a adding stan file to setup 2025-11-15 17:04:55 +01:00
pglez82 c6492a0f20 fixing import 2025-11-15 17:04:44 +01:00
pglez82 e4c07e1835 changing the way the file is loaded 2025-11-15 16:51:48 +01:00
Alejandro Moreo Fernandez c2044cb200 Merge branch 'precisequant' of github.com:pglez82/QuaPy into pglez82-precisequant 2025-11-15 15:03:20 +01:00
pglez82 3268e9fada PQ (precise quantifier) 2025-11-14 18:35:40 +01:00
Alejandro Moreo Fernandez a868d2d561 import fix 2025-11-14 16:10:17 +01:00
17 changed files with 941 additions and 164 deletions

BayesianKDEy/TODO.txt Normal file
View File

@@ -0,0 +1,3 @@
- Add other methods that natively provide uncertainty quantification?
- Explore neighbourhood in the CLR space instead of in the simplex! (sketched just below)
-
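The CLR exploration noted above is what the `explore_CLR` flag of the new BayesianKDEy class implements: random-walk proposals are drawn after a centered log-ratio map rather than directly on the simplex. A minimal sketch of that map, assuming only numpy (`clr`/`clr_inverse` are illustrative helpers, not the `CLRtransformation` API):

import numpy as np

def clr(p, eps=1e-12):
    # centered log-ratio: maps the open simplex onto the hyperplane sum(z) = 0
    logp = np.log(p + eps)
    return logp - logp.mean()

def clr_inverse(z):
    # softmax inverts clr: clr_inverse(clr(p)) == p for any p in the open simplex
    e = np.exp(z - z.max())
    return e / e.sum()

p = np.array([0.2, 0.3, 0.5])
neighbour = clr_inverse(clr(p) + np.random.default_rng(0).normal(scale=0.1, size=3))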

View File

@@ -0,0 +1,167 @@
from sklearn.base import BaseEstimator
import numpy as np
from quapy.method._kdey import KDEBase
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC, CLRtransformation
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
import quapy.functional as F
class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
"""
Bayesian version of KDEy, which returns not only a point estimate of the class prevalence values, but
a set of samples from the posterior distribution over prevalence vectors, obtained via
random-walk Metropolis-Hastings.
:param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
the one indicated in `qp.environ['DEFAULT_CLS']`
:param fit_classifier: bool, whether to fit the classifier (default True) or assume it is already fit
:param val_split: specifies the data used for generating classifier predictions. This specification
can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
be extracted from the training set; or as an integer (default 5), indicating that the predictions
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a tuple `(X,y)` defining the specific set of data to use for validation. Set to
None when the method does not require any validation data, to avoid wasting part of the
training data.
:param kernel: the kernel used for kernel density estimation (default 'gaussian')
:param bandwidth: the bandwidth of the kernel (default 0.1)
:param num_warmup: number of warmup iterations for the MCMC sampler (default 500)
:param num_samples: number of samples to draw from the posterior (default 1000)
:param mcmc_seed: random seed for the MCMC sampler (default 0)
:param confidence_level: float in [0,1] to construct a confidence region around the point estimate (default 0.95)
:param region: string, set to `intervals` for constructing confidence intervals (default), or to
`ellipse` for constructing an ellipse in the probability simplex, or to `ellipse-clr` for
constructing an ellipse in the Centered-Log Ratio (CLR) unconstrained space.
:param explore_CLR: bool, whether the random-walk proposals are drawn in the CLR space (True) or
directly on the probability simplex (False, default)
:param step_size: float, initial scale of the random-walk proposal noise, adapted during warmup (default 0.05)
:param verbose: bool, whether to display a progress bar
"""
def __init__(self,
classifier: BaseEstimator=None,
fit_classifier=True,
val_split: int = 5,
kernel='gaussian',
bandwidth=0.1,
num_warmup: int = 500,
num_samples: int = 1_000,
mcmc_seed: int = 0,
confidence_level: float = 0.95,
region: str = 'intervals',
explore_CLR=False,
step_size=0.05,
verbose: bool = False):
if num_warmup <= 0:
raise ValueError(f'parameter {num_warmup=} must be a positive integer')
if num_samples <= 0:
raise ValueError(f'parameter {num_samples=} must be a positive integer')
super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel)
self.kernel = self._check_kernel(kernel)
self.num_warmup = num_warmup
self.num_samples = num_samples
self.mcmc_seed = mcmc_seed
self.confidence_level = confidence_level
self.region = region
self.explore_CLR = explore_CLR
self.step_size = step_size
self.verbose = verbose
def aggregation_fit(self, classif_predictions, labels):
self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
return self
def aggregate(self, classif_predictions):
self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
return self.prevalence_samples.mean(axis=0)
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
if confidence_level is None:
confidence_level = self.confidence_level
classif_predictions = self.classify(instances)
point_estimate = self.aggregate(classif_predictions)
samples = self.prevalence_samples # available after calling "aggregate" function
region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
return point_estimate, region
def _bayesian_kde(self, X_probs, init=None, verbose=False):
"""
Bayes' rule:
    P(prev|data) = P(data|prev) * P(prev) / P(data)
i.e.,
    posterior = likelihood * prior / evidence
We take the likelihood to be prev @ [kde_i.pdf(data) for i in 1..n],
and the prior to be uniform over the simplex.
"""
rng = np.random.default_rng(self.mcmc_seed)
kdes = self.mix_densities
test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])
def log_likelihood(prev, epsilon=1e-10):
test_likelihoods = prev @ test_densities
test_loglikelihood = np.log(test_likelihoods + epsilon)
return np.sum(test_loglikelihood)
# def log_prior(prev):
# todo: adapt to arbitrary prior knowledge (e.g., something around training prevalence)
# return 1/np.sum((prev-init)**2) # it is not 1, but we assume a uniform prior, so this is just an irrelevant constant anyway
# def log_prior(prev, alpha_scale=1000):
# alpha = np.array(init) * alpha_scale
# return dirichlet.logpdf(prev, alpha)
def log_prior(prev):
return 0
def sample_neighbour(prev, step_size):
# random-walk Metropolis-Hastings
d = len(prev)
if not self.explore_CLR:
dir_noise = rng.normal(scale=step_size/np.sqrt(d), size=d)
neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
else:
clr = CLRtransformation()
clr_point = clr(prev)
dir_noise = rng.normal(scale=step_size, size=d)
clr_neighbour = clr_point+dir_noise
neighbour = clr.inverse(clr_neighbour)
assert in_simplex(neighbour), 'wrong CLR transformation'
return neighbour
n_classes = X_probs.shape[1]
current_prev = F.uniform_prevalence(n_classes) if init is None else init
current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)
# Metropolis-Hastings with adaptive rate
step_size = self.step_size
target_acceptance = 0.3
adapt_rate = 0.05
acceptance_history = []
samples = []
total_steps = self.num_samples + self.num_warmup
for i in tqdm(range(total_steps), total=total_steps, disable=not verbose):
proposed_prev = sample_neighbour(current_prev, step_size)
# log of the acceptance ratio
proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
acceptance = proposed_likelihood - current_likelihood
# decide acceptance
accepted = np.log(rng.random()) < acceptance
if accepted:
current_prev = proposed_prev
current_likelihood = proposed_likelihood
samples.append(current_prev)
acceptance_history.append(1. if accepted else 0.)
if i < self.num_warmup and i % 10 == 0 and len(acceptance_history) >= 100:
    recent_accept_rate = np.mean(acceptance_history[-100:])
    step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))
    # step_size = float(np.clip(step_size, min_step, max_step))
    if verbose:
        print(f'acceptance-rate={recent_accept_rate*100:.3f}%, step-size={step_size:.5f}')
# remove "warmup" initial iterations
samples = np.asarray(samples[self.num_warmup:])
return samples
def in_simplex(x):
return np.all(x >= 0) and np.isclose(x.sum(), 1)
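The sampler above is plain random-walk Metropolis-Hastings with step-size adaptation during warmup. As a self-contained illustration of the accept/reject core (a toy 1-D standard-normal target; not QuaPy code):

import numpy as np

rng = np.random.default_rng(0)
log_target = lambda x: -0.5 * x ** 2                # unnormalized log-density of N(0,1)
x, samples = 0.0, []
for _ in range(5_000):
    proposal = x + rng.normal(scale=0.5)            # symmetric random-walk proposal
    if np.log(rng.random()) < log_target(proposal) - log_target(x):
        x = proposal                                # accept; otherwise keep the current state
    samples.append(x)
# after discarding a warmup prefix, np.mean(samples) ~ 0 and np.std(samples) ~ 1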

View File

@@ -1,127 +0,0 @@
from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from method.confidence import ConfidenceIntervals
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
def bayesian(kdey, data, probabilistic_classifier, init=None, MAX_ITER=100_000, warmup=3_000):
"""
Bayes:
P(prev|data) = P(data|prev) P(prev) / P(data)
i.e.,
posterior = likelihood * prior / evidence
we assume the likelihood be:
prev @ [kde_i_likelihood(data) 1..i..n]
prior be uniform in simplex
"""
def pdf(kde, X):
# todo: remove exp, since we are then doing the log every time? (not sure)
return np.exp(kde.score_samples(X))
X = probabilistic_classifier.predict_proba(data)
kdes = kdey.mix_densities
test_densities = np.asarray([pdf(kde_i, X) for kde_i in kdes])
def log_likelihood(prev, epsilon=1e-10):
test_likelihoods = prev @ test_densities
test_loglikelihood = np.log(test_likelihoods + epsilon)
return np.sum(test_loglikelihood)
# def log_prior(prev):
# todo: adapt to arbitrary prior knowledge (e.g., something around training prevalence)
# return 1/np.sum((prev-init)**2) # it is not 1 but we assume uniform, son anyway is an useless constant
# def log_prior(prev, alpha_scale=1000):
# alpha = np.array(init) * alpha_scale
# return dirichlet.logpdf(prev, alpha)
def log_prior(prev):
return 0
def sample_neighbour(prev, step_size=0.05):
# random-walk Metropolis-Hastings
dir_noise = np.random.normal(scale=step_size, size=len(prev))
neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
return neighbour
n_classes = len(probabilistic_classifier.classes_)
current_prev = F.uniform_prevalence(n_classes) if init is None else init
current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)
# Metropolis-Hastings with adaptive rate
step_size = 0.05
target_acceptance = 0.3
adapt_rate = 0.01
acceptance_history = []
samples = []
for i in tqdm(range(MAX_ITER), total=MAX_ITER):
proposed_prev = sample_neighbour(current_prev, step_size)
# probability of acceptance
proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
acceptance = proposed_likelihood - current_likelihood
# decide acceptance
accepted = np.log(np.random.rand()) < acceptance
if accepted:
current_prev = proposed_prev
current_likelihood = proposed_likelihood
samples.append(current_prev)
acceptance_history.append(1. if accepted else 0.)
if i < warmup and i%10==0 and len(acceptance_history)>=100:
recent_accept_rate = np.mean(acceptance_history[-100:])
# print(f'{i=} recent_accept_rate={recent_accept_rate:.4f} (current step_size={step_size:.4f})')
step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))
# remove "warmup" initial iterations
samples = np.asarray(samples[warmup:])
return samples
if __name__ == '__main__':
qp.environ["SAMPLE_SIZE"] = 500
cls = LogisticRegression()
kdey = KDEyML(cls)
train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test
with qp.util.temp_seed(2):
print('fitting KDEy')
kdey.fit(*train.Xy)
# shifted = test.sampling(500, *[0.7, 0.1, 0.2])
# shifted = test.sampling(500, *test.prevalence()[::-1])
shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes))
prev_hat = kdey.predict(shifted.X)
mae = qp.error.mae(shifted.prevalence(), prev_hat)
print(f'true_prev={strprev(shifted.prevalence())}')
print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}')
h = kdey.classifier
samples = bayesian(kdey, shifted.X, h, init=None, MAX_ITER=5_000, warmup=3_000)
conf_interval = ConfidenceIntervals(samples, confidence_level=0.95)
mae = qp.error.mae(shifted.prevalence(), conf_interval.point_estimate())
print(f'mean posterior {strprev(samples.mean(axis=0))}, {mae=:.4f}')
print(f'CI={conf_interval}')
print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}')
print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.20f}%')
if train.n_classes == 3:
plot_prev_points(samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence())
# plot_prev_points_matplot(samples)
# report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
# print(report.mean(numeric_only=True))

View File

@@ -0,0 +1,186 @@
import os
import warnings
from os.path import join
from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from quapy.data import LabelledCollection
from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier
from quapy.method.base import BinaryQuantifier, BaseQuantifier
from quapy.model_selection import GridSearchQ
from quapy.data import Dataset
# from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC, AggregativeBootstrap
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML, ACC
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
from collections import defaultdict
from time import time
from sklearn.base import clone, BaseEstimator
class KDEyCLR(KDEyML):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
super().__init__(
classifier=classifier, fit_classifier=fit_classifier, val_split=val_split, bandwidth=bandwidth,
random_state=random_state, kernel='aitchison'
)
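KDEyCLR is thus a thin convenience alias; assuming KDEyML accepts kernel='aitchison' as the constructor above implies, these two instantiations are equivalent:

q1 = KDEyCLR(LR(), bandwidth=1.)
q2 = KDEyML(LR(), bandwidth=1., kernel='aitchison')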
def methods__():
acc_hyper = {}
hdy_hyper = {'nbins': [3,4,5,8,16,32]}
kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2], 'classifier__C':[1]}
wrap_hyper = lambda dic: {f'quantifier__{k}':v for k,v in dic.items()}
# yield 'BootstrapACC', AggregativeBootstrap(ACC(LR()), n_test_samples=1000, random_state=0), wrap_hyper(acc_hyper)
# yield 'BootstrapHDy', AggregativeBootstrap(DMy(LR(), divergence='HD'), n_test_samples=1000, random_state=0), wrap_hyper(hdy_hyper)
yield 'BootstrapKDEy', AggregativeBootstrap(KDEyML(LR()), n_test_samples=1000, random_state=0), wrap_hyper(kdey_hyper)
# yield 'BayesianACC', BayesianCC(LR(), mcmc_seed=0), acc_hyper
# yield 'BayesianHDy', PQ(LR(), stan_seed=0), hdy_hyper
# yield 'BayesianKDEy', BayesianKDEy(LR(), mcmc_seed=0), kdey_hyper
def methods():
"""
Yields tuples (name, quantifier, hyperparams, bayesian/bootstrap_constructor), where:
- name: is a str representing the name of the method (e.g., 'BayesianKDEy')
- quantifier: is the base model (e.g., KDEyML())
- hyperparams: is a dictionary of hyperparameter grids for the quantifier (e.g., {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]})
- bayesian/bootstrap_constructor: is a function that instantiates the bayesian or bootstrap method
from the quantifier's optimized hyperparameters
"""
acc_hyper = {}
hdy_hyper = {'nbins': [3,4,5,8,16,32]}
kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
kdey_hyper_clr = {'bandwidth': [0.05, 0.1, 0.5, 1., 2., 5.]}
yield 'BootstrapACC', ACC(LR()), acc_hyper, lambda hyper: AggregativeBootstrap(ACC(LR()), n_test_samples=1000, random_state=0),
yield 'BayesianACC', ACC(LR()), acc_hyper, lambda hyper: BayesianCC(LR(), mcmc_seed=0)
yield 'BootstrapHDy', DMy(LR()), hdy_hyper, lambda hyper: AggregativeBootstrap(DMy(LR(), **hyper), n_test_samples=1000, random_state=0),
yield 'BootstrapKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: AggregativeBootstrap(KDEyML(LR(), **hyper), n_test_samples=1000, random_state=0, verbose=True),
yield 'BayesianKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, **hyper),
yield 'BayesianKDEy*', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0, **hyper),
def model_selection(train: LabelledCollection, point_quantifier: AggregativeQuantifier, grid: dict):
with qp.util.temp_seed(0):
print(f'performing model selection for {point_quantifier.__class__.__name__} with grid {grid}')
# model selection
if len(grid)>0:
train, val = train.split_stratified(train_prop=0.6, random_state=0)
mod_sel = GridSearchQ(
model=point_quantifier,
param_grid=grid,
protocol=qp.protocol.UPP(val, repeats=250, random_state=0),
refit=False,
n_jobs=-1,
verbose=True
).fit(*train.Xy)
best_params = mod_sel.best_params_
else:
best_params = {}
return best_params
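An illustrative wiring of methods() and model_selection() (the dataset choice is hypothetical; experiment() below automates the full loop):

train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success').train_test
for name, base_quantifier, grid, build_withconf in methods():
    best_hyper = model_selection(train, base_quantifier, grid)
    withconf = build_withconf(best_hyper).fit(*train.Xy)
    point_estimate, region = withconf.predict_conf(test.X)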
def experiment(dataset: Dataset, point_quantifier: AggregativeQuantifier, method_name:str, grid: dict, withconf_constructor, hyper_choice_path: Path):
with qp.util.temp_seed(0):
training, test = dataset.train_test
# model selection
best_hyperparams = qp.util.pickled_resource(
hyper_choice_path, model_selection, training, cp(point_quantifier), grid
)
t_init = time()
withconf_quantifier = withconf_constructor(best_hyperparams).fit(*training.Xy)
tr_time = time() - t_init
# test
train_prevalence = training.prevalence()
results = defaultdict(list)
test_generator = UPP(test, repeats=100, random_state=0)
for i, (sample_X, true_prevalence) in tqdm(enumerate(test_generator()), total=test_generator.total(), desc=f'{method_name} predictions'):
t_init = time()
point_estimate, region = withconf_quantifier.predict_conf(sample_X)
ttime = time()-t_init
results['true-prevs'].append(true_prevalence)
results['point-estim'].append(point_estimate)
results['shift'].append(qp.error.ae(true_prevalence, train_prevalence))
results['ae'].append(qp.error.ae(prevs_true=true_prevalence, prevs_hat=point_estimate))
results['rae'].append(qp.error.rae(prevs_true=true_prevalence, prevs_hat=point_estimate))
results['coverage'].append(region.coverage(true_prevalence))
results['amplitude'].append(region.montecarlo_proportion(n_trials=50_000))
results['test-time'].append(ttime)
results['samples'].append(region.samples)
report = {
'optim_hyper': best_hyperparams,
'train_time': tr_time,
'train-prev': train_prevalence,
'results': {k:np.asarray(v) for k,v in results.items()}
}
return report
def experiment_path(dir:Path, dataset_name:str, method_name:str):
os.makedirs(dir, exist_ok=True)
return dir/f'{dataset_name}__{method_name}.pkl'
if __name__ == '__main__':
binary = {
'datasets': qp.datasets.UCI_BINARY_DATASETS,
'fetch_fn': qp.datasets.fetch_UCIBinaryDataset,
'sample_size': 500
}
multiclass = {
'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
'fetch_fn': qp.datasets.fetch_UCIMulticlassDataset,
'sample_size': 1000
}
result_dir = Path('./results')
for setup in [binary, multiclass]:
qp.environ['SAMPLE_SIZE'] = setup['sample_size']
for data_name in setup['datasets']:
print(f'dataset={data_name}')
# if data_name=='breast-cancer' or data_name.startswith("cmc") or data_name.startswith("ctg"):
# print(f'skipping dataset: {data_name}')
# continue
data = setup['fetch_fn'](data_name)
is_binary = data.n_classes==2
result_subdir = result_dir / ('binary' if is_binary else 'multiclass')
hyper_subdir = result_dir / 'hyperparams' / ('binary' if is_binary else 'multiclass')
for method_name, method, hyper_params, withconf_constructor in methods():
if isinstance(method, BinaryQuantifier) and not is_binary:
continue
result_path = experiment_path(result_subdir, data_name, method_name)
hyper_path = experiment_path(hyper_subdir, data_name, method.__class__.__name__)
report = qp.util.pickled_resource(
result_path, experiment, data, method, method_name, hyper_params, withconf_constructor, hyper_path
)
print(f'dataset={data_name}, '
f'method={method_name}: '
f'mae={report["results"]["ae"].mean():.3f}, '
f'coverage={report["results"]["coverage"].mean():.5f}, '
f'amplitude={report["results"]["amplitude"].mean():.5f}, ')

View File

@@ -0,0 +1,112 @@
import pickle
from collections import defaultdict
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd
from glob import glob
from pathlib import Path
import quapy as qp
from quapy.method.confidence import ConfidenceEllipseSimplex, ConfidenceEllipseCLR
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 2000)
pd.set_option('display.max_rows', None)
pd.set_option("display.expand_frame_repr", False)
pd.set_option("display.precision", 4)
pd.set_option("display.float_format", "{:.4f}".format)
def compute_coverage_amplitude(region_constructor, results):
all_samples = results['samples']
all_true_prevs = results['true-prevs']
def process_one(samples, true_prevs):
ellipse = region_constructor(samples)
return ellipse.coverage(true_prevs), ellipse.montecarlo_proportion()
out = Parallel(n_jobs=3)(
delayed(process_one)(samples, true_prevs)
for samples, true_prevs in tqdm(
zip(all_samples, all_true_prevs),
total=len(all_samples),
desc='constructing ellipses'
)
)
# unzip results
coverage, amplitude = zip(*out)
return list(coverage), list(amplitude)
def update_pickle(report, pickle_path, updated_dict:dict):
for k,v in updated_dict.items():
report[k]=v
pickle.dump(report, open(pickle_path, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
for setup in ['binary', 'multiclass']:
path = f'./results/{setup}/*.pkl'
table = defaultdict(list)
for file in tqdm(glob(path), desc='processing results', total=len(glob(path))):
file = Path(file)
dataset, method = file.name.replace('.pkl', '').split('__')
report = pickle.load(open(file, 'rb'))
results = report['results']
n_samples = len(results['ae'])
table['method'].extend([method.replace('Bayesian','Ba').replace('Bootstrap', 'Bo')] * n_samples)
table['dataset'].extend([dataset] * n_samples)
table['ae'].extend(results['ae'])
table['c-CI'].extend(results['coverage'])
table['a-CI'].extend(results['amplitude'])
if 'coverage-CE' not in report:
covCE, ampCE = compute_coverage_amplitude(ConfidenceEllipseSimplex, results)
covCLR, ampCLR = compute_coverage_amplitude(ConfidenceEllipseCLR, results)
update_fields = {
'coverage-CE': covCE,
'amplitude-CE': ampCE,
'coverage-CLR': covCLR,
'amplitude-CLR': ampCLR
}
update_pickle(report, file, update_fields)
table['c-CE'].extend(report['coverage-CE'])
table['a-CE'].extend(report['amplitude-CE'])
table['c-CLR'].extend(report['coverage-CLR'])
table['a-CLR'].extend(report['amplitude-CLR'])
df = pd.DataFrame(table)
n_classes = {}
tr_size = {}
for dataset in df['dataset'].unique():
fetch_fn = {
'binary': qp.datasets.fetch_UCIBinaryDataset,
'multiclass': qp.datasets.fetch_UCIMulticlassDataset
}[setup]
data = fetch_fn(dataset)
n_classes[dataset] = data.n_classes
tr_size[dataset] = len(data.training)
# remove datasets with more than max_classes classes
max_classes = 30
for data_name, n in n_classes.items():
if n > max_classes:
df = df[df["dataset"] != data_name]
for region in ['CI', 'CE', 'CLR']:
pv = pd.pivot_table(
df, index='dataset', columns='method', values=['ae', f'c-{region}', f'a-{region}'], margins=True
)
pv['n_classes'] = pv.index.map(n_classes).astype('Int64')
pv['tr_size'] = pv.index.map(tr_size).astype('Int64')
pv = pv.drop(columns=[col for col in pv.columns if col[-1] == "All"])
print(f'{setup=}')
print(pv)
print('-'*80)

View File

@@ -0,0 +1,56 @@
import warnings
from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
if __name__ == '__main__':
qp.environ["SAMPLE_SIZE"] = 500
cls = LogisticRegression()
bayes_kdey = BayesianKDEy(cls, bandwidth=.3, kernel='aitchison', mcmc_seed=0)
datasets = qp.datasets.UCI_BINARY_DATASETS
train, test = qp.datasets.fetch_UCIBinaryDataset(datasets[0]).train_test
# train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test
with qp.util.temp_seed(0):
print('fitting BayesianKDEy')
bayes_kdey.fit(*train.Xy)
shifted = test.sampling(500, *[0.2, 0.8])
# shifted = test.sampling(500, *test.prevalence()[::-1])
# shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes))
prev_hat = bayes_kdey.predict(shifted.X)
mae = qp.error.mae(shifted.prevalence(), prev_hat)
print(f'true_prev={strprev(shifted.prevalence())}')
print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}')
prev_hat, conf_interval = bayes_kdey.predict_conf(shifted.X)
mae = qp.error.mae(shifted.prevalence(), prev_hat)
print(f'mean posterior {strprev(prev_hat)}, {mae=:.4f}')
print(f'CI={conf_interval}')
print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}')
print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.3f}%')
if train.n_classes == 3:
plot_prev_points(bayes_kdey.prevalence_samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence())
# plot_prev_points_matplot(samples)
# report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
# print(report.mean(numeric_only=True))

View File

@@ -0,0 +1,95 @@
import os
import warnings
from os.path import join
from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.full_experiments import experiment, experiment_path, KDEyCLR
from quapy.data import LabelledCollection
from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier
from quapy.method.base import BinaryQuantifier, BaseQuantifier
from quapy.model_selection import GridSearchQ
from quapy.data import Dataset
# from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC, AggregativeBootstrap
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML, ACC
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
from collections import defaultdict
from time import time
from sklearn.base import clone, BaseEstimator
def method():
"""
Returns a tuple (name, quantifier, hyperparams, bayesian/bootstrap_constructor), where:
- name: is a str representing the name of the method (e.g., 'BayesianKDEy')
- quantifier: is the base model (e.g., KDEyML())
- hyperparams: is a dictionary for the quantifier (e.g., {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]})
- bayesian/bootstrap_constructor: is a function that instantiates the bayesian or bootstrap method
from the quantifier's optimized hyperparameters
"""
acc_hyper = {}
hdy_hyper = {'nbins': [3,4,5,8,16,32]}
kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
kdey_hyper_clr = {'bandwidth': [0.05, 0.1, 0.5, 1., 2., 5.]}
wrap_hyper = lambda dic: {f'quantifier__{k}':v for k,v in dic.items()}
# yield 'BootstrapKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: AggregativeBootstrap(KDEyML(LR(), **hyper), n_test_samples=1000, random_state=0, verbose=True),
# yield 'BayesianKDEy', KDEyML(LR()), kdey_hyper, lambda hyper: BayesianKDEy(mcmc_seed=0, **hyper),
return 'BayKDE*CLR', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0,
explore_CLR=True,
step_size=.15,
# num_warmup = 5000,
# num_samples = 10_000,
# region='ellipse',
**hyper),
if __name__ == '__main__':
binary = {
'datasets': qp.datasets.UCI_BINARY_DATASETS,
'fetch_fn': qp.datasets.fetch_UCIBinaryDataset,
'sample_size': 500
}
multiclass = {
'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
'fetch_fn': qp.datasets.fetch_UCIMulticlassDataset,
'sample_size': 1000
}
result_dir = Path('./results')
setup = multiclass
qp.environ['SAMPLE_SIZE'] = setup['sample_size']
data_name = 'digits'
print(f'dataset={data_name}')
data = setup['fetch_fn'](data_name)
is_binary = data.n_classes==2
hyper_subdir = result_dir / 'hyperparams' / ('binary' if is_binary else 'multiclass')
method_name, method, hyper_params, withconf_constructor = method()
hyper_path = experiment_path(hyper_subdir, data_name, method.__class__.__name__)
report = experiment(data, method, method_name, hyper_params, withconf_constructor, hyper_path)
print(f'dataset={data_name}, '
f'method={method_name}: '
f'mae={report["results"]["ae"].mean():.3f}, '
f'coverage={report["results"]["coverage"].mean():.5f}, '
f'amplitude={report["results"]["amplitude"].mean():.5f}, ')

View File

@@ -44,7 +44,7 @@ class LabelledCollection:
@property
def index(self):
if self._index is None:
if not hasattr(self, '_index') or self._index is None:
self._index = {class_: np.arange(len(self))[self.labels == class_] for class_ in self.classes_}
return self._index
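The added hasattr guard matters for LabelledCollection objects unpickled from versions that predate the _index attribute; a sketch of the failure mode it prevents (old_bytes is a hypothetical pre-upgrade pickle):

import pickle
lc = pickle.loads(old_bytes)   # the restored instance lacks the _index slot entirely
_ = lc.index                   # with the guard: lazily rebuilt; without it: AttributeError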

View File

@@ -583,8 +583,8 @@ def solve_adjustment(
"""
Function that tries to solve for :math:`p` the equation :math:`q = M p`, where :math:`q` is the vector of
`unadjusted counts` (as estimated, e.g., via classify and count) with :math:`q_i` an estimate of
:math:`P(\hat{Y}=y_i)`, and where :math:`M` is the matrix of `class-conditional rates` with :math:`M_{ij}` an
estimate of :math:`P(\hat{Y}=y_i|Y=y_j)`.
:math:`P(\\hat{Y}=y_i)`, and where :math:`M` is the matrix of `class-conditional rates` with :math:`M_{ij}` an
estimate of :math:`P(\\hat{Y}=y_i|Y=y_j)`.
:param class_conditional_rates: array of shape `(n_classes, n_classes,)` with entry `(i,j)` being the estimate
of :math:`P(\hat{Y}=y_i|Y=y_j)`, that is, the probability that an instance that belongs to class :math:`y_j`
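A minimal worked instance of the equation being solved (toy binary case; the actual function additionally supports different solving strategies and constraints):

import numpy as np
M = np.array([[0.9, 0.2],    # P(Yhat=0|Y=0), P(Yhat=0|Y=1)
              [0.1, 0.8]])   # P(Yhat=1|Y=0), P(Yhat=1|Y=1)
q = np.array([0.55, 0.45])   # unadjusted counts, e.g., from classify-and-count
p = np.linalg.solve(M, q)    # -> array([0.5, 0.5]), the adjusted prevalence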

View File

@@ -29,6 +29,7 @@ AGGREGATIVE_METHODS = {
aggregative.KDEyHD,
# aggregative.OneVsAllAggregative,
confidence.BayesianCC,
confidence.PQ,
}
BINARY_METHODS = {
@@ -40,6 +41,7 @@ BINARY_METHODS = {
aggregative.MAX,
aggregative.MS,
aggregative.MS2,
confidence.PQ,
}
MULTICLASS_METHODS = {

View File

@@ -1,13 +1,21 @@
"""
Utility functions for `Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ methods.
"""
import contextlib
import os
import sys
import numpy as np
import importlib.resources
try:
import jax
import jax.numpy as jnp
import numpyro
import numpyro.distributions as dist
import stan
import logging
import stan.common
DEPENDENCIES_INSTALLED = True
except ImportError:
@@ -15,6 +23,7 @@ except ImportError:
jnp = None
numpyro = None
dist = None
stan = None
DEPENDENCIES_INSTALLED = False
@@ -77,3 +86,71 @@ def sample_posterior(
rng_key = jax.random.PRNGKey(seed)
mcmc.run(rng_key, n_c_unlabeled=n_c_unlabeled, n_y_and_c_labeled=n_y_and_c_labeled)
return mcmc.get_samples()
def load_stan_file():
    """Returns the Stan program implementing the PQ model, shipped with the quapy.method package."""
    return importlib.resources.files('quapy.method').joinpath('stan/pq.stan').read_text(encoding='utf-8')
@contextlib.contextmanager
def _suppress_stan_logging():
    """Context manager that temporarily redirects stderr to the null device, silencing Stan's build output."""
    with open(os.devnull, "w") as devnull:
        old_stderr = sys.stderr
        sys.stderr = devnull
        try:
            yield
        finally:
            sys.stderr = old_stderr
def pq_stan(stan_code, n_bins, pos_hist, neg_hist, test_hist, number_of_samples, num_warmup, stan_seed):
"""
Perform Bayesian prevalence estimation using a Stan model for probabilistic quantification.
This function builds and samples from a Stan model that implements a bin-based Bayesian
quantifier. It uses the class-conditional histograms of the classifier
outputs for positive and negative examples, along with the test histogram, to estimate
the posterior distribution of prevalence in the test set.
Parameters
----------
stan_code : str
The Stan model code as a string.
n_bins : int
Number of bins used to build the histograms for positive and negative examples.
pos_hist : array-like of shape (n_bins,)
Histogram counts of the classifier outputs for the positive class.
neg_hist : array-like of shape (n_bins,)
Histogram counts of the classifier outputs for the negative class.
test_hist : array-like of shape (n_bins,)
Histogram counts of the classifier outputs for the test set, binned using the same bins.
number_of_samples : int
Number of post-warmup samples to draw from the Stan posterior.
num_warmup : int
Number of warmup iterations for the sampler.
stan_seed : int
Random seed for Stan model compilation and sampling, ensuring reproducibility.
Returns
-------
prev_samples : numpy.ndarray
An array of posterior samples of the prevalence (`prev`) in the test set.
Each element corresponds to one draw from the posterior distribution.
"""
logging.getLogger("stan.common").setLevel(logging.ERROR)
stan_data = {
'n_bucket': n_bins,
'train_neg': neg_hist.tolist(),
'train_pos': pos_hist.tolist(),
'test': test_hist.tolist(),
'posterior': 1
}
with _suppress_stan_logging():
stan_model = stan.build(stan_code, data=stan_data, random_seed=stan_seed)
fit = stan_model.sample(num_chains=1, num_samples=number_of_samples, num_warmup=num_warmup)
return fit['prev']
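A hypothetical smoke test of pq_stan (the histogram counts are made up; requires the quapy[bayes] extras, including pystan):

import numpy as np
pos_hist = np.array([1, 3, 10, 30])     # positives concentrate in high-score bins
neg_hist = np.array([30, 10, 3, 1])     # negatives concentrate in low-score bins
test_hist = np.array([12, 8, 8, 12])
prev_samples = pq_stan(load_stan_file(), 4, pos_hist, neg_hist, test_hist,
                       number_of_samples=1_000, num_warmup=500, stan_seed=0)
print(prev_samples.mean())              # posterior mean of the positive-class prevalence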

View File

@@ -33,7 +33,7 @@ class KDEBase:
@classmethod
def _check_bandwidth(cls, bandwidth):
def _check_bandwidth(cls, bandwidth, kernel):
"""
Checks that the bandwidth parameter is correct
@@ -43,8 +43,9 @@
assert bandwidth in KDEBase.BANDWIDTH_METHOD or isinstance(bandwidth, float), \
f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values'
if isinstance(bandwidth, float):
assert 0 < bandwidth < 1, \
"the bandwidth for KDEy should be in (0,1), since this method models the unit simplex"
assert kernel!='gaussian' or (0 < bandwidth < 1), \
("the bandwidth for a Gaussian kernel in KDEy should be in (0,1), "
"since this method models the unit simplex")
return bandwidth
@classmethod
@@ -166,7 +167,7 @@ class KDEyML(AggregativeSoftQuantifier, KDEBase):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1, kernel='gaussian',
random_state=None):
super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth)
self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel)
self.kernel = self._check_kernel(kernel)
self.random_state=random_state
@@ -246,7 +247,7 @@ class KDEyHD(AggregativeSoftQuantifier, KDEBase):
super().__init__(classifier, fit_classifier, val_split)
self.divergence = divergence
self.bandwidth = KDEBase._check_bandwidth(bandwidth)
self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel='gaussian')
self.random_state=random_state
self.montecarlo_trials = montecarlo_trials
@@ -333,7 +334,7 @@ class KDEyCS(AggregativeSoftQuantifier):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1):
super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth)
self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel='gaussian')
def gram_matrix_mix_sum(self, X, Y=None):
# this adapts the output of the rbf_kernel function (pairwise evaluations of Gaussian kernels k(x,y))

View File

@@ -1,19 +1,20 @@
import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator
from sklearn.metrics import confusion_matrix
import quapy as qp
import quapy.functional as F
from quapy.method import _bayesian
from quapy.method.aggregative import AggregativeCrispQuantifier
from quapy.data import LabelledCollection
from quapy.method.aggregative import AggregativeQuantifier
from quapy.method.aggregative import AggregativeQuantifier, AggregativeCrispQuantifier, AggregativeSoftQuantifier, BinaryAggregativeQuantifier
from scipy.stats import chi2
from sklearn.utils import resample
from abc import ABC, abstractmethod
from scipy.special import softmax, factorial
import copy
from functools import lru_cache
from tqdm import tqdm
"""
This module provides implementation of different types of confidence regions, and the implementation of Bootstrap
@@ -80,6 +81,12 @@ class ConfidenceRegionABC:
proportion = np.clip(self.coverage(uniform_simplex), 0., 1.)
return proportion
@property
@abstractmethod
def samples(self):
""" Returns internal samples """
...
class WithConfidenceABC(ABC):
"""
@@ -113,7 +120,7 @@ class WithConfidenceABC:
return self.predict_conf(instances=instances, confidence_level=confidence_level)
@classmethod
def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals'):
def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals')->ConfidenceRegionABC:
"""
Construct a confidence region given many prevalence estimations.
@@ -185,30 +192,35 @@ class ConfidenceEllipseSimplex(ConfidenceRegionABC):
"""
Instantiates a Confidence Ellipse in the probability simplex.
:param X: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, X, confidence_level=0.95):
def __init__(self, samples, confidence_level=0.95):
assert 0. < confidence_level < 1., f'{confidence_level=} must be in range(0,1)'
X = np.asarray(X)
samples = np.asarray(samples)
self.mean_ = X.mean(axis=0)
self.cov_ = np.cov(X, rowvar=False, ddof=1)
self.mean_ = samples.mean(axis=0)
self.cov_ = np.cov(samples, rowvar=False, ddof=1)
try:
self.precision_matrix_ = np.linalg.inv(self.cov_)
except:
self.precision_matrix_ = None
self.dim = X.shape[-1]
self.dim = samples.shape[-1]
self.ddof = self.dim - 1
# critical chi-square value
self.confidence_level = confidence_level
self.chi2_critical_ = chi2.ppf(confidence_level, df=self.ddof)
self._samples = samples
@property
def samples(self):
return self._samples
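# For reference, the region encoded above is the Mahalanobis ellipsoid
#   (x - mean_)^T precision_matrix_ (x - mean_) <= chi2_critical_
# where chi2_critical_ is the chi-square quantile with dim-1 degrees of freedom
# (simplex points carry one redundant coordinate, hence ddof = dim - 1).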
def point_estimate(self):
"""
@@ -234,16 +246,21 @@ class ConfidenceEllipseCLR(ConfidenceRegionABC):
"""
Instantiates a Confidence Ellipse in the Centered-Log Ratio (CLR) space.
:param X: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, X, confidence_level=0.95):
X = np.asarray(X)
def __init__(self, samples, confidence_level=0.95):
samples = np.asarray(samples)
self.clr = CLRtransformation()
Z = self.clr(X)
self.mean_ = np.mean(X, axis=0)
Z = self.clr(samples)
self.mean_ = np.mean(samples, axis=0)
self.conf_region_clr = ConfidenceEllipseSimplex(Z, confidence_level=confidence_level)
self._samples = samples
@property
def samples(self):
return self._samples
def point_estimate(self):
"""
@@ -273,19 +290,24 @@ class ConfidenceIntervals(ConfidenceRegionABC):
"""
Instantiates a region based on (independent) Confidence Intervals.
:param X: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param samples: np.ndarray of shape (n_bootstrap_samples, n_classes)
:param confidence_level: float, the confidence level (default 0.95)
"""
def __init__(self, X, confidence_level=0.95):
def __init__(self, samples, confidence_level=0.95):
assert 0 < confidence_level < 1, f'{confidence_level=} must be in range(0,1)'
X = np.asarray(X)
samples = np.asarray(samples)
self.means_ = X.mean(axis=0)
self.means_ = samples.mean(axis=0)
alpha = 1-confidence_level
low_perc = (alpha/2.)*100
high_perc = (1-alpha/2.)*100
self.I_low, self.I_high = np.percentile(X, q=[low_perc, high_perc], axis=0)
self.I_low, self.I_high = np.percentile(samples, q=[low_perc, high_perc], axis=0)
self._samples = samples
@property
def samples(self):
return self._samples
def point_estimate(self):
"""
@@ -379,7 +401,8 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
n_test_samples=500,
confidence_level=0.95,
region='intervals',
random_state=None):
random_state=None,
verbose=False):
assert isinstance(quantifier, AggregativeQuantifier), \
f'base quantifier does not seem to be an instance of {AggregativeQuantifier.__name__}'
@@ -396,6 +419,7 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
self.confidence_level = confidence_level
self.region = region
self.random_state = random_state
self.verbose = verbose
def aggregation_fit(self, classif_predictions, labels):
data = LabelledCollection(classif_predictions, labels, classes=self.classes_)
@@ -421,6 +445,24 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
prev_mean, self.confidence = self.aggregate_conf(classif_predictions)
return prev_mean
def aggregate_conf_sequential__(self, classif_predictions: np.ndarray, confidence_level=None):
if confidence_level is None:
confidence_level = self.confidence_level
n_samples = classif_predictions.shape[0]
prevs = []
with qp.util.temp_seed(self.random_state):
for quantifier in self.quantifiers:
for i in tqdm(range(self.n_test_samples), desc='resampling', total=self.n_test_samples, disable=not self.verbose):
sample_i = resample(classif_predictions, n_samples=n_samples)
prev_i = quantifier.aggregate(sample_i)
prevs.append(prev_i)
conf = WithConfidenceABC.construct_region(prevs, confidence_level, method=self.region)
prev_estim = conf.point_estimate()
return prev_estim, conf
def aggregate_conf(self, classif_predictions: np.ndarray, confidence_level=None):
if confidence_level is None:
confidence_level = self.confidence_level
@@ -429,10 +471,15 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
prevs = []
with qp.util.temp_seed(self.random_state):
for quantifier in self.quantifiers:
for i in range(self.n_test_samples):
sample_i = resample(classif_predictions, n_samples=n_samples)
prev_i = quantifier.aggregate(sample_i)
prevs.append(prev_i)
results = Parallel(n_jobs=-1)(
delayed(bootstrap_once)(i, classif_predictions, quantifier, n_samples)
for i in range(self.n_test_samples)
)
prevs.extend(results)
# for i in tqdm(range(self.n_test_samples), desc='resampling', total=self.n_test_samples, disable=not self.verbose):
# sample_i = resample(classif_predictions, n_samples=n_samples)
# prev_i = quantifier.aggregate(sample_i)
# prevs.append(prev_i)
conf = WithConfidenceABC.construct_region(prevs, confidence_level, method=self.region)
prev_estim = conf.point_estimate()
@@ -457,6 +504,13 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
return self.quantifier._classifier_method()
def bootstrap_once(i, classif_predictions, quantifier, n_samples):
    # draws one bootstrap resample (with replacement) of the classifier predictions
    # and aggregates it into a single prevalence estimate
    idx = np.random.randint(0, len(classif_predictions), n_samples)
    sample = classif_predictions[idx]
    prev = quantifier.aggregate(sample)
    return prev
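Note that bootstrap_once draws indices from the global numpy RNG inside joblib workers; a seeded variant along these lines (illustrative, not part of this changeset) would make the parallel resampling reproducible:

def bootstrap_once_seeded(i, classif_predictions, quantifier, n_samples, seed=0):
    rng = np.random.default_rng(seed + i)      # one independent stream per replicate
    idx = rng.integers(0, len(classif_predictions), n_samples)
    return quantifier.aggregate(classif_predictions[idx])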
class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC):
"""
`Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ method (by Albert Ziegler and Paweł Czyż),
@@ -566,8 +620,114 @@ class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC):
return np.asarray(samples.mean(axis=0), dtype=float)
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
if confidence_level is None:
confidence_level = self.confidence_level
classif_predictions = self.classify(instances)
point_estimate = self.aggregate(classif_predictions)
samples = self.get_prevalence_samples() # available after calling "aggregate" function
region = WithConfidenceABC.construct_region(samples, confidence_level=self.confidence_level, method=self.region)
region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
return point_estimate, region
class PQ(AggregativeSoftQuantifier, BinaryAggregativeQuantifier):
"""
`Precise Quantifier: Bayesian distribution matching quantifier <https://arxiv.org/abs/2507.06061>`_,
which is a variant of :class:`HDy` that calculates the posterior probability distribution
over the prevalence vectors, rather than providing a point estimate.
This method relies on extra dependencies, which have to be installed via:
`$ pip install quapy[bayes]`
:param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
the one indicated in `qp.environ['DEFAULT_CLS']`
:param val_split: specifies the data used for generating classifier predictions. This specification
can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
be extracted from the training set; or as an integer (default 5), indicating that the predictions
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a tuple `(X,y)` defining the specific set of data to use for validation. Set to
None when the method does not require any validation data, to avoid wasting part of the
training data.
:param nbins: number of bins for the class-conditional histograms (default 4)
:param fixed_bins: if True, the bins are uniformly spaced in [0,1]; if False (default), the bin
limits are placed at the quantiles of the classifier outputs
:param num_warmup: number of warmup iterations for the Stan sampler (default 500)
:param num_samples: number of samples to draw from the posterior (default 1000)
:param stan_seed: random seed for the Stan sampler (default 0)
:param confidence_level: float in [0,1] to construct a confidence region around the point estimate (default 0.95)
:param region: string, set to `intervals` for constructing confidence intervals (default), or to
`ellipse` for constructing an ellipse in the probability simplex, or to `ellipse-clr` for
constructing an ellipse in the Centered-Log Ratio (CLR) unconstrained space.
"""
def __init__(self,
classifier: BaseEstimator=None,
fit_classifier=True,
val_split: int = 5,
nbins: int = 4,
fixed_bins: bool = False,
num_warmup: int = 500,
num_samples: int = 1_000,
stan_seed: int = 0,
confidence_level: float = 0.95,
region: str = 'intervals'):
if num_warmup <= 0:
raise ValueError(f'parameter {num_warmup=} must be a positive integer')
if num_samples <= 0:
raise ValueError(f'parameter {num_samples=} must be a positive integer')
if not _bayesian.DEPENDENCIES_INSTALLED:
raise ImportError("Auxiliary dependencies are required. "
"Run `$ pip install quapy[bayes]` to install them.")
super().__init__(classifier, fit_classifier, val_split)
self.nbins = nbins
self.fixed_bins = fixed_bins
self.num_warmup = num_warmup
self.num_samples = num_samples
self.stan_seed = stan_seed
self.stan_code = _bayesian.load_stan_file()
self.confidence_level = confidence_level
self.region = region
def aggregation_fit(self, classif_predictions, labels):
y_pred = classif_predictions[:, self.pos_label]
# Compute bin limits
if self.fixed_bins:
# Uniform bins in [0,1]
self.bin_limits = np.linspace(0, 1, self.nbins + 1)
else:
# Quantile bins
self.bin_limits = np.quantile(y_pred, np.linspace(0, 1, self.nbins + 1))
# Assign each prediction to a bin
bin_indices = np.digitize(y_pred, self.bin_limits[1:-1], right=True)
# Positive and negative masks
pos_mask = (labels == self.pos_label)
neg_mask = ~pos_mask
# Count positives and negatives per bin
self.pos_hist = np.bincount(bin_indices[pos_mask], minlength=self.nbins)
self.neg_hist = np.bincount(bin_indices[neg_mask], minlength=self.nbins)
def aggregate(self, classif_predictions):
Px_test = classif_predictions[:, self.pos_label]
test_hist, _ = np.histogram(Px_test, bins=self.bin_limits)
prevs = _bayesian.pq_stan(
self.stan_code, self.nbins, self.pos_hist, self.neg_hist, test_hist,
self.num_samples, self.num_warmup, self.stan_seed
).flatten()
self.prev_distribution = np.vstack([1-prevs, prevs]).T
return self.prev_distribution.mean(axis=0)
def aggregate_conf(self, predictions, confidence_level=None):
if confidence_level is None:
confidence_level = self.confidence_level
point_estimate = self.aggregate(predictions)
samples = self.prev_distribution
region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
return point_estimate, region
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
predictions = self.classify(instances)
return self.aggregate_conf(predictions, confidence_level=confidence_level)
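A hypothetical usage sketch of PQ (the dataset choice is arbitrary; requires `pip install quapy[bayes]`):

from sklearn.linear_model import LogisticRegression
import quapy as qp

train, test = qp.datasets.fetch_UCIBinaryDataset('haberman').train_test
pq = PQ(LogisticRegression(), nbins=4, stan_seed=0)
pq.fit(*train.Xy)
point_estimate, region = pq.predict_conf(test.X)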

View File

@@ -6,7 +6,7 @@ from sklearn.feature_extraction.text import CountVectorizer
from sklearn.utils import resample
from sklearn.preprocessing import normalize
from method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.functional import get_divergence
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F

quapy/method/stan/pq.stan Normal file
View File

@@ -0,0 +1,39 @@
data {
int<lower=0> n_bucket;
array[n_bucket] int<lower=0> train_pos;
array[n_bucket] int<lower=0> train_neg;
array[n_bucket] int<lower=0> test;
int<lower=0,upper=1> posterior;
}
transformed data{
row_vector<lower=0>[n_bucket] train_pos_rv;
row_vector<lower=0>[n_bucket] train_neg_rv;
row_vector<lower=0>[n_bucket] test_rv;
real n_test;
train_pos_rv = to_row_vector( train_pos );
train_neg_rv = to_row_vector( train_neg );
test_rv = to_row_vector( test );
n_test = sum( test );
}
parameters {
simplex[n_bucket] p_neg;
simplex[n_bucket] p_pos;
real<lower=0,upper=1> prev_prior;
}
model {
if( posterior ) {
target += train_neg_rv * log( p_neg );
target += train_pos_rv * log( p_pos );
target += test_rv * log( p_neg * ( 1 - prev_prior) + p_pos * prev_prior );
}
}
generated quantities {
real<lower=0,upper=1> prev;
prev = sum( binomial_rng(test, 1 / ( 1 + (p_neg./p_pos) *(1-prev_prior)/prev_prior ) ) ) / n_test;
}
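A worked reading of the generated quantities block: given prevalence \pi (prev_prior) and per-bin class-conditional probabilities p^{+}_b and p^{-}_b, the probability that a test instance falling in bin b is positive is

P(+ \mid b) = \frac{\pi\, p^{+}_b}{\pi\, p^{+}_b + (1-\pi)\, p^{-}_b} = \frac{1}{1 + (p^{-}_b / p^{+}_b)\,(1-\pi)/\pi}

which is the expression inside binomial_rng; prev is then the posterior-predictive fraction of positives, \sum_b \mathrm{Binomial}(test_b, P(+ \mid b)) / n_\mathrm{test}.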

View File

@@ -410,7 +410,7 @@ def group_params(param_grid: dict):
"""
classifier_params, quantifier_params = {}, {}
for key, values in param_grid.items():
if key.startswith('classifier__') or key == 'val_split':
if 'classifier__' in key or key == 'val_split':
classifier_params[key] = values
else:
quantifier_params[key] = values
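The relaxed substring check routes nested keys to the classifier group as well, which is what wrapped quantifiers (e.g., AggregativeBootstrap) require; an illustrative run of the new condition:

param_grid = {'quantifier__classifier__C': [1, 10], 'quantifier__bandwidth': [0.1, 0.2]}
cls_params = {k: v for k, v in param_grid.items() if 'classifier__' in k or k == 'val_split'}
q_params = {k: v for k, v in param_grid.items() if k not in cls_params}
# cls_params == {'quantifier__classifier__C': [1, 10]}   (startswith('classifier__') would have missed it)
# q_params  == {'quantifier__bandwidth': [0.1, 0.2]}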

View File

@@ -111,6 +112,12 @@ setup(
#
packages=find_packages(include=['quapy', 'quapy.*']), # Required
package_data={
# For the 'quapy.method' package, include all files
# in the 'stan' subdirectory that end with .stan
'quapy.method': ['stan/*.stan']
},
python_requires='>=3.8, <4',
install_requires=['scikit-learn', 'pandas', 'tqdm', 'matplotlib', 'joblib', 'xlrd', 'abstention', 'ucimlrepo', 'certifi'],
@@ -124,7 +130,7 @@
# Similar to `install_requires` above, these must be valid existing
# projects.
extras_require={ # Optional
'bayes': ['jax', 'jaxlib', 'numpyro'],
'bayes': ['jax', 'jaxlib', 'numpyro', 'pystan'],
'neural': ['torch'],
'tests': ['certifi'],
'docs' : ['sphinx-rtd-theme', 'myst-parser'],