import itertools
import os
import pickle
from abc import ABC, abstractmethod
from copy import deepcopy
from dataclasses import dataclass
from os.path import join
from typing import Callable, List, Union

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

import quapy as qp
from quapy.data import LabelledCollection
from quapy.method.aggregative import PACC
from quapy.method.base import BaseQuantifier
from quapy.model_selection import GridSearchQ
from quapy.protocol import APP, UPP, AbstractProtocol
from result_table.src.table import Table


def makedirs(path):
    print('creating', path)
    os.makedirs(path, exist_ok=True)


@dataclass
class MethodDescriptor:
    """Bundles a quantification method with the information needed to run and report it."""
    id: str
    name: str
    instance: BaseQuantifier
    hyperparams: dict


class Benchmark(ABC):

    # used to separate components in a run-ID; cannot be used within the component IDs
    ID_SEPARATOR = '__'

    def __init__(self, home_dir, n_jobs=3):
        self.home_dir = home_dir
        self.n_jobs = n_jobs
        assert n_jobs != -1, ('Setting n_jobs=-1 will probably blow your memory. '
                              'Specify a positive number.')
        makedirs(home_dir)
        makedirs(join(home_dir, 'results'))
        makedirs(join(home_dir, 'params'))
        makedirs(join(home_dir, 'tables'))
        makedirs(join(home_dir, 'plots'))

    def _run_id(self, method: MethodDescriptor, dataset: str):
        sep = Benchmark.ID_SEPARATOR
        assert sep not in method.id, \
            (f'separator {sep} cannot be used in method ID ({method.id}), '
             f'please change the method ID or redefine {Benchmark.ID_SEPARATOR=}')
        assert sep not in dataset, \
            (f'separator {sep} cannot be used in dataset name ({dataset}), '
             f'please redefine {Benchmark.ID_SEPARATOR=}')
        return sep.join([method.id, dataset])

    def _result_path(self, method: MethodDescriptor, dataset: str):
        run_id = self._run_id(method, dataset)
        return join(self.home_dir, 'results', run_id + '.pkl')

    def _params_path(self, method: MethodDescriptor, dataset: str):
        run_id = self._run_id(method, dataset)
        chosen = join(self.home_dir, 'params', run_id + 'chosen.pkl')
        scores = join(self.home_dir, 'params', run_id + 'scores.pkl')
        return chosen, scores

    def _exist_run(self, method: MethodDescriptor, dataset: str):
        return os.path.exists(self._result_path(method, dataset))

    def _open_method_dataset_result(self, method: MethodDescriptor, dataset: str):
        if not self._exist_run(method, dataset):
            raise ValueError(f'cannot open result for method={method.id} and {dataset=}')
        return pd.read_pickle(self._result_path(method, dataset))

    def check_dataset(self, dataset: str):
        assert dataset in self.list_datasets(), f'unknown dataset {dataset}'

    @abstractmethod
    def list_datasets(self) -> List[str]:
        ...

    @abstractmethod
    def run_method_dataset(self, method: MethodDescriptor, dataset: str, random_state=0) -> pd.DataFrame:
        ...
    def gen_tables(self, results, metrics=None):
        if metrics is None:
            metrics = ['mae', 'mrae', 'mkld', 'mnkld']

        tables = {}
        for (method, dataset, result) in results:
            col_metrics = result.columns.values[2:]
            for metric in metrics:
                if metric not in col_metrics:
                    print(f'error; requested {metric=} not found among the columns in the dataframe')
                    continue
                if metric not in tables:
                    tables[metric] = Table(name=metric)
                table = tables[metric]
                table.add(dataset, method.name, result[metric].values)

        Table.LatexPDF(join(self.home_dir, 'tables', 'results.pdf'), list(tables.values()))

    def gen_plots(self):
        pass

    def show_report(self, method, dataset, report: pd.DataFrame):
        method_id = method.id
        MAE = report['mae'].mean()
        mae_std = report['mae'].std()
        MRAE = report['mrae'].mean()
        mrae_std = report['mrae'].std()
        print(f'{method_id}\t{dataset}:\t{MAE=:.4f}+-{mae_std:.4f}\t{MRAE=:.4f}+-{mrae_std:.4f}')

    def run(self,
            methods: Union[List[MethodDescriptor], MethodDescriptor],
            datasets: Union[List[str], str] = None,
            force=False):

        if not isinstance(methods, list):
            methods = [methods]
        if datasets is None:
            datasets = self.list_datasets()
        elif not isinstance(datasets, list):
            datasets = [datasets]

        # collect cached results and gather the pending (method, dataset) jobs
        results = []
        pending_job_args = []
        for method, dataset in itertools.product(methods, datasets):
            self.check_dataset(dataset)
            if not force and self._exist_run(method, dataset):
                result = pd.read_pickle(self._result_path(method, dataset))
                results.append((method, dataset, result))
            else:
                pending_job_args.append((method, dataset))

        # run the pending jobs in parallel
        if len(pending_job_args) > 0:
            remaining_results = qp.util.parallel_unpack(
                func=self.run_method_dataset,
                args=pending_job_args,
                n_jobs=self.n_jobs,
                seed=0,
                asarray=False
            )
            results += [
                (method, dataset, result)
                for (method, dataset), result in zip(pending_job_args, remaining_results)
            ]

        # print results
        for method, dataset, result in results:
            self.show_report(method, dataset, result)

        self.gen_tables(results)
        self.gen_plots()

    def __add__(self, other: 'Benchmark'):
        return CombinedBenchmark(self, other, self.n_jobs)


class CombinedBenchmark(Benchmark):

    def __init__(self, benchmark_a: Benchmark, benchmark_b: Benchmark, n_jobs=-1):
        # route each dataset to the benchmark that owns it
        self.router = {
            **{dataset: benchmark_a for dataset in benchmark_a.list_datasets()},
            **{dataset: benchmark_b for dataset in benchmark_b.list_datasets()}
        }
        self.datasets = benchmark_a.list_datasets() + benchmark_b.list_datasets()
        # tables and plots of the combined benchmark are written into the first benchmark's home dir
        self.home_dir = benchmark_a.home_dir
        self.n_jobs = n_jobs

    def list_datasets(self) -> List[str]:
        return self.datasets

    def run_method_dataset(self, method: MethodDescriptor, dataset: str, random_state=0) -> pd.DataFrame:
        return self.router[dataset].run_method_dataset(method, dataset, random_state)

    def _exist_run(self, method: MethodDescriptor, dataset: str):
        return self.router[dataset]._exist_run(method, dataset)


class TypicalBenchmark(Benchmark):

    @abstractmethod
    def get_sample_size(self) -> int:
        ...

    @abstractmethod
    def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str) -> \
            (LabelledCollection, AbstractProtocol, LabelledCollection, AbstractProtocol):
        ...

    @abstractmethod
    def get_target_error_for_modsel(self) -> Union[str, Callable]:
        ...
    def run_method_dataset(self, method: MethodDescriptor, dataset: str, random_state=0) -> pd.DataFrame:
        print(f'Running method={method.id} in {dataset=}')

        sample_size = self.get_sample_size()
        qp.environ['SAMPLE_SIZE'] = sample_size

        q = deepcopy(method.instance)
        optim_for = self.get_target_error_for_modsel()

        with qp.util.temp_seed(random_state):
            # data split
            trModSel, valprotModSel, trEval, teprotEval = self.get_trModsel_valprotModsel_trEval_teprotEval(dataset)

            # model selection
            modsel = GridSearchQ(
                model=q,
                param_grid=method.hyperparams,
                protocol=valprotModSel,
                error=optim_for,
                refit=False,
                n_jobs=-1,
                raise_errors=True,
                verbose=True
            ).fit(trModSel)

            # fit on the whole training data
            optimized_model = modsel.best_model_
            optimized_model.fit(trEval)

            # evaluation
            report = qp.evaluation.evaluation_report(
                model=optimized_model,
                protocol=teprotEval,
                error_metrics=qp.error.QUANTIFICATION_ERROR_NAMES
            )

        # data persistence
        chosen_path, scores_path = self._params_path(method, dataset)
        with open(chosen_path, 'wb') as fout:
            pickle.dump(modsel.best_params_, fout, pickle.HIGHEST_PROTOCOL)
        with open(scores_path, 'wb') as fout:
            pickle.dump(modsel.param_scores_, fout, pickle.HIGHEST_PROTOCOL)

        result_path = self._result_path(method, dataset)
        report.to_pickle(result_path)

        return report


class UCIBinaryBenchmark(TypicalBenchmark):

    def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str) -> \
            (LabelledCollection, AbstractProtocol, LabelledCollection, AbstractProtocol):
        data = qp.datasets.fetch_UCIBinaryDataset(dataset)
        trEval, teEval = data.train_test
        trModsel, vaModsel = trEval.split_stratified()
        valprotModsel = APP(vaModsel, n_prevalences=21, repeats=25)
        teprotEval = APP(teEval, n_prevalences=21, repeats=100)
        return trModsel, valprotModsel, trEval, teprotEval

    def get_sample_size(self) -> int:
        return 100

    def get_target_error_for_modsel(self) -> Union[str, Callable]:
        return 'mae'

    def list_datasets(self) -> List[str]:
        ignore = ['acute.a', 'acute.b', 'balance.2']
        return [d for d in qp.datasets.UCI_BINARY_DATASETS if d not in ignore]


class UCIMultiBenchmark(TypicalBenchmark):

    def list_datasets(self) -> List[str]:
        return qp.datasets.UCI_MULTICLASS_DATASETS

    def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str) -> \
            (LabelledCollection, AbstractProtocol, LabelledCollection, AbstractProtocol):
        data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
        trEval, teEval = data.train_test
        trModsel, vaModsel = trEval.split_stratified()
        valprotModsel = UPP(vaModsel, repeats=250)
        teprotEval = UPP(teEval, repeats=1000)
        return trModsel, valprotModsel, trEval, teprotEval

    def get_sample_size(self) -> int:
        return 500

    def get_target_error_for_modsel(self) -> Union[str, Callable]:
        return 'mae'


if __name__ == '__main__':
    from quapy.benchmarking.typical import *

    # from quapy.method.aggregative import BayesianCC
    # bayes = MethodDescriptor(
    #     id='Bayesian',
    #     name='Bayesian(LR)',
    #     instance=BayesianCC(LogisticRegression()),
    #     hyperparams=wrap_cls_params(lr_hyper)
    # )

    # bench_bin = UCIBinaryBenchmark('../../Benchmarks/UCIbinary')
    bench_multi = UCIMultiBenchmark('../../Benchmarks/UCIMulti')
    # bench = bench_bin + bench_multi
    bench = bench_multi

    bench.run(methods=[cc, pcc, acc, pacc, sld, sld_bcts])
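
    # Illustrative sketch (kept commented out, like the Bayesian example above): the descriptors
    # imported from quapy.benchmarking.typical (cc, pcc, acc, pacc, ...) are assumed to look roughly
    # like the following; the actual grids defined in that module may differ. The 'classifier__*'
    # keys follow QuaPy's convention of routing hyperparameters to the wrapped classifier during
    # GridSearchQ model selection.
    #
    # pacc_lr = MethodDescriptor(
    #     id='PACC',
    #     name='PACC(LR)',
    #     instance=PACC(LogisticRegression()),
    #     hyperparams={
    #         'classifier__C': np.logspace(-3, 3, 7),
    #         'classifier__class_weight': ['balanced', None],
    #     }
    # )
    # bench.run(methods=pacc_lr)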