lab + some experimental methods based on distribution matching

This commit is contained in:
Alejandro Moreo Fernandez 2023-05-04 17:26:17 +02:00
parent 2df89c83e8
commit ca25f1d601
6 changed files with 564 additions and 2 deletions

View File

@ -0,0 +1,244 @@
from scipy.sparse import csc_matrix, csr_matrix
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfTransformer, TfidfVectorizer, CountVectorizer
import numpy as np
from joblib import Parallel, delayed
import sklearn
import math
from scipy.stats import t
class ContTable:
def __init__(self, tp=0, tn=0, fp=0, fn=0):
self.tp=tp
self.tn=tn
self.fp=fp
self.fn=fn
def get_d(self): return self.tp + self.tn + self.fp + self.fn
def get_c(self): return self.tp + self.fn
def get_not_c(self): return self.tn + self.fp
def get_f(self): return self.tp + self.fp
def get_not_f(self): return self.tn + self.fn
def p_c(self): return (1.0*self.get_c())/self.get_d()
def p_not_c(self): return 1.0-self.p_c()
def p_f(self): return (1.0*self.get_f())/self.get_d()
def p_not_f(self): return 1.0-self.p_f()
def p_tp(self): return (1.0*self.tp) / self.get_d()
def p_tn(self): return (1.0*self.tn) / self.get_d()
def p_fp(self): return (1.0*self.fp) / self.get_d()
def p_fn(self): return (1.0*self.fn) / self.get_d()
def tpr(self):
c = 1.0*self.get_c()
return self.tp / c if c > 0.0 else 0.0
def fpr(self):
_c = 1.0*self.get_not_c()
return self.fp / _c if _c > 0.0 else 0.0
def __ig_factor(p_tc, p_t, p_c):
den = p_t * p_c
if den != 0.0 and p_tc != 0:
return p_tc * math.log(p_tc / den, 2)
else:
return 0.0
def information_gain(cell):
return __ig_factor(cell.p_tp(), cell.p_f(), cell.p_c()) + \
__ig_factor(cell.p_fp(), cell.p_f(), cell.p_not_c()) +\
__ig_factor(cell.p_fn(), cell.p_not_f(), cell.p_c()) + \
__ig_factor(cell.p_tn(), cell.p_not_f(), cell.p_not_c())
def squared_information_gain(cell):
return information_gain(cell)**2
def posneg_information_gain(cell):
ig = information_gain(cell)
if cell.tpr() < cell.fpr():
return -ig
else:
return ig
def pos_information_gain(cell):
if cell.tpr() < cell.fpr():
return 0
else:
return information_gain(cell)
def pointwise_mutual_information(cell):
return __ig_factor(cell.p_tp(), cell.p_f(), cell.p_c())
def gss(cell):
return cell.p_tp()*cell.p_tn() - cell.p_fp()*cell.p_fn()
def chi_square(cell):
den = cell.p_f() * cell.p_not_f() * cell.p_c() * cell.p_not_c()
if den==0.0: return 0.0
num = gss(cell)**2
return num / den
def conf_interval(xt, n):
if n>30:
z2 = 3.84145882069 # norm.ppf(0.5+0.95/2.0)**2
else:
z2 = t.ppf(0.5 + 0.95 / 2.0, df=max(n-1,1)) ** 2
p = (xt + 0.5 * z2) / (n + z2)
amplitude = 0.5 * z2 * math.sqrt((p * (1.0 - p)) / (n + z2))
return p, amplitude
def strength(minPosRelFreq, minPos, maxNeg):
if minPos > maxNeg:
return math.log(2.0 * minPosRelFreq, 2.0)
else:
return 0.0
#set cancel_features=True to allow some features to be weighted as 0 (as in the original article)
#however, for some extremely imbalanced dataset caused all documents to be 0
def conf_weight(cell, cancel_features=False):
c = cell.get_c()
not_c = cell.get_not_c()
tp = cell.tp
fp = cell.fp
pos_p, pos_amp = conf_interval(tp, c)
neg_p, neg_amp = conf_interval(fp, not_c)
min_pos = pos_p-pos_amp
max_neg = neg_p+neg_amp
den = (min_pos + max_neg)
minpos_relfreq = min_pos / (den if den != 0 else 1)
str_tplus = strength(minpos_relfreq, min_pos, max_neg);
if str_tplus == 0 and not cancel_features:
return 1e-20
return str_tplus;
def get_tsr_matrix(cell_matrix, tsr_score_funtion):
nC = len(cell_matrix)
nF = len(cell_matrix[0])
tsr_matrix = [[tsr_score_funtion(cell_matrix[c,f]) for f in range(nF)] for c in range(nC)]
return np.array(tsr_matrix)
def feature_label_contingency_table(positive_document_indexes, feature_document_indexes, nD):
tp_ = len(positive_document_indexes & feature_document_indexes)
fp_ = len(feature_document_indexes - positive_document_indexes)
fn_ = len(positive_document_indexes - feature_document_indexes)
tn_ = nD - (tp_ + fp_ + fn_)
return ContTable(tp=tp_, tn=tn_, fp=fp_, fn=fn_)
def category_tables(feature_sets, category_sets, c, nD, nF):
return [feature_label_contingency_table(category_sets[c], feature_sets[f], nD) for f in range(nF)]
def get_supervised_matrix(coocurrence_matrix, label_matrix, n_jobs=-1):
"""
Computes the nC x nF supervised matrix M where Mcf is the 4-cell contingency table for feature f and class c.
Efficiency O(nF x nC x log(S)) where S is the sparse factor
"""
nD, nF = coocurrence_matrix.shape
nD2, nC = label_matrix.shape
if nD != nD2:
raise ValueError('Number of rows in coocurrence matrix shape %s and label matrix shape %s is not consistent' %
(coocurrence_matrix.shape,label_matrix.shape))
def nonzero_set(matrix, col):
return set(matrix[:, col].nonzero()[0])
if isinstance(coocurrence_matrix, csr_matrix):
coocurrence_matrix = csc_matrix(coocurrence_matrix)
feature_sets = [nonzero_set(coocurrence_matrix, f) for f in range(nF)]
category_sets = [nonzero_set(label_matrix, c) for c in range(nC)]
cell_matrix = Parallel(n_jobs=n_jobs, backend="threading")(delayed(category_tables)(feature_sets, category_sets, c, nD, nF) for c in range(nC))
return np.array(cell_matrix)
class TSRweighting(BaseEstimator,TransformerMixin):
"""
Supervised Term Weighting function based on any Term Selection Reduction (TSR) function (e.g., information gain,
chi-square, etc.) or, more generally, on any function that could be computed on the 4-cell contingency table for
each category-feature pair.
The supervised_4cell_matrix (a CxF matrix containing the 4-cell contingency tables
for each category-feature pair) can be pre-computed (e.g., during the feature selection phase) and passed as an
argument.
When C>1, i.e., in multiclass scenarios, a global_policy is used in order to determine a single feature-score which
informs about its relevance. Accepted policies include "max" (takes the max score across categories), "ave" and "wave"
(take the average, or weighted average, across all categories -- weights correspond to the class prevalence), and "sum"
(which sums all category scores).
"""
def __init__(self, tsr_function, global_policy='max', supervised_4cell_matrix=None, sublinear_tf=True, norm='l2', min_df=3, n_jobs=-1):
if global_policy not in ['max', 'ave', 'wave', 'sum']: raise ValueError('Global policy should be in {"max", "ave", "wave", "sum"}')
self.tsr_function = tsr_function
self.global_policy = global_policy
self.supervised_4cell_matrix = supervised_4cell_matrix
self.sublinear_tf=sublinear_tf
self.norm=norm
self.min_df = min_df
self.n_jobs=n_jobs
def fit(self, X, y):
self.count_vectorizer = CountVectorizer(min_df=self.min_df)
X = self.count_vectorizer.fit_transform(X)
self.tf_vectorizer = TfidfTransformer(
norm=None, use_idf=False, smooth_idf=False, sublinear_tf=self.sublinear_tf).fit(X)
if len(y.shape) == 1:
y = np.expand_dims(y, axis=1)
nD, nC = y.shape
nF = len(self.tf_vectorizer.get_feature_names_out())
if self.supervised_4cell_matrix is None:
self.supervised_4cell_matrix = get_supervised_matrix(X, y, n_jobs=self.n_jobs)
else:
if self.supervised_4cell_matrix.shape != (nC, nF): raise ValueError("Shape of supervised information matrix is inconsistent with X and y")
tsr_matrix = get_tsr_matrix(self.supervised_4cell_matrix, self.tsr_function)
if self.global_policy == 'ave':
self.global_tsr_vector = np.average(tsr_matrix, axis=0)
elif self.global_policy == 'wave':
category_prevalences = [sum(y[:,c])*1.0/nD for c in range(nC)]
self.global_tsr_vector = np.average(tsr_matrix, axis=0, weights=category_prevalences)
elif self.global_policy == 'sum':
self.global_tsr_vector = np.sum(tsr_matrix, axis=0)
elif self.global_policy == 'max':
self.global_tsr_vector = np.amax(tsr_matrix, axis=0)
return self
def fit_transform(self, X, y):
return self.fit(X,y).transform(X)
def transform(self, X):
if not hasattr(self, 'global_tsr_vector'): raise NameError('TSRweighting: transform method called before fit.')
X = self.count_vectorizer.transform(X)
tf_X = self.tf_vectorizer.transform(X).toarray()
weighted_X = np.multiply(tf_X, self.global_tsr_vector)
if self.norm is not None and self.norm!='none':
weighted_X = sklearn.preprocessing.normalize(weighted_X, norm=self.norm, axis=1, copy=False)
return csr_matrix(weighted_X)

