QuaPy/BayesianKDEy/prior_effect.py

122 lines
5.4 KiB
Python

from collections import defaultdict
import model_selection
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.temperature_calibration import temp_calibration
from commons import *
from data import Dataset
from protocol import DirichletProtocol
from quapy.method.confidence import BayesianCC
from quapy.method.aggregative import ACC, AggregativeQuantifier
from sklearn.linear_model import LogisticRegression as LR
from copy import deepcopy as cp
from tqdm import tqdm
from full_experiments import model_selection
def select_imbalanced_datasets(top_m=5):
    """Return the names of the ``top_m`` most imbalanced multiclass datasets.

    Every dataset named in ``multiclass['datasets']`` is fetched through
    ``multiclass['fetch_fn']`` and scored with ``normalized_entropy`` applied
    to its training prevalence. The names with the ``top_m`` lowest scores are
    returned, ordered by ascending score.

    :param top_m: number of dataset names to return
    :return: list of dataset names
    """
    fetch = multiclass['fetch_fn']
    scored = [
        (name, normalized_entropy(fetch(name).training.prevalence()))
        for name in multiclass['datasets']
    ]
    # lowest normalized entropy first; ties keep the original dataset order
    ranked = sorted(scored, key=lambda pair: pair[1])
    return [name for name, _ in ranked[:top_m]]
def methods():
    """Yield the method configurations evaluated by this script.

    Each item is a 4-tuple ``(name, surrogate, grid, constructor)``:
    a display name, the point quantifier handed to model selection, its
    hyperparameter grid, and a callable that receives the selected
    hyperparameters and returns the Bayesian quantifier to evaluate.
    """
    acc_hyper = {}
    # these grids are only referenced by the disabled configuration below
    kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
    kdey_hyper_clr = {'bandwidth': [0.05, 0.1, 0.5, 1., 2., 5.]}

    def build_bayesian_acc(hyper):
        # the selected hyperparameters are not used by this constructor
        return BayesianCC(LR(), mcmc_seed=0, prior='uniform')

    yield 'BayesianACC', ACC(LR()), acc_hyper, build_bayesian_acc
    # yield f'BaKDE-Ait', KDEyCLR(LR()), kdey_hyper_clr, lambda hyper: BayesianKDEy(kernel='aitchison', mcmc_seed=0, engine='numpyro', temperature=None, prior='uniform', **hyper)
def run_test(test, alpha_test, alpha_train, concentration, prior_type, bay_quant, train_prev, results,
             method_name=None):
    """Evaluate ``bay_quant`` on Dirichlet-generated test samples and log the outcome.

    Samples are drawn from ``test`` with a ``DirichletProtocol`` built with
    ``alpha=alpha_test``, ``repeats=100`` and ``random_state=0``. For every
    sample, ``bay_quant.predict_conf`` is called and one entry is appended to
    each list in ``results``.

    :param test: collection the test samples are drawn from
    :param alpha_test: Dirichlet parameters passed to the sampling protocol
    :param alpha_train: Dirichlet parameters used as the quantifier's prior (only recorded)
    :param concentration: concentration factor of this run (only recorded)
    :param prior_type: label of the scenario, e.g. ``'informative'`` or ``'wrong'``
    :param bay_quant: fitted quantifier exposing ``predict_conf``
    :param train_prev: training prevalence, used for the shift and SRE measures
    :param results: dict of lists (e.g. ``defaultdict(list)``) extended in place
    :param method_name: optional name shown in the progress bar; defaults to
        the class name of ``bay_quant``
    :return: None; ``results`` is mutated
    """
    # Take the label from an argument instead of a module-level global, so the
    # function also works when it is imported rather than run as a script.
    label = type(bay_quant).__name__ if method_name is None else method_name
    test_generator = DirichletProtocol(test, alpha=alpha_test, repeats=100, random_state=0)
    progress = tqdm(
        test_generator(),
        total=test_generator.total(),
        # report the actual scenario rather than always saying "informative"
        desc=f'{label} {prior_type} alpha with {concentration=}',
    )
    for sample_X, true_prev in progress:
        estim_prev, region = bay_quant.predict_conf(sample_X)

        # experimental conditions
        results['prior-type'].append(prior_type)
        results['train-prev'].append(train_prev)
        results['concentration'].append(concentration)
        results['train-alpha'].append(alpha_train)
        results['test-alpha'].append(alpha_test)

        # point estimate and error measures
        results['true-prevs'].append(true_prev)
        results['point-estim'].append(estim_prev)
        results['shift'].append(qp.error.ae(true_prev, train_prev))
        results['ae'].append(qp.error.ae(prevs_true=true_prev, prevs_hat=estim_prev))
        results['sre'].append(qp.error.sre(prevs_true=true_prev, prevs_hat=estim_prev, prevs_train=train_prev))
        results['rae'].append(qp.error.rae(prevs_true=true_prev, prevs_hat=estim_prev))

        # confidence-region measures
        results['coverage'].append(region.coverage(true_prev))
        results['amplitude'].append(region.montecarlo_proportion(n_trials=50_000))
        results['samples'].append(region.samples)
