Back to Article
lista5.ipynb
Download Notebook

Instalar optuna no Colab (único módulo dos que não usarei que não está pré instalado)

In [2]:
!pip install optuna

Importando módulos

In [2]:
from google.colab import drive
import os
import glob
import random
import numpy as np
from PIL import Image
from tqdm import tqdm

import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as T
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix

import optuna

Criando conexão com o meu drive, onde subi as imagens

In [3]:
drive.mount('/content/drive')
Mounted at /content/drive

Definindo o caminho (remoto) dos dados

In [4]:
ROOT = '/content/drive/MyDrive/colab_data/rna1/lista5/Treino'

Definindo questões operacionais: Modo fast debug para rodar rapidinho e verificar erros e problemas, desligar para rodar de verdade

fixando o tamanho da imagem para upscale - como estou usando modelos pré treinados para 224x224, a recomendação da bibliografia é (neste caso) aumentar o tamanho das imagens de 96x103 para 224x224, para garantir o correto funcionamento dos filtros kernel de imagem da forma que foram concebidos nos modelos originais.

In [5]:
FAST_DEBUG = False
if FAST_DEBUG:
    N_EPOCHS = 2
    N_TRIALS = 2
    BATCH_SIZE = 16
else:
    N_EPOCHS = 20
    N_TRIALS = 30
    BATCH_SIZE = 32

IMAGE_HEIGHT = 224
IMAGE_WIDTH  = 224

Garantindo possibilidades: rodar na GPU se possível, no processador, caso não tenha.

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("DEVICE:", device)
print("FAST_DEBUG:", FAST_DEBUG)
DEVICE: cuda
FAST_DEBUG: False

Trazer os dados para o python

In [7]:
def collect_image_files(root):
    exts = ("*.bmp", "*.BMP")
    files = []
    for e in exts:
        files.extend(glob.glob(os.path.join(root, e)))
    files = sorted(files)
    return files

all_files = collect_image_files(ROOT)
if len(all_files) == 0:
    raise RuntimeError(f"Nenhuma imagem encontrada")

Preparando os dados pré-separação em treino e validação

In [8]:
class FingerprintDataset(Dataset):
    def __init__(self, files_list, transform=None):
        self.files = list(files_list)
        self.transform = transform
        self.labels = []
        for f in self.files:
            name = os.path.basename(f)
            first = name[0].upper() if len(name) > 0 else ""
            if first == "F":
                self.labels.append(1)
            elif first == "M":
                self.labels.append(0)
            else:
                base = name.split("_")[0].upper() if "_" in name else first
                if base == "F":
                    self.labels.append(1)
                elif base == "M":
                    self.labels.append(0)
                else:
                    raise ValueError(f"Nome de arquivo não tem F/M na frente: {name}")

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        path = self.files[idx]
        img = Image.open(path)
        if self.transform:
            img = self.transform(img)
        label = self.labels[idx]
        return img, label

Criando os tensores - data augmentation sendo feito nesta etapa

obs: a normalização está sendo feita com os pesos dos modelos pré treinados, em detrimento das estatísticas dos meus dados.

In [9]:
train_tf = T.Compose([

    T.Resize((IMAGE_HEIGHT, IMAGE_WIDTH)),
    T.Grayscale(num_output_channels=3),

    T.RandomApply([T.ColorJitter(brightness=0.1, contrast=0.1)], p=0.2),
    T.RandomApply([T.GaussianBlur(kernel_size=3, sigma=(0.1, 0.5))], p=0.1),
    T.RandomHorizontalFlip(p=0.5),
    T.RandomRotation(degrees=(-3, 3)),
    T.RandomResizedCrop(size=(IMAGE_HEIGHT, IMAGE_WIDTH), scale=(0.95, 1.0)),
    T.RandomAffine(degrees=0, translate=(0.03, 0.03)),

    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),

    ])

val_tf = T.Compose([

    T.Resize((IMAGE_HEIGHT, IMAGE_WIDTH)),
    T.Grayscale(num_output_channels=3),

    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),

])

Split treino/validação (80/20)

In [10]:
labels_all = []
for f in all_files:
    name = os.path.basename(f)
    first = name[0].upper() if len(name) > 0 else ""
    if first == "F":
        labels_all.append(1)
    elif first == "M":
        labels_all.append(0)
    else:
        base = name.split("_")[0].upper() if "_" in name else first
        labels_all.append(1 if base == "F" else 0)

