baselines code updated

This commit is contained in:
Lorenzo Volpi 2023-10-27 12:35:25 +02:00
parent 6a5a7a0153
commit 232a670305
24 changed files with 898 additions and 427 deletions

9
.vscode/launch.json vendored
View File

@ -4,6 +4,7 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0", "version": "0.2.0",
"configurations": [ "configurations": [
{ {
"name": "main", "name": "main",
"type": "python", "type": "python",
@ -11,6 +12,14 @@
"program": "C:\\Users\\Lorenzo Volpi\\source\\tesi\\quacc\\main.py", "program": "C:\\Users\\Lorenzo Volpi\\source\\tesi\\quacc\\main.py",
"console": "integratedTerminal", "console": "integratedTerminal",
"justMyCode": true "justMyCode": true
},
{
"name": "models",
"type": "python",
"request": "launch",
"program": "C:\\Users\\Lorenzo Volpi\\source\\tesi\\baselines\\models.py",
"console": "integratedTerminal",
"justMyCode": true
} }
] ]
} }

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,277 @@
"""
Relative Unconstrained Least-Squares Fitting (RuLSIF): A Python Implementation
References:
'Change-point detection in time-series data by relative density-ratio estimation'
Song Liu, Makoto Yamada, Nigel Collier and Masashi Sugiyama,
Neural Networks 43 (2013) 72-83.
'A Least-squares Approach to Direct Importance Estimation'
Takafumi Kanamori, Shohei Hido, and Masashi Sugiyama,
Journal of Machine Learning Research 10 (2009) 1391-1445.
"""
from warnings import warn
from numpy import (
array,
asarray,
asmatrix,
diag,
diagflat,
empty,
exp,
inf,
log,
matrix,
multiply,
ones,
power,
sum,
)
from numpy.linalg import solve
from numpy.random import randint
from .density_ratio import DensityRatio, KernelInfo
from .helpers import guvectorize_compute, np_float, to_ndarray
def RuLSIF(x, y, alpha, sigma_range, lambda_range, kernel_num=100, verbose=True):
"""
Estimation of the alpha-Relative Density Ratio p(x)/p_alpha(x) by RuLSIF
(Relative Unconstrained Least-Square Importance Fitting)
p_alpha(x) = alpha * p(x) + (1 - alpha) * q(x)
Arguments:
x (numpy.matrix): Sample from p(x).
y (numpy.matrix): Sample from q(x).
alpha (float): Mixture parameter.
sigma_range (list<float>): Search range of Gaussian kernel bandwidth.
lambda_range (list<float>): Search range of regularization parameter.
kernel_num (int): Number of kernels. (Default 100)
verbose (bool): Indicator to print messages (Default True)
Returns:
densratio.DensityRatio object which has `compute_density_ratio()`.
"""
# Number of samples.
nx = x.shape[0]
ny = y.shape[0]
# Number of kernel functions.
kernel_num = min(kernel_num, nx)
# Randomly take a subset of x, to identify centers for the kernels.
centers = x[randint(nx, size=kernel_num)]
if verbose:
print("RuLSIF starting...")
if len(sigma_range) == 1 and len(lambda_range) == 1:
sigma = sigma_range[0]
lambda_ = lambda_range[0]
else:
if verbose:
print("Searching for the optimal sigma and lambda...")
# Grid-search cross-validation for optimal kernel and regularization parameters.
opt_params = search_sigma_and_lambda(
x, y, alpha, centers, sigma_range, lambda_range, verbose
)
sigma = opt_params["sigma"]
lambda_ = opt_params["lambda"]
if verbose:
print(
"Found optimal sigma = {:.3f}, lambda = {:.3f}.".format(sigma, lambda_)
)
if verbose:
print("Optimizing theta...")
phi_x = compute_kernel_Gaussian(x, centers, sigma)
phi_y = compute_kernel_Gaussian(y, centers, sigma)
H = alpha * (phi_x.T.dot(phi_x) / nx) + (1 - alpha) * (phi_y.T.dot(phi_y) / ny)
h = phi_x.mean(axis=0).T
theta = asarray(solve(H + diag(array(lambda_).repeat(kernel_num)), h)).ravel()
# No negative coefficients.
theta[theta < 0] = 0
# Compute the alpha-relative density ratio, at the given coordinates.
def alpha_density_ratio(coordinates):
# Evaluate the kernel at these coordinates, and take the dot-product with the weights.
coordinates = to_ndarray(coordinates)
phi_x = compute_kernel_Gaussian(coordinates, centers, sigma)
alpha_density_ratio = phi_x @ theta
return alpha_density_ratio
# Compute the approximate alpha-relative PE-divergence, given samples x and y from the respective distributions.
def alpha_PE_divergence(x, y):
# This is Y, in Reference 1.
x = to_ndarray(x)
# Obtain alpha-relative density ratio at these points.
g_x = alpha_density_ratio(x)
# This is Y', in Reference 1.
y = to_ndarray(y)
# Obtain alpha-relative density ratio at these points.
g_y = alpha_density_ratio(y)
# Compute the alpha-relative PE-divergence as given in Reference 1.
n = x.shape[0]
divergence = (
-alpha * (g_x @ g_x) / 2 - (1 - alpha) * (g_y @ g_y) / 2 + g_x.sum(axis=0)
) / n - 1.0 / 2
return divergence
# Compute the approximate alpha-relative KL-divergence, given samples x and y from the respective distributions.
def alpha_KL_divergence(x, y):
# This is Y, in Reference 1.
x = to_ndarray(x)
# Obtain alpha-relative density ratio at these points.
g_x = alpha_density_ratio(x)
# Compute the alpha-relative KL-divergence.
n = x.shape[0]
divergence = log(g_x).sum(axis=0) / n
return divergence
alpha_PE = alpha_PE_divergence(x, y)
alpha_KL = alpha_KL_divergence(x, y)
if verbose:
print("Approximate alpha-relative PE-divergence = {:03.2f}".format(alpha_PE))
print("Approximate alpha-relative KL-divergence = {:03.2f}".format(alpha_KL))
kernel_info = KernelInfo(
kernel_type="Gaussian", kernel_num=kernel_num, sigma=sigma, centers=centers
)
result = DensityRatio(
method="RuLSIF",
alpha=alpha,
theta=theta,
lambda_=lambda_,
alpha_PE=alpha_PE,
alpha_KL=alpha_KL,
kernel_info=kernel_info,
compute_density_ratio=alpha_density_ratio,
)
if verbose:
print("RuLSIF completed.")
return result
# Grid-search cross-validation for the optimal parameters sigma and lambda by leave-one-out cross-validation. See Reference 2.
def search_sigma_and_lambda(x, y, alpha, centers, sigma_range, lambda_range, verbose):
nx = x.shape[0]
ny = y.shape[0]
n_min = min(nx, ny)
kernel_num = centers.shape[0]
score_new = inf
sigma_new = 0
lambda_new = 0
for sigma in sigma_range:
phi_x = compute_kernel_Gaussian(x, centers, sigma) # (nx, kernel_num)
phi_y = compute_kernel_Gaussian(y, centers, sigma) # (ny, kernel_num)
H = alpha * (phi_x.T @ phi_x / nx) + (1 - alpha) * (
phi_y.T @ phi_y / ny
) # (kernel_num, kernel_num)
h = phi_x.mean(axis=0).reshape(-1, 1) # (kernel_num, 1)
phi_x = phi_x[:n_min].T # (kernel_num, n_min)
phi_y = phi_y[:n_min].T # (kernel_num, n_min)
for lambda_ in lambda_range:
B = H + diag(
array(lambda_ * (ny - 1) / ny).repeat(kernel_num)
) # (kernel_num, kernel_num)
B_inv_X = solve(B, phi_y) # (kernel_num, n_min)
X_B_inv_X = multiply(phi_y, B_inv_X) # (kernel_num, n_min)
denom = ny * ones(n_min) - ones(kernel_num) @ X_B_inv_X # (n_min, )
B0 = solve(B, h @ ones((1, n_min))) + B_inv_X @ diagflat(
h.T @ B_inv_X / denom
) # (kernel_num, n_min)
B1 = solve(B, phi_x) + B_inv_X @ diagflat(
ones(kernel_num) @ multiply(phi_x, B_inv_X)
) # (kernel_num, n_min)
B2 = (ny - 1) * (nx * B0 - B1) / (ny * (nx - 1)) # (kernel_num, n_min)
B2[B2 < 0] = 0
r_y = multiply(phi_y, B2).sum(axis=0).T # (n_min, )
r_x = multiply(phi_x, B2).sum(axis=0).T # (n_min, )
# Squared loss of RuLSIF, without regularization term.
# Directly related to the negative of the PE-divergence.
score = (r_y @ r_y / 2 - r_x.sum(axis=0)) / n_min
if verbose:
print(
"sigma = %.5f, lambda = %.5f, score = %.5f"
% (sigma, lambda_, score)
)
if score < score_new:
score_new = score
sigma_new = sigma
lambda_new = lambda_
return {"sigma": sigma_new, "lambda": lambda_new}
def _compute_kernel_Gaussian(x_list, y_row, neg_gamma, res) -> None:
sq_norm = sum(power(x_list - y_row, 2), 1)
multiply(neg_gamma, sq_norm, res)
exp(res, res)
def _target_numpy_wrapper(x_list, y_list, neg_gamma):
res = empty((y_list.shape[0], x_list.shape[0]), np_float)
if isinstance(x_list, matrix) or isinstance(y_list, matrix):
res = asmatrix(res)
for j, y_row in enumerate(y_list):
# `.T` aligns shapes for matrices, does nothing for 1D ndarray.
_compute_kernel_Gaussian(x_list, y_row, neg_gamma, res[j].T)
return res
_compute_functions = {"numpy": _target_numpy_wrapper}
if guvectorize_compute:
_compute_functions.update(
{
key: guvectorize_compute(key)(_compute_kernel_Gaussian)
for key in ("cpu", "parallel")
}
)
_compute_function = _compute_functions[
"cpu" if "cpu" in _compute_functions else "numpy"
]
# Returns a 2D numpy matrix of kernel evaluated at the gridpoints with coordinates from x_list and y_list.
def compute_kernel_Gaussian(x_list, y_list, sigma):
return _compute_function(x_list, y_list, -0.5 * sigma**-2).T
def set_compute_kernel_target(target: str) -> None:
global _compute_function
if target not in ("numpy", "cpu", "parallel"):
raise ValueError(
"'target' must be one of the following: 'numpy', 'cpu', or 'parallel'."
)
if target not in _compute_functions:
warn("'numba' not available; defaulting to 'numpy'.", ImportWarning)
target = "numpy"
_compute_function = _compute_functions[target]

