import numpy as np
import torch
import quapy as qp
from sklearn.multioutput import MultiOutputRegressor
from sklearn.svm import SVR
from LocalStack._neural import DistributionRegressor
from data import LabelledCollection
from quapy.method.base import BaseQuantifier
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm


class LocalStackingQuantification(BaseQuantifier):
    """Correct a surrogate quantifier's prediction with a locally fitted regressor.

    At quantification time, ``n_samples_gen`` samples are drawn from the
    held-out validation split by calling ``val_data.sampling(test_size,
    *pred_prevs)``. The ``n_samples_sel`` samples whose surrogate-predicted
    prevalence is closest (under ``comparison_measure``) to the test prediction
    are used to fit an SVR-based regressor mapping predicted prevalence to the
    sample's own prevalence; that regressor then corrects the test prediction.
    """

    def __init__(self, surrogate_quantifier, n_samples_gen=200, n_samples_sel=50, comparison_measure='ae'):
        """
        :param surrogate_quantifier: an ``AggregativeSoftQuantifier`` instance
        :param n_samples_gen: number of validation samples generated per call
            to :meth:`quantify`
        :param n_samples_sel: number of closest samples kept to fit the
            regressor
        :param comparison_measure: error name resolved through
            ``qp.error.from_name``
        """
        # NOTE: ``Cls.__class__.__name__`` would report the metaclass name, so
        # the class' own ``__name__`` is used in the message.
        assert isinstance(surrogate_quantifier, AggregativeSoftQuantifier), \
            f'the surrogate quantifier must be of type {AggregativeSoftQuantifier.__name__}'
        self.surrogate_quantifier = surrogate_quantifier
        self.n_samples_gen = n_samples_gen
        self.n_samples_sel = n_samples_sel
        self.comparison_measure = qp.error.from_name(comparison_measure)

    def fit(self, data: LabelledCollection):
        """Fit the surrogate on one part of ``data.split_stratified()`` and
        keep the other part as ``self.val_data``.

        :return: self
        """
        train, val = data.split_stratified()
        self.surrogate_quantifier.fit(train)
        self.val_data = val
        return self

    def normalize(self, out_simplex: np.ndarray):
        """Map a raw regression output onto the probability simplex.

        Negative components are clipped to zero before dividing by the sum;
        if nothing positive remains, the uniform distribution is returned.
        For non-negative input with a positive sum this is plain
        ``out_simplex / out_simplex.sum()``.
        """
        clipped = np.clip(np.asarray(out_simplex, dtype=float), 0.0, None)
        total = clipped.sum()
        if total <= 0:
            return np.full_like(clipped, 1.0 / clipped.size)
        return clipped / total

    def quantify(self, instances: np.ndarray):
        """Return the corrected prevalence estimate for ``instances``."""
        assert hasattr(self, 'val_data'), 'quantify called before fit'
        pred_prevs = self.surrogate_quantifier.quantify(instances)
        test_size = instances.shape[0]

        samples = []
        samples_pred_prevs = []
        samples_distance = []
        for _ in range(self.n_samples_gen):
            sample_i = self.val_data.sampling(test_size, *pred_prevs)
            pred_prev_sample_i = self.surrogate_quantifier.quantify(sample_i.X)
            err_dist = self.comparison_measure(pred_prevs, pred_prev_sample_i)

            samples.append(sample_i)
            samples_pred_prevs.append(pred_prev_sample_i)
            samples_distance.append(err_dist)

        # Keep the samples whose predicted prevalence is closest to the test
        # prediction. Plain list indexing avoids coercing the sample objects
        # into a NumPy array.
        closest = np.argsort(samples_distance)[:self.n_samples_sel]
        samples_sel = [samples[idx] for idx in closest]
        samples_pred_prevs_sel = np.asarray(samples_pred_prevs)[closest]

        reg = MultiOutputRegressor(SVR(C=1000))
        reg_X = samples_pred_prevs_sel
        reg_y = [s.prevalence() for s in samples_sel]
        reg.fit(reg_X, reg_y)

        corrected_prev = reg.predict([pred_prevs])[0]
        corrected_prev = self.normalize(corrected_prev)
        return corrected_prev


