from typing import Union, Callable
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.neighbors import KernelDensity
import quapy as qp
from quapy.data import LabelledCollection
from quapy.method.aggregative import AggregativeSoftQuantifier, KDEyML
import quapy.functional as F
from sklearn.metrics.pairwise import rbf_kernel
from scipy import optimize


# Additive smoothing applied before taking logs, so that a zero mixture likelihood does not yield -inf.
_EPSILON = 1e-10
# Search interval explored by SLSQP for every bandwidth variable.
_BANDWIDTH_BOUNDS = (0.00001, 1)
# Absolute tolerance used to decide that two consecutive iterates are equal.
_CONVERGENCE_ATOL = 0.0001
# Bandwidth value from which every search starts.
_INITIAL_BANDWIDTH = 0.05
# Values of `optim` that `KDEyMLauto.transduce` knows how to handle.
_OPTIM_MODES = ('two_steps', 'both', 'both_fine')


def _mixture_neg_loglikelihood(prevalence, test_densities):
    """
    Computes the negative log-likelihood of the test data under a mixture of per-class densities.

    :param prevalence: iterable with one mixture weight per class
    :param test_densities: iterable with, for each class, the density values of the test points
    :return: the negative sum of the (smoothed) log mixture likelihoods
    """
    test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prevalence, test_densities))
    test_loglikelihood = np.log(test_mixture_likelihood + _EPSILON)
    return -np.sum(test_loglikelihood)


