Lorenzo Volpi 2024-04-04 17:09:05 +02:00
parent fd8dae5ceb
commit 558c3231a3
22 changed files with 0 additions and 4636 deletions

View File

@@ -1,376 +0,0 @@
from typing import List, Tuple
import numpy as np
import scipy.sparse as sp
from quapy.data import LabelledCollection
# Extended classes
#
# 0 ~ True 0
# 1 ~ False 1
# 2 ~ False 0
# 3 ~ True 1
# _____________________
# | | |
# | True 0 | False 1 |
# |__________|__________|
# | | |
# | False 0 | True 1 |
# |__________|__________|
#
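# Illustrative sketch (not in the original file): with nbcl=2 and the default
# ExtensionPolicy, ext_lbl maps (true, pred) to true * nbcl + pred, which
# reproduces the table above:
#   (0, 0) -> 0 (True 0),  (0, 1) -> 1 (False 1),
#   (1, 0) -> 2 (False 0), (1, 1) -> 3 (True 1)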
def _split_index_by_pred(pred_proba: np.ndarray) -> List[np.ndarray]:
_pred_label = np.argmax(pred_proba, axis=1)
return [(_pred_label == cl).nonzero()[0] for cl in np.arange(pred_proba.shape[1])]
class ExtensionPolicy:
def __init__(self, collapse_false=False, group_false=False, dense=False):
self.collapse_false = collapse_false
self.group_false = group_false
self.dense = dense
def qclasses(self, nbcl):
if self.collapse_false:
return np.arange(nbcl + 1)
elif self.group_false:
return np.arange(nbcl * 2)
return np.arange(nbcl**2)
def eclasses(self, nbcl):
return np.arange(nbcl**2)
def tfp_classes(self, nbcl):
if self.group_false:
return np.arange(2)
else:
return np.arange(nbcl)
def matrix_idx(self, nbcl):
if self.collapse_false:
_idxs = np.array([[i, i] for i in range(nbcl)] + [[0, 1]]).T
return tuple(_idxs)
elif self.group_false:
diag_idxs = np.diag_indices(nbcl)
sub_diag_idxs = tuple(
np.array([((i + 1) % nbcl, i) for i in range(nbcl)]).T
)
return tuple(np.concatenate(axis) for axis in zip(diag_idxs, sub_diag_idxs))
# def mask_fn(m, k):
# n = m.shape[0]
# d = np.diag(np.tile(1, n))
# d[tuple(zip(*[(i, (i + 1) % n) for i in range(n)]))] = 1
# return d
# _mi = np.mask_indices(nbcl, mask_func=mask_fn)
# print(_mi)
# return _mi
else:
_idxs = np.indices((nbcl, nbcl))
return _idxs[0].flatten(), _idxs[1].flatten()
def ext_lbl(self, nbcl):
if self.collapse_false:
def cf_fun(t, p):
return t if t == p else nbcl
return np.vectorize(cf_fun, signature="(),()->()")
elif self.group_false:
def gf_fun(t, p):
# if t < nbcl - 1:
# return t * 2 if t == p else (t * 2) + 1
# else:
# return t * 2 if t != p else (t * 2) + 1
return p if t == p else nbcl + p
return np.vectorize(gf_fun, signature="(),()->()")
else:
def default_fn(t, p):
return t * nbcl + p
return np.vectorize(default_fn, signature="(),()->()")
def true_lbl_from_pred(self, nbcl):
if self.group_false:
return np.vectorize(lambda t, p: 0 if t == p else 1, signature="(),()->()")
else:
return np.vectorize(lambda t, p: t, signature="(),()->()")
def can_f1(self, nbcl):
return nbcl == 2 or (not self.collapse_false and not self.group_false)
class ExtendedData:
def __init__(
self,
instances: np.ndarray | sp.csr_matrix,
pred_proba: np.ndarray,
ext: np.ndarray = None,
extpol=None,
):
self.extpol = ExtensionPolicy() if extpol is None else extpol
self.b_instances_ = instances
self.pred_proba_ = pred_proba
self.ext_ = ext
self.instances = self.__extend_instances(instances, pred_proba, ext=ext)
def __extend_instances(
self,
instances: np.ndarray | sp.csr_matrix,
pred_proba: np.ndarray,
ext: np.ndarray = None,
) -> np.ndarray | sp.csr_matrix:
to_append = ext
if ext is None:
to_append = pred_proba
if isinstance(instances, sp.csr_matrix):
if self.extpol.dense:
n_x = to_append
else:
n_x = sp.hstack([instances, sp.csr_matrix(to_append)], format="csr")
elif isinstance(instances, np.ndarray):
_concat = [instances, to_append] if not self.extpol.dense else [to_append]
n_x = np.concatenate(_concat, axis=1)
else:
raise ValueError("Unsupported matrix format")
return n_x
@property
def X(self):
return self.instances
@property
def nbcl(self):
return self.pred_proba_.shape[1]
def split_by_pred(self, _indexes: List[np.ndarray] | None = None):
def _empty_matrix():
if isinstance(self.instances, np.ndarray):
return np.asarray([], dtype=int)
elif isinstance(self.instances, sp.csr_matrix):
return sp.csr_matrix(np.empty((0, 0), dtype=int))
if _indexes is None:
_indexes = _split_index_by_pred(self.pred_proba_)
_instances = [
self.instances[ind] if ind.shape[0] > 0 else _empty_matrix()
for ind in _indexes
]
return _instances
def __len__(self):
return self.instances.shape[0]
class ExtendedLabels:
def __init__(
self,
true: np.ndarray,
pred: np.ndarray,
nbcl: int,
extpol: ExtensionPolicy = None,
):
self.extpol = ExtensionPolicy() if extpol is None else extpol
self.true = true
self.pred = pred
self.nbcl = nbcl
@property
def y(self):
return self.extpol.ext_lbl(self.nbcl)(self.true, self.pred)
@property
def classes(self):
return self.extpol.qclasses(self.nbcl)
def __getitem__(self, idx):
return ExtendedLabels(self.true[idx], self.pred[idx], self.nbcl, extpol=self.extpol)
def split_by_pred(self, _indexes: List[np.ndarray]):
_labels = []
for cl, ind in enumerate(_indexes):
_true, _pred = self.true[ind], self.pred[ind]
assert (
_pred.shape[0] == 0 or (_pred == _pred[0]).all()
), "index is selecting non uniform class"
_tfp = self.extpol.true_lbl_from_pred(self.nbcl)(_true, _pred)
_labels.append(_tfp)
return _labels, self.extpol.tfp_classes(self.nbcl)
class ExtendedPrev:
def __init__(
self,
flat: np.ndarray,
nbcl: int,
extpol: ExtensionPolicy = None,
):
self.flat = flat
self.nbcl = nbcl
self.extpol = ExtensionPolicy() if extpol is None else extpol
# self._matrix = self.__build_matrix()
def __build_matrix(self):
_matrix = np.zeros((self.nbcl, self.nbcl))
_matrix[self.extpol.matrix_idx(self.nbcl)] = self.flat
return _matrix
def can_f1(self):
return self.extpol.can_f1(self.nbcl)
@property
def A(self):
# return self._matrix
return self.__build_matrix()
@property
def classes(self):
return self.extpol.qclasses(self.nbcl)
class ExtMulPrev(ExtendedPrev):
def __init__(
self,
flat: np.ndarray,
nbcl: int,
q_classes: list = None,
extpol: ExtensionPolicy = None,
):
super().__init__(flat, nbcl, extpol=extpol)
self.flat = self.__check_q_classes(q_classes, flat)
def __check_q_classes(self, q_classes, flat):
if q_classes is None:
return flat
q_classes = np.array(q_classes)
_flat = np.zeros(self.extpol.qclasses(self.nbcl).shape)
_flat[q_classes] = flat
return _flat
class ExtBinPrev(ExtendedPrev):
def __init__(
self,
flat: List[np.ndarray],
nbcl: int,
q_classes: List[List[int]] = None,
extpol: ExtensionPolicy = None,
):
super().__init__(flat, nbcl, extpol=extpol)
flat = self.__check_q_classes(q_classes, flat)
self.flat = self.__build_flat(flat)
def __check_q_classes(self, q_classes, flat):
if q_classes is None:
return flat
_flat = []
for fl, qc in zip(flat, q_classes):
qc = np.array(qc)
_fl = np.zeros(self.extpol.tfp_classes(self.nbcl).shape)
_fl[qc] = fl
_flat.append(_fl)
return np.array(_flat)
def __build_flat(self, flat):
return np.concatenate(flat.T)
class ExtendedCollection(LabelledCollection):
def __init__(
self,
instances: np.ndarray | sp.csr_matrix,
labels: np.ndarray,
pred_proba: np.ndarray = None,
ext: np.ndarray = None,
extpol=None,
):
self.extpol = ExtensionPolicy() if extpol is None else extpol
e_data, e_labels = self.__extend_collection(
instances=instances,
labels=labels,
pred_proba=pred_proba,
ext=ext,
)
self.e_data_ = e_data
self.e_labels_ = e_labels
super().__init__(e_data.X, e_labels.y, classes=e_labels.classes)
@classmethod
def from_lc(
cls,
lc: LabelledCollection,
pred_proba: np.ndarray,
ext: np.ndarray = None,
extpol=None,
):
return ExtendedCollection(
lc.X, lc.y, pred_proba=pred_proba, ext=ext, extpol=extpol
)
@property
def pred_proba(self):
return self.e_data_.pred_proba_
@property
def ext(self):
return self.e_data_.ext_
@property
def eX(self):
return self.e_data_
@property
def ey(self):
return self.e_labels_
@property
def n_base_classes(self):
return self.e_labels_.nbcl
@property
def n_classes(self):
return len(self.e_labels_.classes)
def e_prevalence(self) -> ExtendedPrev:
_prev = self.prevalence()
return ExtendedPrev(_prev, self.n_base_classes, extpol=self.extpol)
def split_by_pred(self):
_indexes = _split_index_by_pred(self.pred_proba)
_instances = self.e_data_.split_by_pred(_indexes)
# _labels = [self.ey[ind] for ind in _indexes]
_labels, _cls = self.e_labels_.split_by_pred(_indexes)
return [
LabelledCollection(inst, lbl, classes=_cls)
for inst, lbl in zip(_instances, _labels)
]
def __extend_collection(
self,
instances: sp.csr_matrix | np.ndarray,
labels: np.ndarray,
pred_proba: np.ndarray,
ext: np.ndarray = None,
extpol=None,
) -> Tuple[ExtendedData, ExtendedLabels]:
n_classes = pred_proba.shape[1]
# n_X = [ X | predicted probs. ]
e_instances = ExtendedData(instances, pred_proba, ext=ext, extpol=self.extpol)
# n_y = (expected y, predicted y)
preds = np.argmax(pred_proba, axis=-1)
e_labels = ExtendedLabels(labels, preds, n_classes, extpol=self.extpol)
return e_instances, e_labels
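# Illustrative usage sketch (assumes a fitted probabilistic classifier `clf`
# and a quapy LabelledCollection `val`; both are hypothetical here):
#
#   probs = clf.predict_proba(val.X)
#   ec = ExtendedCollection.from_lc(val, pred_proba=probs, extpol=ExtensionPolicy())
#   ec.e_prevalence().A   # (nbcl x nbcl) matrix of true/predicted co-occurrence prevalences
#   ec.split_by_pred()    # one LabelledCollection per predicted class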

View File

@@ -1,86 +0,0 @@
from contextlib import contextmanager
import numpy as np
import quapy as qp
import yaml
class environ:
_default_env = {
"DATASET_NAME": None,
"DATASET_TARGET": None,
"METRICS": [],
"COMP_ESTIMATORS": [],
"DATASET_N_PREVS": 9,
"DATASET_PREVS": None,
"OUT_DIR_NAME": "output",
"OUT_DIR": None,
"PLOT_DIR_NAME": "plot",
"PLOT_OUT_DIR": None,
"DATASET_DIR_UPDATE": False,
"PROTOCOL_N_PREVS": 21,
"PROTOCOL_REPEATS": 100,
"SAMPLE_SIZE": 1000,
# "PLOT_ESTIMATORS": [],
"PLOT_STDEV": False,
"_R_SEED": 0,
"N_JOBS": 1,
}
_keys = list(_default_env.keys())
def __init__(self):
self.__load_file()
def __load_file(self):
_state = environ._default_env.copy()
with open("conf.yaml", "r") as f:
confs = yaml.safe_load(f)["exec"]
_state = _state | confs["global"]
self.__setdict(_state)
self._confs = confs["confs"]
def __setdict(self, d: dict):
for k, v in d.items():
super().__setattr__(k, v)
match k:
case "SAMPLE_SIZE":
qp.environ["SAMPLE_SIZE"] = v
case "_R_SEED":
qp.environ["_R_SEED"] = v
np.random.seed(v)
def to_dict(self) -> dict:
return {k: self.__getattribute__(k) for k in environ._keys}
@property
def confs(self):
return self._confs.copy()
@contextmanager
def load(self, conf):
__current = self.to_dict()
__np_random_state = np.random.get_state()
if conf is None:
conf = {}
if isinstance(conf, environ):
conf = conf.to_dict()
self.__setdict(conf)
try:
yield
finally:
self.__setdict(__current)
np.random.set_state(__np_random_state)
def load_confs(self):
for c in self.confs:
with self.load(c):
yield c
env = environ()
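# Illustrative usage sketch (assumes a conf.yaml with "exec" -> "global"/"confs"
# sections, as read by __load_file; `run_experiment` is hypothetical):
#
#   from quacc.environment import env
#   for conf in env.load_confs():
#       run_experiment()  # env temporarily reflects `conf`; previous values and
#                         # the numpy RNG state are restored when the block exits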

View File

@@ -1,115 +0,0 @@
from functools import wraps
import numpy as np
import quapy.functional as F
import sklearn.metrics as metrics
from quapy.method.aggregative import ACC, EMQ
from sklearn import clone
from sklearn.linear_model import LogisticRegression
import quacc as qc
from quacc.evaluation.report import EvaluationReport
_alts = {}
def alt(func):
@wraps(func)
def wrapper(c_model, validation, protocol):
return func(c_model, validation, protocol)
wrapper.name = func.__name__
_alts[func.__name__] = wrapper
return wrapper
@alt
def cross(c_model, validation, protocol):
y_val = validation.labels
y_hat_val = c_model.predict(validation.instances)
qcls = clone(c_model)
qcls.fit(*validation.Xy)
er = EvaluationReport(name="cross")
for sample in protocol():
y_hat = c_model.predict(sample.instances)
y = sample.labels
ground_acc = (y_hat == y).mean()
ground_f1 = metrics.f1_score(y, y_hat, zero_division=0)
q = EMQ(qcls)
q.fit(validation, fit_classifier=False)
M_hat = ACC.getPteCondEstim(validation.classes_, y_val, y_hat_val)
p_hat = q.quantify(sample.instances)
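# Note (added comment): M_hat[i, j] estimates P(pred=i | true=j) on the
# validation set, and p_hat estimates the prevalence of each true class in the
# test sample, so p_hat * M_hat approximates the normalized contingency table
# P(pred=i, true=j) for that sample.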
cont_table_hat = p_hat * M_hat
acc_score = qc.error.acc(cont_table_hat)
f1_score = qc.error.f1(cont_table_hat)
meta_acc = abs(acc_score - ground_acc)
meta_f1 = abs(f1_score - ground_f1)
er.append_row(
sample.prevalence(),
acc=meta_acc,
f1=meta_f1,
acc_score=acc_score,
f1_score=f1_score,
)
return er
@alt
def cross2(c_model, validation, protocol):
classes = validation.classes_
y_val = validation.labels
y_hat_val = c_model.predict(validation.instances)
M_hat = ACC.getPteCondEstim(classes, y_val, y_hat_val)
pos_prev_val = validation.prevalence()[1]
er = EvaluationReport(name="cross2")
for sample in protocol():
y_test = sample.labels
y_hat_test = c_model.predict(sample.instances)
ground_acc = (y_hat_test == y_test).mean()
ground_f1 = metrics.f1_score(y_test, y_hat_test, zero_division=0)
pos_prev_cc = F.prevalence_from_labels(y_hat_test, classes)[1]
tpr_hat = M_hat[1, 1]
fpr_hat = M_hat[1, 0]
tnr_hat = M_hat[0, 0]
pos_prev_test_hat = (pos_prev_cc - fpr_hat) / (tpr_hat - fpr_hat)
pos_prev_test_hat = np.clip(pos_prev_test_hat, 0, 1)
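# Note (added comment): the linear system below solves for the normalized
# contingency table entries (tn, fn, fp, tp) under the constraints
#   fp + tp = pos_prev_cc            (observed positive-prediction rate)
#   fn + tp = pos_prev_test_hat      (ACC-corrected positive prevalence)
#   tn + fn + fp + tp = 1
# plus one equation tying tp to the validation tpr (or tn to the tnr),
# depending on which of the two is deemed more reliable.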
if pos_prev_val > 0.5:
# in this case, the tpr might be a more reliable estimate than tnr
A = np.asarray(
[[0, 0, 1, 1], [0, 1, 0, 1], [1, 1, 1, 1], [0, tpr_hat, 0, tpr_hat - 1]]
)
else:
# in this case, the tnr might be a more reliable estimate than tpr
A = np.asarray(
[[0, 0, 1, 1], [0, 1, 0, 1], [1, 1, 1, 1], [tnr_hat - 1, 0, tnr_hat, 0]]
)
b = np.asarray([pos_prev_cc, pos_prev_test_hat, 1, 0])
tn, fn, fp, tp = np.linalg.solve(A, b)
cont_table_hat = np.array([[tn, fp], [fn, tp]])
acc_score = qc.error.acc(cont_table_hat)
f1_score = qc.error.f1(cont_table_hat)
meta_acc = abs(acc_score - ground_acc)
meta_f1 = abs(f1_score - ground_f1)
er.append_row(
sample.prevalence(),
acc=meta_acc,
f1=meta_f1,
acc_score=acc_score,
f1_score=f1_score,
)
return er

View File

@@ -1,590 +0,0 @@
from functools import wraps
from statistics import mean
import numpy as np
import sklearn.metrics as metrics
from quapy.data import LabelledCollection
from quapy.protocol import APP, AbstractStochasticSeededProtocol
from scipy.sparse import issparse
from sklearn.base import BaseEstimator
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_validate
import baselines.atc as atc
import baselines.doc as doclib
import baselines.gde as gdelib
import baselines.impweight as iw
import baselines.mandoline as mandolib
import baselines.rca as rcalib
from baselines.utils import clone_fit
from quacc.environment import env
from .report import EvaluationReport
_baselines = {}
def baseline(func):
@wraps(func)
def wrapper(c_model, validation, protocol):
return func(c_model, validation, protocol)
wrapper.name = func.__name__
_baselines[func.__name__] = wrapper
return wrapper
@baseline
def kfcv(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
):
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
scoring = ["accuracy", "f1_macro"]
scores = cross_validate(c_model, validation.X, validation.y, scoring=scoring)
acc_score = mean(scores["test_accuracy"])
f1_score = mean(scores["test_f1_macro"])
report = EvaluationReport(name="kfcv")
for test in protocol():
test_preds = c_model_predict(test.X)
meta_acc = abs(acc_score - metrics.accuracy_score(test.y, test_preds))
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_preds, average=f1_average)
)
report.append_row(
test.prevalence(),
acc_score=acc_score,
f1_score=f1_score,
acc=meta_acc,
f1=meta_f1,
)
return report
@baseline
def ref(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
):
c_model_predict = getattr(c_model, "predict")
f1_average = "binary" if validation.n_classes == 2 else "macro"
report = EvaluationReport(name="ref")
for test in protocol():
test_preds = c_model_predict(test.X)
report.append_row(
test.prevalence(),
acc_score=metrics.accuracy_score(test.y, test_preds),
f1_score=metrics.f1_score(test.y, test_preds, average=f1_average),
)
return report
@baseline
def naive(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
):
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
val_preds = c_model_predict(validation.X)
val_acc = metrics.accuracy_score(validation.y, val_preds)
val_f1 = metrics.f1_score(validation.y, val_preds, average=f1_average)
report = EvaluationReport(name="naive")
for test in protocol():
test_preds = c_model_predict(test.X)
test_acc = metrics.accuracy_score(test.y, test_preds)
test_f1 = metrics.f1_score(test.y, test_preds, average=f1_average)
meta_acc = abs(val_acc - test_acc)
meta_f1 = abs(val_f1 - test_f1)
report.append_row(
test.prevalence(),
acc_score=val_acc,
f1_score=val_f1,
acc=meta_acc,
f1=meta_f1,
)
return report
@baseline
def mandoline(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba",
) -> EvaluationReport:
c_model_predict = getattr(c_model, predict_method)
val_probs = c_model_predict(validation.X)
val_preds = np.argmax(val_probs, axis=1)
D_val = mandolib.get_slices(val_probs)
empirical_mat_list_val = (1.0 * (val_preds == validation.y))[:, np.newaxis]
report = EvaluationReport(name="mandoline")
for test in protocol():
test_probs = c_model_predict(test.X)
test_pred = np.argmax(test_probs, axis=1)
D_test = mandolib.get_slices(test_probs)
wp = mandolib.estimate_performance(D_val, D_test, None, empirical_mat_list_val)
score = wp.all_estimates[0].weighted[0]
meta_score = abs(score - metrics.accuracy_score(test.y, test_pred))
report.append_row(test.prevalence(), acc=meta_score, acc_score=score)
return report
@baseline
def rca(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
):
"""elsahar19"""
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
val1, val2 = validation.split_stratified(train_prop=0.5, random_state=env._R_SEED)
val1_pred1 = c_model_predict(val1.X)
val2_protocol = APP(
val2,
n_prevalences=21,
repeats=100,
return_type="labelled_collection",
)
val2_rca = []
val2_prot_preds = []
val2_prot_y = []
for v2 in val2_protocol():
_preds = c_model_predict(v2.X)
try:
c_model2 = clone_fit(c_model, v2.X, _preds)
c_model2_predict = getattr(c_model2, predict_method)
val1_pred2 = c_model2_predict(val1.X)
rca_score = 1.0 - rcalib.get_score(val1_pred1, val1_pred2, val1.y)
val2_rca.append(rca_score)
val2_prot_preds.append(_preds)
val2_prot_y.append(v2.y)
except ValueError:
pass
val_targets_acc = np.array(
[
metrics.accuracy_score(v2_y, v2_preds)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_acc = LinearRegression().fit(np.array(val2_rca)[:, np.newaxis], val_targets_acc)
val_targets_f1 = np.array(
[
metrics.f1_score(v2_y, v2_preds, average=f1_average)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_f1 = LinearRegression().fit(np.array(val2_rca)[:, np.newaxis], val_targets_f1)
report = EvaluationReport(name="rca")
for test in protocol():
try:
test_preds = c_model_predict(test.X)
c_model2 = clone_fit(c_model, test.X, test_preds)
c_model2_predict = getattr(c_model2, predict_method)
val1_pred2 = c_model2_predict(val1.X)
rca_score = 1.0 - rcalib.get_score(val1_pred1, val1_pred2, val1.y)
acc_score = reg_acc.predict(np.array([[rca_score]]))[0]
f1_score = reg_f1.predict(np.array([[rca_score]]))[0]
meta_acc = abs(acc_score - metrics.accuracy_score(test.y, test_preds))
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_preds, average=f1_average)
)
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=acc_score,
f1=meta_f1,
f1_score=f1_score,
)
except ValueError:
report.append_row(
test.prevalence(),
acc=np.nan,
acc_score=np.nan,
f1=np.nan,
f1_score=np.nan,
)
return report
@baseline
def rca_star(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
):
"""elsahar19"""
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
validation1, val2 = validation.split_stratified(
train_prop=0.5, random_state=env._R_SEED
)
val11, val12 = validation1.split_stratified(
train_prop=0.5, random_state=env._R_SEED
)
val11_pred = c_model_predict(val11.X)
c_model1 = clone_fit(c_model, val11.X, val11_pred)
c_model1_predict = getattr(c_model1, predict_method)
val12_pred1 = c_model1_predict(val12.X)
val2_protocol = APP(
val2,
n_prevalences=21,
repeats=100,
return_type="labelled_collection",
)
val2_rca = []
val2_prot_preds = []
val2_prot_y = []
for v2 in val2_protocol():
_preds = c_model_predict(v2.X)
try:
c_model2 = clone_fit(c_model, v2.X, _preds)
c_model2_predict = getattr(c_model2, predict_method)
val12_pred2 = c_model2_predict(val12.X)
rca_score = 1.0 - rcalib.get_score(val12_pred1, val12_pred2, val12.y)
val2_rca.append(rca_score)
val2_prot_preds.append(_preds)
val2_prot_y.append(v2.y)
except ValueError:
pass
val_targets_acc = np.array(
[
metrics.accuracy_score(v2_y, v2_preds)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_acc = LinearRegression().fit(np.array(val2_rca)[:, np.newaxis], val_targets_acc)
val_targets_f1 = np.array(
[
metrics.f1_score(v2_y, v2_preds, average=f1_average)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_f1 = LinearRegression().fit(np.array(val2_rca)[:, np.newaxis], val_targets_f1)
report = EvaluationReport(name="rca_star")
for test in protocol():
try:
test_pred = c_model_predict(test.X)
c_model2 = clone_fit(c_model, test.X, test_pred)
c_model2_predict = getattr(c_model2, predict_method)
val12_pred2 = c_model2_predict(val12.X)
rca_star_score = 1.0 - rcalib.get_score(val12_pred1, val12_pred2, val12.y)
acc_score = reg_acc.predict(np.array([[rca_star_score]]))[0]
f1_score = reg_f1.predict(np.array([[rca_star_score]]))[0]
meta_acc = abs(acc_score - metrics.accuracy_score(test.y, test_pred))
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_pred, average=f1_average)
)
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=acc_score,
f1=meta_f1,
f1_score=f1_score,
)
except ValueError:
report.append_row(
test.prevalence(),
acc=np.nan,
acc_score=np.nan,
f1=np.nan,
f1_score=np.nan,
)
return report
@baseline
def atc_mc(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba",
):
"""garg"""
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
## Load ID validation data probs and labels
val_probs, val_labels = c_model_predict(validation.X), validation.y
## score function, e.g., negative entropy or argmax confidence
val_scores = atc.get_max_conf(val_probs)
val_preds = np.argmax(val_probs, axis=-1)
_, atc_thres = atc.find_ATC_threshold(val_scores, val_labels == val_preds)
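# Note (added comment): find_ATC_threshold picks the score threshold at which
# the fraction of validation points scoring above it matches the validation
# accuracy; get_ATC_acc then estimates test accuracy as the fraction of test
# points whose score exceeds that threshold (Garg et al., ATC).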
report = EvaluationReport(name="atc_mc")
for test in protocol():
## Load OOD test data probs
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_scores = atc.get_max_conf(test_probs)
atc_accuracy = atc.get_ATC_acc(atc_thres, test_scores)
meta_acc = abs(atc_accuracy - metrics.accuracy_score(test.y, test_preds))
f1_score = atc.get_ATC_f1(
atc_thres, test_scores, test_probs, average=f1_average
)
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_preds, average=f1_average)
)
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=atc_accuracy,
f1_score=f1_score,
f1=meta_f1,
)
return report
@baseline
def atc_ne(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba",
):
"""garg"""
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
## Load ID validation data probs and labels
val_probs, val_labels = c_model_predict(validation.X), validation.y
## score function, e.g., negative entropy or argmax confidence
val_scores = atc.get_entropy(val_probs)
val_preds = np.argmax(val_probs, axis=-1)
_, atc_thres = atc.find_ATC_threshold(val_scores, val_labels == val_preds)
report = EvaluationReport(name="atc_ne")
for test in protocol():
## Load OOD test data probs
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_scores = atc.get_entropy(test_probs)
atc_accuracy = atc.get_ATC_acc(atc_thres, test_scores)
meta_acc = abs(atc_accuracy - metrics.accuracy_score(test.y, test_preds))
f1_score = atc.get_ATC_f1(
atc_thres, test_scores, test_probs, average=f1_average
)
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_preds, average=f1_average)
)
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=atc_accuracy,
f1_score=f1_score,
f1=meta_f1,
)
return report
@baseline
def doc(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba",
):
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
val1, val2 = validation.split_stratified(train_prop=0.5, random_state=env._R_SEED)
val1_probs = c_model_predict(val1.X)
val1_mc = np.max(val1_probs, axis=-1)
val1_preds = np.argmax(val1_probs, axis=-1)
val1_acc = metrics.accuracy_score(val1.y, val1_preds)
val1_f1 = metrics.f1_score(val1.y, val1_preds, average=f1_average)
val2_protocol = APP(
val2,
n_prevalences=21,
repeats=100,
return_type="labelled_collection",
)
val2_prot_mc = []
val2_prot_preds = []
val2_prot_y = []
for v2 in val2_protocol():
_probs = c_model_predict(v2.X)
_mc = np.max(_probs, axis=-1)
_preds = np.argmax(_probs, axis=-1)
val2_prot_mc.append(_mc)
val2_prot_preds.append(_preds)
val2_prot_y.append(v2.y)
val_scores = np.array([doclib.get_doc(val1_mc, v2_mc) for v2_mc in val2_prot_mc])
val_targets_acc = np.array(
[
val1_acc - metrics.accuracy_score(v2_y, v2_preds)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_acc = LinearRegression().fit(val_scores[:, np.newaxis], val_targets_acc)
val_targets_f1 = np.array(
[
val1_f1 - metrics.f1_score(v2_y, v2_preds, average=f1_average)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_f1 = LinearRegression().fit(val_scores[:, np.newaxis], val_targets_f1)
report = EvaluationReport(name="doc")
for test in protocol():
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_mc = np.max(test_probs, axis=-1)
acc_score = (
val1_acc
- reg_acc.predict(np.array([[doclib.get_doc(val1_mc, test_mc)]]))[0]
)
f1_score = (
val1_f1 - reg_f1.predict(np.array([[doclib.get_doc(val1_mc, test_mc)]]))[0]
)
meta_acc = abs(acc_score - metrics.accuracy_score(test.y, test_preds))
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_preds, average=f1_average)
)
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=acc_score,
f1=meta_f1,
f1_score=f1_score,
)
return report
@baseline
def doc_feat(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba",
):
c_model_predict = getattr(c_model, predict_method)
val_probs, val_labels = c_model_predict(validation.X), validation.y
val_scores = np.max(val_probs, axis=-1)
val_preds = np.argmax(val_probs, axis=-1)
v1acc = np.mean(val_preds == val_labels) * 100
report = EvaluationReport(name="doc_feat")
for test in protocol():
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_scores = np.max(test_probs, axis=-1)
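# Note (added comment): the estimate below presumably adjusts the validation
# accuracy v1acc (in percent) by the difference-of-confidences statistic
# returned by doclib.get_doc, then rescales the result to [0, 1].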
score = (v1acc + doclib.get_doc(val_scores, test_scores)) / 100.0
meta_acc = abs(score - metrics.accuracy_score(test.y, test_preds))
report.append_row(test.prevalence(), acc=meta_acc, acc_score=score)
return report
@baseline
def gde(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
) -> EvaluationReport:
c_model_predict = getattr(c_model, predict_method)
val1, val2 = validation.split_stratified(train_prop=0.5, random_state=env._R_SEED)
c_model1 = clone_fit(c_model, val1.X, val1.y)
c_model1_predict = getattr(c_model1, predict_method)
c_model2 = clone_fit(c_model, val2.X, val2.y)
c_model2_predict = getattr(c_model2, predict_method)
report = EvaluationReport(name="gde")
for test in protocol():
test_pred = c_model_predict(test.X)
test_pred1 = c_model1_predict(test.X)
test_pred2 = c_model2_predict(test.X)
score = gdelib.get_score(test_pred1, test_pred2)
meta_score = abs(score - metrics.accuracy_score(test.y, test_pred))
report.append_row(test.prevalence(), acc=meta_score, acc_score=score)
return report
@baseline
def logreg(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
):
c_model_predict = getattr(c_model, predict_method)
val_preds = c_model_predict(validation.X)
report = EvaluationReport(name="logreg")
for test in protocol():
wx = iw.logreg(validation.X, validation.y, test.X)
test_preds = c_model_predict(test.X)
estim_acc = iw.get_acc(val_preds, validation.y, wx)
true_acc = metrics.accuracy_score(test.y, test_preds)
meta_score = abs(estim_acc - true_acc)
report.append_row(test.prevalence(), acc=meta_score, acc_score=estim_acc)
return report
@baseline
def kdex2(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict",
):
c_model_predict = getattr(c_model, predict_method)
val_preds = c_model_predict(validation.X)
log_likelihood_val = iw.kdex2_lltr(validation.X)
Xval = validation.X.toarray() if issparse(validation.X) else validation.X
report = EvaluationReport(name="kdex2")
for test in protocol():
Xte = test.X.toarray() if issparse(test.X) else test.X
wx = iw.kdex2_weights(Xval, Xte, log_likelihood_val)
test_preds = c_model_predict(Xte)
estim_acc = iw.get_acc(val_preds, validation.y, wx)
true_acc = metrics.accuracy_score(test.y, test_preds)
meta_score = abs(estim_acc - true_acc)
report.append_row(test.prevalence(), acc=meta_score, acc_score=estim_acc)
return report

View File

@@ -1,121 +0,0 @@
import os
import time
from traceback import print_exception as traceback
import numpy as np
import pandas as pd
import quapy as qp
from joblib import Parallel, delayed
from quapy.protocol import APP
from sklearn.linear_model import LogisticRegression
from quacc import logger
from quacc.dataset import Dataset
from quacc.environment import env
from quacc.evaluation.estimators import CE
from quacc.evaluation.report import CompReport, DatasetReport
from quacc.utils import parallel
# from quacc.logger import logger, logger_manager
# from quacc.evaluation.worker import WorkerArgs, estimate_worker
pd.set_option("display.float_format", "{:.4f}".format)
# qp.environ["SAMPLE_SIZE"] = env.SAMPLE_SIZE
def estimate_worker(_estimate, train, validation, test, q=None):
# qp.environ["SAMPLE_SIZE"] = env.SAMPLE_SIZE
log = logger.setup_worker_logger(q)
model = LogisticRegression()
model.fit(*train.Xy)
protocol = APP(
test,
n_prevalences=env.PROTOCOL_N_PREVS,
repeats=env.PROTOCOL_REPEATS,
return_type="labelled_collection",
random_state=env._R_SEED,
)
start = time.time()
try:
result = _estimate(model, validation, protocol)
except Exception as e:
log.warning(f"Method {_estimate.name} failed. Exception: {e}")
traceback(e)
return None
result.time = time.time() - start
log.info(f"{_estimate.name} finished [took {result.time:.4f}s]")
logger.logger_manager().rm_worker()
return result
def split_tasks(estimators, train, validation, test, q):
_par, _seq = [], []
for estim in estimators:
if hasattr(estim, "nocall"):
continue
_task = [estim, train, validation, test]
match estim.name:
case n if n.endswith("_gs"):
_seq.append(_task)
case _:
_par.append(_task + [q])
return _par, _seq
def evaluate_comparison(dataset: Dataset, estimators=None) -> DatasetReport:
# log = Logger.logger()
log = logger.logger()
# with multiprocessing.Pool(1) as pool:
__pool_size = round(os.cpu_count() * 0.8)
# with multiprocessing.Pool(__pool_size) as pool:
dr = DatasetReport(dataset.name)
log.info(f"dataset {dataset.name} [pool size: {__pool_size}]")
for d in dataset():
log.info(
f"Dataset sample {np.around(d.train_prev, decimals=2)} "
f"of dataset {dataset.name} started"
)
par_tasks, seq_tasks = split_tasks(
CE.func[estimators],
d.train,
d.validation,
d.test,
logger.logger_manager().q,
)
try:
tstart = time.time()
results = parallel(estimate_worker, par_tasks, n_jobs=env.N_JOBS, _env=env)
results += parallel(estimate_worker, seq_tasks, n_jobs=1, _env=env)
results = [r for r in results if r is not None]
g_time = time.time() - tstart
log.info(
f"Dataset sample {np.around(d.train_prev, decimals=2)} "
f"of dataset {dataset.name} finished "
f"[took {g_time:.4f}s]"
)
cr = CompReport(
results,
name=dataset.name,
train_prev=d.train_prev,
valid_prev=d.validation_prev,
g_time=g_time,
)
dr += cr
except Exception as e:
log.warning(
f"Dataset sample {np.around(d.train_prev, decimals=2)} "
f"of dataset {dataset.name} failed. "
f"Exception: {e}"
)
traceback(e)
return dr
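# Illustrative usage sketch (the Dataset construction is hypothetical; see
# quacc.dataset for the actual signature, and note that estimator names must
# be registered in quacc.evaluation.estimators.CE):
#
#   dr = evaluate_comparison(Dataset("imdb"), estimators=["naive", "atc_mc", "doc"])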

View File

@@ -1,112 +0,0 @@
from typing import List
import numpy as np
from quacc.evaluation import baseline, method, alt
class CompEstimatorFunc_:
def __init__(self, ce):
self.ce = ce
def __getitem__(self, e: str | List[str]):
if isinstance(e, str):
return list(self.ce._CompEstimator__get(e).values())[0]
elif isinstance(e, list):
return list(self.ce._CompEstimator__get(e).values())
class CompEstimatorName_:
def __init__(self, ce):
self.ce = ce
def __getitem__(self, e: str | List[str]):
if isinstance(e, str):
return list(self.ce._CompEstimator__get(e).keys())[0]
elif isinstance(e, list):
return list(self.ce._CompEstimator__get(e).keys())
def sort(self, e: List[str]):
return list(self.ce._CompEstimator__get(e, get_ref=False).keys())
@property
def all(self):
return list(self.ce._CompEstimator__get("__all").keys())
@property
def baselines(self):
return list(self.ce._CompEstimator__get("__baselines").keys())
class CompEstimator:
def __get(cls, e: str | List[str], get_ref=True):
_dict = alt._alts | baseline._baselines | method._methods
if isinstance(e, str) and e == "__all":
e = list(_dict.keys())
if isinstance(e, str) and e == "__baselines":
e = list(baseline._baselines.keys())
if isinstance(e, str):
try:
return {e: _dict[e]}
except KeyError:
raise KeyError(f"Invalid estimator: estimator {e} does not exist")
elif isinstance(e, list) or isinstance(e, np.ndarray):
_subtr = np.setdiff1d(e, list(_dict.keys()))
if len(_subtr) > 0:
raise KeyError(
f"Invalid estimator: estimator {_subtr[0]} does not exist"
)
e_fun = {k: fun for k, fun in _dict.items() if k in e}
if get_ref and "ref" not in e:
e_fun["ref"] = _dict["ref"]
elif not get_ref and "ref" in e:
del e_fun["ref"]
return e_fun
@property
def name(self):
return CompEstimatorName_(self)
@property
def func(self):
return CompEstimatorFunc_(self)
CE = CompEstimator()
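# Illustrative usage sketch: CE resolves estimator names to their callables
# (the names below are examples and assume they are registered):
#
#   CE.name["atc_mc"]            # -> "atc_mc"
#   CE.func[["atc_mc", "doc"]]   # -> list of callables, with "ref" appended
#   CE.name.all                  # -> every registered estimator name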
_renames = {
"bin_sld_lr": "(2x2)_SLD_LR",
"mul_sld_lr": "(1x4)_SLD_LR",
"m3w_sld_lr": "(1x3)_SLD_LR",
"d_bin_sld_lr": "d_(2x2)_SLD_LR",
"d_mul_sld_lr": "d_(1x4)_SLD_LR",
"d_m3w_sld_lr": "d_(1x3)_SLD_LR",
"d_bin_sld_rbf": "(2x2)_SLD_RBF",
"d_mul_sld_rbf": "(1x4)_SLD_RBF",
"d_m3w_sld_rbf": "(1x3)_SLD_RBF",
# "sld_lr_gs": "MS_SLD_LR",
"sld_lr_gs": "QuAcc(SLD)",
"bin_kde_lr": "(2x2)_KDEy_LR",
"mul_kde_lr": "(1x4)_KDEy_LR",
"m3w_kde_lr": "(1x3)_KDEy_LR",
"d_bin_kde_lr": "d_(2x2)_KDEy_LR",
"d_mul_kde_lr": "d_(1x4)_KDEy_LR",
"d_m3w_kde_lr": "d_(1x3)_KDEy_LR",
"bin_cc_lr": "(2x2)_CC_LR",
"mul_cc_lr": "(1x4)_CC_LR",
"m3w_cc_lr": "(1x3)_CC_LR",
# "kde_lr_gs": "MS_KDEy_LR",
"kde_lr_gs": "QuAcc(KDEy)",
# "cc_lr_gs": "MS_CC_LR",
"cc_lr_gs": "QuAcc(CC)",
"atc_mc": "ATC",
"doc": "DoC",
"mandoline": "Mandoline",
"rca": "RCA",
"rca_star": "RCA*",
"naive": "Naive",
}

View File

@@ -1,32 +0,0 @@
from typing import Callable, Union
from quapy.protocol import AbstractProtocol, OnLabelledCollectionProtocol
import quacc as qc
from quacc.method.base import BaseAccuracyEstimator
def evaluate(
estimator: BaseAccuracyEstimator,
protocol: AbstractProtocol,
error_metric: Union[Callable | str],
) -> float:
if isinstance(error_metric, str):
error_metric = qc.error.from_name(error_metric)
collator_bck_ = protocol.collator
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
estim_prevs, true_prevs = [], []
for sample in protocol():
e_sample = estimator.extend(sample)
estim_prev = estimator.estimate(e_sample.eX)
estim_prevs.append(estim_prev)
true_prevs.append(e_sample.e_prevalence())
protocol.collator = collator_bck_
# true_prevs = np.array(true_prevs)
# estim_prevs = np.array(estim_prevs)
return error_metric(true_prevs, estim_prevs)
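# Illustrative usage sketch (the estimator, protocol and the "mae" error name
# are assumptions; string names are resolved via qc.error.from_name):
#
#   score = evaluate(estimator, protocol, error_metric="mae")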

View File

@@ -1,517 +0,0 @@
from dataclasses import dataclass
from typing import Callable, List, Union
import numpy as np
from matplotlib.pylab import rand
from quapy.method.aggregative import CC, PACC, SLD, BaseQuantifier
from quapy.protocol import UPP, AbstractProtocol, OnLabelledCollectionProtocol
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC, LinearSVC
import quacc as qc
from quacc.environment import env
from quacc.evaluation.report import EvaluationReport
from quacc.method.base import BQAE, MCAE, BaseAccuracyEstimator
from quacc.method.model_selection import (
GridSearchAE,
SpiderSearchAE,
)
from quacc.quantification import KDEy
import traceback
def _param_grid(method, X_fit: np.ndarray):
match method:
case "sld_lr":
return {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"q__recalib": [None, "bcts"],
"confidence": [
None,
["isoft"],
["max_conf", "entropy"],
["max_conf", "entropy", "isoft"],
],
}
case "sld_rbf":
_scale = 1.0 / (X_fit.shape[1] * X_fit.var())
return {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"q__classifier__gamma": _scale * np.logspace(-2, 2, 5),
"q__recalib": [None, "bcts"],
"confidence": [
None,
["isoft"],
["max_conf", "entropy"],
["max_conf", "entropy", "isoft"],
],
}
case "pacc":
return {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"confidence": [None, ["isoft"], ["max_conf", "entropy"]],
}
case "cc_lr":
return {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"confidence": [
None,
["isoft"],
["max_conf", "entropy"],
["max_conf", "entropy", "isoft"],
],
}
case "kde_lr":
return {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"q__bandwidth": np.linspace(0.01, 0.2, 20),
"confidence": [None, ["isoft"], ["max_conf", "entropy", "isoft"]],
}
case "kde_rbf":
_scale = 1.0 / (X_fit.shape[1] * X_fit.var())
return {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"q__classifier__gamma": _scale * np.logspace(-2, 2, 5),
"q__bandwidth": np.linspace(0.01, 0.2, 20),
"confidence": [None, ["isoft"], ["max_conf", "entropy", "isoft"]],
}
def evaluation_report(
estimator: BaseAccuracyEstimator, protocol: AbstractProtocol, method_name=None
) -> EvaluationReport:
# method_name = inspect.stack()[1].function
report = EvaluationReport(name=method_name)
for sample in protocol():
try:
e_sample = estimator.extend(sample)
estim_prev = estimator.estimate(e_sample.eX)
true_prev = e_sample.e_prevalence()
acc_score = qc.error.acc(estim_prev)
row = dict(
acc_score=acc_score,
acc=abs(qc.error.acc(true_prev) - acc_score),
)
if estim_prev.can_f1():
f1_score = qc.error.f1(estim_prev)
row = row | dict(
f1_score=f1_score,
f1=abs(qc.error.f1(true_prev) - f1_score),
)
report.append_row(sample.prevalence(), **row)
except Exception as e:
print(f"sample prediction failed for method {method_name}: {e}")
traceback.print_exception(e)
report.append_row(
sample.prevalence(),
acc_score=np.nan,
acc=np.nan,
f1_score=np.nan,
f1=np.nan,
)
return report
@dataclass(frozen=True)
class EmptyMethod:
name: str
nocall: bool = True
def __call__(self, c_model, validation, protocol) -> EvaluationReport:
pass
@dataclass(frozen=True)
class EvaluationMethod:
name: str
q: BaseQuantifier
est_n: str
conf: List[str] | str = None
cf: bool = False # collapse_false
gf: bool = False # group_false
d: bool = False # dense
def get_est(self, c_model):
match self.est_n:
case "mul":
return MCAE(
c_model,
self.q,
confidence=self.conf,
collapse_false=self.cf,
group_false=self.gf,
dense=self.d,
)
case "bin":
return BQAE(
c_model,
self.q,
confidence=self.conf,
group_false=self.gf,
dense=self.d,
)
def __call__(self, c_model, validation, protocol) -> EvaluationReport:
est = self.get_est(c_model).fit(validation)
return evaluation_report(
estimator=est, protocol=protocol, method_name=self.name
)
@dataclass(frozen=True)
class EvaluationMethodGridSearch(EvaluationMethod):
pg: str = "sld"
search: str = "grid"
def get_search(self):
match self.search:
case "grid":
return (GridSearchAE, {})
case "spider" | "spider2":
return (SpiderSearchAE, dict(best_width=2))
case "spider3":
return (SpiderSearchAE, dict(best_width=3))
case _:
return (GridSearchAE, {})
def __call__(self, c_model, validation, protocol) -> EvaluationReport:
v_train, v_val = validation.split_stratified(0.6, random_state=env._R_SEED)
_model = self.get_est(c_model)
_grid = _param_grid(self.pg, X_fit=_model.extend(v_train, prefit=True).X)
_search_class, _search_params = self.get_search()
est = _search_class(
model=_model,
param_grid=_grid,
refit=False,
protocol=UPP(v_val, repeats=100),
verbose=False,
**_search_params,
).fit(v_train)
er = evaluation_report(
estimator=est,
protocol=protocol,
method_name=self.name,
)
er.fit_score = est.best_score()
return er
E = EmptyMethod
M = EvaluationMethod
G = EvaluationMethodGridSearch
def __sld_lr():
return SLD(LogisticRegression())
def __sld_rbf():
return SLD(SVC(kernel="rbf", probability=True))
def __kde_lr():
return KDEy(LogisticRegression(), random_state=env._R_SEED)
def __kde_rbf():
return KDEy(SVC(kernel="rbf", probability=True), random_state=env._R_SEED)
def __sld_lsvc():
return SLD(LinearSVC())
def __pacc_lr():
return PACC(LogisticRegression())
def __cc_lr():
return CC(LogisticRegression())
# fmt: off
__sld_lr_set = [
M("bin_sld_lr", __sld_lr(), "bin" ),
M("bgf_sld_lr", __sld_lr(), "bin", gf=True),
M("mul_sld_lr", __sld_lr(), "mul" ),
M("m3w_sld_lr", __sld_lr(), "mul", cf=True),
M("mgf_sld_lr", __sld_lr(), "mul", gf=True),
# max_conf sld
M("bin_sld_lr_mc", __sld_lr(), "bin", conf="max_conf", ),
M("bgf_sld_lr_mc", __sld_lr(), "bin", conf="max_conf", gf=True),
M("mul_sld_lr_mc", __sld_lr(), "mul", conf="max_conf", ),
M("m3w_sld_lr_mc", __sld_lr(), "mul", conf="max_conf", cf=True),
M("mgf_sld_lr_mc", __sld_lr(), "mul", conf="max_conf", gf=True),
# entropy sld
M("bin_sld_lr_ne", __sld_lr(), "bin", conf="entropy", ),
M("bgf_sld_lr_ne", __sld_lr(), "bin", conf="entropy", gf=True),
M("mul_sld_lr_ne", __sld_lr(), "mul", conf="entropy", ),
M("m3w_sld_lr_ne", __sld_lr(), "mul", conf="entropy", cf=True),
M("mgf_sld_lr_ne", __sld_lr(), "mul", conf="entropy", gf=True),
# inverse softmax sld
M("bin_sld_lr_is", __sld_lr(), "bin", conf="isoft", ),
M("bgf_sld_lr_is", __sld_lr(), "bin", conf="isoft", gf=True),
M("mul_sld_lr_is", __sld_lr(), "mul", conf="isoft", ),
M("m3w_sld_lr_is", __sld_lr(), "mul", conf="isoft", cf=True),
M("mgf_sld_lr_is", __sld_lr(), "mul", conf="isoft", gf=True),
# max_conf + entropy sld
M("bin_sld_lr_c", __sld_lr(), "bin", conf=["max_conf", "entropy"] ),
M("bgf_sld_lr_c", __sld_lr(), "bin", conf=["max_conf", "entropy"], gf=True),
M("mul_sld_lr_c", __sld_lr(), "mul", conf=["max_conf", "entropy"] ),
M("m3w_sld_lr_c", __sld_lr(), "mul", conf=["max_conf", "entropy"], cf=True),
M("mgf_sld_lr_c", __sld_lr(), "mul", conf=["max_conf", "entropy"], gf=True),
# sld all
M("bin_sld_lr_a", __sld_lr(), "bin", conf=["max_conf", "entropy", "isoft"], ),
M("bgf_sld_lr_a", __sld_lr(), "bin", conf=["max_conf", "entropy", "isoft"], gf=True),
M("mul_sld_lr_a", __sld_lr(), "mul", conf=["max_conf", "entropy", "isoft"], ),
M("m3w_sld_lr_a", __sld_lr(), "mul", conf=["max_conf", "entropy", "isoft"], cf=True),
M("mgf_sld_lr_a", __sld_lr(), "mul", conf=["max_conf", "entropy", "isoft"], gf=True),
# gs sld
G("bin_sld_lr_gs", __sld_lr(), "bin", pg="sld_lr" ),
G("bgf_sld_lr_gs", __sld_lr(), "bin", pg="sld_lr", gf=True),
G("mul_sld_lr_gs", __sld_lr(), "mul", pg="sld_lr" ),
G("m3w_sld_lr_gs", __sld_lr(), "mul", pg="sld_lr", cf=True),
G("mgf_sld_lr_gs", __sld_lr(), "mul", pg="sld_lr", gf=True),
]
__dense_sld_lr_set = [
M("d_bin_sld_lr", __sld_lr(), "bin", d=True, ),
M("d_bgf_sld_lr", __sld_lr(), "bin", d=True, gf=True),
M("d_mul_sld_lr", __sld_lr(), "mul", d=True, ),
M("d_m3w_sld_lr", __sld_lr(), "mul", d=True, cf=True),
M("d_mgf_sld_lr", __sld_lr(), "mul", d=True, gf=True),
# max_conf sld
M("d_bin_sld_lr_mc", __sld_lr(), "bin", d=True, conf="max_conf", ),
M("d_bgf_sld_lr_mc", __sld_lr(), "bin", d=True, conf="max_conf", gf=True),
M("d_mul_sld_lr_mc", __sld_lr(), "mul", d=True, conf="max_conf", ),
M("d_m3w_sld_lr_mc", __sld_lr(), "mul", d=True, conf="max_conf", cf=True),
M("d_mgf_sld_lr_mc", __sld_lr(), "mul", d=True, conf="max_conf", gf=True),
# entropy sld
M("d_bin_sld_lr_ne", __sld_lr(), "bin", d=True, conf="entropy", ),
M("d_bgf_sld_lr_ne", __sld_lr(), "bin", d=True, conf="entropy", gf=True),
M("d_mul_sld_lr_ne", __sld_lr(), "mul", d=True, conf="entropy", ),
M("d_m3w_sld_lr_ne", __sld_lr(), "mul", d=True, conf="entropy", cf=True),
M("d_mgf_sld_lr_ne", __sld_lr(), "mul", d=True, conf="entropy", gf=True),
# inverse softmax sld
M("d_bin_sld_lr_is", __sld_lr(), "bin", d=True, conf="isoft", ),
M("d_bgf_sld_lr_is", __sld_lr(), "bin", d=True, conf="isoft", gf=True),
M("d_mul_sld_lr_is", __sld_lr(), "mul", d=True, conf="isoft", ),
M("d_m3w_sld_lr_is", __sld_lr(), "mul", d=True, conf="isoft", cf=True),
M("d_mgf_sld_lr_is", __sld_lr(), "mul", d=True, conf="isoft", gf=True),
# max_conf + entropy sld
M("d_bin_sld_lr_c", __sld_lr(), "bin", d=True, conf=["max_conf", "entropy"] ),
M("d_bgf_sld_lr_c", __sld_lr(), "bin", d=True, conf=["max_conf", "entropy"], gf=True),
M("d_mul_sld_lr_c", __sld_lr(), "mul", d=True, conf=["max_conf", "entropy"] ),
M("d_m3w_sld_lr_c", __sld_lr(), "mul", d=True, conf=["max_conf", "entropy"], cf=True),
M("d_mgf_sld_lr_c", __sld_lr(), "mul", d=True, conf=["max_conf", "entropy"], gf=True),
# sld all
M("d_bin_sld_lr_a", __sld_lr(), "bin", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_bgf_sld_lr_a", __sld_lr(), "bin", d=True, conf=["max_conf", "entropy", "isoft"], gf=True),
M("d_mul_sld_lr_a", __sld_lr(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_m3w_sld_lr_a", __sld_lr(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], cf=True),
M("d_mgf_sld_lr_a", __sld_lr(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], gf=True),
# gs sld
G("d_bin_sld_lr_gs", __sld_lr(), "bin", d=True, pg="sld_lr" ),
G("d_bgf_sld_lr_gs", __sld_lr(), "bin", d=True, pg="sld_lr", gf=True),
G("d_mul_sld_lr_gs", __sld_lr(), "mul", d=True, pg="sld_lr" ),
G("d_m3w_sld_lr_gs", __sld_lr(), "mul", d=True, pg="sld_lr", cf=True),
G("d_mgf_sld_lr_gs", __sld_lr(), "mul", d=True, pg="sld_lr", gf=True),
]
__dense_sld_rbf_set = [
M("d_bin_sld_rbf", __sld_rbf(), "bin", d=True, ),
M("d_bgf_sld_rbf", __sld_rbf(), "bin", d=True, gf=True),
M("d_mul_sld_rbf", __sld_rbf(), "mul", d=True, ),
M("d_m3w_sld_rbf", __sld_rbf(), "mul", d=True, cf=True),
M("d_mgf_sld_rbf", __sld_rbf(), "mul", d=True, gf=True),
# max_conf sld
M("d_bin_sld_rbf_mc", __sld_rbf(), "bin", d=True, conf="max_conf", ),
M("d_bgf_sld_rbf_mc", __sld_rbf(), "bin", d=True, conf="max_conf", gf=True),
M("d_mul_sld_rbf_mc", __sld_rbf(), "mul", d=True, conf="max_conf", ),
M("d_m3w_sld_rbf_mc", __sld_rbf(), "mul", d=True, conf="max_conf", cf=True),
M("d_mgf_sld_rbf_mc", __sld_rbf(), "mul", d=True, conf="max_conf", gf=True),
# entropy sld
M("d_bin_sld_rbf_ne", __sld_rbf(), "bin", d=True, conf="entropy", ),
M("d_bgf_sld_rbf_ne", __sld_rbf(), "bin", d=True, conf="entropy", gf=True),
M("d_mul_sld_rbf_ne", __sld_rbf(), "mul", d=True, conf="entropy", ),
M("d_m3w_sld_rbf_ne", __sld_rbf(), "mul", d=True, conf="entropy", cf=True),
M("d_mgf_sld_rbf_ne", __sld_rbf(), "mul", d=True, conf="entropy", gf=True),
# inverse softmax sld
M("d_bin_sld_rbf_is", __sld_rbf(), "bin", d=True, conf="isoft", ),
M("d_bgf_sld_rbf_is", __sld_rbf(), "bin", d=True, conf="isoft", gf=True),
M("d_mul_sld_rbf_is", __sld_rbf(), "mul", d=True, conf="isoft", ),
M("d_m3w_sld_rbf_is", __sld_rbf(), "mul", d=True, conf="isoft", cf=True),
M("d_mgf_sld_rbf_is", __sld_rbf(), "mul", d=True, conf="isoft", gf=True),
# max_conf + entropy sld
M("d_bin_sld_rbf_c", __sld_rbf(), "bin", d=True, conf=["max_conf", "entropy"] ),
M("d_bgf_sld_rbf_c", __sld_rbf(), "bin", d=True, conf=["max_conf", "entropy"], gf=True),
M("d_mul_sld_rbf_c", __sld_rbf(), "mul", d=True, conf=["max_conf", "entropy"] ),
M("d_m3w_sld_rbf_c", __sld_rbf(), "mul", d=True, conf=["max_conf", "entropy"], cf=True),
M("d_mgf_sld_rbf_c", __sld_rbf(), "mul", d=True, conf=["max_conf", "entropy"], gf=True),
# sld all
M("d_bin_sld_rbf_a", __sld_rbf(), "bin", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_bgf_sld_rbf_a", __sld_rbf(), "bin", d=True, conf=["max_conf", "entropy", "isoft"], gf=True),
M("d_mul_sld_rbf_a", __sld_rbf(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_m3w_sld_rbf_a", __sld_rbf(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], cf=True),
M("d_mgf_sld_rbf_a", __sld_rbf(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], gf=True),
# gs sld
G("d_bin_sld_rbf_gs", __sld_rbf(), "bin", d=True, pg="sld_rbf", search="grid", ),
G("d_bgf_sld_rbf_gs", __sld_rbf(), "bin", d=True, pg="sld_rbf", search="grid", gf=True),
G("d_mul_sld_rbf_gs", __sld_rbf(), "mul", d=True, pg="sld_rbf", search="grid", ),
G("d_m3w_sld_rbf_gs", __sld_rbf(), "mul", d=True, pg="sld_rbf", search="grid", cf=True),
G("d_mgf_sld_rbf_gs", __sld_rbf(), "mul", d=True, pg="sld_rbf", search="grid", gf=True),
]
__kde_lr_set = [
# base kde
M("bin_kde_lr", __kde_lr(), "bin" ),
M("mul_kde_lr", __kde_lr(), "mul" ),
M("m3w_kde_lr", __kde_lr(), "mul", cf=True),
# max_conf kde
M("bin_kde_lr_mc", __kde_lr(), "bin", conf="max_conf", ),
M("mul_kde_lr_mc", __kde_lr(), "mul", conf="max_conf", ),
M("m3w_kde_lr_mc", __kde_lr(), "mul", conf="max_conf", cf=True),
# entropy kde
M("bin_kde_lr_ne", __kde_lr(), "bin", conf="entropy", ),
M("mul_kde_lr_ne", __kde_lr(), "mul", conf="entropy", ),
M("m3w_kde_lr_ne", __kde_lr(), "mul", conf="entropy", cf=True),
# inverse softmax kde
M("bin_kde_lr_is", __kde_lr(), "bin", conf="isoft", ),
M("mul_kde_lr_is", __kde_lr(), "mul", conf="isoft", ),
M("m3w_kde_lr_is", __kde_lr(), "mul", conf="isoft", cf=True),
# max_conf + entropy kde
M("bin_kde_lr_c", __kde_lr(), "bin", conf=["max_conf", "entropy"] ),
M("mul_kde_lr_c", __kde_lr(), "mul", conf=["max_conf", "entropy"] ),
M("m3w_kde_lr_c", __kde_lr(), "mul", conf=["max_conf", "entropy"], cf=True),
# kde all
M("bin_kde_lr_a", __kde_lr(), "bin", conf=["max_conf", "entropy", "isoft"], ),
M("mul_kde_lr_a", __kde_lr(), "mul", conf=["max_conf", "entropy", "isoft"], ),
M("m3w_kde_lr_a", __kde_lr(), "mul", conf=["max_conf", "entropy", "isoft"], cf=True),
# gs kde
G("bin_kde_lr_gs", __kde_lr(), "bin", pg="kde_lr", search="grid" ),
G("mul_kde_lr_gs", __kde_lr(), "mul", pg="kde_lr", search="grid" ),
G("m3w_kde_lr_gs", __kde_lr(), "mul", pg="kde_lr", search="grid", cf=True),
]
__dense_kde_lr_set = [
# base kde
M("d_bin_kde_lr", __kde_lr(), "bin", d=True, ),
M("d_mul_kde_lr", __kde_lr(), "mul", d=True, ),
M("d_m3w_kde_lr", __kde_lr(), "mul", d=True, cf=True),
# max_conf kde
M("d_bin_kde_lr_mc", __kde_lr(), "bin", d=True, conf="max_conf", ),
M("d_mul_kde_lr_mc", __kde_lr(), "mul", d=True, conf="max_conf", ),
M("d_m3w_kde_lr_mc", __kde_lr(), "mul", d=True, conf="max_conf", cf=True),
# entropy kde
M("d_bin_kde_lr_ne", __kde_lr(), "bin", d=True, conf="entropy", ),
M("d_mul_kde_lr_ne", __kde_lr(), "mul", d=True, conf="entropy", ),
M("d_m3w_kde_lr_ne", __kde_lr(), "mul", d=True, conf="entropy", cf=True),
# inverse softmax kde d=True,
M("d_bin_kde_lr_is", __kde_lr(), "bin", d=True, conf="isoft", ),
M("d_mul_kde_lr_is", __kde_lr(), "mul", d=True, conf="isoft", ),
M("d_m3w_kde_lr_is", __kde_lr(), "mul", d=True, conf="isoft", cf=True),
# max_conf + entropy kde
M("d_bin_kde_lr_c", __kde_lr(), "bin", d=True, conf=["max_conf", "entropy"] ),
M("d_mul_kde_lr_c", __kde_lr(), "mul", d=True, conf=["max_conf", "entropy"] ),
M("d_m3w_kde_lr_c", __kde_lr(), "mul", d=True, conf=["max_conf", "entropy"], cf=True),
# kde all
M("d_bin_kde_lr_a", __kde_lr(), "bin", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_mul_kde_lr_a", __kde_lr(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_m3w_kde_lr_a", __kde_lr(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], cf=True),
# gs kde
G("d_bin_kde_lr_gs", __kde_lr(), "bin", d=True, pg="kde_lr", search="grid" ),
G("d_mul_kde_lr_gs", __kde_lr(), "mul", d=True, pg="kde_lr", search="grid" ),
G("d_m3w_kde_lr_gs", __kde_lr(), "mul", d=True, pg="kde_lr", search="grid", cf=True),
]
__dense_kde_rbf_set = [
# base kde
M("d_bin_kde_rbf", __kde_rbf(), "bin", d=True, ),
M("d_mul_kde_rbf", __kde_rbf(), "mul", d=True, ),
M("d_m3w_kde_rbf", __kde_rbf(), "mul", d=True, cf=True),
# max_conf kde
M("d_bin_kde_rbf_mc", __kde_rbf(), "bin", d=True, conf="max_conf", ),
M("d_mul_kde_rbf_mc", __kde_rbf(), "mul", d=True, conf="max_conf", ),
M("d_m3w_kde_rbf_mc", __kde_rbf(), "mul", d=True, conf="max_conf", cf=True),
# entropy kde
M("d_bin_kde_rbf_ne", __kde_rbf(), "bin", d=True, conf="entropy", ),
M("d_mul_kde_rbf_ne", __kde_rbf(), "mul", d=True, conf="entropy", ),
M("d_m3w_kde_rbf_ne", __kde_rbf(), "mul", d=True, conf="entropy", cf=True),
# inverse softmax kde
M("d_bin_kde_rbf_is", __kde_rbf(), "bin", d=True, conf="isoft", ),
M("d_mul_kde_rbf_is", __kde_rbf(), "mul", d=True, conf="isoft", ),
M("d_m3w_kde_rbf_is", __kde_rbf(), "mul", d=True, conf="isoft", cf=True),
# max_conf + entropy kde
M("d_bin_kde_rbf_c", __kde_rbf(), "bin", d=True, conf=["max_conf", "entropy"] ),
M("d_mul_kde_rbf_c", __kde_rbf(), "mul", d=True, conf=["max_conf", "entropy"] ),
M("d_m3w_kde_rbf_c", __kde_rbf(), "mul", d=True, conf=["max_conf", "entropy"], cf=True),
# kde all
M("d_bin_kde_rbf_a", __kde_rbf(), "bin", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_mul_kde_rbf_a", __kde_rbf(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], ),
M("d_m3w_kde_rbf_a", __kde_rbf(), "mul", d=True, conf=["max_conf", "entropy", "isoft"], cf=True),
# gs kde
G("d_bin_kde_rbf_gs", __kde_rbf(), "bin", d=True, pg="kde_rbf", search="spider" ),
G("d_mul_kde_rbf_gs", __kde_rbf(), "mul", d=True, pg="kde_rbf", search="spider" ),
G("d_m3w_kde_rbf_gs", __kde_rbf(), "mul", d=True, pg="kde_rbf", search="spider", cf=True),
]
__cc_lr_set = [
# base cc
M("bin_cc_lr", __cc_lr(), "bin" ),
M("mul_cc_lr", __cc_lr(), "mul" ),
M("m3w_cc_lr", __cc_lr(), "mul", cf=True),
# max_conf cc
M("bin_cc_lr_mc", __cc_lr(), "bin", conf="max_conf", ),
M("mul_cc_lr_mc", __cc_lr(), "mul", conf="max_conf", ),
M("m3w_cc_lr_mc", __cc_lr(), "mul", conf="max_conf", cf=True),
# entropy cc
M("bin_cc_lr_ne", __cc_lr(), "bin", conf="entropy", ),
M("mul_cc_lr_ne", __cc_lr(), "mul", conf="entropy", ),
M("m3w_cc_lr_ne", __cc_lr(), "mul", conf="entropy", cf=True),
# inverse softmax cc
M("bin_cc_lr_is", __cc_lr(), "bin", conf="isoft", ),
M("mul_cc_lr_is", __cc_lr(), "mul", conf="isoft", ),
M("m3w_cc_lr_is", __cc_lr(), "mul", conf="isoft", cf=True),
# max_conf + entropy cc
M("bin_cc_lr_c", __cc_lr(), "bin", conf=["max_conf", "entropy"] ),
M("mul_cc_lr_c", __cc_lr(), "mul", conf=["max_conf", "entropy"] ),
M("m3w_cc_lr_c", __cc_lr(), "mul", conf=["max_conf", "entropy"], cf=True),
# cc all
M("bin_cc_lr_a", __cc_lr(), "bin", conf=["max_conf", "entropy", "isoft"], ),
M("mul_cc_lr_a", __cc_lr(), "mul", conf=["max_conf", "entropy", "isoft"], ),
M("m3w_cc_lr_a", __cc_lr(), "mul", conf=["max_conf", "entropy", "isoft"], cf=True),
# gs cc
G("bin_cc_lr_gs", __cc_lr(), "bin", pg="cc_lr", search="grid" ),
G("mul_cc_lr_gs", __cc_lr(), "mul", pg="cc_lr", search="grid" ),
G("m3w_cc_lr_gs", __cc_lr(), "mul", pg="cc_lr", search="grid", cf=True),
]
__ms_set = [
E("cc_lr_gs"),
E("sld_lr_gs"),
E("kde_lr_gs"),
E("QuAcc"),
]
# fmt: on
__methods_set = (
__sld_lr_set
+ __dense_sld_lr_set
+ __dense_sld_rbf_set
+ __kde_lr_set
+ __dense_kde_lr_set
+ __dense_kde_rbf_set
+ __cc_lr_set
+ __ms_set
)
_methods = {m.name: m for m in __methods_set}

View File

@@ -1,956 +0,0 @@
import json
import pickle
from collections import defaultdict
from pathlib import Path
from typing import List, Tuple
import numpy as np
import pandas as pd
import quacc as qc
import quacc.plot as plot
from quacc.utils import fmt_line_md
def _get_metric(metric: str):
return slice(None) if metric is None else metric
def _get_estimators(estimators: List[str], cols: np.ndarray):
if estimators is None:
return slice(None)
estimators = np.array(estimators)
return estimators[np.isin(estimators, cols)]
def _get_shift(index: np.ndarray, train_prev: np.ndarray):
index = np.array([np.array(tp) for tp in index])
train_prevs = np.tile(train_prev, (index.shape[0], 1))
# assert index.shape[1] == train_prev.shape[0], "Mismatch in prevalence shape"
# _shift = np.abs(index - train_prev)[:, 1:].sum(axis=1)
_shift = qc.error.nae(index, train_prevs)
return np.around(_shift, decimals=2)
class EvaluationReport:
def __init__(self, name=None):
self.data: pd.DataFrame | None = None
self.name = name if name is not None else "default"
self.time = 0.0
self.fit_score = None
def append_row(self, basep: np.ndarray | Tuple, **row):
# bp = basep[1]
bp = tuple(basep)
_keys, _values = zip(*row.items())
# _keys = list(row.keys())
# _values = list(row.values())
if self.data is None:
_idx = 0
self.data = pd.DataFrame(
{k: [v] for k, v in row.items()},
index=pd.MultiIndex.from_tuples([(bp, _idx)]),
columns=_keys,
)
return
_idx = len(self.data.loc[(bp,), :]) if (bp,) in self.data.index else 0
not_in_data = np.setdiff1d(list(row.keys()), self.data.columns.unique(0))
self.data.loc[:, not_in_data] = np.nan
self.data.loc[(bp, _idx), :] = row
return
@property
def columns(self) -> np.ndarray:
return self.data.columns.unique(0)
@property
def prevs(self):
return np.sort(self.data.index.unique(0))
class CompReport:
_default_modes = [
"delta_train",
"stdev_train",
"train_table",
"shift",
"shift_table",
"diagonal",
"stats_table",
]
def __init__(
self,
datas: List[EvaluationReport] | pd.DataFrame,
name="default",
train_prev: np.ndarray = None,
valid_prev: np.ndarray = None,
times=None,
fit_scores=None,
g_time=None,
):
if isinstance(datas, pd.DataFrame):
self._data: pd.DataFrame = datas
else:
self._data: pd.DataFrame = (
pd.concat(
[er.data for er in datas],
keys=[er.name for er in datas],
axis=1,
)
.swaplevel(0, 1, axis=1)
.sort_index(axis=1, level=0, sort_remaining=False)
.sort_index(axis=0, level=0, ascending=False, sort_remaining=False)
)
if fit_scores is None:
self.fit_scores = {
er.name: er.fit_score for er in datas if er.fit_score is not None
}
else:
self.fit_scores = fit_scores
if times is None:
self.times = {er.name: er.time for er in datas}
else:
self.times = times
self.times["tot"] = g_time if g_time is not None else 0.0
self.train_prev = train_prev
self.valid_prev = valid_prev
def postprocess(
self,
f_data: pd.DataFrame,
_data: pd.DataFrame,
metric=None,
estimators=None,
) -> pd.DataFrame:
_mapping = {
"sld_lr_gs": [
"bin_sld_lr_gs",
"mul_sld_lr_gs",
"m3w_sld_lr_gs",
],
"kde_lr_gs": [
"bin_kde_lr_gs",
"mul_kde_lr_gs",
"m3w_kde_lr_gs",
],
"cc_lr_gs": [
"bin_cc_lr_gs",
"mul_cc_lr_gs",
"m3w_cc_lr_gs",
],
"QuAcc": [
"bin_sld_lr_gs",
"mul_sld_lr_gs",
"m3w_sld_lr_gs",
"bin_kde_lr_gs",
"mul_kde_lr_gs",
"m3w_kde_lr_gs",
],
}
for name, methods in _mapping.items():
if estimators is not None and name not in estimators:
continue
available_idx = np.where(np.in1d(methods, self._data.columns.unique(1)))[0]
if len(available_idx) == 0:
continue
methods = np.array(methods)[available_idx]
_metric = _get_metric(metric)
m_data = _data.loc[:, (_metric, methods)]
_fit_scores = [(k, v) for (k, v) in self.fit_scores.items() if k in methods]
_best_method = [k for k, v in _fit_scores][
np.argmin([v for k, v in _fit_scores])
]
_metric = (
[_metric]
if isinstance(_metric, str)
else m_data.columns.unique(0)
)
for _m in _metric:
f_data.loc[:, (_m, name)] = m_data.loc[:, (_m, _best_method)]
return f_data
@property
def prevs(self) -> np.ndarray:
return self.data().index.unique(0)
def join(self, other, how="update", estimators=None):
if how not in ["update"]:
how = "update"
if not (self.train_prev == other.train_prev).all():
raise ValueError(
f"self has train prev. {self.train_prev} while other has {other.train_prev}"
)
self_data = self.data(estimators=estimators)
other_data = other.data(estimators=estimators)
if not (self_data.index == other_data.index).all():
raise ValueError("self and other have different indexes")
update_col = self_data.columns.intersection(other_data.columns)
other_join_col = other_data.columns.difference(update_col)
_join = pd.concat(
[self_data, other_data.loc[:, other_join_col.to_list()]],
axis=1,
)
_join.loc[:, update_col.to_list()] = other_data.loc[:, update_col.to_list()]
_join.sort_index(axis=1, level=0, sort_remaining=False, inplace=True)
df = CompReport(
_join,
self.name if hasattr(self, "name") else "default",
train_prev=self.train_prev,
valid_prev=self.valid_prev,
times=self.times | other.times,
fit_scores=self.fit_scores | other.fit_scores,
g_time=self.times["tot"] + other.times["tot"],
)
return df
def data(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
_metric = _get_metric(metric)
_estimators = _get_estimators(
estimators, self._data.loc[:, (_metric, slice(None))].columns.unique(1)
)
_data: pd.DataFrame = self._data.copy()
f_data: pd.DataFrame = _data.loc[:, (_metric, _estimators)]
f_data = self.postprocess(f_data, _data, metric=metric, estimators=estimators)
if len(f_data.columns.unique(0)) == 1:
f_data = f_data.droplevel(level=0, axis=1)
return f_data
def shift_data(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
shift_idx_0 = _get_shift(
self._data.index.get_level_values(0).to_numpy(),
self.train_prev,
)
shift_idx_1 = np.zeros(shape=shift_idx_0.shape[0], dtype="<i4")
for _id in np.unique(shift_idx_0):
_wh = (shift_idx_0 == _id).nonzero()[0]
shift_idx_1[_wh] = np.arange(_wh.shape[0], dtype="<i4")
shift_data = self._data.copy()
shift_data.index = pd.MultiIndex.from_arrays([shift_idx_0, shift_idx_1])
shift_data = shift_data.sort_index(axis=0, level=0)
_metric = _get_metric(metric)
_estimators = _get_estimators(
estimators, shift_data.loc[:, (_metric, slice(None))].columns.unique(1)
)
s_data: pd.DataFrame = shift_data
shift_data: pd.DataFrame = shift_data.loc[:, (_metric, _estimators)]
shift_data = self.postprocess(
shift_data, s_data, metric=metric, estimators=estimators
)
if len(shift_data.columns.unique(0)) == 1:
shift_data = shift_data.droplevel(level=0, axis=1)
return shift_data
def avg_by_prevs(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_dict = self.data(metric=metric, estimators=estimators)
return f_dict.groupby(level=0, sort=False).mean()
def stdev_by_prevs(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_dict = self.data(metric=metric, estimators=estimators)
return f_dict.groupby(level=0, sort=False).std()
def train_table(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_data = self.data(metric=metric, estimators=estimators)
avg_p = f_data.groupby(level=0, sort=False).mean()
avg_p.loc["mean", :] = f_data.mean()
return avg_p
def shift_table(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_data = self.shift_data(metric=metric, estimators=estimators)
avg_p = f_data.groupby(level=0, sort=False).mean()
avg_p.loc["mean", :] = f_data.mean()
return avg_p
def get_plots(
self,
mode="delta_train",
metric="acc",
estimators=None,
conf="default",
save_fig=True,
base_path=None,
backend=None,
) -> List[Tuple[str, Path]]:
if mode == "delta_train":
avg_data = self.avg_by_prevs(metric=metric, estimators=estimators)
if avg_data.empty:
return None
return plot.plot_delta(
base_prevs=self.prevs,
columns=avg_data.columns.to_numpy(),
data=avg_data.T.to_numpy(),
metric=metric,
name=conf,
train_prev=self.train_prev,
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "stdev_train":
avg_data = self.avg_by_prevs(metric=metric, estimators=estimators)
if avg_data.empty:
return None
st_data = self.stdev_by_prevs(metric=metric, estimators=estimators)
return plot.plot_delta(
base_prevs=self.prevs,
columns=avg_data.columns.to_numpy(),
data=avg_data.T.to_numpy(),
metric=metric,
name=conf,
train_prev=self.train_prev,
stdevs=st_data.T.to_numpy(),
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "diagonal":
f_data = self.data(metric=metric + "_score", estimators=estimators)
if f_data.empty:
return None
ref: pd.Series = f_data.loc[:, "ref"]
f_data.drop(columns=["ref"], inplace=True)
return plot.plot_diagonal(
reference=ref.to_numpy(),
columns=f_data.columns.to_numpy(),
data=f_data.T.to_numpy(),
metric=metric,
name=conf,
train_prev=self.train_prev,
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "shift":
_shift_data = self.shift_data(metric=metric, estimators=estimators)
if _shift_data.empty:
return None
shift_avg = _shift_data.groupby(level=0, sort=False).mean()
shift_counts = _shift_data.groupby(level=0, sort=False).count()
shift_prevs = shift_avg.index.unique(0)
# shift_prevs = np.around(
# [(1.0 - p, p) for p in np.sort(shift_avg.index.unique(0))],
# decimals=2,
# )
return plot.plot_shift(
shift_prevs=shift_prevs,
columns=shift_avg.columns.to_numpy(),
data=shift_avg.T.to_numpy(),
metric=metric,
name=conf,
train_prev=self.train_prev,
counts=shift_counts.T.to_numpy(),
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
def to_md(
self,
conf="default",
metric="acc",
estimators=None,
modes=_default_modes,
plot_path=None,
) -> str:
res = f"## {int(np.around(self.train_prev, decimals=2)[1]*100)}% positives\n"
res += fmt_line_md(f"train: {str(self.train_prev)}")
res += fmt_line_md(f"validation: {str(self.valid_prev)}")
for k, v in self.times.items():
if estimators is not None and k not in estimators:
continue
res += fmt_line_md(f"{k}: {v:.3f}s")
res += "\n"
if "train_table" in modes:
res += "### table\n"
res += (
self.train_table(metric=metric, estimators=estimators).to_html()
+ "\n\n"
)
if "shift_table" in modes:
res += "### shift table\n"
res += (
self.shift_table(metric=metric, estimators=estimators).to_html()
+ "\n\n"
)
plot_modes = [m for m in modes if not m.endswith("table")]
for mode in plot_modes:
res += f"### {mode}\n"
_, op = self.get_plots(
mode=mode,
metric=metric,
estimators=estimators,
conf=conf,
save_fig=True,
base_path=plot_path,
)
res += f"![plot_{mode}]({op.relative_to(op.parents[1]).as_posix()})\n"
return res
def _cr_train_prev(cr: CompReport):
return tuple(np.around(cr.train_prev, decimals=2))
def _cr_data(cr: CompReport, metric=None, estimators=None):
return cr.data(metric, estimators)
def _key_reverse_delta_train(idx):
idx = idx.to_numpy()
sorted_idx = np.array(
sorted(list(idx), key=lambda x: x[-1]), dtype=("float," * len(idx[0]))[:-1]
)
# get sorting index
nparr = np.nonzero(idx[:, None] == sorted_idx)[1]
return nparr
class DatasetReport:
_default_dr_modes = [
"delta_train",
"stdev_train",
"train_table",
"train_std_table",
"shift",
"shift_table",
"delta_test",
"stdev_test",
"test_table",
"diagonal",
"stats_table",
"fit_scores",
]
_default_cr_modes = CompReport._default_modes
def __init__(self, name, crs=None):
self.name = name
self.crs: List[CompReport] = [] if crs is None else crs
def sort_delta_train_index(self, data):
# data_ = data.sort_index(axis=0, level=0, ascending=True, sort_remaining=False)
data_ = data.sort_index(
axis=0,
level=0,
key=_key_reverse_delta_train,
)
print(data_.index)
return data_
def join(self, other, estimators=None):
_crs = [
s_cr.join(o_cr, estimators=estimators)
for s_cr, o_cr in zip(self.crs, other.crs)
]
return DatasetReport(self.name, _crs)
def fit_scores(self, metric: str = None, estimators: List[str] = None):
def _get_sort_idx(arr):
return np.array([np.searchsorted(np.sort(a), a) + 1 for a in arr])
def _get_best_idx(arr):
return np.argmin(arr, axis=1)
def _fdata_idx(idx) -> np.ndarray:
return _fdata.loc[(idx, slice(None), slice(None)), :].to_numpy()
_crs_train = [_cr_train_prev(cr) for cr in self.crs]
for cr in self.crs:
if not hasattr(cr, "fit_scores"):
return None
_crs_fit_scores = [cr.fit_scores for cr in self.crs]
_fit_scores = pd.DataFrame(_crs_fit_scores, index=_crs_train)
_fit_scores = _fit_scores.sort_index(axis=0, ascending=False)
_estimators = _get_estimators(estimators, _fit_scores.columns)
if _estimators.shape[0] == 0:
return None
_fdata = self.data(metric=metric, estimators=_estimators)
# ensure that columns in _fit_scores have the same ordering as _fdata
_fit_scores = _fit_scores.loc[:, _fdata.columns]
_best_fit_estimators = _get_best_idx(_fit_scores.to_numpy())
# scores = np.array(
# [
# _get_sort_idx(
# _fdata.loc[(idx, slice(None), slice(None)), :].to_numpy()
# )[:, cl].mean()
# for idx, cl in zip(_fit_scores.index, _best_fit_estimators)
# ]
# )
# for idx, cl in zip(_fit_scores.index, _best_fit_estimators):
# print(_fdata_idx(idx)[:, cl])
# print(_fdata_idx(idx).min(axis=1), end="\n\n")
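# Note (added for clarity, not original documentation): for each training
# prevalence, take the error column of the estimator selected by its fit score
# and compute its mean absolute gap from the lowest error achieved by any
# estimator on the same samples; 0 means the fit score always picked the
# best-performing method.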
scores = np.array(
[
np.abs(_fdata_idx(idx)[:, cl] - _fdata_idx(idx).min(axis=1)).mean()
for idx, cl in zip(_fit_scores.index, _best_fit_estimators)
]
)
return scores
def data(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
_crs_sorted = sorted(
[(_cr_train_prev(cr), _cr_data(cr, metric, estimators)) for cr in self.crs],
key=lambda cr: len(cr[1].columns),
reverse=True,
)
_crs_train, _crs_data = zip(*_crs_sorted)
_data: pd.DataFrame = pd.concat(
_crs_data,
axis=0,
keys=_crs_train,
)
# The MultiIndex is recreated to make the outer-most level a tuple and not a
# sequence of values
_len_tr_idx = len(_crs_train[0])
_idx = _data.index.to_list()
_idx = pd.MultiIndex.from_tuples(
[tuple([midx[:_len_tr_idx]] + list(midx[_len_tr_idx:])) for midx in _idx]
)
_data.index = _idx
_data = _data.sort_index(axis=0, level=0, ascending=False, sort_remaining=False)
return _data
def shift_data(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
_shift_data: pd.DataFrame = pd.concat(
sorted(
[cr.shift_data(metric, estimators) for cr in self.crs],
key=lambda d: len(d.columns),
reverse=True,
),
axis=0,
)
shift_idx_0 = _shift_data.index.get_level_values(0)
shift_idx_1 = np.empty(shape=shift_idx_0.shape, dtype="<i4")
for _id in np.unique(shift_idx_0):
_wh = np.where(shift_idx_0 == _id)[0]
shift_idx_1[_wh] = np.arange(_wh.shape[0])
_shift_data.index = pd.MultiIndex.from_arrays([shift_idx_0, shift_idx_1])
_shift_data = _shift_data.sort_index(axis=0, level=0)
return _shift_data
def add(self, cr: CompReport):
if cr is None:
return
self.crs.append(cr)
def __add__(self, cr: CompReport):
if cr is None:
return self
return DatasetReport(self.name, crs=self.crs + [cr])
def __iadd__(self, cr: CompReport):
self.add(cr)
return self
def train_table(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_data = self.data(metric=metric, estimators=estimators)
avg_p = f_data.groupby(level=1, sort=False).mean()
avg_p.loc["mean", :] = f_data.mean()
return avg_p
def train_std_table(self, metric: str = None, estimators: List[str] = None):
f_data = self.data(metric=metric, estimators=estimators)
avg_p = f_data.groupby(level=1, sort=False).mean()
avg_p.loc["mean", :] = f_data.mean()
avg_s = f_data.groupby(level=1, sort=False).std()
avg_s.loc["mean", :] = f_data.std()
avg_r = pd.concat([avg_p, avg_s], axis=1, keys=["avg", "std"])
return avg_r
def test_table(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_data = self.data(metric=metric, estimators=estimators)
avg_p = f_data.groupby(level=0, sort=False).mean()
avg_p.loc["mean", :] = f_data.mean()
return avg_p
def shift_table(
self, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
f_data = self.shift_data(metric=metric, estimators=estimators)
avg_p = f_data.groupby(level=0, sort=False).mean()
avg_p.loc["mean", :] = f_data.mean()
return avg_p
def get_plots(
self,
data=None,
mode="delta_train",
metric="acc",
estimators=None,
conf="default",
save_fig=True,
base_path=None,
backend=None,
):
if mode == "delta_train":
_data = self.data(metric, estimators) if data is None else data
avg_on_train = _data.groupby(level=1, sort=False).mean()
if avg_on_train.empty:
return None
# sort index in reverse order
avg_on_train = self.sort_delta_train_index(avg_on_train)
prevs_on_train = avg_on_train.index.unique(0)
return plot.plot_delta(
# base_prevs=np.around(
# [(1.0 - p, p) for p in prevs_on_train], decimals=2
# ),
base_prevs=prevs_on_train,
columns=avg_on_train.columns.to_numpy(),
data=avg_on_train.T.to_numpy(),
metric=metric,
name=conf,
train_prev=None,
avg="train",
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "stdev_train":
_data = self.data(metric, estimators) if data is None else data
avg_on_train = _data.groupby(level=1, sort=False).mean()
if avg_on_train.empty:
return None
prevs_on_train = avg_on_train.index.unique(0)
stdev_on_train = _data.groupby(level=1, sort=False).std()
return plot.plot_delta(
# base_prevs=np.around(
# [(1.0 - p, p) for p in prevs_on_train], decimals=2
# ),
base_prevs=prevs_on_train,
columns=avg_on_train.columns.to_numpy(),
data=avg_on_train.T.to_numpy(),
metric=metric,
name=conf,
train_prev=None,
stdevs=stdev_on_train.T.to_numpy(),
avg="train",
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "delta_test":
_data = self.data(metric, estimators) if data is None else data
avg_on_test = _data.groupby(level=0, sort=False).mean()
if avg_on_test.empty:
return None
prevs_on_test = avg_on_test.index.unique(0)
return plot.plot_delta(
# base_prevs=np.around([(1.0 - p, p) for p in prevs_on_test], decimals=2),
base_prevs=prevs_on_test,
columns=avg_on_test.columns.to_numpy(),
data=avg_on_test.T.to_numpy(),
metric=metric,
name=conf,
train_prev=None,
avg="test",
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "stdev_test":
_data = self.data(metric, estimators) if data is None else data
avg_on_test = _data.groupby(level=0, sort=False).mean()
if avg_on_test.empty:
return None
prevs_on_test = avg_on_test.index.unique(0)
stdev_on_test = _data.groupby(level=0, sort=False).std()
return plot.plot_delta(
# base_prevs=np.around([(1.0 - p, p) for p in prevs_on_test], decimals=2),
base_prevs=prevs_on_test,
columns=avg_on_test.columns.to_numpy(),
data=avg_on_test.T.to_numpy(),
metric=metric,
name=conf,
train_prev=None,
stdevs=stdev_on_test.T.to_numpy(),
avg="test",
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "shift":
_shift_data = self.shift_data(metric, estimators) if data is None else data
avg_shift = _shift_data.groupby(level=0, sort=False).mean()
if avg_shift.empty:
return None
count_shift = _shift_data.groupby(level=0, sort=False).count()
prevs_shift = avg_shift.index.unique(0)
return plot.plot_shift(
# shift_prevs=np.around([(1.0 - p, p) for p in prevs_shift], decimals=2),
shift_prevs=prevs_shift,
columns=avg_shift.columns.to_numpy(),
data=avg_shift.T.to_numpy(),
metric=metric,
name=conf,
train_prev=None,
counts=count_shift.T.to_numpy(),
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "fit_scores":
_fit_scores = self.fit_scores(metric, estimators) if data is None else data
if _fit_scores is None:
return None
train_prevs = self.data(metric, estimators).index.unique(0)
return plot.plot_fit_scores(
train_prevs=train_prevs,
scores=_fit_scores,
metric=metric,
name=conf,
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
elif mode == "diagonal":
f_data = self.data(metric=metric + "_score", estimators=estimators)
if f_data.empty:
return None
ref: pd.Series = f_data.loc[:, "ref"]
f_data.drop(columns=["ref"], inplace=True)
return plot.plot_diagonal(
reference=ref.to_numpy(),
columns=f_data.columns.to_numpy(),
data=f_data.T.to_numpy(),
metric=metric,
name=conf,
# train_prev=self.train_prev,
fixed_lim=True,
save_fig=save_fig,
base_path=base_path,
backend=backend,
)
def to_md(
self,
conf="default",
metric="acc",
estimators=[],
dr_modes=_default_dr_modes,
cr_modes=_default_cr_modes,
cr_prevs: List[str] = None,
plot_path=None,
):
res = f"# {self.name}\n\n"
for cr in self.crs:
if (
cr_prevs is not None
and str(round(cr.train_prev[1] * 100)) not in cr_prevs
):
continue
_md = cr.to_md(
conf,
metric=metric,
estimators=estimators,
modes=cr_modes,
plot_path=plot_path,
)
res += f"{_md}\n\n"
_data = self.data(metric=metric, estimators=estimators)
_shift_data = self.shift_data(metric=metric, estimators=estimators)
res += "## avg\n"
######################## avg on train ########################
res += "### avg on train\n"
if "train_table" in dr_modes:
avg_on_train_tbl = _data.groupby(level=1, sort=False).mean()
avg_on_train_tbl.loc["avg", :] = _data.mean()
res += avg_on_train_tbl.to_html() + "\n\n"
if "delta_train" in dr_modes:
_, delta_op = self.get_plots(
data=_data,
mode="delta_train",
metric=metric,
estimators=estimators,
conf=conf,
base_path=plot_path,
save_fig=True,
)
_op = delta_op.relative_to(delta_op.parents[1]).as_posix()
res += f"![plot_delta]({_op})\n"
if "stdev_train" in dr_modes:
_, delta_stdev_op = self.get_plots(
data=_data,
mode="stdev_train",
metric=metric,
estimators=estimators,
conf=conf,
base_path=plot_path,
save_fig=True,
)
_op = delta_stdev_op.relative_to(delta_stdev_op.parents[1]).as_posix()
res += f"![plot_delta_stdev]({_op})\n"
######################## avg on test ########################
res += "### avg on test\n"
if "test_table" in dr_modes:
avg_on_test_tbl = _data.groupby(level=0, sort=False).mean()
avg_on_test_tbl.loc["avg", :] = _data.mean()
res += avg_on_test_tbl.to_html() + "\n\n"
if "delta_test" in dr_modes:
_, delta_op = self.get_plots(
data=_data,
mode="delta_test",
metric=metric,
estimators=estimators,
conf=conf,
base_path=plot_path,
save_fig=True,
)
_op = delta_op.relative_to(delta_op.parents[1]).as_posix()
res += f"![plot_delta]({_op})\n"
if "stdev_test" in dr_modes:
_, delta_stdev_op = self.get_plots(
data=_data,
mode="stdev_test",
metric=metric,
estimators=estimators,
conf=conf,
base_path=plot_path,
save_fig=True,
)
_op = delta_stdev_op.relative_to(delta_stdev_op.parents[1]).as_posix()
res += f"![plot_delta_stdev]({_op})\n"
######################## avg shift ########################
res += "### avg dataset shift\n"
if "shift_table" in dr_modes:
shift_on_train_tbl = _shift_data.groupby(level=0, sort=False).mean()
shift_on_train_tbl.loc["avg", :] = _shift_data.mean()
res += shift_on_train_tbl.to_html() + "\n\n"
if "shift" in dr_modes:
_, shift_op = self.get_plots(
data=_shift_data,
mode="shift",
metric=metric,
estimators=estimators,
conf=conf,
base_path=plot_path,
save_fig=True,
)
_op = shift_op.relative_to(shift_op.parents[1]).as_posix()
res += f"![plot_shift]({_op})\n"
return res
def pickle(self, pickle_path: Path):
with open(pickle_path, "wb") as f:
pickle.dump(self, f)
return self
@classmethod
def unpickle(cls, pickle_path: Path, report_info=False):
with open(pickle_path, "rb") as f:
dr = pickle.load(f)
if report_info:
return DatasetReportInfo(dr, pickle_path)
return dr
def __iter__(self):
return (cr for cr in self.crs)
class DatasetReportInfo:
def __init__(self, dr: DatasetReport, path: Path):
self.dr = dr
self.name = str(path.parent)
_data = dr.data()
self.columns = defaultdict(list)
for metric, estim in _data.columns:
self.columns[estim].append(metric)
# self.columns = list(_data.columns.unique(1))
self.train_prevs = len(self.dr.crs)
self.test_prevs = len(_data.index.unique(1))
self.repeats = len(_data.index.unique(2))
def __repr__(self) -> str:
_d = {
"train prevs.": self.train_prevs,
"test prevs.": self.test_prevs,
"repeats": self.repeats,
"columns": self.columns,
}
_r = f"{self.name}\n{json.dumps(_d, indent=2)}\n"
return _r
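# Usage sketch (illustrative, not part of the original module): a pickled
# DatasetReport can be reloaded and rendered to markdown roughly as follows;
# the path and estimator names are hypothetical placeholders.
#
#     dr = DatasetReport.unpickle(Path("output/example/example.pickle"))
#     md = dr.to_md(
#         conf="example",
#         metric="acc",
#         estimators=["bin_cc_lr", "mul_cc_lr"],
#         plot_path=Path("output/example/plots"),
#     )
#     Path("output/example/report.md").write_text(md)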

View File

@@ -1,41 +0,0 @@
from typing import List
import numpy as np
import pandas as pd
from scipy import stats as sp_stats
# from quacc.evaluation.estimators import CE
from quacc.evaluation.report import CompReport, DatasetReport
def shapiro(
r: DatasetReport | CompReport, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
_data = r.data(metric, estimators)
shapiro_data = np.array(
[sp_stats.shapiro(_data.loc[:, e]) for e in _data.columns.unique(0)]
).T
dr_index = ["shapiro_W", "shapiro_p"]
dr_columns = _data.columns.unique(0)
return pd.DataFrame(shapiro_data, columns=dr_columns, index=dr_index)
def wilcoxon(
r: DatasetReport | CompReport, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
_data = r.data(metric, estimators)
_data = _data.dropna(axis=0, how="any")
_wilcoxon = {}
for est in _data.columns.unique(0):
_wilcoxon[est] = [
sp_stats.wilcoxon(_data.loc[:, est], _data.loc[:, e]).pvalue
if e != est
else 1.0
for e in _data.columns.unique(0)
]
wilcoxon_data = np.array(list(_wilcoxon.values()))
dr_index = list(_wilcoxon.keys())
dr_columns = _data.columns.unique(0)
return pd.DataFrame(wilcoxon_data, columns=dr_columns, index=dr_index)
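# Usage sketch (illustrative, not part of the original module): both tests take
# a DatasetReport (or CompReport) and return a DataFrame keyed by estimator
# name; the pickle path and estimator names are hypothetical placeholders.
#
#     from pathlib import Path
#
#     dr = DatasetReport.unpickle(Path("output/example/example.pickle"))
#     print(shapiro(dr, metric="acc"))
#     print(wilcoxon(dr, metric="acc", estimators=["bin_cc_lr", "mul_cc_lr"]))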

View File

@@ -1,58 +0,0 @@
from traceback import print_exception as traceback
import quacc.evaluation.comp as comp
# from quacc.logger import Logger
from quacc import logger
from quacc.dataset import Dataset
from quacc.environment import env
from quacc.evaluation.estimators import CE
from quacc.utils import create_dataser_dir
def estimate_comparison():
# log = Logger.logger()
log = logger.logger()
for conf in env.load_confs():
dataset = Dataset(
env.DATASET_NAME,
target=env.DATASET_TARGET,
n_prevalences=env.DATASET_N_PREVS,
prevs=env.DATASET_PREVS,
)
create_dataser_dir(
dataset.name,
update=env.DATASET_DIR_UPDATE,
)
# Logger.add_handler(env.OUT_DIR / f"{dataset.name}.log")
logger.add_handler(env.OUT_DIR / f"{dataset.name}.log")
try:
dr = comp.evaluate_comparison(
dataset,
estimators=CE.name[env.COMP_ESTIMATORS],
)
dr.pickle(env.OUT_DIR / f"{dataset.name}.pickle")
except Exception as e:
log.error(f"Evaluation over {dataset.name} failed. Exception: {e}")
traceback(e)
# Logger.clear_handlers()
logger.clear_handlers()
def main():
# log = Logger.logger()
log = logger.setup_logger()
try:
estimate_comparison()
except Exception as e:
log.error(f"estimate comparison failed. Exception: {e}")
traceback(e)
# Logger.close()
logger.logger_manager().close()
if __name__ == "__main__":
main()

View File

@@ -1,355 +0,0 @@
from abc import abstractmethod
from copy import deepcopy
from typing import List
import numpy as np
import scipy.sparse as sp
from quapy.data import LabelledCollection
from quapy.method.aggregative import BaseQuantifier
from sklearn.base import BaseEstimator
import quacc.method.confidence as conf
from quacc.data import (
ExtBinPrev,
ExtendedCollection,
ExtendedData,
ExtendedPrev,
ExtensionPolicy,
ExtMulPrev,
)
class BaseAccuracyEstimator(BaseQuantifier):
def __init__(
self,
classifier: BaseEstimator,
quantifier: BaseQuantifier,
dense=False,
):
self.__check_classifier(classifier)
self.quantifier = quantifier
self.extpol = ExtensionPolicy(dense=dense)
def __check_classifier(self, classifier):
if not hasattr(classifier, "predict_proba"):
raise ValueError(
f"Passed classifier {classifier.__class__.__name__} cannot predict probabilities."
)
self.classifier = classifier
def extend(self, coll: LabelledCollection, pred_proba=None) -> ExtendedCollection:
if pred_proba is None:
pred_proba = self.classifier.predict_proba(coll.X)
return ExtendedCollection.from_lc(
coll, pred_proba=pred_proba, ext=pred_proba, extpol=self.extpol
)
def _extend_instances(self, instances: np.ndarray | sp.csr_matrix):
pred_proba = self.classifier.predict_proba(instances)
return ExtendedData(instances, pred_proba=pred_proba, extpol=self.extpol)
@abstractmethod
def fit(self, train: LabelledCollection | ExtendedCollection):
...
@abstractmethod
def estimate(self, instances, ext=False) -> ExtendedPrev:
...
@property
def dense(self):
return self.extpol.dense
class ConfidenceBasedAccuracyEstimator(BaseAccuracyEstimator):
def __init__(
self,
classifier: BaseEstimator,
quantifier: BaseQuantifier,
confidence=None,
):
super().__init__(
classifier=classifier,
quantifier=quantifier,
)
self.__check_confidence(confidence)
self.calibrator = None
def __check_confidence(self, confidence):
if isinstance(confidence, str):
self.confidence = [confidence]
elif isinstance(confidence, list):
self.confidence = confidence
else:
self.confidence = None
def _fit_confidence(self, X, y, probas):
self.confidence_metrics = conf.get_metrics(self.confidence)
if self.confidence_metrics is None:
return
for m in self.confidence_metrics:
m.fit(X, y, probas)
def _get_pred_ext(self, pred_proba: np.ndarray):
return pred_proba
def __get_ext(
self, X: np.ndarray | sp.csr_matrix, pred_proba: np.ndarray
) -> np.ndarray:
if self.confidence_metrics is None or len(self.confidence_metrics) == 0:
return pred_proba
_conf_ext = np.concatenate(
[m.conf(X, pred_proba) for m in self.confidence_metrics],
axis=1,
)
_pred_ext = self._get_pred_ext(pred_proba)
return np.concatenate([_conf_ext, _pred_ext], axis=1)
def extend(
self, coll: LabelledCollection, pred_proba=None, prefit=False
) -> ExtendedCollection:
if pred_proba is None:
pred_proba = self.classifier.predict_proba(coll.X)
if prefit:
self._fit_confidence(coll.X, coll.y, pred_proba)
else:
if not hasattr(self, "confidence_metrics"):
raise AttributeError(
"Confidence metrics are not fit and cannot be computed."
"Consider setting prefit to True."
)
_ext = self.__get_ext(coll.X, pred_proba)
return ExtendedCollection.from_lc(
coll, pred_proba=pred_proba, ext=_ext, extpol=self.extpol
)
def _extend_instances(
self,
instances: np.ndarray | sp.csr_matrix,
) -> ExtendedData:
pred_proba = self.classifier.predict_proba(instances)
_ext = self.__get_ext(instances, pred_proba)
return ExtendedData(
instances, pred_proba=pred_proba, ext=_ext, extpol=self.extpol
)
class MultiClassAccuracyEstimator(ConfidenceBasedAccuracyEstimator):
def __init__(
self,
classifier: BaseEstimator,
quantifier: BaseQuantifier,
confidence: str = None,
collapse_false=False,
group_false=False,
dense=False,
):
super().__init__(
classifier=classifier,
quantifier=quantifier,
confidence=confidence,
)
self.extpol = ExtensionPolicy(
collapse_false=collapse_false,
group_false=group_false,
dense=dense,
)
self.e_train = None
# def _get_pred_ext(self, pred_proba: np.ndarray):
# return np.argmax(pred_proba, axis=1, keepdims=True)
def _get_multi_quant(self, quant, train: LabelledCollection):
_nz = np.nonzero(train.counts())[0]
if _nz.shape[0] == 1:
return TrivialQuantifier(train.n_classes, _nz[0])
else:
return quant
def fit(self, train: LabelledCollection):
pred_proba = self.classifier.predict_proba(train.X)
self._fit_confidence(train.X, train.y, pred_proba)
self.e_train = self.extend(train, pred_proba=pred_proba)
self.quantifier = self._get_multi_quant(self.quantifier, self.e_train)
self.quantifier.fit(self.e_train)
return self
def estimate(
self, instances: ExtendedData | np.ndarray | sp.csr_matrix
) -> ExtendedPrev:
e_inst = instances
if not isinstance(e_inst, ExtendedData):
e_inst = self._extend_instances(instances)
estim_prev = self.quantifier.quantify(e_inst.X)
return ExtMulPrev(
estim_prev,
e_inst.nbcl,
q_classes=self.quantifier.classes_,
extpol=self.extpol,
)
@property
def collapse_false(self):
return self.extpol.collapse_false
@property
def group_false(self):
return self.extpol.group_false
class TrivialQuantifier:
def __init__(self, n_classes, trivial_class):
self.trivial_class = trivial_class
def fit(self, train: LabelledCollection):
pass
def quantify(self, inst: LabelledCollection) -> np.ndarray:
return np.array([1.0])
@property
def classes_(self):
return np.array([self.trivial_class])
class QuantifierProxy:
def __init__(self, train: LabelledCollection):
self.o_nclasses = train.n_classes
self.o_classes = train.classes_
self.o_index = {c: i for i, c in enumerate(train.classes_)}
self.mapping = {}
self.r_mapping = {}
_cnt = 0
for cl, c in zip(train.classes_, train.counts()):
if c > 0:
self.mapping[cl] = _cnt
self.r_mapping[_cnt] = cl
_cnt += 1
self.n_nclasses = len(self.mapping)
def apply_mapping(self, coll: LabelledCollection) -> LabelledCollection:
if not self.proxied:
return coll
n_labels = np.copy(coll.labels)
for k in self.mapping:
n_labels[coll.labels == k] = self.mapping[k]
return LabelledCollection(coll.X, n_labels, classes=np.arange(self.n_nclasses))
def apply_rmapping(self, prevs: np.ndarray, q_classes: np.ndarray) -> np.ndarray:
if not self.proxied:
return prevs, q_classes
n_qclasses = np.array([self.r_mapping[qc] for qc in q_classes])
return prevs, n_qclasses
def get_trivial(self):
return TrivialQuantifier(self.o_nclasses, self.n_nclasses)
@property
def proxied(self):
return self.o_nclasses != self.n_nclasses
class BinaryQuantifierAccuracyEstimator(ConfidenceBasedAccuracyEstimator):
def __init__(
self,
classifier: BaseEstimator,
quantifier: BaseAccuracyEstimator,
confidence: str = None,
group_false: bool = False,
dense: bool = False,
):
super().__init__(
classifier=classifier,
quantifier=quantifier,
confidence=confidence,
)
self.quantifiers = []
self.extpol = ExtensionPolicy(
group_false=group_false,
dense=dense,
)
def _get_binary_quant(self, quant, train: LabelledCollection):
_nz = np.nonzero(train.counts())[0]
if _nz.shape[0] == 1:
return TrivialQuantifier(train.n_classes, _nz[0])
else:
return deepcopy(quant)
def fit(self, train: LabelledCollection | ExtendedCollection):
pred_proba = self.classifier.predict_proba(train.X)
self._fit_confidence(train.X, train.y, pred_proba)
self.e_train = self.extend(train, pred_proba=pred_proba)
self.n_classes = self.e_train.n_classes
e_trains = self.e_train.split_by_pred()
self.quantifiers = []
for train in e_trains:
quant = self._get_binary_quant(self.quantifier, train)
quant.fit(train)
self.quantifiers.append(quant)
return self
def estimate(
self, instances: ExtendedData | np.ndarray | sp.csr_matrix
) -> np.ndarray:
e_inst = instances
if not isinstance(e_inst, ExtendedData):
e_inst = self._extend_instances(instances)
s_inst = e_inst.split_by_pred()
norms = [s_i.shape[0] / len(e_inst) for s_i in s_inst]
estim_prevs = self._quantify_helper(s_inst, norms)
# estim_prev = np.concatenate(estim_prevs.T)
# return ExtendedPrev(estim_prev, e_inst.nbcl, extpol=self.extpol)
return ExtBinPrev(
estim_prevs,
e_inst.nbcl,
q_classes=[quant.classes_ for quant in self.quantifiers],
extpol=self.extpol,
)
def _quantify_helper(
self,
s_inst: List[np.ndarray | sp.csr_matrix],
norms: List[float],
):
estim_prevs = []
for quant, inst, norm in zip(self.quantifiers, s_inst, norms):
if inst.shape[0] > 0:
estim_prev = quant.quantify(inst) * norm
estim_prevs.append(estim_prev)
else:
estim_prevs.append(np.zeros((len(quant.classes_),)))
# return np.array(estim_prevs)
return estim_prevs
@property
def group_false(self):
return self.extpol.group_false
BAE = BaseAccuracyEstimator
MCAE = MultiClassAccuracyEstimator
BQAE = BinaryQuantifierAccuracyEstimator
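# Usage sketch (illustrative, not part of the original module): an accuracy
# estimator is assembled from a probabilistic classifier and a QuaPy quantifier.
# LogisticRegression and SLD are arbitrary choices here, and clf_train,
# validation_lc and X_test are hypothetical placeholders.
#
#     from quapy.method.aggregative import SLD
#     from sklearn.linear_model import LogisticRegression
#
#     clf = LogisticRegression().fit(*clf_train.Xy)   # clf_train: LabelledCollection
#     estim = MCAE(clf, SLD(LogisticRegression()), confidence=["max_conf"])
#     estim.fit(validation_lc)                        # validation_lc: LabelledCollection
#     ext_prev = estim.estimate(X_test)               # ExtendedPrev over (true, predicted) cells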

View File

@@ -1,98 +0,0 @@
from typing import List
import numpy as np
import scipy.sparse as sp
from sklearn.linear_model import LinearRegression
import baselines.atc as atc
__confs = {}
def metric(name):
def wrapper(cl):
__confs[name] = cl
return cl
return wrapper
class ConfidenceMetric:
def fit(self, X, y, probas):
pass
def conf(self, X, probas):
return probas
@metric("max_conf")
class MaxConf(ConfidenceMetric):
def conf(self, X, probas):
_mc = np.max(probas, axis=1, keepdims=True)
return _mc
@metric("entropy")
class Entropy(ConfidenceMetric):
def conf(self, X, probas):
_ent = np.sum(
np.multiply(probas, np.log(probas + 1e-20)), axis=1, keepdims=True
)
return _ent
@metric("isoft")
class InverseSoftmax(ConfidenceMetric):
def conf(self, X, probas):
_probas = probas / np.sum(probas, axis=1, keepdims=True)
_probas = np.log(_probas) - np.mean(np.log(_probas), axis=1, keepdims=True)
return np.max(_probas, axis=1, keepdims=True)
@metric("threshold")
class Threshold(ConfidenceMetric):
def get_scores(self, probas, keepdims=False):
return np.max(probas, axis=1, keepdims=keepdims)
def fit(self, X, y, probas):
scores = self.get_scores(probas)
_, self.threshold = atc.find_ATC_threshold(scores, y)
def conf(self, X, probas):
scores = self.get_scores(probas, keepdims=True)
_exp = scores - self.threshold
return _exp
# def conf(self, X, probas):
# scores = self.get_scores(probas)
# _exp = np.where(
# scores >= self.threshold, np.ones(scores.shape), np.zeros(scores.shape)
# )
# return _exp[:, np.newaxis]
@metric("linreg")
class LinReg(ConfidenceMetric):
def extend(self, X, probas):
if sp.issparse(X):
return sp.hstack([X, probas])
else:
return np.concatenate([X, probas], axis=1)
def fit(self, X, y, probas):
reg_X = self.extend(X, probas)
reg_y = probas[np.arange(probas.shape[0]), y]
self.reg = LinearRegression()
self.reg.fit(reg_X, reg_y)
def conf(self, X, probas):
reg_X = self.extend(X, probas)
return self.reg.predict(reg_X)[:, np.newaxis]
def get_metrics(names: List[str]):
if names is None:
return None
__fnames = [n for n in names if n in __confs]
return [__confs[m]() for m in __fnames]
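# Usage sketch (illustrative, not part of the original module): metrics are
# looked up by the names registered above, and each maps (X, posteriors) to a
# single confidence column; X, y and probas are hypothetical placeholders.
#
#     metrics = get_metrics(["max_conf", "entropy"])
#     for m in metrics:
#         m.fit(X, y, probas)                     # no-op for max_conf / entropy
#     feats = np.hstack([m.conf(X, probas) for m in metrics])   # shape (n, 2)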

View File

@@ -1,481 +0,0 @@
import itertools
import math
import os
from copy import deepcopy
from time import time
from typing import Callable, Union
import numpy as np
from joblib import Parallel
from quapy.data import LabelledCollection
from quapy.protocol import (
AbstractProtocol,
OnLabelledCollectionProtocol,
)
import quacc as qc
import quacc.error
from quacc.data import ExtendedCollection
from quacc.evaluation.evaluate import evaluate
from quacc.logger import logger
from quacc.method.base import (
BaseAccuracyEstimator,
)
class GridSearchAE(BaseAccuracyEstimator):
def __init__(
self,
model: BaseAccuracyEstimator,
param_grid: dict,
protocol: AbstractProtocol,
error: Union[Callable, str] = qc.error.maccd,
refit=True,
# timeout=-1,
n_jobs=None,
verbose=False,
):
self.model = model
self.param_grid = self.__normalize_params(param_grid)
self.protocol = protocol
self.refit = refit
# self.timeout = timeout
self.n_jobs = qc._get_njobs(n_jobs)
self.verbose = verbose
self.__check_error(error)
assert isinstance(protocol, AbstractProtocol), "unknown protocol"
def _sout(self, msg, level=0):
if level > 0 or self.verbose:
print(f"[{self.__class__.__name__}@{self.model.__class__.__name__}]: {msg}")
def __normalize_params(self, params):
__remap = {}
for key in params.keys():
k, delim, sub_key = key.partition("__")
if delim and k == "q":
__remap[key] = f"quantifier__{sub_key}"
return {(__remap[k] if k in __remap else k): v for k, v in params.items()}
def __check_error(self, error):
if error in qc.error.ACCURACY_ERROR:
self.error = error
elif isinstance(error, str):
self.error = qc.error.from_name(error)
elif hasattr(error, "__call__"):
self.error = error
else:
raise ValueError(
f"unexpected error type; must either be a callable function or a str representing\n"
f"the name of an error function in {qc.error.ACCURACY_ERROR_NAMES}"
)
def fit(self, training: LabelledCollection):
"""Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
the error metric.
:param training: the training set on which to optimize the hyperparameters
:return: self
"""
params_keys = list(self.param_grid.keys())
params_values = list(self.param_grid.values())
protocol = self.protocol
self.param_scores_ = {}
self.best_score_ = None
tinit = time()
hyper = [
dict(zip(params_keys, val)) for val in itertools.product(*params_values)
]
self._sout(f"starting model selection with {self.n_jobs =}")
# self._sout("starting model selection")
# scores = [self.__params_eval((params, training)) for params in hyper]
scores = self._select_scores(hyper, training)
for params, score, model in scores:
if score is not None:
if self.best_score_ is None or score < self.best_score_:
self.best_score_ = score
self.best_params_ = params
self.best_model_ = model
self.param_scores_[str(params)] = score
else:
self.param_scores_[str(params)] = "timeout"
tend = time() - tinit
if self.best_score_ is None:
raise TimeoutError("no combination of hyperparameters seem to work")
self._sout(
f"optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) "
f"[took {tend:.4f}s]",
level=1,
)
# log = Logger.logger()
log = logger()
log.debug(
f"[{self.model.__class__.__name__}] "
f"optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) "
f"[took {tend:.4f}s]"
)
if self.refit:
if isinstance(protocol, OnLabelledCollectionProtocol):
self._sout("refitting on the whole development set")
self.best_model_.fit(training + protocol.get_labelled_collection())
else:
raise RuntimeWarning(
f'"refit" was requested, but the protocol does not '
f"implement the {OnLabelledCollectionProtocol.__name__} interface"
)
return self
def _select_scores(self, hyper, training):
return qc.utils.parallel(
self._params_eval,
[(params, training) for params in hyper],
n_jobs=self.n_jobs,
verbose=1,
)
def _params_eval(self, params, training, protocol=None):
protocol = self.protocol if protocol is None else protocol
error = self.error
# if self.timeout > 0:
# def handler(signum, frame):
# raise TimeoutError()
# signal.signal(signal.SIGALRM, handler)
tinit = time()
# if self.timeout > 0:
# signal.alarm(self.timeout)
try:
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**params)
# print({k: v for k, v in model.get_params().items() if k in params})
model.fit(training)
score = evaluate(model, protocol=protocol, error_metric=error)
ttime = time() - tinit
self._sout(
f"hyperparams={params}\t got score {score:.5f} [took {ttime:.4f}s]",
)
# if self.timeout > 0:
# signal.alarm(0)
# except TimeoutError:
# self._sout(f"timeout ({self.timeout}s) reached for config {params}")
# score = None
except ValueError as e:
self._sout(
f"the combination of hyperparameters {params} is invalid. Exception: {e}",
level=1,
)
score = None
# raise e
except Exception as e:
self._sout(
f"something went wrong for config {params}; skipping:"
f"\tException: {e}",
level=1,
)
# raise e
score = None
return params, score, model
def extend(
self, coll: LabelledCollection, pred_proba=None, prefit=False
) -> ExtendedCollection:
assert hasattr(self, "best_model_"), "quantify called before fit"
return self.best_model().extend(coll, pred_proba=pred_proba, prefit=prefit)
def estimate(self, instances):
"""Estimate class prevalence values using the best model found after calling the :meth:`fit` method.
:param instances: sample containing the instances
:return: an ndarray of shape `(n_classes)` with class prevalence estimates according to the best model found
by the model selection process.
"""
assert hasattr(self, "best_model_"), "estimate called before fit"
return self.best_model().estimate(instances)
def set_params(self, **parameters):
"""Sets the hyper-parameters to explore.
:param parameters: a dictionary with keys the parameter names and values the list of values to explore
"""
self.param_grid = parameters
def get_params(self, deep=True):
"""Returns the dictionary of hyper-parameters to explore (`param_grid`)
:param deep: Unused
:return: the dictionary `param_grid`
"""
return self.param_grid
def best_model(self):
"""
Returns the best model found after calling the :meth:`fit` method, i.e., the one trained on the combination
of hyper-parameters that minimized the error function.
:return: a trained quantifier
"""
if hasattr(self, "best_model_"):
return self.best_model_
raise ValueError("best_model called before fit")
def best_score(self):
if hasattr(self, "best_score_"):
return self.best_score_
raise ValueError("best_score called before fit")
class RandomizedSearchAE(GridSearchAE):
ERR_THRESHOLD = 1e-4
MAX_ITER_IMPROV = 3
def _select_scores(self, hyper, training: LabelledCollection):
log = logger()
hyper = np.array(hyper)
rand_index = np.random.choice(
np.arange(len(hyper)), size=len(hyper), replace=False
)
_n_jobs = os.cpu_count() + 1 + self.n_jobs if self.n_jobs < 0 else self.n_jobs
batch_size = _n_jobs
log.debug(f"{batch_size = }")
rand_index = list(
rand_index[: (len(hyper) // batch_size) * batch_size].reshape(
(len(hyper) // batch_size, batch_size)
)
) + [rand_index[(len(hyper) // batch_size) * batch_size :]]
scores = []
best_score, iter_from_improv = np.inf, 0
with Parallel(n_jobs=self.n_jobs) as parallel:
for i, ri in enumerate(rand_index):
tstart = time()
_iter_scores = qc.utils.parallel(
self._params_eval,
[(params, training) for params in hyper[ri]],
parallel=parallel,
)
_best_iter_score = np.min(
[s for _, s, _ in _iter_scores if s is not None]
)
log.debug(
f"[iter {i}] best score = {_best_iter_score:.8f} [took {time() - tstart:.3f}s]"
)
scores += _iter_scores
_check, best_score, iter_from_improv = self.__stop_condition(
_best_iter_score, best_score, iter_from_improv
)
if _check:
break
return scores
def __stop_condition(self, best_iter_score, best_score, iter_from_improv):
if best_iter_score < best_score:
_improv = best_score - best_iter_score
best_score = best_iter_score
else:
_improv = 0
if _improv > self.ERR_THRESHOLD:
iter_from_improv = 0
else:
iter_from_improv += 1
return iter_from_improv > self.MAX_ITER_IMPROV, best_score, iter_from_improv
class HalvingSearchAE(GridSearchAE):
def _select_scores(self, hyper, training: LabelledCollection):
log = logger()
hyper = np.array(hyper)
threshold = 22
factor = 3
n_steps = math.ceil(math.log(len(hyper) / threshold, factor))
steps = np.logspace(n_steps, 0, base=1.0 / factor, num=n_steps + 1)
with Parallel(n_jobs=self.n_jobs, verbose=1) as parallel:
for _step in steps:
tstart = time()
_training, _ = (
training.split_stratified(train_prop=_step)
if _step < 1.0
else (training, None)
)
results = qc.utils.parallel(
self._params_eval,
[(params, _training) for params in hyper],
parallel=parallel,
)
scores = [(1.0 if s is None else s) for _, s, _ in results]
res_hyper = np.array([h for h, _, _ in results], dtype="object")
sorted_scores_idx = np.argsort(scores)
best_score = scores[sorted_scores_idx[0]]
hyper = res_hyper[
sorted_scores_idx[: round(len(res_hyper) * (1.0 / factor))]
]
log.debug(
f"[step {_step}] best score = {best_score:.8f} [took {time() - tstart:.3f}s]"
)
return results
class SpiderSearchAE(GridSearchAE):
def __init__(
self,
model: BaseAccuracyEstimator,
param_grid: dict,
protocol: AbstractProtocol,
error: Union[Callable, str] = qc.error.maccd,
refit=True,
n_jobs=None,
verbose=False,
err_threshold=1e-4,
max_iter_improv=0,
pd_th_min=1,
best_width=2,
):
super().__init__(
model=model,
param_grid=param_grid,
protocol=protocol,
error=error,
refit=refit,
n_jobs=n_jobs,
verbose=verbose,
)
self.err_threshold = err_threshold
self.max_iter_improv = max_iter_improv
self.pd_th_min = pd_th_min
self.best_width = best_width
def _select_scores(self, hyper, training: LabelledCollection):
log = logger()
hyper = np.array(hyper)
_n_jobs = os.cpu_count() + 1 + self.n_jobs if self.n_jobs < 0 else self.n_jobs
batch_size = _n_jobs
rand_index = np.arange(len(hyper))
np.random.shuffle(rand_index)
rand_index = rand_index[:batch_size]
remaining_index = np.setdiff1d(np.arange(len(hyper)), rand_index)
_hyper, _hyper_remaining = hyper[rand_index], hyper[remaining_index]
scores = []
best_score, last_best, iter_from_improv = np.inf, np.inf, 0
with Parallel(n_jobs=self.n_jobs, verbose=1) as parallel:
while len(_hyper) > 0:
# log.debug(f"{len(_hyper_remaining)=}")
tstart = time()
_iter_scores = qc.utils.parallel(
self._params_eval,
[(params, training) for params in _hyper],
parallel=parallel,
)
# if all scores are None, select a new random batch
if all([s[1] is None for s in _iter_scores]):
rand_index = np.arange(len(_hyper_remaining))
np.random.shuffle(rand_index)
rand_index = rand_index[:batch_size]
remaining_index = np.setdiff1d(
np.arange(len(_hyper_remaining)), rand_index
)
_hyper = _hyper_remaining[rand_index]
_hyper_remaining = _hyper_remaining[remaining_index]
continue
_sorted_idx = np.argsort(
[1.0 if s is None else s for _, s, _ in _iter_scores]
)
_sorted_scores = np.array(_iter_scores, dtype="object")[_sorted_idx]
_best_iter_params = np.array(
[p for p, _, _ in _sorted_scores], dtype="object"
)
_best_iter_scores = np.array(
[s for _, s, _ in _sorted_scores], dtype="object"
)
for i, (_score, _param) in enumerate(
zip(
_best_iter_scores[: self.best_width],
_best_iter_params[: self.best_width],
)
):
log.debug(
f"[size={len(_hyper)},place={i+1}] best score = {_score:.8f}; "
f"best param = {_param} [took {time() - tstart:.3f}s]"
)
scores += _iter_scores
_improv = best_score - _best_iter_scores[0]
_improv_last = last_best - _best_iter_scores[0]
if _improv > self.err_threshold:
iter_from_improv = 0
best_score = _best_iter_scores[0]
elif _improv_last < 0:
iter_from_improv += 1
last_best = _best_iter_scores[0]
if iter_from_improv > self.max_iter_improv:
break
_new_hyper = np.array([], dtype="object")
for _base_param in _best_iter_params[: self.best_width]:
_rem_pds = np.array(
[
self.__param_distance(_base_param, h)
for h in _hyper_remaining
]
)
_rem_pd_sort_idx = np.argsort(_rem_pds)
# _min_pd = np.min(_rem_pds)
_min_pd_len = (_rem_pds <= self.pd_th_min).nonzero()[0].shape[0]
_new_hyper_idx = _rem_pd_sort_idx[:_min_pd_len]
_hyper_rem_idx = np.setdiff1d(
np.arange(len(_hyper_remaining)), _new_hyper_idx
)
_new_hyper = np.concatenate(
[_new_hyper, _hyper_remaining[_new_hyper_idx]]
)
_hyper_remaining = _hyper_remaining[_hyper_rem_idx]
_hyper = _new_hyper
return scores
def __param_distance(self, param1, param2):
score = 0
for k, v in param1.items():
if param2[k] != v:
score += 1
return score
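# Usage sketch (illustrative, not part of the original module): the searches
# above wrap a BaseAccuracyEstimator and explore its hyperparameters (keys with
# the "q__" prefix are forwarded to the underlying quantifier) against a
# validation protocol. base_estimator, validation_lc and training_lc are
# hypothetical placeholders, and APP is QuaPy's artificial-prevalence protocol.
#
#     from quapy.protocol import APP
#
#     gs = GridSearchAE(
#         model=base_estimator,                      # e.g. an MCAE instance
#         param_grid={"q__classifier__C": [0.1, 1.0, 10.0]},
#         protocol=APP(validation_lc, repeats=100),
#         error=qc.error.maccd,
#         refit=False,
#         verbose=True,
#     )
#     gs.fit(training_lc)
#     best_model, best_score = gs.best_model(), gs.best_score()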

View File

@@ -1,68 +0,0 @@
from pathlib import Path
class BasePlot:
@classmethod
def save_fig(cls, fig, base_path, title) -> Path:
...
@classmethod
def plot_diagonal(
cls,
reference,
columns,
data,
*,
pos_class=1,
title="default",
x_label="true",
y_label="estim.",
fixed_lim=False,
legend=True,
):
...
@classmethod
def plot_delta(
cls,
base_prevs,
columns,
data,
*,
stdevs=None,
pos_class=1,
title="default",
x_label="prevs.",
y_label="error",
legend=True,
):
...
@classmethod
def plot_shift(
cls,
shift_prevs,
columns,
data,
*,
counts=None,
pos_class=1,
title="default",
x_label="true",
y_label="estim.",
legend=True,
):
...
@classmethod
def plot_fit_scores(
cls,
train_prevs,
scores,
*,
pos_class=1,
title="default",
x_label="prev.",
y_label="position",
legend=True,
):
...

View File

@@ -1,238 +0,0 @@
from pathlib import Path
from re import X
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from cycler import cycler
from sklearn import base
from quacc import utils
from quacc.plot.base import BasePlot
matplotlib.use("agg")
class MplPlot(BasePlot):
def _get_markers(self, n: int):
ls = "ovx+sDph*^1234X><.Pd"
if n > len(ls):
ls = ls * (n // len(ls) + 1)
return list(ls)[:n]
def save_fig(self, fig, base_path, title) -> Path:
if base_path is None:
base_path = utils.get_quacc_home() / "plots"
output_path = base_path / f"{title}.png"
fig.savefig(output_path, bbox_inches="tight")
return output_path
def plot_delta(
self,
base_prevs,
columns,
data,
*,
stdevs=None,
pos_class=1,
title="default",
x_label="prevs.",
y_label="error",
legend=True,
):
fig, ax = plt.subplots()
ax.set_aspect("auto")
ax.grid()
NUM_COLORS = len(data)
cm = plt.get_cmap("tab10")
if NUM_COLORS > 10:
cm = plt.get_cmap("tab20")
cy = cycler(color=[cm(i) for i in range(NUM_COLORS)])
# base_prevs = base_prevs[:, pos_class]
if isinstance(base_prevs[0], float):
base_prevs = np.around([(1 - bp, bp) for bp in base_prevs], decimals=4)
str_base_prevs = [str(tuple(bp)) for bp in base_prevs]
# xticks = [str(bp) for bp in base_prevs]
xticks = np.arange(len(base_prevs))
for method, deltas, _cy in zip(columns, data, cy):
ax.plot(
xticks,
deltas,
label=method,
color=_cy["color"],
linestyle="-",
marker="o",
markersize=3,
zorder=2,
)
if stdevs is not None:
_col_idx = np.where(columns == method)[0]
stdev = stdevs[_col_idx].flatten()
nn_idx = np.intersect1d(
np.where(~np.isnan(deltas))[0],
np.where(~np.isnan(stdev))[0],
)
_bps, _ds, _st = xticks[nn_idx], deltas[nn_idx], stdev[nn_idx]
ax.fill_between(
_bps,
_ds - _st,
_ds + _st,
color=_cy["color"],
alpha=0.25,
)
def format_fn(tick_val, tick_pos):
if int(tick_val) in xticks:
return str_base_prevs[int(tick_val)]
return ""
ax.xaxis.set_major_locator(plt.MaxNLocator(nbins=6, integer=True, prune="both"))
ax.xaxis.set_major_formatter(format_fn)
ax.set(
xlabel=f"{x_label} prevalence",
ylabel=y_label,
title=title,
)
if legend:
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
return fig
def plot_diagonal(
self,
reference,
columns,
data,
*,
pos_class=1,
title="default",
x_label="true",
y_label="estim.",
legend=True,
):
fig, ax = plt.subplots()
ax.set_aspect("auto")
ax.grid()
ax.set_aspect("equal")
NUM_COLORS = len(data)
cm = plt.get_cmap("tab10")
if NUM_COLORS > 10:
cm = plt.get_cmap("tab20")
cy = cycler(
color=[cm(i) for i in range(NUM_COLORS)],
marker=self._get_markers(NUM_COLORS),
)
reference = np.array(reference)
x_ticks = np.unique(reference)
x_ticks.sort()
for deltas, _cy in zip(data, cy):
ax.plot(
reference,
deltas,
color=_cy["color"],
linestyle="None",
marker=_cy["marker"],
markersize=3,
zorder=2,
alpha=0.25,
)
# ensure limits are equal for both axes
_alims = np.stack(((ax.get_xlim(), ax.get_ylim())), axis=-1)
_lims = np.array([f(ls) for f, ls in zip([np.min, np.max], _alims)])
ax.set(xlim=tuple(_lims), ylim=tuple(_lims))
for method, deltas, _cy in zip(columns, data, cy):
slope, interc = np.polyfit(reference, deltas, 1)
y_lr = np.array([slope * x + interc for x in _lims])
ax.plot(
_lims,
y_lr,
label=method,
color=_cy["color"],
linestyle="-",
markersize="0",
zorder=1,
)
# plot reference line
ax.plot(
_lims,
_lims,
color="black",
linestyle="--",
markersize=0,
zorder=1,
)
ax.set(xlabel=x_label, ylabel=y_label, title=title)
if legend:
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
return fig
def plot_shift(
self,
shift_prevs,
columns,
data,
*,
counts=None,
pos_class=1,
title="default",
x_label="true",
y_label="estim.",
legend=True,
):
fig, ax = plt.subplots()
ax.set_aspect("auto")
ax.grid()
NUM_COLORS = len(data)
cm = plt.get_cmap("tab10")
if NUM_COLORS > 10:
cm = plt.get_cmap("tab20")
cy = cycler(color=[cm(i) for i in range(NUM_COLORS)])
# shift_prevs = shift_prevs[:, pos_class]
for method, shifts, _cy in zip(columns, data, cy):
ax.plot(
shift_prevs,
shifts,
label=method,
color=_cy["color"],
linestyle="-",
marker="o",
markersize=3,
zorder=2,
)
if counts is not None:
_col_idx = np.where(columns == method)[0]
count = counts[_col_idx].flatten()
for prev, shift, cnt in zip(shift_prevs, shifts, count):
label = f"{cnt}"
plt.annotate(
label,
(prev, shift),
textcoords="offset points",
xytext=(0, 10),
ha="center",
color=_cy["color"],
fontsize=12.0,
)
ax.set(xlabel=x_label, ylabel=y_label, title=title)
if legend:
ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
return fig

View File

@@ -1,197 +0,0 @@
from quacc.plot.base import BasePlot
from quacc.plot.mpl import MplPlot
from quacc.plot.plotly import PlotlyPlot
__backend: BasePlot = MplPlot()
def get_backend(name, theme=None):
match name:
case "matplotlib" | "mpl":
return MplPlot()
case "plotly":
return PlotlyPlot(theme=theme)
case _:
return MplPlot()
def plot_delta(
base_prevs,
columns,
data,
*,
stdevs=None,
pos_class=1,
metric="acc",
name="default",
train_prev=None,
legend=True,
avg=None,
save_fig=False,
base_path=None,
backend=None,
):
backend = __backend if backend is None else backend
_base_title = "delta_stdev" if stdevs is not None else "delta"
if train_prev is not None:
t_prev_pos = int(round(train_prev[pos_class] * 100))
title = f"{_base_title}_{name}_{t_prev_pos}_{metric}"
else:
title = f"{_base_title}_{name}_avg_{avg}_{metric}"
if avg is None or avg == "train":
x_label = "Test Prevalence"
else:
x_label = "Train Prevalence"
if metric == "acc":
y_label = "Prediction Error for Vanilla Accuracy"
elif metric == "f1":
y_label = "Prediction Error for F1"
else:
y_label = f"{metric} error"
fig = backend.plot_delta(
base_prevs,
columns,
data,
stdevs=stdevs,
pos_class=pos_class,
title=title,
x_label=x_label,
y_label=y_label,
legend=legend,
)
if save_fig:
output_path = backend.save_fig(fig, base_path, title)
return fig, output_path
return fig
def plot_diagonal(
reference,
columns,
data,
*,
pos_class=1,
metric="acc",
name="default",
train_prev=None,
fixed_lim=False,
legend=True,
save_fig=False,
base_path=None,
backend=None,
):
backend = __backend if backend is None else backend
if train_prev is not None:
t_prev_pos = int(round(train_prev[pos_class] * 100))
title = f"diagonal_{name}_{t_prev_pos}_{metric}"
else:
title = f"diagonal_{name}_{metric}"
if metric == "acc":
x_label = "True Vanilla Accuracy"
y_label = "Estimated Vanilla Accuracy"
else:
x_label = f"true {metric}"
y_label = f"estim. {metric}"
fig = backend.plot_diagonal(
reference,
columns,
data,
pos_class=pos_class,
title=title,
x_label=x_label,
y_label=y_label,
fixed_lim=fixed_lim,
legend=legend,
)
if save_fig:
output_path = backend.save_fig(fig, base_path, title)
return fig, output_path
return fig
def plot_shift(
shift_prevs,
columns,
data,
*,
counts=None,
pos_class=1,
metric="acc",
name="default",
train_prev=None,
legend=True,
save_fig=False,
base_path=None,
backend=None,
):
backend = __backend if backend is None else backend
if train_prev is not None:
t_prev_pos = int(round(train_prev[pos_class] * 100))
title = f"shift_{name}_{t_prev_pos}_{metric}"
else:
title = f"shift_{name}_avg_{metric}"
x_label = "Amount of Prior Probability Shift"
if metric == "acc":
y_label = "Prediction Error for Vanilla Accuracy"
elif metric == "f1":
y_label = "Prediction Error for F1"
else:
y_label = f"{metric} error"
fig = backend.plot_shift(
shift_prevs,
columns,
data,
counts=counts,
pos_class=pos_class,
title=title,
x_label=x_label,
y_label=y_label,
legend=legend,
)
if save_fig:
output_path = backend.save_fig(fig, base_path, title)
return fig, output_path
return fig


def plot_fit_scores(
    train_prevs,
    scores,
    *,
    pos_class=1,
    metric="acc",
    name="default",
    legend=True,
    save_fig=False,
    base_path=None,
    backend=None,
):
    backend = __backend if backend is None else backend
    title = f"fit_scores_{name}_avg_{metric}"
    x_label = "train prev."
    y_label = "position"

    fig = backend.plot_fit_scores(
        train_prevs,
        scores,
        pos_class=pos_class,
        title=title,
        x_label=x_label,
        y_label=y_label,
        legend=legend,
    )

    if save_fig:
        output_path = backend.save_fig(fig, base_path, title)
        return fig, output_path

    return fig
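
A minimal usage sketch of the dispatch module above (not part of the original file): it assumes the module is importable as quacc.plot and that data carries one row of scores per entry in columns; both the import path and the data layout are assumptions made for illustration.

import numpy as np

from quacc.plot import get_backend, plot_diagonal  # assumed import path

backend = get_backend("plotly")          # unknown names fall back to MplPlot()

true_accs = np.linspace(0.5, 0.9, 21)    # reference (true) accuracy values
estim = np.vstack([                      # assumed layout: one row per column name
    true_accs + 0.02,
    true_accs - 0.03,
])
fig, path = plot_diagonal(
    true_accs,
    ["method_a", "method_b"],
    estim,
    metric="acc",
    name="example",
    save_fig=True,
    base_path="out/plots",               # hypothetical output directory
    backend=backend,
)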

View File

@ -1 +0,0 @@
from .kdey import KDEy

View File

@ -1,86 +0,0 @@
from typing import Union, Callable

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.neighbors import KernelDensity
from quapy.data import LabelledCollection
from quapy.method.aggregative import AggregativeProbabilisticQuantifier, cross_generate_predictions
import quapy as qp


class KDEy(AggregativeProbabilisticQuantifier):
    def __init__(self, classifier: BaseEstimator, val_split=10, bandwidth=0.1, n_jobs=None, random_state=0):
        self.classifier = classifier
        self.val_split = val_split
        self.bandwidth = bandwidth
        self.n_jobs = n_jobs
        self.random_state = random_state

    def get_kde_function(self, posteriors):
        return KernelDensity(bandwidth=self.bandwidth).fit(posteriors)

    def pdf(self, kde, posteriors):
        return np.exp(kde.score_samples(posteriors))

    def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
        """
        :param data: the training set
        :param fit_classifier: set to False to bypass the training (the learner is assumed to be already fit)
        :param val_split: either a float in (0,1) indicating the proportion of training instances to use for
            validation (e.g., 0.3 for using 30% of the training set as validation data), or a LabelledCollection
            indicating the validation set itself, or an int indicating the number k of folds to be used in kFCV
            to estimate the parameters
        """
        if val_split is None:
            val_split = self.val_split

        with qp.util.temp_seed(self.random_state):
            self.classifier, y, posteriors, classes, class_count = cross_generate_predictions(
                data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs
            )

        self.val_densities = [self.get_kde_function(posteriors[y == cat]) for cat in range(data.n_classes)]
        return self

    def aggregate(self, posteriors: np.ndarray):
        """
        Searches for the mixture model parameters (the sought prevalence values) that yield a validation
        distribution (the mixture) that best matches the test distribution, in terms of the divergence
        measure of choice.

        :param posteriors: posterior probabilities of the instances in the sample
        :return: a vector of class prevalence estimates
        """
        eps = 1e-10
        np.random.RandomState(self.random_state)  # note: this RandomState instance is created but never used
        n_classes = len(self.val_densities)
        test_densities = [self.pdf(kde_i, posteriors) for kde_i in self.val_densities]

        def neg_loglikelihood(prev):
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + eps)
            return -np.sum(test_loglikelihood)

        return optim_minimize(neg_loglikelihood, n_classes)


def optim_minimize(loss, n_classes):
    """
    Searches for the optimal prevalence values, i.e., an `n_classes`-dimensional vector of the (`n_classes`-1)-simplex
    that yields the smallest loss. This optimization is carried out by means of a constrained search using scipy's
    SLSQP routine.

    :param loss: (callable) the function to minimize
    :param n_classes: (int) the number of classes, i.e., the dimensionality of the prevalence vector
    :return: (ndarray) the best prevalence vector found
    """
    from scipy import optimize

    # the initial point is set as the uniform distribution
    uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))

    # solutions are bounded to those contained in the unit-simplex
    bounds = tuple((0, 1) for _ in range(n_classes))  # values in [0,1]
    constraints = {'type': 'eq', 'fun': lambda x: 1 - sum(x)}  # values summing up to 1
    r = optimize.minimize(loss, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
    return r.x
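
As a sanity check of the search that optim_minimize performs, the following self-contained sketch (illustrative names; only numpy/scipy required) builds a three-class mixture with known component densities and verifies that minimizing the negative log-likelihood over the simplex recovers the mixing prevalence, mirroring what KDEy.aggregate does with KDE scores.

import numpy as np
from scipy import optimize
from scipy.stats import norm

rng = np.random.default_rng(0)
true_prev = np.array([0.6, 0.3, 0.1])
means = np.array([-3.0, 0.0, 3.0])       # one known component per class
labels = rng.choice(3, size=5000, p=true_prev)
x_test = rng.normal(loc=means[labels], scale=1.0)

# per-class densities evaluated on the test items (stand-in for the KDE scores)
test_densities = np.stack([norm.pdf(x_test, loc=m, scale=1.0) for m in means])

def neg_loglikelihood(prev):
    mixture = prev @ test_densities      # same mixture likelihood as in KDEy.aggregate
    return -np.sum(np.log(mixture + 1e-10))

x0 = np.full(3, 1 / 3)                                        # uniform starting point
bounds = tuple((0, 1) for _ in range(3))                      # each prevalence in [0, 1]
constraints = {"type": "eq", "fun": lambda p: 1 - p.sum()}    # prevalences sum to 1
r = optimize.minimize(neg_loglikelihood, x0=x0, method="SLSQP", bounds=bounds, constraints=constraints)
print(r.x.round(3))  # close to [0.6, 0.3, 0.1]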

View File

@ -1,108 +0,0 @@
import functools
import os
import shutil
from contextlib import ExitStack
from pathlib import Path
from urllib.request import urlretrieve

import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

from quacc import logger
from quacc.environment import env, environ


def combine_dataframes(dfs, df_index=[]) -> pd.DataFrame:
    if len(dfs) < 1:
        raise ValueError
    if len(dfs) == 1:
        return dfs[0]
    df = dfs[0]
    for ndf in dfs[1:]:
        df = df.join(ndf.set_index(df_index), on=df_index)
    return df


def avg_group_report(df: pd.DataFrame) -> pd.DataFrame:
    def _reduce_func(s1, s2):
        return {(n1, n2): v + s2[(n1, n2)] for ((n1, n2), v) in s1.items()}

    lst = df.to_dict(orient="records")[1:-1]
    summed_series = functools.reduce(_reduce_func, lst)
    idx = df.columns.drop([("base", "T"), ("base", "F")])
    avg_report = {
        (n1, n2): (v / len(lst))
        for ((n1, n2), v) in summed_series.items()
        if n1 != "base"
    }
    return pd.DataFrame([avg_report], columns=idx)


def fmt_line_md(s):
    return f"> {s} \n"


def create_dataser_dir(dir_name, update=False):
    dataset_dir = Path(env.OUT_DIR_NAME) / dir_name
    env.OUT_DIR = dataset_dir
    if update:
        os.makedirs(dataset_dir, exist_ok=True)
    else:
        shutil.rmtree(dataset_dir, ignore_errors=True)
        os.makedirs(dataset_dir)


def get_quacc_home():
    home = Path("~/quacc_home").expanduser()
    os.makedirs(home, exist_ok=True)
    return home


class TqdmUpTo(tqdm):
    def update_to(self, b=1, bsize=1, tsize=None):
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)


def download_file(url: str, downloaded_path: Path):
    with TqdmUpTo(
        unit="B",
        unit_scale=True,
        unit_divisor=1024,
        miniters=1,
        desc=downloaded_path.name,
    ) as t:
        urlretrieve(url, filename=downloaded_path, reporthook=t.update_to)


def parallel(
    func,
    f_args=None,
    parallel: Parallel = None,
    n_jobs=1,
    verbose=0,
    _env: environ | dict = None,
    seed=None,
):
    f_args = f_args or []
    if _env is None:
        _env = {}
    elif isinstance(_env, environ):
        _env = _env.to_dict()

    def wrapper(*args):
        if seed is not None:
            nonlocal _env
            _env = _env | dict(_R_SEED=seed)
        with env.load(_env):
            return func(*args)

    parallel = (
        Parallel(n_jobs=n_jobs, verbose=verbose) if parallel is None else parallel
    )
    return parallel(delayed(wrapper)(*_args) for _args in f_args)
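
A hypothetical usage sketch of the parallel() helper above: f_args carries one tuple of arguments per call, each call runs inside env.load(), and seed is forwarded to the workers as _R_SEED. The import path and the worker function are illustrative assumptions, not taken from this file.

from quacc.utils import parallel  # assumed module path

def run_experiment(dataset_name, estimator_name):
    # placeholder worker: run one experiment and return an identifier/score
    return f"{dataset_name}/{estimator_name}"

results = parallel(
    run_experiment,
    f_args=[("imdb", "sld"), ("imdb", "kde"), ("rcv1", "sld")],
    n_jobs=2,
    seed=42,   # exposed to each worker through the environment as _R_SEED
)
print(results)  # ["imdb/sld", "imdb/kde", "rcv1/sld"], in input order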