Transformer architecture

Author

차상진

Published

April 19, 2025

1. 트랜스포머 인코딩 층 구현

step 1. scaled dot-product attetion

from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show
from torch import nn
from transformers import AutoConfig
import torch
from math import sqrt
import torch.nn.functional as F

# step 1: 모델, 토크나이저, text 선언
model_ckpt = "bert-base-uncased" # 원하는 모델 선언
tokenizer = AutoTokenizer.from_pretrained(model_ckpt) # 모델에서 사용된 pretrained 된 tokenizer를 불러온다.
model = BertModel.from_pretrained(model_ckpt) # pretrained 된 model을 불러온다.
text = "time flies like an arrow"


# step 2: 토크나이징
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False) # pytorch를 사용해서 CLS,SEP 토큰을 제외하여 토크나이징하는 코드
inputs.input_ids # 결과는 토크나이저에 있는 어휘사전에 고유한 ID에 매핑된 값이다.


# step 3: 모델의 하이퍼파라미터를 불러오고 임베딩
config = AutoConfig.from_pretrained(model_ckpt) # config란 모델의 하이퍼파라미터들을 저장하는 객체, 모델에서 pretrained된 정보를 불러오는 코드.
token_emb = nn.Embedding(config.vocab_size, config.hidden_size) # 임베딩의 층을 생성하는 것
#config.vocab_size는 모델이 사용하는 단어사전의 크기. config.hidden_size는 모델의 임베딩 차원이다. 각 단어를 몇 차원의 벡터로 변환할지에 대한 것이다
# 결과의 30522는 단어사전의 크기이고 768은 임베딩 차원이다.
inputs_embeds = token_emb(inputs.input_ids) #위에서 선언한 token_emb을 이용해서 5개의 단어를 임베딩시켰다.
# 임베딩 시켰다는 것은 각 단어가 768개의 특성을 가지는 벡터값으로 변환된 것이라고 이해하면 된다.
inputs_embeds.size()


# step 4: 점곱과 softmax를 이용한 가중치 계산
query = key = value = inputs_embeds
dim_k = key.size(-1) # 임베딩 차원을 선택
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k) # 점곱을 사용. self attention의 경우 batch단위로 연산이 필요하기에 torch.bmm을 사용
# 점곱을 진행하면 dim_k개의 요소가 더해지므로 dim_k와 비례해서 점곱 값이 결정됨. 그래서 dim_k의 제곱근으로 나누면 통계적으로 표준화와 비슷한 역할을 한다.
weights = F.softmax(scores, dim=-1) # softmax를 사용