class KDEyMLauto(KDEyML):
    """
    Variant of :class:`KDEyML` in which the kernel bandwidth is not given by the user but searched for,
    together with the class prevalence values, by minimizing the negative log-likelihood of the test
    posteriors. The search is repeated until two consecutive iterates are (almost) identical.

    Progress information is printed to standard output.

    :param classifier: the classifier; it is passed through `qp._get_classifier`
    :param val_split: forwarded as `predict_on` when generating the classifier predictions in
        :meth:`chose_bandwidth`
    :param random_state: value handed to `qp.util.temp_seed` around the optimization loop
    :param optim: optimization strategy, one of `'two_steps'` (alternate prevalence and bandwidth searches),
        `'both'` (joint search over prevalence and one shared bandwidth), or `'both_fine'` (joint search over
        prevalence and one bandwidth per class)
    """

    def __init__(self, classifier: BaseEstimator = None, val_split=5, random_state=None, optim='two_steps'):
        # NOTE(review): the parent initializer is not invoked; the attributes are set directly here.
        self.classifier = qp._get_classifier(classifier)
        self.val_split = val_split
        self.bandwidth = None  # set by `transduce` once the search has converged
        self.random_state = random_state
        self.optim = optim

    def chose_bandwidth(self, train, test_instances):
        """
        Fits the classifier on `train`, classifies `test_instances`, and runs the bandwidth/prevalence search.

        :param train: the training data, handed to `classifier_fit_predict`
        :param test_instances: the test instances, handed to `classify`
        :return: the prevalence vector returned by :meth:`transduce`
        """
        classif_predictions = self.classifier_fit_predict(train, fit_classifier=True, predict_on=self.val_split)
        te_posteriors = self.classify(test_instances)
        return self.transduce(classif_predictions, te_posteriors)

    def transduce(self, classif_predictions, te_posteriors):
        """
        Iteratively searches for the prevalence vector and the bandwidth that best explain `te_posteriors`.
        The search starts from the uniform prevalence and a bandwidth of 0.05, and stops when neither the
        bandwidth nor the prevalence changes between two consecutive iterations (absolute tolerance 0.0001).
        The bandwidth found is stored in `self.bandwidth`.

        :param classif_predictions: object exposing `Xy` (training posteriors and labels) and `classes_`
        :param te_posteriors: the posterior probabilities of the test instances
        :return: the prevalence vector found
        :raises ValueError: if `self.optim` is not one of the supported strategies
        """
        # Without this check an unknown strategy would never set the convergence flag and loop forever.
        if self.optim not in _OPTIM_MODES:
            raise ValueError(f'unknown optim={self.optim!r}; valid values are {_OPTIM_MODES}')

        tr_posteriors, tr_y = classif_predictions.Xy
        classes = classif_predictions.classes_
        n_classes = len(classes)

        current_bandwidth = _INITIAL_BANDWIDTH
        if self.optim == 'both_fine':
            # one bandwidth per class
            current_bandwidth = np.full(fill_value=current_bandwidth, shape=(n_classes,))
        current_prevalence = np.full(fill_value=1 / n_classes, shape=(n_classes,))

        data = (tr_posteriors, tr_y, te_posteriors, classes)
        iterations = 0
        with qp.util.temp_seed(self.random_state):
            while True:
                previous_bandwidth = current_bandwidth
                previous_prevalence = current_prevalence
                iterations += 1
                print(f'{iterations}:')

                if self.optim == 'two_steps':
                    current_prevalence = self.optim_minimize_prevalence(
                        current_bandwidth, current_prevalence, *data
                    )
                    print(f'\testim-prev={F.strprev(current_prevalence)}')
                    current_bandwidth = self.optim_minimize_bandwidth(
                        current_bandwidth, current_prevalence, *data
                    )
                    print(f'\tbandwidth={current_bandwidth}')
                elif self.optim == 'both':
                    current_prevalence, current_bandwidth = self.optim_minimize_both(
                        current_bandwidth, current_prevalence, *data
                    )
                else:  # 'both_fine'
                    current_prevalence, current_bandwidth = self.optim_minimize_both_fine(
                        current_bandwidth, current_prevalence, *data
                    )

                # np.allclose handles both the scalar and the per-class bandwidth
                same_bandwidth = np.allclose(previous_bandwidth, current_bandwidth, atol=_CONVERGENCE_ATOL)
                same_prevalence = np.allclose(previous_prevalence, current_prevalence, atol=_CONVERGENCE_ATOL)
                if same_bandwidth and same_prevalence:
                    break

        self.bandwidth = current_bandwidth
        print('bandwidth=', current_bandwidth)
        print('prevalence=', current_prevalence)
        return current_prevalence

    def _test_neg_loglikelihood(self, prevalence, bandwidth, tr_posteriors, tr_y, te_posteriors, classes):
        """
        Builds the per-class mixture components for `bandwidth`, evaluates them on the test posteriors, and
        returns the negative log-likelihood of the resulting mixture weighted by `prevalence`.
        """
        mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
        test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
        return _mixture_neg_loglikelihood(prevalence, test_densities)

    def optim_minimize_prevalence(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        """
        Searches for the prevalence vector that minimizes the negative log-likelihood of the test posteriors
        while the bandwidth is kept fixed.

        :param current_bandwidth: the (fixed) bandwidth
        :param current_prev: the prevalence vector from which the search starts
        :param tr_posteriors: posterior probabilities of the training instances
        :param tr_y: labels of the training instances
        :param te_posteriors: posterior probabilities of the test instances
        :param classes: the class labels
        :return: the prevalence vector found by :func:`optim_minimize`
        """
        # The bandwidth does not change during this search, so the densities are computed only once.
        mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, current_bandwidth)
        test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

        def neg_loglikelihood_prev(prev):
            return _mixture_neg_loglikelihood(prev, test_densities)

        return optim_minimize(neg_loglikelihood_prev, current_prev)

    def optim_minimize_bandwidth(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        """
        Searches, within [0.00001, 1], for the single bandwidth that minimizes the negative log-likelihood
        of the test posteriors while the prevalence vector is kept fixed.

        :param current_bandwidth: the bandwidth from which the search starts
        :param current_prev: the (fixed) prevalence vector
        :param tr_posteriors: posterior probabilities of the training instances
        :param tr_y: labels of the training instances
        :param te_posteriors: posterior probabilities of the test instances
        :param classes: the class labels
        :return: the bandwidth found
        """
        def neg_loglikelihood_bandwidth(bandwidth):
            # SLSQP passes a 1-element vector; the scalar bandwidth is its only entry
            return self._test_neg_loglikelihood(
                current_prev, bandwidth[0], tr_posteriors, tr_y, te_posteriors, classes
            )

        bounds = [_BANDWIDTH_BOUNDS]
        r = optimize.minimize(neg_loglikelihood_bandwidth, x0=[current_bandwidth], method='SLSQP', bounds=bounds)
        print(f'iterations-bandwidth={r.nit}')
        return r.x[0]

    def optim_minimize_both(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        """
        Jointly searches for the prevalence vector and a single shared bandwidth. The optimization variable
        is the prevalence vector with the bandwidth appended as its last entry; the prevalence entries are
        bounded to [0, 1] and constrained to sum up to 1, the bandwidth is bounded to [0.00001, 1].

        :param current_bandwidth: the bandwidth from which the search starts
        :param current_prev: the prevalence vector from which the search starts
        :param tr_posteriors: posterior probabilities of the training instances
        :param tr_y: labels of the training instances
        :param te_posteriors: posterior probabilities of the test instances
        :param classes: the class labels
        :return: a tuple `(prevalence, bandwidth)` with the values found
        """
        n_classes = len(current_prev)

        def neg_loglikelihood_bandwidth(prevalence_bandwidth):
            prevalence = prevalence_bandwidth[:-1]
            bandwidth = prevalence_bandwidth[-1]
            return self._test_neg_loglikelihood(
                prevalence, bandwidth, tr_posteriors, tr_y, te_posteriors, classes
            )

        bounds = [(0, 1) for _ in range(n_classes)] + [_BANDWIDTH_BOUNDS]
        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x[:n_classes])})
        prevalence_bandwidth = np.append(current_prev, current_bandwidth)
        r = optimize.minimize(
            neg_loglikelihood_bandwidth, x0=prevalence_bandwidth, method='SLSQP',
            bounds=bounds, constraints=constraints
        )
        print(f'iterations-both={r.nit}')
        prev_band = r.x
        return prev_band[:-1], prev_band[-1]

    def optim_minimize_both_fine(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        """
        Jointly searches for the prevalence vector and one bandwidth per class. The optimization variable
        is the concatenation of the prevalence vector and the bandwidth vector; the prevalence entries are
        bounded to [0, 1] and constrained to sum up to 1, every bandwidth is bounded to [0.00001, 1].

        NOTE(review): the whole bandwidth vector is handed to `get_mixture_components`; this presumably
        requires that method to accept per-class bandwidths -- confirm against the parent class.

        :param current_bandwidth: vector with the bandwidths (one per class) from which the search starts
        :param current_prev: the prevalence vector from which the search starts
        :param tr_posteriors: posterior probabilities of the training instances
        :param tr_y: labels of the training instances
        :param te_posteriors: posterior probabilities of the test instances
        :param classes: the class labels
        :return: a tuple `(prevalence, bandwidths)` with the values found
        """
        n_classes = len(current_bandwidth)

        def neg_loglikelihood_bandwidth(prevalence_bandwidth):
            prevalence = prevalence_bandwidth[:n_classes]
            bandwidth = prevalence_bandwidth[n_classes:]
            return self._test_neg_loglikelihood(
                prevalence, bandwidth, tr_posteriors, tr_y, te_posteriors, classes
            )

        bounds = [(0, 1) for _ in range(n_classes)] + [_BANDWIDTH_BOUNDS for _ in range(n_classes)]
        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x[:n_classes])})
        prevalence_bandwidth = np.concatenate((current_prev, current_bandwidth))
        r = optimize.minimize(
            neg_loglikelihood_bandwidth, x0=prevalence_bandwidth, method='SLSQP',
            bounds=bounds, constraints=constraints
        )
        print(f'iterations-both-fine={r.nit}')
        prev_band = r.x
        return prev_band[:n_classes], prev_band[n_classes:]

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        """
        Stores the classifier predictions; the actual search is deferred to :meth:`aggregate`, since it
        needs the test posteriors.

        :param classif_predictions: the classifier predictions, later consumed by :meth:`transduce`
        :param data: not used by this method
        :return: self
        """
        self.classif_predictions = classif_predictions
        return self

    def aggregate(self, posteriors: np.ndarray):
        """
        Runs the bandwidth/prevalence search on the given test posteriors, using the classifier predictions
        previously stored by :meth:`aggregation_fit`.

        :param posteriors: the posterior probabilities of the test instances
        :return: the prevalence vector returned by :meth:`transduce`
        """
        return self.transduce(self.classif_predictions, posteriors)


def optim_minimize(loss: Callable, init_prev: np.ndarray):
    """
    Searches for the optimal prevalence values, i.e., an `n_classes`-dimensional vector of the
    (`n_classes`-1)-simplex that yields the smallest loss. This optimization is carried out by means of a
    constrained search using scipy's SLSQP routine.

    :param loss: (callable) the function to minimize
    :param init_prev: (ndarray) the prevalence vector from which the search starts; its length determines
        the number of classes
    :return: (ndarray) the best prevalence vector found
    """
    n_classes = len(init_prev)

    # solutions are bounded to those contained in the unit-simplex
    bounds = tuple((0, 1) for _ in range(n_classes))  # values in [0,1]
    constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})  # values summing up to 1
    r = optimize.minimize(loss, x0=init_prev, method='SLSQP', bounds=bounds, constraints=constraints)
    print(f'iterations-prevalence={r.nit}')
    return r.x