Posted:      Updated:

AI 스터디를 하며 ‘파이토치 트랜스포머를 활용한 자연어 처리와 컴퓨터비전 심층학습’ 교재를 정리한 글입니다.

Model

  • 인공 신경망 모듈로 구현
  • 모듈(Module) 클래스 활용
  • 새 모델 클래스 생성 시 모듈 클래스를 상속받아 임의의 서브 클래스 생성
  • 트리 구조로 중첩 가능

모듈 클래스

  • 초기화 메서드(__init__)와 순방향 메서드(forward)를 재정의하여 활용
  • 초기화 메서드: 신경망에 사용될 계층 초기화
  • 순방향 메서드: 모델이 어떤 구조를 갖게 될 지 정의
  • 모델 객체 호출 시 순방향 메서드 실행, 정의한 순서대로 학습 진행

기본형

  • 모델의 인스턴스를 호출하는 순간 호출 메서드(__call__)가 순방향 메서드 실행
  • super 로 부모 클래스 초기화해야 하고, 역방향(backward) 연산은 파이토치의 자동 미분 기능인 Autograd 에서 모델의 매개변수를 역으로 전파하여 자동으로 기울기 또는 변화도를 계산해주므로 정의하지 않아도 됨
import torch
import torch.nn as nn
import torch.nn.functional as F
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 20, 5)
        self.conv2 = nn.Conv2d(20, 20, 5)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        return x

비선형 회귀 (Non-Linear Regression)

import torch
import pandas as pd
from torch import nn
from torch import optim
from torch.utils.data import Dataset, random_split, DataLoader
# 사용자 정의 데이터셋
class CustomDataset(Dataset):
    def __init__(self, file_path):
        df = pd.read_csv(file_path)  # file_path로 csv 파일 불러오기
        self.x = df.iloc[:, 0].values
        self.y = df.iloc[:, 1].values
        self.length = len(df)

    def __getitem__(self, index):  # x, y값 반환 (2차 방정식 형태)
        x = torch.FloatTensor([self.x[index] ** 2, self.x[index]])
        y = torch.FloatTensor([self.y[index]])
        return x, y

    def __len__(self):
        return self.length
# 사용자 정의 모델
class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(2, 1)

    def forward(self, x):
        x = self.layer(x)
        return x
dataset = CustomDataset("datasets/non_linear.csv")
dataset_size = len(dataset)

# 데이터셋 분리 (훈련용, 검증용, 테스트용)
train_size = int(dataset_size * 0.8)
validation_size = int(dataset_size * 0.1)
test_size = dataset_size - train_size - validation_size
# 무작위 분리 함수로 데이터셋 분리
# random_split: 분리 길이만큼 데이터셋의 서브셋을 생성
train_dataset, validation_dataset, test_dataset = random_split(dataset, [train_size, validation_size, test_size])
print(f"Training Data Size : {len(train_dataset)}")
print(f"Validation Data Size : {len(validation_dataset)}")
print(f"Testing Data Size : {len(test_dataset)}")
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, drop_last=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=4, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=True, drop_last=True)
device = "cuda" if torch.cuda.is_available() else "cpu" 
model = CustomModel().to(device)
criterion = nn.MSELoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.0001)
for epoch in range(10000):
    cost = 0.0
    for x, y in train_dataloader:
        x = x.to(device)
        y = y.to(device)

        output = model(x)
        loss = criterion(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        cost += loss

    cost = cost / len(train_dataloader)

    if (epoch + 1) % 1000 == 0:
        print(f"Epoch : {epoch + 1:4d}, Model : {list(model.parameters())}, Cost : {cost:.3f}")

출력

Training Data Size : 160
Validation Data Size : 20
Testing Data Size : 20
Epoch : 1000, Model : [Parameter containing:
tensor([[ 3.1024, -1.7030]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.3497], device='cuda:0', requires_grad=True)], Cost : 0.089
Epoch : 2000, Model : [Parameter containing:
tensor([[ 3.1013, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.4473], device='cuda:0', requires_grad=True)], Cost : 0.078
Epoch : 3000, Model : [Parameter containing:
tensor([[ 3.1009, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.4859], device='cuda:0', requires_grad=True)], Cost : 0.077
Epoch : 4000, Model : [Parameter containing:
tensor([[ 3.0994, -1.7037]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5012], device='cuda:0', requires_grad=True)], Cost : 0.078
Epoch : 5000, Model : [Parameter containing:
tensor([[ 3.1005, -1.7040]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5073], device='cuda:0', requires_grad=True)], Cost : 0.078
Epoch : 6000, Model : [Parameter containing:
tensor([[ 3.0994, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5096], device='cuda:0', requires_grad=True)], Cost : 0.076
Epoch : 7000, Model : [Parameter containing:
tensor([[ 3.1008, -1.7040]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5106], device='cuda:0', requires_grad=True)], Cost : 0.076
Epoch : 8000, Model : [Parameter containing:
tensor([[ 3.1001, -1.7041]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5110], device='cuda:0', requires_grad=True)], Cost : 0.078
Epoch : 9000, Model : [Parameter containing:
tensor([[ 3.0995, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5112], device='cuda:0', requires_grad=True)], Cost : 0.079
Epoch : 10000, Model : [Parameter containing:
tensor([[ 3.1006, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.5113], device='cuda:0', requires_grad=True)], Cost : 0.075

모델 평가

학습에 사용하지 않은 데이터를 모델에 입력하여 결과 확인

with torch.no_grad():  # no_grad: 기울기 계산 비활성화, 테스트 데이터 평가에 사용
    model.eval()  # 모델 평가 모드로 변경

    for x, y in validation_dataloader:
        x = x.to(device)
        y = y.to(device)
    
        outputs = model(x)
        print(f"X : {x}")
        print(f"Y : {y}")
        print(f"Outputs : {outputs}")
        print("--------------------")

출력

X : tensor([[13.6900, -3.7000],
        [36.0000,  6.0000],
        [40.9600, -6.4000],
        [96.0400,  9.8000]], device='cuda:0')
Y : tensor([[ 49.6900],
        [102.1200],
        [138.1000],
        [281.4000]], device='cuda:0')
Outputs : tensor([[ 49.2623],
        [101.9077],
        [138.4148],
        [281.5900]], device='cuda:0')
--------------------
X : tensor([[22.0900, -4.7000],
        [44.8900,  6.7000],
        [30.2500,  5.5000],
        [40.9600,  6.4000]], device='cuda:0')
Y : tensor([[ 76.6600],
        [127.8900],
        [ 85.2600],
        [116.1800]], device='cuda:0')
Outputs : tensor([[ 77.0108],
        [128.2789],
        [ 84.9315],
        [116.6049]], device='cuda:0')
--------------------
X : tensor([[39.6900, -6.3000],
        [ 2.8900,  1.7000],
        [ 5.7600,  2.4000],
        [42.2500, -6.5000]], device='cuda:0')
Y : tensor([[133.8000],
        [  6.6200],
        [ 13.8100],
        [142.6600]], device='cuda:0')
Outputs : tensor([[134.3068],
        [  6.5753],
        [ 14.2811],
        [142.5849]], device='cuda:0')
--------------------
X : tensor([[67.2400,  8.2000],
        [84.6400,  9.2000],
        [ 0.6400,  0.8000],
        [57.7600,  7.6000]], device='cuda:0')
Y : tensor([[195.2000],
        [247.6000],
        [  1.0400],
        [166.8600]], device='cuda:0')
Outputs : tensor([[195.0204],
        [247.2661],
        [  1.1325],
        [166.6495]], device='cuda:0')
--------------------
X : tensor([[90.2500,  9.5000],
        [30.2500, -5.5000],
        [70.5600, -8.4000],
        [62.4100, -7.9000]], device='cuda:0')
Y : tensor([[263.8000],
        [103.5500],
        [233.6700],
        [207.5900]], device='cuda:0')
Outputs : tensor([[264.1490],
        [103.6744],
        [233.5990],
        [207.4775]], device='cuda:0')
--------------------

모델 저장

  • 해당 모델을 나중에 다시 활용할 수 있음
  • 추가적인 평가, 추론에 활용 가능
  • 특정 시점의 모델 상태 보존
torch.save(
    model,
    "models/model.pt"
)
torch.save(
    model.state_dict(),
    "models/model_state_dict.pt"
)

모델 불러오기

import torch
import torch.nn as nn
# 불러올 모델과 동일한 구조로 선언
class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(2, 1)

    def forward(self, x):
        x = self.layer(x)
        return x
device = "cuda" if torch.cuda.is_available() else "cpu"
model = torch.load("models/model.pt", map_location=device)
model
with torch.no_grad():
    model.eval()
    inputs = torch.FloatTensor(
        [
            [1 ** 2, 1],
            [5 ** 2, 5],
            [11 ** 2, 11]
        ]
    ).to(device)
    outputs = model(inputs)
    print(outputs)

출력

CustomModel(
  (layer): Linear(in_features=2, out_features=1, bias=True)
)
tensor([[  1.9154],
        [ 69.4928],
        [356.8351]], device='cuda:0')

모델 구조 확인

class CustomModel(nn.Module):
    pass
device = "cuda" if torch.cuda.is_available() else "cpu"
model = torch.load("models/model.pt", map_location=device)
print(model)

출력

CustomModel(
  (layer): Linear(in_features=2, out_features=1, bias=True)
)

모델 상태 저장

  • 모델 전체를 저장하지 않고 모델 상태 값만 저장
  • model 변수가 아닌 state_dict 메서드로 모델 상태 저장
  • 모델 상태(torch.state_dict): 모델에서 학습이 가능한 매개변수를 순서가 있는 딕셔너리(OrderedDict) 형식으로 반환
  • 추론에 필요한 데이터와 가져와 저장하는 방식
torch.save(
    model.state_dict(),
    "models/model_state_dict.pt"
)

모델 상태 불러오기

  • 모델 상태만 불러올 경우 CustomModel 에 학습 결과 반영
  • 상태만 불러오면 모델 구조를 알 수 없으므로 CustomModel 클래스가 동일하게 구현되어 있어야 함
import torch
from torch import nn
class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(2, 1)

    def forward(self, x):
        x = self.layer(x)
        return x
device = "cuda" if torch.cuda.is_available() else "cpu"
model = CustomModel().to(device)
model_state_dict = torch.load("models/model_state_dict.pt", map_location=device)
model.load_state_dict(model_state_dict)  # 모델 상태 반영
with torch.no_grad():
    model.eval()
    inputs = torch.FloatTensor(
        [
            [1 ** 2, 1],
            [5 ** 2, 5],
            [11 ** 2, 11]
        ]
    ).to(device)
    outputs = model(inputs)
    print(outputs)

출력

<All keys matched successfully>
tensor([[  1.9154],
        [ 69.4928],
        [356.8351]], device='cuda:0')

체크포인트 저장을 추가한 비선형 회귀

  • 체크포인트: 학습 과정의 특정 지점마다 저장
  • epoch 마다 학습된 결과를 저장해 나중에 이어서 학습하게 함
import torch
import pandas as pd
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
class CustomDataset(Dataset):
    def __init__(self, file_path):
        df = pd.read_csv(file_path)  # file_path로 csv 파일 불러오기
        self.x = df.iloc[:, 0].values
        self.y = df.iloc[:, 1].values
        self.length = len(df)

    def __getitem__(self, index):  # x, y값 반환 (2차 방정식 형태)
        x = torch.FloatTensor([self.x[index] ** 2, self.x[index]])
        y = torch.FloatTensor([self.y[index]])
        return x, y

    def __len__(self):
        return self.length
class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(2, 1)

    def forward(self, x):
        x = self.layer(x)
        return x
dataset = CustomDataset("datasets/non_linear.csv")
dataset_size = len(dataset)

train_size = int(dataset_size * 0.8)
validation_size = int(dataset_size * 0.1)
test_size = dataset_size - train_size - validation_size
train_dataset, validation_dataset, test_dataset = random_split(dataset, [train_size, validation_size, test_size])
print(f"Training Data Size : {len(train_dataset)}")
print(f"Validation Data Size : {len(validation_dataset)}")
print(f"Testing Data Size : {len(test_dataset)}")
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, drop_last=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=4, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=True, drop_last=True)
device = "cuda" if torch.cuda.is_available() else "cpu" 
model = CustomModel().to(device)
criterion = nn.MSELoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.0001)
checkpoint = 1
for epoch in range(10000):
    cost = 0.0
    
    for x, y in train_dataloader:
        x = x.to(device)
        y = y.to(device)

        output = model(x)
        loss = criterion(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        cost += loss

    cost = cost / len(train_dataloader)

    if (epoch + 1) % 1000 == 0:
        # 다양한 정보를 저장하기 위해 딕셔너리 형식으로 값 할당
        # 학습을 이허서 진행해야 하므로 epoch, model.state_dict, optimizer.state_dict 필수 포함
        torch.save(
            {
                "model": "CustomModel",
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "cost": cost,
                "description": f"CustomModel 체크포인트-{checkpoint}",
            },
            f"models/checkpoint-{checkpoint}.pt",
        )
        
        checkpoint += 1

출력

Training Data Size : 160
Validation Data Size : 20
Testing Data Size : 20

체크포인트 불러오기를 추가한 비선형 회귀

import torch
import pandas as pd
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
class CustomDataset(Dataset):
    def __init__(self, file_path):
        df = pd.read_csv(file_path)  # file_path로 csv 파일 불러오기
        self.x = df.iloc[:, 0].values
        self.y = df.iloc[:, 1].values
        self.length = len(df)

    def __getitem__(self, index):  # x, y값 반환 (2차 방정식 형태)
        x = torch.FloatTensor([self.x[index] ** 2, self.x[index]])
        y = torch.FloatTensor([self.y[index]])
        return x, y

    def __len__(self):
        return self.length
class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(2, 1)

    def forward(self, x):
        x = self.layer(x)
        return x
dataset = CustomDataset("datasets/non_linear.csv")
dataset_size = len(dataset)

train_size = int(dataset_size * 0.8)
validation_size = int(dataset_size * 0.1)
test_size = dataset_size - train_size - validation_size
train_dataset, validation_dataset, test_dataset = random_split(dataset, [train_size, validation_size, test_size])
print(f"Training Data Size : {len(train_dataset)}")
print(f"Validation Data Size : {len(validation_dataset)}")
print(f"Testing Data Size : {len(test_dataset)}")
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, drop_last=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=4, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=True, drop_last=True)
device = "cuda" if torch.cuda.is_available() else "cpu" 
model = CustomModel().to(device)
criterion = nn.MSELoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.0001)
checkpoint = torch.load("models/checkpoint-6.pt")

model.load_state_dict(checkpoint["model_state_dict"])
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])

checkpoint_epoch = checkpoint["epoch"]
checkpoint_description = checkpoint["description"]
print(checkpoint_description)
for epoch in range(checkpoint_epoch + 1, 10000):
    cost = 0.0

    for x, y in train_dataloader:
        x = x.to(device)
        y = y.to(device)

        output = model(x)
        loss = criterion(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        cost += loss

    cost = cost / len(train_dataloader)

    if (epoch + 1) % 1000 == 0:
        print(f"Epoch : {epoch + 1:4d}, Model : {list(model.parameters())}, Cost : {cost:.3f}")

출력

Training Data Size : 160
Validation Data Size : 20
Testing Data Size : 20
CustomModel 체크포인트-6
Epoch : 7000, Model : [Parameter containing:
tensor([[ 3.0995, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.4932], device='cuda:0', requires_grad=True)], Cost : 0.075
Epoch : 8000, Model : [Parameter containing:
tensor([[ 3.1001, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.4905], device='cuda:0', requires_grad=True)], Cost : 0.075
Epoch : 9000, Model : [Parameter containing:
tensor([[ 3.1003, -1.7039]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.4893], device='cuda:0', requires_grad=True)], Cost : 0.075
Epoch : 10000, Model : [Parameter containing:
tensor([[ 3.0999, -1.7035]], device='cuda:0', requires_grad=True), Parameter containing:
tensor([0.4888], device='cuda:0', requires_grad=True)], Cost : 0.075

Categories:

댓글남기기