# QuaPy/ClassifierAccuracy/models_multiclass.py
import numpy as np
from sklearn.base import BaseEstimator
import quapy as qp
from sklearn import clone
from sklearn.metrics import confusion_matrix
import scipy
from scipy.sparse import issparse, csr_matrix
from data import LabelledCollection
from abc import ABC, abstractmethod
from sklearn.model_selection import cross_val_predict
from quapy.method.base import BaseQuantifier
from quapy.method.aggregative import PACC
import quapy.functional as F


class ClassifierAccuracyPrediction(ABC):

    def __init__(self, h: BaseEstimator, acc: callable):
        self.h = h
        self.acc = acc

    @abstractmethod
    def fit(self, val: LabelledCollection):
        ...

    def predict(self, X):
        """
        Evaluates the accuracy function on the predicted contingency table

        :param X: test data
        :return: float
        """
        return self.acc(self.predict_ct(X))

    @abstractmethod
    def predict_ct(self, X):
        """
        Predicts the contingency table for the test data

        :param X: test data
        :return: a contingency table
        """
        ...
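

# Illustrative sketch (hypothetical helper, not used by the classes below): an example of the kind of `acc` callable
# this interface expects, i.e., a function mapping a contingency table (in the format returned by sklearn's
# confusion_matrix) onto a scalar; here, vanilla accuracy as the trace over the total mass.
def example_vanilla_accuracy(cont_table):
    cont_table = np.asarray(cont_table, dtype=float)
    return np.diag(cont_table).sum() / cont_table.sum()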


class NaiveCAP(ClassifierAccuracyPrediction):
    """
    The Naive CAP is a method that relies on the IID assumption, and thus uses the estimate obtained on the
    validation data as an estimate for the test data.
    """

    def __init__(self, h: BaseEstimator, acc: callable):
        super().__init__(h, acc)

    def fit(self, val: LabelledCollection):
        y_hat = self.h.predict(val.X)
        y_true = val.y
        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
        return self

    def predict_ct(self, test):
        """
        This method disregards the test set, under the assumption that it is IID with respect to the training data.
        This means that the confusion matrix estimated for the test data is assumed to coincide with the one computed
        on the validation data (via any cross-validation strategy).

        :param test: test collection (ignored)
        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
        """
        return self.cont_table
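

# Usage sketch (hypothetical names: a trained classifier `h`, a labelled validation collection `val`, and test
# instances `X_test`): NaiveCAP simply reports the accuracy measured on the validation data as its test estimate.
def example_naive_cap_usage(h, val: LabelledCollection, X_test):
    cap = NaiveCAP(h, acc=example_vanilla_accuracy)
    cap.fit(val)
    return cap.predict(X_test)  # coincides with the accuracy computed on the validation contingency table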


class ContTableTransferCAP(ClassifierAccuracyPrediction):
    """
    Transfers the contingency table estimated on the validation data to the test data, by reweighting its rows
    according to the ratio between the class prevalence estimated on the test data (by a quantifier) and the class
    prevalence observed in the validation data.
    """

    def __init__(self, h: BaseEstimator, acc: callable, q: BaseQuantifier):
        super().__init__(h, acc)
        self.q = q

    def fit(self, val: LabelledCollection):
        y_hat = self.h.predict(val.X)
        y_true = val.y
        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
        self.train_prev = val.prevalence()
        self.q.fit(val)
        return self

    def predict_ct(self, test):
        """
        :param test: test data, on which the class prevalence is estimated by the quantifier
        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
        """
        prev_hat = self.q.quantify(test)
        adjustment = prev_hat / self.train_prev
        return self.cont_table * adjustment[:, np.newaxis]
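

# Worked example (hypothetical numbers) of the row reweighting performed above: if the validation contingency table
# is [[40, 10], [5, 45]] (rows = true classes), the validation prevalence is [0.5, 0.5] and the quantifier estimates
# a test prevalence of [0.3, 0.7], then adjustment = [0.6, 1.4] and the transferred table is [[24, 6], [7, 63]],
# i.e., each row is rescaled to reflect the estimated shift in the class priors.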


class ContTableWithHTransferCAP(ClassifierAccuracyPrediction):
    """
    Variant of :class:`ContTableTransferCAP` in which the quantifier is instantiated around the same classifier `h`
    (via `q_class`) and is fit on the validation data without retraining the classifier.
    """

    def __init__(self, h: BaseEstimator, acc: callable, q_class):
        super().__init__(h, acc)
        self.q = q_class(classifier=h)

    def fit(self, val: LabelledCollection):
        y_hat = self.h.predict(val.X)
        y_true = val.y
        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
        self.train_prev = val.prevalence()
        self.q.fit(val, fit_classifier=False, val_split=val)
        return self

    def predict_ct(self, test):
        """
        :param test: test data, on which the class prevalence is estimated by the quantifier
        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
        """
        test_prev_estim = self.q.quantify(test)
        adjustment = test_prev_estim / self.train_prev
        return self.cont_table * adjustment[:, np.newaxis]
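

# Usage sketch (hypothetical names: a trained classifier `h`, a validation collection `val`, and test instances
# `X_test`): the quantifier class is instantiated around the same classifier `h`, e.g. the PACC quantifier imported
# above, and is fit without retraining `h`.
def example_cont_table_with_h_usage(h, val: LabelledCollection, X_test):
    cap = ContTableWithHTransferCAP(h, acc=example_vanilla_accuracy, q_class=PACC)
    cap.fit(val)
    return cap.predict(X_test)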


class NsquaredEquationsCAP(ClassifierAccuracyPrediction):
    """
    Estimates the test contingency table by solving a system of n*n linear equations whose unknowns are the entries
    of the (normalized) contingency table, and whose constraints combine the classify & count estimate, the
    quantifier's prevalence estimate, and the prior probability shift (PPS) assumption.
    """

    def __init__(self, h: BaseEstimator, acc: callable, q_class):
        super().__init__(h, acc)
        self.q = q_class(classifier=h)

    def fit(self, val: LabelledCollection):
        y_hat = self.h.predict(val.X)
        y_true = val.y
        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
        self.q.fit(val, fit_classifier=False, val_split=val)
        return self

    def predict_ct(self, test):
        """
        :param test: test data, for which the contingency table is estimated
        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`, with entries
            expressed as estimated joint probabilities
        """
        # we need an n x n matrix of unknowns, i.e., n*n unknowns in total
        n = self.cont_table.shape[1]
        I = np.arange(n*n).reshape(n, n)

        h_label_preds = self.h.predict(test)
        cc_prev_estim = F.prevalence_from_labels(h_label_preds, self.h.classes_)
        q_prev_estim = self.q.quantify(test)

        A = np.zeros(shape=(n*n, n*n))
        b = np.zeros(n*n)

        # first equation: the sum of all unknowns is 1
        eq_no = 0
        A[eq_no, :] = 1
        b[eq_no] = 1
        eq_no += 1

        # n-1 equations: the sum of each column (instances predicted as class i, regardless of the true class) must
        # equal the fraction of predictions for class i as given by classify & count
        for i in range(n-1):
            A[eq_no + i, I[:, i+1]] = 1
            b[eq_no + i] = cc_prev_estim[i+1]
        eq_no += (n-1)

        # n-1 equations: the sum of each row (instances of true class i, regardless of the prediction) must equal
        # the class prevalence estimated in test by the quantifier
        for i in range(n-1):
            A[eq_no + i, I[i+1, :]] = 1
            b[eq_no + i] = q_prev_estim[i+1]
        eq_no += (n-1)

        # (n-1)*(n-1) equations: the class-conditional ratios should be the same in training and in test due to the
        # PPS assumption; the equations below are a sketch of this constraint, imposing the validation rates
        # r_ij = P(y_hat=j | y=i) on the test unknowns, i.e., p_ij - r_ij * sum_k p_ik = 0
        cond_ratios = self.cont_table / self.cont_table.sum(axis=1, keepdims=True)
        for i in range(n-1):
            for j in range(n-1):
                A[eq_no, I[i+1, j+1]] = 1
                A[eq_no, I[i+1, :]] -= cond_ratios[i+1, j+1]
                b[eq_no] = 0
                eq_no += 1

        # the system has exactly n*n equations and n*n unknowns; solve it and reshape as an n x n table
        cont_table_test = np.linalg.solve(A, b).reshape(n, n)
        return cont_table_test
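

# Worked example (binary case, n=2) of the linear system assembled above: with unknowns p_ij = P(y=i, y_hat=j) and
# flattened indices I = [[0, 1], [2, 3]], the n*n = 4 equations read
#   (1) p_00 + p_01 + p_10 + p_11 = 1              (total probability)
#   (2) p_01 + p_11 = cc_prev_estim[1]             (fraction of instances predicted as class 1)
#   (3) p_10 + p_11 = q_prev_estim[1]              (class-1 prevalence estimated by the quantifier)
#   (4) p_11 - r * (p_10 + p_11) = 0               (sketched PPS constraint, r = P(y_hat=1 | y=1) in validation)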


class UpperBound(ClassifierAccuracyPrediction):
    """
    Oracle-like baseline that, given access to the true test labels, returns the true confusion matrix of the
    classifier on the test data.
    """

    def __init__(self, classifier, y_test):
        self.classifier = classifier
        self.y_test = y_test

    def fit(self, train: LabelledCollection):
        self.classifier.fit(*train.Xy)
        self.classes = train.classes_
        return self

    def show_true_labels(self, y_test):
        self.y_test = y_test

    def predict(self, test):
        predictions = self.classifier.predict(test)
        return confusion_matrix(self.y_test, predictions, labels=self.classes)


def get_counters(y_true, y_pred):
    """
    Assigns to each (true label, predicted label) pair of a binary problem its cell in the confusion matrix:
    0 for true positives, 1 for false negatives, 2 for false positives, and 3 for true negatives.
    """
    counters = np.full(shape=y_true.shape, fill_value=-1)
    counters[np.logical_and(y_true == 1, y_pred == 1)] = 0
    counters[np.logical_and(y_true == 1, y_pred == 0)] = 1
    counters[np.logical_and(y_true == 0, y_pred == 1)] = 2
    counters[np.logical_and(y_true == 0, y_pred == 0)] = 3
    class_map = {
        0: 'tp',
        1: 'fn',
        2: 'fp',
        3: 'tn'
    }
    return counters, class_map
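

# Example (hypothetical arrays): for y_true = np.array([1, 1, 0, 0]) and y_pred = np.array([1, 0, 1, 0]),
# get_counters returns counters = [0, 1, 2, 3], i.e., one 'tp', one 'fn', one 'fp' and one 'tn' per class_map.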


def safehstack(matrix, posteriors):
    if issparse(matrix):
        instances = csr_matrix(scipy.sparse.hstack([matrix, posteriors]))
    else:
        instances = np.hstack([matrix, posteriors])
    return instances


class QuantificationCMPredictor(ClassifierAccuracyPrediction):
    """
    Binary-only method that casts the prediction of the confusion matrix as a quantification problem: each training
    instance is relabelled as tp, fn, fp or tn according to the classifier's cross-validated predictions, a
    quantifier is trained to estimate the prevalence of these four cells, and the prevalence vector estimated on the
    test data is reassembled into a confusion matrix.
    """

    def __init__(self, classifier, quantifier, strategy='kfcv', **kwargs):
        assert strategy in ['kfcv'], 'unknown strategy'
        if strategy == 'kfcv':
            assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
        self.classifier = clone(classifier)
        self.quantifier = quantifier
        self.strategy = strategy
        self.kwargs = kwargs

    def sout(self, msg):
        if 'verbose' in self.kwargs:
            print(msg)

    def fit(self, train: LabelledCollection):
        X, y = train.Xy
        if self.strategy == 'kfcv':
            k = self.kwargs['k']
            n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
            self.sout(f'{self.__class__.__name__}: '
                      f'running cross_val_predict with k={k} n_jobs={n_jobs}')
            predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict')
            posteriors = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict_proba')
            self.classifier.fit(X, y)
            instances = safehstack(train.instances, posteriors)
            counters, class_map = get_counters(train.labels, predictions)
            q_data = LabelledCollection(instances=instances, labels=counters, classes_=[0, 1, 2, 3])
            print('counters prevalence', q_data.counts())
            self.quantifier.fit(q_data)
        return self

    def predict(self, test):
        """
        :param test: test instances
        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`, with cells expressed
            as estimated prevalence values of tp, fn, fp and tn
        """
        posteriors = self.classifier.predict_proba(test)
        instances = safehstack(test, posteriors)
        counters = self.quantifier.quantify(instances)
        tp, fn, fp, tn = counters
        conf_matrix = np.asarray([[tn, fp], [fn, tp]])
        return conf_matrix

    def quantify(self, test):
        posteriors = self.classifier.predict_proba(test)
        instances = safehstack(test, posteriors)
        counters = self.quantifier.quantify(instances)
        tp, fn, fp, tn = counters
        den_tpr = (tp + fn)
        if den_tpr > 0:
            tpr = tp / den_tpr
        else:
            tpr = 1
        den_fpr = (fp + tn)
        if den_fpr > 0:
            fpr = fp / den_fpr
        else:
            fpr = 0
        pcc = posteriors.sum(axis=0)[1]
        pacc = (pcc - fpr) / (tpr - fpr)
        pacc = np.clip(pacc, 0, 1)
        # the estimated prevalence of the positive class is taken directly from the quantifier (tp + fn);
        # the PACC-style adjustment computed above is not used in the returned value
        q = tp + fn
        return q
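

# End-to-end usage sketch (hypothetical names: an sklearn classifier `clf`, a quapy quantifier `quant`, a binary
# training collection `train`, and test instances `X_test`): the predictor quantifies the prevalence of the
# tp/fn/fp/tn cells and reassembles them into a confusion matrix for the test set.
def example_quantification_cm_usage(clf, quant, train: LabelledCollection, X_test):
    predictor = QuantificationCMPredictor(clf, quant, strategy='kfcv', k=5)
    predictor.fit(train)
    conf_matrix = predictor.predict(X_test)  # estimated 2x2 confusion matrix with prevalence-valued cells
    pos_prev = predictor.quantify(X_test)    # estimated prevalence of the positive class
    return conf_matrix, pos_prev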