import inspect
|
|
from abc import ABC, abstractmethod
|
|
from functools import lru_cache
|
|
|
|
import numpy as np
|
|
from pathlib import Path
|
|
import quapy as qp
|
|
|
|
from commons import MockClassifierFromPosteriors
|
|
from quapy.protocol import UPP
|
|
from quapy.data import LabelledCollection
|
|
from quapy.method.aggregative import EMQ
|
|
|
|
|
|
|
|
def fetchVisual(modality, dataset, net, data_home='./data'):
    """Load precomputed network outputs for a visual dataset.

    Reads the train/val/test ``.npz`` bundles stored under
    ``data_home/{dataset}_{net}/`` and returns them as three
    LabelledCollection objects (val and test share the training classes).

    :param modality: which stored array to use: 'features', 'predictions' or 'logits'
    :param dataset: dataset identifier (e.g. 'mnist')
    :param net: identifier of the network that produced the outputs (e.g. 'basiccnn')
    :param data_home: root directory where the bundles live
    :return: (train, val, test) LabelledCollections
    """
    MODALITY = ('features', 'predictions', 'logits')
    assert modality in MODALITY, f'unknown modality, valid ones are {MODALITY}'

    file_prefix = f'{dataset}_{net}'
    data_home = Path(data_home) / file_prefix

    def load_split(split):
        # each bundle stores the requested modality array plus the 'targets' labels
        bundle = np.load(data_home / f'{file_prefix}_{split}_out.npz')
        return bundle[modality], bundle['targets']

    train_X, train_y = load_split('train')
    val_X, val_y = load_split('val')
    test_X, test_y = load_split('test')

    train = LabelledCollection(train_X, train_y)
    val = LabelledCollection(val_X, val_y, classes=train.classes_)
    test = LabelledCollection(test_X, test_y, classes=train.classes_)

    def show_prev_stats(data: LabelledCollection):
        # compact summary of the class-prevalence range of a collection
        p = data.prevalence()
        return f'prevs in [{min(p)*100:.3f}%, {max(p)*100:.3f}%]'

    print(f'loaded {dataset} ({modality=}): '
          f'#train={len(train)}({show_prev_stats(train)}), '
          f'#val={len(val)}({show_prev_stats(val)}), '
          f'#test={len(test)}({show_prev_stats(test)}), '
          f'#features={train_X.shape[1]}, '
          f'#classes={len(set(train_y))}')

    return train, val, test
|
|
|
|
|
|
def fetchMNIST(modality, data_home='./data'):
    """Outputs of a basic CNN on MNIST; see fetchVisual for the return format."""
    return fetchVisual(modality, dataset='mnist', net='basiccnn', data_home=data_home)
|
|
|
|
def fetchCIFAR100coarse(modality, data_home='./data'):
    """Outputs of a ResNet-18 on CIFAR100 (coarse labels); see fetchVisual."""
    return fetchVisual(modality, dataset='cifar100coarse', net='resnet18', data_home=data_home)
|
|
|
|
def fetchCIFAR100(modality, data_home='./data'):
    """Outputs of a ResNet-18 on CIFAR100 (fine labels); see fetchVisual."""
    return fetchVisual(modality, dataset='cifar100', net='resnet18', data_home=data_home)
|
|
|
|
def fetchCIFAR10(modality, data_home='./data'):
    """Outputs of a ResNet-18 on CIFAR10; see fetchVisual."""
    return fetchVisual(modality, dataset='cifar10', net='resnet18', data_home=data_home)
|
|
|
|
def fetchFashionMNIST(modality, data_home='./data'):
    """Outputs of a basic CNN on FashionMNIST; see fetchVisual."""
    return fetchVisual(modality, dataset='fashionmnist', net='basiccnn', data_home=data_home)
|
|
|
|
def fetchSVHN(modality, data_home='./data'):
    """Outputs of a ResNet-18 on SVHN; see fetchVisual."""
    return fetchVisual(modality, dataset='svhn', net='resnet18', data_home=data_home)
