import random
import numpy as np
import torch
import os
import cv2
import pandas as pd
import albumentations as A
from pathlib import Path
from sklearn.model_selection import train_test_split, StratifiedKFold
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score
import timm
import torch.nn as nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR

# Check if a GPU (CUDA) is available, and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Define configurations
class Config:
    seed = 42
    model_name = "swsl_resnext50_32x4d"
    epoch_size = 30
    batch_size = 48
    learning_rate = 1e-4
    early_stop = 5
    k_fold_num = 5


# Set random seeds
def set_random_seeds(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


# Data preprocessing functions
def img_gather_(img_path):
    class_encoder = {
        'dog': 0,
        'elephant': 1,
        'giraffe': 2,
        'guitar': 3,
        'horse': 4,
        'house': 5,
        'person': 6
    }

    file_lists = []
    label_lists = []

    for class_name in os.listdir(img_path):
        class_dir = os.path.join(img_path, class_name)
        file_list = [os.path.join(class_dir, file) for file in os.listdir(class_dir)]
        label_list = [class_encoder[class_name]] * len(file_list)

        file_lists.extend(file_list)
        label_lists.extend(label_list)

    return np.array(file_lists), np.array(label_lists)


class TrainDataset(Dataset):
    def __init__(self, file_lists, label_lists, transforms=None):
        self.file_lists = file_lists.copy()
        self.label_lists = label_lists.copy()
        self.transforms = transforms

    def __getitem__(self, idx):
        img = cv2.imread(self.file_lists[idx], cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transforms:
            img = self.transforms(image=img)["image"]

        img = img.transpose(2, 0, 1)

        return torch.tensor(img, dtype=torch.float), torch.tensor(self.label_lists[idx], dtype=torch.long)

    def __len__(self):
        assert len(self.file_lists) == len(self.label_lists)
        return len(self.file_lists)


class TestDataset(Dataset):
    def __init__(self, file_lists, transforms=None):
        self.file_lists = file_lists.copy()
        self.transforms = transforms

    def __getitem__(self, idx):
        img = cv2.imread(self.file_lists[idx], cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transforms:
            img = self.transforms(image=img)["image"]

        img = img.transpose(2, 0, 1)

        return torch.tensor(img, dtype=torch.float)

    def __len__(self):
        return len(self.file_lists)


# Create and return the model
def create_model():
    model = timm.create_model(Config.model_name, pretrained=True, num_classes=7)
    # Modify model architecture if needed
    return model.to(device)


# Create optimizer and scheduler
def create_optimizer_scheduler(model):
    feature_extractor = [param for name, param in model.named_parameters() if "fc" not in name]
    classifier = [param for name, param in model.fc.parameters()]
    params = [
        {"params": feature_extractor, "lr": Config.learning_rate * 0.5},
        {"params": classifier, "lr": Config.learning_rate}
    ]
    optimizer = AdamW(params, lr=Config.learning_rate)
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=0)
    return optimizer, scheduler


# Create loss function
def create_loss_function():
    class_num = [329, 205, 235, 134, 151, 245, 399]
    class_weight = torch.tensor(np.max(class_num) / class_num).to(device=device, dtype=torch.float)
    criterion = nn.CrossEntropyLoss(weight=class_weight)
    return criterion


# Train one epoch
def train_step(model, data_loader, optimizer, criterion, epoch_idx):
    model.train()
    for iter_idx, (train_imgs, train_labels) in enumerate(data_loader["train_loader"], 1):
        train_imgs, train_labels = train_imgs.to(device=device, dtype=torch.float), train_labels.to(device)
        optimizer.zero_grad()
        train_pred = model(train_imgs)
        train_loss = criterion(train_pred, train_labels)
        train_loss.backward()
        optimizer.step()

        print(
            f"[Epoch {epoch_idx}/{Config.epoch_size}] model training iteration {iter_idx}/{len(data_loader['train_loader'])}",
            end="\r")


# Validation function
def validate(model, valid_loader, criterion):
    model.eval()
    valid_loss = []
    valid_acc = []
    valid_f1 = []
    with torch.no_grad():
        for iter_idx, (valid_imgs, valid_labels) in enumerate(valid_loader, 1):
            valid_imgs, valid_labels = valid_imgs.to(device=device, dtype=torch.float), valid_labels.to(device)
            valid_pred = model(valid_imgs)
            loss = criterion(valid_pred, valid_labels)
            valid_loss.append(loss.cpu().item())
            valid_pred_c = valid_pred.argmax(dim=-1)
            valid_acc.extend((valid_pred_c == valid_labels).cpu().tolist())
            f1 = f1_score(y_true=valid_labels.cpu().numpy(), y_pred=valid_pred_c.cpu().numpy(), average="macro")
            valid_f1.append(f1)

            print(f"[Validation] iteration {iter_idx}/{len(valid_loader)}", end="\r")

    valid_loss = np.mean(valid_loss)
    valid_acc = np.mean(valid_acc) * 100
    valid_f1 = np.mean(valid_f1)
    print(f"Validation loss: {valid_loss:.4f} | Validation acc: {valid_acc:.2f}% | Validation f1 score: {valid_f1:.4f}")
    return valid_loss, valid_acc, valid_f1


# Main training function
def train(data_loader):
    model = create_model().to(device)
    optimizer, scheduler = create_optimizer_scheduler(model)
    criterion = create_loss_function()

    best_model_state = None
    best_f1 = 0
    early_stop_count = 0

    for epoch_idx in range(1, Config.epoch_size + 1):
        train_step(model, data_loader, optimizer, criterion, epoch_idx)
        valid_loss, valid_acc, valid_f1 = validate(model, data_loader["valid_loader"], criterion)
        scheduler.step(valid_loss)

        if valid_f1 > best_f1:
            best_f1 = valid_f1
            best_model_state = model.state_dict()
            early_stop_count = 0
        else:
            early_stop_count += 1

        if early_stop_count == Config.early_stop:
            print("Early stopped." + " " * 30)
            break

    return best_model_state


# Main training and inference flow
if __name__ == "__main__":
    set_random_seeds(Config.seed)

    data_lists, data_labels = img_gather_("./data/train")
    best_models = []

    if Config.k_fold_num == -1:
        train_lists, valid_lists, train_labels, valid_labels = train_test_split(
            data_lists, data_labels, train_size=0.8, shuffle=True, random_state=Config.seed, stratify=data_labels
        )
        train_transforms = A.Compose([
            A.Rotate(),
            A.HorizontalFlip(),
            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
            A.Normalize()
        ])
        valid_transforms = A.Compose([A.Normalize()])
        train_dataset = TrainDataset(file_lists=train_lists, label_lists=train_labels, transforms=train_transforms)
        valid_dataset = TrainDataset(file_lists=valid_lists, label_lists=valid_labels, transforms=valid_transforms)
        train_loader = DataLoader(train_dataset, batch_size=Config.batch_size, shuffle=True)
        valid_loader = DataLoader(valid_dataset, batch_size=Config.batch_size, shuffle=True)
        data_loader = {"train_loader": train_loader, "valid_loader": valid_loader}
        print("No fold training starts ... ")
        best_model = train(data_loader)
        best_models.append(best_model)
    else:
        skf = StratifiedKFold(n_splits=Config.k_fold_num, random_state=Config.seed, shuffle=True)
        print(f"{Config.k_fold_num} fold training starts ... ")
        for fold_idx, (train_idx, valid_idx) in enumerate(skf.split(data_lists, data_labels), 1):
            train_lists, train_labels = data_lists[train_idx], data_labels[train_idx]
            valid_lists, valid_labels = data_lists[valid_idx], data_labels[valid_idx]
            train_transforms = A.Compose([
                A.Rotate(),
                A.HorizontalFlip(),
                A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
                A.Normalize()
            ])
            valid_transforms = A.Compose([A.Normalize()])
            train_dataset = TrainDataset(file_lists=train_lists, label_lists=train_labels, transforms=train_transforms)
            valid_dataset = TrainDataset(file_lists=valid_lists, label_lists=valid_labels, transforms=valid_transforms)
            train_loader = DataLoader(train_dataset, batch_size=Config.batch_size, shuffle=True)
            valid_loader = DataLoader(valid_dataset, batch_size=Config.batch_size, shuffle=True)
            data_loader = {"train_loader": train_loader, "valid_loader": valid_loader}
            print(f"- {fold_idx} fold -")
            best_model = train(data_loader)
            best_models.append(best_model)

    test_transforms = A.Compose([A.Normalize()])
    test_files = sorted(Path("./data/test/0").glob("*"))
    test_dataset = TestDataset(file_lists=test_files, transforms=test_transforms)
    test_loader = DataLoader(test_dataset, batch_size=Config.batch_size, shuffle=False)

    answer_logits = []

    model = create_model().to(device)

    for fold_idx, best_model_state in enumerate(best_models, 1):
        model.load_state_dict(best_model_state)
        model.eval()
        fold_logits = []
        with torch.no_grad():
            for iter_idx, test_imgs in enumerate(test_loader, 1):
                test_imgs = test_imgs.to(device)
                test_pred = model(test_imgs)
                fold_logits.extend(test_pred.cpu().tolist())
                print(f"[{fold_idx} fold] inference iteration {iter_idx}/{len(test_loader)}", end="\r")
        answer_logits.append(fold_logits)

    answer_logits = np.mean(answer_logits, axis=0)
    answer_value = np.argmax(answer_logits, axis=-1)

    i = 0
    while True:
        submission_path = f"submissions/submission_{i}.csv"
        if not Path(submission_path).is_file():
            break
        i += 1

    submission = pd.read_csv("test_answer_sample_.csv", index_col=False)
    submission["answer value"] = answer_value
    submission["answer value"].to_csv(submission_path, index=False)
    print("\nAll done.")

'코드테스트' 카테고리의 다른 글

파이토치_1  (1) 2023.10.31
프로그래머스_1  (0) 2023.10.26
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torch.nn.functional as F
from torchvision import transforms, datasets

# Constants
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
BATCH_SIZE = 32
EPOCHS = 10
MEAN = [0.5, 0.5, 0.5]
STD = [0.5, 0.5, 0.5]

print(f'Using Pytorch version: {torch.__version__}, Device: {DEVICE}')


def denormalize(tensor):
    mean_tensor = torch.tensor(MEAN).view(-1, 1, 1)
    std_tensor = torch.tensor(STD).view(-1, 1, 1)
    return tensor * std_tensor + mean_tensor


def get_dataloaders(batch_size):
    data_transform = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(MEAN, STD)
        ]),
        'val': transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(MEAN, STD)
        ])
    }

    image_datasets = {x: datasets.ImageFolder("./hymenoptera_data", data_transform[x]) for x in ['train', 'val']}
    return {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, num_workers=0, shuffle=True) for x in ['train', 'val']}