View File

@ -0,0 +1,7 @@
from warnings import filterwarnings
from .core import densratio
from .RuLSIF import set_compute_kernel_target
filterwarnings("default", message="'numba'", category=ImportWarning, module="densratio")
__all__ = ["densratio", "set_compute_kernel_target"]

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,70 @@
"""
densratio.core
~~~~~~~~~~~~~~
Estimate Density Ratio p(x)/q(y)
"""
from numpy import linspace
from .helpers import to_ndarray
from .RuLSIF import RuLSIF
def densratio(
x, y, alpha=0, sigma_range="auto", lambda_range="auto", kernel_num=100, verbose=True
):
"""Estimate alpha-mixture Density Ratio p(x)/(alpha*p(x) + (1 - alpha)*q(x))
Arguments:
x: sample from p(x).
y: sample from q(x).
alpha: Default 0 - corresponds to ordinary density ratio.
sigma_range: search range of Gaussian kernel bandwidth.
Default "auto" means 10^-3, 10^-2, ..., 10^9.
lambda_range: search range of regularization parameter for uLSIF.
Default "auto" means 10^-3, 10^-2, ..., 10^9.
kernel_num: number of kernels. Default 100.
verbose: indicator to print messages. Default True.
Returns:
densratio.DensityRatio object which has `compute_density_ratio()`.
Raises:
ValueError: if dimension of x != dimension of y
Usage::
>>> from scipy.stats import norm
>>> from densratio import densratio
>>> x = norm.rvs(size=200, loc=1, scale=1./8)
>>> y = norm.rvs(size=200, loc=1, scale=1./2)
>>> result = densratio(x, y, alpha=0.7)
>>> print(result)
>>> density_ratio = result.compute_density_ratio(y)
>>> print(density_ratio)
"""
x = to_ndarray(x)
y = to_ndarray(y)
if x.shape[1] != y.shape[1]:
raise ValueError("x and y must be same dimensions.")
if isinstance(sigma_range, str) and sigma_range != "auto":
raise TypeError("Invalid value for sigma_range.")
if isinstance(lambda_range, str) and lambda_range != "auto":
raise TypeError("Invalid value for lambda_range.")
if sigma_range is None or (isinstance(sigma_range, str) and sigma_range == "auto"):
sigma_range = 10 ** linspace(-3, 9, 13)
if lambda_range is None or (
isinstance(lambda_range, str) and lambda_range == "auto"
):
lambda_range = 10 ** linspace(-3, 9, 13)
result = RuLSIF(x, y, alpha, sigma_range, lambda_range, kernel_num, verbose)
return result

