"""Dataset handlers and compositional-geometry utilities for quantification experiments.

NOTE(review): `error`, `functional`, `quapy`, and `method.aggregative` are
project-local / third-party imports; this module only runs inside the project
environment.
"""
import os
from abc import ABC, abstractmethod
from functools import lru_cache
from pathlib import Path

import numpy as np
from jax import numpy as jnp
from scipy.stats import entropy
from sklearn.base import BaseEstimator

import error
import functional as F
import quapy as qp
from method.aggregative import KDEyML
# NOTE(review): the imported ILRtransformation is shadowed by the local class
# defined further below; every later use of the name resolves to the local class.
from quapy.functional import l1_norm, ILRtransformation

FINEGRAINED = True
RESULT_DIR = Path('results_finegrained') if FINEGRAINED else Path('results')


class DatasetHandler(ABC):
    """Abstract interface for a dataset plus its evaluation/model-selection protocols.

    Concrete subclasses provide the training set and sample generators for
    evaluation and model selection, and enumerate their available datasets.
    """

    def __init__(self, name: str, sample_size: int):
        self._name = name
        self._sample_size = sample_size

    @abstractmethod
    def get_training(self):
        """Return the training collection."""
        ...

    @abstractmethod
    def get_train_testprot_for_eval(self):
        """Return (training, test-sample generator) for final evaluation."""
        ...

    @abstractmethod
    def get_train_valprot_for_modsel(self):
        """Return (training, validation-sample generator) for model selection."""
        ...

    def sample_size(self):
        return self._sample_size

    def name(self):
        return self._name

    @classmethod
    @abstractmethod
    def iter(cls):
        """Yield one handler instance per available dataset."""
        ...

    def __repr__(self):
        return self.__class__.__name__

    @classmethod
    @abstractmethod
    def is_binary(cls):
        # FIX: parameter renamed self -> cls to match the @classmethod decorator.
        """Whether the handled datasets are binary (True) or multiclass (False)."""
        ...


class UCIMulticlassHandler(DatasetHandler):
    """Handler for the UCI multiclass datasets, using APP-style UPP protocols."""

    DATASETS = qp.datasets.UCI_MULTICLASS_DATASETS.copy()

    def __init__(self, name, n_val_samples=100, n_test_samples=100):
        super().__init__(name, sample_size=1000)
        self._dataset = None  # lazy fetch
        self.n_val_samples = n_val_samples
        self.n_test_samples = n_test_samples

    def get_training(self):
        return self.dataset().training

    def get_train_testprot_for_eval(self):
        training, test = self.dataset().train_test
        test_generator = qp.protocol.UPP(test, repeats=self.n_test_samples, random_state=0)
        return training, test_generator

    def get_train_valprot_for_modsel(self):
        # 60/40 stratified split of the training set; the held-out part feeds the
        # validation protocol used for model selection.
        training = self.dataset().training
        training, val = training.split_stratified(train_prop=0.6, random_state=0)
        val_generator = qp.protocol.UPP(val, repeats=self.n_val_samples, random_state=0)
        return training, val_generator

    def dataset(self):
        # FIX: dropped @lru_cache — it was redundant with the explicit
        # self._dataset memoization below and, on an instance method, it keys the
        # cache on `self`, keeping every handler alive for the process lifetime
        # (ruff B019).
        if self._dataset is None:
            self._dataset = qp.datasets.fetch_UCIMulticlassDataset(self.name(), min_class_support=0.01)
        return self._dataset

    def __repr__(self):
        return ""  # self.dataset().__repr__()

    @classmethod
    def iter(cls):
        for name in cls.DATASETS:
            yield cls(name)

    @classmethod
    def is_binary(cls):
        # FIX: parameter renamed self -> cls to match the @classmethod decorator.
        return False


class LeQuaHandler(DatasetHandler):
    """Handler for the LeQua 2022/2024 shared-task datasets (T1B / T2 tasks)."""

    DATASETS = ['LeQua2022', 'LeQua2024']

    def __init__(self, name):
        super().__init__(name, sample_size=1000)
        self._dataset = None  # lazy fetch

    def get_training(self):
        return self.dataset()[0]

    def get_train_testprot_for_eval(self):
        training, _, test_generator = self.dataset()
        return training, test_generator

    def get_train_valprot_for_modsel(self):
        training, val_generator, _ = self.dataset()
        return training, val_generator

    def dataset(self):
        # FIX: dropped redundant @lru_cache on an instance method (ruff B019);
        # the explicit self._dataset memoization already provides lazy caching.
        if self._dataset is None:
            if self.name() == 'LeQua2022':
                self._dataset = qp.datasets.fetch_lequa2022(task='T1B')
            elif self.name() == 'LeQua2024':
                self._dataset = qp.datasets.fetch_lequa2024(task='T2')
            else:
                raise ValueError(f'unexpected dataset name {self.name()}; valid ones are {self.DATASETS}')
        return self._dataset

    def __repr__(self):
        return self.dataset().__repr__()

    @classmethod
    def iter(cls):
        for name in cls.DATASETS:
            yield cls(name)

    @classmethod
    def is_binary(cls):
        # FIX: parameter renamed self -> cls to match the @classmethod decorator.
        return False


