import copy
import os
import random
from pathlib import Path

import albumentations as A
import cv2
import numpy as np
import pandas as pd
import timm
import torch
import torch.nn as nn
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split, StratifiedKFold
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import Dataset, DataLoader
# Select the compute device once: CUDA when PyTorch reports it as available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Define configurations
class Config:
    """Run settings read by the training / inference script below."""

    seed = 42  # random_state passed to train_test_split / StratifiedKFold
    model_name = "swsl_resnext50_32x4d"  # identifier handed to timm.create_model
    epoch_size = 30  # upper bound on training epochs
    batch_size = 48  # batch size for every DataLoader
    learning_rate = 1e-4  # classifier LR; the feature extractor uses half of it
    early_stop = 5  # early-stop counter threshold
    k_fold_num = 5  # number of stratified folds; -1 selects a single 80/20 split
# Set random seeds
def set_random_seeds(seed):
    """Seed the Python, NumPy and PyTorch RNGs and force deterministic cuDNN.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Safe to call without CUDA; it is ignored when no GPU is present.
    torch.cuda.manual_seed_all(seed)
    # Trade cuDNN autotuning speed for run-to-run reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
# Data preprocessing functions
def img_gather_(img_path):
    """Collect image paths and integer labels from a class-per-directory tree.

    Every sub-directory of ``img_path`` is treated as one class; its name is
    mapped to an integer through ``class_encoder``.

    Args:
        img_path: Directory whose sub-directories are named after the classes.

    Returns:
        Tuple ``(files, labels)`` of equal-length NumPy arrays: file paths and
        the matching integer class ids.

    Raises:
        KeyError: If a sub-directory name is not a known class.
    """
    class_encoder = {
        'dog': 0,
        'elephant': 1,
        'giraffe': 2,
        'guitar': 3,
        'horse': 4,
        'house': 5,
        'person': 6,
    }
    file_lists = []
    label_lists = []
    # os.listdir order is arbitrary; sorting keeps seeded splits reproducible.
    for class_name in sorted(os.listdir(img_path)):
        class_dir = os.path.join(img_path, class_name)
        if not os.path.isdir(class_dir):
            # Stray files next to the class folders are not classes.
            continue
        file_list = [os.path.join(class_dir, file) for file in sorted(os.listdir(class_dir))]
        file_lists.extend(file_list)
        label_lists.extend([class_encoder[class_name]] * len(file_list))
    return np.array(file_lists), np.array(label_lists)
class TrainDataset(Dataset):
    """Dataset yielding ``(image, label)`` tensor pairs loaded with OpenCV."""

    def __init__(self, file_lists, label_lists, transforms=None):
        """Store copies of the inputs.

        Args:
            file_lists: Sequence of image paths (must support ``.copy()``).
            label_lists: Sequence of integer labels aligned with ``file_lists``.
            transforms: Optional callable invoked as ``transforms(image=img)``
                and returning a mapping with an ``"image"`` entry.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(file_lists) != len(label_lists):
            raise ValueError(
                f"file_lists ({len(file_lists)}) and label_lists ({len(label_lists)}) differ in length"
            )
        self.file_lists = file_lists.copy()
        self.label_lists = label_lists.copy()
        self.transforms = transforms

    def __getitem__(self, idx):
        """Return the ``idx``-th image as a float tensor and its label as a long tensor."""
        path = str(self.file_lists[idx])
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        if img is None:
            # imread signals failure with None rather than raising.
            raise FileNotFoundError(f"Could not read image: {path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.transforms:
            img = self.transforms(image=img)["image"]
        # Move the last axis first (channels-last -> channels-first).
        img = img.transpose(2, 0, 1)
        return torch.tensor(img, dtype=torch.float), torch.tensor(self.label_lists[idx], dtype=torch.long)

    def __len__(self):
        """Return the number of samples."""
        return len(self.file_lists)
class TestDataset(Dataset):
    """Dataset yielding image tensors only (no labels), loaded with OpenCV."""

    def __init__(self, file_lists, transforms=None):
        """Store a copy of the paths.

        Args:
            file_lists: Sequence of image paths (``str`` or ``pathlib.Path``)
                that supports ``.copy()``.
            transforms: Optional callable invoked as ``transforms(image=img)``
                and returning a mapping with an ``"image"`` entry.
        """
        self.file_lists = file_lists.copy()
        self.transforms = transforms

    def __getitem__(self, idx):
        """Return the ``idx``-th image as a float tensor."""
        # The caller may pass pathlib.Path objects; imread needs a plain string.
        path = str(self.file_lists[idx])
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        if img is None:
            # imread signals failure with None rather than raising.
            raise FileNotFoundError(f"Could not read image: {path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.transforms:
            img = self.transforms(image=img)["image"]
        # Move the last axis first (channels-last -> channels-first).
        img = img.transpose(2, 0, 1)
        return torch.tensor(img, dtype=torch.float)

    def __len__(self):
        """Return the number of samples."""
        return len(self.file_lists)
# Create and return the model
def create_model():
    """Build the timm model named by ``Config.model_name`` with a 7-class head.

    Returns:
        The model created by ``timm.create_model`` with ``pretrained=True``.
    """
    model = timm.create_model(Config.model_name, pretrained=True, num_classes=7)
    # Modify model architecture if needed
    return model
# Create optimizer and scheduler
def create_optimizer_scheduler(model):
    """Create an AdamW optimizer with two LR groups plus a cosine LR schedule.

    Parameters of ``model.fc`` train at ``Config.learning_rate``; every other
    parameter trains at half that rate.

    Args:
        model: Network exposing ``named_parameters()`` and an ``fc`` sub-module.

    Returns:
        Tuple ``(optimizer, scheduler)``.
    """
    # Split on the exact "fc." prefix so the two groups partition the
    # parameters: nothing is dropped just because its name contains "fc".
    feature_extractor = [
        param for name, param in model.named_parameters() if not name.startswith("fc.")
    ]
    # parameters() yields tensors, not (name, tensor) pairs.
    classifier = list(model.fc.parameters())
    params = [
        {"params": feature_extractor, "lr": Config.learning_rate * 0.5},
        {"params": classifier, "lr": Config.learning_rate},
    ]
    optimizer = AdamW(params, lr=Config.learning_rate)
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=0)
    return optimizer, scheduler
# Create loss function
def create_loss_function():
    """Build a class-weighted cross-entropy loss on the module-level ``device``.

    Each weight is ``max(class_num) / class_num[i]``, so the largest class gets
    weight 1 and smaller classes get proportionally larger weights.

    Returns:
        ``nn.CrossEntropyLoss`` configured with those weights.
    """
    # Hard-coded per-class counts; presumably ordered like the label encoding
    # used by the data loader (dog .. person) -- TODO confirm.
    class_num = np.asarray([329, 205, 235, 134, 151, 245, 399], dtype=np.float64)
    class_weight = torch.tensor(class_num.max() / class_num).to(device=device, dtype=torch.float)
    criterion = nn.CrossEntropyLoss(weight=class_weight)
    return criterion
# Train one epoch
def train_step(model, data_loader, optimizer, criterion, epoch_idx):
    """Run one optimisation pass over ``data_loader["train_loader"]``.

    Args:
        model: Network to train; expected to already be on ``device``.
        data_loader: Dict holding the training loader under ``"train_loader"``.
        optimizer: Optimizer updating ``model``'s parameters.
        criterion: Loss called as ``criterion(predictions, labels)``.
        epoch_idx: 1-based epoch number, used only in the progress line.
    """
    model.train()
    train_loader = data_loader["train_loader"]
    for iter_idx, (train_imgs, train_labels) in enumerate(train_loader, 1):
        train_imgs = train_imgs.to(device=device, dtype=torch.float)
        train_labels = train_labels.to(device)
        optimizer.zero_grad()
        train_pred = model(train_imgs)
        train_loss = criterion(train_pred, train_labels)
        train_loss.backward()
        optimizer.step()
        # "\r" rewrites the same console line on every iteration.
        print(
            f"[Epoch {epoch_idx}/{Config.epoch_size}] model training iteration {iter_idx}/{len(data_loader['train_loader'])}",
            end="\r",
        )
# Validation function
def validate(model, valid_loader, criterion):
    """Evaluate ``model`` on ``valid_loader`` without tracking gradients.

    Args:
        model: Network to evaluate; expected to already be on ``device``.
        valid_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(predictions, labels)``.

    Returns:
        Tuple ``(loss, accuracy_percent, f1)``: mean of the per-batch losses,
        per-sample accuracy in percent, and the mean of the per-batch macro F1.
    """
    model.eval()
    valid_loss = []
    valid_acc = []
    valid_f1 = []
    with torch.no_grad():
        for iter_idx, (valid_imgs, valid_labels) in enumerate(valid_loader, 1):
            valid_imgs = valid_imgs.to(device=device, dtype=torch.float)
            valid_labels = valid_labels.to(device)
            valid_pred = model(valid_imgs)
            loss = criterion(valid_pred, valid_labels)
            valid_loss.append(loss.item())
            valid_pred_c = valid_pred.argmax(dim=-1)
            valid_acc.extend((valid_pred_c == valid_labels).cpu().tolist())
            # NOTE(review): macro F1 is computed per batch and averaged, which
            # depends on batch composition and differs from F1 over the full set.
            f1 = f1_score(y_true=valid_labels.cpu().numpy(), y_pred=valid_pred_c.cpu().numpy(), average="macro")
            valid_f1.append(f1)
            print(f"[Validation] iteration {iter_idx}/{len(valid_loader)}", end="\r")
    valid_loss = np.mean(valid_loss)
    valid_acc = np.mean(valid_acc) * 100
    valid_f1 = np.mean(valid_f1)
    print(f"Validation loss: {valid_loss:.4f} | Validation acc: {valid_acc:.2f}% | Validation f1 score: {valid_f1:.4f}")
    return valid_loss, valid_acc, valid_f1
# Main training function
def train(data_loader):
    """Train a fresh model with early stopping and return its best weights.

    Args:
        data_loader: Dict with ``"train_loader"`` and ``"valid_loader"`` entries.

    Returns:
        A ``state_dict`` snapshot taken at the epoch with the highest
        validation F1 (the final weights if F1 never rose above 0).
    """
    model = create_model().to(device)
    optimizer, scheduler = create_optimizer_scheduler(model)
    criterion = create_loss_function()
    best_model_state = None
    best_f1 = 0
    early_stop_count = 0
    for epoch_idx in range(1, Config.epoch_size + 1):
        train_step(model, data_loader, optimizer, criterion, epoch_idx)
        valid_loss, valid_acc, valid_f1 = validate(model, data_loader["valid_loader"], criterion)
        # The scheduler only changes the LR when stepped; once per epoch here.
        scheduler.step()
        if valid_f1 > best_f1:
            best_f1 = valid_f1
            # state_dict() returns references to the live tensors, so without
            # a deep copy later epochs would overwrite the "best" weights.
            best_model_state = copy.deepcopy(model.state_dict())
            early_stop_count = 0
        else:
            early_stop_count += 1
            if early_stop_count == Config.early_stop:
                print("Early stopped." + " " * 30)
                break
    if best_model_state is None:
        # No epoch improved on the initial F1 of 0; return usable weights anyway.
        best_model_state = copy.deepcopy(model.state_dict())
    return best_model_state
# Main training and inference flow
def _build_data_loader(train_lists, train_labels, valid_lists, valid_labels):
    """Wrap one train/validation split in the loader dict that ``train`` expects."""
    # NOTE(review): only ColorJitter survived in SOURCE; Normalize is added to
    # match the validation/test pipelines. Other augmentations may have been
    # lost -- TODO confirm.
    train_transforms = A.Compose([
        A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        A.Normalize(),
    ])
    valid_transforms = A.Compose([A.Normalize()])
    train_dataset = TrainDataset(file_lists=train_lists, label_lists=train_labels, transforms=train_transforms)
    valid_dataset = TrainDataset(file_lists=valid_lists, label_lists=valid_labels, transforms=valid_transforms)
    train_loader = DataLoader(train_dataset, batch_size=Config.batch_size, shuffle=True)
    # Validation order does not need shuffling; a fixed order keeps the
    # per-batch metrics comparable between epochs.
    valid_loader = DataLoader(valid_dataset, batch_size=Config.batch_size, shuffle=False)
    return {"train_loader": train_loader, "valid_loader": valid_loader}


if __name__ == "__main__":
    set_random_seeds(Config.seed)
    data_lists, data_labels = img_gather_("./data/train")
    best_models = []

    if Config.k_fold_num == -1:
        # Single stratified 80/20 split.
        train_lists, valid_lists, train_labels, valid_labels = train_test_split(
            data_lists, data_labels, train_size=0.8, shuffle=True, random_state=Config.seed, stratify=data_labels
        )
        data_loader = _build_data_loader(train_lists, train_labels, valid_lists, valid_labels)
        print("No fold training starts ... ")
        best_models.append(train(data_loader))
    else:
        skf = StratifiedKFold(n_splits=Config.k_fold_num, random_state=Config.seed, shuffle=True)
        print(f"{Config.k_fold_num} fold training starts ... ")
        for fold_idx, (train_idx, valid_idx) in enumerate(skf.split(data_lists, data_labels), 1):
            train_lists, train_labels = data_lists[train_idx], data_labels[train_idx]
            valid_lists, valid_labels = data_lists[valid_idx], data_labels[valid_idx]
            data_loader = _build_data_loader(train_lists, train_labels, valid_lists, valid_labels)
            print(f"- {fold_idx} fold -")
            best_models.append(train(data_loader))

    test_transforms = A.Compose([A.Normalize()])
    # glob yields Path objects; cv2.imread needs plain strings.
    test_files = [str(path) for path in sorted(Path("./data/test/0").glob("*"))]
    test_dataset = TestDataset(file_lists=test_files, transforms=test_transforms)
    test_loader = DataLoader(test_dataset, batch_size=Config.batch_size, shuffle=False)

    # Ensemble: average the logits of every fold's best checkpoint.
    answer_logits = []
    model = create_model().to(device)
    for fold_idx, best_model_state in enumerate(best_models, 1):
        model.load_state_dict(best_model_state)
        model.eval()
        fold_logits = []
        with torch.no_grad():
            for iter_idx, test_imgs in enumerate(test_loader, 1):
                test_imgs = test_imgs.to(device)
                test_pred = model(test_imgs)
                fold_logits.extend(test_pred.cpu().tolist())
                print(f"[{fold_idx} fold] inference iteration {iter_idx}/{len(test_loader)}", end="\r")
        answer_logits.append(fold_logits)
    answer_logits = np.mean(answer_logits, axis=0)
    answer_value = np.argmax(answer_logits, axis=-1)

    # Pick the first unused submissions/submission_<i>.csv so earlier runs
    # are never overwritten.
    os.makedirs("submissions", exist_ok=True)
    i = 0
    while True:
        submission_path = f"submissions/submission_{i}.csv"
        if not Path(submission_path).is_file():
            break
        i += 1

    submission = pd.read_csv("test_answer_sample_.csv", index_col=False)
    submission["answer value"] = answer_value
    submission["answer value"].to_csv(submission_path, index=False)
    print("\nAll done.")
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torch.nn.functional as F
from torchvision import transforms, datasets
# Constants
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
# Per-channel statistics passed to transforms.Normalize and undone by denormalize.
MEAN = [0.5, 0.5, 0.5]
STD = [0.5, 0.5, 0.5]
# NOTE(review): main() reads BATCH_SIZE and EPOCHS, but SOURCE never defines
# them (NameError). The values below are placeholders -- TODO confirm.
BATCH_SIZE = 32
EPOCHS = 10
print(f'Using Pytorch version: {torch.__version__}, Device: {DEVICE}')
def denormalize(tensor, mean=None, std=None):
    """Undo a per-channel ``(x - mean) / std`` normalisation.

    Args:
        tensor: Tensor whose third-from-last axis indexes the channels
            (the statistics are reshaped to ``(-1, 1, 1)`` and broadcast).
        mean: Per-channel means; defaults to the module-level ``MEAN``.
        std: Per-channel standard deviations; defaults to the module-level ``STD``.

    Returns:
        ``tensor * std + mean`` with the statistics broadcast over the last
        two axes.
    """
    mean = MEAN if mean is None else mean
    std = STD if std is None else std
    # Build the statistics on the input's device so GPU tensors work too.
    mean_tensor = torch.tensor(mean, device=tensor.device).view(-1, 1, 1)
    std_tensor = torch.tensor(std, device=tensor.device).view(-1, 1, 1)
    return tensor * std_tensor + mean_tensor
def get_dataloaders(batch_size):
    """Create the ``'train'`` and ``'val'`` DataLoaders for ./hymenoptera_data.

    Args:
        batch_size: Number of samples per batch for both loaders.

    Returns:
        Dict mapping ``'train'`` / ``'val'`` to a ``DataLoader`` over an
        ``ImageFolder`` dataset.
    """
    # NOTE(review): only Normalize survived in SOURCE. Normalize needs tensor
    # input and batching needs equal image sizes, so Resize + ToTensor are
    # restored here; the original sizes/augmentations are unknown -- TODO confirm.
    data_transform = {
        'train': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(MEAN, STD),
        ]),
        'val': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(MEAN, STD),
        ]),
    }
    base_dir = "./hymenoptera_data"
    image_datasets = {}
    for x in ['train', 'val']:
        # Prefer a per-split sub-directory; reading the shared root for both
        # splits would make ImageFolder treat the split folders as classes.
        # Fall back to the root when no split folder exists.
        split_dir = os.path.join(base_dir, x)
        root = split_dir if os.path.isdir(split_dir) else base_dir
        image_datasets[x] = datasets.ImageFolder(root, data_transform[x])
    return {
        x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, num_workers=0, shuffle=True)
        for x in ['train', 'val']
    }