def display_images(dataloader):
    for (X_train, y_train) in dataloader:
        print(f'X_train: {X_train.size()}, type: {X_train.type()}')
        print(f'y_train: {y_train.size()}, type: {y_train.type()}')
        break

    plt.figure(figsize=(10, 1))
    for i in range(10):
        plt.subplot(1, 10, i + 1)
        plt.axis('off')
        image_to_display = denormalize(X_train[i]).detach().cpu().numpy()
        plt.imshow(np.transpose(image_to_display, (1, 2, 0)))
        plt.title(f'Class: {y_train[i].item()}')
    plt.show()


def train_and_evaluate(model, train_loader, val_loader, optimizer, epochs, save_path="best_model_weights.pth"):
    best_val_loss = float("inf")  # initialize with a high value

    for epoch in range(1, epochs + 1):
        # Training loop
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(DEVICE), target.to(DEVICE)
            optimizer.zero_grad()
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0:
                print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # Evaluation loop
        model.eval()
        test_loss, correct = 0, 0
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(DEVICE), target.to(DEVICE)
                output = model(data)
                test_loss += F.cross_entropy(output, target, reduction="sum").item()
                prediction = output.argmax(dim=1)
                correct += prediction.eq(target).sum().item()

        test_loss /= len(val_loader.dataset)
        test_accuracy = 100. * correct / len(val_loader.dataset)
        print(f"[{epoch}] Test Loss: {test_loss:.4f}, accuracy: {test_accuracy:.2f}%\n")

        # Save the model weights if this epoch has the best validation loss so far
        if test_loss < best_val_loss:
            best_val_loss = test_loss
            torch.save(model.state_dict(), save_path)
            print(f"Model weights saved to {save_path} with validation loss: {best_val_loss:.4f}")