View File

0
laboratory/main.py Normal file
View File

148
laboratory/method_dxs.py Normal file
View File

@ -0,0 +1,148 @@
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.linear_model import LogisticRegression
import quapy as qp
from data import LabelledCollection
import numpy as np
from laboratory.custom_vectorizers import *
from protocol import APP
from quapy.method.aggregative import _get_divergence, HDy, DistributionMatching
from quapy.method.base import BaseQuantifier
from scipy import optimize
import pandas as pd
# TODO: explore the bernoulli (term presence/absence) variant
# TODO: explore the multinomial (term frequency) variant
# TODO: explore the multinomial + length normalization variant
# TODO: consolidate the TSR-variant (e.g., using information gain) variant;
# - works better with the idf?
# - works better with length normalization?
# - etc
class DxS(BaseQuantifier):
def __init__(self, vectorizer=None, divergence='topsoe'):
self.vectorizer = vectorizer
self.divergence = divergence
# def __as_distribution(self, instances):
# return np.asarray(instances.sum(axis=0) / instances.sum()).flatten()
def __as_distribution(self, instances):
dist = instances.sum(axis=0) / instances.sum()
return np.asarray(dist).flatten()
def fit(self, data: LabelledCollection):
text_instances, labels = data.Xy
if self.vectorizer is not None:
text_instances = self.vectorizer.fit_transform(text_instances, y=labels)
distributions = []
for class_i in data.classes_:
distributions.append(self.__as_distribution(text_instances[labels == class_i]))
self.validation_distribution = np.asarray(distributions)
return self
def quantify(self, text_instances):
if self.vectorizer is not None:
text_instances = self.vectorizer.transform(text_instances)
test_distribution = self.__as_distribution(text_instances)
divergence = _get_divergence(self.divergence)
n_classes, n_feats = self.validation_distribution.shape
def match(prev):
prev = np.expand_dims(prev, axis=0)
mixture_distribution = (prev @ self.validation_distribution).flatten()
return divergence(test_distribution, mixture_distribution)
# the initial point is set as the uniform distribution
uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
# solutions are bounded to those contained in the unit-simplex
bounds = tuple((0, 1) for x in range(n_classes)) # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1
r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
return r.x
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 250
qp.environ['N_JOBS'] = -1
min_df = 10
# dataset = 'imdb'
repeats = 10
error = 'mae'
div = 'HD'
# generates tuples (dataset, method, method_name)
# (the dataset is needed for methods that process the dataset differently)
def gen_methods():
for dataset in qp.datasets.REVIEWS_SENTIMENT_DATASETS:
data = qp.datasets.fetch_reviews(dataset, tfidf=False)
bernoulli_vectorizer = CountVectorizer(min_df=min_df, binary=True)
dxs = DxS(divergence=div, vectorizer=bernoulli_vectorizer)
yield data, dxs, 'DxS-Bernoulli'
multinomial_vectorizer = CountVectorizer(min_df=min_df, binary=False)
dxs = DxS(divergence=div, vectorizer=multinomial_vectorizer)
yield data, dxs, 'DxS-multinomial'
tf_vectorizer = TfidfVectorizer(sublinear_tf=False, use_idf=False, min_df=min_df, norm=None)
dxs = DxS(divergence=div, vectorizer=tf_vectorizer)
yield data, dxs, 'DxS-TF'
logtf_vectorizer = TfidfVectorizer(sublinear_tf=True, use_idf=False, min_df=min_df, norm=None)
dxs = DxS(divergence=div, vectorizer=logtf_vectorizer)
yield data, dxs, 'DxS-logTF'
tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm=None)
dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
yield data, dxs, 'DxS-TFIDF'
tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm='l2')
dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
yield data, dxs, 'DxS-TFIDF-l2'
tsr_vectorizer = TSRweighting(tsr_function=information_gain, min_df=min_df, norm='l2')
dxs = DxS(divergence=div, vectorizer=tsr_vectorizer)
yield data, dxs, 'DxS-TFTSR-l2'
data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=min_df)
hdy = HDy(LogisticRegression())
yield data, hdy, 'HDy'
dm = DistributionMatching(LogisticRegression(), divergence=div, nbins=5)
yield data, dm, 'DM-5b'
dm = DistributionMatching(LogisticRegression(), divergence=div, nbins=10)
yield data, dm, 'DM-10b'
result_path = 'results.csv'
with open(result_path, 'wt') as csv:
csv.write(f'Method\tDataset\tMAE\tMRAE\n')
for data, quantifier, quant_name in gen_methods():
quantifier.fit(data.training)
report = qp.evaluation.evaluation_report(quantifier, APP(data.test, repeats=repeats), error_metrics=['mae','mrae'], verbose=True)
means = report.mean()
csv.write(f'{quant_name}\t{data.name}\t{means["mae"]:.5f}\t{means["mrae"]:.5f}\n')
df = pd.read_csv(result_path, sep='\t')
# print(df)
pv = df.pivot_table(index='Method', columns="Dataset", values=["MAE", "MRAE"])
print(pv)