def display_images(dataloader):
    """Print the shape/type of the first batch and plot up to ten of its images.

    Args:
        dataloader: Iterable yielding ``(images, labels)`` batches of
            normalised image tensors.
    """
    first_batch = next(iter(dataloader), None)
    if first_batch is None:
        # Nothing to show for an empty loader.
        return
    X_train, y_train = first_batch
    print(f'X_train: {X_train.size()}, type: {X_train.type()}')
    print(f'y_train: {y_train.size()}, type: {y_train.type()}')

    # A batch may hold fewer than ten images (small batch size or dataset).
    n_show = min(10, len(X_train))
    plt.figure(figsize=(10, 1))
    for i in range(n_show):
        plt.subplot(1, 10, i + 1)
        plt.axis('off')
        image_to_display = denormalize(X_train[i]).detach().cpu().numpy()
        # imshow wants channels last and float values inside [0, 1].
        plt.imshow(np.clip(np.transpose(image_to_display, (1, 2, 0)), 0.0, 1.0))
        plt.title(f'Class: {y_train[i].item()}')
    plt.show()
def train_and_evaluate(model, train_loader, val_loader, optimizer, epochs, save_path="best_model_weights.pth"):
    """Train ``model`` and checkpoint the weights with the lowest validation loss.

    Args:
        model: Network to optimise; batches are moved to its device.
        train_loader: DataLoader of ``(data, target)`` training batches.
        val_loader: DataLoader of ``(data, target)`` validation batches.
        optimizer: Optimizer over ``model``'s parameters.
        epochs: Number of epochs to run.
        save_path: File that receives the best ``state_dict``.

    Returns:
        The lowest mean validation loss observed (``inf`` if no epoch ran).
    """
    # Send each batch wherever the model lives; fall back to the module-level
    # DEVICE for a model without parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else DEVICE

    best_val_loss = float("inf")  # initialize with a high value
    for epoch in range(1, epochs + 1):
        # Training loop
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0:
                print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # Evaluation loop
        model.eval()
        test_loss, correct = 0, 0
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                # Sum per-sample losses so the division below gives the true mean.
                test_loss += F.cross_entropy(output, target, reduction="sum").item()
                prediction = output.argmax(dim=1)
                correct += prediction.eq(target).sum().item()
        # Guard against an empty validation set (division by zero).
        n_val = max(len(val_loader.dataset), 1)
        test_loss /= n_val
        test_accuracy = 100. * correct / n_val
        print(f"[{epoch}] Test Loss: {test_loss:.4f}, accuracy: {test_accuracy:.2f}%\n")

        # Save the model weights if this epoch has the best validation loss so far
        if test_loss < best_val_loss:
            best_val_loss = test_loss
            torch.save(model.state_dict(), save_path)
            print(f"Model weights saved to {save_path} with validation loss: {best_val_loss:.4f}")
    return best_val_loss