def experiment(dataset: Dataset, point_quantifier: AggregativeQuantifier, grid: dict, bay_constructor, hyper_choice_path: Path):
    """Run the prior-effect experiment for one dataset and one method.

    Steps, all executed under ``qp.util.temp_seed(0)``:

    1. Select hyperparameters for ``point_quantifier`` on the training set via
       ``model_selection``, going through ``qp.util.pickled_resource`` with
       ``hyper_choice_path``.
    2. Build the Bayesian quantifier with ``bay_constructor``. If it exposes a
       ``temperature`` attribute set to ``None``, calibrate it on a stratified
       60/40 split of the training set. Then fit on the full training set.
    3. For each concentration in ``[50, 500, 5_000]``, set the quantifier's
       prior to ``train_prev * concentration`` and evaluate it twice with
       ``run_test``: once on test samples drawn with the same alpha
       ('informative'), and once with an alpha derived from
       ``antagonistic_prevalence(train_prev, strength=1)`` ('wrong').

    :param dataset: dataset providing ``train_test``
    :param point_quantifier: surrogate quantifier used for model selection (a deep copy is passed on)
    :param grid: hyperparameter grid for model selection
    :param bay_constructor: callable mapping the selected hyperparameters to a Bayesian quantifier
    :param hyper_choice_path: path handed to ``pickled_resource`` for the selected hyperparameters
    :return: dict with keys ``'optim_hyper'``, ``'train-prev'`` and ``'results'``
        (each result list converted to a numpy array)
    """
    with qp.util.temp_seed(0):
        training, test = dataset.train_test

        # model selection
        best_hyperparams = qp.util.pickled_resource(
            hyper_choice_path, model_selection, training, cp(point_quantifier), grid
        )

        bay_quant = bay_constructor(best_hyperparams)
        if hasattr(bay_quant, 'temperature') and bay_quant.temperature is None:
            # Split the local training set of `dataset`; do not reach for a
            # module-level global that only exists when run as a script.
            train, val = training.split_stratified(train_prop=0.6, random_state=0)
            temperature = temp_calibration(bay_quant, train, val, temp_grid=[.5, 1., 1.5, 2., 5., 10., 100.], n_jobs=-1)
            bay_quant.temperature = temperature
        bay_quant.fit(*training.Xy)

        # test
        train_prev = training.prevalence()
        results = defaultdict(list)
        for concentration in [50, 500, 5_000]:
            alpha_train = train_prev * concentration
            bay_quant.prior = alpha_train

            # informative prior: test samples are drawn with the same alpha used as prior
            alpha_test_informative = alpha_train
            prior_type = 'informative'
            run_test(test, alpha_test_informative, alpha_train, concentration, prior_type, bay_quant, train_prev, results)

            # wrong prior: test samples are drawn with an alpha built from the
            # antagonistic prevalence, so the prior no longer matches the test distribution
            alpha_test_wrong = antagonistic_prevalence(train_prev, strength=1) * concentration
            prior_type = 'wrong'
            run_test(test, alpha_test_wrong, alpha_train, concentration, prior_type, bay_quant, train_prev, results)

        report = {
            'optim_hyper': best_hyperparams,
            'train-prev': train_prev,
            'results': {k: np.asarray(v) for k, v in results.items()}
        }
        return report
if __name__ == '__main__':
    # Script entry point: run the prior-effect experiment for every method
    # on the most imbalanced multiclass datasets.
    result_dir = Path('./results/prior_effect')

    selected = select_imbalanced_datasets()
    print(f'selected datasets={selected}')

    qp.environ['SAMPLE_SIZE'] = multiclass['sample_size']

    for data_name in selected:
        data = multiclass['fetch_fn'](data_name)
        for method_name, surrogate_quant, hyper_params, bay_constructor in methods():
            result_path = experiment_path(result_dir, data_name, method_name)
            # hyperparameters are stored per surrogate quantifier class
            hyper_path = experiment_path(result_dir/'hyperparams', data_name, surrogate_quant.__class__.__name__)

            print(f'Launching {method_name} in dataset {data_name}')

            # Persist the report at result_path instead of discarding it; the
            # arguments are forwarded positionally in experiment's parameter order
            # (dataset, point_quantifier, grid, bay_constructor, hyper_choice_path).
            qp.util.pickled_resource(
                result_path, experiment, data, surrogate_quant, hyper_params, bay_constructor, hyper_path
            )