def main():
    dataloaders = get_dataloaders(BATCH_SIZE)
    display_images(dataloaders['train'])

    model = models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, 2)  # Modify the final FC layer
    model = model.to(DEVICE)

    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    train_and_evaluate(model, dataloaders["train"], dataloaders["val"], optimizer, EPOCHS)


if __name__ == "__main__":
    main()

'코드테스트' 카테고리의 다른 글

파이토치_2  (1) 2023.10.31
프로그래머스_1  (0) 2023.10.26
## 문제

### 요청 사항 1. 전체 고객 수

---

#### ✅   구현 조건
- 고객 정보에 존재 하는 전체 고객의 수를 확인해 주세요.
#### ✅   정답 예시

```json
// problem_1.json
{
  "total": 100
}
```

### 요청 사항 2. 휴면 고객 리스트

---

#### ✅   구현 조건
- 현재 고객 상태(status)가 휴면(dormant)인 고객을 대상으로 이벤트를 기획하고 있습니다. 휴면 고객 ID 리스트를 출력해 주세요.

#### ✅   제약 사항
- 고객 ID를 기준으로 오름차순 정렬하여 출력합니다.

#### ✅   정답 예시

```json
// problem_2.json
[
    100,
    101,
    104,
    106,
    110,
    ...
]
```

 

 

