Byte Pair Encoding

Author

차상진

Published

April 24, 2025

1. Byte Pair Encoding?

BPE(Byte Pair Encoding) 는 긴 단어나 새로운 단어를 작은 단위(조각)로 쪼개서 처리할 수 있게 해주는 토큰화 기법이다.

예를 들어 “lowest” 같은 단어는 “low” + “est”로 나눌 수 있고,

처음 보는 단어 “slowestestestestest”는 → “s”, “l”, “o”, “w”, “est”, “est”, “est”, “est”, “est” 처럼 처리할 수 있습니다.

이렇게 하면 모델이 모르는 단어도 일부 조각으로 처리 가능하게 됩니다.

Byte Pair Encoding을 이용해서 Corpus를 토큰화하는 과정을 구현합니다.

BPEvocabulary를 구축하는 단계인 Train단계, 구축된 vocabulary을 이용해서 새로운 Corpus를 토큰화하는 Infer 단계로 나누어서 진행하겠습니다.

2. Imports

import re
from collections import defaultdict

3. train

# 1-1. load file
def load_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()
        
# 1-2. calculate best pair
def get_char_vocab(vocab):
    char_vocab = defaultdict(int)
    for word in vocab:
        word_tuple = tuple(word)
        for i in range(len(word_tuple) - 1):
            pair = (word_tuple[i], word_tuple[i + 1])  # 인접 문자쌍
            char_vocab[pair] += 1
    return char_vocab

# 1-3. merge best pair
def merge_best_pair(tokenized_vocab, best_pair):
    new_tokenized_vocab = []
    
    for word in tokenized_vocab:
        new_word = []
        i = 0
        while i < len(word):
            if i < len(word) - 1 and (word[i], word[i + 1]) == best_pair:
                new_word.append(''.join(best_pair))  # 병합된 문자 추가
                i += 2  # 두 개를 하나로 병합했으므로 2 증가
            else:
                new_word.append(word[i])
                i += 1
        new_tokenized_vocab.append(new_word)

    return new_tokenized_vocab

# 1-4. BPE Algorithm execution
def bpe_algorithm(file_path, max_vocab):
    text = load_file(file_path)
    vocab = set() # 빈 vocab 리스트
    words = text.split() 
    tokenized_vocab = [list(word) for word in words]  # 문자 단위로 변환
    
    while len(vocab) < max_vocab:
        char_vocab = get_char_vocab(tokenized_vocab)
        
        if not char_vocab:  # 더 이상 병합할 것이 없으면 종료
            break
        
        best_pair = max(char_vocab, key=char_vocab.get)  # 가장 빈번한 문자쌍 선택
        tokenized_vocab = merge_best_pair(tokenized_vocab, best_pair)  # 병합 실행
        vocab.add(''.join(best_pair))  # 병합된 문자쌍을 vocab에 추가

         # 50개씩 vocab 크기가 증가할 때마다 출력
        if len(vocab) % 50 == 0:
            print(f"Current vocab size: {len(vocab)}/{max_vocab}")

    return list(vocab), tokenized_vocab

# 1-5. execution
file_path = 'pg100.txt'
max_vocab = 1  # 최대 vocab 크기 설정
final_vocab, final_tokenized_vocab = bpe_algorithm(file_path, max_vocab) # final_tokenized_vocab는 병합된 단어가 계속 쌓아가는 상황을 보여주기에 결과가 길고 지저분하다. 그래서 보이게 하지 않는 것이다.

# 1-6. result
print(f"최종 vocab 크기: {len(final_vocab)}")
print(f"일부 vocab: {final_vocab[:20]}")

# 1-7. make vocab
with open('vocab.txt', 'w', encoding='utf-8') as f:
    f.write("\n".join(final_vocab))

4. infer

# 2-1. Vocab 불러오기
def load_vocab(vocab_file):
    with open(vocab_file, 'r', encoding='utf-8') as f:
        return set(line.strip() for line in f)  # 줄바꿈 제거 후 집합으로 저장

# 2-2. 입력 데이터 불러오기
def load_text(input_file):
    with open(input_file, 'r', encoding='utf-8') as f:
        return f.read()

# # 2-3. BPE 기반 토큰화 수행
def tokenize(text, vocab):
    words = text.split()  # 공백 기준 단어 분리
    tokenized_output = [] # 토큰화 된 것을 저장하기 위한 빈 리스트

    for word in words:
        subword_tokens = []
        current = word

        while current:
            # 가능한 가장 긴 서브워드를 찾음
            for i in range(1,len(current)+1)[::-1]: # ex) abcd -> 4,3,2,1 순으로 인덱스
                subword = current[:i]  # 오른쪽의 알파벳을 하나씩 떼면서 vocab에 단어가 있는지 확인
                if subword in vocab: # 오른쪽 알파벳 하나씩 떼다가 vocab에 같은 거 발견한다면!
                    subword_tokens.append(subword) # subword_tokens에 추가
                    current = current[i:]  # 뗐던 부분에서 vocab에 단어가 있을 수 있으니 다시 확인하기 위해 current로 저장
                    break
            else:  # for loop를 돌았지만 break가 발생하지 않는다면 else 실행
                # 해당하는 서브워드가 vocab에 없으면 그냥 문자 단위로 분리
                subword_tokens.append(current[0]) # 첫 글자 분리
                current = current[1:] # 첫 글자 떼고 나머지는 다시 for loop 실행

        # 첫 번째 토큰을 제외한 나머지 서브워드에 '##' 추가
        for i in range(1, len(subword_tokens)):
            subword_tokens[i] = '##' + subword_tokens[i]

        tokenized_output.extend(subword_tokens) # .extend() -> 리스트에 원소를 추가 

    return ' '.join(tokenized_output) # 공백을 추가하여 하나로 정리

# 2-5. 결과 저장
def save_output(output_text, output_file):
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(output_text)

# 2-6. 실행
vocab_file = 'vocab.txt'  # 저장된 BPE vocab
input_file = 'infer.txt'  # 토큰화할 입력 텍스트
output_file = 'output.txt'  # 결과 저장할 파일

vocab = load_vocab(vocab_file)
text = load_text(input_file)
tokenized_text = tokenize(text, vocab)
save_output(tokenized_text, output_file)

print("Tokenization completed. Check", output_file)
Tokenization completed. Check output.txt