first commit

commit fe5bcfe61b

data/AuthorshipDataset.py
@@ -0,0 +1,141 @@
from abc import ABC, abstractmethod
import random
import numpy as np
from collections import Counter


class LabelledCorpus:

    def __init__(self, documents, labels):
        if not isinstance(documents, np.ndarray): documents = np.asarray(documents)
        if not isinstance(labels, np.ndarray): labels = np.asarray(labels)
        self.data = documents
        self.target = labels

    def __len__(self):
        return len(self.data)

    @classmethod
    def filter(cls, labelled_corpus, to_drop):
        sel_data, sel_target = [], []
        for i in range(len(labelled_corpus)):
            if labelled_corpus.target[i] not in to_drop:
                sel_data.append(labelled_corpus.data[i])
                sel_target.append(labelled_corpus.target[i])
        return LabelledCorpus(sel_data, sel_target)


class AuthorshipDataset(ABC):

    def __init__(self, data_path, n_authors=-1, docs_by_author=-1, n_open_set_authors=0, random_state=42):
        self.data_path = data_path

        random.seed(random_state)
        np.random.seed(random_state)

        self._check_n_authors(n_authors, n_open_set_authors)

        self.train, self.test, self.target_names = self._fetch_and_split()

        self._assure_docs_by_author(docs_by_author)

        self._reduce_authors_documents(n_authors, docs_by_author, n_open_set_authors)

        self._remove_label_gaps()

        super().__init__()

    @abstractmethod
    def _fetch_and_split(self):
        pass

    @abstractmethod
    def _check_n_authors(self, n_authors, n_open_set_authors):
        pass

    def _reduce_authors_documents(self, n_authors, n_docs_by_author, n_open_set_authors):
        if n_authors != -1 or n_docs_by_author != -1:
            # training data only (test contains all examples by author)
            if n_docs_by_author != -1:
                docs_by_author = self.group_by(self.train.data, self.train.target)
                train_labels, train_data = [], []
                for author, documents in docs_by_author.items():
                    if n_docs_by_author > len(documents):
                        continue
                    selected_docs = random.sample(documents, n_docs_by_author)
                    train_labels.extend([author] * n_docs_by_author)
                    train_data.extend(selected_docs)

                self.train = LabelledCorpus(train_data, train_labels)

            if n_authors == -1:
                selected_authors = self.target_names
            else:
                selected_authors = random.sample(self.target_names, n_authors + n_open_set_authors)
                self.test = self.extract_documents_from_authors(self.test, selected_authors)
                self.train = self.extract_documents_from_authors(self.train, selected_authors)
        else:
            selected_authors = np.unique(self.train.target)

        if n_open_set_authors > 0:
            self.train, self.test, self.test_out = self.disjoint_train_test_authors(
                self.train, self.test, n_open_set_authors, selected_authors
            )
        else:
            self.test_out = None

    # reindex labels so that the unique labels are equal to range(#num_different_authors)
    # and unique training labels are range(#num_different_training_authors)
    def _remove_label_gaps(self):
        # reindex the training labels first, so that they contain no gaps
        unique_labels = np.unique(self.train.target)
        recode = {old: new for old, new in zip(unique_labels, range(len(unique_labels)))}
        self.train.target = np.array([recode[l] for l in self.train.target])
        self.test.target = np.array([recode[l] for l in self.test.target])

        # test_out labels (if requested) contain additional authors
        if self.test_out is not None:
            for l in np.unique(self.test_out.target):
                if l not in recode:
                    recode[l] = len(recode)
            self.test_out.target = np.array([recode[l] for l in self.test_out.target])

    def group_by(self, docs, authors):
        return {i: docs[authors == i].tolist() for i in np.unique(authors)}

    def extract_documents_from_authors(self, labelled_docs, authors):
        X, y = labelled_docs.data, labelled_docs.target
        if not isinstance(X, np.ndarray): X = np.asarray(X)
        if not isinstance(y, np.ndarray): y = np.asarray(y)
        idx = np.logical_or.reduce([y == i for i in authors])
        return LabelledCorpus(X[idx], y[idx])

    def disjoint_train_test_authors(self, train, test, n_open_test_authors, selected_authors):
        train_authors, test_authors = selected_authors[n_open_test_authors:], selected_authors[:n_open_test_authors]

        train = self.extract_documents_from_authors(train, train_authors)
        test_in = self.extract_documents_from_authors(test, train_authors)
        test_out = self.extract_documents_from_authors(test, test_authors)

        return train, test_in, test_out

    def _assure_docs_by_author(self, docs_by_author):
        if docs_by_author == -1:
            return

        author_doc_count = Counter(self.train.target)
        to_remove = frozenset([id for id, count in author_doc_count.most_common() if count < docs_by_author])
        assert len(to_remove) < len(author_doc_count), 'impossible selection'
        if len(to_remove) > 0:
            self.train = LabelledCorpus.filter(self.train, to_remove)
            self.test = LabelledCorpus.filter(self.test, to_remove)
            self.target_names = sorted(set(self.target_names) - to_remove)
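
# Illustrative sketch (hypothetical author ids) of the re-indexing done by _remove_label_gaps:
# training labels are recoded to a gap-free range starting at 0, the same mapping is applied to
# the closed-set test labels, and authors seen only in test_out get the next free indexes.
unique_labels = np.unique([4, 9, 11, 9, 4])                                          # -> array([ 4,  9, 11])
recode = {old: new for old, new in zip(unique_labels, range(len(unique_labels)))}    # {4: 0, 9: 1, 11: 2}
recode[17] = len(recode)                                                             # hypothetical open-set author -> 3
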
data/fetch_victorian.py
@@ -0,0 +1,35 @@
import numpy as np
import csv
from sklearn.model_selection import train_test_split
from data.AuthorshipDataset import AuthorshipDataset, LabelledCorpus


class Victorian(AuthorshipDataset):

    TEST_SIZE = 0.30

    def __init__(self, data_path='../data/victoria', n_authors=-1, docs_by_author=-1, n_open_set_authors=0, random_state=42):
        super().__init__(data_path, n_authors, docs_by_author, n_open_set_authors, random_state)

    def _fetch_and_split(self):
        data, labels = [], []

        with open(f'{self.data_path}/Gungor_2018_VictorianAuthorAttribution_data-train.csv', 'r', encoding="latin-1") as file:
            csv_reader = csv.reader(file, delimiter=',')
            next(csv_reader)  # skip the header row
            for row in csv_reader:
                data.append(row[0])
                labels.append(int(row[1]))

        target_names = sorted(np.unique(labels))

        train_data, test_data, train_labels, test_labels = \
            train_test_split(data, labels, test_size=Victorian.TEST_SIZE, stratify=labels)

        return LabelledCorpus(train_data, train_labels), LabelledCorpus(test_data, test_labels), target_names

    def _check_n_authors(self, n_authors, n_open_set_authors):
        pass
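
# Illustrative sketch (made-up rows and column name) of the CSV layout that _fetch_and_split assumes:
# a comma-separated, latin-1 encoded file with a header row, the document text in column 0 and an
# integer author id in column 1.
import csv, io
toy_csv = 'text,author\n"it was a dark and stormy night ...",1\n"call me ishmael ...",2\n'
rows = list(csv.reader(io.StringIO(toy_csv), delimiter=','))
header, body = rows[0], rows[1:]          # the header row is skipped by next(csv_reader) above
texts = [row[0] for row in body]
authors = [int(row[1]) for row in body]   # -> [1, 2]
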
evaluation.py
@@ -0,0 +1,9 @@
from sklearn.metrics import f1_score, accuracy_score


def eval(y_true, y_pred):
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='macro')
    print(f'acc={acc * 100:.2f}%')
    print(f'macro-f1={f1:.2f}')
    return acc, f1
index.py
@@ -0,0 +1,52 @@
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from tqdm import tqdm


class Index:
    def __init__(self, **kwargs):
        """
        :param kwargs: keyworded arguments from _sklearn.feature_extraction.text.CountVectorizer_
        """
        self.vect = CountVectorizer(**kwargs)
        self.unk = -1  # a valid index is assigned after fit

    def fit(self, X):
        """
        :param X: a list of strings
        :return: self
        """
        self.vect.fit(X)
        self.analyzer = self.vect.build_analyzer()
        self.vocabulary = self.vect.vocabulary_
        self.unk = self.add_word('UNKTOKEN')
        return self

    def transform(self, X):
        assert self.unk != -1, 'transform called before fit'
        # documents have different lengths, so they are kept as an object array of index lists
        return np.asarray([
            [self.vocabulary.get(word, self.unk) for word in self.analyzer(doc)] for doc in tqdm(X, desc='indexing')
        ], dtype=object)

    def fit_transform(self, X):
        return self.fit(X).transform(X)

    def vocabulary_size(self):
        return len(self.vocabulary) + 1  # the reserved unk token

    def add_word(self, word):
        if word in self.vocabulary:
            raise ValueError(f'word {word} already in dictionary')
        self.vocabulary[word] = len(self.vocabulary)
        return self.vocabulary[word]
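
# Minimal usage sketch of Index on a couple of toy documents; tokens unseen at fit time are
# mapped to the reserved UNKTOKEN index (index.unk).
index = Index(analyzer='word')
codes = index.fit_transform(['the cat sat', 'the dog barked'])   # each doc becomes a list of vocabulary indexes
unseen = index.transform(['the parrot sat'])                     # 'parrot' is encoded as index.unk
vocab_size = index.vocabulary_size()                             # vocabulary plus the reserved unk slot
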
@@ -0,0 +1,81 @@
import numpy as np
from index import Index
from model import RNNProjection, AuthorshipAttributionClassifier, Batch, SameAuthorClassifier, FullAuthorClassifier
from data.fetch_victorian import Victorian
from evaluation import eval
import torch

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'running on {device}')

dataset = Victorian(data_path='../../authorship_analysis/data/victoria', n_authors=5, docs_by_author=25)
Xtr, ytr = dataset.train.data, dataset.train.target
Xte, yte = dataset.test.data, dataset.test.target
A = np.unique(ytr)

#X = X[:100]
#y = y[:100]
#Xte = Xte[:100]
#yte = yte[:100]

#X = [
#    "esto, es una primera prueba",
#    "esto: es una segunda prueba un poco más larga",
#    "vamos ahi con la tercera! a ver",
#    "una cuarta prueba con otro trozo de texto"
#]
#y = [0,0,1,1]


index = Index(analyzer='char')
Xtr = index.fit_transform(Xtr)
Xte = index.transform(Xte)
pad_index = index.add_word('PADTOKEN')

# build a paired test set for verification: two random alignments of the test documents
shuffle1 = np.random.permutation(Xte.shape[0])
shuffle2 = np.random.permutation(Xte.shape[0])
x1, y1 = Xte[shuffle1], yte[shuffle1]
x2, y2 = Xte[shuffle2], yte[shuffle2]
paired_y = y1 == y2

hidden_size = 64
output_size = 128
pad_length = 1000
batch_size = 50
n_epochs = 10

# smaller settings; these override the values above (e.g., for quick runs)
hidden_size = 16
output_size = 32
pad_length = 100
batch_size = 10
n_epochs = 2


# attribution
print('Attribution')
phi = RNNProjection(vocab_size=index.vocabulary_size(), hidden_size=hidden_size, output_size=output_size)
cls = AuthorshipAttributionClassifier(phi, num_authors=A.size, pad_index=pad_index, pad_length=pad_length, device=device)
cls.fit(Xtr, ytr, batch_size=batch_size, epochs=n_epochs)
yte_ = cls.predict(Xte)
eval(yte, yte_)

# verification
print('Verification')
phi = RNNProjection(vocab_size=index.vocabulary_size(), hidden_size=hidden_size, output_size=output_size)
cls = SameAuthorClassifier(phi, num_authors=A.size, pad_index=pad_index, pad_length=pad_length, device=device)
cls.fit(Xtr, ytr, batch_size=batch_size, epochs=n_epochs)
paired_y_ = cls.predict(x1, x2)
eval(paired_y, paired_y_)

# attribution & verification
print('Attribution & Verification')
phi = RNNProjection(vocab_size=index.vocabulary_size(), hidden_size=hidden_size, output_size=output_size)
cls = FullAuthorClassifier(phi, num_authors=A.size, pad_index=pad_index, pad_length=pad_length, device=device)
cls.fit(Xtr, ytr, batch_size=batch_size, epochs=n_epochs)
yte_ = cls.predict_labels(Xte)
eval(yte, yte_)
paired_y_ = cls.predict_sav(x1, x2)
eval(paired_y, paired_y_)
model.py
@@ -0,0 +1,303 @@
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
import math

class AuthorshipAttributionClassifier(nn.Module):
    def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
        super(AuthorshipAttributionClassifier, self).__init__()
        self.projector = projector.to(device)
        self.label = nn.Linear(self.projector.space_dimensions(), num_authors).to(device)
        self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
        self.device = device

    def fit(self, X, y, batch_size, epochs, lr=0.001):
        self.train()
        batcher = Batch(batch_size=batch_size, n_epochs=epochs)
        criterion = torch.nn.CrossEntropyLoss().to(self.device)
        optim = torch.optim.Adam(self.parameters(), lr=lr)

        pbar = tqdm(range(batcher.n_epochs))
        for epoch in pbar:
            losses = []
            for xi, yi in batcher.epoch(X, y):
                optim.zero_grad()
                xi = self.padder.transform(xi)
                logits = self.forward(xi)
                # move the integer labels onto the same device as the logits
                loss = criterion(logits, torch.as_tensor(yi).to(self.device))
                loss.backward()
                #clip_gradient(model)
                optim.step()
                losses.append(loss.item())
            pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')

    def predict(self, x, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi in tqdm(batcher.epoch(x), desc='test'):
            xi = self.padder.transform(xi)
            logits = self.forward(xi)
            # bring the predictions back to CPU before converting to numpy
            prediction = torch.argmax(logits, dim=1).view(-1).detach().cpu().numpy()
            predictions.append(prediction)
        return np.concatenate(predictions)

    def forward(self, x):
        phi = self.projector(x)
        return self.label(phi)

class SameAuthorClassifier(nn.Module):
    def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
        super(SameAuthorClassifier, self).__init__()
        self.projector = projector.to(device)
        self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
        self.device = device

    def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
        self.train()
        batcher = TwoClassBatch(batch_size=batch_size, n_epochs=epochs, steps_per_epoch=steps_per_epoch)
        optim = torch.optim.Adam(self.parameters(), lr=lr)

        pbar = tqdm(range(batcher.n_epochs))
        for epoch in pbar:
            losses = []
            for xi, yi in batcher.epoch(X, y):
                optim.zero_grad()
                xi = self.padder.transform(xi)
                phi = self.projector(xi)
                #normalize phi to have norm 1? maybe better as the last step of projector
                kernel = torch.matmul(phi, phi.T)
                # the ideal kernel has a 1 wherever the two documents share the author, 0 otherwise
                ideal_kernel = torch.as_tensor(1 * (np.outer(1 + yi, 1 / (yi + 1)) == 1)).to(self.device)
                loss = KernelAlignmentLoss(kernel, ideal_kernel)
                loss.backward()
                #clip_gradient(model)
                optim.step()
                losses.append(loss.item())
            pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')

    def predict(self, x, z, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi, zi in tqdm(batcher.epoch(x, z), desc='test'):
            xi = self.padder.transform(xi)
            zi = self.padder.transform(zi)
            inners = self.forward(xi, zi)
            # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
            prediction = inners.detach().cpu().numpy() > 0.5
            predictions.append(prediction)
        return np.concatenate(predictions)

    def forward(self, x, z):
        assert x.shape == z.shape, 'shape mismatch between matrices x and z'
        phi_x = self.projector(x)
        phi_z = self.projector(z)
        rows, cols = phi_x.shape
        # row-wise inner products <phi(x_i), phi(z_i)> via a batched matrix product
        pairwise_inners = torch.bmm(phi_x.view(rows, 1, cols), phi_z.view(rows, cols, 1)).squeeze()
        return pairwise_inners

class FullAuthorClassifier(nn.Module):
    def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
        super(FullAuthorClassifier, self).__init__()
        self.projector = projector.to(device)
        self.label = nn.Linear(self.projector.space_dimensions(), num_authors).to(device)
        self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
        self.device = device

    def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
        self.train()
        batcher = TwoClassBatch(batch_size=batch_size, n_epochs=epochs, steps_per_epoch=steps_per_epoch)
        criterion = torch.nn.CrossEntropyLoss().to(self.device)
        optim = torch.optim.Adam(self.parameters(), lr=lr)
        alpha = 0.5  # weight balancing the sav-loss and the attr-loss

        pbar = tqdm(range(batcher.n_epochs))
        for epoch in pbar:
            losses, sav_losses, attr_losses = [], [], []
            for xi, yi in batcher.epoch(X, y):
                optim.zero_grad()
                xi = self.padder.transform(xi)
                phi = self.projector(xi)
                #normalize phi to have norm 1? maybe better as the last step of projector

                #sav-loss (same-author verification)
                kernel = torch.matmul(phi, phi.T)
                ideal_kernel = torch.as_tensor(1 * (np.outer(1 + yi, 1 / (yi + 1)) == 1)).to(self.device)
                sav_loss = KernelAlignmentLoss(kernel, ideal_kernel)
                sav_losses.append(sav_loss.item())

                #attr-loss (authorship attribution)
                logits = self.label(phi)
                attr_loss = criterion(logits, torch.as_tensor(yi).to(self.device))
                attr_losses.append(attr_loss.item())

                #combined loss
                loss = (alpha) * sav_loss + (1 - alpha) * attr_loss
                losses.append(loss.item())

                loss.backward()
                #clip_gradient(model)
                optim.step()
            pbar.set_description(
                f'training epoch={epoch} '
                f'sav-loss={np.mean(sav_losses):.5f} '
                f'attr-loss={np.mean(attr_losses):.5f} '
                f'loss={np.mean(losses):.5f}'
            )

    def predict_sav(self, x, z, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi, zi in tqdm(batcher.epoch(x, z), desc='test'):
            xi = self.padder.transform(xi)
            zi = self.padder.transform(zi)
            phi_xi = self.projector(xi)
            phi_zi = self.projector(zi)
            rows, cols = phi_xi.shape
            pairwise_inners = torch.bmm(phi_xi.view(rows, 1, cols), phi_zi.view(rows, cols, 1)).squeeze()
            # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
            prediction = pairwise_inners.detach().cpu().numpy() > 0.5
            predictions.append(prediction)
        return np.concatenate(predictions)

    def predict_labels(self, x, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi in tqdm(batcher.epoch(x), desc='test'):
            xi = self.padder.transform(xi)
            phi = self.projector(xi)
            logits = self.label(phi)
            prediction = torch.argmax(logits, dim=1).view(-1).detach().cpu().numpy()
            predictions.append(prediction)
        return np.concatenate(predictions)

def KernelAlignmentLoss(K, Y):
    n_el = K.shape[0] * K.shape[1]
    loss = torch.norm(K - Y, p='fro')  # in Nello's paper this is different
    loss = loss / n_el  # this is in order to factor out the accumulation which is only due to the size
    return loss

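# Small worked example of the alignment loss on a toy batch with labels [0, 0, 1]; the ideal kernel
# has a 1 wherever two documents share the author. The identity matrix stands in for the kernel of a
# hypothetical projector that maps every document to mutually orthogonal unit vectors.
_yi = np.array([0, 0, 1])
_ideal = torch.as_tensor(1 * (np.outer(1 + _yi, 1 / (_yi + 1)) == 1))   # [[1,1,0],[1,1,0],[0,0,1]]
_loss = KernelAlignmentLoss(torch.eye(3), _ideal)                       # ||K - Y||_F / 9 = sqrt(2)/9 ≈ 0.157
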
class RNNProjection(nn.Module):
    def __init__(self, vocab_size, hidden_size, output_size):
        super(RNNProjection, self).__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.num_layers = 1
        self.num_directions = 1

        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.rnn = nn.GRU(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=self.num_layers,
            bidirectional=(self.num_directions == 2),
            batch_first=True
        )
        self.projection = nn.Linear(self.num_layers * self.num_directions * self.hidden_size, output_size)

    def init_hidden(self, batch_size):
        # allocate the initial hidden state on the same device as the model parameters
        return torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size,
                           device=self.embedding.weight.device)

    def forward(self, input):
        # the input arrives as a (padded) matrix of token indexes; move it onto the model's device
        x = torch.as_tensor(input).to(self.embedding.weight.device)
        batch_size = x.shape[0]
        x = self.embedding(x)
        output, hn = self.rnn(x, self.init_hidden(batch_size))
        # hn has shape (num_layers * num_directions, batch, hidden); flatten it per example
        hn = hn.view(self.num_layers, self.num_directions, batch_size, self.hidden_size)
        hn = hn.permute(2, 0, 1, 3).reshape(batch_size, -1)
        return self.projection(hn)

    def space_dimensions(self):
        return self.output_size

class Batch:
    def __init__(self, batch_size, n_epochs, shuffle=True):
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.shuffle = shuffle
        self.current_epoch = 0

    def epoch(self, *args):
        lengths = list(map(len, args))
        assert max(lengths) == min(lengths), 'inconsistent sizes in args'
        n_batches = math.ceil(lengths[0] / self.batch_size)
        offset = 0
        if self.shuffle:
            index = np.random.permutation(len(args[0]))
            args = [arg[index] for arg in args]
        for b in range(n_batches):
            batch_idx = slice(offset, offset + self.batch_size)
            batch = [arg[batch_idx] for arg in args]
            yield batch if len(batch) > 1 else batch[0]
            offset += self.batch_size
        self.current_epoch += 1

class TwoClassBatch:
    """
    given X and y (multiclass), produces batches of elements of X, y restricted to two classes (e.g., c1, c2)
    of equal size, i.e., the batch is [(x1,c1), ..., (xn,c1), (xn+1,c2), ..., (x2n,c2)]
    """
    def __init__(self, batch_size, n_epochs, steps_per_epoch):
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.steps_per_epoch = steps_per_epoch
        self.current_epoch = 0
        if self.batch_size % 2 != 0:
            raise ValueError('batch size must be even')

    def epoch(self, X, y):
        n_el = len(y)
        assert X.shape[0] == n_el, 'inconsistent sizes in X, y'
        classes = np.unique(y)
        groups = {ci: X[y == ci] for ci in classes}
        class_prevalences = [len(groups[ci]) / n_el for ci in classes]
        n_choices = self.batch_size // 2

        for b in range(self.steps_per_epoch):
            class1, class2 = np.random.choice(classes, p=class_prevalences, size=2, replace=False)
            X1 = np.random.choice(groups[class1], size=n_choices)
            X2 = np.random.choice(groups[class2], size=n_choices)
            X_batch = np.concatenate([X1, X2])
            y_batch = np.repeat([class1, class2], repeats=[n_choices, n_choices])
            yield X_batch, y_batch
        self.current_epoch += 1

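# Minimal sketch of the two-class batching protocol on a toy, already-indexed corpus with three
# authors (an object array of variable-length index lists, as produced by Index.transform).
_X_toy = np.array([[1, 2], [3], [4, 5, 6], [7], [8, 9]], dtype=object)
_y_toy = np.array([0, 0, 1, 1, 2])
_batcher = TwoClassBatch(batch_size=4, n_epochs=1, steps_per_epoch=2)
for _Xb, _yb in _batcher.epoch(_X_toy, _y_toy):
    # each batch holds 2 documents sampled (with replacement) from one author followed by
    # 2 documents from another, e.g. _yb == [0, 0, 2, 2]
    pass
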
class Padding:
    def __init__(self, pad_index, max_length, dynamic=True, pad_at_end=True):
        """
        :param pad_index: the index representing the PAD token
        :param max_length: the length that defines the padding
        :param dynamic: if True (default), pads at min(max_length, max_local_length), where max_local_length is
            the length of the longest example
        :param pad_at_end: if True, the pad tokens are added at the end of the lists; otherwise they are added
            at the beginning
        """
        self.pad = pad_index
        self.max_length = max_length
        self.dynamic = dynamic
        self.pad_at_end = pad_at_end

    def transform(self, X):
        """
        :param X: a list of lists of indexes (integers)
        :return: a ndarray of shape (n, m) where n is the number of elements in X and m is the pad length
            (the maximum length among the elements of X if dynamic, or self.max_length otherwise)
        """
        X = [x[:self.max_length] for x in X]
        lengths = list(map(len, X))
        pad_length = min(max(lengths), self.max_length) if self.dynamic else self.max_length
        if self.pad_at_end:
            padded = [x + [self.pad] * (pad_length - x_len) for x, x_len in zip(X, lengths)]
        else:
            padded = [[self.pad] * (pad_length - x_len) + x for x, x_len in zip(X, lengths)]
        return np.asarray(padded, dtype=int)
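
# Minimal sketch of Padding as configured in the classifiers above (dynamic padding, pad tokens
# added at the front), assuming pad index 0.
_padder = Padding(pad_index=0, max_length=5, dynamic=True, pad_at_end=False)
_padded = _padder.transform([[7, 8, 9], [4]])
# -> array([[7, 8, 9],
#           [0, 0, 4]])   both rows are padded to the longest local length (3)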