|
|
|
|
|
|
|
|
class DatasetHandler(ABC):
    """Common interface of the dataset providers used in the experiments.

    A handler wraps one named dataset and exposes it as a training set plus
    sample-generation protocols for model selection and for final evaluation.
    """

    def __init__(self, name):
        # identifier of the concrete dataset this handler wraps
        self.name = name

    @classmethod
    def get_defaults(cls):
        """Return {parameter_name: default} for every constructor parameter
        of this (sub)class that declares a default value."""
        sig = inspect.signature(cls.__init__)

        defaults = {
            name: param.default
            for name, param in sig.parameters.items()
            if param.default is not inspect.Parameter.empty
        }

        return defaults

    @abstractmethod
    def get_training(self): ...

    @abstractmethod
    def get_train_testprot_for_eval(self): ...

    @abstractmethod
    def get_train_valprot_for_modsel(self): ...

    @classmethod
    @abstractmethod
    def get_datasets(cls): ...

    @classmethod
    def iter(cls, **kwargs):
        """Yield one handler instance per dataset name listed by get_datasets."""
        for name in cls.get_datasets():
            yield cls(name, **kwargs)

    def __repr__(self):
        return f'{self.__class__.__name__}({self.name})'

    @classmethod
    @abstractmethod
    def is_binary(cls): ...  # first parameter renamed self -> cls (classmethod convention)
|
|
|
|
|
|
class VisualDataHandler(DatasetHandler):
    """DatasetHandler for visual datasets with precomputed network outputs.

    The dataset name may end in '-f' (use the stored features) or '-l' (use
    the stored logits); with no suffix, the posterior probabilities
    ('predictions') are used.
    """

    def __init__(self, name, n_val_samples=100, n_test_samples=100, sample_size=500, random_state=0):
        # modality 'features'    : feature-based; the idea is to learn a LogisticRegression on top
        # modality 'predictions' : posterior probabilities
        # modality 'logits'      : raw (pre-softmax) network outputs
        super().__init__(name=name)
        modality = 'predictions'
        if name.endswith('-f'):
            modality = 'features'
        elif name.endswith('-l'):
            modality = 'logits'
        self.modality = modality
        self.n_val_samples = n_val_samples    # number of validation samples for model selection
        self.n_test_samples = n_test_samples  # number of test samples for evaluation
        self.sample_size = sample_size        # number of instances per generated sample
        self.random_state = random_state

    def dataset(self):
        """Load (and cache) the (train, val, test) LabelledCollections.

        :raises ValueError: if the handler name does not match a known dataset
        """
        # per-instance memoization instead of lru_cache on a method, which
        # would keep every instance alive for the cache's lifetime (B019)
        if not hasattr(self, '_dataset_cache'):
            name = self.name.lower()
            # strip the modality suffix (if any); mirrors the endswith checks
            # in __init__, whereas str.replace could mangle names containing
            # '-f'/'-l' elsewhere
            for suffix in ('-f', '-l'):
                if name.endswith(suffix):
                    name = name[:-len(suffix)]

            loaders = {
                'mnist': fetchMNIST,
                'cifar100coarse': fetchCIFAR100coarse,
                'cifar100': fetchCIFAR100,
                'cifar10': fetchCIFAR10,
                'fashionmnist': fetchFashionMNIST,
                'svhn': fetchSVHN,
            }
            if name not in loaders:
                raise ValueError(f'unknown dataset {name}')
            data = loaders[name](modality=self.modality)

            # the training set was used to extract features;
            # we use the validation portion as a training set for quantifiers
            net_train, val, test = data
            train, val = val.split_stratified(train_prop=0.6, random_state=self.random_state)
            self._dataset_cache = (train, val, test)
        return self._dataset_cache

    def get_training(self):
        """Training set for the quantifiers."""
        train, val, test = self.dataset()
        return train

    def get_validation(self):
        """Held-out validation set."""
        train, val, test = self.dataset()
        return val

    def get_train_testprot_for_eval(self):
        """Return (train+val, UPP protocol over the test set) for evaluation."""
        train, val, test = self.dataset()
        test_prot = UPP(test, sample_size=self.sample_size, repeats=self.n_test_samples, random_state=self.random_state)
        return train+val, test_prot

    def get_train_valprot_for_modsel(self):
        """Return (train, UPP protocol over the validation set) for model selection."""
        train, val, test = self.dataset()
        val_prot = UPP(val, sample_size=self.sample_size, repeats=self.n_val_samples, random_state=self.random_state)
        return train, val_prot

    @classmethod
    def get_datasets(cls):
        datasets = ['cifar100coarse', 'cifar10', 'mnist', 'fashionmnist', 'svhn'] #+ ['cifar100']
        # datasets_feat = [f'{d}-f' for d in datasets]
        datasets_feat = [f'{d}-l' for d in datasets]
        return datasets_feat # + datasets

    # NOTE: the iter override identical to DatasetHandler.iter was removed;
    # the inherited classmethod already does exactly the same

    def __repr__(self):
        return f'{self.name}'

    @classmethod
    def is_binary(cls):
        # all the visual datasets handled here are multiclass
        return False
