forked from moreo/QuaPy
91 lines
2.3 KiB
Python
91 lines
2.3 KiB
Python
import numpy as np
|
|
import pandas as pd
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
np.set_printoptions(linewidth=np.inf)
|
|
|
|
|
|
def load_csv(file, use_yhat=True):
|
|
df = pd.read_csv(file)
|
|
|
|
cod_area = 'cod.prov'
|
|
if use_yhat:
|
|
covariates = ['owner', 'eta', 'work', 'sex', 'year_edu', 'hsize', 'y.hat', 'prob']
|
|
else:
|
|
covariates = ['owner', 'eta', 'work', 'sex', 'year_edu', 'hsize', 'prob']
|
|
y_true = 'y.true'
|
|
|
|
X = df[covariates].values
|
|
A = df[cod_area].values
|
|
|
|
for i, cov in enumerate(covariates):
|
|
print(f'values of col {i} "{cov}" {np.unique(X[:,i])}')
|
|
|
|
if y_true in df.columns:
|
|
y = df[y_true].values
|
|
return A, X, y
|
|
else:
|
|
return A, X
|
|
|
|
|
|
def get_dataset_by_area(A, X, y=None):
|
|
data = []
|
|
for area in np.unique(A):
|
|
sel = (A == area)
|
|
Xsel = X[sel]
|
|
if y is not None:
|
|
ysel = y[sel]
|
|
else:
|
|
ysel = None
|
|
data.append((area, Xsel, ysel))
|
|
return data
|
|
|
|
|
|
class AdjMatrix:
|
|
|
|
def __init__(self, path):
|
|
df = pd.read_csv(path)
|
|
|
|
area_codes = df.columns[1:].values
|
|
area_codes = np.asarray([int(c) for c in area_codes])
|
|
|
|
values = df.values[:, 1:]
|
|
print(area_codes)
|
|
print(values)
|
|
self.area2idx = {area:i for i, area in enumerate(area_codes)}
|
|
self.idx2area = area_codes
|
|
self.M = np.asarray(values)
|
|
|
|
def adjacent(self, cod_1, cod_2):
|
|
idx1 = self.area2idx[cod_1]
|
|
idx2 = self.area2idx[cod_2]
|
|
return (self.M[idx1, idx2] == 1)
|
|
|
|
def get_adjacent(self, cod):
|
|
idx = self.area2idx[cod]
|
|
idx_adj = np.argwhere(self.M[idx]==1).flatten()
|
|
return self.idx2area[idx_adj]
|
|
|
|
|
|
class Preprocessor:
|
|
def __init__(self):
|
|
self.scaler = StandardScaler()
|
|
# self.standardize_col_ids = np.asarray([1, 4, 5]) # eta, year_edu, hsize
|
|
self.standardize_col_ids = np.arange(8) # everything
|
|
|
|
def fit(self, X, y=None):
|
|
Xsel = X[:, self.standardize_col_ids]
|
|
self.scaler.fit(Xsel)
|
|
return self
|
|
|
|
def transform(self, X):
|
|
Xsel = X[:, self.standardize_col_ids]
|
|
Xsel_zscore = self.scaler.transform(Xsel)
|
|
X[:, self.standardize_col_ids] = Xsel_zscore
|
|
return X
|
|
|
|
def fit_transform(self, X, y=None):
|
|
return self.fit(X, y).transform(X)
|
|
|
|
|