def main():
    """Fine-tune an ImageNet-pretrained ResNet-18 as a two-class classifier."""
    dataloaders = get_dataloaders(BATCH_SIZE)
    model = models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, 2)  # Modify the final FC layer
    model = model.to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    train_and_evaluate(model, dataloaders["train"], dataloaders["val"], optimizer, EPOCHS)


if __name__ == "__main__":
    main()
import os
import json
class CustomerManager:
    """Load customer records from JSON and write small JSON reports."""

    def __init__(self, input_file_path, output_file_path):
        """Read ``customer.json`` and make sure the output directory exists.

        Args:
            input_file_path: Directory containing ``customer.json``.
            output_file_path: Directory that receives the report files;
                created (including parents) when missing.
        """
        self.input_file_path = input_file_path
        self.output_file_path = output_file_path
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_file_path, exist_ok=True)
        with open(os.path.join(input_file_path, 'customer.json'), 'r', encoding='utf-8') as file:
            self.customers = json.load(file)

    def count_total_customers(self, output_file='problem_1.json'):
        """Print the number of customers and write it as ``{"total": n}``."""
        total_customers = len(self.customers)
        print(f"전체 고객의 수: {total_customers}")
        self._write_to_json({'total': total_customers}, output_file)

    def check_status_customers(self, output_file='problem_2.json'):
        """Print and write the ids of customers whose status is ``'dormant'``."""
        dormant_customers = [customer['customer_id'] for customer in self.customers if customer['status'] == 'dormant']
        print("휴먼 상태의 고객 ID 리스트:")
        for customer_id in dormant_customers:
            print(customer_id)
        self._write_to_json(dormant_customers, output_file)

    def _write_to_json(self, data, file_name):
        """Serialise ``data`` as indented UTF-8 JSON inside the output directory."""
        with open(os.path.join(self.output_file_path, file_name), 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)
