From 6ce5eea4f2d3ba4bd4b02310110c8eb7d742605c Mon Sep 17 00:00:00 2001
From: Alejandro Moreo
Date: Tue, 17 Sep 2024 10:57:46 +0200
Subject: [PATCH] switch

---
 KDEy/experiments.py |  31 +++-
 KDEy/kdey_devel.py  | 358 ++++++++++++++++++++++++++++++++++++++++++++
 KDEy/utils.py       |  12 ++
 3 files changed, 394 insertions(+), 7 deletions(-)
 create mode 100644 KDEy/kdey_devel.py
 create mode 100644 KDEy/utils.py

diff --git a/KDEy/experiments.py b/KDEy/experiments.py
index d14ba01..b3c270e 100644
--- a/KDEy/experiments.py
+++ b/KDEy/experiments.py
@@ -6,7 +6,9 @@ from sklearn.linear_model import LogisticRegression
 from os.path import join
 import quapy as qp
 from quapy.protocol import UPP
-from quapy.method.aggregative import KDEyML
+from kdey_devel import KDEyML
+from time import time
+

 DEBUG = False

@@ -23,6 +25,7 @@ bandwidth_range = np.linspace(0.01, 0.20, 20)
 if DEBUG:
     bandwidth_range = np.linspace(0.01, 0.20, 10)
 
+
 def datasets():
     for dataset_name in qp.datasets.UCI_MULTICLASS_DATASETS:
         dataset = qp.datasets.fetch_UCIMulticlassDataset(dataset_name)
@@ -31,6 +34,24 @@ def datasets():
         yield dataset
 
 
+def predict_b_modsel(train):
+    tinit = time()
+    # bandwidth chosen during model selection in validation
+    train_tr, train_va = train.split_stratified(random_state=0)
+    kdey = KDEyML(random_state=0)
+    modsel = qp.model_selection.GridSearchQ(
+        model=kdey,
+        param_grid={'bandwidth': bandwidth_range},
+        protocol=UPP(train_va, repeats=val_repeats),
+        refit=False,
+        n_jobs=-1,
+        verbose=True
+    ).fit(train_tr)
+    chosen_bandwidth = modsel.best_params_['bandwidth']
+    modsel_choice = float(chosen_bandwidth)
+    tend = time() - tinit
+    return modsel_choice, tend
+
 def experiment_dataset(dataset):
     train, test = dataset.train_test
     test_gen = UPP(test, repeats=test_repeats)
@@ -84,7 +105,7 @@ def plot_bandwidth(val_choice, test_results):
     # Add axis labels and title
     plt.xlabel('Bandwidth')
     plt.ylabel('MAE')
-    plt.title('bandwidth vs score')
+    plt.title(dataset_name)
 
     # Show the legend
     plt.legend()
@@ -108,11 +129,7 @@ for dataset in datasets():
     else:
         result_path = f'./results/{dataset.name}.pkl'
 
-    #modsel_choice, dataset_results = qp.util.pickled_resource(result_path, experiment_dataset, dataset)
-    if os.path.exists(result_path):
-        modsel_choice, dataset_results = pickle.load(open(result_path, 'rb'))
-    else:
-        continue
+    modsel_choice, dataset_results = qp.util.pickled_resource(result_path, experiment_dataset, dataset)
 
     val_choice[dataset.name] = modsel_choice
     test_results[dataset.name] = dataset_results
diff --git a/KDEy/kdey_devel.py b/KDEy/kdey_devel.py
new file mode 100644
index 0000000..a9b196b
--- /dev/null
+++ b/KDEy/kdey_devel.py
@@ -0,0 +1,358 @@
+from typing import Union
+import numpy as np
+from sklearn.base import BaseEstimator
+from sklearn.neighbors import KernelDensity
+
+import quapy as qp
+from quapy.data import LabelledCollection
+from quapy.method.aggregative import AggregativeSoftQuantifier
+import quapy.functional as F
+
+from sklearn.metrics.pairwise import rbf_kernel
+
+
+class KDEBase:
+    """
+    Common ancestor for KDE-based methods. Implements some common routines.
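+
+    The shared routines are bandwidth validation (:meth:`_check_bandwidth`), thin wrappers around
+    scikit-learn's :class:`~sklearn.neighbors.KernelDensity` for fitting and evaluating densities
+    (:meth:`get_kde_function` and :meth:`pdf`), and a helper that fits one KDE per class
+    (:meth:`get_mixture_components`). A rough usage sketch on synthetic 2-dimensional covariates
+    (for illustration only; the toy data and the chosen bandwidth are arbitrary):
+
+    >>> import numpy as np
+    >>> base = KDEBase()
+    >>> X = np.random.rand(100, 2)
+    >>> y = np.random.randint(0, 3, size=100)
+    >>> kdes = base.get_mixture_components(X, y, classes=[0, 1, 2], bandwidth=0.1)
+    >>> densities = np.vstack([base.pdf(kde_i, X) for kde_i in kdes])  # one row of densities per class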
+    """
+
+    BANDWIDTH_METHOD = ['scott', 'silverman']
+
+    @classmethod
+    def _check_bandwidth(cls, bandwidth):
+        """
+        Checks that the bandwidth parameter is valid
+
+        :param bandwidth: either a string (see BANDWIDTH_METHOD) or a float
+        :return: the bandwidth if the check is passed, or raises an exception for invalid values
+        """
+        assert bandwidth in KDEBase.BANDWIDTH_METHOD or isinstance(bandwidth, float), \
+            f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values'
+        if isinstance(bandwidth, float):
+            assert 0 < bandwidth < 1, \
+                "the bandwidth for KDEy should be in (0,1), since this method models the unit simplex"
+        return bandwidth
+
+    def get_kde_function(self, X, bandwidth):
+        """
+        Wraps the KDE function from scikit-learn.
+
+        :param X: data for which the density function is to be estimated
+        :param bandwidth: the bandwidth of the kernel
+        :return: a scikit-learn's KernelDensity object
+        """
+        return KernelDensity(bandwidth=bandwidth).fit(X)
+
+    def pdf(self, kde, X):
+        """
+        Wraps the density evaluation of scikit-learn's KDE. Scikit-learn returns log-scores (s), so this
+        function returns :math:`e^{s}`
+
+        :param kde: a previously fit KDE function
+        :param X: the data for which the density is to be estimated
+        :return: np.ndarray with the densities
+        """
+        return np.exp(kde.score_samples(X))
+
+    def get_mixture_components(self, X, y, classes, bandwidth):
+        """
+        Returns an array containing the mixture components, i.e., the KDE functions for each class.
+
+        :param X: the data containing the covariates
+        :param y: the class labels
+        :param classes: the classes for which class-specific KDEs are to be fitted
+        :param bandwidth: float, the bandwidth of the kernel
+        :return: a list of KernelDensity objects, each fitted with the corresponding class-specific covariates
+        """
+        class_cond_X = []
+        for cat in classes:
+            selX = X[y == cat]
+            if selX.size == 0:
+                # degenerate case (no examples of this class): use a single placeholder point at the
+                # center of the simplex
+                selX = [F.uniform_prevalence(len(classes))]
+            class_cond_X.append(selX)
+        return [self.get_kde_function(X_cond_yi, bandwidth) for X_cond_yi in class_cond_X]
+
+
+class KDEyML(AggregativeSoftQuantifier, KDEBase):
+    """
+    Kernel Density Estimation model for quantification (KDEy) relying on the Kullback-Leibler divergence (KLD) as
+    the divergence measure to be minimized. This method was first proposed in the paper
+    `Kernel Density Estimation for Multiclass Quantification `_, in which
+    the authors show that minimizing the distribution matching criterion for KLD is akin to performing
+    maximum likelihood (ML).
+
+    The distribution matching optimization problem comes down to solving:
+
+    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} \\mathcal{D}(\\boldsymbol{p}_{\\alpha}||q_{\\widetilde{U}})`
+
+    where :math:`p_{\\alpha}` is the mixture of class-specific KDEs with mixture parameter (hence class prevalence)
+    :math:`\\alpha` defined by
+
+    :math:`\\boldsymbol{p}_{\\alpha}(\\widetilde{x}) = \\sum_{i=1}^n \\alpha_i p_{\\widetilde{L}_i}(\\widetilde{x})`
+
+    where :math:`p_X(\\boldsymbol{x}) = \\frac{1}{|X|} \\sum_{x_i\\in X} K\\left(\\frac{x-x_i}{h}\\right)` is the
+    KDE function that uses the datapoints in X as the kernel centers.
+
+    In KDEy-ML, the divergence is taken to be the Kullback-Leibler Divergence. This is equivalent to solving:
+    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} -
+    \\mathbb{E}_{q_{\\widetilde{U}}} \\left[ \\log \\boldsymbol{p}_{\\alpha}(\\widetilde{x}) \\right]`
+
+    which corresponds to the maximum likelihood estimate.
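+
+    A minimal usage sketch (for illustration only; the dataset loader and the logistic regression
+    classifier are arbitrary choices, any :class:`quapy.data.base.LabelledCollection` and any
+    probabilistic classifier would do):
+
+    >>> import quapy as qp
+    >>> from sklearn.linear_model import LogisticRegression
+    >>> data = qp.datasets.fetch_UCIMulticlassDataset(qp.datasets.UCI_MULTICLASS_DATASETS[0])
+    >>> train, test = data.train_test
+    >>> kdey = KDEyML(classifier=LogisticRegression(), bandwidth=0.1).fit(train)
+    >>> estim_prevalence = kdey.quantify(test.X)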
+
+    :param classifier: a sklearn's Estimator that generates a probabilistic classifier.
+    :param val_split: specifies the data used for generating classifier predictions. This specification
+        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
+        be extracted from the training set; or as an integer (default 5), indicating that the predictions
+        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
+        for `k`); or as a collection defining the specific set of data to use for validation.
+        Alternatively, this set can be specified at fit time by indicating the exact set of data
+        on which the predictions are to be generated.
+    :param bandwidth: float, the bandwidth of the kernel
+    :param random_state: a seed to be set before fitting any base quantifier (default None)
+    """
+
+    def __init__(self, classifier: BaseEstimator = None, val_split=5, bandwidth=0.1, random_state=None):
+        self.classifier = qp._get_classifier(classifier)
+        self.val_split = val_split
+        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
+        self.random_state = random_state
+
+    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
+        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth)
+        return self
+
+    def aggregate(self, posteriors: np.ndarray):
+        """
+        Searches for the mixture model parameter (the sought prevalence values) that maximizes the likelihood
+        of the data (i.e., that minimizes the negative log-likelihood)
+
+        :param posteriors: instances in the sample converted into posterior probabilities
+        :return: a vector of class prevalence estimates
+        """
+        with qp.util.temp_seed(self.random_state):
+            epsilon = 1e-10
+            n_classes = len(self.mix_densities)
+            test_densities = [self.pdf(kde_i, posteriors) for kde_i in self.mix_densities]
+
+            def neg_loglikelihood(prev):
+                test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
+                test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
+                return -np.sum(test_loglikelihood)
+
+            return F.optim_minimize(neg_loglikelihood, n_classes)
+
+
+class KDEyHD(AggregativeSoftQuantifier, KDEBase):
+    """
+    Kernel Density Estimation model for quantification (KDEy) relying on the squared Hellinger Distance (HD) as
+    the divergence measure to be minimized. This method was first proposed in the paper
+    `Kernel Density Estimation for Multiclass Quantification `_, in which
+    the authors proposed a Monte Carlo approach for minimizing the divergence.
+
+    The distribution matching optimization problem comes down to solving:
+
+    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} \\mathcal{D}(\\boldsymbol{p}_{\\alpha}||q_{\\widetilde{U}})`
+
+    where :math:`p_{\\alpha}` is the mixture of class-specific KDEs with mixture parameter (hence class prevalence)
+    :math:`\\alpha` defined by
+
+    :math:`\\boldsymbol{p}_{\\alpha}(\\widetilde{x}) = \\sum_{i=1}^n \\alpha_i p_{\\widetilde{L}_i}(\\widetilde{x})`
+
+    where :math:`p_X(\\boldsymbol{x}) = \\frac{1}{|X|} \\sum_{x_i\\in X} K\\left(\\frac{x-x_i}{h}\\right)` is the
+    KDE function that uses the datapoints in X as the kernel centers.
+
+    In KDEy-HD, the divergence is taken to be the squared Hellinger Distance, an f-divergence with corresponding
+    f-generator function given by:
+
+    :math:`f(u)=(\\sqrt{u}-1)^2`
+
+    The authors proposed a Monte Carlo solution that relies on importance sampling:
+
+    :math:`\\hat{D}_f(p||q)= \\frac{1}{t} \\sum_{i=1}^t f\\left(\\frac{p(x_i)}{q(x_i)}\\right) \\frac{q(x_i)}{r(x_i)}`
+
+    where the datapoints (trials) :math:`x_1,\\ldots,x_t\\sim_{\\mathrm{iid}} r`, with :math:`r` the
+    reference distribution, i.e., the mixture of class-specific KDEs with uniform mixture parameter.
+
+    :param classifier: a sklearn's Estimator that generates a probabilistic classifier.
+    :param val_split: specifies the data used for generating classifier predictions. This specification
+        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
+        be extracted from the training set; or as an integer (default 5), indicating that the predictions
+        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
+        for `k`); or as a collection defining the specific set of data to use for validation.
+        Alternatively, this set can be specified at fit time by indicating the exact set of data
+        on which the predictions are to be generated.
+    :param divergence: a string identifying the divergence to minimize (currently, only 'HD' is supported)
+    :param bandwidth: float, the bandwidth of the kernel
+    :param random_state: a seed to be set before fitting any base quantifier (default None)
+    :param montecarlo_trials: number of Monte Carlo trials (default 10000)
+    """
+
+    def __init__(self, classifier: BaseEstimator = None, val_split=5, divergence: str = 'HD',
+                 bandwidth=0.1, random_state=None, montecarlo_trials=10000):
+
+        self.classifier = qp._get_classifier(classifier)
+        self.val_split = val_split
+        self.divergence = divergence
+        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
+        self.random_state = random_state
+        self.montecarlo_trials = montecarlo_trials
+
+    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
+        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth)
+
+        N = self.montecarlo_trials
+        rs = self.random_state
+        n = data.n_classes
+        self.reference_samples = np.vstack([kde_i.sample(N // n, random_state=rs) for kde_i in self.mix_densities])
+        self.reference_classwise_densities = np.asarray(
+            [self.pdf(kde_j, self.reference_samples) for kde_j in self.mix_densities])
+        self.reference_density = np.mean(self.reference_classwise_densities,
+                                         axis=0)  # equiv. to (uniform @ self.reference_classwise_densities)
+
+        return self
+
+    def aggregate(self, posteriors: np.ndarray):
+        # we retain all N reference examples (N//n per class, sampled from a mixture with uniform parameter),
+        # and then apply importance sampling (IS). In this version we compute D(p_alpha||q) with IS
+        n_classes = len(self.mix_densities)
+
+        test_kde = self.get_kde_function(posteriors, self.bandwidth)
+        test_densities = self.pdf(test_kde, self.reference_samples)
+
+        def f_squared_hellinger(u):
+            return (np.sqrt(u) - 1) ** 2
+
+        # todo: this will fail when self.divergence is a callable, and is not the right place to do it anyway
+        if self.divergence.lower() == 'hd':
+            f = f_squared_hellinger
+        else:
+            raise ValueError('only squared HD is currently implemented')
+
+        epsilon = 1e-10
+        qs = test_densities + epsilon
+        rs = self.reference_density + epsilon
+        iw = qs / rs  # importance weights
+        p_class = self.reference_classwise_densities + epsilon
+        fracs = p_class / qs
+
+        def divergence(prev):
+            # ps / qs = (prev @ p_class) / qs = prev @ (p_class / qs) = prev @ fracs
+            ps_div_qs = prev @ fracs
+            return np.mean(f(ps_div_qs) * iw)
+
+        return F.optim_minimize(divergence, n_classes)
+
+
+class KDEyCS(AggregativeSoftQuantifier):
+    """
+    Kernel Density Estimation model for quantification (KDEy) relying on the Cauchy-Schwarz divergence (CS) as
+    the divergence measure to be minimized. This method was first proposed in the paper
+    `Kernel Density Estimation for Multiclass Quantification `_, in which
+    the authors showed that the CS divergence between mixtures of Gaussian KDEs admits a closed-form expression.
+
+    The distribution matching optimization problem comes down to solving:
+
+    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} \\mathcal{D}(\\boldsymbol{p}_{\\alpha}||q_{\\widetilde{U}})`
+
+    where :math:`p_{\\alpha}` is the mixture of class-specific KDEs with mixture parameter (hence class prevalence)
+    :math:`\\alpha` defined by
+
+    :math:`\\boldsymbol{p}_{\\alpha}(\\widetilde{x}) = \\sum_{i=1}^n \\alpha_i p_{\\widetilde{L}_i}(\\widetilde{x})`
+
+    where :math:`p_X(\\boldsymbol{x}) = \\frac{1}{|X|} \\sum_{x_i\\in X} K\\left(\\frac{x-x_i}{h}\\right)` is the
+    KDE function that uses the datapoints in X as the kernel centers.
+
+    In KDEy-CS, the divergence is taken to be the Cauchy-Schwarz divergence given by:
+
+    :math:`\\mathcal{D}_{\\mathrm{CS}}(p||q)=-\\log\\left(\\frac{\\int p(x)q(x)dx}{\\sqrt{\\int p(x)^2dx \\int q(x)^2dx}}\\right)`
+
+    Thanks to the closed form, the divergence is computed exactly (no Monte Carlo approximation is needed);
+    only the minimization over :math:`\\alpha` is carried out numerically.
+
+    :param classifier: a sklearn's Estimator that generates a probabilistic classifier.
+    :param val_split: specifies the data used for generating classifier predictions. This specification
+        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
+        be extracted from the training set; or as an integer (default 5), indicating that the predictions
+        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
+        for `k`); or as a collection defining the specific set of data to use for validation.
+        Alternatively, this set can be specified at fit time by indicating the exact set of data
+        on which the predictions are to be generated.
+    :param bandwidth: float, the bandwidth of the kernel
+    """
+
+    def __init__(self, classifier: BaseEstimator = None, val_split=5, bandwidth=0.1):
+        self.classifier = qp._get_classifier(classifier)
+        self.val_split = val_split
+        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
+
+    def gram_matrix_mix_sum(self, X, Y=None):
+        # this adapts the output of the rbf_kernel function (pairwise evaluations of Gaussian kernels k(x,y))
+        # to contain pairwise evaluations of N(x|mu,Sigma1+Sigma2), with mu=y and where Sigma1 and Sigma2 are
+        # two "scalar matrices" (h^2)*I each, so Sigma1+Sigma2 has scalar 2(h^2) (h is the bandwidth)
+        h = self.bandwidth
+        variance = 2 * (h ** 2)
+        nD = X.shape[1]
+        gamma = 1 / (2 * variance)
+        norm_factor = 1 / np.sqrt(((2 * np.pi) ** nD) * (variance ** (nD)))
+        gram = norm_factor * rbf_kernel(X, Y, gamma=gamma)
+        return gram.sum()
+
+    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
+
+        P, y = classif_predictions.Xy
+        n = data.n_classes
+
+        assert all(sorted(np.unique(y)) == np.arange(n)), \
+            'label name gaps not allowed in current implementation'
+
+        # counts_inv keeps track of the relative weight of each datapoint within its class
+        # (i.e., the weight in its KDE model)
+        counts_inv = 1 / (data.counts())
+
+        # tr_tr_sums corresponds to symbol \overline{B} in the paper
+        tr_tr_sums = np.zeros(shape=(n, n), dtype=float)
+        for i in range(n):
+            for j in range(n):
+                if i > j:
+                    tr_tr_sums[i, j] = tr_tr_sums[j, i]
+                else:
+                    block = self.gram_matrix_mix_sum(P[y == i], P[y == j] if i != j else None)
+                    tr_tr_sums[i, j] = block
+
+        # keep track of these data structures for the test phase
+        self.Ptr = P
+        self.ytr = y
+        self.tr_tr_sums = tr_tr_sums
+        self.counts_inv = counts_inv
+
+        return self
+
+    def aggregate(self, posteriors: np.ndarray):
+        Ptr = self.Ptr
+        Pte = posteriors
+        y = self.ytr
+        tr_tr_sums = self.tr_tr_sums
+
+        M, nD = Pte.shape
+        Minv = (1 / M)  # t in the paper
+        n = Ptr.shape[1]
+
+        # becomes a constant that does not affect the optimization, no need to compute it
+        # partC = 0.5*np.log(self.gram_matrix_mix_sum(Pte) * Kinv * Kinv)
+
+        # tr_te_sums corresponds to \overline{a}*(1/Li)*(1/M) in the paper (note the constants
+        # are already aggregated to tr_te_sums, so these multiplications are not carried out
+        # at each iteration of the optimization phase)
+        tr_te_sums = np.zeros(shape=n, dtype=float)
+        for i in range(n):
+            tr_te_sums[i] = self.gram_matrix_mix_sum(Ptr[y == i], Pte)
+
+        def divergence(alpha):
+            # called \overline{r} in the paper
+            alpha_ratio = alpha * self.counts_inv
+
+            # recall that tr_te_sums already accounts for the constant terms (1/Li)*(1/M)
+            partA = -np.log((alpha_ratio @ tr_te_sums) * Minv)
+            partB = 0.5 * np.log(alpha_ratio @ tr_tr_sums @ alpha_ratio)
+            return partA + partB  # + partC
+
+        return F.optim_minimize(divergence, n)
+
diff --git a/KDEy/utils.py b/KDEy/utils.py
new file mode 100644
index 0000000..0e09f9a
--- /dev/null
+++ b/KDEy/utils.py
@@ -0,0 +1,12 @@
+import time
+from functools import wraps
+
+def measuretime(func):
+    """Decorator that makes the wrapped function return the pair (result, elapsed_time_in_seconds)."""
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        start_time = time.time()  # start the timer
+        result = func(*args, **kwargs)  # run the original function
+        end_time = time.time()  # stop the timer
+        time_it_took = end_time - start_time  # total elapsed time
+        return result, time_it_took  # return the result together with the elapsed time
+    return wrapper
\ No newline at end of file