View File

@ -0,0 +1,88 @@
from pprint import pformat
from re import sub
class DensityRatio:
"""Density Ratio."""
def __init__(
self,
method,
alpha,
theta,
lambda_,
alpha_PE,
alpha_KL,
kernel_info,
compute_density_ratio,
):
self.method = method
self.alpha = alpha
self.theta = theta
self.lambda_ = lambda_
self.alpha_PE = alpha_PE
self.alpha_KL = alpha_KL
self.kernel_info = kernel_info
self.compute_density_ratio = compute_density_ratio
def __str__(self):
return """
Method: %(method)s
Alpha: %(alpha)s
Kernel Information:
%(kernel_info)s
Kernel Weights (theta):
%(theta)s
Regularization Parameter (lambda): %(lambda_)s
Alpha-Relative PE-Divergence: %(alpha_PE)s
Alpha-Relative KL-Divergence: %(alpha_KL)s
Function to Estimate Density Ratio:
compute_density_ratio(x)
"""[
1:-1
] % dict(
method=self.method,
kernel_info=self.kernel_info,
alpha=self.alpha,
theta=my_format(self.theta),
lambda_=self.lambda_,
alpha_PE=self.alpha_PE,
alpha_KL=self.alpha_KL,
)
class KernelInfo:
"""Kernel Information."""
def __init__(self, kernel_type, kernel_num, sigma, centers):
self.kernel_type = kernel_type
self.kernel_num = kernel_num
self.sigma = sigma
self.centers = centers
def __str__(self):
return """
Kernel type: %(kernel_type)s
Number of kernels: %(kernel_num)s
Bandwidth(sigma): %(sigma)s
Centers: %(centers)s
"""[
1:-1
] % dict(
kernel_type=self.kernel_type,
kernel_num=self.kernel_num,
sigma=self.sigma,
centers=my_format(self.centers),
)
def my_format(str):
return sub(r"\s+", " ", (pformat(str).split("\n")[0] + ".."))

View File

