import math

import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm


def tensor2numpy(t, device):
    # tensors living on an accelerator must be moved to CPU before converting to numpy
    if device != 'cpu':
        t = t.cpu()
    return t.detach().numpy()


class AuthorshipAttributionClassifier(nn.Module):
    def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
        super(AuthorshipAttributionClassifier, self).__init__()
        self.projector = projector.to(device)
        self.ff = FFProjection(input_size=projector.space_dimensions(),
                               hidden_sizes=[1024],
                               output_size=num_authors).to(device)
        self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
        self.device = device

    def fit(self, X, y, batch_size, epochs, lr=0.001):
        self.train()
        batcher = Batch(batch_size=batch_size, n_epochs=epochs)
        criterion = torch.nn.CrossEntropyLoss().to(self.device)
        optim = torch.optim.Adam(self.parameters(), lr=lr)

        pbar = tqdm(range(batcher.n_epochs))
        for epoch in pbar:
            losses = []
            for xi, yi in batcher.epoch(X, y):
                optim.zero_grad()
                xi = self.padder.transform(xi)
                logits = self.forward(torch.as_tensor(xi).to(self.device))
                loss = criterion(logits, torch.as_tensor(yi).to(self.device))
                loss.backward()
                # clip_gradient(model)
                optim.step()
                losses.append(loss.item())
                pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')

    def predict(self, x, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi in tqdm(batcher.epoch(x), desc='test'):
            xi = self.padder.transform(xi)
            logits = self.forward(torch.as_tensor(xi).to(self.device))
            prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1), self.device)
            predictions.append(prediction)
        return np.concatenate(predictions)

    def forward(self, x):
        phi = self.projector(x)
        return self.ff(phi)


class SameAuthorClassifier(nn.Module):
    def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
        super(SameAuthorClassifier, self).__init__()
        self.projector = projector.to(device)
        self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
        self.device = device

    def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
        self.train()
        batcher = TwoClassBatch(batch_size=batch_size, n_epochs=epochs, steps_per_epoch=steps_per_epoch)
        optim = torch.optim.Adam(self.parameters(), lr=lr)

        pbar = tqdm(range(batcher.n_epochs))
        for epoch in pbar:
            losses = []
            for xi, yi in batcher.epoch(X, y):
                optim.zero_grad()
                xi = self.padder.transform(xi)
                phi = self.projector(xi)
                # normalize phi to have norm 1? maybe better as the last step of projector
                kernel = torch.matmul(phi, phi.T)
                # ideal kernel: entry (i,j) is 1 iff documents i and j share the same author, 0 otherwise
                ideal_kernel = torch.as_tensor(1 * (np.outer(1 + yi, 1 / (yi + 1)) == 1)).to(self.device)
                loss = KernelAlignmentLoss(kernel, ideal_kernel)
                loss.backward()
                # clip_gradient(model)
                optim.step()
                losses.append(loss.item())
                pbar.set_description(f'training epoch={epoch} loss={np.mean(losses):.5f}')

    def predict(self, x, z, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi, zi in tqdm(batcher.epoch(x, z), desc='test'):
            xi = self.padder.transform(xi)
            zi = self.padder.transform(zi)
            inners = self.forward(xi, zi)
            prediction = tensor2numpy(inners, device=self.device) > 0.5  # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
            predictions.append(prediction)
        return np.concatenate(predictions)

    def forward(self, x, z):
        assert x.shape == z.shape, 'shape mismatch between matrices x and z'
        phi_x = self.projector(x)
        phi_z = self.projector(z)
        rows, cols = phi_x.shape
        # row-wise inner products <phi_x[i], phi_z[i]> via batched matrix multiplication
        pairwise_inners = torch.bmm(phi_x.view(rows, 1, cols), phi_z.view(rows, cols, 1)).squeeze()
        return pairwise_inners


class FullAuthorClassifier(nn.Module):
    def __init__(self, projector, num_authors, pad_index, pad_length=500, device='cpu'):
        super(FullAuthorClassifier, self).__init__()
        self.projector = projector.to(device)
        self.ff = FFProjection(input_size=projector.space_dimensions(),
                               hidden_sizes=[1024],
                               output_size=num_authors).to(device)
        self.padder = Padding(pad_index=pad_index, max_length=pad_length, dynamic=True, pad_at_end=False)
        self.device = device

    def fit(self, X, y, batch_size, epochs, lr=0.001, steps_per_epoch=100):
        self.train()
        batcher = TwoClassBatch(batch_size=batch_size, n_epochs=epochs, steps_per_epoch=steps_per_epoch)
        criterion = torch.nn.CrossEntropyLoss().to(self.device)
        optim = torch.optim.Adam(self.parameters(), lr=lr)
        alpha = 0.5

        pbar = tqdm(range(batcher.n_epochs))
        for epoch in pbar:
            losses, sav_losses, attr_losses = [], [], []
            for xi, yi in batcher.epoch(X, y):
                optim.zero_grad()
                xi = self.padder.transform(xi)
                phi = self.projector(xi)
                # normalize phi to have norm 1? maybe better as the last step of projector

                # sav-loss
                kernel = torch.matmul(phi, phi.T)
                ideal_kernel = torch.as_tensor(1 * (np.outer(1 + yi, 1 / (yi + 1)) == 1)).to(self.device)
                sav_loss = KernelAlignmentLoss(kernel, ideal_kernel)
                sav_losses.append(sav_loss.item())

                # attr-loss
                logits = self.ff(phi)
                attr_loss = criterion(logits, torch.as_tensor(yi).to(self.device))
                attr_losses.append(attr_loss.item())

                # combined loss
                loss = alpha * sav_loss + (1 - alpha) * attr_loss
                losses.append(loss.item())
                loss.backward()
                # clip_gradient(model)
                optim.step()
                pbar.set_description(
                    f'training epoch={epoch} '
                    f'sav-loss={np.mean(sav_losses):.5f} '
                    f'attr-loss={np.mean(attr_losses):.5f} '
                    f'loss={np.mean(losses):.5f}'
                )

    def predict_sav(self, x, z, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi, zi in tqdm(batcher.epoch(x, z), desc='test'):
            xi = self.padder.transform(xi)
            zi = self.padder.transform(zi)
            phi_xi = self.projector(xi)
            phi_zi = self.projector(zi)
            rows, cols = phi_xi.shape
            pairwise_inners = torch.bmm(phi_xi.view(rows, 1, cols), phi_zi.view(rows, cols, 1)).squeeze()
            prediction = tensor2numpy(pairwise_inners, device=self.device) > 0.5  # is this correct? should it be > 0 and the ideal kernel in field {-1,+1}?
            predictions.append(prediction)
        return np.concatenate(predictions)

    def predict_labels(self, x, batch_size=100):
        self.eval()
        batcher = Batch(batch_size=batch_size, n_epochs=1, shuffle=False)
        predictions = []
        for xi in tqdm(batcher.epoch(x), desc='test'):
            xi = self.padder.transform(xi)
            phi = self.projector(xi)
            logits = self.ff(phi)
            prediction = tensor2numpy(torch.argmax(logits, dim=1).view(-1), device=self.device)
            predictions.append(prediction)
        return np.concatenate(predictions)


def KernelAlignmentLoss(K, Y):
    n_el = K.shape[0] * K.shape[1]
    loss = torch.norm(K - Y, p='fro')  # in Nello's paper this is different
    loss = loss / n_el  # this is in order to factor out the accumulation which is only due to the size
    return loss


class FFProjection(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size,
                 activation=nn.functional.relu, dropout=0.5):
        super(FFProjection, self).__init__()
        sizes = [input_size] + hidden_sizes + [output_size]
        self.ff = nn.ModuleList([
            nn.Linear(sizes[i], sizes[i + 1]) for i in range(len(sizes) - 1)
        ])
        self.activation = activation
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        for linear in self.ff[:-1]:
            x = self.dropout(self.activation(linear(x)))
        x = self.ff[-1](x)
        return x


class RNNProjection(nn.Module):
    def __init__(self, vocab_size, hidden_size, output_size, device='cpu'):
        super(RNNProjection, self).__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.num_layers = 1
        self.num_directions = 1
        self.device = device

        self.embedding = nn.Embedding(vocab_size, hidden_size).to(device)
        self.rnn = nn.GRU(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=self.num_layers,
            bidirectional=(self.num_directions == 2),
            batch_first=True
        ).to(device)
        self.projection = nn.Linear(self.num_layers * self.num_directions * self.hidden_size, output_size).to(device)

    def init_hidden(self, batch_size):
        return torch.zeros(self.num_layers * self.num_directions, batch_size, self.hidden_size).to(self.device)

    def forward(self, input):
        x = torch.as_tensor(input).to(self.device)
        batch_size = x.shape[0]
        x = self.embedding(x)
        output, hn = self.rnn(x, self.init_hidden(batch_size))
        # concatenate the final hidden states of all layers/directions into a single vector per example
        hn = hn.view(self.num_layers, self.num_directions, batch_size, self.hidden_size)
        hn = hn.permute(2, 0, 1, 3).reshape(batch_size, -1)
        return self.projection(hn)

    def space_dimensions(self):
        return self.output_size


class Batch:
    def __init__(self, batch_size, n_epochs, shuffle=True):
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.shuffle = shuffle
        self.current_epoch = 0

    def epoch(self, *args):
        lengths = list(map(len, args))
        assert max(lengths) == min(lengths), 'inconsistent sizes in args'
        n_batches = math.ceil(lengths[0] / self.batch_size)
        offset = 0
        if self.shuffle:
            index = np.random.permutation(len(args[0]))
            args = [arg[index] for arg in args]
        for b in range(n_batches):
            batch_idx = slice(offset, offset + self.batch_size)
            batch = [arg[batch_idx] for arg in args]
            yield batch if len(batch) > 1 else batch[0]
            offset += self.batch_size
        self.current_epoch += 1


class TwoClassBatch:
    """
    Given X and y (multi-class), produces batches of elements of X, y drawn from two classes (e.g., c1, c2)
    in equal proportion, i.e., the batch is [(x1,c1), ..., (xn,c1), (xn+1,c2), ..., (x2n,c2)]
    """
    def __init__(self, batch_size, n_epochs, steps_per_epoch):
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.steps_per_epoch = steps_per_epoch
        self.current_epoch = 0
        if self.batch_size % 2 != 0:
            raise ValueError('batch size must be even')

    def epoch(self, X, y):
        n_el = len(y)
        assert X.shape[0] == n_el, 'inconsistent sizes in X, y'
        classes = np.unique(y)
        groups = {ci: X[y == ci] for ci in classes}
        class_prevalences = [len(groups[ci]) / n_el for ci in classes]
        n_choices = self.batch_size // 2

        for b in range(self.steps_per_epoch):
            class1, class2 = np.random.choice(classes, p=class_prevalences, size=2, replace=False)
            X1 = np.random.choice(groups[class1], size=n_choices)
            X2 = np.random.choice(groups[class2], size=n_choices)
            X_batch = np.concatenate([X1, X2])
            y_batch = np.repeat([class1, class2], repeats=[n_choices, n_choices])
            yield X_batch, y_batch
        self.current_epoch += 1


class Padding:
    def __init__(self, pad_index, max_length, dynamic=True, pad_at_end=True):
        """
        :param pad_index: the index representing the PAD token
        :param max_length: the length that defines the padding
        :param dynamic: if True (default), pads at min(max_length, max_local_length), where max_local_length is the
            length of the longest example
        :param pad_at_end: if True, the pad tokens are added at the end of the lists; otherwise, they are added
            at the beginning
        """
        self.pad = pad_index
        self.max_length = max_length
        self.dynamic = dynamic
        self.pad_at_end = pad_at_end

    def transform(self, X):
        """
        :param X: a list of lists of indexes (integers)
        :return: an ndarray of shape (n, m) where n is the number of elements in X and m is the pad length (the
            maximum length among the elements of X if dynamic, or self.max_length otherwise)
        """
        X = [x[:self.max_length] for x in X]
        lengths = list(map(len, X))
        pad_length = min(max(lengths), self.max_length) if self.dynamic else self.max_length
        if self.pad_at_end:
            padded = [x + [self.pad] * (pad_length - x_len) for x, x_len in zip(X, lengths)]
        else:
            padded = [[self.pad] * (pad_length - x_len) + x for x, x_len in zip(X, lengths)]
        return np.asarray(padded, dtype=int)
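

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the module's API): it
# builds an RNNProjection and an AuthorshipAttributionClassifier, trains on
# tiny synthetic data, and predicts on the training set. All sizes, the
# vocabulary, the pad index (0), and the random sequences below are
# assumptions chosen just to exercise the code on CPU.
if __name__ == '__main__':
    np.random.seed(0)
    torch.manual_seed(0)

    vocab_size = 100   # assumed toy vocabulary size (index 0 reserved for PAD)
    num_authors = 5    # assumed number of authors
    n_docs = 50        # assumed number of documents

    # each "document" is a variable-length list of token indices in [1, vocab_size)
    docs = [list(np.random.randint(1, vocab_size, size=np.random.randint(20, 100)))
            for _ in range(n_docs)]
    # object array so that Batch can index the documents with a permutation
    X = np.empty(n_docs, dtype=object)
    for i, doc in enumerate(docs):
        X[i] = doc
    y = np.random.randint(0, num_authors, size=n_docs).astype(np.int64)

    projector = RNNProjection(vocab_size=vocab_size, hidden_size=32, output_size=64, device='cpu')
    classifier = AuthorshipAttributionClassifier(
        projector, num_authors=num_authors, pad_index=0, pad_length=100, device='cpu')
    classifier.fit(X, y, batch_size=10, epochs=2, lr=0.001)

    predicted = classifier.predict(X, batch_size=10)
    print('training-set accuracy:', (predicted == y).mean())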