# QuaPy/BayesianKDEy/kdey_bandwidth_selection.py


import numpy as np
from scipy.special import logsumexp
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_predict
from sklearn.neighbors import KernelDensity


def class_scale_factors(X, y):
    """Per-class bandwidth scale factors: the trace of each class covariance
    matrix, normalized so that the factors average to 1."""
    lambdas = {}
    scales = []
    for c in np.unique(y):
        Xc = X[y == c]
        cov = np.cov(Xc.T)
        scale = np.trace(cov)  # total variance of class c
        lambdas[c] = scale
        scales.append(scale)
    mean_scale = np.mean(scales)
    for c in lambdas:
        lambdas[c] /= mean_scale
    return lambdas
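
# Worked example (hypothetical numbers, just to illustrate the output): for two
# classes whose covariance traces are 2.0 and 4.0, the factors come out as
# {0: 2/3, 1: 4/3}, so the more spread-out class receives a proportionally
# larger bandwidth when these lambdas are passed to ClassConditionalKDE below.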


class ClassConditionalKDE:
    """One KernelDensity model per class, all sharing a base bandwidth that is
    rescaled per class by the (optional) lambdas factors."""

    def __init__(self, kernel="gaussian", lambdas=None):
        self.kernel = kernel
        self.lambdas = lambdas or {}
        self.models = {}

    def fit(self, X, y, bandwidth):
        self.classes_ = np.unique(y)
        for c in self.classes_:
            # per-class bandwidth: base bandwidth times the class scale factor
            h_c = bandwidth * self.lambdas.get(c, 1.0)
            kde = KernelDensity(kernel=self.kernel, bandwidth=h_c)
            kde.fit(X[y == c])
            self.models[c] = kde
        return self

    def log_density(self, X):
        # (n_samples, n_classes) matrix of log p(x|y=c), columns ordered as classes_
        logp = np.column_stack([
            self.models[c].score_samples(X)
            for c in self.classes_
        ])
        return logp
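
# Minimal usage sketch (assuming X is a feature matrix and y integer labels):
#   kde = ClassConditionalKDE(lambdas=class_scale_factors(X, y)).fit(X, y, 0.1)
#   logp = kde.log_density(X)  # shape (n_samples, n_classes)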


def conditional_log_likelihood(logp, y, priors=None):
    """Sum over the sample of log p(y|x), the conditional log-likelihood of the
    true labels given the class-conditional log-densities in logp.

    Assumes labels in y are integers 0..n_classes-1, aligned with the columns
    of logp; uniform class priors are used unless priors is given."""
    if priors is None:
        priors = np.ones(logp.shape[1]) / logp.shape[1]
    log_prior = np.log(priors)
    log_joint = logp + log_prior  # log p(x|y) + log p(y)
    denom = logsumexp(log_joint, axis=1)  # log p(x)
    num = log_joint[np.arange(len(y)), y]  # log-joint at the true label
    return np.sum(num - denom)
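
# Per test point, the score being summed is Bayes' rule in log space:
#   log p(y|x) = log p(x|y) + log p(y) - log sum_c p(x|c) p(c)
# which is exactly num - denom above; logsumexp computes the normalizer stably.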


def cv_objective(
        bandwidth,
        X,
        y,
        lambdas,
        kernel="gaussian",
        n_splits=5,
):
    """Stratified k-fold cross-validated conditional log-likelihood attained by
    a given base bandwidth (higher is better)."""
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=0)
    score = 0.0
    for train, test in skf.split(X, y):
        model = ClassConditionalKDE(kernel=kernel, lambdas=lambdas)
        model.fit(X[train], y[train], bandwidth)
        logp = model.log_density(X[test])
        score += conditional_log_likelihood(logp, y[test])
    return score
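
# Bandwidth selection then reduces to a 1-D grid scan of this objective, as
# done in __main__ below: evaluate cv_objective over a logspace of candidate
# bandwidths and keep the argmax.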


if __name__ == '__main__':
    from BayesianKDEy.datasets import UCIMulticlassHandler
    from quapy.method.aggregative import KDEyML
    from quapy.model_selection import GridSearchQ
    from quapy.evaluation import evaluation_report

    dataset = UCIMulticlassHandler('academic-success')
    training = dataset.get_training()
    X, y = training.Xy

    # KDEy models the classifier's posterior probabilities, so the bandwidth is
    # selected on cross-validated posteriors P rather than on the raw features
    cls = LogisticRegression()
    P = cross_val_predict(cls, X, y, cv=5, n_jobs=-1, method='predict_proba')

    bandwidths = np.logspace(-3, 0, 50)
    lambdas = None
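    # note: per-class scale factors could be plugged in here instead, e.g.
    # lambdas = class_scale_factors(P, y), giving each class its own bandwidth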
    scores = [
        cv_objective(h, P, y, lambdas)
        for h in bandwidths
    ]
    best_h = bandwidths[np.argmax(scores)]
    print(best_h)

    # alternative: quantification-oriented model selection with GridSearchQ
    cls = LogisticRegression()
    kdey = KDEyML(cls, val_split=5, random_state=0)
    train, val_prot = dataset.get_train_valprot_for_modsel()
    modsel = GridSearchQ(kdey, param_grid={'bandwidth': bandwidths}, protocol=val_prot, n_jobs=-1, verbose=True)
    modsel.fit(*train.Xy)
    best_bandwidth = modsel.best_params_['bandwidth']
    print(best_bandwidth)

    print(f'First experiment, with bandwidth={best_h:.4f}')
    cls = LogisticRegression()
    kdey = KDEyML(cls, val_split=5, random_state=0, bandwidth=best_h)
    train, test_prot = dataset.get_train_testprot_for_eval()
    kdey.fit(*train.Xy)
    report = evaluation_report(kdey, test_prot, error_metrics=['ae'])
    print(report.mean(numeric_only=True))

    print(f'Second experiment, with bandwidth={best_bandwidth:.4f}')
    cls = LogisticRegression()
    kdey = KDEyML(cls, val_split=5, random_state=0, bandwidth=best_bandwidth)
    # same train/test split as in the first experiment
    kdey.fit(*train.Xy)
    report = evaluation_report(kdey, test_prot, error_metrics=['ae'])
    print(report.mean(numeric_only=True))