labels_all = np.array(labels_all)

train_idx, val_idx = train_test_split(
    np.arange(len(all_files)),
    test_size=0.2,
    stratify=labels_all,
    random_state=42
)
train_files = [all_files[i] for i in train_idx]
val_files   = [all_files[i] for i in val_idx]

train_dataset = FingerprintDataset(train_files, transform=train_tf)
val_dataset   = FingerprintDataset(val_files, transform=val_tf)

Aplicando pesos para a classe com menor frequência (impressões digitais de mulheres)

In [11]:
train_labels = np.array(train_dataset.labels)
class_counts = np.bincount(train_labels)

class_weights = 1.0 / (class_counts)
sample_weights = class_weights[train_labels]

sampler = WeightedRandomSampler(
    weights=sample_weights.tolist(),
    num_samples=len(sample_weights),
    replacement=True
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=0, pin_memory=False)
val_loader   = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=False)

Definindo modelos: Ao invés de montar uma arquitetura própria, trouxe algumas sugestões de arquiteturas pré-treinadas para classificação de imagens

In [12]:
def create_model(trial):
    model_name = trial.suggest_categorical(
        "model_type",
        ["resnet18", "resnet50", "mobilenet_v2", "efficientnet_b0", "densenet121"]
    )

    if model_name == "resnet18":
        model = models.resnet18(weights="IMAGENET1K_V1")
        model.fc = nn.Linear(model.fc.in_features, 1)

    elif model_name == "resnet50":
        model = models.resnet50(weights="IMAGENET1K_V1")
        model.fc = nn.Linear(model.fc.in_features, 1)

    elif model_name == "mobilenet_v2":
        model = models.mobilenet_v2(weights="IMAGENET1K_V1")
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, 1)

    elif model_name == "efficientnet_b0":
        model = models.efficientnet_b0(weights="IMAGENET1K_V1")
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, 1)

    elif model_name == "densenet121":
        model = models.densenet121(weights="IMAGENET1K_V1")
        model.classifier = nn.Linear(model.classifier.in_features, 1)

    return model.to(device)

Treino e avaliação dos modelos

In [13]:
def train_and_evaluate(model, trial=None, save_path=None):
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True) if trial is not None else 1e-4
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCEWithLogitsLoss()

    best_f1 = 0.0
    best_state = None
    patience = 3
    no_improve = 0

    for epoch in range(N_EPOCHS):
        model.train()
        running_loss = 0.0
        for imgs, labels in train_loader:
            imgs = imgs.to(device)
            labels = labels.float().to(device)

            optimizer.zero_grad()
            logits = model(imgs).squeeze(1)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * imgs.size(0)

        model.eval()
        preds = []
        trues = []
        with torch.no_grad():
            for imgs, labels in val_loader:
                imgs = imgs.to(device)
                labels = labels.to(device)
                logits = model(imgs).squeeze(1)
                probs = torch.sigmoid(logits)
                pred_bin = (probs > 0.5).long().cpu().numpy()
                preds.extend(pred_bin.tolist())
                trues.extend(labels.cpu().numpy().tolist())

        f1 = f1_score(trues, preds, zero_division=0)
        prec = precision_score(trues, preds, zero_division=0)
        rec  = recall_score(trues, preds, zero_division=0)

        print(f"Epoch {epoch+1}/{N_EPOCHS} - loss: {running_loss/len(train_dataset):.4f} - F1: {f1:.4f} - P: {prec:.4f} - R: {rec:.4f}")

        if f1 > best_f1:
            best_f1 = f1
            no_improve = 0
            best_state = model.state_dict()
            if save_path is not None:
                torch.save(best_state, save_path)
        else:
            no_improve += 1
            if no_improve >= patience:
                print("Early stopping.")
                break

        if trial is not None:
            trial.report(best_f1, epoch)
            if trial.should_prune():
                raise optuna.exceptions.TrialPruned()

    return best_f1, best_state

Função objetivo do optuna

In [14]:
def objective(trial):
    model = create_model(trial)
    f1, _ = train_and_evaluate(model, trial=trial, save_path=None)
    return f1

Rodar optuna

In [17]:
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=N_TRIALS, n_jobs=-1, show_progress_bar=True)

print("Melhor trial:", study.best_trial.params)
print("Melhor F1 obtido (val):", study.best_value)

