QuaPy/LocalStack/method.py

207 lines
7.3 KiB
Python

import numpy as np
import torch
import quapy as qp
from sklearn.multioutput import MultiOutputRegressor
from sklearn.svm import SVR
from LocalStack._neural import DistributionRegressor
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
class LocalStackingQuantification(BaseQuantifier):
    """Quantifier that locally corrects the prevalence estimate of a surrogate quantifier.

    At quantification time, samples are drawn from a held-out validation split at the
    prevalence the surrogate predicts for the test instances. The samples whose own
    surrogate prediction is closest to the test prediction are kept, and a regressor
    mapping predicted prevalence to true prevalence is fit on them. That regressor is
    finally applied to the test prediction.

    :param surrogate_quantifier: an instance of AggregativeSoftQuantifier
    :param n_samples_gen: number of validation samples generated per call to `quantify`
    :param n_samples_sel: number of generated samples (the closest ones) used to fit the regressor
    :param comparison_measure: name of the error measure (resolved via `qp.error.from_name`)
        used to compare the test prediction with each sample prediction
    :param random_state: None (non-reproducible sampling) or an int seed. The i-th sample is
        drawn with seed `random_state + i`, so that a fixed seed gives a reproducible run
        without producing `n_samples_gen` identical samples.
    """

    def __init__(self, surrogate_quantifier, n_samples_gen=200, n_samples_sel=50, comparison_measure='ae',
                 random_state=None):
        # `__name__` (not `__class__.__name__`, which is the name of the metaclass) so that the
        # message reports the expected class
        assert isinstance(surrogate_quantifier, AggregativeSoftQuantifier), \
            f'the surrogate quantifier must be of type {AggregativeSoftQuantifier.__name__}'
        self.surrogate_quantifier = surrogate_quantifier
        self.n_samples_gen = n_samples_gen
        self.n_samples_sel = n_samples_sel
        self.comparison_measure = qp.error.from_name(comparison_measure)
        # previously read in `quantify` without ever being set, which raised AttributeError
        self.random_state = random_state

    def fit(self, data: LabelledCollection):
        """Fit the surrogate quantifier on one stratified part of `data`; keep the other part for sampling.

        :param data: the labelled training collection
        :return: self
        """
        train, val = data.split_stratified()
        self.surrogate_quantifier.fit(train)
        self.val_data = val
        return self

    def normalize(self, out_simplex: np.ndarray):
        """Rescale a vector so that its components sum to 1.

        Only a division by the sum is performed: negative components are not clipped and a
        zero sum is not guarded against.

        :param out_simplex: array of (unnormalized) prevalence values
        :return: `out_simplex` divided by its sum
        """
        in_simplex = out_simplex / out_simplex.sum()
        return in_simplex

    def quantify(self, instances: np.ndarray):
        """Return the regressor-corrected prevalence estimate for `instances`.

        :param instances: the test instances; `instances.shape[0]` is used as the sample size
        :return: array with the corrected (sum-normalized) prevalence values
        """
        assert hasattr(self, 'val_data'), 'quantify called before fit'
        pred_prevs = self.surrogate_quantifier.quantify(instances)
        test_size = instances.shape[0]

        samples = []
        samples_pred_prevs = []
        samples_distance = []
        for i in range(self.n_samples_gen):
            # a distinct seed per draw: reusing the same seed would yield the same sample every time
            seed_i = None if self.random_state is None else self.random_state + i
            sample_i = self.val_data.sampling(test_size, *pred_prevs, random_state=seed_i)
            pred_prev_sample_i = self.surrogate_quantifier.quantify(sample_i.X)
            err_dist = self.comparison_measure(pred_prevs, pred_prev_sample_i)
            samples.append(sample_i)
            samples_pred_prevs.append(pred_prev_sample_i)
            samples_distance.append(err_dist)

        # indices of the samples whose predicted prevalence is closest to the test prediction;
        # the samples are selected by index instead of converting the list of collections
        # into a numpy array
        closest = np.argsort(samples_distance)[:self.n_samples_sel]
        reg_X = np.asarray([samples_pred_prevs[i] for i in closest])
        reg_y = np.asarray([samples[i].prevalence() for i in closest])

        # one SVR per class: predicted prevalence -> true prevalence
        reg = MultiOutputRegressor(SVR())
        reg.fit(reg_X, reg_y)

        corrected_prev = reg.predict([pred_prevs])[0]
        corrected_prev = self.normalize(corrected_prev)
        return corrected_prev