168
laboratory/method_kdey.py Normal file
View File

@ -0,0 +1,168 @@
from typing import Union, Callable
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
import pandas as pd
from sklearn.neighbors import KernelDensity
import quapy as qp
from data import LabelledCollection
from protocol import APP, UPP
from quapy.method.aggregative import AggregativeProbabilisticQuantifier, _training_helper, cross_generate_predictions, \
DistributionMatching, _get_divergence
import scipy
from scipy import optimize
class KDEy(AggregativeProbabilisticQuantifier):
BANDWIDTH_METHOD = ['auto', 'scott', 'silverman']
ENGINE = ['scipy', 'sklearn']
def __init__(self, classifier: BaseEstimator, val_split=0.4, divergence: Union[str, Callable]='HD',
bandwidth_method='scott', engine='sklearn', n_jobs=None):
self.classifier = classifier
self.val_split = val_split
self.divergence = divergence
self.bandwidth_method = bandwidth_method
self.engine = engine
self.n_jobs = n_jobs
assert bandwidth_method in KDEy.BANDWIDTH_METHOD, f'unknown bandwidth_method, valid ones are {KDEy.BANDWIDTH_METHOD}'
assert engine in KDEy.ENGINE, f'unknown engine, valid ones are {KDEy.ENGINE}'
def get_kde(self, posteriors):
if self.engine == 'scipy':
# scipy treats columns as datapoints, and need the datapoints not to lie in a lower-dimensional subspace, which
# requires removing the last dimension which is constrained
posteriors = posteriors[:,:-1].T
kde = scipy.stats.gaussian_kde(posteriors)
kde.set_bandwidth(self.bandwidth_method)
elif self.engine == 'sklearn':
kde = KernelDensity(bandwidth=self.bandwidth_method).fit(posteriors)
return kde
def pdf(self, kde, posteriors):
if self.engine == 'scipy':
return kde(posteriors[:,:-1].T)
elif self.engine == 'sklearn':
return np.exp(kde.score_samples(posteriors))
def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
"""
Trains the classifier (if requested) and generates the validation distributions out of the training data.
The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of
channels (a channel is a description, in form of a histogram, of a specific class -- there are as many channels
as classes, although in the binary case one can use only one channel, since the other one is constrained),
and `nbins` the number of bins. In particular, let `V` be the validation distributions; `di=V[i]`
are the distributions obtained from training data labelled with class `i`; `dij = di[j]` is the discrete
distribution of posterior probabilities `P(Y=j|X=x)` for training data labelled with class `i`, and `dij[k]`
is the fraction of instances with a value in the `k`-th bin.
:param data: the training set
:param fit_classifier: set to False to bypass the training (the learner is assumed to be already fit)
:param val_split: either a float in (0,1) indicating the proportion of training instances to use for
validation (e.g., 0.3 for using 30% of the training set as validation data), or a LabelledCollection
indicating the validation set itself, or an int indicating the number k of folds to be used in kFCV
to estimate the parameters
"""
if val_split is None:
val_split = self.val_split
self.classifier, y, posteriors, classes, class_count = cross_generate_predictions(
data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs
)
self.val_densities = [self.get_kde(posteriors[y == cat]) for cat in range(data.n_classes)]
self.val_posteriors = posteriors
return self
def val_pdf(self, prev):
"""
Returns a function that computes the mixture model with the given prev as mixture factor
:param prev: a prevalence vector, ndarray
:return: a function implementing the validation distribution with fixed mixture factor
"""
return lambda posteriors: sum(prev_i * self.pdf(kde_i, posteriors) for kde_i, prev_i in zip(self.val_densities, prev))
def aggregate(self, posteriors: np.ndarray):
"""
Searches for the mixture model parameter (the sought prevalence values) that yields a validation distribution
(the mixture) that best matches the test distribution, in terms of the divergence measure of choice.
In the multiclass case, with `n` the number of classes, the test and mixture distributions contain
`n` channels (proper distributions of binned posterior probabilities), on which the divergence is computed
independently. The matching is computed as an average of the divergence across all channels.
:param instances: instances in the sample
:return: a vector of class prevalence estimates
"""
test_density = self.get_kde(posteriors)
# val_test_posteriors = np.concatenate([self.val_posteriors, posteriors])
test_likelihood = self.pdf(test_density, posteriors)
divergence = _get_divergence(self.divergence)
n_classes = len(self.val_densities)
def match(prev):
val_pdf = self.val_pdf(prev)
val_likelihood = val_pdf(posteriors)
return divergence(val_likelihood, test_likelihood)
# the initial point is set as the uniform distribution
uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
# solutions are bounded to those contained in the unit-simplex
bounds = tuple((0, 1) for _ in range(n_classes)) # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1
r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
return r.x
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 100
qp.environ['N_JOBS'] = -1
div = 'HD'
# generates tuples (dataset, method, method_name)
# (the dataset is needed for methods that process the dataset differently)
def gen_methods():
for dataset in qp.datasets.TWITTER_SENTIMENT_DATASETS_TEST:
data = qp.datasets.fetch_twitter(dataset, min_df=3, pickle=True)
# kdey = KDEy(LogisticRegression(), divergence=div, bandwidth_method='scott')
# yield data, kdey, f'KDEy-{div}-scott'
kdey = KDEy(LogisticRegression(), divergence=div, bandwidth_method='silverman', engine='sklearn')
yield data, kdey, f'KDEy-{div}-silverman'
dm = DistributionMatching(LogisticRegression(), divergence=div, nbins=5)
yield data, dm, f'DM-5b-{div}'
# dm = DistributionMatching(LogisticRegression(), divergence=div, nbins=10)
# yield data, dm, f'DM-10b-{div}'
result_path = 'results_kdey.csv'
with open(result_path, 'wt') as csv:
csv.write(f'Method\tDataset\tMAE\tMRAE\n')
for data, quantifier, quant_name in gen_methods():
quantifier.fit(data.training)
protocol = UPP(data.test, repeats=100)
report = qp.evaluation.evaluation_report(quantifier, protocol, error_metrics=['mae','mrae'], verbose=True)
means = report.mean()
csv.write(f'{quant_name}\t{data.name}\t{means["mae"]:.5f}\t{means["mrae"]:.5f}\n')
csv.flush()
df = pd.read_csv(result_path, sep='\t')
# print(df)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pv = df.pivot_table(index='Dataset', columns="Method", values=["MAE", "MRAE"])
print(pv)