# step 5: 최종적으로 계산된 가중치와 value를 점곱
attn_outputs = torch.bmm(weights, value) # 마지막으로 value에 가중치를 곱해준다.
attn_outputs.size()
/root/anaconda3/envs/asdf/lib/python3.9/site-packages/huggingface_hub/file_download.py:797: FutureWarning: `resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.
  warnings.warn(
torch.Size([1, 5, 768])

step 2. multi head attention

# scaled_dot_product_attention 함수로 정의
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

# 앞에서는 q,k,v를 모두 같은 값을 사용했지만 실제로는 독립적인 선형 변환 3개를 이용해서 q,k,v를 생성한다.
class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads # 헤드마다 계산이 일정하도록 선택
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        ) # 어텐션 헤드를 리스트로 저장하고 각각의 어텐션 헤드가 입력(hidden_state)을 독립적으로 학습
        # 어텐션 헤드는 self attention을 수행하는 작은 모듈이고 그것을 결합하는 것이 multi head attention이다.
        self.output_linear = nn.Linear(embed_dim, embed_dim) # 어텐션 헤드를 연결하고 최종적으로 선형변환

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1) #각각에서 attention을 실행하고 합치는 코드
        x = self.output_linear(x) # 최종 선형변환
        return x
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size()
torch.Size([1, 5, 768])

step 3. feed forward layer

class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob) # 일부 뉴런을 랜덤하게 0으로 만들어서 특정 뉴런에 의존하는 것을 막는다.

    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()
torch.Size([1, 5, 768])

step 4. layer normalization

class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        # 층 정규화를 적용하고 입력을 쿼리, 키, 값으로 복사합니다.
        hidden_state = self.layer_norm_1(x)
        x = x + self.attention(hidden_state) # 어텐션 연산 후 스킵연결
        x = x + self.feed_forward(self.layer_norm_2(x)) # 피드포워드 연산 후 스킵연결
        # 스킵연결은 기울기소실 문제 완화, 학습 안정화
        return x
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()
(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

step 5. 위치 임베딩

class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        # 입력 시퀀스에 대해 위치 ID를 만듭니다.
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
        # 토큰 임베딩과 위치 임베딩을 만듭니다.
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # 토큰 임베딩과 위치 임베딩을 합칩니다.
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()
torch.Size([1, 5, 768])

step 6. 임베딩과 인코더 층 연결

class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config)
                                     for _ in range(config.num_hidden_layers)])

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()
torch.Size([1, 5, 768])

step 7. 분류 헤드 추가

class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        x = self.encoder(x)[:, 0, :] # [CLS] 토큰의 은닉 상태를 선택합니다.
        # [CLS]는 문장의 전체 의미를 대표하는 벡터로 학습되기때문에 [CLS]만 가져오는 것
        x = self.dropout(x)
        x = self.classifier(x)
        return x
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()
torch.Size([1, 3])

2. Decoder 구현

step 1. Mask matrix

- 마스크 행렬

seq_len = inputs.input_ids.size(-1) # 마지막 차원의 크기이며 토큰 개수를 의미한다.
mask = torch.tril(torch.ones(seq_len,seq_len)).unsqueeze(0) # unsqueeze를 취하면 인덱스 0번 자리에 해당하는 차원이 하나 늘어난다.

- mask에서 .unsqueeze(0)을 하는 이유

unsqueeze를 취하면 인덱스 0번 자리에 해당하는 차원이 하나 늘어난다. 딥러닝 모델에서는 일반적으로 한 번에 여러 문장을 처리하기 위해 batch형태로 계산한다.

특히 트랜스포머 모델은 attention mask를 사용할 때 배치차원까지 추가된 형태를 기대하기에 .unsqueeze(0)을 이용해서 맞춰주는 것이다.

- 의문점

Q. seq의 길이가 input ids의 마지막 차원이면 토큰 개수인데 그럼 출력 길이가 입력 길이와 같다는 의미인가요?

A. 디코더는 ’출력 시퀀스’에 대해 self attention을 수행한다. 즉 지금까지 생성한 출력 토큰들끼리 관계를 파악하는 것이 목적이다. seq_len은 지금까지 생성된 출력의 길이를 의미한다.

print(scores)
print(scores.masked_fill(mask == 0, -float('inf')))
tensor([[[26.5910,  1.8838, -0.6318,  0.3911, -0.6419],
         [ 1.8838, 28.6324, -0.9585,  0.3705,  1.0006],
         [-0.6318, -0.9585, 26.4333, -0.5234, -0.8233],
         [ 0.3911,  0.3705, -0.5234, 28.6680, -0.4031],
         [-0.6419,  1.0006, -0.8233, -0.4031, 27.1448]]],
       grad_fn=<DivBackward0>)
tensor([[[26.5910,    -inf,    -inf,    -inf,    -inf],
         [ 1.8838, 28.6324,    -inf,    -inf,    -inf],
         [-0.6318, -0.9585, 26.4333,    -inf,    -inf],
         [ 0.3911,  0.3705, -0.5234, 28.6680,    -inf],
         [-0.6419,  1.0006, -0.8233, -0.4031, 27.1448]]],
       grad_fn=<MaskedFillBackward0>)

- -inf로 설정하는 이유

대각선 위의 값을 음의 무한대로 설정하면, 소프트맥스 함수를 적용할 때 \(e^{-inf}\) = 0 이므로 어텐션 가중치가 모두 0이 된다.

- softmax 값이 0이 되면 어떻게 되는지?

selt-attetion 계산은 softmax(Q @ K.T / sqrt(d_k)) 에서 Q @ K.T을 연산한 후에 그 결과에 masked_fill을 이용해서 하삼각행렬로 만든다. 그래야 self-attention이 어디를 보면 안되는지 지정해준다.

def scaled_dot_product_attetion(query, key, value, mask=None):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k) # bmm = batch matrix multiplication 즉, 배치(batch) 단위로 행렬 곱셈을 수행하는 함수
    if mask is not None:
        scores = socres.masked_fill(mask==0, float('-inf'))
    weights = F.softmax(scores,dim = -1)
    return weights.bmm(value)

여기서 if mask is not None 조건문은 어떻게 실행되는지 궁금할 수 있다. (내가 궁금했음)

mask는 함수를 실행할 때 입력으로 받는 인자(argument)이다. 보통 하삼각행렬 형태로 받는다.

import torch
import torch.nn as nn
import torch.nn.functional as F
from math import sqrt

# 디코더 층 구현
class DecoderLayer(nn.Module):
    def __init__(self, dim_model, num_heads, dim_feedforward, dropout=0.1):
        super(DecoderLayer, self).__init__()
        
        # Self-attention (masked)
        self.self_attention = nn.MultiheadAttention(embed_dim=dim_model, num_heads=num_heads, dropout=dropout)
        
        # Encoder-Decoder attention
        self.enc_dec_attention = nn.MultiheadAttention(embed_dim=dim_model, num_heads=num_heads, dropout=dropout)
        
        # Feedforward network
        self.feedforward = nn.Sequential(
            nn.Linear(dim_model, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, dim_model)
        )
        
        # Layer normalization
        self.layer_norm1 = nn.LayerNorm(dim_model)
        self.layer_norm2 = nn.LayerNorm(dim_model)
        self.layer_norm3 = nn.LayerNorm(dim_model)
        
        # Dropout
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        # Self-attention (masked)
        attn_output, _ = self.self_attention(tgt, tgt, tgt, attn_mask=tgt_mask) # tgt = 디코더 입력 토큰
        tgt = self.layer_norm1(tgt + self.dropout(attn_output))
        
        # Encoder-Decoder attention
        attn_output, _ = self.enc_dec_attention(tgt, memory, memory, attn_mask=memory_mask) # 디코더가 인코더의 정보를 참고함
        tgt = self.layer_norm2(tgt + self.dropout(attn_output))
        
        # Feedforward 
        ff_output = self.feedforward(tgt)
        tgt = self.layer_norm3(tgt + self.dropout(ff_output))
        
        return tgt

# 디코더 모델 전체 구성
class TransformerDecoder(nn.Module):
    def __init__(self, num_layers, dim_model, num_heads, dim_feedforward, vocab_size, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, dim_model)  # 임베딩 레이어
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1000, dim_model)) # 위치 인코딩 
        
        # 여러 개의 디코더 층
        self.decoder_layers = nn.ModuleList([DecoderLayer(dim_model, num_heads, dim_feedforward, dropout) for _ in range(num_layers)])
        
        self.output_layer = nn.Linear(dim_model, vocab_size)  # 최종 출력 (어휘 크기)
    
    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        # 임베딩 + 위치 인코딩
        tgt = self.embedding(tgt) + self.positional_encoding[:, :tgt.size(1), :]
        
        # 여러 개의 디코더 층을 통과
        for layer in self.decoder_layers:
            tgt = layer(tgt, memory, tgt_mask=tgt_mask, memory_mask=memory_mask)
        
        # 출력 레이어
        output = self.output_layer(tgt)
        return output
dim_model = 512  # 모델 차원
num_heads = 8  # 헤드 개수
dim_feedforward = 2048  # 피드포워드 네트워크 차원
vocab_size = 10000  # 어휘 크기 
num_layers = 6  # 디코더 층 수

decoder = TransformerDecoder(num_layers, dim_model, num_heads, dim_feedforward, vocab_size)

# 테스트용 데이터 생성
batch_size = 2  # 배치 크기
seq_len = 5  # 시퀀스 길이

# 타겟 시퀀스 (임의의 토큰 인덱스)
tgt = torch.randint(0, vocab_size, (batch_size, seq_len))  # (배치 크기, 시퀀스 길이)

# 메모리 (인코더의 출력, 임의로 생성)
memory = torch.randn(batch_size, seq_len, dim_model)  # (배치 크기, 시퀀스 길이, 모델 차원)

# 마스크는 None으로 설정 (마스크가 필요하지 않으면 None 사용)
tgt_mask = None
memory_mask = None

# 디코더 실행
output = decoder(tgt, memory, tgt_mask=tgt_mask, memory_mask=memory_mask)

# 결과 출력
print(f"Output shape: {output.shape}")
Output shape: torch.Size([2, 5, 10000])