1
0
Fork 0
QuaPy/eDiscovery/method.py

326 lines
11 KiB
Python

from typing import Union
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.cluster import KMeans, OPTICS
from sklearn.decomposition import TruncatedSVD
from sklearn.linear_model import LogisticRegression
from sklearn.mixture import GaussianMixture
from quapy.method.base import BaseQuantifier, BinaryQuantifier
from quapy.data import LabelledCollection
from quapy.method.aggregative import ACC, PACC, PCC
class RegionAdjustmentQ(BaseQuantifier):
def __init__(self, quantifier: BaseQuantifier, k=10):
self.quantifier = quantifier
self.k = k # number of regions
def fit(self, data: LabelledCollection):
X, y = data.Xy
Xp, Xn = X[y==1], X[y==0]
nk_per_class = (data.prevalence() * self.k).round().astype(int)
print(f'number of regions per class {nk_per_class}')
kmeans_neg = KMeans(n_clusters=nk_per_class[0])
rn = kmeans_neg.fit_predict(Xn) # regions negative
kmeans_pos = KMeans(n_clusters=nk_per_class[1])
rp = kmeans_pos.fit_predict(Xp) + nk_per_class[0] # regions positive
classes = np.arange(self.k)
pos = LabelledCollection(Xp, rp, classes_=classes)
neg = LabelledCollection(Xn, rn, classes_=classes)
region_data = pos + neg
self.quantifier.fit(region_data)
self.reg2class = {r: (0 if r < nk_per_class[0] else 1) for r in range(2 * self.k)}
return self
def quantify(self, instances):
region_prevalence = self.quantifier.quantify(instances)
bin_prevalence = np.zeros(shape=2, dtype=np.float)
for r, prev in enumerate(region_prevalence):
bin_prevalence[self.reg2class[r]] += prev
return bin_prevalence
def set_params(self, **parameters):
pass
def get_params(self, deep=True):
pass
@property
def classes_(self):
return np.asarray([0,1])
class RegionAdjustment(ACC):
def __init__(self, learner: BaseEstimator, val_split=0.4, k=2):
self.learner = learner
self.val_split = val_split
# lets say k is the number of regions (here: clusters of k-means) for each class
self.k = k
def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
X, y = data.Xy
Xp, Xn = X[y==1], X[y==0]
nk_per_class = (data.prevalence() * self.k).round().astype(int)
print(f'number of clusters per class {nk_per_class}')
kmeans_neg = KMeans(n_clusters=nk_per_class[0])
rn = kmeans_neg.fit_predict(Xn) # regions negative
kmeans_pos = KMeans(n_clusters=nk_per_class[1])
rp = kmeans_pos.fit_predict(Xp) + nk_per_class[0] # regions positive
classes = np.arange(self.k)
pos = LabelledCollection(Xp, rp, classes_=classes)
neg = LabelledCollection(Xn, rn, classes_=classes)
region_data = pos + neg
super(RegionProbAdjustment, self).fit(region_data, fit_learner, val_split)
self.reg2class = {r: (0 if r < nk_per_class[0] else 1) for r in range(2 * self.k)}
return self
def classify(self, data):
regions = super(RegionAdjustment, self).classify(data)
return regions
def aggregate(self, classif_predictions):
region_prevalence = super(RegionAdjustment, self).aggregate(classif_predictions)
bin_prevalence = np.zeros(shape=2, dtype=np.float)
for r, prev in enumerate(region_prevalence):
bin_prevalence[self.reg2class[r]] += prev
return bin_prevalence
class RegionProbAdjustment(PACC):
def __init__(self, learner: BaseEstimator, val_split=0.4, k=2):
self.learner = learner
self.val_split = val_split
# lets say k is the number of regions (here: clusters of k-means) for all classes
self.k = k
def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
X, y = data.Xy
Xp, Xn = X[y==1], X[y==0]
nk_per_class = (data.prevalence()*self.k).round().astype(int)
print(f'number of clusters per class {nk_per_class}')
kmeans_neg = KMeans(n_clusters=nk_per_class[0])
rn = kmeans_neg.fit_predict(Xn) # regions negative
kmeans_pos = KMeans(n_clusters=nk_per_class[1])
rp = kmeans_pos.fit_predict(Xp)+nk_per_class[0] # regions positive
classes = np.arange(self.k)
pos = LabelledCollection(Xp, rp, classes_=classes)
neg = LabelledCollection(Xn, rn, classes_=classes)
region_data = pos + neg
super(RegionProbAdjustment, self).fit(region_data, fit_learner, val_split)
self.reg2class = {r:(0 if r < nk_per_class[0] else 1) for r in range(2*self.k)}
return self
def classify(self, data):
regions = super(RegionProbAdjustment, self).classify(data)
return regions
def aggregate(self, classif_predictions):
region_prevalence = super(RegionProbAdjustment, self).aggregate(classif_predictions)
bin_prevalence = np.zeros(shape=2, dtype=np.float)
for r, prev in enumerate(region_prevalence):
bin_prevalence[self.reg2class[r]] += prev
return bin_prevalence
class RegionProbAdjustmentGlobal(BaseQuantifier):
def __init__(self, quantifier_fn: BaseQuantifier, k=5, clustering='gmm'):
self.quantifier_fn = quantifier_fn
self.k = k
self.clustering = clustering
def _find_regions(self, X):
if self.clustering == 'gmm':
self.svd = TruncatedSVD(n_components=500)
X = self.svd.fit_transform(X)
lowest_bic = np.infty
bic = []
for n_components in range(3, 8):
# Fit a Gaussian mixture with EM
gmm = GaussianMixture(n_components).fit(X)
bic.append(gmm.bic(X))
print(bic)
if bic[-1] < lowest_bic:
lowest_bic = bic[-1]
best_gmm = gmm
print(f'choosen GMM with {len(best_gmm.weights_)} components')
self.cluster = best_gmm
regions = self.cluster.predict(X)
elif self.clustering == 'kmeans':
print(f'kmeans with k={self.k}')
self.cluster = KMeans(n_clusters=self.k)
regions = self.cluster.fit_predict(X)
elif self.clustering == 'optics':
print('optics')
self.svd = TruncatedSVD(n_components=500)
X = self.svd.fit_transform(X)
self.cluster = OPTICS()
regions = self.cluster.fit_predict(X)
else:
raise NotImplementedError
return regions
def _get_regions(self, X):
if self.clustering == 'gmm':
return self.cluster.predict(self.svd.transform(X))
elif self.clustering == 'kmeans':
return self.cluster.predict(X)
elif self.clustering == 'optics':
return self.cluster.predict(self.svd.transform(X))
else:
raise NotImplementedError
def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
self.classes = data.classes_
# first k-means (all classes involved), then PACC local to each cluster
g = self._find_regions(data.instances)
# g = self._get_regions(data.instances)
X, y = data.Xy
self.g_quantifiers = {}
trivial, trivial_data = 0, 0
for gi in np.unique(g):
qi_data = LabelledCollection(X[g==gi], y[g==gi], classes_=data.classes_)
if qi_data.counts()[1] <= 1:
# check for <= 1 instead of prevalence==0, since PACC requires at least two
# examples for performing stratified split
# some class is (almost) empty
# if qi_data.prevalence()[0] == 1: # all negatives
self.g_quantifiers[gi] = TrivialRejectorQuantifier()
trivial+=1
trivial_data += len(qi_data)
elif qi_data.counts()[0] <= 1: # (almost) all positives
self.g_quantifiers[gi] = TrivialAcceptorQuantifier()
trivial += 1
trivial_data += len(qi_data)
else:
self.g_quantifiers[gi] = self.quantifier_fn().fit(qi_data)
print(f'trivials={trivial} amounting to {trivial_data*100.0/len(data):.2f}% of the data')
return self
@property
def classes_(self):
return self.classes
def quantify(self, instances):
# g = self.cluster.predict(instances)
g = self._get_regions(instances)
prevalence = np.zeros(len(self.classes_), dtype=np.float)
for gi in np.unique(g):
proportion_gi = (g==gi).mean()
prev_gi = self.g_quantifiers[gi].quantify(instances[g==gi])
prevalence += prev_gi * proportion_gi
return prevalence
def get_params(self, deep=True):
pass
def set_params(self, **parameters):
pass
class TrivialRejectorQuantifier(BinaryQuantifier):
def fit(self, data: LabelledCollection):
return self
def quantify(self, instances):
return np.asarray([1,0])
def set_params(self, **parameters):
pass
def get_params(self, deep=True):
pass
@property
def classes_(self):
return np.asarray([0,1])
class TrivialAcceptorQuantifier(BinaryQuantifier):
def fit(self, data: LabelledCollection):
return self
def quantify(self, instances):
return np.asarray([0,1])
def set_params(self, **parameters):
pass
def get_params(self, deep=True):
pass
@property
def classes_(self):
return np.asarray([0,1])
class ClassWeightPCC(BaseQuantifier):
def __init__(self, estimator=LogisticRegression):
self.estimator = estimator
self.learner = PACC(self.estimator())
self.deployed = False
def fit(self, data: LabelledCollection, fit_learner=True):
self.train = data
self.learner.fit(self.train)
return self
def quantify(self, instances):
guessed_prevalence = self.learner.quantify(instances)
class_weight = self._get_class_weight(guessed_prevalence)
base_estimator = clone(self.learner.learner)
base_estimator.set_params(class_weight=class_weight)
pcc = PCC(base_estimator)
return pcc.fit(self.train).quantify(instances)
def _get_class_weight(self, prevalence):
# class_weight = compute_class_weight('balanced', classes=[0, 1], y=mock_y(prevalence))
# return {0: class_weight[1], 1: class_weight[0]}
# weights = prevalence/prevalence.min()
weights = prevalence / self.train.prevalence()
normfactor = weights.min()
if normfactor <= 0:
normfactor = 1E-3
weights /= normfactor
return {0:weights[0], 1:weights[1]}
def set_params(self, **parameters):
# parameters = {p:v for p,v in parameters.items()}
# print(parameters)
self.learner.set_params(**parameters)
def get_params(self, deep=True):
return self.learner.get_params()
@property
def classes_(self):
return self.train.classes_