trust score imported

This commit is contained in:
Lorenzo Volpi 2023-09-16 01:59:49 +02:00
parent 2235fd35c0
commit d6b1f6e796
8 changed files with 462 additions and 10 deletions

BIN
.coverage

Binary file not shown.

View File

@ -0,0 +1,141 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from sklearn.neighbors import KDTree, KNeighborsClassifier
class TrustScore:
"""
Trust Score: a measure of classifier uncertainty based on nearest neighbors.
"""
def __init__(self, k=10, alpha=0.0, filtering="none", min_dist=1e-12):
"""
k and alpha are the tuning parameters for the filtering,
filtering: method of filtering. option are "none", "density",
"uncertainty"
min_dist: some small number to mitigate possible division by 0.
"""
self.k = k
self.filtering = filtering
self.alpha = alpha
self.min_dist = min_dist
def filter_by_density(self, X: np.array):
"""Filter out points with low kNN density.
Args:
X: an array of sample points.
Returns:
A subset of the array without points in the bottom alpha-fraction of
original points of kNN density.
"""
kdtree = KDTree(X)
knn_radii = kdtree.query(X, k=self.k)[0][:, -1]
eps = np.percentile(knn_radii, (1 - self.alpha) * 100)
return X[np.where(knn_radii <= eps)[0], :]
def filter_by_uncertainty(self, X: np.array, y: np.array):
"""Filter out points with high label disagreement amongst its kNN neighbors.
Args:
X: an array of sample points.
Returns:
A subset of the array without points in the bottom alpha-fraction of
samples with highest disagreement amongst its k nearest neighbors.
"""
neigh = KNeighborsClassifier(n_neighbors=self.k)
neigh.fit(X, y)
confidence = neigh.predict_proba(X)
cutoff = np.percentile(confidence, self.alpha * 100)
unfiltered_idxs = np.where(confidence >= cutoff)[0]
return X[unfiltered_idxs, :], y[unfiltered_idxs]
def fit(self, X: np.array, y: np.array):
"""Initialize trust score precomputations with training data.
WARNING: assumes that the labels are 0-indexed (i.e.
0, 1,..., n_labels-1).
Args:
X: an array of sample points.
y: corresponding labels.
"""
self.n_labels = np.max(y) + 1
self.kdtrees = [None] * self.n_labels
if self.filtering == "uncertainty":
X_filtered, y_filtered = self.filter_by_uncertainty(X, y)
for label in range(self.n_labels):
if self.filtering == "none":
X_to_use = X[np.where(y == label)[0]]
self.kdtrees[label] = KDTree(X_to_use)
elif self.filtering == "density":
X_to_use = self.filter_by_density(X[np.where(y == label)[0]])
self.kdtrees[label] = KDTree(X_to_use)
elif self.filtering == "uncertainty":
X_to_use = X_filtered[np.where(y_filtered == label)[0]]
self.kdtrees[label] = KDTree(X_to_use)
if len(X_to_use) == 0:
print(
"Filtered too much or missing examples from a label! Please lower "
"alpha or check data."
)
def get_score(self, X: np.array, y_pred: np.array):
"""Compute the trust scores.
Given a set of points, determines the distance to each class.
Args:
X: an array of sample points.
y_pred: The predicted labels for these points.
Returns:
The trust score, which is ratio of distance to closest class that was not
the predicted class to the distance to the predicted class.
"""
d = np.tile(None, (X.shape[0], self.n_labels))
for label_idx in range(self.n_labels):
d[:, label_idx] = self.kdtrees[label_idx].query(X, k=2)[0][:, -1]
sorted_d = np.sort(d, axis=1)
d_to_pred = d[range(d.shape[0]), y_pred]
d_to_closest_not_pred = np.where(
sorted_d[:, 0] != d_to_pred, sorted_d[:, 0], sorted_d[:, 1]
)
return d_to_closest_not_pred / (d_to_pred + self.min_dist)
class KNNConfidence:
"""Baseline which uses disagreement to kNN classifier.
"""
def __init__(self, k=10):
self.k = k
def fit(self, X, y):
self.kdtree = KDTree(X)
self.y = y
def get_score(self, X, y_pred):
knn_idxs = self.kdtree.query(X, k=self.k)[1]
knn_outputs = self.y[knn_idxs]
return np.mean(
knn_outputs == np.transpose(np.tile(y_pred, (self.k, 1))), axis=1
)

View File

