github
각 모델 구현 코드 설명 - models.py
Train code
일단 필요한 라이브러리들을 import 해줍니다
import argparse
import sys
import time
import os
import datetime
from unittest import result
from models.resnet import Resnet
from models.vgg import VGG
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter
Python
복사
accuracy 계산에 쓰이는 accuracy 함수
def accuracy(pred_tensor, y_tensor):
# accuracy = torch.eq(pred_tensor, y_tensor).sum().item() / len(pred_tensor)
accuracy = torch.sum(pred_tensor == y_tensor) / len(pred_tensor)
return accuracy
Python
복사
처음에 torch.eq로 짰다가 torch.sum으로 하는게 더 빠르다고 생각해 (알고리즘 상으로) 수정했습니다.
pred tensor==y_tensor는 같은 위치의 요소를 비교해 요소의 값이 같은 위치는 1, 값이 다른 위치는 0인 리스트를 반환합니다.
거기에 torch.sum을 해주면 같은 개수가 나오게 되고 그 값을 텐서의 길이, 즉 요소의 개수로 나눠주면 전체에 비해 맞은 비율 == accuracy가 나옵니다
test set으로 loss 및 acc 를 계산하는 evaluate함수
def evaluate(model, testloader, device):
model.eval()
test_acc = 0
test_loss = 0
# Starts batch test
for x_batch, y_batch in testloader:
x_batch, y_batch = x_batch.to(device), y_batch.to(device)
pred = model(x_batch)
pred = F.softmax(pred, dim=1)
loss = nn.CrossEntropyLoss()
loss_output=loss(pred, y_batch).item()
test_loss += loss_output
# acc = accuracy(pred, y_batch)
prediction = torch.argmax(pred, dim=1)
acc = accuracy(prediction, y_batch)
test_acc += acc
test_acc = test_acc/ len(testloader)
test_loss = test_loss / len(testloader)
return test_loss, test_acc
Python
복사
train함수에서 만드는 model, testlaoder와 device를 넘겨주면 testloader의 데이터들로 acc와 loss를 계산합니다.
일단 평가기 때문에 model은 evaluation 모드로 바꿔주고 (model.eval())
x_batch, y_batch불러와 device에 넣고
model(x_batch) 로 예측을 수행한 후
F.softmax를 통해 softmax를 취한 값을 y_batch와 함께 loss 함수에 넣어주어 loss 를 구해줍니다.
이 때 주의할점 !
loss_output=loss(pred, y_batch).item() 에서 .item()을 해주지 않으면 loss의 값만이 저장되는 것이 아닌 tensor 자체가 계속해서 쌓이게 되어서 OOM 에러가 발생하게 됩니다 ㅠㅠ
예측값에 argmax를 통해 예측 라벨을 구해준 후 y_batch와 accuracy함수에 넘겨주어 accuracy를 계산합니다
batch마다 loss와 acc를 더해주다가 마지막에 testloader의 길이만큼( 전체 데이터 수 / batch 수 = iteration수 ) 나눠주어 Test_loss 와 test _acc를 반환해줍니다
train 함수
train.py의 마지막 부분에서는 이렇게 main함수를 실행시켜주는데요, argumentparser로 argument들을 추출한 뒤 그것들을 train함수에 넘겨줍니다. epoch수, batch size, model 이름 등이 있습니다~
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--num_epochs', type=int, default=50, help='the number of epochs')
parser.add_argument('--batch_size', type=int, default=8, help='batch size')
parser.add_argument('--model_name', type=str, default='resnet', help='the name of model - resnet or vgg16')
parser.add_argument('--dataset_path', type=str, default='../data_1', help='dataset directory path')
parser.add_argument('--start_epoch', type=int, default=0, help='the start number of epochs')
parser.add_argument('--load_model',default=None, type=str, help='the name of saved model file (.pt)')
parser.add_argument('--save_txt', type=bool, default=True, help='if it''s true, the result of trainig will be saved as txt file.')
opt = parser.parse_args()
train(opt)
Python
복사
def train(opt):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_epochs, batch_size, model_name = opt.num_epochs, opt.batch_size, opt.model_name
start_epoch, load_model, dataset_path, save_txt = opt.start_epoch, opt.load_model, opt.dataset_path, opt.save_txt
print( f"%% model name : {model_name}, num epochs : {num_epochs}, batch size : {batch_size} %%")
if model_name == 'resnet':
number = input("the number of resnet layers(18, 34, 50, 101, 152): ")
number = int(number)
model = Resnet(number).to(device)
data_transform = transforms.Compose([
transforms.ToTensor(), # normalize는 PIL이미지 형태가 아닌 Tensor형태에서 수행되어야하므로 앞에 이 줄을 꼭 넣어줘야함
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
elif model_name == 'vgg':
number = input("the number of vgg layers(11, 13, 16, 19): ")
number = int(number)
model = VGG(number).to(device)
data_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(), # normalize는 PIL이미지 형태가 아닌 Tensor형태에서 수행되어야하므로 앞에 이 줄을 꼭 넣어줘야함
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
else:
print("it's not appropriate name")
sys.exit(0)
if load_model is not None:
model.load_state_dict(torch.load(load_model))
trainset = ImageFolder(root=dataset_path+"/train", transform=data_transform)
# testset = ImageFolder(root=dataset_path+"/test", transform=data_transform)
testset = ImageFolder(root=dataset_path+"/val", transform=data_transform)
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
testloader = DataLoader(testset, batch_size=1, shuffle=False, num_workers=2)
# print(trainset.classes) [female, male]
# print(trainset.class_to_idx) female:0, male:1
trainloader_length = len(trainloader)
# optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, dampening=0, weight_decay=0.0001, nesterov=False) # batch size 클 때
Python
복사
일단 opt로 넘겨진 인자들을 함수 내의 지역변수로 되받아주고
model_name에 따라 모델을 생성해줍니다. 모델에 따라 transforms들을 저장해주는 data_trasform 이 조금 다릅니다.
trasforms.Compose를 할 떄 주의할 점은, Normalize는 이미지를 tensor로 바꾸어 시행해주어야 하므로 transforms.ToTensor()를 앞에 해줍니다
이후 폴더별로 데이터셋을 생성해주는 torchvisions.datasets.ImageFolder로 trainset과 testset을 만들어주고
torch.utils.data.DataLoader로 각각 trainloader와 testloader를 생성해줍니다
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.0001, amsgrad=False)
# nesterov : 현재 위치의 그래디언트 g(\theta_t) 를 이용하는 것이 아니고 현재 위치에서 속도 \mu v_t만큼 전진한 후의 그래디언트 g(\theta_t + \mu v_t) 를 이용합니다. 사람들은 이를 가리켜 선험적으로 혹은 모험적으로 먼저 진행한 후 에러를 교정한다라고 표현합니다.
lr_scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
writer = SummaryWriter()
start = time.time()
best_test_acc = 0
Python
복사
이 부분은 optimizer와 learning rate를 조정해주는 lr_scheduler 정의 부분인데,
Adam과 SGD를 둘 다 사용하였었는데, 아무래도 배치사이즈가 크지 않아서 Adam이 더 효율적이라 Adam을 사용했습니다.
또한 lr scheduler는 ReduceLROnPlateu를 통해 10epoch동안에 Test loss가 줄어들지 않으면 lr을 0.1배씩해주는 방식을 사용했습니다.
이후에는 Tensorboard에 기록하기 위한 SummaryWriter객체를 생성해주고
시간을 측정하기 위한 시작시간을 time 라이브러리로 저장해주고 (start = time.time() )
test_acc가 가장 높은 웨이트를 저장해줄것이기 떄문에 test acc를 저장해줄 best_test_acc 변수를 초기화 해줍니다
dt = datetime.datetime.now()
save_model_path = './save_files/'+f"{model_name}_{dt.month}-{dt.day}-{dt.hour}-{dt.minute}"
os.mkdir(save_model_path)
if save_txt:
f = open(save_model_path + '/result.txt','w')
for epoch in range(start_epoch, num_epochs):
# Set model in training model
model.train()
train_acc = 0
train_loss = 0
# Starts batch training
iter=0
for x_batch, y_batch in trainloader:
iter +=1
msg = '\riteration %d / %d'%(iter, trainloader_length)
print(' '*len(msg), end='')
print(msg, end='')
time.sleep(0.1)
x_batch, y_batch = x_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
pred = model(x_batch)
# torch에서 nn.CrossEntropyLoss()는 softmax도 함꼐 이루어지므로 굳이 안해도 됨
# pred = F.softmax(pred, dim=1)
loss = nn.CrossEntropyLoss()
loss_output = loss(pred, y_batch)
# gradient calculation
loss_output.requires_grad_(True) # RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn 해결
loss_output.backward()
# gradient update
optimizer.step()
prediction = torch.argmax(pred, dim=1)
acc = accuracy(prediction, y_batch)
train_acc += acc
train_loss += loss_output.item()
# evaluate after 1 epoch training
torch.cuda.empty_cache()
test_loss, test_acc = evaluate(model, testloader, device)
if test_acc > best_test_acc:
torch.save(model.state_dict(), save_model_path+ f'/{model_name}_{num_epochs}ep_{batch_size}b_best.pt')
best_test_acc = test_acc
train_acc = train_acc / trainloader_length
train_loss = train_loss / trainloader_length
lr_scheduler.step(test_loss)
writer.add_scalars('Loss', {'trainloss':train_loss, 'testloss':test_loss}, epoch)
writer.add_scalars('Accuracy', {'trainacc':train_acc, 'testacc':test_acc}, epoch)
# Metrics calculation
result_txt = "\nEpoch: %d, loss: %.8f, Train accuracy: %.8f, Test accuracy: %.8f, Test loss: %.8f, lr: %5f" % (epoch+1, train_loss, train_acc, test_acc, test_loss, optimizer.param_groups[0]['lr'])
f.write(result_txt)
print(result_txt)
# print("\nEpoch: %d, loss: %.8f, Train accuracy: %.8f, Test accuracy: %.8f, Test loss: %.8f, lr: %5f" % (epoch+1, train_loss, train_acc, test_acc, test_loss, optimizer.param_groups[0]['lr']))
finish = time.time()
print("---------training finish---------")
total_result_txt = "\nTotal time: %d(sec), Total Epoch: %d, loss: %.5f, Train accuracy: %.5f, Test accuracy: %.5f, Test loss: %.5f" % (finish-start, num_epochs, train_loss, train_acc, test_acc, test_loss)
# print("\nTotal time: %d(sec), Total Epoch: %d, loss: %.5f, Train accuracy: %.5f, Test accuracy: %.5f, Test loss: %.5f" % (finish-start, num_epochs, train_loss, train_acc, test_acc, test_loss))
print(total_result_txt)
f.write(total_result_txt)
f.close()
torch.save(model.state_dict(), save_model_path+ f'/{num_epochs}ep_{batch_size}b_final.pt')
Python
복사
이 부분에서는 일단 이번 학습의 결과의 txt파일과 웨이트 파일을 저장할 폴더를 os.mkdir로 만들어주고 (이름 지정에 대해서는 생략하겠습니다)
for epoch in range(start_epoch, num_epochs)로 본격적인 학습을 시작합니다.
model을 train모드로 바꿔준 다음에
for x_batch, y_batch in trainloader: 로 데이터를 불러와서
.to(device)로 지정 device에 넣어주고,
다음 부분이 중요하기에 한번 더 쓰겠습니다
optimizer.zero_grad()
pred = model(x_batch)
# torch에서 nn.CrossEntropyLoss()는 softmax도 함꼐 이루어지므로 굳이 안해도 됨
# pred = F.softmax(pred, dim=1)
loss = nn.CrossEntropyLoss()
loss_output = loss(pred, y_batch)
# gradient calculation
loss_output.requires_grad_(True) # RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn 해결
loss_output.backward()
# gradient update
optimizer.step()
prediction = torch.argmax(pred, dim=1)
acc = accuracy(prediction, y_batch)
train_acc += acc
train_loss += loss_output.item()
Python
복사
optimizer의 gradient가 계속 쌓이지 않도록 .zero_grad()로 초기화 해주고,
model에 x_batch를 넣어 예측해줍니다
이후 crossentropyloss를 계산하는데, pred와 y_batch를 넘겨줍니다
그 이후 계산된 loss 인 loss_output을 통해 backpropagation을 진행하기 우해 output_loss.backward()를 해주는데, 이 때 loss는 grad를 필요로 하지 않는다는 식의 오류가 떠서 loss_output.requires_grad_(True) 로 해결해줬습니다.
그 이후에 optimizer.step()으로 weight 업데이트를 진행해줍니다.
zerograd - pred - loss - loss backward - opt step 이 순서를 기억해주세요
이후 argmax로 예측 라벨값을 뽑아 acc계산해주면 끝
한 epoch가 끝나면
test_loss, test_acc = evaluate(model, testloader, device)
if test_acc > best_test_acc:
torch.save(model.state_dict(), save_model_path+ f'/{model_name}_{num_epochs}ep_{batch_size}b_best.pt')
best_test_acc = test_acc
train_acc = train_acc / trainloader_length
train_loss = train_loss / trainloader_length
lr_scheduler.step(test_loss)
writer.add_scalars('Loss', {'trainloss':train_loss, 'testloss':test_loss}, epoch)
writer.add_scalars('Accuracy', {'trainacc':train_acc, 'testacc':test_acc}, epoch)
# Metrics calculation
result_txt = "\nEpoch: %d, loss: %.8f, Train accuracy: %.8f, Test accuracy: %.8f, Test loss: %.8f, lr: %5f" % (epoch+1, train_loss, train_acc, test_acc, test_loss, optimizer.param_groups[0]['lr'])
f.write(result_txt)
print(result_txt)
Python
복사
이렇게 evaluate함수 돌려주고
지금까지 중 best test acc라면 torch.save로 웨이트 파일을 저장해줍니다
test loss도 계산되었기 떄문에 lr_scheduler도 step해줘요
모든 학습이 끝나면
# print("\nEpoch: %d, loss: %.8f, Train accuracy: %.8f, Test accuracy: %.8f, Test loss: %.8f, lr: %5f" % (epoch+1, train_loss, train_acc, test_acc, test_loss, optimizer.param_groups[0]['lr']))
finish = time.time()
print("---------training finish---------")
total_result_txt = "\nTotal time: %d(sec), Total Epoch: %d, loss: %.5f, Train accuracy: %.5f, Test accuracy: %.5f, Test loss: %.5f" % (finish-start, num_epochs, train_loss, train_acc, test_acc, test_loss)
# print("\nTotal time: %d(sec), Total Epoch: %d, loss: %.5f, Train accuracy: %.5f, Test accuracy: %.5f, Test loss: %.5f" % (finish-start, num_epochs, train_loss, train_acc, test_acc, test_loss))
print(total_result_txt)
f.write(total_result_txt)
f.close()
torch.save(model.state_dict(), save_model_path+ f'/{num_epochs}ep_{batch_size}b_final.pt')
Python
복사
이렇게 끝나는 시간 측정해 총 걸린 시간, epoch, 최종 loss값, acc값을 출력해주고 마지막 weight를 torch.save해줍니다
Inference code
$ python3 train.py --num_epochs 30 --batch_size 4 --model_name resnet
Python
복사