Alejandro Moreo Fernandez 2020-05-01 22:48:31 +02:00
parent 8c70e61bbb
commit efe9d90f89
6 changed files with 168 additions and 422 deletions

src/data/fetch_imdb62.py Normal file

@ -0,0 +1,45 @@
import numpy as np
from sklearn.model_selection import train_test_split

from data.AuthorshipDataset import AuthorshipDataset, LabelledCorpus


class Imdb62(AuthorshipDataset):

    TEST_SIZE = 0.30
    NUM_AUTHORS = 62
    # each author contributes 1000 reviews; 70% of them go to training
    NUM_DOCS_BY_AUTHOR = int(1000 - (1000 * TEST_SIZE))

    def __init__(self, data_path='../data/imdb62/imdb62.txt', n_authors=-1, docs_by_author=-1, n_open_set_authors=0, random_state=42):
        super().__init__(data_path, n_authors, docs_by_author, n_open_set_authors, random_state)

    def _fetch_and_split(self):
        # each line of imdb62.txt is a tab-separated record; field 1 identifies
        # the author, and fields 4 and 5 carry the review text
        with open(self.data_path, 'rt', encoding='utf-8') as fin:
            splits = [line.split('\t') for line in fin]
        reviews = np.asarray([split[4] + ' ' + split[5] for split in splits])

        # map raw author keys to consecutive integer ids
        authors = []
        authors_ids = dict()
        for s in splits:
            author_key = s[1]
            if author_key not in authors_ids:
                authors_ids[author_key] = len(authors_ids)
            authors.append(authors_ids[author_key])
        authors = np.array(authors)
        authors_names = sorted(np.unique(authors))  # the integer ids double as author names

        train_data, test_data, train_labels, test_labels = \
            train_test_split(reviews, authors, test_size=Imdb62.TEST_SIZE, stratify=authors)

        return LabelledCorpus(train_data, train_labels), LabelledCorpus(test_data, test_labels), authors_names

    def _check_n_authors(self, n_authors, n_open_set_authors):
        if n_authors == -1:
            return
        elif n_authors + n_open_set_authors > Imdb62.NUM_AUTHORS:
            raise ValueError(f'Too many authors requested. Max is {Imdb62.NUM_AUTHORS}')
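
# --- Illustrative usage (not part of the commit) ---
# A minimal sketch of how this loader might be exercised; it assumes the
# AuthorshipDataset base class exposes the splits as .train and .test
# LabelledCorpus objects with .data and .target, as the experiment script
# in this commit does.
if __name__ == '__main__':
    dataset = Imdb62(data_path='../data/imdb62/imdb62.txt')
    print(f'num authors = {len(np.unique(dataset.train.target))}')
    print(f'training documents = {len(dataset.train.data)}')
    print(f'test documents = {len(dataset.test.data)}')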


@ -1,47 +1,14 @@
import numpy as np
from data.fetch_imdb62 import Imdb62
from index import Index
from model.model import RNNProjection, AuthorshipAttributionClassifier, SameAuthorClassifier, FullAuthorClassifier
from model.classifiers import AuthorshipAttributionClassifier, SameAuthorClassifier, FullAuthorClassifier
from data.fetch_victorian import Victorian
from evaluation import eval
import torch
from model.transformations import CNNProjection
import sys
from model.cnn import CNNProjection
if torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
print(f'running on {device}')
dataset = Victorian(data_path='../../authorship_analysis/data/victoria', n_authors=5, docs_by_author=25)
Xtr, ytr = dataset.train.data, dataset.train.target
Xte, yte = dataset.test.data, dataset.test.target
A = np.unique(ytr)
#X = X[:100]
#y = y[:100]
#Xte = Xte[:100]
#yte = yte[:100]
#X = [
# "esto, es una primera prueba",
# "esto: es una segunda prueba un poco más larga",
# "vamos ahi con la tercera! a ver",
# "una cuarta prueba con otro trozo de texto"
#]
#y = [0,0,1,1]
index = Index(analyzer='char')
Xtr = index.fit_transform(Xtr)
Xte = index.transform(Xte)
pad_index = index.add_word('PADTOKEN')
shuffle1 = np.random.permutation(Xte.shape[0])
shuffle2 = np.random.permutation(Xte.shape[0])
x1, y1 = Xte[shuffle1], yte[shuffle1]
x2, y2 = Xte[shuffle2], yte[shuffle2]
paired_y = y1==y2
hidden_size=128
channels_out=128
@ -50,13 +17,38 @@ kernel_sizes=[3,5,7,11,13]
pad_length=1000
batch_size=64
n_epochs=256
"""
hidden_size=16
output_size=32
pad_length=100
batch_size=10
n_epochs=2
"""
bigrams=True
#hidden_size=16
#output_size=32
#pad_length=100
#batch_size=10
#n_epochs=20
if torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
print(f'running on {device}')
#dataset = Victorian(data_path='../../authorship_analysis/data/victoria', n_authors=5, docs_by_author=25)
dataset = Imdb62(data_path='../../authorship_analysis/data/imdb62/imdb62.txt', n_authors=-1, docs_by_author=-1)
Xtr, ytr = dataset.train.data, dataset.train.target
Xte, yte = dataset.test.data, dataset.test.target
A = np.unique(ytr)
print(f'num authors={len(A)}')
index = Index(analyzer='char', ngram_range=(2,2) if bigrams else (1,1))
Xtr = index.fit_transform(Xtr)
Xte = index.transform(Xte)
pad_index = index.add_word('PADTOKEN')
print(f'vocabulary size={index.vocabulary_size()}')
#shuffle1 = np.random.permutation(Xte.shape[0])
#shuffle2 = np.random.permutation(Xte.shape[0])
#x1, y1 = Xte[shuffle1], yte[shuffle1]
#x2, y2 = Xte[shuffle2], yte[shuffle2]
#paired_y = y1==y2
# attribution
print('Attribution')
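# --- Illustrative sketch (the remainder of this hunk is not shown) ---
# Based on the classifier and projector APIs added in this commit, the
# attribution experiment presumably continues roughly as follows; the choice
# of RNNProjection and of output_size=hidden_size is an assumption here.
from model.cnn import RNNProjection  # module path assumed from the CNNProjection import above
phi = RNNProjection(vocab_size=index.vocabulary_size(), hidden_size=hidden_size,
                    output_size=hidden_size, device=device)
cls = AuthorshipAttributionClassifier(phi, num_authors=len(A), pad_index=pad_index,
                                      pad_length=pad_length, device=device)
cls.fit(Xtr, ytr, batch_size=batch_size, epochs=n_epochs)
yte_pred = cls.predict(Xte)
print(f'attribution accuracy = {(yte_pred == yte).mean():.3f}')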


@ -1,311 +0,0 @@
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
import math
def tensor2numpy(t,device):
if device=='cpu':
return t.detach().numpy()
else:
return t.cpu().detach().numpy()
class AuthorshipAttributionClassifier(nn.Module):
def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
super(AuthorshipAttributionClassifier, self).__init__()
self.projector = projector.to(device)
self.label = nn.Linear(self.projector.space_dimensions(), num_authors).to(device)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
self.device=device
def fit(self, X, y, batch_size, epochs, lr=0.001):
self.train()
batcher = Batch(batch_size=batch_size, n_epochs=epochs)
criterion = torch.nn.CrossEntropyLoss().to(self.device)
optim = torch.optim.Adam(self.parameters(), lr=lr)
pbar = tqdm(range(batcher.n_epochs))
for epoch in pbar:
losses = []
for xi, yi in batcher.epoch(X, y):
optim.zero_grad()
xi = self.padder.transform(xi)
logits = self.forward(xi)
loss = criterion(logits, torch.as_tensor(yi).to(self.device))
loss.backward()
#clip_gradient(model)
optim.step()
losses.append(loss.item())
pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')
def predict(self, x, batch_size=100):
self.eval()
batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
predictions = []
for xi in tqdm(batcher.epoch(x), desc='test'):
xi = self.padder.transform(xi)
logits = self.forward(xi)
prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1), self.device)
predictions.append(prediction)
return np.concatenate(predictions)
def forward(self, x):
phi = self.projector(x)
return self.label(phi)
class SameAuthorClassifier(nn.Module):
def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
super(SameAuthorClassifier, self).__init__()
self.projector = projector.to(device)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
self.device = device
def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
self.train()
batcher = TwoClassBatch(batch_size=batch_size, n_epochs=epochs, steps_per_epoch=steps_per_epoch)
optim = torch.optim.Adam(self.parameters(), lr=lr)
pbar = tqdm(range(batcher.n_epochs))
for epoch in pbar:
losses = []
for xi, yi in batcher.epoch(X, y):
optim.zero_grad()
xi = self.padder.transform(xi)
phi = self.projector(xi)
#normalize phi to have norm 1? maybe better as the last step of projector
kernel = torch.matmul(phi, phi.T)
ideal_kernel = torch.as_tensor(1 * (np.outer(1 + yi, 1 / (yi + 1)) == 1)).to(self.device)
loss = KernelAlignmentLoss(kernel, ideal_kernel)
loss.backward()
#clip_gradient(model)
optim.step()
losses.append(loss.item())
pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')
def predict(self, x, z, batch_size=100):
self.eval()
batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
predictions = []
for xi, zi in tqdm(batcher.epoch(x, z), desc='test'):
xi = self.padder.transform(xi)
zi = self.padder.transform(zi)
inners = self.forward(xi, zi)
prediction = tensor2numpy(inners, device=self.device) > 0.5 # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
predictions.append(prediction)
return np.concatenate(predictions)
def forward(self, x, z):
assert x.shape == z.shape, 'shape mismatch between matrices x and z'
phi_x = self.projector(x)
phi_z = self.projector(z)
rows, cols = phi_x.shape
pairwise_inners = torch.bmm(phi_x.view(rows, 1, cols), phi_z.view(rows, cols, 1)).squeeze()
return pairwise_inners
class FullAuthorClassifier(nn.Module):
def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
super(FullAuthorClassifier, self).__init__()
self.projector = projector.to(device)
self.label = nn.Linear(self.projector.space_dimensions(), num_authors).to(device)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
self.device = device
def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
self.train()
batcher = TwoClassBatch(batch_size=batch_size, n_epochs=epochs, steps_per_epoch=steps_per_epoch)
criterion = torch.nn.CrossEntropyLoss().to(self.device)
optim = torch.optim.Adam(self.parameters(), lr=lr)
alpha = 0.5
pbar = tqdm(range(batcher.n_epochs))
for epoch in pbar:
losses, sav_losses, attr_losses = [], [], []
for xi, yi in batcher.epoch(X, y):
optim.zero_grad()
xi = self.padder.transform(xi)
phi = self.projector(xi)
#normalize phi to have norm 1? maybe better as the last step of projector
#sav-loss
kernel = torch.matmul(phi, phi.T)
ideal_kernel = torch.as_tensor(1 * (np.outer(1 + yi, 1 / (yi + 1)) == 1)).to(self.device)
sav_loss = KernelAlignmentLoss(kernel, ideal_kernel)
sav_losses.append(sav_loss.item())
#attr-loss
logits = self.label(phi)
attr_loss = criterion(logits, torch.as_tensor(yi).to(self.device))
attr_losses.append(attr_loss.item())
#loss
loss = (alpha)*sav_loss + (1-alpha)*attr_loss
losses.append(loss.item())
loss.backward()
#clip_gradient(model)
optim.step()
pbar.set_description(
f'training epoch={epoch} '
f'sav-loss={np.mean(sav_losses):.5f} '
f'attr-loss={np.mean(attr_losses):.5f} '
f'loss={np.mean(losses):.5f}'
)
def predict_sav(self, x, z, batch_size=100):
self.eval()
batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
predictions = []
for xi, zi in tqdm(batcher.epoch(x, z), desc='test'):
xi = self.padder.transform(xi)
zi = self.padder.transform(zi)
phi_xi = self.projector(xi)
phi_zi = self.projector(zi)
rows, cols = phi_xi.shape
pairwise_inners = torch.bmm(phi_xi.view(rows, 1, cols), phi_zi.view(rows, cols, 1)).squeeze()
prediction = tensor2numpy(pairwise_inners, device=self.device) > 0.5 # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
predictions.append(prediction)
return np.concatenate(predictions)
def predict_labels(self, x, batch_size=100):
self.eval()
batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
predictions = []
for xi in tqdm(batcher.epoch(x), desc='test'):
xi = self.padder.transform(xi)
phi = self.projector(xi)
logits = self.label(phi)
prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1), device=self.device)
predictions.append(prediction)
return np.concatenate(predictions)
def KernelAlignmentLoss(K, Y):
n_el = K.shape[0]*K.shape[1]
loss = torch.norm(K - Y, p='fro') # in Nello's paper this is different
loss = loss / n_el # normalise by the number of entries so the loss does not grow with the batch size
return loss
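# --- Worked example (not part of the commit) ---
# The "ideal kernel" built in the fit() methods above marks same-author pairs:
# np.outer(1 + y, 1 / (y + 1)) equals 1 exactly where y[i] == y[j], since
# (1 + y[i]) / (1 + y[j]) == 1 iff the two labels coincide. KernelAlignmentLoss
# then measures the size-normalised Frobenius distance between this target and
# the Gram matrix of the projected batch.
def _ideal_kernel_demo():
    # e.g. y = [0, 0, 1]  ->  [[1 1 0], [1 1 0], [0 0 1]]
    y = np.array([0, 0, 1])
    return 1 * (np.outer(1 + y, 1 / (y + 1)) == 1)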
class RNNProjection(nn.Module):
def __init__(self, vocab_size, hidden_size, output_size, device='cpu'):
super(RNNProjection, self).__init__()
self.output_size = output_size
self.hidden_size = hidden_size
self.vocab_size = vocab_size
self.num_layers=1
self.num_directions=1
self.device=device
self.embedding = nn.Embedding(vocab_size, hidden_size).to(device)
self.rnn = nn.GRU(
input_size=hidden_size,
hidden_size=hidden_size,
num_layers=self.num_layers,
bidirectional=(self.num_directions == 2),
batch_first=True
).to(device)
self.projection = nn.Linear(self.num_layers * self.num_directions * self.hidden_size, output_size).to(device)
def init_hidden(self, batch_size):
return torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size).to(self.device)
def forward(self, input):
x = torch.as_tensor(input).to(self.device)
batch_size = x.shape[0]
x = self.embedding(x)
output, hn = self.rnn(x, self.init_hidden(batch_size))
hn = hn.view(self.num_layers, self.num_directions, batch_size, self.hidden_size)
hn = hn.permute(2, 0, 1, 3).reshape(batch_size, -1)
return self.projection(hn)
def space_dimensions(self):
return self.output_size
class Batch:
def __init__(self, batch_size, n_epochs, shuffle=True):
self.batch_size = batch_size
self.n_epochs = n_epochs
self.shuffle = shuffle
self.current_epoch = 0
def epoch(self, *args):
lengths = list(map(len, args))
assert max(lengths) == min(lengths), 'inconsistent sizes in args'
n_batches = math.ceil(lengths[0] / self.batch_size)
offset = 0
if self.shuffle:
index = np.random.permutation(len(args[0]))
args = [arg[index] for arg in args]
for b in range(n_batches):
batch_idx = slice(offset, offset+self.batch_size)
batch = [arg[batch_idx] for arg in args]
yield batch if len(batch) > 1 else batch[0]
offset += self.batch_size
self.current_epoch += 1
class TwoClassBatch:
"""
given X and y (single-label, multiclass), produces batches of elements of X, y for two classes (e.g., c1, c2)
of equal size, i.e., the batch is [(x1,c1), ..., (xn,c1), (xn+1,c2), ..., (x2n,c2)]
"""
def __init__(self, batch_size, n_epochs, steps_per_epoch):
self.batch_size = batch_size
self.n_epochs = n_epochs
self.steps_per_epoch = steps_per_epoch
self.current_epoch = 0
if self.batch_size % 2 != 0:
raise ValueError('batch size must be even')
def epoch(self, X, y):
n_el = len(y)
assert X.shape[0] == n_el, 'inconsistent sizes in X, y'
classes = np.unique(y)
groups = {ci: X[y==ci] for ci in classes}
class_prevalences = [len(groups[ci])/n_el for ci in classes]
n_choices = self.batch_size // 2
for b in range(self.steps_per_epoch):
class1, class2 = np.random.choice(classes, p=class_prevalences, size=2, replace=False)
X1 = np.random.choice(groups[class1], size=n_choices)
X2 = np.random.choice(groups[class2], size=n_choices)
X_batch = np.concatenate([X1,X2])
y_batch = np.repeat([class1, class2], repeats=[n_choices,n_choices])
yield X_batch, y_batch
self.current_epoch += 1
class Padding:
def __init__(self, pad_index, max_length, dynamic=True, pad_at_end=True):
"""
:param pad_index: the index representing the PAD token
:param max_length: the length that defines the padding
:param dynamic: if True (default) pads at min(max_length, max_local_length) where max_local_length is the
length of the longest example
:param pad_at_end: if True, the pad tokens are added at the end of the lists; otherwise, they are added
at the beginning
"""
self.pad = pad_index
self.max_length = max_length
self.dynamic = dynamic
self.pad_at_end = pad_at_end
def transform(self, X):
"""
:param X: a list of lists of indexes (integers)
:return: an ndarray of shape (n,m) where n is the number of elements in X and m is the pad length (the maximum
length among the elements of X if dynamic, or self.max_length otherwise)
"""
X = [x[:self.max_length] for x in X]
lengths = list(map(len, X))
pad_length = min(max(lengths), self.max_length) if self.dynamic else self.max_length
if self.pad_at_end:
padded = [x + [self.pad] * (pad_length - x_len) for x, x_len in zip(X, lengths)]
else:
padded = [[self.pad] * (pad_length - x_len) + x for x, x_len in zip(X, lengths)]
return np.asarray(padded, dtype=int)


@ -3,12 +3,7 @@ import torch
import torch.nn as nn
from tqdm import tqdm
import math
def tensor2numpy(t, device):
if device == 'cpu':
t = t.cpu()
return t.detach().numpy()
from sklearn.model_selection import train_test_split
class AuthorshipAttributionClassifier(nn.Module):
@ -18,28 +13,48 @@ class AuthorshipAttributionClassifier(nn.Module):
self.ff = FFProjection(input_size=projector.space_dimensions(),
hidden_sizes=[1024],
output_size=num_authors).to(device)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False, device=device)
self.device = device
def fit(self, X, y, batch_size, epochs, lr=0.001):
self.train()
def fit(self, X, y, batch_size, epochs, lr=0.001, val_prop=0.2, log='../log/tmp.csv'):
batcher = Batch(batch_size=batch_size, n_epochs=epochs)
criterion = torch.nn.CrossEntropyLoss().to(self.device)
optim = torch.optim.Adam(self.parameters(), lr=lr)
pbar = tqdm(range(batcher.n_epochs))
for epoch in pbar:
losses = []
for xi, yi in batcher.epoch(X, y):
optim.zero_grad()
xi = self.padder.transform(xi)
logits = self.forward(torch.as_tensor(xi).to(self.device))
loss = criterion(logits, torch.as_tensor(yi).to(self.device))
loss.backward()
#clip_gradient(model)
optim.step()
losses.append(loss.item())
pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')
X, Xval, y, yval = train_test_split(X, y, test_size=val_prop, stratify=y)
with open(log, 'wt') as foo:
foo.write('epoch\ttr-loss\tval-loss\n')
tr_loss, val_loss = -1, -1
pbar = tqdm(range(1,batcher.n_epochs+1))
for epoch in pbar:
# training
self.train()
losses = []
for xi, yi in batcher.epoch(X, y):
optim.zero_grad()
loss = self._compute_loss(xi, yi, criterion)
loss.backward()
#clip_gradient(model)
optim.step()
losses.append(loss.item())
tr_loss = np.mean(losses)
pbar.set_description(f'training epoch={epoch} loss={tr_loss:.5f} val_loss={val_loss:.5f}')
# validation
self.eval()
losses = []
for xi, yi in batcher.epoch(Xval, yval):
loss = self._compute_loss(xi, yi, criterion)
losses.append(loss.item())
val_loss = np.mean(losses)
foo.write(f'{epoch}\t{tr_loss:.8f}\t{val_loss:.8f}\n')
def _compute_loss(self, x, y, criterion):
x = self.padder.transform(x)
logits = self.forward(x)
return criterion(logits, torch.as_tensor(y).to(self.device))
def predict(self, x, batch_size=100):
self.eval()
@ -47,8 +62,8 @@ class AuthorshipAttributionClassifier(nn.Module):
predictions = []
for xi in tqdm(batcher.epoch(x), desc='test'):
xi = self.padder.transform(xi)
logits = self.forward(torch.as_tensor(xi).to(self.device))
prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1), self.device)
logits = self.forward(xi)
prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1))
predictions.append(prediction)
return np.concatenate(predictions)
@ -61,7 +76,7 @@ class SameAuthorClassifier(nn.Module):
def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
super(SameAuthorClassifier, self).__init__()
self.projector = projector.to(device)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False, device=device)
self.device = device
def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
@ -94,7 +109,7 @@ class SameAuthorClassifier(nn.Module):
xi = self.padder.transform(xi)
zi = self.padder.transform(zi)
inners = self.forward(xi, zi)
prediction = tensor2numpy(inners, device=self.device) > 0.5 # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
prediction = tensor2numpy(inners) > 0.5 # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
predictions.append(prediction)
return np.concatenate(predictions)
@ -114,7 +129,7 @@ class FullAuthorClassifier(nn.Module):
self.ff = FFProjection(input_size=projector.space_dimensions(),
hidden_sizes=[1024],
output_size=num_authors).to(device)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False, device=device)
self.device = device
def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
@ -169,7 +184,7 @@ class FullAuthorClassifier(nn.Module):
phi_zi = self.projector(zi)
rows, cols = phi_xi.shape
pairwise_inners = torch.bmm(phi_xi.view(rows, 1, cols), phi_zi.view(rows, cols, 1)).squeeze()
prediction = tensor2numpy(pairwise_inners, device=self.device) > 0.5 # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
prediction = tensor2numpy(pairwise_inners) > 0.5 # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
predictions.append(prediction)
return np.concatenate(predictions)
@ -181,7 +196,7 @@ class FullAuthorClassifier(nn.Module):
xi = self.padder.transform(xi)
phi = self.projector(xi)
logits = self.ff(phi)
prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1))
prediction = tensor2numpy( torch.argmax(logits, dim=1).view(-1))
predictions.append(prediction)
return np.concatenate(predictions)
@ -209,41 +224,6 @@ class FFProjection(nn.Module):
x = self.ff[-1](x)
return x
class RNNProjection(nn.Module):
def __init__(self, vocab_size, hidden_size, output_size, device='cpu'):
super(RNNProjection, self).__init__()
self.output_size = output_size
self.hidden_size = hidden_size
self.vocab_size = vocab_size
self.num_layers=1
self.num_directions=1
self.device=device
self.embedding = nn.Embedding(vocab_size, hidden_size).to(device)
self.rnn = nn.GRU(
input_size=hidden_size,
hidden_size=hidden_size,
num_layers=self.num_layers,
bidirectional=(self.num_directions == 2),
batch_first=True
).to(device)
self.projection = nn.Linear(self.num_layers * self.num_directions * self.hidden_size, output_size).to(device)
def init_hidden(self, batch_size):
return torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size).to(self.device)
def forward(self, input):
x = torch.as_tensor(input).to(self.device)
batch_size = x.shape[0]
x = self.embedding(x)
output, hn = self.rnn(x, self.init_hidden(batch_size))
hn = hn.view(self.num_layers, self.num_directions, batch_size, self.hidden_size)
hn = hn.permute(2, 0, 1, 3).reshape(batch_size, -1)
return self.projection(hn)
def space_dimensions(self):
return self.output_size
class Batch:
def __init__(self, batch_size, n_epochs, shuffle=True):
@ -300,7 +280,7 @@ class TwoClassBatch:
class Padding:
def __init__(self, pad_index, max_length, dynamic=True, pad_at_end=True):
def __init__(self, pad_index, max_length, dynamic=True, pad_at_end=True, device='cpu'):
"""
:param pad_index: the index representing the PAD token
:param max_length: the length that defines the padding
@ -313,6 +293,7 @@ class Padding:
self.max_length = max_length
self.dynamic = dynamic
self.pad_at_end = pad_at_end
self.device = device
def transform(self, X):
"""
@ -327,4 +308,8 @@ class Padding:
padded = [x + [self.pad] * (pad_length - x_len) for x, x_len in zip(X, lengths)]
else:
padded = [[self.pad] * (pad_length - x_len) + x for x, x_len in zip(X, lengths)]
return np.asarray(padded, dtype=int)
return torch.from_numpy(np.asarray(padded, dtype=int)).to(self.device)
def tensor2numpy(t):
return t.to('cpu').detach().numpy()
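
# --- Illustrative check (not part of the commit) ---
# With this change Padding.transform returns a LongTensor already placed on the
# configured device, so callers can feed it straight into forward(), and
# tensor2numpy no longer needs to be told the device. Pad index 0 is an
# arbitrary choice here.
if __name__ == '__main__':
    padder = Padding(pad_index=0, max_length=5, dynamic=True, pad_at_end=False, device='cpu')
    batch = padder.transform([[3, 4], [5, 6, 7]])
    print(batch)                      # tensor([[0, 3, 4], [5, 6, 7]])
    print(tensor2numpy(batch).shape)  # (2, 3)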


@ -44,5 +44,40 @@ class CNNProjection(nn.Module):
logit = self.fc1(x) # (N, C)
return logit
def space_dimensions(self):
return self.output_size
class RNNProjection(nn.Module):
def __init__(self, vocab_size, hidden_size, output_size, device='cpu'):
super(RNNProjection, self).__init__()
self.output_size = output_size
self.hidden_size = hidden_size
self.vocab_size = vocab_size
self.num_layers=1
self.num_directions=1
self.device = device
self.embedding = nn.Embedding(vocab_size, hidden_size).to(device)
self.rnn = nn.GRU(
input_size=hidden_size,
hidden_size=hidden_size,
num_layers=self.num_layers,
bidirectional=(self.num_directions == 2),
batch_first=True
).to(device)
self.projection = nn.Linear(self.num_layers * self.num_directions * self.hidden_size, output_size).to(device)
def init_hidden(self, batch_size):
return torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size).to(self.device)
def forward(self, x):
batch_size = x.shape[0]
x = self.embedding(x)
output, hn = self.rnn(x, self.init_hidden(batch_size))
hn = hn.view(self.num_layers, self.num_directions, batch_size, self.hidden_size)
hn = hn.permute(2, 0, 1, 3).reshape(batch_size, -1)
return self.projection(hn)
def space_dimensions(self):
return self.output_size
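
# --- Illustrative shape check (not part of the commit) ---
# A quick smoke test for the projector that now lives alongside CNNProjection:
# a batch of 4 padded sequences of length 10 over a vocabulary of 100 symbols
# should come out as 4 vectors in a 32-dimensional space.
if __name__ == '__main__':
    proj = RNNProjection(vocab_size=100, hidden_size=16, output_size=32)
    dummy = torch.zeros(4, 10, dtype=torch.long)
    print(proj(dummy).shape)  # torch.Size([4, 32])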