@ -0,0 +1,286 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import matplotlib.cm as cm
from sklearn.metrics import precision_recall_curve
import tensorflow as tf
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
def run_logistic(X_train, y_train, X_test, y_test, get_training=False):
model = LogisticRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.predict_proba(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.predict_proba(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_linear_svc(X_train, y_train, X_test, y_test, get_training=False):
model = LinearSVC()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.decision_function(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.decision_function(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_random_forest(X_train, y_train, X_test, y_test, get_training=False):
model = RandomForestClassifier()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.predict_proba(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.predict_proba(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_simple_NN(X,
y,
X_test,
y_test,
num_iter=10000,
hidden_units=100,
learning_rate=0.05,
batch_size=100,
display_steps=1000,
n_layers=1,
get_training=False):
"""Run a NN with a single layer on some data.
Returns the predicted values as well as the confidences.
"""
n_labels = np.max(y) + 1
n_features = X.shape[1]
x = tf.placeholder(tf.float32, [None, n_features])
y_ = tf.placeholder(tf.float32, [None, n_labels])
def simple_NN(input_placeholder, n_layers):
W_in = weight_variable([n_features, hidden_units])
b_in = bias_variable([hidden_units])
W_mid = [
weight_variable([hidden_units, hidden_units])
for i in range(n_layers - 1)
]
b_mid = [bias_variable([hidden_units]) for i in range(n_layers - 1)]
W_out = weight_variable([hidden_units, n_labels])
b_out = bias_variable([n_labels])
layers = [tf.nn.relu(tf.matmul(input_placeholder, W_in) + b_in)]
for i in range(n_layers - 1):
layer = tf.nn.relu(tf.matmul(layers[-1], W_mid[i]) + b_mid[i])
layers.append(layer)
logits = tf.matmul(layers[-1], W_out) + b_out
return logits
NN_logits = simple_NN(x, n_layers)
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=NN_logits))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(NN_logits, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
def one_hot(ns):
return np.eye(n_labels)[ns]
y_onehot = one_hot(y)
y_test_onehot = one_hot(y_test)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(num_iter):
ns = np.random.randint(0, len(X), size=batch_size)
if (i + 1) % display_steps == 0:
train_accuracy = accuracy.eval(feed_dict={x: X, y_: y_onehot})
test_accuracy = accuracy.eval(feed_dict={x: X_test, y_: y_test_onehot})
print("step %d, training accuracy %g, test accuracy %g" %
(i + 1, train_accuracy, test_accuracy))
train_step.run(feed_dict={x: X[ns, :], y_: y_onehot[ns, :]})
testing_logits = NN_logits.eval(feed_dict={x: X_test})
testing_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X_test})
NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X_test})
testing_confidence_raw = tf.reduce_max(NN_softmax,
1).eval(feed_dict={x: X_test})
if not get_training:
return testing_prediction, testing_confidence_raw
training_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X})
NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X})
training_confidence_raw = tf.reduce_max(NN_softmax,
1).eval(feed_dict={x: X})
return testing_prediction, testing_confidence_raw, training_prediction, training_confidence_raw
def plot_precision_curve(
extra_plot_title,
percentile_levels,
signal_names,
final_TPs,
final_stderrs,
final_misclassification,
model_name="Model",
colors=["blue", "darkorange", "brown", "red", "purple"],
legend_loc=None,
figure_size=None,
ylim=None):
if figure_size is not None:
plt.figure(figsize=figure_size)
title = "Precision Curve" if extra_plot_title == "" else extra_plot_title
plt.title(title, fontsize=20)
colors = colors + list(cm.rainbow(np.linspace(0, 1, len(final_TPs))))
plt.xlabel("Percentile level", fontsize=18)
plt.ylabel("Precision", fontsize=18)
for i, signal_name in enumerate(signal_names):
ls = "--" if ("Model" in signal_name) else "-"
plt.plot(
percentile_levels, final_TPs[i], ls, c=colors[i], label=signal_name)
plt.fill_between(
percentile_levels,
final_TPs[i] - final_stderrs[i],
final_TPs[i] + final_stderrs[i],
color=colors[i],
alpha=0.1)
if legend_loc is None:
if 0. in percentile_levels:
plt.legend(loc="lower right", fontsize=14)
else:
plt.legend(loc="upper left", fontsize=14)
else:
if legend_loc == "outside":
plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left", fontsize=14)
else:
plt.legend(loc=legend_loc, fontsize=14)
if ylim is not None:
plt.ylim(*ylim)
model_acc = 100 * (1 - final_misclassification)
plt.axvline(x=model_acc, linestyle="dotted", color="black")
plt.show()
def run_precision_recall_experiment_general(X,
y,
n_repeats,
percentile_levels,
trainer,
test_size=0.5,
extra_plot_title="",
signals=[],
signal_names=[],
predict_when_correct=False,
skip_print=False):
def get_stderr(L):
return np.std(L) / np.sqrt(len(L))
all_signal_names = ["Model Confidence"] + signal_names
all_TPs = [[[] for p in percentile_levels] for signal in all_signal_names]
misclassifications = []
sign = 1 if predict_when_correct else -1
sss = StratifiedShuffleSplit(
n_splits=n_repeats, test_size=test_size, random_state=0)
for train_idx, test_idx in sss.split(X, y):
X_train = X[train_idx, :]
y_train = y[train_idx]
X_test = X[test_idx, :]
y_test = y[test_idx]
testing_prediction, testing_confidence_raw = trainer(
X_train, y_train, X_test, y_test)
target_points = np.where(
testing_prediction == y_test)[0] if predict_when_correct else np.where(
testing_prediction != y_test)[0]
final_signals = [testing_confidence_raw]
for signal in signals:
signal.fit(X_train, y_train)
final_signals.append(signal.get_score(X_test, testing_prediction))
for p, percentile_level in enumerate(percentile_levels):
all_high_confidence_points = [
np.where(sign * signal >= np.percentile(sign *
signal, percentile_level))[0]
for signal in final_signals
]
if 0 in map(len, all_high_confidence_points):
continue
TP = [
len(np.intersect1d(high_confidence_points, target_points)) /
(1. * len(high_confidence_points))
for high_confidence_points in all_high_confidence_points
]
for i in range(len(all_signal_names)):
all_TPs[i][p].append(TP[i])
misclassifications.append(len(target_points) / (1. * len(X_test)))
final_TPs = [[] for signal in all_signal_names]
final_stderrs = [[] for signal in all_signal_names]
for p, percentile_level in enumerate(percentile_levels):
for i in range(len(all_signal_names)):
final_TPs[i].append(np.mean(all_TPs[i][p]))
final_stderrs[i].append(get_stderr(all_TPs[i][p]))
if not skip_print:
print("Precision at percentile", percentile_level)
ss = ""
for i, signal_name in enumerate(all_signal_names):
ss += (signal_name + (": %.4f " % final_TPs[i][p]))
print(ss)
print()
final_misclassification = np.mean(misclassifications)
if not skip_print:
print("Misclassification rate mean/std", np.mean(misclassifications),
get_stderr(misclassifications))
for i in range(len(all_signal_names)):
final_TPs[i] = np.array(final_TPs[i])
final_stderrs[i] = np.array(final_stderrs[i])
plot_precision_curve(extra_plot_title, percentile_levels, all_signal_names,
final_TPs, final_stderrs, final_misclassification)
return (all_signal_names, final_TPs, final_stderrs, final_misclassification)

