상세 컨텐츠

본문 제목

RNN

인공지능/자연어 처리

by Ryuzy 2025. 2. 5. 01:53

본문

728x90
반응형

1. 시퀀스 데이터

시퀀스 데이터(Sequence Data)는 시간적 혹은 순서적 관계가 중요한 데이터를 말합니다. 일반적인 데이터가 순서와 무관하게 독립적으로 다뤄지는 것과 달리, 시퀀스 데이터는 각 요소가 특정 순서에 따라 나열되어 있고 앞뒤 맥락이 의미를 가집니다. 대표적인 예로는 자연어 문장(단어의 순서 중요), 시계열 데이터(주가, 날씨 변화), 음성 신호(시간에 따른 파형), 영상 프레임(연속된 이미지) 등이 있습니다. 따라서 시퀀스 데이터는 단순한 값들의 모음이 아니라, 순서와 맥락이 포함된 정보 구조로 이해하는 것이 핵심입니다.

 

 

2. RNN

RNN(Recurrent Neural Network, 순환 신경망)은 시계열 데이터나 연속적인 데이터를 다룰 때 사용되는 인공 신경망으로, 일반적인 신경망(CNN, MLP 등)이 입력을 한 번 처리하고 끝나는 것과 달리, 과거의 정보를 기억하며 다음 계산에 반영하는 특징이 있습니다. 일반적인 신경망은 현재 입력만 보고 예측하지만, 시계열 데이터나 자연어처럼 이전 정보가 중요한 경우에는 적절하지 않기 때문에 RNN이 필요합니다. RNN은 기존 신경망과 달리 자신의 출력을 다시 입력으로 사용하여 과거 정보를 기억하는 구조를 가지며, 이를 통해 시계열 데이터의 패턴을 학습할 수 있습니다.  그러나 일반적인 RNN은 장기 의존성(Long-Term Dependency) 문제로 인해 학습이 어려울 수 있으며, 이를 해결하기 위해 장단기 메모리(Long Short-Term Memory, LSTM)나 게이트 순환 유닛(Gated Recurrent Unit, GRU)과 같은 변형 모델이 개발되었습니다. 

 

 

 

은닉층에 있는 RNN의 처리 단위를 셀(cell)이라고 부르며, 셀의 출력을 은닉 상태(hidden state)라고 합니다. RNN은 시점(time step)에 따라서 입력을 받는데 현재 시점의 hidden state인 h𝑡연산을 위해 직전 시점의 hidden state인 h𝑡−1를 입력받습니다. 이것이 RNN이 과거의 정보를 기억할 수 있는 방법입니다.

 

 

 

하이퍼볼릭탄젠트 함수는 시그모이드 함수와 달리 -1 ~ 1의 범위 값으로 값을 반환하는 함수입니다. 반환값의 범위가 시그모이드 함수보다 크므로 일반적으로 은닉층에서는 시그모이드 함수보다 더 잘 동작합니다.

 

 

2. RNN 모델로 이미지 분류하기

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dset

train_dataset = dset.MNIST(root='.',
                           train=True,
                           transform=transforms.ToTensor(),
                           download=True)

test_dataset = dset.MNIST(root='.',
                          train=False,
                          transform=transforms.ToTensor())

 

batch_size = 100

train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)

 

class RNNModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(RNNModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim

        self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        out, hn = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])
        return out

 

input_dim = 28
hidden_dim = 100
layer_dim = 1
output_dim = 10
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = RNNModel(input_dim, hidden_dim, layer_dim, output_dim).to(device)

 

criterion = nn.CrossEntropyLoss()
learning_rate = 0.01
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

 

num_epochs = 20


for epoch in range(num_epochs):
    model.train()
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        images = images.view(-1, input_dim, input_dim)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

    model.eval()
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        images = images.view(-1, 28, input_dim)

        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)

        total += labels.size(0)
        correct += (predicted == labels).sum()

    accuracy = 100 * correct / total

    print('Epoch: {}. Loss: {}. Accuracy: {}'.format(epoch, loss.item(), accuracy))

 

 

3. RNN을 이용한 KOSPI 예측

아래 데이터를 다운받고 구글드라이브에 업로드하세요

kospi.csv
0.32MB

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

 

seed = 2025
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

 

df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/kospi.csv')
if 'Date' in df.columns:
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    df = df.sort_values('Date').reset_index(drop=True)

df = df.ffill().bfill()

 

FEATS = ['Open', 'High', 'Low', 'Close']

split_ratio = 0.5
split_idx = int(len(df) * split_ratio)

train_df = df.iloc[:split_idx].copy()
test_df  = df.iloc[split_idx:].copy()

scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()

X_train = train_df[FEATS].values
X_test  = test_df[FEATS].values
y_train = train_df[['Close']].values   # (N,1) 모양
y_test  = test_df[['Close']].values

X_train = scaler_x.fit_transform(X_train)
X_test  = scaler_x.transform(X_test)

y_train = scaler_y.fit_transform(y_train)
y_test  = scaler_y.transform(y_test)

 

