from typing import Callable

import numpy as np
from scipy import optimize
from sklearn.base import BaseEstimator

import quapy as qp
import quapy.functional as F
from quapy.data import LabelledCollection
from quapy.method._kdey import KDEBase
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP

epsilon = 1e-10
BANDWIDTH_RANGE = (0.001, 0.2)


class KDEyMLauto(KDEyML):
    def __init__(self, classifier: BaseEstimator = None, val_split=5, random_state=None, optim='two_steps'):
        self.classifier = qp._get_classifier(classifier)
        self.val_split = val_split
        self.bandwidth = None
        self.random_state = random_state
        self.optim = optim

    def chose_bandwidth(self, train, test_instances):
        classif_predictions = self.classifier_fit_predict(train, fit_classifier=True, predict_on=self.val_split)
        te_posteriors = self.classify(test_instances)
        return self.transduce(classif_predictions, te_posteriors)

    def transduce(self, classif_predictions, te_posteriors):
        tr_posteriors, tr_y = classif_predictions.Xy
        classes = classif_predictions.classes_
        n_classes = len(classes)

        current_bandwidth = 0.05
        if self.optim == 'both_fine':
            # one bandwidth per class
            current_bandwidth = np.full(fill_value=current_bandwidth, shape=(n_classes,))
        current_prevalence = np.full(fill_value=1 / n_classes, shape=(n_classes,))

        if self.optim == 'max_likelihood':
            current_prevalence, current_bandwidth = self.optim_minimize_like(tr_posteriors, tr_y, te_posteriors, classes, grid=True)
        elif self.optim == 'max_likelihood2':
            current_prevalence, current_bandwidth = self.optim_minimize_like(tr_posteriors, tr_y, te_posteriors, classes, grid=False)
        else:
            iterations = 0
            convergence = False
            with qp.util.temp_seed(self.random_state):
                while not convergence:
                    previous_bandwidth = current_bandwidth
                    previous_prevalence = current_prevalence

                    iterations += 1
                    print(f'{iterations}:')

                    if self.optim == 'two_steps':
                        current_prevalence = self.optim_minimize_prevalence(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)
                        print(f'\testim-prev={F.strprev(current_prevalence)}')
                        current_bandwidth = self.optim_minimize_bandwidth(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)
                        print(f'\tbandwidth={current_bandwidth}')
                    elif self.optim == 'both':
                        current_prevalence, current_bandwidth = self.optim_minimize_both(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)
                    elif self.optim == 'both_fine':
                        current_prevalence, current_bandwidth = self.optim_minimize_both_fine(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)

                    # check convergence
                    prev_convergence = all(np.isclose(previous_prevalence, current_prevalence, atol=0.01))
                    if isinstance(current_bandwidth, float):
                        band_convergence = np.isclose(previous_bandwidth, current_bandwidth, atol=0.001)
                    else:
                        band_convergence = all(np.isclose(previous_bandwidth, current_bandwidth, atol=0.001))

                    convergence = band_convergence and prev_convergence

        self.bandwidth = current_bandwidth
        print('bandwidth=', current_bandwidth)
        print('prevalence=', current_prevalence)

        return current_prevalence
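    # Objective shared by all the optimization variants below: the negative test
    # log-likelihood of the KDE mixture model,
    #   NLL(alpha, h) = -sum_i log( sum_c alpha_c * p_h(x_i | c) + epsilon ),
    # where alpha is the prevalence vector, h the bandwidth, and p_h(.|c) the
    # class-conditional KDE fit on the training posteriors. 'two_steps' alternates
    # minimization over alpha (with h fixed) and over h (with alpha fixed).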
    def optim_minimize_prevalence(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, current_bandwidth)
        test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

        def neg_loglikelihood_prev(prev):
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        return optim_minimize(neg_loglikelihood_prev, current_prev)

    def optim_minimize_bandwidth(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        def neg_loglikelihood_bandwidth(bandwidth):
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(current_prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        # bounded scalar search over the bandwidth (the prevalence is kept fixed)
        r = optimize.minimize_scalar(neg_loglikelihood_bandwidth, bounds=(0.00001, 0.2), method='bounded')
        assert r.success, f'Process did not converge! {r.message}'
        return r.x

    def optim_minimize_both(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(current_prev)

        def neg_loglikelihood_bandwidth(prevalence_bandwidth):
            bandwidth = prevalence_bandwidth[-1]
            prevalence = prevalence_bandwidth[:-1]
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prevalence, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        bounds = [(0, 1) for _ in range(n_classes)] + [(0.00001, 0.2)]
        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x[:n_classes])})
        prevalence_bandwidth = np.append(current_prev, current_bandwidth)
        r = optimize.minimize(neg_loglikelihood_bandwidth, x0=prevalence_bandwidth, method='SLSQP', bounds=bounds, constraints=constraints)
        print(f'iterations-both={r.nit}')
        assert r.success, 'Process did not converge!'
        prev_band = r.x
        current_prevalence = prev_band[:-1]
        current_bandwidth = prev_band[-1]
        return current_prevalence, current_bandwidth
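    # 'both' optimizes the concatenated vector [alpha_1, ..., alpha_n, h] jointly
    # with SLSQP, bounding every coordinate and constraining the first n entries
    # to the probability simplex; 'both_fine' (below) does the same but with one
    # bandwidth per class, i.e., over [alpha_1, ..., alpha_n, h_1, ..., h_n].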
    def optim_minimize_both_fine(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(current_bandwidth)

        def neg_loglikelihood_bandwidth(prevalence_bandwidth):
            prevalence = prevalence_bandwidth[:n_classes]
            bandwidth = prevalence_bandwidth[n_classes:]
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prevalence, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        bounds = [(0, 1) for _ in range(n_classes)] + [(0.00001, 1) for _ in range(n_classes)]
        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x[:n_classes])})
        prevalence_bandwidth = np.concatenate((current_prev, current_bandwidth))
        r = optimize.minimize(neg_loglikelihood_bandwidth, x0=prevalence_bandwidth, method='SLSQP', bounds=bounds, constraints=constraints)
        print(f'iterations-both-fine={r.nit}')
        assert r.success, 'Process did not converge!'
        prev_band = r.x
        current_prevalence = prev_band[:n_classes]
        current_bandwidth = prev_band[n_classes:]
        return current_prevalence, current_bandwidth

    def optim_minimize_like(self, tr_posteriors, tr_y, te_posteriors, classes, reduction=100, grid=True):
        n_classes = len(classes)

        # reduce the number of samples to speed up the computation
        posteriors_subsample = LabelledCollection(tr_posteriors, tr_y)
        posteriors_subsample = posteriors_subsample.sampling(reduction * n_classes)

        n_test = te_posteriors.shape[0]
        subsample_index = np.random.choice(np.arange(n_test), size=reduction)
        te_posterior_subsample = te_posteriors[subsample_index]

        if grid:
            _, best_band = self.choose_bandwidth_maxlikelihood_grid(*posteriors_subsample.Xy, te_posterior_subsample, classes)
        else:
            best_band = self.choose_bandwidth_maxlikelihood_search(*posteriors_subsample.Xy, te_posterior_subsample, classes)

        mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, best_band)
        test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

        def neg_loglikelihood_prev(prev):
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        init_prev = np.full(fill_value=1 / n_classes, shape=(n_classes,))
        pred_prev, neglikelihood = optim_minimize(neg_loglikelihood_prev, init_prev, return_loss=True)
        return pred_prev, best_band

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        # transductive: the optimization is deferred to aggregate/transduce time
        self.classif_predictions = classif_predictions
        return self

    def aggregate(self, posteriors: np.ndarray):
        return self.transduce(self.classif_predictions, posteriors)

    def choose_bandwidth_maxlikelihood_grid(self, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(classes)
        best_band = None
        best_like = None
        best_prev = None
        init_prev = np.full(fill_value=1 / n_classes, shape=(n_classes,))
        for bandwidth in np.logspace(-4, 0.5, 50):
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

            def neg_loglikelihood_prev(prev):
                test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                return -np.sum(test_loglikelihood)

            pred_prev, neglikelihood = optim_minimize(neg_loglikelihood_prev, init_prev, return_loss=True)

            if best_like is None or neglikelihood < best_like:
                best_like = neglikelihood
                best_band = bandwidth
                best_prev = pred_prev

        print(f'best-like={best_like:.4f}')
        print(f'best-band={best_band:.4f}')
        return best_prev, best_band

    def choose_bandwidth_maxlikelihood_search(self, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(classes)
        init_prev = F.uniform_prevalence(n_classes)

        def neglikelihood_band(bandwidth):
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth[0])
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

            def neg_loglikelihood_prev(prev):
                test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                return -np.sum(test_loglikelihood)

            pred_prev, neglikelihood = optim_minimize(neg_loglikelihood_prev, init_prev, return_loss=True)
            return neglikelihood

        bounds = [(0.0001, 0.2)]
        r = optimize.minimize(neglikelihood_band, x0=[0.001], method='SLSQP', bounds=bounds)
        best_band = r.x[0]
        assert r.success, 'Process did not converge!'
        print(f'solved in nit={r.nit}')
        return best_band


def optim_minimize(loss: Callable, init_prev: np.ndarray, return_loss=False):
    """
    Searches for the optimal prevalence values, i.e., an `n_classes`-dimensional vector of the (`n_classes`-1)-simplex
    that yields the smallest loss. This optimization is carried out by means of a constrained search using scipy's
    SLSQP routine.

    :param loss: (callable) the function to minimize
    :param init_prev: (ndarray) the initial prevalence vector
    :param return_loss: (bool) whether to also return the loss attained at the minimum
    :return: (ndarray) the best prevalence vector found, or the tuple (vector, loss) if `return_loss=True`
    """
    n_classes = len(init_prev)

    # solutions are bounded to those contained in the unit-simplex
    bounds = tuple((0, 1) for _ in range(n_classes))  # values in [0,1]
    constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})  # values summing up to 1
    r = optimize.minimize(loss, x0=init_prev, method='SLSQP', bounds=bounds, constraints=constraints)
    # assert r.success, 'Process did not converge!'
    if return_loss:
        return r.x, r.fun
    else:
        return r.x
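# Illustrative example for optim_minimize (kept as a comment to avoid side effects
# at import time); the toy loss and the target vector below are made up:
#   >>> target = np.asarray([0.2, 0.3, 0.5])
#   >>> loss = lambda prev: np.sum((prev - target) ** 2)
#   >>> optim_minimize(loss, init_prev=np.full(3, 1 / 3))
#   array([0.2, 0.3, 0.5])  # approximately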
class KDEyMLauto2(KDEyML):
    def __init__(self, classifier: BaseEstimator = None, val_split=5, bandwidth=0.1, random_state=None,
                 reduction=100, max_reduced=500, target='likelihood', search='grid'):
        """
        :param reduction: number of examples per class used for automatically setting the bandwidth
        """
        self.classifier = qp._get_classifier(classifier)
        self.val_split = val_split
        if bandwidth == 'auto':
            self.bandwidth = bandwidth
        else:
            self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.reduction = reduction
        self.max_reduced = max_reduced
        self.random_state = random_state
        assert target == 'likelihood' or target in qp.error.QUANTIFICATION_ERROR_NAMES, 'unknown target for auto'
        assert search in ['grid', 'optim'], 'unknown value for search'
        self.target = target
        self.search = search

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        if self.bandwidth == 'auto':
            self.auto_bandwidth_likelihood(classif_predictions)
        else:
            self.bandwidth_ = self.bandwidth
        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth_)
        return self
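    # Model selection for the bandwidth: half of the validation predictions are
    # turned into artificial-prevalence samples via the UPP protocol; every
    # candidate bandwidth is scored by accumulating, across samples, either the
    # negative log-likelihood ('likelihood') or a quantification error (e.g.,
    # 'mae', 'mrae') between the true and the estimated prevalence.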
    def auto_bandwidth_likelihood(self, classif_predictions: LabelledCollection):
        n_classes = classif_predictions.n_classes

        train, val = classif_predictions.split_stratified(train_prop=0.5, random_state=self.random_state)

        if self.reduction is not None:
            # reduce the number of samples to speed up the computation
            tr_length = min(self.reduction * n_classes, self.max_reduced)
            if len(train) > tr_length:
                train = train.sampling(tr_length)

        init_prev = F.uniform_prevalence(n_classes=n_classes)
        repeats = 25
        prot = UPP(val, sample_size=self.reduction, repeats=repeats, random_state=self.random_state)

        def eval_bandwidth(bandwidth):
            mix_densities = self.get_mixture_components(*train.Xy, train.classes_, bandwidth)
            loss_accum = 0
            for (sample, prevtrue) in prot():
                test_densities = [self.pdf(kde_i, sample) for kde_i in mix_densities]

                def neg_loglikelihood_prev(prev):
                    test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                    test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                    return -np.sum(test_loglikelihood)

                if self.target == 'likelihood':
                    loss_fn = neg_loglikelihood_prev
                else:
                    loss_fn = lambda prev_hat: qp.error.from_name(self.target)(prevtrue, prev_hat)

                pred_prev, loss_val = optim_minimize(loss_fn, init_prev, return_loss=True)
                loss_accum += loss_val
            return loss_accum

        if self.search == 'optim':
            r = optimize.minimize_scalar(eval_bandwidth, bounds=(0.001, 0.2), method='bounded', options={'xatol': 0.005})
            best_band = r.x
            best_loss_value = r.fun
            nit = r.nit
            # assert r.success, 'Process did not converge!'
        elif self.search == 'grid':
            nit = 20
            band_evals = [(band, eval_bandwidth(band)) for band in np.logspace(-4, np.log10(0.2), num=nit)]
            best_band, best_loss_value = sorted(band_evals, key=lambda x: x[1])[0]

        print(f'found bandwidth={best_band:.8f} after {nit=} iterations loss_val={best_loss_value:.5f}')
        self.bandwidth_ = best_band


class KDEyMLred(KDEyML):
    def __init__(self, classifier: BaseEstimator = None, val_split=5, bandwidth=0.1, random_state=None,
                 reduction=100, max_reduced=500):
        self.classifier = qp._get_classifier(classifier)
        self.val_split = val_split
        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.reduction = reduction
        self.max_reduced = max_reduced
        self.random_state = random_state

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        n_classes = classif_predictions.n_classes
        # reduce the number of validation samples to speed up KDE fitting
        tr_length = min(self.reduction * n_classes, self.max_reduced)
        if len(classif_predictions) > tr_length:
            classif_predictions = classif_predictions.sampling(tr_length)
        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth)
        return self
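
if __name__ == '__main__':
    # Minimal usage sketch (illustrative, not part of the module's API): trains
    # KDEyMLauto2 with automatic bandwidth selection on QuaPy's IMDb reviews
    # dataset and compares the estimated test prevalence with the true one.
    # Hyperparameters below are chosen for speed rather than accuracy.
    from sklearn.linear_model import LogisticRegression

    qp.environ['SAMPLE_SIZE'] = 100

    data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5)
    train, test = data.training, data.test

    quantifier = KDEyMLauto2(classifier=LogisticRegression(), bandwidth='auto', target='likelihood', search='grid')
    quantifier.fit(train)

    true_prev = test.prevalence()
    estim_prev = quantifier.quantify(test.instances)
    print(f'true prevalence:      {F.strprev(true_prev)}')
    print(f'estimated prevalence: {F.strprev(estim_prev)}')
    print(f'MAE={qp.error.mae(true_prev, estim_prev):.5f}')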