View File

@ -770,7 +770,9 @@ class DistributionMatching(AggregativeProbabilisticQuantifier):
""" """
Trains the classifier (if requested) and generates the validation distributions out of the training data. Trains the classifier (if requested) and generates the validation distributions out of the training data.
The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of
channels, and `nbins` the number of bins. In particular, let `V` be the validation distributions; `di=V[i]` channels (a channel is a description, in form of a histogram, of a specific class -- there are as many channels
as classes, although in the binary case one can use only one channel, since the other one is constrained),
and `nbins` the number of bins. In particular, let `V` be the validation distributions; `di=V[i]`
are the distributions obtained from training data labelled with class `i`; `dij = di[j]` is the discrete are the distributions obtained from training data labelled with class `i`; `dij = di[j]` is the discrete
distribution of posterior probabilities `P(Y=j|X=x)` for training data labelled with class `i`, and `dij[k]` distribution of posterior probabilities `P(Y=j|X=x)` for training data labelled with class `i`, and `dij[k]`
is the fraction of instances with a value in the `k`-th bin. is the fraction of instances with a value in the `k`-th bin.
@ -819,7 +821,7 @@ class DistributionMatching(AggregativeProbabilisticQuantifier):
uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,)) uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
# solutions are bounded to those contained in the unit-simplex # solutions are bounded to those contained in the unit-simplex
bounds = tuple((0, 1) for x in range(n_classes)) # values in [0,1] bounds = tuple((0, 1) for _ in range(n_classes)) # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1 constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1
r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints) r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
return r.x return r.x