class LocalStackingQuantification2(BaseQuantifier):
    """Quantifier that corrects a surrogate estimate using samples drawn at the predicted prevalence.

    Instead of selecting the samples for which the predicted prevalence resembles the
    prevalence predicted on test, this variant directly draws samples (from the held-out
    validation split) at the prevalence predicted on test and fits the regressor on all of them.

    :param surrogate_quantifier: an instance of AggregativeSoftQuantifier
    :param n_samples_gen: number of validation samples generated per call to `quantify`
    :param n_samples_sel: stored, but not used by `quantify` in this variant
    :param comparison_measure: name of an error measure (resolved via `qp.error.from_name`);
        stored, but not used by `quantify` in this variant
    """

    def __init__(self, surrogate_quantifier, n_samples_gen=200, n_samples_sel=50, comparison_measure='ae'):
        # `__name__` (not `__class__.__name__`, which is the name of the metaclass) so that the
        # message reports the expected class
        assert isinstance(surrogate_quantifier, AggregativeSoftQuantifier), \
            f'the surrogate quantifier must be of type {AggregativeSoftQuantifier.__name__}'
        self.surrogate_quantifier = surrogate_quantifier
        self.n_samples_gen = n_samples_gen
        self.n_samples_sel = n_samples_sel
        self.comparison_measure = qp.error.from_name(comparison_measure)

    def fit(self, data: LabelledCollection):
        """Fit the surrogate quantifier on one stratified part of `data`; keep the other part for sampling.

        :param data: the labelled training collection
        :return: self
        """
        train, val = data.split_stratified()
        self.surrogate_quantifier.fit(train)
        self.val_data = val
        return self

    def normalize(self, out_simplex: np.ndarray):
        """Rescale a vector so that its components sum to 1.

        Only a division by the sum is performed: negative components are not clipped and a
        zero sum is not guarded against.

        :param out_simplex: array of (unnormalized) prevalence values
        :return: `out_simplex` divided by its sum
        """
        in_simplex = out_simplex / out_simplex.sum()
        return in_simplex

    def quantify(self, instances: np.ndarray):
        """Return the regressor-corrected prevalence estimate for `instances`.

        :param instances: the test instances; `instances.shape[0]` is used as the sample size
        :return: array with the corrected (sum-normalized) prevalence values
        """
        assert hasattr(self, 'val_data'), 'quantify called before fit'
        pred_prevs = self.surrogate_quantifier.quantify(instances)
        test_size = instances.shape[0]

        samples = []
        samples_pred_prevs = []
        for _ in range(self.n_samples_gen):
            # every sample is requested at the prevalence predicted for the test instances
            sample_i = self.val_data.sampling(test_size, *pred_prevs)
            pred_prev_sample_i = self.surrogate_quantifier.quantify(sample_i.X)
            samples.append(sample_i)
            samples_pred_prevs.append(pred_prev_sample_i)

        # one SVR per class: predicted prevalence -> true prevalence, fit on all generated samples
        reg = MultiOutputRegressor(SVR())
        reg_X = samples_pred_prevs
        reg_y = [s.prevalence() for s in samples]
        reg.fit(reg_X, reg_y)

        corrected_prev = reg.predict([pred_prevs])[0]
        corrected_prev = self.normalize(corrected_prev)
        return corrected_prev
