Conventional RNN (Recurrent Neural Network) and LSTM (Long Short-Term Memory) models process their input sequentially, so they tend to lose important information in long sentences and fail to capture relationships between words that are far apart. Attention addresses this by looking at every word of the input sentence at once and computing weights that determine which words to focus on.
For example, suppose that in the sentence "나는 오늘 학교에서 수학 시험을 봤다." ("I took a math exam at school today."), the word "시험" ("exam") carries the most important meaning. When processing this sentence, attention learns to assign a higher weight to "시험" and lower weights to the less important words.
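As a toy illustration of this weighting (the scores below are made up for illustration and are not produced by any model in this section), softmax turns raw relevance scores into weights that sum to 1, so the highest-scoring word dominates the result:

import torch

# Hypothetical relevance scores for the words
# ["나는", "오늘", "학교에서", "수학", "시험을", "봤다"] (illustrative values only)
scores = torch.tensor([0.5, 0.3, 0.8, 1.2, 2.5, 0.9])
weights = torch.softmax(scores, dim=0)  # normalize so the weights sum to 1
print(weights)  # "시험을" receives the largest weight (about 0.53)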
The attention mechanism works with three core concepts: Query, Key, and Value. It can be explained with an analogy to borrowing a book from a library (a minimal code sketch follows the list below).
1. Query: "What information am I looking for?"
2. Key: "What content is each book related to?"
3. Value: "The information you actually get from the book"
4. Attention weight computation: "Which book is most relevant?"
5. Output: "Reflect the important information more strongly"
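To make the Query/Key/Value roles concrete, here is a minimal scaled dot-product attention sketch (a generic illustration; the translation model below defines its own attention module, an additive-style variant):

import torch

def scaled_dot_product_attention(query, key, value):
    # query: (batch, q_len, d), key/value: (batch, k_len, d)
    d = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / d ** 0.5  # how relevant each key is to each query
    weights = torch.softmax(scores, dim=-1)                    # attention weights sum to 1 over the keys
    context = torch.bmm(weights, value)                        # weighted sum of the values
    return context, weights

# Toy example: one query attending over four key/value vectors
q, k, v = torch.randn(1, 1, 8), torch.randn(1, 4, 8), torch.randn(1, 4, 8)
context, weights = scaled_dot_product_attention(q, k, v)
print(context.shape, weights.shape)  # torch.Size([1, 1, 8]) torch.Size([1, 1, 4])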
import os
import requests
import zipfile
import torch
import torch.nn as nn
import torch.optim as optim
import random
import re
import unicodedata
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
# Download and extract the dataset
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

def download_zip(url, output_path):
    response = requests.get(url, headers=headers, stream=True)
    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"ZIP file downloaded to {output_path}")
    else:
        print(f"Failed to download. HTTP Response Code: {response.status_code}")
url = "http://www.manythings.org/anki/fra-eng.zip"
output_path = "fra-eng.zip"
download_zip(url, output_path)
path = os.getcwd()
zipfilename = os.path.join(path, output_path)
with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
    zip_ref.extractall(path)
# Load and preprocess the data
def unicode_to_ascii(s):
    return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

def normalize_string(s):
    s = unicode_to_ascii(s.lower().strip())
    s = re.sub(r"[^a-zA-Z.!?]+", " ", s)
    return s

def load_data(filepath, num_samples=50000):
    with open(filepath, encoding='utf-8') as f:
        lines = f.read().strip().split("\n")
    pairs = [[normalize_string(s) for s in l.split('\t')[:2]] for l in lines[:num_samples]]
    return pairs
file_path = os.path.join(path, "fra.txt")
pairs = load_data(file_path, num_samples=50000)
# Build the vocabularies
class Lang:
    def __init__(self):
        self.word2index = {"<SOS>": 0, "<EOS>": 1, "<PAD>": 2}
        self.index2word = {0: "<SOS>", 1: "<EOS>", 2: "<PAD>"}
        self.word_count = Counter()

    def add_sentence(self, sentence):
        for word in sentence.split():
            self.word_count[word] += 1

    def build_vocab(self, min_count=1):
        for word, count in self.word_count.items():
            if count >= min_count:
                index = len(self.word2index)
                self.word2index[word] = index
                self.index2word[index] = word

    def sentence_to_indexes(self, sentence):
        # Unknown words fall back to the <PAD> index
        return [self.word2index.get(word, self.word2index['<PAD>']) for word in sentence.split()]
input_lang = Lang()
target_lang = Lang()
for src, tgt in pairs:
    input_lang.add_sentence(src)
    target_lang.add_sentence(tgt)
input_lang.build_vocab()
target_lang.build_vocab()
# Create the dataset and DataLoader
class TranslationDataset(Dataset):
    def __init__(self, pairs, input_lang, target_lang, max_length=20):
        self.pairs = pairs
        self.input_lang = input_lang
        self.target_lang = target_lang
        self.max_length = max_length

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        src, tgt = self.pairs[idx]
        src_idx = self.input_lang.sentence_to_indexes(src)[:self.max_length] + [self.input_lang.word2index['<EOS>']]
        tgt_idx = self.target_lang.sentence_to_indexes(tgt)[:self.max_length] + [self.target_lang.word2index['<EOS>']]
        return torch.tensor(src_idx), torch.tensor(tgt_idx)

def collate_fn(batch):
    src_batch, tgt_batch = zip(*batch)
    src_batch = pad_sequence(src_batch, batch_first=True, padding_value=input_lang.word2index['<PAD>'])
    tgt_batch = pad_sequence(tgt_batch, batch_first=True, padding_value=target_lang.word2index['<PAD>'])
    return src_batch, tgt_batch
dataset = TranslationDataset(pairs, input_lang, target_lang)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)
# Add the attention class (additive attention in the Bahdanau style)
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.attn = nn.Linear(hidden_size * 3, hidden_size)  # computes the attention energy
        self.v = nn.Parameter(torch.rand(hidden_size))        # learnable weight vector

    def forward(self, hidden, encoder_outputs):
        # hidden: (batch, hidden_size)
        # encoder_outputs: (batch, seq_len, hidden_size * 2)
        seq_len = encoder_outputs.shape[1]
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)  # (batch, seq_len, hidden_size)
        energy = torch.relu(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  # (batch, seq_len, hidden_size)
        attention_weights = torch.sum(self.v * energy, dim=2)  # (batch, seq_len)
        attention_weights = torch.softmax(attention_weights, dim=1)  # normalize over source positions
        # Apply the weights to the encoder outputs
        attention_applied = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)  # (batch, 1, hidden_size * 2)
        return attention_applied.squeeze(1), attention_weights  # (batch, hidden_size * 2), (batch, seq_len)
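# Quick shape check for the Attention module above (illustrative sizes only; not part of the original script)
_attn_check = Attention(hidden_size=4)
_ctx, _w = _attn_check(torch.randn(2, 4), torch.randn(2, 5, 8))
print(_ctx.shape, _w.shape)  # torch.Size([2, 8]) torch.Size([2, 5])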
# Encoder, modified (existing structure kept)
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers=2, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.GRU(embedding_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, hidden_size)

    def forward(self, x):
        embedded = self.embedding(x)  # (batch, seq_len, embedding_size)
        outputs, hidden = self.rnn(embedded)  # outputs: (batch, seq_len, hidden_size * 2)
        # Combine the last forward and backward hidden states into one decoder-sized state
        hidden = torch.tanh(self.fc(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)))  # (batch, hidden_size)
        # Repeat the state for each decoder layer (num_layers=2)
        return outputs, hidden.unsqueeze(0).repeat(2, 1, 1)  # (batch, seq_len, hidden_size * 2), (num_layers, batch, hidden_size)
# Decoder, modified (attention added)
class Decoder(nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size, num_layers=2, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(output_size, embedding_size)
        self.attention = Attention(hidden_size)
        self.rnn = nn.GRU(embedding_size + hidden_size * 2, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_size * 3, output_size)  # prediction after applying attention

    def forward(self, x, hidden, encoder_outputs):
        x = x.unsqueeze(1)  # (batch, 1)
        embedded = self.embedding(x)  # (batch, 1, embedding_size)
        attn_context, attn_weights = self.attention(hidden[-1], encoder_outputs)  # (batch, hidden_size * 2)
        rnn_input = torch.cat((embedded, attn_context.unsqueeze(1)), dim=2)  # (batch, 1, embedding_size + hidden_size * 2)
        output, hidden = self.rnn(rnn_input, hidden)  # output: (batch, 1, hidden_size)
        output = torch.cat((output.squeeze(1), attn_context), dim=1)  # (batch, hidden_size * 3)
        prediction = self.fc(output)  # (batch, output_size)
        # Push the <PAD> logit down so it is effectively never predicted
        prediction[:, target_lang.word2index["<PAD>"]] -= 100
        return prediction, hidden, attn_weights
# Training function, modified (attention applied)
def train(encoder, decoder, dataloader, optimizer, criterion, device, num_epochs=50, teacher_forcing_ratio=0.7):
    for epoch in range(num_epochs):
        total_loss = 0
        for src, tgt in dataloader:
            src, tgt = src.to(device), tgt.to(device)
            optimizer.zero_grad()
            encoder_outputs, encoder_hidden = encoder(src)
            decoder_input = torch.tensor([target_lang.word2index['<SOS>']] * src.shape[0], device=device)
            decoder_hidden = encoder_hidden
            loss = 0
            for t in range(tgt.shape[1]):
                output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_outputs)
                loss += criterion(output, tgt[:, t])
                # Teacher forcing: feed the ground-truth token with probability teacher_forcing_ratio
                teacher_force = random.random() < teacher_forcing_ratio
                decoder_input = tgt[:, t] if teacher_force else output.argmax(1)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(dataloader)}")
import matplotlib.pyplot as plt
import numpy as np
def translate_sentence_with_attention(sentence, encoder, decoder, input_lang, target_lang, device, max_length=30, min_length=5):
    encoder.eval()
    decoder.eval()
    with torch.no_grad():
        src_idx = input_lang.sentence_to_indexes(sentence) + [input_lang.word2index['<EOS>']]
        src_tensor = torch.tensor(src_idx, device=device).unsqueeze(0)
        # Run the encoder
        encoder_outputs, encoder_hidden = encoder(src_tensor)
        # Initialize the decoder
        decoder_input = torch.tensor([target_lang.word2index['<SOS>']], device=device)
        decoder_hidden = encoder_hidden
        translated_sentence = []
        attention_weights_list = []
        # Generate words one at a time
        for _ in range(max_length):
            output, decoder_hidden, attn_weights = decoder(decoder_input, decoder_hidden, encoder_outputs)
            top_word_idx = output.argmax(1).item()
            # 🔹 Ignore <PAD> if it is produced
            if top_word_idx == target_lang.word2index["<PAD>"]:
                continue
            # 🔹 Allow <EOS> only after at least min_length words have been generated
            if top_word_idx == target_lang.word2index["<EOS>"] and len(translated_sentence) >= min_length:
                break
            # Store the attention weights
            attention_weights_list.append(attn_weights.squeeze(0).cpu().numpy())
            translated_sentence.append(target_lang.index2word[top_word_idx])
            decoder_input = torch.tensor([top_word_idx], device=device)
    return translated_sentence, np.array(attention_weights_list), src_idx
def plot_attention(attention_weights, input_sentence, output_sentence):
    fig, ax = plt.subplots(figsize=(10, 6))
    input_words = input_sentence.split() + ["<EOS>"]
    output_words = output_sentence  # the <EOS> step is never stored in attention_weights
    # 🔹 Call set_ticks() first, then set_ticklabels()
    cax = ax.matshow(attention_weights, cmap='Blues', aspect='auto')
    ax.set_xticks(range(len(input_words)))
    ax.set_xticklabels(input_words, rotation=45)
    ax.set_yticks(range(len(output_words)))
    ax.set_yticklabels(output_words)
    plt.colorbar(cax)
    plt.xlabel("Input Sentence")
    plt.ylabel("Output Sentence")
    plt.show()
# Train the model and run the test translations
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
encoder = Encoder(len(input_lang.word2index), 512, 512).to(device)
decoder = Decoder(len(target_lang.word2index), 512, 512).to(device)
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index=target_lang.word2index["<PAD>"])
train(encoder, decoder, dataloader, optimizer, criterion, device, num_epochs=200)
test_sentences = [
    "i love you so much because you are always kind to me.",
    "where is the nearest train station? i need to catch a train.",
    "i am studying artificial intelligence and deep learning these days.",
    "this restaurant serves the best pasta i have ever had in my life.",
    "the weather is beautiful today, so we decided to go for a walk in the park.",
    "i have a meeting tomorrow morning at 9 am, so i need to sleep early tonight.",
    "can you please tell me how to get to the airport from here?",
    "she loves reading books about history and ancient civilizations.",
    "the computer program i wrote is finally working without any bugs.",
    "our flight was delayed due to bad weather conditions, so we had to wait for hours."
]
# Translate several sentences and print the results
for sentence in test_sentences:
    translated_sentence, attention_weights, src_idx = translate_sentence_with_attention(sentence, encoder, decoder, input_lang, target_lang, device)
    print(f"🔹 Input: {sentence}")
    print(f"🔹 Translated: {' '.join(translated_sentence)}\n")
    # Visualize the attention weights
    plot_attention(attention_weights, sentence, translated_sentence)