# def fetch_UCI_multiclass(data_name):
#     return qp.datasets.fetch_UCIMulticlassDataset(data_name, min_class_support=0.01)


def fetch_UCI_binary(data_name):
    """Fetch a UCI binary dataset by name."""
    return qp.datasets.fetch_UCIBinaryDataset(data_name)


# global configurations
binary = {
    'datasets': qp.datasets.UCI_BINARY_DATASETS.copy(),
    'fetch_fn': fetch_UCI_binary,
    'sample_size': 500
}

# multiclass = {
#     'datasets': qp.datasets.UCI_MULTICLASS_DATASETS.copy(),
#     'fetch_fn': fetch_UCI_multiclass,
#     'sample_size': 1000
# }
# try:
#     multiclass['datasets'].remove('poker_hand')  # random performance
#     multiclass['datasets'].remove('hcv')         # random performance
#     multiclass['datasets'].remove('letter')      # many classes
#     multiclass['datasets'].remove('isolet')      # many classes
# except ValueError:
#     pass


# utils
def experiment_path(dir: Path, dataset_name: str, method_name: str):
    """Return the pickle path for a (dataset, method) experiment, creating `dir` if needed."""
    os.makedirs(dir, exist_ok=True)
    return dir / f'{dataset_name}__{method_name}.pkl'


def normalized_entropy(p):
    """
    Normalized Shannon entropy in [0, 1]
    p: array-like, prevalence vector (sums to 1)
    """
    p = np.asarray(p)
    H = entropy(p)  # Shannon entropy
    H_max = np.log(len(p))
    if H_max == 0:
        # FIX: a length-1 prevalence vector has log(1)=0 maximum entropy; the
        # original divided by zero here. Entropy of a degenerate distribution is 0.
        return 0.0
    return np.clip(H / H_max, 0, 1)


def antagonistic_prevalence(p, strength=1):
    """Return the prevalence obtained by reflecting `p` in ILR space.

    The ILR coordinates of `p` are negated (scaled by `strength`) and mapped
    back to the simplex. Uses the local ILRtransformation class defined below.
    """
    ilr = ILRtransformation()
    z = ilr(p)
    z_ant = -strength * z
    return ilr.inverse(z_ant)


class KDEyCLR(KDEyML):
    """KDEyML with the Aitchison (CLR-based) kernel."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5,
                 bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='aitchison'
        )


class KDEyILR(KDEyML):
    """KDEyML with the ILR kernel."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5,
                 bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='ilr'
        )


@lru_cache(maxsize=None)
def _ilr_basis(k):
    """Orthonormal ILR basis of shape (k-1, k) built from the Helmert matrix.

    The full (k x k) Helmert matrix is built row by row (row 0 stays all-zero
    and is discarded); rows are normalized to unit length.
    """
    H = np.zeros((k, k))
    for i in range(1, k):
        H[i, :i] = 1
        H[i, i] = -i
        H[i] = H[i] / np.sqrt(i * (i + 1))
    return H[1:, :]  # drop the all-zero first row


class ILRtransformation(F.CompositionalTransformation):
    """Isometric log-ratio transformation between the simplex and R^(k-1).

    NOTE(review): this class shadows `quapy.functional.ILRtransformation`
    imported at the top of the module — every module-level use of the name
    resolves to this class.
    """

    def __init__(self, jax_mode=False):
        # jax_mode selects jax.numpy (differentiable) vs plain numpy arithmetic.
        self.jax_mode = jax_mode

    def array(self, X):
        """Coerce X to a jax or numpy array depending on jax_mode."""
        return jnp.array(X) if self.jax_mode else np.asarray(X)

    def __call__(self, X):
        """Map simplex points X to ILR coordinates, shape (..., k-1)."""
        X = self.array(X)
        # self.EPSILON presumably comes from F.CompositionalTransformation — TODO confirm
        X = qp.error.smooth(X, self.EPSILON)
        k = X.shape[-1]
        V = self.array(_ilr_basis(k))
        xp = jnp if self.jax_mode else np
        return xp.log(X) @ V.T

    def inverse(self, Z):
        """Map ILR coordinates Z, shape (..., k-1), back to the simplex."""
        Z = self.array(Z)
        k = Z.shape[-1] + 1
        V = self.array(_ilr_basis(k))
        xp = jnp if self.jax_mode else np
        p = xp.exp(Z @ V)
        return p / xp.sum(p, axis=-1, keepdims=True)

    def get_V(self, k):
        # FIX: delegate to the module-level cached helper instead of decorating
        # this bound method with @lru_cache, which keys on `self` and keeps
        # instances alive for the cache's lifetime (ruff B019).
        return _ilr_basis(k)


def in_simplex(x):
    """True iff x is component-wise non-negative and sums (approximately) to 1."""
    return np.all(x >= 0) and np.isclose(x.sum(), 1)