class LocalStackingQuantification3(BaseQuantifier):
    """Quantifier that corrects a surrogate estimate with a neural regressor trained for a specific metric.

    At quantification time a `DistributionRegressor` is trained from scratch on batches of
    validation samples drawn at the prevalence predicted for the test instances; it learns to
    map the surrogate's predicted prevalence to the true prevalence of each sample, and is
    then applied to the test prediction.

    :param surrogate_quantifier: an instance of AggregativeSoftQuantifier
    :param batch_size: number of validation samples generated for each training step
    :param target: the metric to optimize; only 'ae' is supported (trained with `nn.L1Loss`)
    :param n_epochs: maximum number of training steps (one freshly generated batch per step)
    :param patience: number of consecutive steps without improvement of the best training loss
        after which training stops
    :param lr: learning rate of the Adam optimizer
    """

    def __init__(self, surrogate_quantifier, batch_size=100, target='ae', n_epochs=500, patience=10, lr=0.01):
        # `__name__` (not `__class__.__name__`, which is the name of the metaclass) so that the
        # message reports the expected class
        assert isinstance(surrogate_quantifier, AggregativeSoftQuantifier), \
            f'the surrogate quantifier must be of type {AggregativeSoftQuantifier.__name__}'
        self.surrogate_quantifier = surrogate_quantifier
        self.batch_size = batch_size
        self.target = target
        self.n_epochs = n_epochs
        self.patience = patience
        self.lr = lr
        if target not in ['ae']:
            raise NotImplementedError('only AE supported')

    def fit(self, data: LabelledCollection):
        """Fit the surrogate quantifier on one stratified part of `data`; keep the other part for sampling.

        :param data: the labelled training collection
        :return: self
        """
        train, val = data.split_stratified()
        self.surrogate_quantifier.fit(train)
        self.val_data = val
        return self

    def gen_batch(self, test_size, pred_prevs):
        """Generate one training batch of (true prevalence, predicted prevalence) pairs.

        :param test_size: number of instances in each generated sample
        :param pred_prevs: prevalence values at which every sample is requested
        :return: tuple `(samples_true_prevs, samples_pred_prevs)` of float tensors, each
            stacking one entry per generated sample
        """
        samples_true_prevs = []
        samples_pred_prevs = []
        for _ in range(self.batch_size):
            sample_i = self.val_data.sampling(test_size, *pred_prevs)
            pred_prev_sample_i = self.surrogate_quantifier.quantify(sample_i.X)
            samples_true_prevs.append(sample_i.prevalence())
            samples_pred_prevs.append(pred_prev_sample_i)

        samples_pred_prevs = torch.from_numpy(np.asarray(samples_pred_prevs)).float()
        samples_true_prevs = torch.from_numpy(np.asarray(samples_true_prevs)).float()
        return samples_true_prevs, samples_pred_prevs

    def quantify(self, instances: np.ndarray):
        """Train the regressor for this test set and return the corrected prevalence estimate.

        :param instances: the test instances; `instances.shape[0]` is used as the sample size
        :return: numpy array with the output of the regressor for the surrogate's prediction
        """
        import torch.nn as nn

        assert hasattr(self, 'val_data'), 'quantify called before fit'
        pred_prevs = self.surrogate_quantifier.quantify(instances)
        test_size = instances.shape[0]
        n_classes = len(pred_prevs)

        # a new regressor is trained on every call, since the training data depends on `pred_prevs`
        reg = DistributionRegressor(n_classes)
        optimizer = torch.optim.Adam(reg.parameters(), lr=self.lr)
        loss_fn = nn.L1Loss()

        reg.train()
        best_loss = None
        patience = self.patience
        pbar = tqdm(range(self.n_epochs), total=self.n_epochs)
        for epoch in pbar:
            true_prev, pred_prev = self.gen_batch(test_size, pred_prevs)
            pred_prev_hat = reg(pred_prev)
            loss = loss_fn(pred_prev_hat, true_prev)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_val = loss.item()
            pbar.set_description(f'loss={loss_val:.5f}')

            # early stop: the training loss on the current batch is the only monitored signal
            if best_loss is None or loss_val < best_loss:
                best_loss = loss_val
                patience = self.patience
            else:
                patience -= 1
                if patience <= 0:
                    print('\tearly stop!')
                    break

        reg.eval()
        with torch.no_grad():
            # NOTE(review): the regressor receives a single 1-D vector here but 2-D batches during
            # training -- presumably DistributionRegressor handles both; confirm
            target_prev = torch.from_numpy(pred_prevs).float()
            corrected_prev = reg(target_prev)
        corrected_prev = corrected_prev.detach().numpy()
        return corrected_prev