import gzip  # unused here; a leftover of the deprecated .txt.gz loading path (commented out below)
import quapy as qp
from Ordinal.utils import load_simple_sample_raw
from quapy.data import LabelledCollection
import quapy.functional as F
import os
from os.path import join
from pathlib import Path
import numpy as np


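# paths and experiment configuration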
datadir = '/media/moreo/Volume/Datasets/Amazon/reviews'
outdir = './data/'
real_prev_path = './data/Books-real-prevalence-by-product_votes1_reviews100.csv'
domain = 'Books'
seed = 7

tr_size = 20000
val_size = 1000
te_size = 1000
nval = 1000
nte = 5000
nte = 5000
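# one training set of tr_size documents is drawn; nval validation samples and nte test
# samples (of val_size and te_size documents each) are then generated under each protocol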


def from_text(path, encoding='utf-8', class2int=True):
    """
    Reads a labelled collection of documents.
    File format: <0-4>\t<document>\n

    :param path: path to the labelled collection
    :param encoding: the text encoding used to open the file
    :param class2int: if True, casts the labels to int
    :return: a list of sentences, and a list of labels
    """
    all_sentences, all_labels = [], []
    with open(path, 'rt', encoding=encoding) as file:
        for line in file:
            line = line.strip()
            if line:
                try:
                    label, sentence = line.split('\t')
                    sentence = sentence.strip()
                    if class2int:
                        label = int(label)
                    if label >= 0:
                        if sentence:
                            all_sentences.append(sentence)
                            all_labels.append(label)
                except ValueError:
                    print(f'format error in {line}')
    return all_sentences, all_labels


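# both from_text (reading) and write_txt_sample (writing) use one "<label>\t<document>"
# pair per line, e.g. (made-up review):
#   4\tGreat book, the plot is gripping.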
def write_txt_sample(sample: LabelledCollection, path):
    # dumps a LabelledCollection to disk, one "<label>\t<document>" pair per line
    os.makedirs(Path(path).parent, exist_ok=True)
    with open(path, 'wt') as foo:
        for document, label in zip(*sample.Xy):
            foo.write(f'{label}\t{document}\n')


def gen_samples_APP(pool: LabelledCollection, nsamples, sample_size, outdir, prevpath):
    # APP (artificial prevalence protocol): draws nsamples prevalence vectors uniformly
    # at random from the simplex, and extracts one sample of sample_size documents at
    # each of them; the true prevalence of every sample is logged in prevpath
    os.makedirs(outdir, exist_ok=True)
    with open(prevpath, 'wt') as prevfile:
        prevfile.write('id,' + ','.join(f'{c}' for c in pool.classes_) + '\n')
        for i, prev in enumerate(F.uniform_simplex_sampling(n_classes=pool.n_classes, size=nsamples)):
            sample = pool.sampling(sample_size, *prev)
            write_txt_sample(sample, join(outdir, f'{i}.txt'))
            prevfile.write(f'{i},' + ','.join(f'{p:.3f}' for p in sample.prevalence()) + '\n')


def gen_samples_NPP(pool: LabelledCollection, nsamples, sample_size, outdir, prevpath):
    # NPP (natural prevalence protocol): extracts nsamples random samples that preserve,
    # in expectation, the prevalence of the pool
    os.makedirs(outdir, exist_ok=True)
    with open(prevpath, 'wt') as prevfile:
        prevfile.write('id,' + ','.join(f'{c}' for c in pool.classes_) + '\n')
        for i, sample in enumerate(pool.natural_sampling_generator(sample_size, repeats=nsamples)):
            write_txt_sample(sample, join(outdir, f'{i}.txt'))
            prevfile.write(f'{i},' + ','.join(f'{p:.3f}' for p in sample.prevalence()) + '\n')


def gen_samples_real_prevalences(real_prevalences, pool: LabelledCollection, sample_size, outdir, prevpath_out):
    # extracts one sample of sample_size documents at each prevalence vector observed in
    # real data; the last component is dropped because pool.sampling() recomputes it as
    # one minus the sum of the rest
    os.makedirs(outdir, exist_ok=True)
    with open(prevpath_out, 'wt') as prevfile:
        prevfile.write('id,' + ','.join(f'{c}' for c in pool.classes_) + '\n')
        for i, prev in enumerate(real_prevalences):
            sample = pool.sampling(sample_size, *prev[:-1])
            write_txt_sample(sample, join(outdir, f'{i}.txt'))
            prevfile.write(f'{i},' + ','.join(f'{p:.3f}' for p in sample.prevalence()) + '\n')


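# main script: loads the full Books collection, splits it into training/development/test,
# and generates the evaluation samples under each protocol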
# fullpath = join(datadir,domain)+'.txt.gz' <- deprecated; there were duplicates
# data = LabelledCollection.load(fullpath, from_gz_text)

fullpath = './data/Books/Books.txt'
data = LabelledCollection.load(fullpath, from_text)

print(len(data))
print(data.classes_)
print(data.prevalence())

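# all random operations (splits and samplings) are made reproducible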
with qp.util.temp_seed(seed):
    train, rest = data.split_stratified(train_prop=tr_size)

    devel, test = rest.split_stratified(train_prop=0.5)
    print(len(train))
    print(len(devel))
    print(len(test))

    domaindir = join(outdir, domain)

    write_txt_sample(train, join(domaindir, 'training_data.txt'))
    write_txt_sample(devel, join(domaindir, 'development_data.txt'))
    write_txt_sample(test, join(domaindir, 'test_data.txt'))

    # this part is to be used when the partitions have already been created, in order to avoid re-generating them
    # train = load_simple_sample_raw(domaindir, 'training_data')
    # devel = load_simple_sample_raw(domaindir, 'development_data')
    # test = load_simple_sample_raw(domaindir, 'test_data')

    gen_samples_APP(devel, nsamples=nval, sample_size=val_size, outdir=join(domaindir, 'app', 'dev_samples'),
                    prevpath=join(domaindir, 'app', 'dev_prevalences.txt'))
    gen_samples_APP(test, nsamples=nte, sample_size=te_size, outdir=join(domaindir, 'app', 'test_samples'),
                    prevpath=join(domaindir, 'app', 'test_prevalences.txt'))

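    # the APP samples land in <outdir>/Books/app/{dev_samples,test_samples}/<i>.txt, and the
    # true prevalence of each sample is logged in dev_prevalences.txt and test_prevalences.txt
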
    # gen_samples_NPP(devel, nsamples=nval, sample_size=val_size, outdir=join(domaindir, 'npp', 'dev_samples'),
    #                 prevpath=join(domaindir, 'npp', 'dev_prevalences.txt'))
    # gen_samples_NPP(test, nsamples=nte, sample_size=te_size, outdir=join(domaindir, 'npp', 'test_samples'),
    #                 prevpath=join(domaindir, 'npp', 'test_prevalences.txt'))


    # this part generates samples based on real prevalences (in this case, the prevalences of sets of book
    # reviews grouped by product); it loads the real prevalences (computed elsewhere), randomly extracts
    # 5000 of them for test and 1000 for validation (disjoint), and then carries out the samplings

    assert os.path.exists(real_prev_path), f'real prevalence file {real_prev_path} does not seem to exist...'
    real_prevalences = np.genfromtxt(real_prev_path, delimiter='\t')

    nrows = real_prevalences.shape[0]
    rand_sel = np.random.permutation(nrows)
    real_prevalences_val = real_prevalences[rand_sel[:nval]]
    real_prevalences_te = real_prevalences[rand_sel[nval:nval + nte]]

    gen_samples_real_prevalences(real_prevalences_val, devel, sample_size=val_size, outdir=join(domaindir, 'real', 'dev_samples'),
                                 prevpath_out=join(domaindir, 'real', 'dev_prevalences.txt'))
    gen_samples_real_prevalences(real_prevalences_te, test, sample_size=te_size, outdir=join(domaindir, 'real', 'test_samples'),
                                 prevpath_out=join(domaindir, 'real', 'test_prevalences.txt'))