@ -0,0 +1,36 @@
from numpy import array, ndarray, result_type
np_float = result_type(float)
try:
import numba as nb
except ModuleNotFoundError:
guvectorize_compute = None
else:
_nb_float = nb.from_dtype(np_float)
def guvectorize_compute(target: str, *, cache: bool = True):
return nb.guvectorize(
[nb.void(_nb_float[:, :], _nb_float[:], _nb_float, _nb_float[:])],
"(m, p),(p),()->(m)",
nopython=True,
target=target,
cache=cache,
)
def is_numeric(x):
return isinstance(x, int) or isinstance(x, float)
def to_ndarray(x):
if isinstance(x, ndarray):
if len(x.shape) == 1:
return x.reshape(-1, 1)
else:
return x
elif str(type(x)) == "<class 'pandas.core.frame.DataFrame'>":
return x.values
elif not x:
raise ValueError("Cannot transform to numpy.matrix.")
else:
return to_ndarray(array(x))

52
baselines/impweight.py Normal file
View File

@ -0,0 +1,52 @@
import numpy as np
from scipy.sparse import issparse, vstack
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
def logreg(Xtr, ytr, Xte):
# check "Direct Density Ratio Estimation for
# Large-scale Covariate Shift Adaptation", Eq.28
if issparse(Xtr):
X = vstack([Xtr, Xte])
else:
X = np.concatenate([Xtr, Xte])
y = [0] * Xtr.shape[0] + [1] * Xte.shape[0]
logreg = GridSearchCV(
LogisticRegression(),
param_grid={"C": np.logspace(-3, 3, 7), "class_weight": ["balanced", None]},
n_jobs=-1,
)
logreg.fit(X, y)
probs = logreg.predict_proba(Xtr)
prob_train, prob_test = probs[:, 0], probs[:, 1]
prior_train = Xtr.shape[0]
prior_test = Xte.shape[0]
w = (prior_train / prior_test) * (prob_test / prob_train)
return w
kdex2_params = {"bandwidth": np.logspace(-1, 1, 20)}
def kdex2_lltr(Xtr):
if issparse(Xtr):
Xtr = Xtr.toarray()
return GridSearchCV(KernelDensity(), kdex2_params).fit(Xtr).score_samples(Xtr)
def kdex2_weights(Xtr, Xte, log_likelihood_tr):
log_likelihood_te = (
GridSearchCV(KernelDensity(), kdex2_params).fit(Xte).score_samples(Xtr)
)
likelihood_tr = np.exp(log_likelihood_tr)
likelihood_te = np.exp(log_likelihood_te)
return likelihood_te / likelihood_tr
def get_acc(tr_preds, ytr, w):
return np.sum((1.0 * (tr_preds == ytr)) * w) / np.sum(w)

140
baselines/models.py Normal file
View File

@ -0,0 +1,140 @@
# import itertools
# from typing import Iterable
# import quapy as qp
# import quapy.functional as F
# from densratio import densratio
# from quapy.method.aggregative import *
# from quapy.protocol import (
# AbstractStochasticSeededProtocol,
# OnLabelledCollectionProtocol,
# )
# from scipy.sparse import issparse, vstack
# from scipy.spatial.distance import cdist
# from scipy.stats import multivariate_normal
# from sklearn.linear_model import LogisticRegression
# from sklearn.model_selection import GridSearchCV
# from sklearn.neighbors import KernelDensity
import time
import numpy as np
import sklearn.metrics as metrics
from pykliep import DensityRatioEstimator
from quapy.protocol import APP
from scipy.sparse import issparse, vstack
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
import baselines.impweight as iw
from baselines.densratio import densratio
from quacc.dataset import Dataset
# ---------------------------------------------------------------------------------------
# Methods of "importance weight", e.g., by ratio density estimation (KLIEP, SILF, LogReg)
# ---------------------------------------------------------------------------------------
class ImportanceWeight:
def weights(self, Xtr, ytr, Xte):
...
class KLIEP(ImportanceWeight):
def __init__(self):
pass
def weights(self, Xtr, ytr, Xte):
kliep = DensityRatioEstimator()
kliep.fit(Xtr, Xte)
return kliep.predict(Xtr)
class USILF(ImportanceWeight):
def __init__(self, alpha=0.0):
self.alpha = alpha
def weights(self, Xtr, ytr, Xte):
dense_ratio_obj = densratio(Xtr, Xte, alpha=self.alpha, verbose=False)
return dense_ratio_obj.compute_density_ratio(Xtr)
class LogReg(ImportanceWeight):
def __init__(self):
pass
def weights(self, Xtr, ytr, Xte):
# check "Direct Density Ratio Estimation for
# Large-scale Covariate Shift Adaptation", Eq.28
if issparse(Xtr):
X = vstack([Xtr, Xte])
else:
X = np.concatenate([Xtr, Xte])
y = [0] * Xtr.shape[0] + [1] * Xte.shape[0]
logreg = GridSearchCV(
LogisticRegression(),
param_grid={"C": np.logspace(-3, 3, 7), "class_weight": ["balanced", None]},
n_jobs=-1,
)
logreg.fit(X, y)
probs = logreg.predict_proba(Xtr)
prob_train, prob_test = probs[:, 0], probs[:, 1]
prior_train = Xtr.shape[0]
prior_test = Xte.shape[0]
w = (prior_train / prior_test) * (prob_test / prob_train)
return w
class KDEx2(ImportanceWeight):
def __init__(self):
pass
def weights(self, Xtr, ytr, Xte):
params = {"bandwidth": np.logspace(-1, 1, 20)}
log_likelihood_tr = (
GridSearchCV(KernelDensity(), params).fit(Xtr).score_samples(Xtr)
)
log_likelihood_te = (
GridSearchCV(KernelDensity(), params).fit(Xte).score_samples(Xtr)
)
likelihood_tr = np.exp(log_likelihood_tr)
likelihood_te = np.exp(log_likelihood_te)
return likelihood_te / likelihood_tr
if __name__ == "__main__":
# d = Dataset("rcv1", target="CCAT").get_raw()
d = Dataset("imdb", n_prevalences=1).get()[0]
tstart = time.time()
lr = LogisticRegression()
lr.fit(*d.train.Xy)
val_preds = lr.predict(d.validation.X)
protocol = APP(
d.test,
n_prevalences=21,
repeats=1,
sample_size=100,
return_type="labelled_collection",
)
results = []
for sample in protocol():
wx = iw.logreg(d.validation.X, d.validation.y, sample.X)
test_preds = lr.predict(sample.X)
estim_acc = np.sum((1.0 * (val_preds == d.validation.y)) * wx) / np.sum(wx)
true_acc = metrics.accuracy_score(sample.y, test_preds)
results.append((sample.prevalence(), estim_acc, true_acc))
tend = time.time()
for r in results:
print(*r)
print(f"logreg finished [took {tend-tstart:.3f}s]")
import win11toast
win11toast.notify("models.py", "Completed")

219
baselines/pykliep.py Normal file
View File

@ -0,0 +1,219 @@
import warnings
import numpy as np
from scipy.sparse import csr_matrix
class DensityRatioEstimator:
"""
Class to accomplish direct density estimation implementing the original KLIEP
algorithm from Direct Importance Estimation with Model Selection
and Its Application to Covariate Shift Adaptation by Sugiyama et al.
The training set is distributed via
train ~ p(x)
and the test set is distributed via
test ~ q(x).
The KLIEP algorithm and its variants approximate w(x) = q(x) / p(x) directly. The predict function returns the
estimate of w(x). The function w(x) can serve as sample weights for the training set during
training to modify the expectation function that the model's loss function is optimized via,
i.e.
E_{x ~ w(x)p(x)} loss(x) = E_{x ~ q(x)} loss(x).
Usage :
The fit method is used to run the KLIEP algorithm using LCV and returns value of J
trained on the entire training/test set with the best sigma found.
Use the predict method on the training set to determine the sample weights from the KLIEP algorithm.
"""
def __init__(
self,
max_iter=5000,
num_params=[0.1, 0.2],
epsilon=1e-4,
cv=3,
sigmas=[0.01, 0.1, 0.25, 0.5, 0.75, 1],
random_state=None,
verbose=0,
):
"""
Direct density estimation using an inner LCV loop to estimate the proper model. Can be used with sklearn
cross validation methods with or without storing the inner CV. To use a standard grid search.
max_iter : Number of iterations to perform
num_params : List of number of test set vectors used to construct the approximation for inner LCV.
Must be a float. Original paper used 10%, i.e. =.1
sigmas : List of sigmas to be used in inner LCV loop.
epsilon : Additive factor in the iterative algorithm for numerical stability.
"""
self.max_iter = max_iter
self.num_params = num_params
self.epsilon = epsilon
self.verbose = verbose
self.sigmas = sigmas
self.cv = cv
self.random_state = 0
def fit(self, X_train, X_test, alpha_0=None):
"""Uses cross validation to select sigma as in the original paper (LCV).
In a break from sklearn convention, y=X_test.
The parameter cv corresponds to R in the original paper.
Once found, the best sigma is used to train on the full set."""
# LCV loop, shuffle a copy in place for performance.
cv = self.cv
chunk = int(X_test.shape[0] / float(cv))
if self.random_state is not None:
np.random.seed(self.random_state)
# if isinstance(X_test, csr_matrix):
# X_test_shuffled = X_test.toarray()
# else:
# X_test_shuffled = X_test.copy()
X_test_shuffled = X_test.copy()
np.random.shuffle(X_test_shuffled)
j_scores = {}
if type(self.sigmas) != list:
self.sigmas = [self.sigmas]
if type(self.num_params) != list:
self.num_params = [self.num_params]
if len(self.sigmas) * len(self.num_params) > 1:
# Inner LCV loop
for num_param in self.num_params:
for sigma in self.sigmas:
j_scores[(num_param, sigma)] = np.zeros(cv)
for k in range(1, cv + 1):
if self.verbose > 0:
print("Training: sigma: %s R: %s" % (sigma, k))
X_test_fold = X_test_shuffled[(k - 1) * chunk : k * chunk, :]
j_scores[(num_param, sigma)][k - 1] = self._fit(
X_train=X_train,
X_test=X_test_fold,
num_parameters=num_param,
sigma=sigma,
)
j_scores[(num_param, sigma)] = np.mean(j_scores[(num_param, sigma)])
sorted_scores = sorted(
[x for x in j_scores.items() if np.isfinite(x[1])],
key=lambda x: x[1],
reverse=True,
)
if len(sorted_scores) == 0:
warnings.warn("LCV failed to converge for all values of sigma.")
return self
self._sigma = sorted_scores[0][0][1]
self._num_parameters = sorted_scores[0][0][0]
self._j_scores = sorted_scores
else:
self._sigma = self.sigmas[0]
self._num_parameters = self.num_params[0]
# best sigma
self._j = self._fit(
X_train=X_train,
X_test=X_test_shuffled,
num_parameters=self._num_parameters,
sigma=self._sigma,
)
return self # Compatibility with sklearn
def _fit(self, X_train, X_test, num_parameters, sigma, alpha_0=None):
"""Fits the estimator with the given parameters w-hat and returns J"""
num_parameters = num_parameters
if type(num_parameters) == float:
num_parameters = int(X_test.shape[0] * num_parameters)
self._select_param_vectors(
X_test=X_test, sigma=sigma, num_parameters=num_parameters
)
# if isinstance(X_train, csr_matrix):
# X_train = X_train.toarray()
X_train = self._reshape_X(X_train)
X_test = self._reshape_X(X_test)
if alpha_0 is None:
alpha_0 = np.ones(shape=(num_parameters, 1)) / float(num_parameters)
self._find_alpha(
X_train=X_train,
X_test=X_test,
num_parameters=num_parameters,
epsilon=self.epsilon,
alpha_0=alpha_0,
sigma=sigma,
)
return self._calculate_j(X_test, sigma=sigma)
def _calculate_j(self, X_test, sigma):
pred = self.predict(X_test, sigma=sigma) + 0.0000001
log = np.log(pred).sum()
return log / (X_test.shape[0])
def score(self, X_test):
"""Return the J score, similar to sklearn's API"""
return self._calculate_j(X_test=X_test, sigma=self._sigma)
@staticmethod
def _reshape_X(X):
"""Reshape input from mxn to mx1xn to take advantage of numpy broadcasting."""
if len(X.shape) != 3:
return X.reshape((X.shape[0], 1, X.shape[1]))
return X
def _select_param_vectors(self, X_test, sigma, num_parameters):
"""X_test is the test set. b is the number of parameters."""
indices = np.random.choice(X_test.shape[0], size=num_parameters, replace=False)
self._test_vectors = X_test[indices, :].copy()
self._phi_fitted = True
def _phi(self, X, sigma=None):
if sigma is None:
sigma = self._sigma
if self._phi_fitted:
return np.exp(
-np.sum((X - self._test_vectors) ** 2, axis=-1) / (2 * sigma**2)
)
raise Exception("Phi not fitted.")
def _find_alpha(self, alpha_0, X_train, X_test, num_parameters, sigma, epsilon):
A = np.zeros(shape=(X_test.shape[0], num_parameters))
b = np.zeros(shape=(num_parameters, 1))
A = self._phi(X_test, sigma)
b = self._phi(X_train, sigma).sum(axis=0) / X_train.shape[0]
b = b.reshape((num_parameters, 1))
out = alpha_0.copy()
for k in range(self.max_iter):
mat = np.dot(A, out)
mat += 0.000000001
out += epsilon * np.dot(np.transpose(A), 1.0 / mat)
out += b * (
((1 - np.dot(np.transpose(b), out)) / np.dot(np.transpose(b), b))
)
out = np.maximum(0, out)
out /= np.dot(np.transpose(b), out)
self._alpha = out
self._fitted = True
def predict(self, X, sigma=None):
"""Equivalent of w(X) from the original paper."""
X = self._reshape_X(X)
if not self._fitted:
raise Exception("Not fitted!")
return np.dot(self._phi(X, sigma=sigma), self._alpha).reshape((X.shape[0],))