class LocalStackingQuantification2(BaseQuantifier):
    """
    Variant that, instead of selecting the training samples whose predicted
    prevalence resembles the prevalence predicted on the test set, directly
    draws samples at the prevalence predicted on the test set and fits the
    regressor on all of them.
    """

    def __init__(self, surrogate_quantifier, n_samples_gen=200, n_samples_sel=50, comparison_measure='ae'):
        """
        :param surrogate_quantifier: an ``AggregativeSoftQuantifier`` instance
        :param n_samples_gen: number of validation samples generated per call
            to :meth:`quantify`
        :param n_samples_sel: stored but not read by this class' methods
        :param comparison_measure: error name resolved through
            ``qp.error.from_name``; stored but not read by this class' methods
        """
        # NOTE: ``Cls.__class__.__name__`` would report the metaclass name, so
        # the class' own ``__name__`` is used in the message.
        assert isinstance(surrogate_quantifier, AggregativeSoftQuantifier), \
            f'the surrogate quantifier must be of type {AggregativeSoftQuantifier.__name__}'
        self.surrogate_quantifier = surrogate_quantifier
        self.n_samples_gen = n_samples_gen
        self.n_samples_sel = n_samples_sel
        self.comparison_measure = qp.error.from_name(comparison_measure)

    def fit(self, data: LabelledCollection):
        """Fit the surrogate on one part of ``data.split_stratified()`` and
        keep the other part as ``self.val_data``.

        :return: self
        """
        train, val = data.split_stratified()
        self.surrogate_quantifier.fit(train)
        self.val_data = val
        return self

    def normalize(self, out_simplex: np.ndarray):
        """Map a raw regression output onto the probability simplex.

        Negative components are clipped to zero before dividing by the sum;
        if nothing positive remains, the uniform distribution is returned.
        For non-negative input with a positive sum this is plain
        ``out_simplex / out_simplex.sum()``.
        """
        clipped = np.clip(np.asarray(out_simplex, dtype=float), 0.0, None)
        total = clipped.sum()
        if total <= 0:
            return np.full_like(clipped, 1.0 / clipped.size)
        return clipped / total

    def quantify(self, instances: np.ndarray):
        """Return the corrected prevalence estimate for ``instances``."""
        assert hasattr(self, 'val_data'), 'quantify called before fit'
        pred_prevs = self.surrogate_quantifier.quantify(instances)
        test_size = instances.shape[0]

        samples = []
        samples_pred_prevs = []
        for _ in range(self.n_samples_gen):
            sample_i = self.val_data.sampling(test_size, *pred_prevs)
            pred_prev_sample_i = self.surrogate_quantifier.quantify(sample_i.X)
            samples.append(sample_i)
            samples_pred_prevs.append(pred_prev_sample_i)

        reg = MultiOutputRegressor(SVR())
        reg_X = samples_pred_prevs
        reg_y = [s.prevalence() for s in samples]
        reg.fit(reg_X, reg_y)

        corrected_prev = reg.predict([pred_prevs])[0]
        corrected_prev = self.normalize(corrected_prev)
        return corrected_prev


class LocalStackingQuantification3(BaseQuantifier):
    """
    Variant that uses a neural network as the regressor and optimizes a
    specific target metric (only 'ae' is supported).
    """

    def __init__(self, surrogate_quantifier, batch_size=100, target='ae'):
        """
        :param surrogate_quantifier: an ``AggregativeSoftQuantifier`` instance
        :param batch_size: number of validation samples generated per training
            step
        :param target: metric to optimize; anything other than ``'ae'`` raises
            ``NotImplementedError``
        """
        # NOTE: ``Cls.__class__.__name__`` would report the metaclass name, so
        # the class' own ``__name__`` is used in the message.
        assert isinstance(surrogate_quantifier, AggregativeSoftQuantifier), \
            f'the surrogate quantifier must be of type {AggregativeSoftQuantifier.__name__}'
        if target not in ['ae']:
            raise NotImplementedError('only AE supported')
        self.surrogate_quantifier = surrogate_quantifier
        self.batch_size = batch_size
        self.target = target

    def fit(self, data: LabelledCollection):
        """Fit the surrogate on one part of ``data.split_stratified()`` and
        keep the other part as ``self.val_data``.

        :return: self
        """
        train, val = data.split_stratified()
        self.surrogate_quantifier.fit(train)
        self.val_data = val
        return self

    def gen_batch(self, test_size, pred_prevs):
        """Generate one training batch of ``self.batch_size`` validation samples.

        :param test_size: number of instances requested for every sample
        :param pred_prevs: prevalence values forwarded to ``val_data.sampling``
        :return: tuple ``(samples_true_prevs, samples_pred_prevs)`` of float
            tensors, one row per generated sample
        """
        samples_true_prevs = []
        samples_pred_prevs = []
        for _ in range(self.batch_size):
            sample_i = self.val_data.sampling(test_size, *pred_prevs)
            pred_prev_sample_i = self.surrogate_quantifier.quantify(sample_i.X)
            samples_true_prevs.append(sample_i.prevalence())
            samples_pred_prevs.append(pred_prev_sample_i)

        samples_pred_prevs = torch.from_numpy(np.asarray(samples_pred_prevs)).float()
        samples_true_prevs = torch.from_numpy(np.asarray(samples_true_prevs)).float()
        return samples_true_prevs, samples_pred_prevs

    def quantify(self, instances: np.ndarray):
        """Train a ``DistributionRegressor`` on freshly generated batches and
        return its output for the surrogate's prediction on ``instances``.

        The result is returned exactly as the network produces it.
        NOTE(review): presumably ``DistributionRegressor`` already outputs a
        valid distribution -- confirm, since no normalization is applied here.
        """
        import torch.nn as nn

        assert hasattr(self, 'val_data'), 'quantify called before fit'
        pred_prevs = self.surrogate_quantifier.quantify(instances)
        test_size = instances.shape[0]
        n_classes = len(pred_prevs)

        reg = DistributionRegressor(n_classes)
        optimizer = torch.optim.Adam(reg.parameters(), lr=0.01)
        loss_fn = nn.L1Loss()

        reg.train()
        n_epochs = 500
        best_loss = None
        PATIENCE = 10
        patience = PATIENCE
        pbar = tqdm(range(n_epochs), total=n_epochs)
        for _ in pbar:
            true_prev, pred_prev = self.gen_batch(test_size, pred_prevs)
            pred_prev_hat = reg(pred_prev)
            loss = loss_fn(pred_prev_hat, true_prev)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_val = loss.item()
            pbar.set_description(f'loss={loss_val:.5f}')

            # early stop: give up after PATIENCE consecutive steps without a
            # new best loss
            if best_loss is None or loss_val < best_loss:
                best_loss = loss_val
                patience = PATIENCE
            else:
                patience -= 1
                if patience <= 0:
                    print('\tearly stop!')
                    break

        reg.eval()
        with torch.no_grad():
            target_prev = torch.from_numpy(pred_prevs).float()
            corrected_prev = reg(target_prev)
            corrected_prev = corrected_prev.detach().numpy()

        return corrected_prev