|
|
|
|
|
|
class CIFAR100Handler(VisualDataHandler):
    """VisualDataHandler preconfigured for CIFAR100 (fine labels).

    Identical to VisualDataHandler except for a larger default sample_size
    (2000 instead of 500) and a dataset list restricted to 'cifar100'.
    """

    def __init__(self, name, n_val_samples=100, n_test_samples=100, sample_size=2000, random_state=0):
        super().__init__(
            name=name,
            n_val_samples=n_val_samples,
            n_test_samples=n_test_samples,
            sample_size=sample_size,
            random_state=random_state,
        )

    @classmethod
    def get_datasets(cls):
        # only the logits ('-l') variant is currently enabled;
        # the feature-based ('-f') and plain variants are commented out upstream
        return [f'{name}-l' for name in ['cifar100']]
|
|
|
|
|
|
# LeQua multiclass tasks
class LeQuaHandler(DatasetHandler):
    """Handler for the LeQua competition multiclass tasks.

    The quapy fetchers already return (training, val_generator,
    test_generator), so no UPP protocol needs to be built here.
    """

    DATASETS = ['LeQua2022', 'LeQua2024']

    def __init__(self, name):
        super().__init__(name)
        self.sample_size = 1000  # sample size used by the LeQua sample generators

    def get_training(self):
        return self.dataset()[0]

    def get_train_testprot_for_eval(self):
        training, _, test_generator = self.dataset()
        return training, test_generator

    def get_train_valprot_for_modsel(self):
        training, val_generator, _ = self.dataset()
        return training, val_generator

    def dataset(self):
        """Fetch (and cache) the (training, val_generator, test_generator) triple.

        :raises ValueError: if the handler name is not a known LeQua edition
        """
        # per-instance memoization instead of lru_cache on a method, which
        # would keep every instance alive for the cache's lifetime (B019)
        if not hasattr(self, '_dataset_cache'):
            if self.name=='LeQua2022':
                self._dataset_cache = qp.datasets.fetch_lequa2022(task='T1B')
            elif self.name=='LeQua2024':
                self._dataset_cache = qp.datasets.fetch_lequa2024(task='T2')
            else:
                raise ValueError(f'unexpected dataset name {self.name}; valid ones are {self.DATASETS}')
        return self._dataset_cache

    def __repr__(self):
        return self.name

    @classmethod
    def iter(cls):
        for name in cls.DATASETS:
            yield cls(name)

    @classmethod
    def is_binary(cls):
        # both LeQua tasks targeted here (T1B/T2) are multiclass
        return False

    @classmethod
    def get_datasets(cls):
        return cls.DATASETS
