import os
from functools import lru_cache
from pathlib import Path

import numpy as np
from jax import numpy as jnp
from scipy.stats import entropy
from sklearn.base import BaseEstimator
from sklearn.decomposition import PCA

import quapy as qp
import quapy.functional as F
from quapy.method.aggregative import KDEyML
from quapy.functional import ILRtransformation  # note: shadowed by the local subclass defined below

RESULT_DIR = Path('results')


# utils

def experiment_path(dir: Path, dataset_name: str, method_name: str):
    os.makedirs(dir, exist_ok=True)
    return dir / f'{dataset_name}__{method_name}.pkl'


def normalized_entropy(p):
    """
    Normalized Shannon entropy in [0, 1].

    :param p: array-like, prevalence vector (sums to 1)
    """
    p = np.asarray(p)
    H = entropy(p)  # Shannon entropy (natural log)
    H_max = np.log(len(p))
    if H_max == 0:  # degenerate one-class case; avoid 0/0
        return 0.0
    return np.clip(H / H_max, 0, 1)


def antagonistic_prevalence(p, strength=1):
    """
    Returns the prevalence vector antagonistic to p, obtained by negating
    (and optionally scaling by `strength`) its ILR coordinates, i.e., by
    reflecting p through the uniform distribution in ILR space.
    """
    ilr = ILRtransformation()
    z = ilr(p)
    z_ant = -strength * z
    p_ant = ilr.inverse(z_ant)
    return p_ant


# Disabled variant, kept for reference: scales the bandwidth with the number
# of classes before fitting the aggregation function (fixed here to read the
# class count from `labels`; the original read an undefined name `y`).
"""
class KDEyScaledB(KDEyML):
    def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='gaussian'
        )

    def aggregation_fit(self, classif_predictions, labels):
        if not hasattr(self, '_changed'):
            def scale_bandwidth(n_classes, beta=0.5):
                return self.bandwidth * np.power(n_classes, beta)
            n_classes = len(set(labels))
            scaled = scale_bandwidth(n_classes)
            print(f'bandwidth scaling: {self.bandwidth:.4f} => {scaled:.4f}')
            self.bandwidth = scaled
            self._changed = True
        return super().aggregation_fit(classif_predictions, labels)
"""


class KDEyScaledB(KDEyML):
    """KDEyML with a Gaussian kernel (bandwidth scaling currently disabled, see above)."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='gaussian'
        )


class KDEyFresh(KDEyML):
    """KDEyML with a Gaussian kernel."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='gaussian'
        )


class KDEyReduce(KDEyML):
    """KDEyML with a Gaussian kernel fitted on PCA-reduced classifier posteriors."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5, bandwidth=1., n_components=10, random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='gaussian'
        )
        self.n_components = n_components

    def aggregation_fit(self, classif_predictions, labels):
        # project the posteriors onto the first n_components principal directions
        self.pca = PCA(n_components=self.n_components)
        classif_predictions = self.pca.fit_transform(classif_predictions)
        return super().aggregation_fit(classif_predictions, labels)

    def aggregate(self, posteriors: np.ndarray):
        posteriors = self.pca.transform(posteriors)
        return super().aggregate(posteriors)


class KDEyCLR(KDEyML):
    """KDEyML with the Aitchison (CLR-based) kernel."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='aitchison'
        )


class KDEyILR(KDEyML):
    """KDEyML with a kernel defined in ILR coordinates."""

    def __init__(self, classifier: BaseEstimator = None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        super().__init__(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
            kernel='ilr'
        )
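# Illustrative sketch only (not part of the pipeline): how the prevalence
# utilities above behave on a concrete vector. The example values are
# assumptions chosen for demonstration; the tolerance is loose because the
# ILR transform defined below smooths its input before taking logs.
def _demo_prevalence_utils():
    p = np.array([0.7, 0.2, 0.1])
    h = normalized_entropy(p)        # in [0, 1]; 1 would mean uniform
    q = antagonistic_prevalence(p)   # reflection of p through the uniform point
    ilr = ILRtransformation()
    # up to smoothing, the antagonistic vector has negated ILR coordinates
    assert np.allclose(ilr(q), -ilr(p), atol=1e-2)
    print(f'H(p)={h:.3f}  p={p}  antagonistic={np.round(q, 3)}')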
class ILRtransformation(F.CompositionalTransformation):
    """
    Isometric log-ratio (ILR) transformation, mapping the k-simplex to R^{k-1}
    through an orthonormal (Helmert-based) basis.
    """

    def __init__(self, jax_mode=False):
        self.jax_mode = jax_mode

    def array(self, X):
        if self.jax_mode:
            return jnp.array(X)
        else:
            return np.asarray(X)

    def __call__(self, X):
        X = self.array(X)
        X = qp.error.smooth(X, self.EPSILON)  # EPSILON inherited from CompositionalTransformation
        k = X.shape[-1]
        V = self.array(self.get_V(k))
        logp = jnp.log(X) if self.jax_mode else np.log(X)
        return logp @ V.T

    def inverse(self, Z):
        Z = self.array(Z)
        k_minus_1 = Z.shape[-1]
        k = k_minus_1 + 1
        V = self.array(self.get_V(k))
        logp = Z @ V
        if self.jax_mode:
            p = jnp.exp(logp)
            p = p / jnp.sum(p, axis=-1, keepdims=True)
        else:
            p = np.exp(logp)
            p = p / np.sum(p, axis=-1, keepdims=True)
        return p

    @lru_cache(maxsize=None)
    def get_V(self, k):
        # the (k-1, k) basis depends only on k, so it is cached across calls
        def helmert_matrix(k):
            """
            Returns the (k x k) Helmert matrix.
            """
            H = np.zeros((k, k))
            for i in range(1, k):
                H[i, :i] = 1
                H[i, i] = -i
                H[i] = H[i] / np.sqrt(i * (i + 1))
            # row 0 stays zeros; will be discarded
            return H

        def ilr_basis(k):
            """
            Constructs an orthonormal ILR basis using the Helmert submatrix.
            Output shape: (k-1, k)
            """
            H = helmert_matrix(k)
            V = H[1:, :]  # remove first row of zeros
            return V

        return ilr_basis(k)


def in_simplex(x, atol=1e-8):
    """Checks membership in the probability simplex: non-negative components summing to 1."""
    x = np.asarray(x)
    non_negative = np.all(x >= 0, axis=-1)
    sum_to_one = np.isclose(x.sum(axis=-1), 1.0, atol=atol)
    return non_negative & sum_to_one


class MockClassifierFromPosteriors(BaseEstimator):
    """
    A pass-through "classifier" for pre-computed posterior probabilities:
    predict_proba returns its input unchanged, so aggregative quantifiers
    can be run directly on posteriors.
    """

    def fit(self, X, y):
        self.classes_ = np.unique(y)  # np.unique already returns sorted classes
        return self

    def predict(self, X):
        return np.argmax(X, axis=1)

    def predict_proba(self, X):
        return X
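# Minimal smoke test, meant as an illustrative sketch rather than part of the
# experiments: the synthetic data and bandwidth are assumptions, and the
# fit(X, y) training interface is assumed from the
# aggregation_fit(classif_predictions, labels) overrides above.
if __name__ == '__main__':
    _demo_prevalence_utils()

    rng = np.random.default_rng(0)
    k = 4
    # fake "posteriors": random points on the simplex, labelled by their argmax
    X = rng.dirichlet(alpha=np.ones(k), size=1000)
    y = X.argmax(axis=1)

    # round trip of the local ILR transform; loose tolerance, since __call__
    # smooths its input before taking logs
    ilr = ILRtransformation()
    p_back = ilr.inverse(ilr(X))
    assert in_simplex(p_back).all()
    assert np.allclose(p_back, X, atol=1e-2)

    # KDEy in ILR space on pre-computed posteriors, via the mock classifier
    quantifier = KDEyILR(classifier=MockClassifierFromPosteriors(), bandwidth=0.1)
    quantifier.fit(X, y)  # assumed signature, see note above
    print('estimated prevalence:', quantifier.quantify(X))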