Coletando melhor modelo do estudo de hiperparâmetros

In [16]:
best_params = study.best_trial.params
class Dummy:
    def __init__(self, params):
        self.params = params
    def suggest_float(self, *args, **kwargs):
        return self.params.get("lr", 1e-4)
    def suggest_categorical(self, name, choices):
        return self.params.get("model_type", choices[0])

dummy_trial = Dummy(best_params)
final_model = create_model(dummy_trial)
best_model_path = "best_model_final.pth"
best_f1, best_state = train_and_evaluate(final_model, trial=None, save_path=best_model_path)
print("Final F1 (val):", best_f1)
print("Best model saved to:", best_model_path)
Epoch 1/20 - loss: 0.5652 - F1: 0.4317 - P: 0.3053 - R: 0.7365
Epoch 2/20 - loss: 0.4471 - F1: 0.5226 - P: 0.4029 - R: 0.7432
Epoch 3/20 - loss: 0.3927 - F1: 0.4959 - P: 0.4155 - R: 0.6149
Epoch 4/20 - loss: 0.3358 - F1: 0.4978 - P: 0.3709 - R: 0.7568
Epoch 5/20 - loss: 0.2652 - F1: 0.5101 - P: 0.5067 - R: 0.5135
Early stopping.
Final F1 (val): 0.5225653206650831
Best model saved to: best_model_final.pth

Avaliação “final”

In [17]:
if best_state is not None:
    final_model.load_state_dict(best_state)

final_model.eval()
preds = []
trues = []
with torch.no_grad():
    for imgs, labels in val_loader:
        imgs = imgs.to(device)
        labels = labels.to(device)
        logits = final_model(imgs).squeeze(1)
        probs = torch.sigmoid(logits)
        pred_bin = (probs > 0.5).long().cpu().numpy()
        preds.extend(pred_bin.tolist())
        trues.extend(labels.cpu().numpy().tolist())

print("Precision:", precision_score(trues, preds, zero_division=0))
print("Recall:", recall_score(trues, preds, zero_division=0))
print("F1 Score:", f1_score(trues, preds, zero_division=0))
print("Confusion Matrix:\n", confusion_matrix(trues, preds))
Precision: 0.5066666666666667
Recall: 0.5135135135135135
F1 Score: 0.5100671140939598
Confusion Matrix:
 [[578  74]
 [ 72  76]]

Utilizando agora o modelo selecionado para classificar as imagens no conjunto de teste

In [24]:
TEST_ROOT = '/content/drive/MyDrive/colab_data/rna1/lista5/Teste'

def collect_image_files(root):
    exts = ("*.bmp", "*.BMP")
    files = []
    for e in exts:
        files.extend(glob.glob(os.path.join(root, e)))
    files = sorted(files)
    return files

test_files = collect_image_files(TEST_ROOT)

if best_state is None and os.path.exists("best_model_final.pth"):
    final_model.load_state_dict(torch.load("best_model_final.pth", map_location=device))
elif best_state is not None:
    final_model.load_state_dict(best_state)

class TestDataset(Dataset):
    def __init__(self, files_list, transform=None):
        self.files = files_list
        self.transform = transform

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        path = self.files[idx]
        img = Image.open(path)
        file_id = os.path.basename(path).split('.')[0]
        if self.transform:
            img = self.transform(img)
        return img, file_id

test_dataset = TestDataset(test_files, transform=val_tf)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

final_model.eval()
final_model.to(device)

predictions = []

with torch.no_grad():
    for imgs, file_ids in tqdm(test_loader, desc="Classificando imagens de teste"):
        imgs = imgs.to(device)
        logits = final_model(imgs).squeeze(1)
        probs = torch.sigmoid(logits)
        pred_bin = (probs > 0.5).long().cpu().numpy()

        for file_id, pred in zip(file_ids, pred_bin):
            label_str = "M" if pred == 0 else "F"
            predictions.append([file_id, label_str])

df_predictions = pd.DataFrame(predictions, columns=["ID", "PREDIT"])
Classificando imagens de teste: 100%|██████████| 63/63 [00:39<00:00,  1.59it/s]

Salvando os resultados em CSV

In [25]:
output_csv_path = "/content/drive/MyDrive/colab_data/rna1/lista5/test_predictions.csv"
df_predictions.to_csv(output_csv_path, index=False)