|
|
|
|
|
|
class UCIDatasetHandler(DatasetHandler, ABC):
    """Shared logic for the UCI dataset handlers (binary and multiclass).

    Concrete subclasses provide DATASETS, a dataset() fetcher, and the
    constructor defaults.
    """

    DATASETS = []

    def __init__(self, name, n_val_samples, n_test_samples, sample_size, random_state):
        super().__init__(name=name)
        self.sample_size = sample_size        # number of instances per generated sample
        self.n_val_samples = n_val_samples    # number of validation samples for model selection
        self.n_test_samples = n_test_samples  # number of test samples for evaluation
        self.random_state = random_state

    def get_training(self):
        """Whole training split of the underlying quapy Dataset."""
        return self.dataset().training

    def get_train_testprot_for_eval(self):
        """Return (training, UPP protocol over the test split) for evaluation."""
        training, test = self.dataset().train_test
        protocol = qp.protocol.UPP(
            test,
            sample_size=self.sample_size,
            repeats=self.n_test_samples,
            random_state=self.random_state,
        )
        return training, protocol

    def get_train_valprot_for_modsel(self):
        """Carve a held-out part off the training split and return
        (reduced training, UPP protocol over the held-out part)."""
        full_training = self.dataset().training
        training, held_out = full_training.split_stratified(train_prop=0.6, random_state=self.random_state)
        protocol = qp.protocol.UPP(
            held_out,
            sample_size=self.sample_size,
            repeats=self.n_val_samples,
            random_state=self.random_state,
        )
        return training, protocol

    @classmethod
    def get_datasets(cls):
        return cls.DATASETS

    @classmethod
    def iter(cls):
        """Yield one handler per dataset name in DATASETS."""
        yield from map(cls, cls.DATASETS)
|
|
|
|
|
|
class UCIMulticlassHandler(UCIDatasetHandler):
    """Handler for the UCI multiclass datasets ('hcv' and 'poker_hand' excluded)."""

    DATASETS = sorted([d for d in qp.datasets.UCI_MULTICLASS_DATASETS if d not in frozenset(['hcv', 'poker_hand'])])

    def __init__(self, name, n_val_samples=100, n_test_samples=100, sample_size=1000, random_state=0):
        super().__init__(name, n_val_samples, n_test_samples, sample_size, random_state)

    def dataset(self):
        """Fetch (and cache) the quapy Dataset for this UCI multiclass dataset."""
        # per-instance memoization instead of lru_cache on a method, which
        # would keep every instance alive for the cache's lifetime (B019)
        if not hasattr(self, '_dataset_cache'):
            # min_class_support=0.01 presumably drops classes below 1% support
            # — confirm against quapy's fetch_UCIMulticlassDataset docs
            self._dataset_cache = qp.datasets.fetch_UCIMulticlassDataset(self.name, min_class_support=0.01)
        return self._dataset_cache

    def __repr__(self):
        return f'{self.name}(UCI-multiclass)'

    @classmethod
    def is_binary(cls):
        return False
|
|
|
|
|
|
class UCIBinaryHandler(UCIDatasetHandler):
    """Handler for the UCI binary datasets.

    NOTE(review): this class originally inherited from DatasetHandler, but it
    forwards (name, n_val_samples, n_test_samples, sample_size, random_state)
    to super().__init__ — which DatasetHandler.__init__ cannot accept — and it
    relies on the training/protocol methods it does not define itself; both
    only work against UCIDatasetHandler, so the base class was corrected.
    """

    DATASETS = qp.datasets.UCI_BINARY_DATASETS.copy()

    def __init__(self, name, n_val_samples=100, n_test_samples=100, sample_size=500, random_state=0):
        super().__init__(name, n_val_samples, n_test_samples, sample_size, random_state)

    def dataset(self):
        """Fetch (and cache) the quapy Dataset for this UCI binary dataset."""
        # per-instance memoization instead of lru_cache on a method, which
        # would keep every instance alive for the cache's lifetime (B019)
        if not hasattr(self, '_dataset_cache'):
            self._dataset_cache = qp.datasets.fetch_UCIBinaryDataset(self.name)
        return self._dataset_cache

    def __repr__(self):
        return f'{self.name}(UCI-binary)'

    @classmethod
    def is_binary(cls):
        return True
|
|
|
|
|
|
if __name__ == '__main__':
    # smoke test: quantify MNIST posteriors with EMQ on top of a mock
    # classifier that simply passes the stored posteriors through
    train, val, test = fetchMNIST(modality='predictions')

    classifier = MockClassifierFromPosteriors()
    classifier.fit(*train.Xy)

    # alternatives tried: KDEyML(classifier=classifier, fit_classifier=False)
    #                     PACC(classifier=classifier, fit_classifier=False)
    quantifier = EMQ(classifier=classifier, fit_classifier=False)
    quantifier.fit(*val.Xy)

    test_prot = UPP(test, repeats=100, sample_size=500)
    report = qp.evaluation.evaluation_report(quantifier, test_prot, verbose=True)
    print(report.mean(numeric_only=True))