def make_seq(X, y, L: int):
    Xs, ys = [], []
    for i in range(len(X) - L):
        Xs.append(X[i:i+L])     # (L, feat)
        ys.append(y[i+L])       # 다음 시점 y
    Xs = torch.tensor(np.array(Xs), dtype=torch.float32)
    ys = torch.tensor(np.array(ys), dtype=torch.float32).view(-1, 1)
    return Xs, ys

 

sequence_length = 5
X_train_seq, y_train_seq = make_seq(X_train, y_train, sequence_length)
X_test_seq,  y_test_seq  = make_seq(X_test,  y_test,  sequence_length)

print("Train seq:", X_train_seq.shape, y_train_seq.shape)
print("Test  seq:", X_test_seq.shape,  y_test_seq.shape)

 

batch_size = 16
train_loader = DataLoader(TensorDataset(X_train_seq, y_train_seq),
                          batch_size=batch_size, shuffle=True, drop_last=False)
test_loader  = DataLoader(TensorDataset(X_test_seq, y_test_seq),
                          batch_size=batch_size, shuffle=False, drop_last=False)

 

class RNNRegressor(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.rnn = nn.RNN(input_size=input_size,
                          hidden_size=hidden_size,
                          num_layers=num_layers,
                          batch_first=True)  # tanh 기본
        self.fc = nn.Linear(hidden_size, 1)    # << Sigmoid 제거(회귀)
    def forward(self, x):
        B = x.size(0)
        h0 = torch.zeros(self.rnn.num_layers, B, self.rnn.hidden_size, device=x.device)
        out, _ = self.rnn(x, h0)               # out: [B, L, H]
        last = out[:, -1, :]                   # 마지막 시점 은닉만 사용
        yhat = self.fc(last)                   # 선형 출력
        return yhat

 

input_size  = X_train_seq.size(2)             # 4
hidden_size = 32
num_layers  = 2
model = RNNRegressor(input_size, hidden_size, num_layers).to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

 

epochs = 200
clip_norm = 1.0

for epoch in range(1, epochs+1):
    model.train()
    running = 0.0
    for xb, yb in train_loader:
        xb = xb.to(device)   # [B, L, 4]
        yb = yb.to(device)   # [B, 1]

        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        # ReLU RNN을 쓸 경우 특히 유용하지만 tanh에도 안전장치로 둡니다.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=clip_norm)
        optimizer.step()

        running += loss.item()

    if epoch % 20 == 0 or epoch == 1:
        print(f"[{epoch:03d}/{epochs}] train MSE: {running/len(train_loader):.6f}")

 

model.eval()
pred_list, true_list = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        yb = yb.to(device)
        pred = model(xb)                # 스케일된 예측
        pred_list.append(pred.cpu().numpy())
        true_list.append(yb.cpu().numpy())

pred_scaled = np.vstack(pred_list)      # (N_test, 1)
true_scaled = np.vstack(true_list)

# 원 스케일로 되돌리기(inverse transform)
pred_unscaled = scaler_y.inverse_transform(pred_scaled)
true_unscaled = scaler_y.inverse_transform(true_scaled)

from sklearn.metrics import mean_squared_error, mean_absolute_error
rmse = mean_squared_error(true_unscaled, pred_unscaled)
mae  = mean_absolute_error(true_unscaled, pred_unscaled)
print(f"Test RMSE: {rmse:.4f} | MAE: {mae:.4f}")

print("예측(앞 5개):", np.round(pred_unscaled[:5, 0], 2))
print("실제(앞 5개):", np.round(true_unscaled[:5, 0], 2))

 

import numpy as np
import matplotlib.pyplot as plt

# 1) 기준 인덱스들
L = sequence_length          # 예: 5
t0 = split_idx               # train/test 절단 시점 (원본 df 기준)
N_pred = len(pred_unscaled)  # 테스트 예측 길이 = len(test_df) - L

# 2) 실제값(테스트 구간)과 예측값 준비
y_actual_test = df['Close'].values[t0 + L : t0 + L + N_pred]   # 테스트 구간 실제 종가
y_pred_test   = pred_unscaled.ravel()                          # 예측(원 스케일)

# 3) x축(원본 인덱스)에 정확히 맞춰 그림
x_test = np.arange(t0 + L, t0 + L + N_pred)

plt.figure(figsize=(20, 10))

plt.plot(df['Close'].values, color='gray', alpha=0.3, label='full series')

# 테스트 실제/예측 정렬해서 겹치기
plt.plot(x_test, y_actual_test, 'b', label='actual (test)')
plt.plot(x_test, y_pred_test, 'r', linewidth=0.8, label='prediction')

plt.legend()
plt.title('KOSPI Close — Test segment (aligned)')
plt.show()

 

728x90
반응형

'인공지능 > 자연어 처리' 카테고리의 다른 글

Seq2Seq  (0) 2025.02.11
LSTM과 GRU  (0) 2025.02.10
신경망 기반의 벡터화  (2) 2025.02.04
벡터화  (1) 2025.01.22
IMDB Dataset를 활용한 데이터 전처리  (3) 2025.01.22

관련글 더보기