시퀀스 데이터(Sequence Data)는 시간적 혹은 순서적 관계가 중요한 데이터를 말합니다. 일반적인 데이터가 순서와 무관하게 독립적으로 다뤄지는 것과 달리, 시퀀스 데이터는 각 요소가 특정 순서에 따라 나열되어 있고 앞뒤 맥락이 의미를 가집니다. 대표적인 예로는 자연어 문장(단어의 순서 중요), 시계열 데이터(주가, 날씨 변화), 음성 신호(시간에 따른 파형), 영상 프레임(연속된 이미지) 등이 있습니다. 따라서 시퀀스 데이터는 단순한 값들의 모음이 아니라, 순서와 맥락이 포함된 정보 구조로 이해하는 것이 핵심입니다.
RNN(Recurrent Neural Network, 순환 신경망)은 시계열 데이터나 연속적인 데이터를 다룰 때 사용되는 인공 신경망으로, 일반적인 신경망(CNN, MLP 등)이 입력을 한 번 처리하고 끝나는 것과 달리, 과거의 정보를 기억하며 다음 계산에 반영하는 특징이 있습니다. 일반적인 신경망은 현재 입력만 보고 예측하지만, 시계열 데이터나 자연어처럼 이전 정보가 중요한 경우에는 적절하지 않기 때문에 RNN이 필요합니다. RNN은 기존 신경망과 달리 자신의 출력을 다시 입력으로 사용하여 과거 정보를 기억하는 구조를 가지며, 이를 통해 시계열 데이터의 패턴을 학습할 수 있습니다. 그러나 일반적인 RNN은 장기 의존성(Long-Term Dependency) 문제로 인해 학습이 어려울 수 있으며, 이를 해결하기 위해 장단기 메모리(Long Short-Term Memory, LSTM)나 게이트 순환 유닛(Gated Recurrent Unit, GRU)과 같은 변형 모델이 개발되었습니다.
은닉층에 있는 RNN의 처리 단위를 셀(cell)이라고 부르며, 셀의 출력을 은닉 상태(hidden state)라고 합니다. RNN은 시점(time step)에 따라서 입력을 받는데 현재 시점의 hidden state인 h𝑡연산을 위해 직전 시점의 hidden state인 h𝑡−1를 입력받습니다. 이것이 RNN이 과거의 정보를 기억할 수 있는 방법입니다.
하이퍼볼릭탄젠트 함수는 시그모이드 함수와 달리 -1 ~ 1의 범위 값으로 값을 반환하는 함수입니다. 반환값의 범위가 시그모이드 함수보다 크므로 일반적으로 은닉층에서는 시그모이드 함수보다 더 잘 동작합니다.
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dset
train_dataset = dset.MNIST(root='.',
train=True,
transform=transforms.ToTensor(),
download=True)
test_dataset = dset.MNIST(root='.',
train=False,
transform=transforms.ToTensor())
batch_size = 100
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size,
shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
batch_size=batch_size,
shuffle=False)
class RNNModel(nn.Module):
def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
super(RNNModel, self).__init__()
self.hidden_dim = hidden_dim
self.layer_dim = layer_dim
self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first=True)
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
out, hn = self.rnn(x, h0)
out = self.fc(out[:, -1, :])
return out
input_dim = 28
hidden_dim = 100
layer_dim = 1
output_dim = 10
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = RNNModel(input_dim, hidden_dim, layer_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
learning_rate = 0.01
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
num_epochs = 20
for epoch in range(num_epochs):
model.train()
for images, labels in train_loader:
images = images.to(device)
labels = labels.to(device)
images = images.view(-1, input_dim, input_dim)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
model.eval()
correct = 0
total = 0
for images, labels in test_loader:
images = images.to(device)
labels = labels.to(device)
images = images.view(-1, 28, input_dim)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum()
accuracy = 100 * correct / total
print('Epoch: {}. Loss: {}. Accuracy: {}'.format(epoch, loss.item(), accuracy))
아래 데이터를 다운받고 구글드라이브에 업로드하세요
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
seed = 2025
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/kospi.csv')
if 'Date' in df.columns:
df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
df = df.sort_values('Date').reset_index(drop=True)
df = df.ffill().bfill()
FEATS = ['Open', 'High', 'Low', 'Close']
split_ratio = 0.5
split_idx = int(len(df) * split_ratio)
train_df = df.iloc[:split_idx].copy()
test_df = df.iloc[split_idx:].copy()
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()
X_train = train_df[FEATS].values
X_test = test_df[FEATS].values
y_train = train_df[['Close']].values # (N,1) 모양
y_test = test_df[['Close']].values
X_train = scaler_x.fit_transform(X_train)
X_test = scaler_x.transform(X_test)
y_train = scaler_y.fit_transform(y_train)
y_test = scaler_y.transform(y_test)
def make_seq(X, y, L: int):
Xs, ys = [], []
for i in range(len(X) - L):
Xs.append(X[i:i+L]) # (L, feat)
ys.append(y[i+L]) # 다음 시점 y
Xs = torch.tensor(np.array(Xs), dtype=torch.float32)
ys = torch.tensor(np.array(ys), dtype=torch.float32).view(-1, 1)
return Xs, ys
sequence_length = 5
X_train_seq, y_train_seq = make_seq(X_train, y_train, sequence_length)
X_test_seq, y_test_seq = make_seq(X_test, y_test, sequence_length)
print("Train seq:", X_train_seq.shape, y_train_seq.shape)
print("Test seq:", X_test_seq.shape, y_test_seq.shape)
batch_size = 16
train_loader = DataLoader(TensorDataset(X_train_seq, y_train_seq),
batch_size=batch_size, shuffle=True, drop_last=False)
test_loader = DataLoader(TensorDataset(X_test_seq, y_test_seq),
batch_size=batch_size, shuffle=False, drop_last=False)
class RNNRegressor(nn.Module):
def __init__(self, input_size, hidden_size, num_layers):
super().__init__()
self.rnn = nn.RNN(input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True) # tanh 기본
self.fc = nn.Linear(hidden_size, 1) # << Sigmoid 제거(회귀)
def forward(self, x):
B = x.size(0)
h0 = torch.zeros(self.rnn.num_layers, B, self.rnn.hidden_size, device=x.device)
out, _ = self.rnn(x, h0) # out: [B, L, H]
last = out[:, -1, :] # 마지막 시점 은닉만 사용
yhat = self.fc(last) # 선형 출력
return yhat
input_size = X_train_seq.size(2) # 4
hidden_size = 32
num_layers = 2
model = RNNRegressor(input_size, hidden_size, num_layers).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
epochs = 200
clip_norm = 1.0
for epoch in range(1, epochs+1):
model.train()
running = 0.0
for xb, yb in train_loader:
xb = xb.to(device) # [B, L, 4]
yb = yb.to(device) # [B, 1]
optimizer.zero_grad()
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
# ReLU RNN을 쓸 경우 특히 유용하지만 tanh에도 안전장치로 둡니다.
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=clip_norm)
optimizer.step()
running += loss.item()
if epoch % 20 == 0 or epoch == 1:
print(f"[{epoch:03d}/{epochs}] train MSE: {running/len(train_loader):.6f}")
model.eval()
pred_list, true_list = [], []
with torch.no_grad():
for xb, yb in test_loader:
xb = xb.to(device)
yb = yb.to(device)
pred = model(xb) # 스케일된 예측
pred_list.append(pred.cpu().numpy())
true_list.append(yb.cpu().numpy())
pred_scaled = np.vstack(pred_list) # (N_test, 1)
true_scaled = np.vstack(true_list)
# 원 스케일로 되돌리기(inverse transform)
pred_unscaled = scaler_y.inverse_transform(pred_scaled)
true_unscaled = scaler_y.inverse_transform(true_scaled)
from sklearn.metrics import mean_squared_error, mean_absolute_error
rmse = mean_squared_error(true_unscaled, pred_unscaled)
mae = mean_absolute_error(true_unscaled, pred_unscaled)
print(f"Test RMSE: {rmse:.4f} | MAE: {mae:.4f}")
print("예측(앞 5개):", np.round(pred_unscaled[:5, 0], 2))
print("실제(앞 5개):", np.round(true_unscaled[:5, 0], 2))
import numpy as np
import matplotlib.pyplot as plt
# 1) 기준 인덱스들
L = sequence_length # 예: 5
t0 = split_idx # train/test 절단 시점 (원본 df 기준)
N_pred = len(pred_unscaled) # 테스트 예측 길이 = len(test_df) - L
# 2) 실제값(테스트 구간)과 예측값 준비
y_actual_test = df['Close'].values[t0 + L : t0 + L + N_pred] # 테스트 구간 실제 종가
y_pred_test = pred_unscaled.ravel() # 예측(원 스케일)
# 3) x축(원본 인덱스)에 정확히 맞춰 그림
x_test = np.arange(t0 + L, t0 + L + N_pred)
plt.figure(figsize=(20, 10))
plt.plot(df['Close'].values, color='gray', alpha=0.3, label='full series')
# 테스트 실제/예측 정렬해서 겹치기
plt.plot(x_test, y_actual_test, 'b', label='actual (test)')
plt.plot(x_test, y_pred_test, 'r', linewidth=0.8, label='prediction')
plt.legend()
plt.title('KOSPI Close — Test segment (aligned)')
plt.show()
Seq2Seq (0) | 2025.02.11 |
---|---|
LSTM과 GRU (0) | 2025.02.10 |
신경망 기반의 벡터화 (2) | 2025.02.04 |
벡터화 (1) | 2025.01.22 |
IMDB Dataset를 활용한 데이터 전처리 (3) | 2025.01.22 |