PyTorch implementation of the Transformer
2022-04-23 10:48:00 【qq1033930618】
Part One: Implementation with token-index (word-embedding) inputs
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy, time
from torch.autograd import Variable
class EncoderDecoder(nn.Module):
""" A standard Encoder-Decoder architecture. Base for this and many other models. """
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
super(EncoderDecoder, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed # mapping [B,L] -> [B,L,d_model]
self.tgt_embed = tgt_embed # mapping [B,L_out] -> [B,L_out,d_model_out]
self.generator = generator
def forward(self, src, tgt, src_mask, tgt_mask):
""" Take in and process masked src and target sequences. """
return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)
def encode(self, src, src_mask):
""" call Encoder class among x change but mask unchanged src_embed [batch,len,d_model], src_mask [batch,1,len]->[batch,1,1,len] src Not in 0 place True """
return self.encoder(self.src_embed(src), src_mask)
def decode(self, memory, src_mask, tgt, tgt_mask):
""" tgt_embed [batch,len-1,d_model] memory [batch,len,d_model] src_mask [batch,1,1,len] tgt_mask [batch,len-1,len-1] among [batch,len-1] yes tgt Remove the last column Copy each sentence several times -> [batch,1,len-1,len-1] The upper right corner of the mask is False Lower left corner and main diagonal Not for 0 The word for is True """
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
class Generator(nn.Module):
""" Define standard linear + softmax generation step. [B,L,d_model] -> [B,L,vocab] """
def __init__(self, d_model, vocab):
super(Generator, self).__init__()
self.proj = nn.Linear(d_model, vocab)
def forward(self, x):
return F.log_softmax(self.proj(x), dim=-1) # log of the softmax: maps probabilities to (-inf, 0] and is numerically more stable
def clones(module, N):
""" Produce N identical layers. Returns a list of Put the input module Copy N Time """
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
class Encoder(nn.Module):
"""Core encoder is a stack of N layers"""
def __init__(self, layer, N):
super(Encoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
"""Pass the input (and mask) through each layer in turn."""
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
class LayerNorm(nn.Module):
""" Construct a layernorm module (See citation for details). initialization features yes .shape form need x.shape initialization [B,L,d_model] -> [B,L,d_model] Subtract the mean by the last dimension Variance estimation """
def __init__(self, features, eps=1e-6):
super(LayerNorm, self).__init__()
self.a_2 = nn.Parameter(torch.ones(features))
self.b_2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
class SublayerConnection(nn.Module):
""" A residual connection followed by a layer norm. Note for code simplicity the norm is first as opposed to last. initialization x.shape dropout Parameters Input x And a certain floor Output The dimensions remain the same amount to norm sublayer dropout residual """
def __init__(self, size, dropout):
super(SublayerConnection, self).__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(dropout) # randomly zeroes some elements and rescales the rest
def forward(self, x, sublayer):
"""Apply residual connection to any sublayer with the same size."""
return x + self.dropout(sublayer(self.norm(x)))
class EncoderLayer(nn.Module):
""" Encoder is made up of self-attn and feed forward (defined below) Pay more attention to yourself In another feed_forward """
def __init__(self, size, self_attn, feed_forward, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 2)
self.size = size
def forward(self, x, mask):
""" Follow Figure 1 (left) for connections. lambda For anonymous functions """
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
class Decoder(nn.Module):
"""Generic N layer decoder with masking."""
def __init__(self, layer, N):
super(Decoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
return self.norm(x)
class DecoderLayer(nn.Module):
""" Decoder is made of self-attn, src-attn, and feed forward (defined below) every last DecoderLayer Of memory It's all the same Many times EncoderLayer After the output results """
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
super(DecoderLayer, self).__init__()
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 3)
def forward(self, x, memory, src_mask, tgt_mask):
"""Follow Figure 1 (right) for connections."""
m = memory
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
return self.sublayer[2](x, self.feed_forward)
def subsequent_mask(size):
""" Mask out subsequent positions. triu Returns the upper triangular matrix [1,size,size] Upper right corner F Lower left corner and diagonal True """
attn_shape = (1, size, size)
subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
return torch.from_numpy(subsequent_mask) == 0
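# Quick illustration (not in the original post; added as a sanity check you can uncomment):
# subsequent_mask(4) returns a [1,4,4] boolean mask that is False above the diagonal
# and True on and below it, e.g.
# print(subsequent_mask(4))
# tensor([[[ True, False, False, False],
#          [ True,  True, False, False],
#          [ True,  True,  True, False],
#          [ True,  True,  True,  True]]])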
def attention(query, key, value, mask=None, dropout=None):
""" Compute 'Scaled Dot Product Attention' qkv[B,h,L,d_model/h] src_mask[B,1,1,L] tgt_mask[] score(qkT)[B,h,L,L] src_mask[B,1,1,L] -> [B,h,L,L] B tube B L Mask all innermost layers only score(qkT)[B,h,L-1,L-1] tgt_mask[B,1,L-1,L-1] [[q1k1,q1k2,q1k3] [q2k1,q2k2,q2k3] [q3k1,q3k2,q3k3]] tgt The three elements in the upper right corner are set to 0 src Don't pay attention to 0 q Don't pay attention to 0 Situated k Pay attention to each other q[B,h,L-1,d_model/h] kv[B,h,L,d_model/h] [[q1k1 q1k2->0 q1k3->0] [q2k1 q2k2 q2k3->0]] """
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
# scores [B,h,L_q,L_k]; the mask broadcasts to that shape; wherever the mask is 0, the corresponding score is filled with -1e9 (effectively -inf)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
p_attn = F.softmax(scores, dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, value), p_attn
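# Shape sketch (illustrative addition, assuming h=8 heads and d_k=64):
# q = k = v = torch.rand(2, 8, 10, 64)                 # [B,h,L,d_k]
# out, p_attn = attention(q, k, v)                     # out [2,8,10,64], p_attn [2,8,10,10]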
class MultiHeadedAttention(nn.Module):
def __init__(self, h, d_model, dropout=0.1):
""" Take in model size and number of heads. [B,L,d_model] -> [B,L,d_model] """
super(MultiHeadedAttention, self).__init__()
assert d_model % h == 0 # raise an exception if d_model is not divisible by the number of heads
# We assume d_v always equals d_k
self.d_k = d_model // h
self.h = h
self.linears = clones(nn.Linear(d_model, d_model), 4) # the first three project q, k, v; the last one is the output projection
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
"""Implements Figure 2"""
if mask is not None:
# Same mask applied to all h heads.
mask = mask.unsqueeze(1) # mask[B,1,1,L]
nbatches = query.size(0)
# 1) Do all the linear projections in batch from d_model => h x d_k
# linear projection [B,L,d_model] -> [B,L,d_model], then view to [B,L,h,d_k] and transpose to [B,h,L,d_k]
query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2) for l, x in zip(self.linears, (query, key, value))]
# 2) Apply attention on all the projected vectors in batch.
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
# x [B,h,L,d_model/h] self.attn [B,h,L,L]
# 3) "Concat" using a view and apply a final linear.
x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
# x [B,h,L,d_model/h] -> [B,L,h,d_model/h] -> [B,L,d_model]
return self.linears[-1](x)
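# Shape sketch (illustrative addition): multi-head attention preserves [B,L,d_model].
# mha = MultiHeadedAttention(h=8, d_model=512)
# x = torch.rand(2, 10, 512)
# mha(x, x, x).shape                                   # torch.Size([2, 10, 512])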
class PositionwiseFeedForward(nn.Module):
""" Implements FFN equation. Two MLP [B,L,d_model]->[B,L,d_model] """
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
return self.w_2(self.dropout(F.relu(self.w_1(x))))
class Embeddings(nn.Module):
""" [B,L] -> [B,L,d_model] [B,L] The elements in are natural numbers vocab At least larger than the largest natural number 1 d_model Dimensions you want to embed vocab Is greater than or equal to the maximum number of index words +1 """
def __init__(self, d_model, vocab):
super(Embeddings, self).__init__()
self.lut = nn.Embedding(vocab, d_model)
self.d_model = d_model
def forward(self, x):
return self.lut(x) * math.sqrt(self.d_model)
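# Example (illustrative addition): token ids in [0, vocab) become scaled d_model-dim vectors.
# emb = Embeddings(d_model=512, vocab=15)
# emb(torch.tensor([[2, 3, 7, 4, 0]])).shape           # torch.Size([1, 5, 512])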
class PositionalEncoding(nn.Module):
""" Implement the PE function. Add sentence position coding [B,L,d_model]->[B,L,d_model] """
def __init__(self, d_model, dropout, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# Compute the positional encodings once in log space.
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len).unsqueeze(1) # [max_len,1] [[0],[1],...,[max_len-1]]
div_term = torch.exp(torch.arange(0, d_model, 2) * (-(math.log(10000.0) / d_model))) # even indices 0,2,4,... up to (but excluding) d_model
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe) # stored as a buffer: saved with the model but not updated by the optimizer
def forward(self, x):
# the slice has shape [1, x.size(1), d_model]; broadcasting adds the same positional encoding to every sentence in the batch
x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)
return self.dropout(x)
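# Example (illustrative addition): the encoding depends only on position and dimension,
# so the same [1,L,d_model] slice is broadcast over every sentence in the batch.
# pos = PositionalEncoding(d_model=512, dropout=0.0)
# pos(torch.zeros(2, 5, 512)).shape                    # torch.Size([2, 5, 512])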
def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
"""Helper: Construct a model from hyperparameters."""
c = copy.deepcopy
attn = MultiHeadedAttention(h, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
position = PositionalEncoding(d_model, dropout)
model = EncoderDecoder(
Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N), # N encoder layers stacked in series
Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N), # N decoder layers stacked in series; each uses the same encoder memory
nn.Sequential(Embeddings(d_model, src_vocab), c(position)), # nn.Sequential container; src_vocab = number of distinct input tokens
nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)), # nn.Sequential container; tgt_vocab = number of distinct output tokens
Generator(d_model, tgt_vocab)) # a single linear layer + log-softmax
# This was important from their code.
# Initialize parameters with Glorot / fan_avg (Xavier uniform).
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
class Batch:
"""Object for holding a batch of data with mask during training."""
def __init__(self, src, trg=None, pad=0):
self.src = src
self.src_mask = (src != pad).unsqueeze(-2) # add a dimension: src [batch,len] -> src_mask [batch,1,len]; True where src is not pad (0)
if trg is not None:
self.trg = trg[:, :-1] # decoder input: trg without its last column [batch,len-1]
self.trg_y = trg[:, 1:] # prediction target: trg without its first column [batch,len-1]
self.trg_mask = self.make_std_mask(self.trg, pad)
self.ntokens = (self.trg_y != pad).data.sum() # number of non-pad tokens in trg_y
# static method: it can be called on the class without creating an instance
@staticmethod
def make_std_mask(tgt, pad):
"""Create a mask to hide padding and future words."""
tgt_mask = (tgt != pad).unsqueeze(-2) # [batch,1,len-1]: True where the decoder input is not pad
tgt_mask = tgt_mask & Variable(subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)) # AND with the causal mask [1,len-1,len-1]: False above the diagonal, True on and below it
# broadcasting gives [batch,len-1,len-1]: each sentence's padding mask is repeated along a new dimension
# [b,i,:] marks which positions sentence b may attend to when predicting position i (only non-pad positions <= i)
return tgt_mask
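# Example (illustrative addition) for one padded target sentence (pad = 0):
# trg = torch.tensor([[1, 15, 2, 0, 0]])
# Batch.make_std_mask(trg[:, :-1], 0)                  # shape [1,4,4]
# row i is True only at non-pad positions j <= i, so padded and future tokens are hidden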
def run_epoch(data_iter, model, loss_compute):
"""Standard Training and Logging Function"""
start = time.time()
total_tokens = 0
total_loss = 0
tokens = 0
for i, batch in enumerate(data_iter):
out = model.forward(batch.src, batch.trg, batch.src_mask, batch.trg_mask)
# batch.src [batch,len]
# batch.trg [batch,len-1] (decoder input: trg without its last column)
# batch.src_mask [batch,1,len]: True where src is not pad (0)
# batch.trg_mask [batch,len-1,len-1]: padding mask of the decoder input ANDed with the causal mask
# batch.trg_y [batch,len-1]: trg without its first column (prediction target)
# batch.ntokens: number of non-pad tokens in trg_y
# out [batch,len-1,d_model]; the generator inside loss_compute maps it to [batch,len-1,vocab]
loss = loss_compute(out, batch.trg_y, batch.ntokens)
total_loss += loss
total_tokens += batch.ntokens
tokens += batch.ntokens
if i % 50 == 1:
elapsed = time.time() - start
print("Epoch Step: %d Loss: %f Tokens per Sec: %f" % (i, loss / batch.ntokens, tokens / elapsed))
start = time.time()
tokens = 0
return total_loss / total_tokens
global max_src_in_batch, max_tgt_in_batch
def batch_size_fn(new, count, sofar):
"""Keep augmenting batch and calculate total number of tokens + padding."""
global max_src_in_batch, max_tgt_in_batch
if count == 1:
max_src_in_batch = 0
max_tgt_in_batch = 0
max_src_in_batch = max(max_src_in_batch, len(new.src))
max_tgt_in_batch = max(max_tgt_in_batch, len(new.trg) + 2)
src_elements = count * max_src_in_batch
tgt_elements = count * max_tgt_in_batch
return max(src_elements, tgt_elements)
class NoamOpt:
"""Optim wrapper that implements rate."""
def __init__(self, model_size, factor, warmup, optimizer):
self.optimizer = optimizer
self._step = 0
self.warmup = warmup
self.factor = factor
self.model_size = model_size
self._rate = 0
def step(self):
"""Update parameters and rate"""
self._step += 1
rate = self.rate()
for p in self.optimizer.param_groups:
p['lr'] = rate
self._rate = rate
self.optimizer.step()
def rate(self, step=None):
"""Implement `lrate` above"""
if step is None:
step = self._step
return self.factor * (self.model_size ** (-0.5) * min(step ** (-0.5), step * self.warmup ** (-1.5)))
def get_std_opt(model):
return NoamOpt(model.src_embed[0].d_model, 2, 4000,
torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))
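# The schedule implements lrate = factor * d_model^(-0.5) * min(step^(-0.5), step * warmup^(-1.5)):
# it grows roughly linearly for the first `warmup` steps and then decays as 1/sqrt(step).
# Illustrative addition (rate() never touches the wrapped optimizer, so None is enough here):
# sched = NoamOpt(model_size=512, factor=1, warmup=4000, optimizer=None)
# [sched.rate(step=s) for s in (1, 4000, 40000)]       # small -> peak -> decayed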
class LabelSmoothing(nn.Module):
"""Implement label smoothing."""
def __init__(self, size, padding_idx, smoothing=0.0):
super(LabelSmoothing, self).__init__()
self.criterion = nn.KLDivLoss(reduction='sum')
self.padding_idx = padding_idx
self.confidence = 1.0 - smoothing
self.smoothing = smoothing
self.size = size
self.true_dist = None
def forward(self, x, target):
# x: [B*(len-1), V] log-probabilities; target: [B*(len-1)] token indices
assert x.size(1) == self.size
true_dist = x.data.clone() # clone x so true_dist has the same shape
true_dist.fill_(self.smoothing / (self.size - 2)) # fill every entry with smoothing/(size-2)
# target.data.unsqueeze(1) [B(l-1),1]
print(true_dist.shape)
print(target.data.unsqueeze(1).shape)
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence) # in each row, put self.confidence at the column given by target
# i.e. the true class gets probability `confidence`, everything else keeps the smoothing mass
true_dist[:, self.padding_idx] = 0 # zero out the padding column
print(target.data.shape)
print(target.data)
mask = torch.nonzero(target.data == self.padding_idx) # positions whose target is the padding index
print(mask)
print(mask.dim())
# mask has shape [k,1], e.g. [[1],[4],[7]]
if mask.dim() > 0:
true_dist.index_fill_(0, mask.squeeze(), 0.0) # rows whose target is pad contribute nothing: set the whole row of true_dist to 0
self.true_dist = true_dist
print(x.shape)
print(true_dist.shape)
return self.criterion(x, Variable(true_dist, requires_grad=False))
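# Example (illustrative addition), size=5, padding_idx=0, smoothing=0.4:
# the smoothed row for a true class 2 is [0, 0.1333, 0.6, 0.1333, 0.1333]
# (confidence on the true class, smoothing/(size-2) elsewhere, 0 on the pad column),
# and any row whose target IS the pad index is zeroed out entirely.
# crit = LabelSmoothing(size=5, padding_idx=0, smoothing=0.4)
# x = torch.log(torch.full((3, 5), 0.2))               # uniform log-probabilities
# crit(x, torch.tensor([2, 1, 0]))                     # the last row is treated as padding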
def data_gen(V, batch, nbatches):
"""Generate random data for a src-tgt copy task."""
for i in range(nbatches):
data = torch.from_numpy(np.random.randint(1, V, size=(batch, 10)))
data[:, 0] = 1
src = Variable(data, requires_grad=False) # [batch, len]
tgt = Variable(data, requires_grad=False) # [batch, len]
yield Batch(src, tgt, 0)
class SimpleLossCompute:
"""A simple loss compute and train function."""
def __init__(self, generator, criterion, opt=None):
self.generator = generator
self.criterion = criterion
self.opt = opt
def __call__(self, x, y, norm):
x = self.generator(x)
# after the reshape y is 1-D and x is 2-D; .contiguous() is paired with .view() to lay the data out in new contiguous memory
# x: [B,len-1,V] -> [B*(len-1),V], where V is the target vocabulary size
# y: [B,len-1] -> [B*(len-1)]
loss = self.criterion(x.contiguous().view(-1, x.size(-1)),
y.contiguous().view(-1)) / norm
loss.backward()
if self.opt is not None:
self.opt.step()
self.opt.optimizer.zero_grad()
return loss.item() * norm
# # Train the simple copy task.
# V = 11 # inputs and outputs share a vocabulary of 11 token types
# criterion = LabelSmoothing(size=V, padding_idx=0, smoothing=0.0)
# model = make_model(V, V, N=2)
# model_opt = NoamOpt(model.src_embed[0].d_model, 1, 400, torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))
#
# for epoch in range(10):
# model.train()
# run_epoch(data_gen(V, 30, 20), model, SimpleLossCompute(model.generator, criterion, model_opt))
# model.eval()
# print(run_epoch(data_gen(V, 30, 5), model, SimpleLossCompute(model.generator, criterion, None)))
# # for i, batch in enumerate(data_gen(11, 30, 20)):
# # print(i)
# # print(batch)
my_batch = 3
my_len_src = 5
my_len_tgt = 7
my_src_vocab = 15
my_tgt_vocab = 20
my_src = torch.tensor([[2,3,7,4,0],[12,5,7,0,0],[13,4,2,8,5]])
my_tgt = torch.tensor([[1,15,2,3,4,0,0],[1,18,3,1,0,0,0],[4,17,5,2,0,0,0]])
my_src_mask = torch.ones([my_batch,1,my_len_src])
my_tgt_mask = torch.ones([my_batch,my_len_tgt,my_len_tgt])
my_model = make_model(my_src_vocab, my_tgt_vocab)
print(my_model(my_src,my_tgt,my_src_mask,my_tgt_mask).shape)
# [my_batch,my_len_tgt,d_model]
Part Two: Implementation with general (continuous) inputs
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math, copy
from torch.autograd import Variable
'''
model = make_model(d_input, d_output)
Inputs [batch, input_len, d_input]
Outputs [batch, output_len, d_output]
Output Probabilities [batch, output_len, d_prob]
src_mask = torch.ones([batch,1,input_len]) [batch,1,input_len] (used when the effective length differs across examples in a batch)
tgt_mask = subsequent_mask(output_len) [1,output_len,output_len] (False above the diagonal, True on and below it)
model(Inputs,Outputs,src_mask,tgt_mask)
src [batch,input_len,d_model]
tgt [batch,output_len,d_model]
'''
class EncoderDecoder(nn.Module):
"""
A standard Encoder-Decoder architecture. Base for this and many
other models.
"""
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
super(EncoderDecoder, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed # mapping [B,L] -> [B,L,d_model]
self.tgt_embed = tgt_embed # mapping [B,L_out] -> [B,L_out,d_model_out]
self.generator = generator
def forward(self, src, tgt, src_mask, tgt_mask):
"""
Take in and process masked src and target sequences.
"""
return self.generator(self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask))
def encode(self, src, src_mask):
"""
Calls the Encoder; x is transformed while the mask is unchanged.
src_embed(src): [batch,len,d_model]; src_mask: [batch,1,len] -> [batch,1,1,len] inside attention, True where src is not pad (0)
"""
return self.encoder(self.src_embed(src), src_mask)
def decode(self, memory, src_mask, tgt, tgt_mask):
"""
tgt_embed [batch,len-1,d_model]
memory [batch,len,d_model]
src_mask [batch,1,1,len]
tgt_mask [batch,len-1,len-1]: [batch,len-1] is tgt without its last column, repeated per sentence -> [batch,1,len-1,len-1] inside attention
positions above the diagonal are False; on and below the diagonal it is True wherever the token is not pad
"""
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
class Generator(nn.Module):
"""
Define standard linear + softmax generation step.
[B,L,d_model] -> [B,L,vocab]
"""
def __init__(self, d_model, vocab):
super(Generator, self).__init__()
self.proj = nn.Linear(d_model, vocab)
def forward(self, x):
return F.log_softmax(self.proj(x), dim=-1) # log of the softmax: maps probabilities to (-inf, 0] and is numerically more stable
def clones(module, N):
"""
Produce N identical layers.
Returns an nn.ModuleList holding N deep copies of the input module.
"""
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
class Encoder(nn.Module):
"""Core encoder is a stack of N layers"""
def __init__(self, layer, N):
super(Encoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
"""Pass the input (and mask) through each layer in turn."""
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
class LayerNorm(nn.Module):
"""
Construct a layernorm module (See citation for details).
features is the size of the last dimension of x.
[B,L,d_model] -> [B,L,d_model]: subtract the mean and divide by the std estimated over the last dimension.
"""
def __init__(self, features, eps=1e-6):
super(LayerNorm, self).__init__()
self.a_2 = nn.Parameter(torch.ones(features))
self.b_2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
class SublayerConnection(nn.Module):
"""
A residual connection followed by a layer norm.
Note for code simplicity the norm is first as opposed to last.
Initialized with the last-dimension size of x and the dropout rate.
The input x and the sublayer output have the same shape; the computation is norm -> sublayer -> dropout -> residual add.
"""
def __init__(self, size, dropout):
super(SublayerConnection, self).__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(dropout) # randomly zeroes some elements and rescales the rest
def forward(self, x, sublayer):
"""Apply residual connection to any sublayer with the same size."""
return x + self.dropout(sublayer(self.norm(x)))
class EncoderLayer(nn.Module):
"""
Encoder is made up of self-attn and feed forward (defined below)
one self-attention sublayer followed by a position-wise feed-forward sublayer
"""
def __init__(self, size, self_attn, feed_forward, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 2)
self.size = size
def forward(self, x, mask):
"""
Follow Figure 1 (left) for connections.
The lambda is an anonymous function that binds x and mask for the sublayer wrapper.
"""
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
class Decoder(nn.Module):
"""Generic N layer decoder with masking."""
def __init__(self, layer, N):
super(Decoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
return self.norm(x)
class DecoderLayer(nn.Module):
"""
Decoder is made of self-attn, src-attn, and feed forward (defined below)
Every DecoderLayer receives the same memory: the output of the last EncoderLayer.
"""
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
super(DecoderLayer, self).__init__()
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 3)
def forward(self, x, memory, src_mask, tgt_mask):
"""Follow Figure 1 (right) for connections."""
m = memory
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
return self.sublayer[2](x, self.feed_forward)
def subsequent_mask(size):
"""
Mask out subsequent positions.
np.triu keeps the upper triangle of the matrix;
the returned mask has shape [1,size,size] and is False above the diagonal, True on and below it
"""
attn_shape = (1, size, size)
subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
return torch.from_numpy(subsequent_mask) == 0
def attention(query, key, value, mask=None, dropout=None):
"""
Compute 'Scaled Dot Product Attention'
Encoder self-attention:
q,k,v [B,h,L,d_k]; src_mask [B,1,L] -> [B,1,1,L]
scores = q k^T: [B,h,L,L]; the mask broadcasts from [B,1,1,L] to [B,h,L,L], so it masks only the last (key) dimension
[[q1k1, q1k2->-1e9, q1k3]
 [q2k1, q2k2->-1e9, q2k3]
 [q3k1, q3k2->-1e9, q3k3]] if the mask excludes the second position, no query attends to the second key
Decoder self-attention:
scores [B,h,L,L]; tgt_mask [1,1,L,L]
[[q1k1, q1k2->-1e9, q1k3->-1e9]
 [q2k1, q2k2      , q2k3->-1e9]
 [q3k1, q3k2      , q3k3      ]] the entries above the diagonal are set to -1e9, so position i attends only to positions <= i
Cross-attention (decoder attending to the encoder memory):
q [B,h,output_len,d_k]; k,v [B,h,input_len,d_k]; src_mask [B,1,input_len] -> [B,1,1,input_len] (must match the key length)
scores [B,h,output_len,input_len]
[[q1k1, q1k2->-1e9, q1k3]
 [q2k1, q2k2->-1e9, q2k3]]
"""
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
# scores: encoder self-attn [batch,h,input_len,input_len]; decoder self-attn [batch,h,output_len,output_len]; cross-attn [batch,h,output_len,input_len]
# the mask broadcasts to the score shape; wherever the mask is 0, the corresponding score is filled with -1e9 (effectively -inf)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
p_attn = F.softmax(scores, dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, value), p_attn
class MultiHeadedAttention(nn.Module):
def __init__(self, h, d_model, dropout=0.1):
"""
Take in model size and number of heads.
[B,L,d_model] -> [B,L,d_model]
"""
super(MultiHeadedAttention, self).__init__()
assert d_model % h == 0 # raise an exception if d_model is not divisible by the number of heads
# We assume d_v always equals d_k
self.d_k = d_model // h
self.h = h
self.linears = clones(nn.Linear(d_model, d_model), 4) # the first three project q, k, v; the last one is the output projection
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
"""Implements Figure 2"""
if mask is not None:
# Same mask applied to all h heads.
mask = mask.unsqueeze(1) # mask[B,1,1,L]
nbatches = query.size(0)
# 1) Do all the linear projections in batch from d_model => h x d_k
# linear projection [B,L,d_model] -> [B,L,d_model], then view to [B,L,h,d_k] and transpose to [B,h,L,d_k]
query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for l, x in zip(self.linears, (query, key, value))]
# 2) Apply attention on all the projected vectors in batch.
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
# x [B,h,L,d_model/h] self.attn [B,h,L,L]
# 3) "Concat" using a view and apply a final linear.
x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
# x [B,h,L,d_model/h] -> [B,L,h,d_model/h] -> [B,L,d_model]
return self.linears[-1](x)
class PositionwiseFeedForward(nn.Module):
"""
Implements FFN equation.
Two linear layers with a ReLU in between: [B,L,d_model]->[B,L,d_model]
"""
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
return self.w_2(self.dropout(F.relu(self.w_1(x))))
class Embeddings(nn.Module):
"""
[B,L,vocab] -> [B,L,d_model]
"""
def __init__(self, d_model, vocab):
super(Embeddings, self).__init__()
self.lut = nn.Linear(vocab, d_model)
self.d_model = d_model
def forward(self, x):
return self.lut(x) * math.sqrt(self.d_model)
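# Example (illustrative addition): in this variant the "embedding" is a dense projection,
# so the model consumes continuous feature vectors instead of integer token ids.
# emb = Embeddings(d_model=512, vocab=15)              # vocab here means the input feature dimension
# emb(torch.rand(2, 5, 15)).shape                      # torch.Size([2, 5, 512])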
class PositionalEncoding(nn.Module):
"""
Implement the PE function. Add sentence position coding
[B,L,d_model]->[B,L,d_model]
"""
def __init__(self, d_model, dropout, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# Compute the positional encodings once in log space.
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len).unsqueeze(1) # [max_len,1] [[0],[1],...,[max_len-1]]
div_term = torch.exp(torch.arange(0, d_model, 2) * (-(math.log(10000.0) / d_model))) # even indices 0,2,4,... up to (but excluding) d_model
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe) # stored as a buffer: saved with the model but not updated by the optimizer
def forward(self, x):
# the slice has shape [1, x.size(1), d_model]; broadcasting adds the same positional encoding to every sentence in the batch
x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)
return self.dropout(x)
def make_model(src_vocab, tgt_vocab, prob_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
"""
Helper: Construct a model from hyperparameters.
src_vocab: last dimension of Inputs
tgt_vocab: last dimension of Outputs
prob_vocab: last dimension of Output Probabilities
"""
c = copy.deepcopy
attn = MultiHeadedAttention(h, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
position = PositionalEncoding(d_model, dropout)
model = EncoderDecoder(
Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N), # N encoder layers stacked in series
Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N), # N decoder layers stacked in series; each uses the same encoder memory
nn.Sequential(Embeddings(d_model, src_vocab), c(position)), # nn.Sequential container; src_vocab = dimensionality of the input features
nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)), # nn.Sequential container; tgt_vocab = dimensionality of the output features
Generator(d_model, prob_vocab)) # a single linear layer + log-softmax
# This was important from their code.
# Initialize parameters with Glorot / fan_avg (Xavier uniform).
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
batch = 3
input_len = 5
d_input = 15
output_len = 7
d_output = 17
d_prob = 19
my_model = make_model(d_input, d_output, d_prob)
Inputs = torch.ones([batch, input_len, d_input])
Outputs = torch.ones([batch, output_len, d_output])
src_mask = torch.ones([batch,1,input_len])
tgt_mask = subsequent_mask(output_len)
print(my_model(Inputs,Outputs,src_mask,tgt_mask).shape)
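# Expected shape (illustrative note): torch.Size([3, 7, 19]), i.e. [batch, output_len, d_prob],
# since this variant's forward() also applies the Generator.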
Copyright notice
This article was written by [qq1033930618]; please keep the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204230618497119.html