import os
import json

class CustomerManager:

    def __init__(self, input_file_path, output_file_path):
        self.input_file_path = input_file_path
        self.output_file_path = output_file_path
        if not os.path.exists(output_file_path):
            os.makedirs(output_file_path)
        with open(os.path.join(input_file_path, 'customer.json'), 'r', encoding='utf-8') as file:
            self.customers = json.load(file)

    def count_total_customers(self, output_file='problem_1.json'):
        total_customers = len(self.customers)
        print(f"전체 고객의 수: {total_customers}")
        self._write_to_json({'total': total_customers}, output_file)

    def check_status_customers(self, output_file='problem_2.json'):
        dormant_customers = [customer['customer_id'] for customer in self.customers if customer['status'] == 'dormant']
        dormant_customers.sort()
        print("휴먼 상태의 고객 ID 리스트:")
        for customer_id in dormant_customers:
            print(customer_id)
        self._write_to_json(dormant_customers, output_file)

    def _write_to_json(self, data, file_name):
        with open(os.path.join(self.output_file_path, file_name), 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)


if __name__ == '__main__':
    manager = CustomerManager(input_file_path='./data/input', output_file_path='./data/output')
    manager.count_total_customers()
    manager.check_status_customers()

 

'코드테스트' 카테고리의 다른 글

파이토치_2  (1) 2023.10.31
파이토치_1  (1) 2023.10.31

Pytorch provides two main ways of doing this:

  • The less recommended way is to save the entire model object as follows:
torch.save(model, PATH_TO_MODEL)

And then, the saved model can be later read as follows:

model = torch.load(PATH_TO_MODEL)

Although this approach looks the most straightforward, this can be problematic in some cases. This is because we are not only saving the model parameters, but also the model classes and directory structure used in our source code. If our class signatures or directory structures change later, loading the model will fail in potentially unfixable ways.

  • The second and more recommended way is to only save the model parameters as follows:
torch.save(model.state_dict(), PATH_TO_MODEL)

Later, when we need to restore the model, first we instantiate an empty model object and then load the model parameters into that model object as follows:

model = Net()
model.load_state_dict(torch.load(PATH_TO_MODEL)

We will use the morte recommended way to save the model as shown in the following code:

PATH_TO_MODEL = "./convnet.pth"
torch.save(model.state_dict(), PATH_TO_MODEL)

The convnet.pth file is essentially a pickle file containing model parameters.

 

※ reference: Mastering Pytorch

'머신러닝 > Pytorch' 카테고리의 다른 글

define colors using random tuples  (0) 2021.01.06
define the optimizer and the learning rate schedule  (0) 2021.01.06
visualize the filters of the first CNN layer  (0) 2021.01.06
store best weights  (0) 2021.01.04
Storing and loading models  (0) 2020.12.31
COLORS = np.random.randint(0, 255, size=(len(classes), 3), dtype="uint8")
# draw loop
	color = [int(c) for c in COLORS[idx]]

※ reference: pytorch computer vision codebook

define an Adam optmizer object with a learning rate of 1e-4:

from torch import optim
opt = optim.Adam(model_resnet18.parameters(), lr=1e-4)

 

we can read the current value of the learning rate using the following function:

def get_lr(opt):
	for param_group in opt.param_groups:
    	return param_group['lr']
        
current_lr = get_lr(opt)
print('current  lr = {}'.format(current_lr)

 

define a learning scheduler using the CosineAnnealingLR method:

from torch.optim.lr_scheduler import CosineAnnealingLR
lr_schedular = CosineAnnealingLR(opt, T_max=2, eta_min=1e-5)

※ reference: pytorch computer vision codebook

let's get the weight of the first layer:

for w in model_resnet18.parameters():
	w = w.data.cpu()
    print(w.shape)
    break

 

then, normalize the weights:

min_w = torch.min(w)
w1 = (-1 / (2 * min_w)) * w + 0.5
print(torch.min(w1).item(), torch.max(w1).item())

 

next, make a grid and display it:

grid_size = len(w1)
x_grid = [w1[i] for i in range(grid_size)]
x_grid = utils.make_grid(x_grid, nrow=8, padding=1)
print(x_grid.shape)

plt.figure(figsize=(10, 10))
show(x_grid)

 

※ reference: pytorch computer vision codebook

'머신러닝 > Pytorch' 카테고리의 다른 글

define colors using random tuples  (0) 2021.01.06
define the optimizer and the learning rate schedule  (0) 2021.01.06
store best weights  (0) 2021.01.04
Storing and loading models  (0) 2020.12.31
model summary  (0) 2020.12.31
# a deep copy of weights for the best performing model
best_model_wts = copy.deepcopy(model.state_dict())

# initialize best loss to a large value
best_loss=float('inf')

# main loop
	.
	.
	.
	# store best model
	if val_loss < best_loss:
		best_loss = val loss
    	best_model_wts = copy.deepcopy(model.state_dict())
    	# store weights into a local file
    	torch.save(model.state_dict(), path2weights)
    	print("Copied best model weights!")

 

Once training is complete, we'll want to store the trained parameters in a file for deployment and future use.

 

There are two ways of doing so.

 

Let's look at the first method:

 

1. First, we will store the model parameters or state_dict in a file:

# define path2weights
path2weights="./models/weights.pt"

# store state_dict to file
torch.save(model.state_dict(), path2weights)

2. To load the model parameters from the file, we will define an object of the Net class:

# define model: weights are randomly inintiated
_model = Net()

3. Then, we will load state_dict from the file:

weights=torch.load(path2weights)

4. Next, we will set state_dict to the model:

_model.load_state_dict(weights)

In the first method, we stored state_dict or model parameters only.

 

Whenever we need the trained model for deployment, we have to create an object of the model, then load the parameters from the file, and then set the parameters to the model.

 

This is the recommended method by PyTorch creators.

 

Let's look at the second method:

 

1. First, we will store the model in a file:

# define a path2model
path2model="./models/model.pt"

# store model and weights into a file
torch.save(model, path2model)

2. To load the model parameters from the file, we will define an object of the Net class:

#define model: weights are randomly initiated
_model = Net()

3. Then, we will load the model from the local file:

_model=torch.load(path2model)

In second method, we stored the model into a file.

 

In other words, we stored both the model and state_dict into one file. 

 

Whenever we need the trained model for depolyment, we need to create an object of the Net class. 

 

Then, we loaded the model from the file. 

 

So, there is no actual benefit of doing this compared to the previous method.

 

※ reference: pytorch computer vision code book

use torchsummary package to get summary of the model to see the output shape and the number of parameters in each layer.

 

1. install the torchsummary package:

pip install torchsummary

2. let's get the model summary using torchsummary:

from torchsummary import summary
summary(model, input_size=(channels, H, W))

 

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
================================================================
            Conv2d-1           [-1, 10, 24, 24]             260
            Conv2d-2             [-1, 20, 8, 8]           5,020
         Dropout2d-3             [-1, 20, 8, 8]               0
            Linear-4                   [-1, 50]          16,050
            Linear-5                   [-1, 10]             510
================================================================
Total params: 21,840
Trainable params: 21,840
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.06
Params size (MB): 0.08
Estimated Total Size (MB): 0.15
----------------------------------------------------------------

 

※ reference: github.com/sksq96/pytorch-summary

+ Recent posts