diff --git a/quacc/main_test.py b/quacc/main_test.py
index 56aa3a0..1a78815 100644
--- a/quacc/main_test.py
+++ b/quacc/main_test.py
@@ -1,49 +1,95 @@
-from copy import deepcopy
 from time import time
 
 import numpy as np
-from quapy.method.aggregative import SLD
-from quapy.protocol import APP, UPP
-from sklearn.linear_model import LogisticRegression
+import scipy.sparse as sp
+from quapy.protocol import APP
+from sklearn.linear_model import LinearRegression, LogisticRegression
+from sklearn.metrics import accuracy_score
 
-import quacc as qc
+from baselines.mandoline import estimate_performance
 from quacc.dataset import Dataset
-from quacc.error import acc
-from quacc.evaluation.baseline import ref
-from quacc.evaluation.method import mulmc_sld
-from quacc.evaluation.report import CompReport, EvaluationReport
-from quacc.method.base import MCAE, BinaryQuantifierAccuracyEstimator
-from quacc.method.model_selection import GridSearchAE
 
 
-def test_gs():
+def test_lr():
     d = Dataset(name="rcv1", target="CCAT", n_prevalences=1).get_raw()
 
     classifier = LogisticRegression()
     classifier.fit(*d.train.Xy)
 
-    quantifier = SLD(LogisticRegression())
-    # estimator = MultiClassAccuracyEstimator(classifier, quantifier)
-    estimator = BinaryQuantifierAccuracyEstimator(classifier, quantifier)
+    val, _ = d.validation.split_stratified(0.5, random_state=0)
+    val_X, val_y = val.X, val.y
+    val_probs = classifier.predict_proba(val_X)
 
-    v_train, v_val = d.validation.split_stratified(0.6, random_state=0)
-    gs_protocol = UPP(v_val, sample_size=1000, repeats=100)
-    gs_estimator = GridSearchAE(
-        model=deepcopy(estimator),
-        param_grid={
-            "q__classifier__C": np.logspace(-3, 3, 7),
-            "q__classifier__class_weight": [None, "balanced"],
-            "q__recalib": [None, "bcts", "ts"],
-        },
-        refit=False,
-        protocol=gs_protocol,
-        verbose=True,
-    ).fit(v_train)
+    reg_X = sp.hstack([val_X, val_probs])
+    reg_y = val_probs[np.arange(val_probs.shape[0]), val_y]
+    reg = LinearRegression()
+    reg.fit(reg_X, reg_y)
 
-    estimator.fit(d.validation)
+    _test_num = 10000
+    test_X = d.test.X[:_test_num, :]
+    test_probs = classifier.predict_proba(test_X)
+    test_reg_X = sp.hstack([test_X, test_probs])
+    reg_pred = reg.predict(test_reg_X)
+
+    def threshold(pred):
+        # return np.mean(
+        #     (reg.predict(test_reg_X) >= pred)
+        #     == (
+        #         test_probs[np.arange(_test_num), d.test.y[:_test_num]] == np.max(test_probs, axis=1)
+        #     )
+        # )
+        return np.mean(
+            (reg.predict(test_reg_X) >= pred)
+            == (np.argmax(test_probs, axis=1) == d.test.y[:_test_num])
+        )
+
+    max_p, max_acc = 0, 0
+    for p in reg_pred:
+        acc = threshold(p)
+        if acc > max_acc:
+            max_acc = acc
+            max_p = p
+
+    print(f"{max_p = }, {max_acc = }")
+    reg_pred = reg_pred - max_p + 0.5
+    print(reg_pred)
+    print(np.mean(reg_pred >= 0.5))
+    print(np.mean(np.argmax(test_probs, axis=1) == d.test.y[:_test_num]))
+
+
+def entropy(probas):
+    return -np.sum(np.multiply(probas, np.log(probas + 1e-20)), axis=1)
+
+
+def get_slices(probas):
+    ln, ncl = probas.shape
+    preds = np.argmax(probas, axis=1)
+    pred_slices = np.full((ln, ncl), fill_value=-1, dtype="<i8")
+    pred_slices[np.arange(ln), preds] = 1
+
+    ent = entropy(probas)
+    n_bins = 10
+    range_top = entropy(np.array([np.ones(ncl) / ncl]))[0]
+    bins = np.linspace(0, range_top, n_bins + 1)
+    bin_map = np.digitize(ent, bins=bins, right=True) - 1
+    ent_slices = np.full((ln, n_bins), fill_value=-1, dtype="<i8")
+    ent_slices[np.arange(ln), bin_map] = 1
+
+    return np.concatenate([pred_slices, ent_slices], axis=1)
+
+
+def test_mandoline():
+    d = Dataset(name="cifar10", target="dog", n_prevalences=1).get_raw()
 
     tstart = time()