if __name__ == '__main__':
    manager = CustomerManager(input_file_path='./data/input', output_file_path='./data/output')
    # NOTE(review): SOURCE builds the manager and never uses it; running both
    # reports is the presumed intent -- TODO confirm.
    manager.count_total_customers()
    manager.check_status_customers()
Save the model object into a file for restoration/de-serialization
Pytorch provides two main ways of doing this:
- The less recommended way is to save the entire model object as follows: torch.save(model, PATH_TO_MODEL)
And then, the saved model can be later read as follows:
model = torch.load(PATH_TO_MODEL)
Although this approach looks the most straightforward, this can be problematic in some cases. This is because we are not only saving the model parameters, but also the model classes and directory structure used in our source code. If our class signatures or directory structures change later, loading the model will fail in potentially unfixable ways.
- The second and more recommended way is to only save the model parameters as follows: torch.save(model.state_dict(), PATH_TO_MODEL)
Later, when we need to restore the model, first we instantiate an empty model object and then load the model parameters into that model object as follows:
model = Net()
model.load_state_dict(torch.load(PATH_TO_MODEL))
We will use the more recommended way to save the model as shown in the following code:
PATH_TO_MODEL = "./convnet.pth"
torch.save(model.state_dict(), PATH_TO_MODEL)
The convnet.pth file is essentially a pickle file containing model parameters.
※ reference: Mastering Pytorch
define colors using random tuples
COLORS = np.random.randint(0, 255, size=(len(classes), 3), dtype="uint8")
# draw loop
color = [int(c) for c in COLORS[idx]]
※ reference: pytorch computer vision codebook
define the optimizer and the learning rate schedule
define an Adam optimizer object with a learning rate of 1e-4:
from torch import optim
opt = optim.Adam(model_resnet18.parameters(), lr=1e-4)
we can read the current value of the learning rate using the following function:
def get_lr(opt):
for param_group in opt.param_groups:
return param_group['lr']
current_lr = get_lr(opt)
print('current lr = {}'.format(current_lr))
define a learning scheduler using the CosineAnnealingLR method:
from torch.optim.lr_scheduler import CosineAnnealingLR
lr_schedular = CosineAnnealingLR(opt, T_max=2, eta_min=1e-5)
※ reference: pytorch computer vision codebook
visualize the filters of the first CNN layer
let's get the weight of the first layer:
for w in model_resnet18.parameters():
    w = w.data.cpu()
    break
