baselines refactored and updated, report updated

Lorenzo Volpi 2023-10-19 02:36:53 +02:00
parent 17b8f4bf6d
commit 210f50b617
12 changed files with 1130 additions and 893 deletions

poetry.lock (generated, 1129 lines changed); file diff suppressed because it is too large.


@@ -13,9 +13,8 @@ jinja2 = "^3.1.2"
[tool.poetry.scripts]
main = "quacc.main:main"
multi = "quacc.main:estimate_multiclass"
bin = "quacc.main:estimate_binary"
comp = "quacc.main:estimate_comparison"
tohost = "scp_sync:scp_sync_to_host"
[tool.poetry.group.dev.dependencies]

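With the other script entries removed, `comp` is the only remaining console script. Assuming Poetry's standard [tool.poetry.scripts] handling (an assumption, not shown in this diff), `poetry run comp` is equivalent to calling the mapped function directly:

    # sketch of the remaining "comp" entry point, based on the mapping above
    from quacc.main import estimate_comparison

    estimate_comparison()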

@@ -1,4 +1,4 @@
from typing import List, Optional, Self
from typing import List, Optional
import numpy as np
import math
@@ -43,7 +43,7 @@ class ExtendedCollection(LabelledCollection):
):
super().__init__(instances, labels, classes=classes)
def split_by_pred(self) -> List[Self]:
def split_by_pred(self):
_ncl = int(math.sqrt(self.n_classes))
_indexes = ExtendedCollection._split_index_by_pred(_ncl, self.instances)
if isinstance(self.instances, np.ndarray):
@@ -129,7 +129,7 @@ class ExtendedCollection(LabelledCollection):
@classmethod
def extend_collection(
cls, base: LabelledCollection, pred_proba: np.ndarray
) -> Self:
):
n_classes = base.n_classes
# n_X = [ X | predicted probs. ]


@@ -7,18 +7,23 @@ from sklearn.conftest import fetch_rcv1
TRAIN_VAL_PROP = 0.5
def get_imdb() -> Tuple[LabelledCollection]:
def get_imdb(**kwargs) -> Tuple[LabelledCollection]:
train, test = qp.datasets.fetch_reviews("imdb", tfidf=True).train_test
train, validation = train.split_stratified(train_prop=TRAIN_VAL_PROP)
train, validation = train.split_stratified(
train_prop=TRAIN_VAL_PROP, random_state=0
)
return train, validation, test
def get_spambase() -> Tuple[LabelledCollection]:
def get_spambase(**kwargs) -> Tuple[LabelledCollection]:
train, test = qp.datasets.fetch_UCIDataset("spambase", verbose=False).train_test
train, validation = train.split_stratified(train_prop=TRAIN_VAL_PROP)
train, validation = train.split_stratified(
train_prop=TRAIN_VAL_PROP, random_state=0
)
return train, validation, test
# >>> fetch_rcv1().target_names
# array(['C11', 'C12', 'C13', 'C14', 'C15', 'C151', 'C1511', 'C152', 'C16',
# 'C17', 'C171', 'C172', 'C173', 'C174', 'C18', 'C181', 'C182',
# 'C183', 'C21', 'C22', 'C23', 'C24', 'C31', 'C311', 'C312', 'C313',
@@ -33,11 +38,15 @@ def get_spambase() -> Tuple[LabelledCollection]:
# 'GWELF', 'M11', 'M12', 'M13', 'M131', 'M132', 'M14', 'M141',
# 'M142', 'M143', 'MCAT'], dtype=object)
def get_rcv1(target:str):
def get_rcv1(target = "default", **kwargs):
sample_size = qp.environ["SAMPLE_SIZE"]
n_train = 23149
dataset = fetch_rcv1()
if target == "default":
target = "C12"
if target not in dataset.target_names:
raise ValueError("Invalid target")
@@ -46,7 +55,9 @@ def get_rcv1(target:str):
all_train_l, test_l = labels[:n_train], labels[n_train:]
all_train = LabelledCollection(all_train_d, all_train_l, classes=classes)
test = LabelledCollection(test_d, test_l, classes=classes)
train, validation = all_train.split_stratified(train_prop=TRAIN_VAL_PROP)
train, validation = all_train.split_stratified(
train_prop=TRAIN_VAL_PROP, random_state=0
)
return train, validation, test
target_index = np.where(dataset.target_names == target)[0]
@@ -58,4 +69,3 @@ def get_rcv1(target:str):
d = dataset_split(dataset.data, target_labels, classes=[0, 1])
return d
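A hypothetical usage sketch of the updated loaders: all of them now accept keyword arguments and split with random_state=0, so repeated runs produce the same train/validation partition.

    # usage sketch; signatures as shown in this diff
    import quapy as qp
    from quacc.dataset import get_rcv1, get_spambase

    qp.environ["SAMPLE_SIZE"] = 100  # get_rcv1 reads this, as quacc/evaluation/method.py does

    train, validation, test = get_spambase()
    train, validation, test = get_rcv1(target="C12")  # target="default" is mapped to "C12"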


@@ -24,12 +24,12 @@ def from_name(err_name):
def f1(prev):
den = (2*prev[3]) + prev[1] + prev[2]
if den == 0:
return 1.0
return 0.0
else:
return (2*prev[3])/den
def f1e(prev):
return 1 - f1(prev)
def mae(prev):
def acc(prev):
return (prev[1] + prev[2]) / sum(prev)
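A worked example of the metrics above, assuming the extended prevalence vector is ordered [TN, FP, FN, TP] as in the _normalize_prev helper removed later in this commit:

    # hypothetical prevalence vector, ordered [TN, FP, FN, TP] (assumption)
    prev = [0.45, 0.05, 0.10, 0.40]

    den = (2 * prev[3]) + prev[1] + prev[2]  # 2*TP + FP + FN = 0.95
    f1 = (2 * prev[3]) / den                 # 0.80 / 0.95 ≈ 0.8421
    f1e = 1 - f1                             # ≈ 0.1579
    acc = (prev[1] + prev[2]) / sum(prev)    # 0.15 under this ordering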


@@ -1,149 +0,0 @@
import itertools
from quapy.protocol import (
OnLabelledCollectionProtocol,
AbstractStochasticSeededProtocol,
)
from typing import Iterable, Callable, Union
from .estimator import AccuracyEstimator
import pandas as pd
import numpy as np
import quacc.error as error
import statistics as stats
def estimate(
estimator: AccuracyEstimator,
protocol: AbstractStochasticSeededProtocol,
):
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
base_prevs, true_prevs, estim_prevs = [], [], []
for sample in protocol():
e_sample = estimator.extend(sample)
estim_prev = estimator.estimate(e_sample.X, ext=True)
base_prevs.append(sample.prevalence())
true_prevs.append(e_sample.prevalence())
estim_prevs.append(estim_prev)
return base_prevs, true_prevs, estim_prevs
def avg_groupby_distribution(lst, error_names):
def _bprev(s):
return (s[("base", "F")], s[("base", "T")])
def _normalize_prev(r):
for prev_name in ["true", "estim"]:
raw_prev = [v for ((k0, k1), v) in r.items() if k0 == prev_name]
norm_prev = [v / sum(raw_prev) for v in raw_prev]
for n, v in zip(
itertools.product([prev_name], ["TN", "FP", "FN", "TP"]), norm_prev
):
r[n] = v
return r
current_bprev = _bprev(lst[0])
bprev_cnt = 0
g_lst = [[]]
for s in lst:
if _bprev(s) == current_bprev:
g_lst[bprev_cnt].append(s)
else:
g_lst.append([])
bprev_cnt += 1
current_bprev = _bprev(s)
g_lst[bprev_cnt].append(s)
r_lst = []
for gs in g_lst:
assert len(gs) > 0
r = {}
r[("base", "F")], r[("base", "T")] = _bprev(gs[0])
for pn in [(n1, n2) for ((n1, n2), _) in gs[0].items() if n1 != "base"]:
r[pn] = stats.mean(map(lambda s: s[pn], gs))
r = _normalize_prev(r)
for en in itertools.product(["errors"], error_names):
r[en] = stats.mean(map(lambda s: s[en], gs))
r_lst.append(r)
return r_lst
def evaluation_report(
estimator: AccuracyEstimator,
protocol: AbstractStochasticSeededProtocol,
error_metrics: Iterable[Union[str, Callable]] = "all",
aggregate: bool = True,
prevalence: bool = True,
):
def _report_columns(err_names):
base_cols = list(itertools.product(["base"], ["F", "T"]))
prev_cols = list(itertools.product(["true", "estim"], ["TN", "FP", "FN", "TP"]))
err_cols = list(itertools.product(["errors"], err_names))
return base_cols, prev_cols, err_cols
base_prevs, true_prevs, estim_prevs = estimate(estimator, protocol)
if error_metrics == "all":
error_metrics = ["mae", "f1"]
error_funcs = [
error.from_name(e) if isinstance(e, str) else e for e in error_metrics
]
assert all(hasattr(e, "__call__") for e in error_funcs), "invalid error function"
error_names = [e.__name__ for e in error_funcs]
error_cols = []
for err in error_names:
if err == "mae":
error_cols.extend(["mae estim", "mae true"])
elif err == "f1":
error_cols.extend(["f1 estim", "f1 true"])
elif err == "f1e":
error_cols.extend(["f1e estim", "f1e true"])
else:
error_cols.append(err)
# df_cols = ["base_prev", "true_prev", "estim_prev"] + error_names
base_cols, prev_cols, err_cols = _report_columns(error_cols)
lst = []
for base_prev, true_prev, estim_prev in zip(base_prevs, true_prevs, estim_prevs):
if prevalence:
series = {
k: v
for (k, v) in zip(
base_cols + prev_cols,
np.concatenate((base_prev, true_prev, estim_prev), axis=0),
)
}
df_cols = base_cols + prev_cols + err_cols
else:
series = {k: v for (k, v) in zip(base_cols, base_prev)}
df_cols = base_cols + err_cols
for err in error_cols:
error_funcs = {
"mae true": lambda: error.mae(true_prev),
"mae estim": lambda: error.mae(estim_prev),
"f1 true": lambda: error.f1(true_prev),
"f1 estim": lambda: error.f1(estim_prev),
"f1e true": lambda: error.f1e(true_prev),
"f1e estim": lambda: error.f1e(estim_prev),
}
series[("errors", err)] = error_funcs[err]()
lst.append(series)
lst = avg_groupby_distribution(lst, error_cols) if aggregate else lst
df = pd.DataFrame(
lst,
columns=pd.MultiIndex.from_tuples(df_cols),
)
return df


@@ -2,52 +2,73 @@ from statistics import mean
from typing import Dict
import numpy as np
import quapy as qp
from quapy.data import LabelledCollection
from sklearn.base import BaseEstimator
from sklearn.model_selection import cross_validate
import sklearn.metrics as metrics
from quapy.protocol import (
AbstractStochasticSeededProtocol,
OnLabelledCollectionProtocol,
)
from .report import EvaluationReport
import elsahar19_rca.rca as rca
import garg22_ATC.ATC_helper as atc
import guillory21_doc.doc as doc
import jiang18_trustscore.trustscore as trustscore
import lipton_bbse.labelshift as bbse
import pandas as pd
import statistics as stats
def kfcv(c_model: BaseEstimator, validation: LabelledCollection) -> Dict:
scoring = ["f1_macro"]
def kfcv(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict"
):
c_model_predict = getattr(c_model, predict_method)
scoring = ["accuracy", "f1_macro"]
scores = cross_validate(c_model, validation.X, validation.y, scoring=scoring)
return {"f1_score": mean(scores["test_f1_macro"])}
acc_score = mean(scores["test_accuracy"])
f1_score = mean(scores["test_f1_macro"])
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
report = EvaluationReport(prefix="kfcv")
for test in protocol():
test_preds = c_model_predict(test.X)
meta_acc = abs(acc_score - metrics.accuracy_score(test.y, test_preds))
meta_f1 = abs(f1_score - metrics.f1_score(test.y, test_preds))
report.append_row(
test.prevalence(),
acc_score=(1. - acc_score),
f1_score=f1_score,
acc=meta_acc,
f1=meta_f1,
)
return report
def avg_groupby_distribution(results):
def base_prev(s):
return (s[("base", "F")], s[("base", "T")])
def reference(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
):
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
c_model_predict = getattr(c_model, "predict_proba")
report = EvaluationReport(prefix="ref")
for test in protocol():
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
report.append_row(
test.prevalence(),
acc_score=(1 - metrics.accuracy_score(test.y, test_preds)),
f1_score=metrics.f1_score(test.y, test_preds),
)
grouped_list = {}
for r in results:
bp = base_prev(r)
if bp in grouped_list.keys():
grouped_list[bp].append(r)
else:
grouped_list[bp] = [r]
series = []
for (fp, tp), r_list in grouped_list.items():
assert len(r_list) > 0
r_avg = {}
r_avg[("base", "F")], r_avg[("base", "T")] = fp, tp
for pn in [(n1, n2) for ((n1, n2), _) in r_list[0].items() if n1 != "base"]:
r_avg[pn] = stats.mean(map(lambda r: r[pn], r_list))
series.append(r_avg)
return series
return report
def atc_mc(
@@ -69,25 +90,25 @@ def atc_mc(
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
cols = [
("base", "F"),
("base", "T"),
("atc mc", "accuracy"),
]
results = []
report = EvaluationReport(prefix="atc_mc")
for test in protocol():
## Load OOD test data probs
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_scores = atc.get_max_conf(test_probs)
atc_accuracy = 1.0 - (atc.get_ATC_acc(atc_thres, test_scores) / 100.0)
[f_prev, t_prev] = test.prevalence()
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, atc_accuracy])})
atc_accuracy = atc.get_ATC_acc(atc_thres, test_scores)
meta_acc = abs(atc_accuracy - metrics.accuracy_score(test.y, test_preds))
f1_score = atc.get_ATC_f1(atc_thres, test_scores, test_probs)
meta_f1 = abs(f1_score - metrics.f1_score(test.y, test_preds))
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=1.0 - atc_accuracy,
f1_score=f1_score,
f1=meta_f1,
)
series = avg_groupby_distribution(results)
return pd.DataFrame(
series,
columns=pd.MultiIndex.from_tuples(cols),
)
return report
def atc_ne(
@@ -109,25 +130,25 @@ def atc_ne(
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
cols = [
("base", "F"),
("base", "T"),
("atc ne", "accuracy"),
]
results = []
report = EvaluationReport(prefix="atc_ne")
for test in protocol():
## Load OOD test data probs
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_scores = atc.get_entropy(test_probs)
atc_accuracy = 1.0 - (atc.get_ATC_acc(atc_thres, test_scores) / 100.0)
[f_prev, t_prev] = test.prevalence()
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, atc_accuracy])})
atc_accuracy = atc.get_ATC_acc(atc_thres, test_scores)
meta_acc = abs(atc_accuracy - metrics.accuracy_score(test.y, test_preds))
f1_score = atc.get_ATC_f1(atc_thres, test_scores, test_probs)
meta_f1 = abs(f1_score - metrics.f1_score(test.y, test_preds))
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=(1.0 - atc_accuracy),
f1_score=f1_score,
f1=meta_f1,
)
series = avg_groupby_distribution(results)
return pd.DataFrame(
series,
columns=pd.MultiIndex.from_tuples(cols),
)
return report
def trust_score(
@@ -162,24 +183,16 @@ def doc_feat(
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
cols = [
("base", "F"),
("base", "T"),
("doc feat", "score"),
]
results = []
report = EvaluationReport(prefix="doc_feat")
for test in protocol():
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_scores = np.max(test_probs, axis=-1)
score = 1.0 - ((v1acc + doc.get_doc(val_scores, test_scores)) / 100.0)
[f_prev, t_prev] = test.prevalence()
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, score])})
score = (v1acc + doc.get_doc(val_scores, test_scores)) / 100.0
meta_acc = abs(score - metrics.accuracy_score(test.y, test_preds))
report.append_row(test.prevalence(), acc=meta_acc, acc_score=(1.0 - score))
series = avg_groupby_distribution(results)
return pd.DataFrame(
series,
columns=pd.MultiIndex.from_tuples(cols),
)
return report
def rca_score(
@@ -194,29 +207,24 @@ def rca_score(
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
cols = [
("base", "F"),
("base", "T"),
("rca", "score"),
]
results = []
report = EvaluationReport(prefix="rca")
for test in protocol():
try:
[f_prev, t_prev] = test.prevalence()
try:
test_pred = c_model_predict(test.X)
c_model2 = rca.clone_fit(c_model, test.X, test_pred)
c_model2_predict = getattr(c_model2, predict_method)
val_pred2 = c_model2_predict(validation.X)
rca_score = rca.get_score(val_pred1, val_pred2, validation.y)
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, rca_score])})
meta_score = abs(
rca_score - (1 - metrics.accuracy_score(test.y, test_pred))
)
report.append_row(test.prevalence(), acc=meta_score, acc_score=rca_score)
except ValueError:
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, float("nan")])})
report.append_row(
test.prevalence(), acc=float("nan"), acc_score=float("nan")
)
series = avg_groupby_distribution(results)
return pd.DataFrame(
series,
columns=pd.MultiIndex.from_tuples(cols),
)
return report
def rca_star_score(
@@ -226,7 +234,9 @@ def rca_star_score(
predict_method="predict",
):
c_model_predict = getattr(c_model, predict_method)
validation1, validation2 = validation.split_stratified(train_prop=0.5)
validation1, validation2 = validation.split_stratified(
train_prop=0.5, random_state=0
)
val1_pred = c_model_predict(validation1.X)
c_model1 = rca.clone_fit(c_model, validation1.X, val1_pred)
c_model1_predict = getattr(c_model1, predict_method)
@@ -235,62 +245,23 @@
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
cols = [
("base", "F"),
("base", "T"),
("rca*", "score"),
]
results = []
report = EvaluationReport(prefix="rca_star")
for test in protocol():
[f_prev, t_prev] = test.prevalence()
try:
test_pred = c_model_predict(test.X)
c_model2 = rca.clone_fit(c_model, test.X, test_pred)
c_model2_predict = getattr(c_model2, predict_method)
val2_pred2 = c_model2_predict(validation2.X)
rca_star_score = rca.get_score(val2_pred1, val2_pred2, validation2.y)
results.append(
{k: v for k, v in zip(cols, [f_prev, t_prev, rca_star_score])}
meta_score = abs(
rca_star_score - (1 - metrics.accuracy_score(test.y, test_pred))
)
report.append_row(
test.prevalence(), acc=meta_score, acc_score=rca_star_score
)
except ValueError:
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, float("nan")])})
report.append_row(
test.prevalence(), acc=float("nan"), acc_score=float("nan")
)
series = avg_groupby_distribution(results)
return pd.DataFrame(
series,
columns=pd.MultiIndex.from_tuples(cols),
)
def bbse_score(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba",
):
c_model_predict = getattr(c_model, predict_method)
val_probs, val_labels = c_model_predict(validation.X), validation.y
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
cols = [
("base", "F"),
("base", "T"),
("bbse", "score"),
]
results = []
for test in protocol():
test_probs = c_model_predict(test.X)
wt = bbse.estimate_labelshift_ratio(val_labels, val_probs, test_probs, 2)
estim_prev = bbse.estimate_target_dist(wt, val_labels, 2)[1]
true_prev = test.prevalence()
[f_prev, t_prev] = true_prev
acc = qp.error.ae(true_prev, estim_prev)
results.append({k: v for k, v in zip(cols, [f_prev, t_prev, acc])})
series = avg_groupby_distribution(results)
return pd.DataFrame(
series,
columns=pd.MultiIndex.from_tuples(cols),
)
return report
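Every baseline in this module now shares the signature (c_model, validation, protocol, ...) and returns an EvaluationReport instead of a pandas DataFrame. A hypothetical driver, mirroring how fit_and_estimate in quacc/evaluation/method.py calls them:

    # usage sketch; data loading and protocol parameters are illustrative
    import quapy as qp
    from quapy.protocol import APP
    from sklearn.linear_model import LogisticRegression

    import quacc.evaluation.baseline as baseline
    from quacc.dataset import get_spambase

    qp.environ["SAMPLE_SIZE"] = 100
    train, validation, test = get_spambase()
    model = LogisticRegression()
    model.fit(*train.Xy)
    protocol = APP(test, n_prevalences=21, repeats=100)

    report = baseline.kfcv(model, validation, protocol)    # EvaluationReport with prefix "kfcv"
    report = baseline.atc_mc(model, validation, protocol)  # EvaluationReport with prefix "atc_mc"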

quacc/evaluation/method.py (new file, 149 lines)

@@ -0,0 +1,149 @@
import multiprocessing
import time
import pandas as pd
import quapy as qp
from quapy.data import LabelledCollection
from quapy.protocol import (
APP,
AbstractStochasticSeededProtocol,
OnLabelledCollectionProtocol,
)
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
import quacc.error as error
import quacc.evaluation.baseline as baseline
from quacc.dataset import get_imdb, get_rcv1, get_spambase
from quacc.evaluation.report import EvaluationReport
from ..estimator import (
AccuracyEstimator,
BinaryQuantifierAccuracyEstimator,
MulticlassAccuracyEstimator,
)
qp.environ["SAMPLE_SIZE"] = 100
pd.set_option("display.float_format", "{:.4f}".format)
n_prevalences = 21
repreats = 100
def estimate(
estimator: AccuracyEstimator,
protocol: AbstractStochasticSeededProtocol,
):
# ensure that the protocol returns a LabelledCollection for each iteration
protocol.collator = OnLabelledCollectionProtocol.get_collator("labelled_collection")
base_prevs, true_prevs, estim_prevs = [], [], []
for sample in protocol():
e_sample = estimator.extend(sample)
estim_prev = estimator.estimate(e_sample.X, ext=True)
base_prevs.append(sample.prevalence())
true_prevs.append(e_sample.prevalence())
estim_prevs.append(estim_prev)
return base_prevs, true_prevs, estim_prevs
def evaluation_report(
estimator: AccuracyEstimator,
protocol: AbstractStochasticSeededProtocol,
method: str,
) -> EvaluationReport:
base_prevs, true_prevs, estim_prevs = estimate(estimator, protocol)
report = EvaluationReport(prefix=method)
for base_prev, true_prev, estim_prev in zip(base_prevs, true_prevs, estim_prevs):
acc_score = error.acc(estim_prev)
f1_score = error.f1(estim_prev)
report.append_row(
base_prev,
acc_score=1. - acc_score,
acc = abs(error.acc(true_prev) - acc_score),
f1_score=f1_score,
f1=abs(error.f1(true_prev) - f1_score)
)
return report
def evaluate(
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
method: str,
):
estimator : AccuracyEstimator = {
"bin": BinaryQuantifierAccuracyEstimator,
"mul": MulticlassAccuracyEstimator,
}[method](c_model)
estimator.fit(validation)
return evaluation_report(estimator, protocol, method)
def evaluate_binary(model, validation, protocol):
return evaluate(model, validation, protocol, "bin")
def evaluate_multiclass(model, validation, protocol):
return evaluate(model, validation, protocol, "mul")
def fit_and_estimate(_estimate, train, validation, test):
model = LogisticRegression()
model.fit(*train.Xy)
protocol = APP(test, n_prevalences=n_prevalences, repeats=repreats)
start = time.time()
result = _estimate(model, validation, protocol)
end = time.time()
return {
"name": _estimate.__name__,
"result": result,
"time": end - start,
}
def evaluate_comparison(dataset: str, **kwargs) -> EvaluationReport:
train, validation, test = {
"spambase": get_spambase,
"imdb": get_imdb,
"rcv1": get_rcv1,
}[dataset](**kwargs)
for k,v in kwargs.items():
print(k, ":", v)
prevs = {
"train": train.prevalence(),
"validation": validation.prevalence(),
}
start = time.time()
with multiprocessing.Pool(8) as pool:
estimators = [
evaluate_binary,
evaluate_multiclass,
baseline.kfcv,
baseline.atc_mc,
baseline.atc_ne,
baseline.doc_feat,
baseline.rca_score,
baseline.rca_star_score,
]
tasks = [(estim, train, validation, test) for estim in estimators]
results = [pool.apply_async(fit_and_estimate, t) for t in tasks]
results = list(map(lambda r: r.get(), results))
er = EvaluationReport.combine_reports(*list(map(lambda r: r["result"], results)))
times = {r["name"]:r["time"] for r in results}
end = time.time()
times["tot"] = end - start
er.times = times
er.prevs = prevs
return er
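A hypothetical end-to-end call of the comparison driver added here, mirroring how quacc/main.py (updated later in this diff) uses it:

    # usage sketch; dataset name and target are illustrative
    import quacc.evaluation.method as method

    er = method.evaluate_comparison("rcv1", target="C12")  # runs all estimators in a process pool
    er.target = "C12"
    print(er.times)                    # per-method wall-clock times, plus "tot"
    html = er.to_html(["acc"], ["f1"])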

quacc/evaluation/report.py (new file, 162 lines)

@@ -0,0 +1,162 @@
from email import header
from typing import Tuple
import statistics as stats
import numpy as np
import pandas as pd
def _in_div(s):
return "<div>" + s + "</div>\n"
def _header_footer(s):
return (
"""
<html>
<head>
<style>
.dataframe {
tr:hover {
background-color: aquamarine;
}
}
</style>
</head>
<body>
""" +
s +
"</body></html>"
)
class EvaluationReport:
def __init__(self, prefix=None):
self.base = []
self.dict = {}
self._grouped = False
self._grouped_base = []
self._grouped_dict = {}
self._dataframe = None
self.prefix = prefix if prefix is not None else "default"
self._times = {}
self._prevs = {}
self._target = "default"
def append_row(self, base: np.ndarray | Tuple, **row):
if isinstance(base, np.ndarray):
base = tuple(base.tolist())
self.base.append(base)
for k, v in row.items():
if (k, self.prefix) in self.dict:
self.dict[(k, self.prefix)].append(v)
else:
self.dict[(k, self.prefix)] = [v]
self._grouped = False
self._dataframe = None
@property
def columns(self):
return self.dict.keys()
@property
def grouped(self):
if self._grouped:
return self._grouped_dict
self._grouped_base = []
self._grouped_dict = {k: [] for k in self.dict.keys()}
last_end = 0
for ind, bp in enumerate(self.base):
if ind < (len(self.base) - 1) and bp == self.base[ind + 1]:
continue
self._grouped_base.append(bp)
for col in self.dict.keys():
self._grouped_dict[col].append(
stats.mean(self.dict[col][last_end : ind + 1])
)
last_end = ind + 1
self._grouped = True
return self._grouped_dict
@property
def gbase(self):
self.grouped
return self._grouped_base
def get_dataframe(self, metrics=None):
if self._dataframe is None:
self_columns = sorted(self.columns, key=lambda c: c[0])
self._dataframe = pd.DataFrame(
self.grouped,
index=self.gbase,
columns=pd.MultiIndex.from_tuples(self_columns),
)
df = pd.DataFrame(self._dataframe)
if metrics is not None:
df = df.drop(
[(c0, c1) for (c0, c1) in df.columns if c0 not in metrics], axis=1
)
if len(set(k0 for k0, k1 in df.columns)) == 1:
df = df.droplevel(0, axis=1)
return df
def merge(self, other):
if not all(v1 == v2 for v1, v2 in zip(self.base, other.base)):
raise ValueError("other has not same base prevalences of self")
if len(set(self.dict.keys()).intersection(set(other.dict.keys()))) > 0:
raise ValueError("self and other have matching keys")
report = EvaluationReport()
report.base = self.base
report.dict = self.dict | other.dict
return report
@property
def times(self):
return self._times
@times.setter
def times(self, val):
self._times = val
@property
def prevs(self):
return self._prevs
@prevs.setter
def prevs(self, val):
self._prevs = val
@property
def target(self):
return self._target
@target.setter
def target(self, val):
self._target = val
def to_html(self, *metrics):
res = _in_div("target: " + self.target)
for k,v in self.prevs.items():
res += _in_div(f"{k}: {str(v)}")
for k,v in self.times.items():
res += _in_div(f"{k}: {v:.3f}s")
res += "\n"
for m in metrics:
res += self.get_dataframe(metrics=m).to_html() + "\n\n"
return _header_footer(res)
@staticmethod
def combine_reports(*args):
er = args[0]
for r in args[1:]:
er = er.merge(r)
return er
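A minimal, self-contained sketch of the EvaluationReport API (hypothetical values): rows sharing the same base prevalence are averaged by the grouped property, and reports with identical base prevalences but disjoint column keys can be merged.

    from quacc.evaluation.report import EvaluationReport

    r1 = EvaluationReport(prefix="method_a")
    r2 = EvaluationReport(prefix="method_b")
    for prev in [(0.3, 0.7), (0.3, 0.7), (0.5, 0.5)]:
        r1.append_row(prev, acc=0.02, acc_score=0.10)
        r2.append_row(prev, acc=0.05, acc_score=0.15)

    er = EvaluationReport.combine_reports(r1, r2)  # same base prevs, disjoint (metric, prefix) keys
    df = er.get_dataframe(metrics=["acc"])         # grouped by base prevalence, one column per prefix
    er.times = {"method_a": 1.2, "method_b": 3.4, "tot": 4.6}
    er.prevs = {"train": (0.5, 0.5), "validation": (0.5, 0.5)}
    html = er.to_html(["acc"], ["acc_score"])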


@@ -1,133 +1,41 @@
import pandas as pd
import quapy as qp
from quapy.protocol import APP
from sklearn.linear_model import LogisticRegression
from quacc import utils
import traceback
import quacc.evaluation.method as method
import quacc.evaluation as eval
import quacc.baseline as baseline
from quacc.estimator import (
BinaryQuantifierAccuracyEstimator,
MulticlassAccuracyEstimator,
)
from quacc.dataset import get_imdb, get_rcv1, get_spambase
qp.environ["SAMPLE_SIZE"] = 100
pd.set_option("display.float_format", "{:.4f}".format)
dataset_name = "imdb"
def estimate_multiclass():
print(dataset_name)
train, validation, test = get_imdb()
model = LogisticRegression()
print(f"fitting model {model.__class__.__name__}...", end=" ", flush=True)
model.fit(*train.Xy)
print("fit")
estimator = MulticlassAccuracyEstimator(model)
print(
f"fitting qmodel {estimator.q_model.__class__.__name__}...", end=" ", flush=True
)
estimator.fit(train)
print("fit")
n_prevalences = 21
repreats = 1000
protocol = APP(test, n_prevalences=n_prevalences, repeats=repreats)
print(
f"Tests:\n\
protocol={protocol.__class__.__name__}\n\
n_prevalences={n_prevalences}\n\
repreats={repreats}\n\
executing...\n"
)
df = eval.evaluation_report(
estimator,
protocol,
aggregate=True,
)
# print(df.to_latex())
print(df.to_string())
# print(df.to_html())
print()
def estimate_binary():
print(dataset_name)
train, validation, test = get_imdb()
model = LogisticRegression()
print(f"fitting model {model.__class__.__name__}...", end=" ", flush=True)
model.fit(*train.Xy)
print("fit")
estimator = BinaryQuantifierAccuracyEstimator(model)
print(
f"fitting qmodel {estimator.q_model_0.__class__.__name__}...",
end=" ",
flush=True,
)
estimator.fit(train)
print("fit")
n_prevalences = 21
repreats = 1000
protocol = APP(test, n_prevalences=n_prevalences, repeats=repreats)
print(
f"Tests:\n\
protocol={protocol.__class__.__name__}\n\
n_prevalences={n_prevalences}\n\
repreats={repreats}\n\
executing...\n"
)
df = eval.evaluation_report(
estimator,
protocol,
aggregate=True,
)
# print(df.to_latex(float_format="{:.4f}".format))
print(df.to_string())
# print(df.to_html())
print()
DATASET = "imdb"
OUTPUT_FILE = "out_" + DATASET + ".html"
TARGETS = {
"rcv1" : [
'C12',
'C13', 'C15', 'C151', 'C1511', 'C152', 'C17', 'C172',
'C18', 'C181', 'C21', 'C24', 'C31', 'C42', 'CCAT',
'E11', 'E12', 'E21', 'E211', 'E212', 'E41', 'E51', 'ECAT',
'G15', 'GCAT', 'GCRIM', 'GDIP', 'GPOL', 'GVIO', 'GVOTE', 'GWEA',
'GWELF', 'M11', 'M12', 'M13', 'M131', 'M132', 'M14', 'M141',
'M142', 'M143', 'MCAT'
],
"spambase": ["default"],
"imdb": ["default"],
}
def estimate_comparison():
train, validation, test = get_spambase()
model = LogisticRegression()
model.fit(*train.Xy)
open(OUTPUT_FILE, "w").close()
targets = TARGETS[DATASET]
for target in targets:
try:
er = method.evaluate_comparison(DATASET, target=target)
er.target = target
with open(OUTPUT_FILE, "a") as f:
f.write(er.to_html(["acc"], ["f1"]))
except Exception:
traceback.print_exc()
n_prevalences = 21
repreats = 1000
protocol = APP(test, n_prevalences=n_prevalences, repeats=repreats)
# print(df.to_latex(float_format="{:.4f}".format))
# print(utils.avg_group_report(df).to_latex(float_format="{:.4f}".format))
estimator = BinaryQuantifierAccuracyEstimator(model)
estimator.fit(validation)
df = eval.evaluation_report(estimator, protocol, prevalence=False)
df = utils.combine_dataframes(
baseline.atc_mc(model, validation, protocol),
baseline.atc_ne(model, validation, protocol),
baseline.doc_feat(model, validation, protocol),
baseline.rca_score(model, validation, protocol),
baseline.rca_star_score(model, validation, protocol),
baseline.bbse_score(model, validation, protocol),
df,
df_index=[("base", "F"), ("base", "T")]
)
print(df.to_latex(float_format="{:.4f}".format))
print(utils.avg_group_report(df).to_latex(float_format="{:.4f}".format))
def main():
estimate_comparison()
if __name__ == "__main__":
main()


@@ -2,7 +2,7 @@
import functools
import pandas as pd
def combine_dataframes(*dfs, df_index=[]) -> pd.DataFrame:
def combine_dataframes(dfs, df_index=[]) -> pd.DataFrame:
if len(dfs) < 1:
raise ValueError
if len(dfs) == 1: