# QuaPy/ClassifierAccuracy/models_multiclass.py
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
import quapy as qp
from sklearn import clone
from sklearn.metrics import confusion_matrix
import scipy
from scipy.sparse import issparse, csr_matrix
from quapy.data import LabelledCollection
from abc import ABC, abstractmethod
from sklearn.model_selection import cross_val_predict
from quapy.protocol import UPP
from quapy.method.base import BaseQuantifier
from quapy.method.aggregative import PACC
import quapy.functional as F
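
# This module implements methods for Classifier Accuracy Prediction (CAP), i.e., for estimating the value that an
# accuracy function (computed on a contingency table) takes for a classifier `h` on unlabelled test data.
#
# Class hierarchy (as implemented below):
#   ClassifierAccuracyPrediction (abstract)
#     CAPContingencyTable (abstract): predicts accuracy via an estimated contingency table
#       NaiveCAP, ContTableTransferCAP, ContTableWithHTransferCAP, NsquaredEquationsCAP
#     SebastianiCAP, PabloCAP: predict accuracy directly from quantifier-based prevalence estimates
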
class ClassifierAccuracyPrediction(ABC):
def __init__(self, h: BaseEstimator, acc: callable):
self.h = h
self.acc = acc
    @abstractmethod
    def fit(self, val: LabelledCollection):
        ...

    @abstractmethod
    def predict(self, X):
        """
        Predicts the value of the accuracy function for the classifier `h` on the test data.

        :param X: test data
        :return: float
        """
        ...
def true_acc(self, sample: LabelledCollection):
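        """
        Computes the true accuracy of the classifier on a labelled sample, i.e., evaluates the accuracy function
        on the confusion matrix obtained from the true labels and the classifier's predictions.

        :param sample: a labelled collection for which the true labels are known
        :return: float
        """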
y_pred = self.h.predict(sample.X)
y_true = sample.y
conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=sample.classes_)
return self.acc(conf_table)
class CAPContingencyTable(ClassifierAccuracyPrediction):
    def __init__(self, h: BaseEstimator, acc: callable):
        super().__init__(h, acc)
@abstractmethod
def fit(self, val: LabelledCollection):
...
def predict(self, X):
"""
Evaluates the accuracy function on the predicted contingency table
:param X: test data
:return: float
"""
cont_table = self.predict_ct(X)
raw_acc = self.acc(cont_table)
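        # the predicted contingency table is only an estimate, so the evaluated accuracy may fall slightly
        # outside [0, 1]; clip it back to the valid range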
norm_acc = np.clip(raw_acc, 0, 1)
return norm_acc
@abstractmethod
def predict_ct(self, X):
"""
Predicts the contingency table for the test data
:param X: test data
:return: a contingency table
"""
...
class NaiveCAP(CAPContingencyTable):
"""
    The Naive CAP is a method that relies on the IID assumption, and thus reuses the contingency table estimated
    on the validation data as the estimate for the test data.
"""
def __init__(self, h: BaseEstimator, acc: callable):
super().__init__(h, acc)
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
return self
def predict_ct(self, test):
"""
        This method disregards the test set, under the assumption that it is IID w.r.t. the training data. This
        means that the confusion matrix for the test data is expected to coincide with the one computed on the
        validation data (using any cross-validation strategy).
:param test: test collection (ignored)
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
return self.cont_table
class ContTableTransferCAP(CAPContingencyTable):
"""
"""
def __init__(self, h: BaseEstimator, acc: callable, q: BaseQuantifier):
super().__init__(h, acc)
self.q = q
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
self.train_prev = val.prevalence()
self.q.fit(val)
return self
def predict_ct(self, test):
"""
        :param test: test collection
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
prev_hat = self.q.quantify(test)
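        # reweight each row (true class) of the validation contingency table by the ratio between the estimated
        # test prevalence and the validation prevalence; the row marginals of the adjusted table thus follow the
        # estimated test prevalence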
adjustment = prev_hat / self.train_prev
return self.cont_table * adjustment[:, np.newaxis]
class ContTableWithHTransferCAP(CAPContingencyTable):
"""
"""
def __init__(self, h: BaseEstimator, acc: callable, q_class):
super().__init__(h, acc)
self.q = q_class(classifier=h)
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
self.train_prev = val.prevalence()
self.q.fit(val, fit_classifier=False, val_split=val)
return self
def predict_ct(self, test):
"""
        :param test: test collection
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
test_prev_estim = self.q.quantify(test)
adjustment = test_prev_estim / self.train_prev
return self.cont_table * adjustment[:, np.newaxis]
class NsquaredEquationsCAP(CAPContingencyTable):
"""
"""
def __init__(self, h: BaseEstimator, acc: callable, q_class, reuse_h=False):
super().__init__(h, acc)
self.reuse_h = reuse_h
if reuse_h:
self.q = q_class(classifier=h)
else:
self.q = q_class(classifier=LogisticRegression())
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
if self.reuse_h:
self.q.fit(val, fit_classifier=False, val_split=val)
else:
self.q.fit(val)
self.A, self.partial_b = self._construct_equations()
return self
def _construct_equations(self):
        # we need an n x n matrix of unknowns
        n = self.cont_table.shape[1]

        # I is the matrix of indexes of the unknowns. For example, the counts of all instances belonging to true
        # class i that have been classified as belonging to classes 0, 1, ..., n-1 correspond to the unknowns
        # indexed by I[i, :]
I = np.arange(n * n).reshape(n, n)
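        # e.g., for n=3 the unknowns are indexed as I = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]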
# system of equations: Ax=b, A.shape=(n*n, n*n,), b.shape=(n*n,)
A = np.zeros(shape=(n * n, n * n))
b = np.zeros(shape=(n * n))
# first equation: the sum of all unknowns is 1
eq_no = 0
A[eq_no, :] = 1
b[eq_no] = 1
eq_no += 1
        # (n-1)*(n-1) equations: the class-conditional ratios should be the same in training and in test, due to
        # the PPS assumption. Example with three classes, one ratio: a/(a+b+c) [in test] = ar [the same ratio in training]
# a / (a + b + c) = ar
# a = (a + b + c) * ar
# a = a ar + b ar + c ar
# a - a ar - b ar - c ar = 0
# a (1-ar) + b (-ar) + c (-ar) = 0
class_cond_ratios_tr = self.cont_table / self.cont_table.sum(axis=1, keepdims=True)
for i in range(1, n):
for j in range(1, n):
ratio_ij = class_cond_ratios_tr[i, j]
A[eq_no, I[i, :]] = -ratio_ij
A[eq_no, I[i, j]] = 1 - ratio_ij
b[eq_no] = 0
eq_no += 1
        # n-1 equations: the sum of counts predicted as class i (the i-th column) must equal the classify & count
        # (C&C) prevalence estimate for class i
for i in range(1, n):
A[eq_no, I[:, i]] = 1
#b[eq_no] = cc_prev_estim[i]
eq_no += 1
        # n-1 equations: the sum of counts whose true class is i (the i-th row) must equal the class prevalence
        # estimated for the test data by the quantifier
for i in range(1, n):
A[eq_no, I[i, :]] = 1
#b[eq_no] = q_prev_estim[i]
eq_no += 1
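        # in total: 1 + (n-1)*(n-1) + (n-1) + (n-1) = n*n equations for the n*n unknowns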
return A, b
def predict_ct(self, test):
"""
        :param test: test collection
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
n = self.cont_table.shape[1]
h_label_preds = self.h.predict(test)
cc_prev_estim = F.prevalence_from_labels(h_label_preds, self.h.classes_)
q_prev_estim = self.q.quantify(test)
A = self.A
b = self.partial_b
        # b is partially filled; we complete the vector by plugging in the classify & count prevalence estimates
        # (n-1 values only) and the quantifier's prevalence estimates (n-1 values only)
b[-2*(n-1):-(n-1)] = cc_prev_estim[1:]
b[-(n-1):] = q_prev_estim[1:]
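        # solve Ax = b; x contains the n*n entries of the estimated contingency table (normalized so that its
        # entries sum to 1, as imposed by the first equation)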
x = np.linalg.solve(A, b)
cont_table_test = x.reshape(n,n)
return cont_table_test
class SebastianiCAP(ClassifierAccuracyPrediction):
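    """
    Accuracy prediction method based on quantifier-driven sample selection: the validation data is split in two
    halves; one half is used to fit the quantifier, and the other is used to generate artificial samples (via UPP)
    for which both the true accuracy and the quantifier's prevalence estimate are precomputed. At prediction time,
    the test prevalence is estimated, and the accuracy is obtained either as the median accuracy of the validation
    samples whose predicted prevalence lies within `alpha` of the test estimate, or (if `alpha` is not positive)
    as a weighted average of all sample accuracies, with weights decreasing with the prevalence discrepancy.
    """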
def __init__(self, h, acc_fn, q_class, n_val_samples=500, alpha=0.3):
self.h = h
self.acc = acc_fn
self.q = q_class(h)
self.n_val_samples = n_val_samples
self.alpha = alpha
self.sample_size = qp.environ['SAMPLE_SIZE']
def fit(self, val: LabelledCollection):
v2, v1 = val.split_stratified(train_prop=0.5)
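        # v1 is used to fit the quantifier; v2 is used to generate the validation samples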
self.q.fit(v1, fit_classifier=False, val_split=v1)
        # precompute the true accuracy of the classifier on validation samples
gen_samples = UPP(v2, repeats=self.n_val_samples, sample_size=self.sample_size, return_type='labelled_collection')
self.sigma_acc = [self.true_acc(sigma_i) for sigma_i in gen_samples()]
# precompute prevalence predictions on samples
gen_samples.on_preclassified_instances(self.q.classify(v2.X), in_place=True)
self.sigma_pred_prevs = [self.q.aggregate(sigma_i.X) for sigma_i in gen_samples()]
def predict(self, X):
test_pred_prev = self.q.quantify(X)
if self.alpha > 0:
# select samples from V2 with predicted prevalence close to the predicted prevalence for U
selected_accuracies = []
for pred_prev_i, acc_i in zip(self.sigma_pred_prevs, self.sigma_acc):
max_discrepancy = np.max(np.abs(pred_prev_i - test_pred_prev))
if max_discrepancy < self.alpha:
selected_accuracies.append(acc_i)
return np.median(selected_accuracies)
else:
            # weighted average: samples from V2 are weighted according to the closeness between their predicted
            # prevalence and the prevalence predicted for U
accum_weight = 0
moving_mean = 0
epsilon = 10E-4
for pred_prev_i, acc_i in zip(self.sigma_pred_prevs, self.sigma_acc):
max_discrepancy = np.max(np.abs(pred_prev_i - test_pred_prev))
weight = -np.log(max_discrepancy+epsilon)
accum_weight += weight
moving_mean += (weight*acc_i)
return moving_mean/accum_weight
class PabloCAP(ClassifierAccuracyPrediction):
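    """
    Accuracy prediction method based on simulated test samples: the classifier's predictions on the validation
    data are stored together with the true labels; at prediction time, the test prevalence is estimated with a
    quantifier, `n_val_samples` artificial samples of the test size are drawn at that prevalence from the stored
    (prediction, true label) pairs, and the accuracies computed on them are aggregated (mean or median).
    """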
def __init__(self, h, acc_fn, q_class, n_val_samples=50, aggr='mean'):
self.h = h
self.acc = acc_fn
self.q = q_class(h)
self.n_val_samples = n_val_samples
self.aggr = aggr
assert aggr in ['mean', 'median'], 'unknown aggregation function, use mean or median'
def fit(self, val: LabelledCollection):
self.q.fit(val)
label_predictions = self.h.predict(val.X)
self.pre_classified = LabelledCollection(instances=label_predictions, labels=val.labels)
def predict(self, X):
pred_prev = F.smooth(self.q.quantify(X))
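        # draw artificial samples of the test size from the pre-classified validation data, at the prevalence
        # estimated for the test set, and compute the accuracy on each of them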
X_size = X.shape[0]
acc_estim = []
for _ in range(self.n_val_samples):
sigma_i = self.pre_classified.sampling(X_size, *pred_prev[:-1])
y_pred, y_true = sigma_i.Xy
conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=sigma_i.classes_)
acc_i = self.acc(conf_table)
acc_estim.append(acc_i)
if self.aggr == 'mean':
return np.mean(acc_estim)
elif self.aggr == 'median':
return np.median(acc_estim)
else:
raise ValueError('unknown aggregation function')
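

if __name__ == '__main__':
    # Minimal usage sketch (illustrative only): estimates the accuracy of a classifier on an unlabelled test set.
    # The synthetic dataset, the train/validation split, and the `vanilla_accuracy` function below are arbitrary
    # choices made for this example; they are not prescribed by the methods defined above.
    from sklearn.datasets import make_classification

    def vanilla_accuracy(cont_table):
        # fraction of correctly classified instances, computed from a confusion matrix (rows=true, cols=predicted)
        return np.diag(cont_table).sum() / cont_table.sum()

    # generate a synthetic 3-class dataset and wrap it as a LabelledCollection
    X, y = make_classification(n_samples=3000, n_classes=3, n_informative=5, random_state=0)
    data = LabelledCollection(X, y)
    train, test = data.split_stratified(train_prop=0.7)
    train, val = train.split_stratified(train_prop=0.5)

    # train the classifier whose accuracy we want to predict
    h = LogisticRegression().fit(*train.Xy)

    # fit a CAP method on the validation data and predict the accuracy on the (unlabelled) test data
    cap = ContTableTransferCAP(h, vanilla_accuracy, PACC(LogisticRegression()))
    cap.fit(val)

    print(f'estimated accuracy: {cap.predict(test.X):.4f}')
    print(f'true accuracy:      {cap.true_acc(test):.4f}')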