View File

@ -23,7 +23,7 @@ pytest-mock = "^3.11.1"
pytest-cov = "^4.1.0"
[tool.pytest.ini_options]
addopts = "--cov=quacc"
addopts = "--cov=quacc --capture=tee-sys"
[build-system]
requires = ["poetry-core"]

View File

@ -10,7 +10,7 @@ from garg22_ATC.ATC_helper import (
get_max_conf,
)
import numpy as np
from jiang18_trustscore.trustscore import TrustScore
def kfcv(c_model: BaseEstimator, validation: LabelledCollection) -> Dict:
@ -43,10 +43,11 @@ def ATC_MC(
ATC_accuracy = get_ATC_acc(ATC_thres, test_scores)
return {
"true_acc": 100*np.mean(np.argmax(test_probs, axis=-1) == test.y),
"pred_acc": ATC_accuracy
"true_acc": 100 * np.mean(np.argmax(test_probs, axis=-1) == test.y),
"pred_acc": ATC_accuracy,
}
def ATC_NE(
c_model: BaseEstimator,
validation: LabelledCollection,
@ -71,7 +72,23 @@ def ATC_NE(
ATC_accuracy = get_ATC_acc(ATC_thres, test_scores)
return {
"true_acc": 100*np.mean(np.argmax(test_probs, axis=-1) == test.y),
"pred_acc": ATC_accuracy
"true_acc": 100 * np.mean(np.argmax(test_probs, axis=-1) == test.y),
"pred_acc": ATC_accuracy,
}
def trust_score(
c_model: BaseEstimator,
validation: LabelledCollection,
test: LabelledCollection,
predict_method="predict",
):
c_model_predict = getattr(c_model, predict_method)
test_pred = c_model_predict(test.X)
trust_model = TrustScore()
trust_model.fit(validation.X, validation.y)
return trust_model.get_score(test.X, test_pred)

View File

@ -99,4 +99,4 @@ def estimate_binary():
if __name__ == "__main__":
estimate_multiclass()
estimate_binary()

View File

@ -1,12 +1,20 @@
from sklearn.linear_model import LogisticRegression
from quacc.baseline import kfcv
from quacc.baseline import kfcv, trust_score
from quacc.dataset import get_spambase
class TestBaseline:
def test_kfcv(self):
train, _, _ = get_spambase()
train, validation, _ = get_spambase()
c_model = LogisticRegression()
assert "f1_score" in kfcv(c_model, train)
c_model.fit(train.X, train.y)
assert "f1_score" in kfcv(c_model, validation)
def test_trust_score(self):
train, validation, test = get_spambase()
c_model = LogisticRegression()
c_model.fit(train.X, train.y)
trustscore = trust_score(c_model, train, test)
assert len(trustscore) == len(test.y)