from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd

from quacc import plot
from quacc.environment import env
from quacc.utils import fmt_line_md


def _get_metric(metric: str):
    """Return a column selector for ``metric``.

    ``None`` selects every metric (``slice(None)``); any other value is
    returned unchanged so it can be used as a label in ``.loc``.
    """
    return slice(None) if metric is None else metric


def _get_estimators(estimators: List[str], cols: np.ndarray):
    """Return a column selector for ``estimators`` restricted to ``cols``.

    ``None`` selects every estimator (``slice(None)``); otherwise only the
    entries of ``cols`` that also appear in ``estimators`` are returned, in the
    order in which they occur in ``cols``.
    """
    if estimators is None:
        return slice(None)
    # np.isin is the supported replacement for np.in1d (deprecated in NumPy 2.0).
    return cols[np.isin(cols, estimators)]


class EvaluationReport:
    """Accumulates evaluation rows for one named report.

    Rows are stored in ``self.data``, a DataFrame whose index is a two-level
    MultiIndex ``(basep[1], running counter)`` and whose columns are the
    keyword names passed to :meth:`append_row`.
    """

    def __init__(self, name=None):
        # ``data`` stays None until the first row is appended.
        self.data: pd.DataFrame = None
        self.fit_score = None
        self.name = name if name is not None else "default"

    def append_row(self, basep: np.ndarray | Tuple, **row):
        """Append one row of values under the index key ``basep[1]``.

        Args:
            basep: indexable object; only element ``1`` is used as the first
                index level.
            **row: column name -> value for the new row. Columns not yet in
                ``self.data`` are created and filled with NaN for the rows
                already present.
        """
        bp = basep[1]

        if self.data is None:
            # First row ever: build the frame from scratch with counter 0.
            self.data = pd.DataFrame(
                {k: [v] for k, v in row.items()},
                index=pd.MultiIndex.from_tuples([(bp, 0)]),
                columns=list(row),
            )
            return

        # The second index level counts the rows already stored for ``bp``.
        _idx = len(self.data.loc[(bp,), :]) if (bp,) in self.data.index else 0
        not_in_data = np.setdiff1d(list(row.keys()), self.data.columns.unique(0))
        self.data.loc[:, not_in_data] = np.nan
        self.data.loc[(bp, _idx), :] = row

    @property
    def columns(self) -> np.ndarray:
        """Unique level-0 column labels of ``self.data``."""
        return self.data.columns.unique(0)

    @property
    def prevs(self):
        """Sorted unique values of the first index level of ``self.data``."""
        return np.sort(self.data.index.unique(0))