View File

@ -1,141 +0,0 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from sklearn.neighbors import KDTree, KNeighborsClassifier
class TrustScore:
"""
Trust Score: a measure of classifier uncertainty based on nearest neighbors.
"""
def __init__(self, k=10, alpha=0.0, filtering="none", min_dist=1e-12):
"""
k and alpha are the tuning parameters for the filtering,
filtering: method of filtering. option are "none", "density",
"uncertainty"
min_dist: some small number to mitigate possible division by 0.
"""
self.k = k
self.filtering = filtering
self.alpha = alpha
self.min_dist = min_dist
def filter_by_density(self, X: np.array):
"""Filter out points with low kNN density.
Args:
X: an array of sample points.
Returns:
A subset of the array without points in the bottom alpha-fraction of
original points of kNN density.
"""
kdtree = KDTree(X)
knn_radii = kdtree.query(X, k=self.k)[0][:, -1]
eps = np.percentile(knn_radii, (1 - self.alpha) * 100)
return X[np.where(knn_radii <= eps)[0], :]
def filter_by_uncertainty(self, X: np.array, y: np.array):
"""Filter out points with high label disagreement amongst its kNN neighbors.
Args:
X: an array of sample points.
Returns:
A subset of the array without points in the bottom alpha-fraction of
samples with highest disagreement amongst its k nearest neighbors.
"""
neigh = KNeighborsClassifier(n_neighbors=self.k)
neigh.fit(X, y)
confidence = neigh.predict_proba(X)
cutoff = np.percentile(confidence, self.alpha * 100)
unfiltered_idxs = np.where(confidence >= cutoff)[0]
return X[unfiltered_idxs, :], y[unfiltered_idxs]
def fit(self, X: np.array, y: np.array):
"""Initialize trust score precomputations with training data.
WARNING: assumes that the labels are 0-indexed (i.e.
0, 1,..., n_labels-1).
Args:
X: an array of sample points.
y: corresponding labels.
"""
self.n_labels = np.max(y) + 1
self.kdtrees = [None] * self.n_labels
if self.filtering == "uncertainty":
X_filtered, y_filtered = self.filter_by_uncertainty(X, y)
for label in range(self.n_labels):
if self.filtering == "none":
X_to_use = X[np.where(y == label)[0]]
self.kdtrees[label] = KDTree(X_to_use)
elif self.filtering == "density":
X_to_use = self.filter_by_density(X[np.where(y == label)[0]])
self.kdtrees[label] = KDTree(X_to_use)
elif self.filtering == "uncertainty":
X_to_use = X_filtered[np.where(y_filtered == label)[0]]
self.kdtrees[label] = KDTree(X_to_use)
if len(X_to_use) == 0:
print(
"Filtered too much or missing examples from a label! Please lower "
"alpha or check data."
)
def get_score(self, X: np.array, y_pred: np.array):
"""Compute the trust scores.
Given a set of points, determines the distance to each class.
Args:
X: an array of sample points.
y_pred: The predicted labels for these points.
Returns:
The trust score, which is ratio of distance to closest class that was not
the predicted class to the distance to the predicted class.
"""
d = np.tile(None, (X.shape[0], self.n_labels))
for label_idx in range(self.n_labels):
d[:, label_idx] = self.kdtrees[label_idx].query(X, k=2)[0][:, -1]
sorted_d = np.sort(d, axis=1)
d_to_pred = d[range(d.shape[0]), y_pred]
d_to_closest_not_pred = np.where(
sorted_d[:, 0] != d_to_pred, sorted_d[:, 0], sorted_d[:, 1]
)
return d_to_closest_not_pred / (d_to_pred + self.min_dist)
class KNNConfidence:
"""Baseline which uses disagreement to kNN classifier.
"""
def __init__(self, k=10):
self.k = k
def fit(self, X, y):
self.kdtree = KDTree(X)
self.y = y
def get_score(self, X, y_pred):
knn_idxs = self.kdtree.query(X, k=self.k)[1]
knn_outputs = self.y[knn_idxs]
return np.mean(
knn_outputs == np.transpose(np.tile(y_pred, (self.k, 1))), axis=1
)

View File

@ -1,286 +0,0 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import matplotlib.cm as cm
from sklearn.metrics import precision_recall_curve
import tensorflow as tf
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
def run_logistic(X_train, y_train, X_test, y_test, get_training=False):
model = LogisticRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.predict_proba(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.predict_proba(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_linear_svc(X_train, y_train, X_test, y_test, get_training=False):
model = LinearSVC()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.decision_function(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.decision_function(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_random_forest(X_train, y_train, X_test, y_test, get_training=False):
model = RandomForestClassifier()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.predict_proba(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.predict_proba(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_simple_NN(X,
y,
X_test,
y_test,
num_iter=10000,
hidden_units=100,
learning_rate=0.05,
batch_size=100,
display_steps=1000,
n_layers=1,
get_training=False):
"""Run a NN with a single layer on some data.
Returns the predicted values as well as the confidences.
"""
n_labels = np.max(y) + 1
n_features = X.shape[1]
x = tf.placeholder(tf.float32, [None, n_features])
y_ = tf.placeholder(tf.float32, [None, n_labels])
def simple_NN(input_placeholder, n_layers):
W_in = weight_variable([n_features, hidden_units])
b_in = bias_variable([hidden_units])
W_mid = [
weight_variable([hidden_units, hidden_units])
for i in range(n_layers - 1)
]
b_mid = [bias_variable([hidden_units]) for i in range(n_layers - 1)]
W_out = weight_variable([hidden_units, n_labels])
b_out = bias_variable([n_labels])
layers = [tf.nn.relu(tf.matmul(input_placeholder, W_in) + b_in)]
for i in range(n_layers - 1):
layer = tf.nn.relu(tf.matmul(layers[-1], W_mid[i]) + b_mid[i])
layers.append(layer)
logits = tf.matmul(layers[-1], W_out) + b_out
return logits
NN_logits = simple_NN(x, n_layers)
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=NN_logits))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(NN_logits, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
def one_hot(ns):
return np.eye(n_labels)[ns]
y_onehot = one_hot(y)
y_test_onehot = one_hot(y_test)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(num_iter):
ns = np.random.randint(0, len(X), size=batch_size)
if (i + 1) % display_steps == 0:
train_accuracy = accuracy.eval(feed_dict={x: X, y_: y_onehot})
test_accuracy = accuracy.eval(feed_dict={x: X_test, y_: y_test_onehot})
print("step %d, training accuracy %g, test accuracy %g" %
(i + 1, train_accuracy, test_accuracy))
train_step.run(feed_dict={x: X[ns, :], y_: y_onehot[ns, :]})
testing_logits = NN_logits.eval(feed_dict={x: X_test})
testing_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X_test})
NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X_test})
testing_confidence_raw = tf.reduce_max(NN_softmax,
1).eval(feed_dict={x: X_test})
if not get_training:
return testing_prediction, testing_confidence_raw
training_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X})
NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X})
training_confidence_raw = tf.reduce_max(NN_softmax,
1).eval(feed_dict={x: X})
return testing_prediction, testing_confidence_raw, training_prediction, training_confidence_raw
def plot_precision_curve(
extra_plot_title,
percentile_levels,
signal_names,
final_TPs,
final_stderrs,
final_misclassification,
model_name="Model",
colors=["blue", "darkorange", "brown", "red", "purple"],
legend_loc=None,
figure_size=None,
ylim=None):
if figure_size is not None:
plt.figure(figsize=figure_size)
title = "Precision Curve" if extra_plot_title == "" else extra_plot_title
plt.title(title, fontsize=20)
colors = colors + list(cm.rainbow(np.linspace(0, 1, len(final_TPs))))
plt.xlabel("Percentile level", fontsize=18)
plt.ylabel("Precision", fontsize=18)
for i, signal_name in enumerate(signal_names):
ls = "--" if ("Model" in signal_name) else "-"
plt.plot(
percentile_levels, final_TPs[i], ls, c=colors[i], label=signal_name)
plt.fill_between(
percentile_levels,
final_TPs[i] - final_stderrs[i],
final_TPs[i] + final_stderrs[i],
color=colors[i],
alpha=0.1)
if legend_loc is None:
if 0. in percentile_levels:
plt.legend(loc="lower right", fontsize=14)
else:
plt.legend(loc="upper left", fontsize=14)
else:
if legend_loc == "outside":
plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left", fontsize=14)
else:
plt.legend(loc=legend_loc, fontsize=14)
if ylim is not None:
plt.ylim(*ylim)
model_acc = 100 * (1 - final_misclassification)
plt.axvline(x=model_acc, linestyle="dotted", color="black")
plt.show()
def run_precision_recall_experiment_general(X,
y,
n_repeats,
percentile_levels,
trainer,
test_size=0.5,
extra_plot_title="",
signals=[],
signal_names=[],
predict_when_correct=False,
skip_print=False):
def get_stderr(L):
return np.std(L) / np.sqrt(len(L))
all_signal_names = ["Model Confidence"] + signal_names
all_TPs = [[[] for p in percentile_levels] for signal in all_signal_names]
misclassifications = []
sign = 1 if predict_when_correct else -1
sss = StratifiedShuffleSplit(
n_splits=n_repeats, test_size=test_size, random_state=0)
for train_idx, test_idx in sss.split(X, y):
X_train = X[train_idx, :]
y_train = y[train_idx]
X_test = X[test_idx, :]
y_test = y[test_idx]
testing_prediction, testing_confidence_raw = trainer(
X_train, y_train, X_test, y_test)
target_points = np.where(
testing_prediction == y_test)[0] if predict_when_correct else np.where(
testing_prediction != y_test)[0]
final_signals = [testing_confidence_raw]
for signal in signals:
signal.fit(X_train, y_train)
final_signals.append(signal.get_score(X_test, testing_prediction))
for p, percentile_level in enumerate(percentile_levels):
all_high_confidence_points = [
np.where(sign * signal >= np.percentile(sign *
signal, percentile_level))[0]
for signal in final_signals
]
if 0 in map(len, all_high_confidence_points):
continue
TP = [
len(np.intersect1d(high_confidence_points, target_points)) /
(1. * len(high_confidence_points))
for high_confidence_points in all_high_confidence_points
]
for i in range(len(all_signal_names)):
all_TPs[i][p].append(TP[i])
misclassifications.append(len(target_points) / (1. * len(X_test)))
final_TPs = [[] for signal in all_signal_names]
final_stderrs = [[] for signal in all_signal_names]
for p, percentile_level in enumerate(percentile_levels):
for i in range(len(all_signal_names)):
final_TPs[i].append(np.mean(all_TPs[i][p]))
final_stderrs[i].append(get_stderr(all_TPs[i][p]))
if not skip_print:
print("Precision at percentile", percentile_level)
ss = ""
for i, signal_name in enumerate(all_signal_names):
ss += (signal_name + (": %.4f " % final_TPs[i][p]))
print(ss)
print()
final_misclassification = np.mean(misclassifications)
if not skip_print:
print("Misclassification rate mean/std", np.mean(misclassifications),
get_stderr(misclassifications))
for i in range(len(all_signal_names)):
final_TPs[i] = np.array(final_TPs[i])
final_stderrs[i] = np.array(final_stderrs[i])
plot_precision_curve(extra_plot_title, percentile_levels, all_signal_names,
final_TPs, final_stderrs, final_misclassification)
return (all_signal_names, final_TPs, final_stderrs, final_misclassification)