import numpy as np
from scipy.special import logsumexp
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_predict
from sklearn.neighbors import KernelDensity


def class_scale_factors(X, y):
    """Compute a multiplicative bandwidth factor per class.

    The per-class scale is the total variance (trace of the covariance
    matrix) of the class examples, normalized so that the factors
    average to 1 across classes.
    """
    lambdas = {}
    scales = []
    for c in np.unique(y):
        Xc = X[y == c]
        cov = np.cov(Xc.T)
        scale = np.trace(cov)
        lambdas[c] = scale
        scales.append(scale)
    mean_scale = np.mean(scales)
    for c in lambdas:
        lambdas[c] /= mean_scale
    return lambdas


class ClassConditionalKDE:
    """One KernelDensity model per class, sharing a base bandwidth that
    is optionally rescaled per class via the `lambdas` factors."""

    def __init__(self, kernel="gaussian", lambdas=None):
        self.kernel = kernel
        self.lambdas = lambdas or {}
        self.models = {}

    def fit(self, X, y, bandwidth):
        self.classes_ = np.unique(y)
        self.models = {}  # reset any models from a previous fit
        for c in self.classes_:
            h_c = bandwidth * self.lambdas.get(c, 1.0)
            kde = KernelDensity(kernel=self.kernel, bandwidth=h_c)
            kde.fit(X[y == c])
            self.models[c] = kde
        return self

    def log_density(self, X):
        # column j holds log p(x | class self.classes_[j])
        return np.column_stack([
            self.models[c].score_samples(X) for c in self.classes_
        ])


def conditional_log_likelihood(logp, y, priors=None):
    """Sum of log P(y_i | x_i) under the class-conditional densities.

    `logp` has one column per class, and `y` must contain *column
    indices* into `logp`, not raw labels. Uniform class priors are
    assumed by default.
    """
    if priors is None:
        priors = np.ones(logp.shape[1]) / logp.shape[1]
    log_prior = np.log(priors)
    log_joint = logp + log_prior
    denom = logsumexp(log_joint, axis=1)        # log p(x_i)
    num = log_joint[np.arange(len(y)), y]       # log p(x_i, y_i)
    return np.sum(num - denom)


def cv_objective(bandwidth, X, y, lambdas, kernel="gaussian", n_splits=5):
    """Held-out conditional log-likelihood of a candidate bandwidth,
    summed over stratified cross-validation folds (higher is better)."""
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=0)
    score = 0.0
    for train, test in skf.split(X, y):
        model = ClassConditionalKDE(kernel=kernel, lambdas=lambdas)
        model.fit(X[train], y[train], bandwidth)
        logp = model.log_density(X[test])
        # map labels to column indices of logp (classes_ is sorted, so
        # searchsorted recovers each label's column)
        y_idx = np.searchsorted(model.classes_, y[test])
        score += conditional_log_likelihood(logp, y_idx)
    return score


if __name__ == '__main__':
    from BayesianKDEy.datasets import UCIMulticlassHandler
    from quapy.method.aggregative import KDEyML
    from quapy.model_selection import GridSearchQ
    from quapy.evaluation import evaluation_report

    dataset = UCIMulticlassHandler('academic-success')
    training = dataset.get_training()
    X, y = training.Xy

    # First selection strategy: pick the bandwidth that maximizes the
    # cross-validated conditional log-likelihood of the classifier's
    # posterior probabilities (the representation KDEy operates on).
    cls = LogisticRegression()
    P = cross_val_predict(cls, X, y, cv=5, n_jobs=-1, method='predict_proba')
    bandwidths = np.logspace(-3, 0, 50)
    lambdas = None  # uniform bandwidth across classes; see sketch at end of file
    scores = [cv_objective(h, P, y, lambdas) for h in bandwidths]
    best_h = bandwidths[np.argmax(scores)]
    print(best_h)

    # Second selection strategy: quapy's GridSearchQ, which selects the
    # bandwidth by quantification error on a validation protocol.
    cls = LogisticRegression()
    kdey = KDEyML(cls, val_split=5, random_state=0)
    train, val_prot = dataset.get_train_valprot_for_modsel()
    modsel = GridSearchQ(kdey, param_grid={'bandwidth': bandwidths},
                         protocol=val_prot, n_jobs=-1, verbose=True)
    modsel.fit(*train.Xy)
    best_bandwidth = modsel.best_params_['bandwidth']
    print(best_bandwidth)

    # Evaluate both selected bandwidths on the test protocol.
    print(f'First experiment, with bandwidth={best_h:.4f}')
    cls = LogisticRegression()
    kdey = KDEyML(cls, val_split=5, random_state=0, bandwidth=best_h)
    train, test_prot = dataset.get_train_testprot_for_eval()
    kdey.fit(*train.Xy)
    report = evaluation_report(kdey, test_prot, error_metrics=['ae'])
    print(report.mean(numeric_only=True))

    print(f'Second experiment, with bandwidth={best_bandwidth:.4f}')
    cls = LogisticRegression()
    kdey = KDEyML(cls, val_split=5, random_state=0, bandwidth=best_bandwidth)
    # reuses train and test_prot from the first experiment
    kdey.fit(*train.Xy)
    report = evaluation_report(kdey, test_prot, error_metrics=['ae'])
    print(report.mean(numeric_only=True))
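
# --- Hedged sketch: wiring in per-class bandwidth scaling ---
# class_scale_factors() is defined above but never used in __main__
# (lambdas is left as None, i.e., a uniform bandwidth). The helper below
# is an assumption about its intended use: the normalized per-class
# scales play the multiplicative role of `lambdas` in
# ClassConditionalKDE.fit. It is a sketch, not part of the original
# experiments. Note that computing lambdas on the full (X, y) before
# cross-validation introduces a mild information leak; computing them
# inside each fold would be stricter.

def cv_objective_with_class_scales(bandwidth, X, y, kernel="gaussian",
                                   n_splits=5):
    """Like cv_objective, but with per-class bandwidth factors derived
    from the data via class_scale_factors (assumed intended usage)."""
    lambdas = class_scale_factors(X, y)
    return cv_objective(bandwidth, X, y, lambdas,
                        kernel=kernel, n_splits=n_splits)

# Usage (sketch, mirroring the first selection strategy in __main__):
#     scores = [cv_objective_with_class_scales(h, P, y) for h in bandwidths]
#     best_h = bandwidths[np.argmax(scores)]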