class CompReport:
    """Side-by-side view of several :class:`EvaluationReport` objects.

    The reports' frames are concatenated along the columns, keyed by report
    name. After the level swap the first column level holds the original
    column labels (used as "metric" by :meth:`data`) and the second level
    holds the report names (used as "estimators").
    """

    def __init__(
        self,
        reports: List[EvaluationReport],
        name="default",
        train_prev=None,
        valid_prev=None,
        times=None,
    ):
        # NOTE(review): ``name`` is accepted but not stored or used anywhere in
        # the code visible here.
        self._data = (
            pd.concat(
                [er.data for er in reports],
                keys=[er.name for er in reports],
                axis=1,
            )
            .swaplevel(0, 1, axis=1)
            .sort_index(axis=1, level=0, sort_remaining=False)
            .sort_index(axis=0, level=0)
        )

        self.fit_scores = {
            er.name: er.fit_score for er in reports if er.fit_score is not None
        }
        self.train_prev = train_prev
        self.valid_prev = valid_prev
        self.times = times

    @property
    def prevs(self) -> np.ndarray:
        """Sorted unique values of the first index level."""
        return np.sort(self._data.index.unique(0))

    @property
    def np_prevs(self) -> np.ndarray:
        """``prevs`` expanded to ``(1 - p, p)`` pairs, rounded to 2 decimals."""
        return np.around([(1.0 - p, p) for p in self.prevs], decimals=2)

    def data(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
        """Return a copy of the data restricted to ``metric`` and ``estimators``.

        ``None`` means "no restriction" for either argument. When exactly one
        label remains on the first column level, that level is dropped.
        """
        _metric = _get_metric(metric)
        _estimators = _get_estimators(estimators, self._data.columns.unique(1))
        f_data: pd.DataFrame = self._data.copy().loc[:, (_metric, _estimators)]

        if len(f_data.columns.unique(0)) == 1:
            f_data = f_data.droplevel(level=0, axis=1)

        return f_data

    def shift_data(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Return the data re-indexed by distance from the training prevalence.

        The first index level becomes ``|index level 0 - train_prev[1]|``
        rounded to 2 decimals.
        """
        shift_idx_0 = np.around(
            np.abs(
                self._data.index.get_level_values(0).to_numpy() - self.train_prev[1]
            ),
            decimals=2,
        )

        # NOTE(review): SOURCE is truncated from the ``dtype`` argument below
        # up to the return annotation of ``avg_by_prevs``. Everything from here
        # to the end of this method is a reconstruction, inferred from the
        # visible first half, from ``data()`` and from how ``get_plots`` and
        # ``DatasetReport.shift_data`` consume the result. Confirm against
        # version control before relying on it.
        shift_idx_1 = np.empty(shape=shift_idx_0.shape, dtype="<i4")
        for _id in np.unique(shift_idx_0):
            # Number the rows sharing the same shift value 0, 1, 2, ...
            _wh = np.where(shift_idx_0 == _id)[0]
            shift_idx_1[_wh] = np.arange(_wh.shape[0], dtype="<i4")

        shift_data = self._data.copy()
        shift_data.index = pd.MultiIndex.from_arrays([shift_idx_0, shift_idx_1])
        shift_data = shift_data.sort_index(axis=0, level=0)

        _metric = _get_metric(metric)
        _estimators = _get_estimators(estimators, shift_data.columns.unique(1))
        shift_data = shift_data.loc[:, (_metric, _estimators)]

        if len(shift_data.columns.unique(0)) == 1:
            shift_data = shift_data.droplevel(level=0, axis=1)

        return shift_data

    # NOTE(review): the ``def`` line of this method was lost in SOURCE; the
    # name and parameters are taken from the calls in ``get_plots`` and from
    # the parallel ``stdev_by_prevs`` signature. The body is original.
    def avg_by_prevs(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Mean of the selected data, grouped by the first index level."""
        f_dict = self.data(metric=metric, estimators=estimators)
        return f_dict.groupby(level=0).mean()

    def stdev_by_prevs(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Standard deviation of the selected data, grouped by index level 0."""
        f_dict = self.data(metric=metric, estimators=estimators)
        return f_dict.groupby(level=0).std()

    def table(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
        """Per-group means plus a final ``"avg"`` row with the overall mean."""
        f_data = self.data(metric=metric, estimators=estimators)
        avg_p = f_data.groupby(level=0).mean()
        avg_p.loc["avg", :] = f_data.mean()
        return avg_p

    def get_plots(
        self, mode="delta", metric="acc", estimators=None, conf="default", stdev=False
    ) -> List[Tuple[str, Path]]:
        """Produce the plot for ``mode`` and return what the plot function returns.

        Supported modes are ``"delta"``, ``"delta_stdev"``, ``"diagonal"`` and
        ``"shift"``; any other value falls through and returns ``None``.
        ``stdev`` is accepted for interface compatibility but not used here.

        NOTE(review): ``to_md`` treats the result as a single ``Path``
        (``.relative_to``), which does not match the declared return type;
        confirm against ``quacc.plot``.
        """
        if mode == "delta":
            avg_data = self.avg_by_prevs(metric=metric, estimators=estimators)
            return plot.plot_delta(
                base_prevs=self.np_prevs,
                columns=avg_data.columns.to_numpy(),
                data=avg_data.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
            )
        elif mode == "delta_stdev":
            avg_data = self.avg_by_prevs(metric=metric, estimators=estimators)
            st_data = self.stdev_by_prevs(metric=metric, estimators=estimators)
            return plot.plot_delta(
                base_prevs=self.np_prevs,
                columns=avg_data.columns.to_numpy(),
                data=avg_data.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
                stdevs=st_data.T.to_numpy(),
            )
        elif mode == "diagonal":
            f_data = self.data(metric=metric + "_score", estimators=estimators)
            ref: pd.Series = f_data.loc[:, "ref"]
            # Reassign instead of dropping in place: ``f_data`` is derived from
            # a ``.loc`` selection, where in-place mutation is fragile.
            f_data = f_data.drop(columns=["ref"])
            return plot.plot_diagonal(
                reference=ref.to_numpy(),
                columns=f_data.columns.to_numpy(),
                data=f_data.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
            )
        elif mode == "shift":
            shift_data = (
                self.shift_data(metric=metric, estimators=estimators)
                .groupby(level=0)
                .mean()
            )
            shift_prevs = np.around(
                [(1.0 - p, p) for p in np.sort(shift_data.index.unique(0))],
                decimals=2,
            )
            return plot.plot_shift(
                shift_prevs=shift_prevs,
                columns=shift_data.columns.to_numpy(),
                data=shift_data.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
            )

    def to_md(self, conf="default", metric="acc", estimators=None, stdev=False) -> str:
        """Render this report as a markdown fragment.

        The fragment contains a heading with the training positive rate, the
        train/validation prevalences, the recorded times, the HTML of
        :meth:`table` and one image link per plot mode. ``"delta_stdev"`` is
        inserted right after ``"delta"`` when ``stdev`` is true.
        """
        # Round to the nearest percent: truncating with int() turned e.g.
        # 0.29 * 100 == 28.999... into 28.
        train_pos = float(np.around(self.train_prev, decimals=2)[1])
        parts = [
            f"## {round(train_pos * 100)}% positives\n",
            fmt_line_md(f"train: {str(self.train_prev)}"),
            fmt_line_md(f"validation: {str(self.valid_prev)}"),
        ]
        # ``times`` defaults to None in __init__; treat that as "no timings".
        for k, v in (self.times or {}).items():
            parts.append(fmt_line_md(f"{k}: {v:.3f}s"))
        parts.append("\n")
        parts.append(self.table(metric=metric, estimators=estimators).to_html() + "\n\n")

        if stdev:
            plot_modes = ["delta", "delta_stdev", "diagonal", "shift"]
        else:
            plot_modes = ["delta", "diagonal", "shift"]

        for mode in plot_modes:
            op = self.get_plots(
                mode=mode,
                metric=metric,
                estimators=estimators,
                conf=conf,
                stdev=stdev,
            )
            parts.append(
                f"![plot_{mode}]({op.relative_to(env.OUT_DIR).as_posix()})\n"
            )

        return "".join(parts)


class DatasetReport:
    """A named collection of :class:`CompReport` objects."""

    def __init__(self, name, crs=None):
        self.name = name
        self.crs: List[CompReport] = [] if crs is None else crs

    def data(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
        """Stack the selected data of every report, keyed by ``train_prev[1]``.

        Frames are concatenated widest-first (most columns first) and the
        result is sorted on the first index level. Raises ``ValueError`` when
        ``self.crs`` is empty, because there is nothing to unpack.
        """

        def _cr_train_prev(cr: CompReport):
            return cr.train_prev[1]

        def _cr_data(cr: CompReport):
            return cr.data(metric, estimators)

        _crs_sorted = sorted(
            [(_cr_train_prev(cr), _cr_data(cr)) for cr in self.crs],
            key=lambda cr: len(cr[1].columns),
            reverse=True,
        )
        _crs_train, _crs_data = zip(*_crs_sorted)

        _data = pd.concat(_crs_data, axis=0, keys=_crs_train)
        _data = _data.sort_index(axis=0, level=0)
        return _data

    def shift_data(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Stack the shift data of every report, widest frame first."""
        _shift_data: pd.DataFrame = pd.concat(
            sorted(
                [cr.shift_data(metric, estimators) for cr in self.crs],
                key=lambda d: len(d.columns),
                reverse=True,
            ),
            axis=0,
        )

        shift_idx_0 = _shift_data.index.get_level_values(0)

        # NOTE(review): SOURCE is truncated from the ``dtype`` argument below.
        # The rest of this method is a reconstruction mirroring the visible
        # first half of ``CompReport.shift_data``; confirm against version
        # control.
        shift_idx_1 = np.empty(shape=shift_idx_0.shape, dtype="<i4")
        for _id in np.unique(shift_idx_0):
            # Renumber rows sharing a shift value so the new index is unique.
            _wh = np.where(shift_idx_0 == _id)[0]
            shift_idx_1[_wh] = np.arange(_wh.shape[0], dtype="<i4")

        _shift_data.index = pd.MultiIndex.from_arrays([shift_idx_0, shift_idx_1])
        _shift_data = _shift_data.sort_index(axis=0, level=0)

        return _shift_data

    # NOTE(review): SOURCE is truncated between ``shift_data`` above and the
    # tail of ``__test`` below. Any DatasetReport members defined in that gap
    # could not be recovered and must be restored from version control.


def __test():
    """Ad-hoc scratch routine that prints a few numpy/pandas manipulations."""
    # NOTE(review): the head of this function was lost in SOURCE. The three
    # fixtures below (``a``, ``whb``, ``df``) are minimal stand-ins chosen only
    # so that the surviving statements run; they are NOT the original values.
    a = np.array(["a", "b", "c"], dtype="object")
    whb = np.where(a == "b")[0]
    df = pd.DataFrame(
        [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8]],
        index=pd.MultiIndex.from_tuples([(0.75, 0), (0.75, 1)]),
        columns=pd.MultiIndex.from_tuples(
            [("a", "e"), ("a", "v"), ("b", "e"), ("b", "v")]
        ),
    )

    # Everything from here on is the part of the function visible in SOURCE
    # (the ``if len(whb) >`` prefix of the condition is reconstructed).
    if len(whb) > 0:
        a = np.insert(a, whb + 1, "pippo")
    print(a)
    print("-" * 100)

    dff: pd.DataFrame = df.loc[:, ("a",)]
    print(dff.to_dict(orient="list"))
    dff = dff.drop(columns=["v"])
    print(dff)
    s: pd.Series = dff.loc[:, "e"]
    print(s)
    print(s.to_numpy())
    print(type(s.to_numpy()))
    print("-" * 100)

    df3 = pd.concat([df, df], axis=0, keys=[0.5, 0.3]).sort_index(axis=0, level=0)
    print(df3)
    df3n = pd.concat([df, df], axis=0).sort_index(axis=0, level=0)
    print(df3n)

    df = df3
    print("-" * 100)
    print(df.groupby(level=1).mean(), df.groupby(level=1).count())
    print("-" * 100)
    print(df)
    for ls in df.T.to_numpy():
        print(ls)
    print("-" * 100)


if __name__ == "__main__":
    __test()