then, normalize the weights:
min_w = torch.min(w)
w1 = (-1 / (2 * min_w)) * w + 0.5
print(torch.min(w1).item(), torch.max(w1).item())
next, make a grid and display it:
grid_size = len(w1)
x_grid = [w1[i] for i in range(grid_size)]
x_grid = utils.make_grid(x_grid, nrow=8, padding=1)
plt.figure(figsize=(10, 10))
※ reference: pytorch computer vision codebook
store best weights
# a deep copy of weights for the best performing model
best_model_wts = copy.deepcopy(model.state_dict())
# initialize best loss to a large value
best_loss = float('inf')
# main loop
# store best model
if val_loss < best_loss:
    best_loss = val_loss
best_model_wts = copy.deepcopy(model.state_dict())
# store weights into a local file
torch.save(model.state_dict(), path2weights)
print("Copied best model weights!")
Storing and loading models
Once training is complete, we'll want to store the trained parameters in a file for deployment and future use.
There are two ways of doing so.
Let's look at the first method:
1. First, we will store the model parameters or state_dict in a file:
# define path2weights
# store state_dict to file
torch.save(model.state_dict(), path2weights)
2. To load the model parameters from the file, we will define an object of the Net class:
# define model: weights are randomly initiated
_model = Net()
3. Then, we will load state_dict from the file:
weights = torch.load(path2weights)
4. Next, we will set state_dict to the model:
_model.load_state_dict(weights)
In the first method, we stored state_dict or model parameters only.
Whenever we need the trained model for deployment, we have to create an object of the model, then load the parameters from the file, and then set the parameters to the model.
This is the recommended method by PyTorch creators.
Let's look at the second method:
1. First, we will store the model in a file:
# define a path2model
# store model and weights into a file
torch.save(model, path2model)
2. To load the model parameters from the file, we will define an object of the Net class:
#define model: weights are randomly initiated
_model = Net()
3. Then, we will load the model from the local file:
_model = torch.load(path2model)
In the second method, we stored the model into a file.
In other words, we stored both the model and state_dict into one file.
Whenever we need the trained model for deployment, we need to create an object of the Net class.
Then, we loaded the model from the file.
So, there is no actual benefit of doing this compared to the previous method.
※ reference: pytorch computer vision code book
model summary
use torchsummary package to get summary of the model to see the output shape and the number of parameters in each layer.
1. install the torchsummary package:
pip install torchsummary
2. let's get the model summary using torchsummary:
from torchsummary import summary
summary(model, input_size=(channels, H, W))
Layer (type) Output Shape Param #
Conv2d-1 [-1, 10, 24, 24] 260
Conv2d-2 [-1, 20, 8, 8] 5,020
Dropout2d-3 [-1, 20, 8, 8] 0
Linear-4 [-1, 50] 16,050
Linear-5 [-1, 10] 510
Total params: 21,840
Trainable params: 21,840
Non-trainable params: 0
Input size (MB): 0.00
Forward/backward pass size (MB): 0.06
Params size (MB): 0.08
Estimated Total Size (MB): 0.15
※ reference:
