[Project1] Binary_classification_of_E.coli
0. 서론
사진을 input으로 넣었을 때, E.coli 사진인지 구분하는 binary_classification model을 만들어보려 한다.
따라서, 준비해야하는 것은 E.coli 사진과 E.coli가 아닌 사진이 필요하다.
내가 이번에 모은 사진은 E.coli 50장 , non_E.coli 50장이다.
모으는 방법은 google image에서 수작업으로 진행했다. 다음번 프로젝트에서는 웹 크롤링을 이용해서 모아보려 한다.
모은 사진은 아래와 같이 폴더 구조를 만들자.
dataset/
├── E_coli/
│ ├── image1.jpg
│ ├── image2.jpg
│ └── ...
└── Non_E_coli/
├── image1.jpg
├── image2.jpg
└── ...
1. 데이터 전처리
이제 데이터 전처리를 진행할 것이다.
첫번째로, data를 나누는 과정을 진행할 것이다.
import shutil
from os import listdir
from os.path import isfile, join
def split_data(source, train_dir, test_dir, split_ratio=0.8):
"""
데이터를 train과 test 폴더로 분할하는 함수.
"""
if not os.path.exists(train_dir):
os.makedirs(train_dir)
if not os.path.exists(test_dir):
os.makedirs(test_dir)
classes = [d for d in os.listdir(source) if os.path.isdir(join(source, d))]
for cls in classes:
cls_source = join(source, cls)
images = [f for f in listdir(cls_source) if isfile(join(cls_source, f))]
train_images, test_images = train_test_split(images, train_size=split_ratio, random_state=42)
cls_train_dir = join(train_dir, cls)
cls_test_dir = join(test_dir, cls)
if not os.path.exists(cls_train_dir):
os.makedirs(cls_train_dir)
if not os.path.exists(cls_test_dir):
os.makedirs(cls_test_dir)
for img in train_images:
shutil.copy(join(cls_source, img), join(cls_train_dir, img))
for img in test_images:
shutil.copy(join(cls_source, img), join(cls_test_dir, img))
# 원본 데이터 경로
source_dir = data_dir
# Train과 Test 디렉토리 생성
train_dir = '/content/drive/My Drive/dataset_split/train'
test_dir = '/content/drive/My Drive/dataset_split/test'
# 데이터 분할 실행
split_data(source_dir, train_dir, test_dir, split_ratio=0.8)
dataset_split / train / E_coli , Non_E_coli
dataset_split / test / E_coli , Non_E_coli
코드를 실행하고나면, 위와 같이 dataset_split이 진행된다.
이제, transfer-learning을 이용하기위해, 기존의 모델(resnet)이 학습했던 형식에 맞게 사진을 바꿔야한다.
# 하이퍼파라미터 설정
input_size = 224 # VGG16, ResNet 등 사전 학습된 모델이 기대하는 이미지 크기
# 데이터 전처리 및 증강 설정
data_transforms = {
'train': transforms.Compose([
transforms.RandomResizedCrop(input_size),
transforms.RandomHorizontalFlip(),
transforms.RandomRotation(15),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], # ImageNet 평균
[0.229, 0.224, 0.225]) # ImageNet 표준편차
]),
'test': transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(input_size),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])
]),
}
위의 코드를 실행하면, 이제 resnet이 학습했던 데이터처럼 바뀐다.
2. Dataset load
이제 dataset 을 load 하자.
datasets.ImageFolder를 이용해, 라벨링을 해주고, 객체를 생성한다.
이후, DataLoader를 통해서, batch 크기와 shuffle,num_workers를 설정해준다.
# 데이터셋 로드
image_datasets = {x: datasets.ImageFolder(os.path.join('/content/drive/My Drive/dataset_split', x),
data_transforms[x])
for x in ['train', 'test']}
# 데이터 로더 생성
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=16,
shuffle=True, num_workers=2)
for x in ['train', 'test']}
# 데이터셋 크기 확인
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'test']}
print(dataset_sizes)
# 클래스 이름 확인
class_names = image_datasets['train'].classes
print(class_names)
3. ResNet 18 model 불러오기
# 사전 학습된 ResNet18 모델 불러오기
model_ft = models.resnet18(pretrained=True)
# 마지막 완전 연결 층 수정 (Binary Classification을 위해 출력 노드를 1개로 설정)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Sequential(
nn.Linear(num_ftrs, 1),
nn.Sigmoid()
)
# GPU 사용 가능 여부 확인
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_ft = model_ft.to(device)
# 손실 함수 정의 (Binary Classification을 위해 BCELoss 사용)
criterion = nn.BCELoss()
# 옵티마이저 정의 (모델의 모든 파라미터를 업데이트)
optimizer_ft = optim.Adam(model_ft.parameters(), lr=1e-4)
# 학습 스케줄러 정의 (학습률 감소)
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
# 모델 요약 확인
print(model_ft)
ResNet 18 은 1000개의 class를 예측하는 모델이다. 나는 1개의 class만 예측하면 되기때문에, fc부분을 1로 설정해줘야한다. 또한, 확률값을 표시해주기위해서 sigmoid를 이용했다.
손실함수는 BinaryCrossEntropy를 사용했다. 이진분류에서는 BCE가 자주 사용된다고한다.
optimizer는 adam을 사용했다.
4. (선택) 데이터에 문제가 있을 때 실행
데이터에 문제가 있어, training이 진행되지않을때는, 아래 코드를 실행해서 데이터를 제거해줘야한다.
실제로 , 나도 실행하는 중 문제가 있어서, 데이터를 삭제하고 새로운 데이터를 추가해줬다.
from PIL import Image
# 문제 이미지 경로
problem_image_path = '/content/drive/My Drive/dataset_split/train/E_coli/ecoli2.png'
try:
img = Image.open(problem_image_path)
img.verify() # 이미지가 손상되지 않았는지 확인
print(f"이미지 {problem_image_path}는 정상입니다.")
except Exception as e:
print(f"이미지 {problem_image_path}를 여는 중 오류 발생: {e}")
import os
from PIL import Image
def find_and_remove_corrupted_images(directory):
"""
주어진 디렉토리 내의 모든 이미지를 검사하고, 손상된 이미지를 제거합니다.
"""
corrupted_images = []
for root, _, files in os.walk(directory):
for file in files:
if file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff')):
file_path = os.path.join(root, file)
try:
with Image.open(file_path) as img:
img.verify() # 이미지가 손상되지 않았는지 확인
except Exception as e:
print(f"손상된 이미지 발견: {file_path} - {e}")
corrupted_images.append(file_path)
# 손상된 이미지 삭제
for img_path in corrupted_images:
try:
os.remove(img_path)
print(f"삭제된 이미지: {img_path}")
except Exception as e:
print(f"이미지 삭제 실패: {img_path} - {e}")
# 데이터셋 경로 설정
train_dir = '/content/drive/My Drive/dataset_split/train'
test_dir = '/content/drive/My Drive/dataset_split/test'
# 훈련 데이터에서 손상된 이미지 제거
print("훈련 데이터에서 손상된 이미지 검사 중...")
find_and_remove_corrupted_images(train_dir)
# 테스트 데이터에서 손상된 이미지 제거
print("\n테스트 데이터에서 손상된 이미지 검사 중...")
find_and_remove_corrupted_images(test_dir)
5. training and testing
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
since = time.time()
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
# 기록을 저장할 리스트
train_acc_history = []
test_acc_history = []
train_loss_history = []
test_loss_history = []
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch +1, num_epochs))
print('-' * 10)
# 각 에포크는 훈련과 테스트 단계로 구성
for phase in ['train', 'test']:
if phase == 'train':
model.train() # 모델을 훈련 모드로 설정
else:
model.eval() # 모델을 평가 모드로 설정
running_loss = 0.0
running_corrects = 0
# 데이터 로더에서 배치 단위로 데이터 가져오기
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device).float().unsqueeze(1) # BCE Loss를 위해 형상 맞추기
# 옵티마이저 초기화
optimizer.zero_grad()
# 순전파
with torch.set_grad_enabled(phase == 'train'):
outputs = model(inputs)
loss = criterion(outputs, labels)
preds = (outputs > 0.5).float()
# 역전파 및 최적화 (훈련 단계에서만)
if phase == 'train':
loss.backward()
optimizer.step()
# 통계
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
if phase == 'train':
scheduler.step()
epoch_loss = running_loss / dataset_sizes[phase]
epoch_acc = running_corrects.double() / dataset_sizes[phase]
print('{} Loss: {:.4f} Acc: {:.4f}'.format(
phase, epoch_loss, epoch_acc))
# 기록 저장
if phase == 'train':
train_loss_history.append(epoch_loss)
train_acc_history.append(epoch_acc.item())
else:
test_loss_history.append(epoch_loss)
test_acc_history.append(epoch_acc.item())
# 모델 성능이 향상되면 모델 가중치 저장
if epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
print()
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
print('Best Test Acc: {:4f}'.format(best_acc))
# 최상의 모델 가중치 로드
model.load_state_dict(best_model_wts)
# 기록 반환
history = {
'train_loss': train_loss_history,
'train_acc': train_acc_history,
'test_loss': test_loss_history,
'test_acc': test_acc_history
}
return model, history
# 학습 실행 (예: 20 에포크)
model_ft, history = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,
num_epochs=20)
output
6. test dataset 평가
# 테스트 데이터셋 평가
def evaluate_model(model, dataloader, dataset_size):
model.eval()
running_corrects = 0
running_loss = 0.0
with torch.no_grad():
for inputs, labels in dataloaders['test']:
inputs = inputs.to(device)
labels = labels.to(device).float().unsqueeze(1)
outputs = model(inputs)
loss = criterion(outputs, labels)
preds = (outputs > 0.5).float()
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
total_loss = running_loss / dataset_size
total_acc = running_corrects.double() / dataset_size
print('Test Loss: {:.4f} Acc: {:.4f}'.format(total_loss, total_acc))
evaluate_model(model_ft, dataloaders['test'], dataset_sizes['test'])
output :
Test Loss: 0.1849 Acc: 0.9487
7. 모델 저장
# 모델 저장
model_save_path = '/content/drive/My Drive/models/ecoli_classifier.pth'
torch.save(model_ft.state_dict(), model_save_path)
print('Model saved to', model_save_path)
# 모델 로드
model_loaded = models.resnet18(pretrained=True)
num_ftrs = model_loaded.fc.in_features
model_loaded.fc = nn.Sequential(
nn.Linear(num_ftrs, 1),
nn.Sigmoid()
)
model_loaded.load_state_dict(torch.load(model_save_path))
model_loaded = model_loaded.to(device)
print('Model loaded from', model_save_path)
8. 예측해보기
from PIL import Image
# 예측 함수 정의
def predict_image(model, image_path, transform, class_names):
model.eval()
image = Image.open(image_path).convert('RGB')
image = transform(image).unsqueeze(0) # 배치 차원 추가
image = image.to(device)
with torch.no_grad():
output = model(image)
probability = output.item()
prediction = 1 if probability > 0.5 else 0
class_label = class_names[prediction]
return probability, class_label
# 예측 예시
# Google Drive에 있는 새로운 이미지의 경로를 설정
new_image_path = "/content/drive/MyDrive/dataset/E_coli/ecoli.png"
# 변환 정의 (전처리와 동일하게 설정)
predict_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(input_size),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])
])
prob, label = predict_image(model_ft, new_image_path, predict_transform, class_names)
print(f'Probability of E. coli: {prob:.4f}')
print(f'Predicted Label: {label}')
output :
Probability of E. coli: 0.0016
Predicted Label: E_coli