import os.path
from glob import glob

import numpy as np
import pandas as pd
import quapy as qp

import constants


def load_category_map(path):
    """Loads the mapping between category names and numeric codes from a space-separated file."""
    cat2code = {}
    with open(path, 'rt') as fin:
        for line in fin:
            category, code = line.split()
            cat2code[category] = int(code)
    code2cat = [cat for cat, code in sorted(cat2code.items(), key=lambda x: x[1])]
    return cat2code, code2cat


def load_raw_documents(path, vectorizer=None):
    """Loads raw text documents (and labels, when present) from a csv file, optionally vectorizing them."""
    df = pd.read_csv(path)
    documents = list(df["text"].values)
    if vectorizer:
        documents = vectorizer.transform(documents)
    labels = None
    if "label" in df.columns:
        labels = df["label"].values.astype(int)
    return documents, labels


def load_vector_documents(path):
    """Loads pre-vectorized documents; files with 301 columns are assumed to carry the label
    in column 0 followed by 300 feature columns."""
    D = pd.read_csv(path).to_numpy(dtype=float)
    labelled = D.shape[1] == 301
    if labelled:
        X, y = D[:, 1:], D[:, 0].astype(int).flatten()
    else:
        X, y = D, None
    return X, y


def __gen_load_samples_with_groundtruth(path_dir: str, return_id: bool, ground_truth_path: str, load_fn, **load_kwargs):
    true_prevs = ResultSubmission.load(ground_truth_path)
    for id, prevalence in true_prevs.iterrows():
        sample, _ = load_fn(os.path.join(path_dir, f'{id}.txt'), **load_kwargs)
        yield (id, sample, prevalence) if return_id else (sample, prevalence)


def __gen_load_samples_without_groundtruth(path_dir: str, return_id: bool, load_fn, **load_kwargs):
    nsamples = len(glob(os.path.join(path_dir, '*.txt')))
    for id in range(nsamples):
        sample, _ = load_fn(os.path.join(path_dir, f'{id}.txt'), **load_kwargs)
        yield (id, sample) if return_id else sample


def gen_load_samples(path_dir: str, ground_truth_path: str = None, return_id=False,
                     load_fn=load_vector_documents, **load_kwargs):
    if ground_truth_path is None:
        # the generator yields samples (optionally preceded by their numeric id)
        gen_fn = __gen_load_samples_without_groundtruth(path_dir, return_id, load_fn, **load_kwargs)
    else:
        # the generator yields (sample, prevalence) tuples (optionally preceded by the sample id)
        gen_fn = __gen_load_samples_with_groundtruth(path_dir, return_id, ground_truth_path, load_fn, **load_kwargs)
    for r in gen_fn:
        yield r
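# A minimal usage sketch (not executed at import time). The directory layout is
# hypothetical: 'dev_samples/' holding files 0.txt, 1.txt, ... and a ground-truth
# file 'dev_prevalences.txt' in ResultSubmission csv format.
def _demo_iterate_dev_samples(path_dir='dev_samples', ground_truth='dev_prevalences.txt'):
    # iterates over labelled development samples paired with their true prevalence vectors
    for sample_id, X, prev in gen_load_samples(path_dir, ground_truth, return_id=True):
        print(f'sample {sample_id}: shape={X.shape}, true prevalence={prev}')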
class ResultSubmission:

    def __init__(self):
        self.df = None

    def __init_df(self, categories: int):
        if not isinstance(categories, int) or categories < 2:
            raise TypeError('wrong format for categories: an int (>=2) was expected')
        df = pd.DataFrame(columns=list(range(categories)))
        df.index.set_names('id', inplace=True)
        self.df = df

    @property
    def n_categories(self):
        return len(self.df.columns.values)

    def add(self, sample_id: int, prevalence_values: np.ndarray):
        if not isinstance(sample_id, int):
            raise TypeError(f'error: expected int for sample_id, found {type(sample_id)}')
        if not isinstance(prevalence_values, np.ndarray):
            raise TypeError(f'error: expected np.ndarray for prevalence_values, found {type(prevalence_values)}')
        if self.df is None:
            self.__init_df(categories=len(prevalence_values))
        if sample_id in self.df.index.values:
            raise ValueError(f'error: prevalence values for "{sample_id}" already added')
        if prevalence_values.ndim != 1 or prevalence_values.size != self.n_categories:
            raise ValueError(f'error: wrong shape found for prevalence vector {prevalence_values}')
        if (prevalence_values < 0).any() or (prevalence_values > 1).any():
            raise ValueError(f'error: prevalence values out of range [0,1] for "{sample_id}"')
        if np.abs(prevalence_values.sum() - 1) > constants.ERROR_TOL:
            raise ValueError(f'error: prevalence values do not sum up to one for "{sample_id}" '
                             f'(error tolerance {constants.ERROR_TOL})')
        self.df.loc[sample_id] = prevalence_values

    def __len__(self):
        return len(self.df)

    @classmethod
    def load(cls, path: str) -> 'ResultSubmission':
        df = ResultSubmission.check_file_format(path)
        r = ResultSubmission()
        r.df = df
        return r

    def dump(self, path: str):
        ResultSubmission.check_dataframe_format(self.df)
        self.df.to_csv(path)

    def prevalence(self, sample_id: int):
        if sample_id not in self.df.index:
            return None
        return self.df.loc[sample_id].values.flatten()

    def iterrows(self):
        for index, row in self.df.iterrows():
            prevalence = row.values.flatten()
            yield index, prevalence

    @classmethod
    def check_file_format(cls, path) -> pd.DataFrame:
        try:
            df = pd.read_csv(path, index_col=0)
        except Exception as e:
            raise ValueError(f'the file {path} does not seem to be a valid csv file') from e
        return ResultSubmission.check_dataframe_format(df, path=path)

    @classmethod
    def check_dataframe_format(cls, df, path=None) -> pd.DataFrame:
        hint_path = ''  # if given, show the data path in the error message
        if path is not None:
            hint_path = f' in {path}'
        if df.index.name != 'id' or len(df.columns) < 2:
            raise ValueError(f'wrong header{hint_path}, '
                             f'the format of the header should be "id,0,...,n-1", '
                             f'where n is the number of categories')
        if [int(ci) for ci in df.columns.values] != list(range(len(df.columns))):
            raise ValueError(f'wrong header{hint_path}, category ids should be 0,1,2,...,n-1, '
                             f'where n is the number of categories')
        if df.empty:
            raise ValueError(f'error{hint_path}: results file is empty')
        elif len(df) != constants.DEV_SAMPLES and len(df) != constants.TEST_SAMPLES:
            raise ValueError(f'wrong number of prevalence values found{hint_path}; '
                             f'expected {constants.DEV_SAMPLES} for development sets and '
                             f'{constants.TEST_SAMPLES} for test sets; found {len(df)}')
        ids = set(df.index.values)
        expected_ids = set(range(len(df)))
        if ids != expected_ids:
            missing = expected_ids - ids
            if missing:
                raise ValueError(f'there are {len(missing)} missing ids{hint_path}: {sorted(missing)}')
            unexpected = ids - expected_ids
            if unexpected:
                raise ValueError(f'there are {len(unexpected)} unexpected ids{hint_path}: {sorted(unexpected)}')
        for category_id in df.columns:
            if (df[category_id] < 0).any() or (df[category_id] > 1).any():
                raise ValueError(f'error{hint_path} column "{category_id}" contains values out of range [0,1]')
        prevs = df.values
        round_errors = np.abs(prevs.sum(axis=-1) - 1.) > constants.ERROR_TOL
        if round_errors.any():
            raise ValueError(f'prevalence values in rows with id {np.where(round_errors)[0].tolist()} '
                             f'do not sum up to 1 (error tolerance {constants.ERROR_TOL}), '
                             f'probably due to some rounding errors')
        return df
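# A minimal sketch of how predictions could be accumulated and serialized; it uses
# constants.DEV_SAMPLES so that the format check performed by dump() passes, and a
# binary setting (2 categories) purely for illustration.
def _demo_build_submission(out_path='predictions.csv', n_categories=2):
    submission = ResultSubmission()
    for sample_id in range(constants.DEV_SAMPLES):
        # a uniform prevalence vector sums to 1, so it satisfies the checks in add()
        submission.add(sample_id, np.full(n_categories, 1. / n_categories))
    submission.dump(out_path)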
def evaluate_submission(true_prevs: ResultSubmission, predicted_prevs: ResultSubmission, sample_size=None, average=True):
    if sample_size is None:
        if qp.environ['SAMPLE_SIZE'] is None:
            raise ValueError('Relative Absolute Error cannot be computed: '
                             'neither sample_size nor qp.environ["SAMPLE_SIZE"] have been specified')
        else:
            sample_size = qp.environ['SAMPLE_SIZE']
    if len(true_prevs) != len(predicted_prevs):
        raise ValueError(f'size mismatch, ground truth file has {len(true_prevs)} entries '
                         f'while the file of predictions contains {len(predicted_prevs)} entries')
    if true_prevs.n_categories != predicted_prevs.n_categories:
        raise ValueError(f'these result files are not comparable since the categories are different: '
                         f'true={true_prevs.n_categories} categories vs. '
                         f'predictions={predicted_prevs.n_categories} categories')
    rae, ae = [], []
    for sample_id, true_prevalence in true_prevs.iterrows():
        pred_prevalence = predicted_prevs.prevalence(sample_id)
        rae.append(qp.error.rae(true_prevalence, pred_prevalence, eps=1. / (2 * sample_size)))
        ae.append(qp.error.ae(true_prevalence, pred_prevalence))
    rae = np.asarray(rae)
    ae = np.asarray(ae)
    if average:
        return rae.mean(), ae.mean()
    else:
        return rae, ae
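# A minimal sketch of the intended evaluation flow; the file names and the sample
# size are hypothetical and should match the ones used for the actual data.
def _demo_evaluate(true_path='dev_prevalences.txt', pred_path='predictions.csv', sample_size=1000):
    true_prevs = ResultSubmission.load(true_path)
    pred_prevs = ResultSubmission.load(pred_path)
    mrae, mae = evaluate_submission(true_prevs, pred_prevs, sample_size=sample_size)
    print(f'MRAE={mrae:.5f} MAE={mae:.5f}')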