-    erb, ergs = EvaluationReport("base"), EvaluationReport("gs")
+    classifier = LogisticRegression()
+    classifier.fit(*d.train.Xy)
+
+    val_probs = classifier.predict_proba(d.validation.X)
+    val_preds = np.argmax(val_probs, axis=1)
+    D_val = get_slices(val_probs)
+    empirical_mat_list_val = (1.0 * (val_preds == d.validation.y))[:, np.newaxis]
+
     protocol = APP(
         d.test,
         sample_size=1000,
@@ -51,68 +97,19 @@ def test_gs():
         repeats=100,
         return_type="labelled_collection",
     )
-    for sample in protocol():
-        e_sample = gs_estimator.extend(sample)
-        estim_prev_b = estimator.estimate(e_sample.eX)
-        estim_prev_gs = gs_estimator.estimate(e_sample.eX)
-        erb.append_row(
-            sample.prevalence(),
-            acc=abs(acc(e_sample.prevalence()) - acc(estim_prev_b)),
-        )
-        ergs.append_row(
-            sample.prevalence(),
-            acc=abs(acc(e_sample.prevalence()) - acc(estim_prev_gs)),
-        )
-
-    cr = CompReport(
-        [erb, ergs],
-        "test",
-        train_prev=d.train_prev,
-        valid_prev=d.validation_prev,
-    )
-
-    print(cr.table())
-    print(f"[took {time() - tstart:.3f}s]")
-
-
-def test_mc():
-    d = Dataset(name="rcv1", target="CCAT", prevs=[0.9]).get()[0]
-    classifier = LogisticRegression().fit(*d.train.Xy)
-    protocol = APP(
-        d.test,
-        sample_size=1000,
-        repeats=100,
-        n_prevalences=21,
-        return_type="labelled_collection",
-    )
-
-    ref_er = ref(classifier, d.validation, protocol)
-    mulmc_er = mulmc_sld(classifier, d.validation, protocol)
-
-    cr = CompReport(
-        [mulmc_er, ref_er],
-        name="test_mc",
-        train_prev=d.train_prev,
-        valid_prev=d.validation_prev,
-    )
-
-    with open("test_mc.md", "w") as f:
-        f.write(cr.data().to_markdown())
-
-
-def test_et():
-    d = Dataset(name="imdb", prevs=[0.5]).get()[0]
-    classifier = LogisticRegression().fit(*d.train.Xy)
-    estimator = MCAE(
-        classifier,
-        SLD(LogisticRegression(), exact_train_prev=False),
-        confidence="entropy",
-    ).fit(d.validation)
-    e_test = estimator.extend(d.test)
-    ep = estimator.estimate(e_test.eX)
-    print(f"estim prev = {qc.error.acc(ep)}")
-    print(f"true prev {qc.error.acc(e_test.prevalence())}")
+    res = []
+    for test in protocol():
+        test_probs = classifier.predict_proba(test.X)
+        test_preds = np.argmax(test_probs, axis=1)
+        D_test = get_slices(test_probs)
+        wp = estimate_performance(D_val, D_test, None, empirical_mat_list_val)
+        score = wp.all_estimates[0].weighted[0]
+        res.append(abs(score - accuracy_score(test.y, test_preds)))
+        print(score)
+    res = np.array(res).reshape((21, 100))
+    print(res.mean(axis=1))
+    print(f"time: {time() - tstart}s")
 
 
 if __name__ == "__main__":
-    test_et()
+    test_mandoline()