import copy
import random
import numpy as np
import torch
import os
import cv2
import pandas as pd
import albumentations as A
from pathlib import Path
from sklearn.model_selection import train_test_split, StratifiedKFold
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score
import timm
import torch.nn as nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR

# Check if a GPU (CUDA) is available, and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Define configurations
class Config:
    seed = 42
    model_name = "swsl_resnext50_32x4d"
    epoch_size = 30
    batch_size = 48
    learning_rate = 1e-4
    early_stop = 5
    k_fold_num = 5


# Set random seeds
def set_random_seeds(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
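# NOTE: cudnn.deterministic=True with benchmark=False trades some GPU
# throughput for run-to-run reproducibility.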


# Data preprocessing functions
def img_gather_(img_path):
    class_encoder = {
        'dog': 0,
        'elephant': 1,
        'giraffe': 2,
        'guitar': 3,
        'horse': 4,
        'house': 5,
        'person': 6
    }

    file_lists = []
    label_lists = []

    for class_name in sorted(os.listdir(img_path)):  # sort for reproducible ordering across systems
        class_dir = os.path.join(img_path, class_name)
        file_list = [os.path.join(class_dir, file) for file in sorted(os.listdir(class_dir))]
        label_list = [class_encoder[class_name]] * len(file_list)

        file_lists.extend(file_list)
        label_lists.extend(label_list)

    return np.array(file_lists), np.array(label_lists)
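# Expected layout (implied by the loop above): ./data/train/<class_name>/<image files>,
# where every <class_name> folder matches a key in class_encoder.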


class TrainDataset(Dataset):
    def __init__(self, file_lists, label_lists, transforms=None):
        self.file_lists = file_lists.copy()
        self.label_lists = label_lists.copy()
        self.transforms = transforms

    def __getitem__(self, idx):
        img = cv2.imread(self.file_lists[idx], cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transforms:
            img = self.transforms(image=img)["image"]

        img = img.transpose(2, 0, 1)

        return torch.tensor(img, dtype=torch.float), torch.tensor(self.label_lists[idx], dtype=torch.long)

    def __len__(self):
        assert len(self.file_lists) == len(self.label_lists)
        return len(self.file_lists)


class TestDataset(Dataset):
    def __init__(self, file_lists, transforms=None):
        self.file_lists = file_lists.copy()
        self.transforms = transforms

    def __getitem__(self, idx):
        img = cv2.imread(self.file_lists[idx], cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transforms:
            img = self.transforms(image=img)["image"]

        img = img.transpose(2, 0, 1)

        return torch.tensor(img, dtype=torch.float)

    def __len__(self):
        return len(self.file_lists)


# Create and return the model
def create_model():
    model = timm.create_model(Config.model_name, pretrained=True, num_classes=7)
    # Modify model architecture if needed
    return model.to(device)
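# NOTE: passing num_classes=7 makes timm replace the pretrained 1000-class
# head with a freshly initialized 7-class linear layer.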


# Create optimizer and scheduler
def create_optimizer_scheduler(model):
    feature_extractor = [param for name, param in model.named_parameters() if "fc" not in name]
    classifier = list(model.fc.parameters())  # .parameters() yields tensors, not (name, param) pairs
    params = [
        {"params": feature_extractor, "lr": Config.learning_rate * 0.5},
        {"params": classifier, "lr": Config.learning_rate}
    ]
    optimizer = AdamW(params, lr=Config.learning_rate)
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=0)
    return optimizer, scheduler


# Create loss function
def create_loss_function():
    # Per-class image counts in the training set; weight each class by max(count) / count
    class_num = np.array([329, 205, 235, 134, 151, 245, 399])
    class_weight = torch.tensor(class_num.max() / class_num).to(device=device, dtype=torch.float)
    criterion = nn.CrossEntropyLoss(weight=class_weight)
    return criterion
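# Worked example: with the counts above, 'guitar' (134 images) gets weight
# 399 / 134 ≈ 2.98 while the largest class 'person' gets 1.0, so the loss
# up-weights under-represented classes.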


# Train one epoch
def train_step(model, data_loader, optimizer, criterion, epoch_idx):
    model.train()
    for iter_idx, (train_imgs, train_labels) in enumerate(data_loader["train_loader"], 1):
        train_imgs, train_labels = train_imgs.to(device=device, dtype=torch.float), train_labels.to(device)
        optimizer.zero_grad()
        train_pred = model(train_imgs)
        train_loss = criterion(train_pred, train_labels)
        train_loss.backward()
        optimizer.step()

        print(
            f"[Epoch {epoch_idx}/{Config.epoch_size}] model training iteration {iter_idx}/{len(data_loader['train_loader'])}",
            end="\r")


# Validation function
def validate(model, valid_loader, criterion):
    model.eval()
    valid_loss = []
    valid_acc = []
    valid_f1 = []
    with torch.no_grad():
        for iter_idx, (valid_imgs, valid_labels) in enumerate(valid_loader, 1):
            valid_imgs, valid_labels = valid_imgs.to(device=device, dtype=torch.float), valid_labels.to(device)
            valid_pred = model(valid_imgs)
            loss = criterion(valid_pred, valid_labels)
            valid_loss.append(loss.cpu().item())
            valid_pred_c = valid_pred.argmax(dim=-1)
            valid_acc.extend((valid_pred_c == valid_labels).cpu().tolist())
            f1 = f1_score(y_true=valid_labels.cpu().numpy(), y_pred=valid_pred_c.cpu().numpy(), average="macro")
            valid_f1.append(f1)

            print(f"[Validation] iteration {iter_idx}/{len(valid_loader)}", end="\r")

    valid_loss = np.mean(valid_loss)
    valid_acc = np.mean(valid_acc) * 100
    valid_f1 = np.mean(valid_f1)
    print(f"Validation loss: {valid_loss:.4f} | Validation acc: {valid_acc:.2f}% | Validation f1 score: {valid_f1:.4f}")
    return valid_loss, valid_acc, valid_f1
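# NOTE: valid_f1 is macro-F1 computed per batch and then averaged, which can
# differ slightly from macro-F1 computed over the whole validation set at once.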


# Main training function
def train(data_loader):
    model = create_model()  # create_model() already moves the model to device
    optimizer, scheduler = create_optimizer_scheduler(model)
    criterion = create_loss_function()

    best_model_state = None
    best_f1 = 0
    early_stop_count = 0

    for epoch_idx in range(1, Config.epoch_size + 1):
        train_step(model, data_loader, optimizer, criterion, epoch_idx)
        valid_loss, valid_acc, valid_f1 = validate(model, data_loader["valid_loader"], criterion)
        scheduler.step()  # CosineAnnealingLR takes no metric argument (that is ReduceLROnPlateau's API)

        if valid_f1 > best_f1:
            best_f1 = valid_f1
            best_model_state = copy.deepcopy(model.state_dict())  # deep copy, otherwise later training mutates it
            early_stop_count = 0
        else:
            early_stop_count += 1

        if early_stop_count == Config.early_stop:
            print("Early stopped." + " " * 30)
            break

    return best_model_state


# Main training and inference flow
if __name__ == "__main__":
    set_random_seeds(Config.seed)

    data_lists, data_labels = img_gather_("./data/train")
    best_models = []

    if Config.k_fold_num == -1:
        train_lists, valid_lists, train_labels, valid_labels = train_test_split(
            data_lists, data_labels, train_size=0.8, shuffle=True, random_state=Config.seed, stratify=data_labels
        )
        train_transforms = A.Compose([
            A.Rotate(),
            A.HorizontalFlip(),
            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
            A.Normalize()
        ])
        valid_transforms = A.Compose([A.Normalize()])
        train_dataset = TrainDataset(file_lists=train_lists, label_lists=train_labels, transforms=train_transforms)
        valid_dataset = TrainDataset(file_lists=valid_lists, label_lists=valid_labels, transforms=valid_transforms)
        train_loader = DataLoader(train_dataset, batch_size=Config.batch_size, shuffle=True)
        valid_loader = DataLoader(valid_dataset, batch_size=Config.batch_size, shuffle=False)
        data_loader = {"train_loader": train_loader, "valid_loader": valid_loader}
        print("No fold training starts ... ")
        best_model = train(data_loader)
        best_models.append(best_model)
    else:
        skf = StratifiedKFold(n_splits=Config.k_fold_num, random_state=Config.seed, shuffle=True)
        print(f"{Config.k_fold_num} fold training starts ... ")
        for fold_idx, (train_idx, valid_idx) in enumerate(skf.split(data_lists, data_labels), 1):
            train_lists, train_labels = data_lists[train_idx], data_labels[train_idx]
            valid_lists, valid_labels = data_lists[valid_idx], data_labels[valid_idx]
            train_transforms = A.Compose([
                A.Rotate(),
                A.HorizontalFlip(),
                A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
                A.Normalize()
            ])
            valid_transforms = A.Compose([A.Normalize()])
            train_dataset = TrainDataset(file_lists=train_lists, label_lists=train_labels, transforms=train_transforms)
            valid_dataset = TrainDataset(file_lists=valid_lists, label_lists=valid_labels, transforms=valid_transforms)
            train_loader = DataLoader(train_dataset, batch_size=Config.batch_size, shuffle=True)
            valid_loader = DataLoader(valid_dataset, batch_size=Config.batch_size, shuffle=False)
            data_loader = {"train_loader": train_loader, "valid_loader": valid_loader}
            print(f"- {fold_idx} fold -")
            best_model = train(data_loader)
            best_models.append(best_model)

    test_transforms = A.Compose([A.Normalize()])
    test_files = [str(path) for path in sorted(Path("./data/test/0").glob("*"))]  # cv2.imread needs str paths
    test_dataset = TestDataset(file_lists=test_files, transforms=test_transforms)
    test_loader = DataLoader(test_dataset, batch_size=Config.batch_size, shuffle=False)

    answer_logits = []

    model = create_model()  # create_model() already moves the model to device

    for fold_idx, best_model_state in enumerate(best_models, 1):
        model.load_state_dict(best_model_state)
        model.eval()
        fold_logits = []
        with torch.no_grad():
            for iter_idx, test_imgs in enumerate(test_loader, 1):
                test_imgs = test_imgs.to(device)
                test_pred = model(test_imgs)
                fold_logits.extend(test_pred.cpu().tolist())
                print(f"[{fold_idx} fold] inference iteration {iter_idx}/{len(test_loader)}", end="\r")
        answer_logits.append(fold_logits)

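    # Soft-voting ensemble: average each fold's logits, then take the argmax over classes.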
    answer_logits = np.mean(answer_logits, axis=0)
    answer_value = np.argmax(answer_logits, axis=-1)

    # Pick the first unused submission filename
    os.makedirs("submissions", exist_ok=True)
    i = 0
    while Path(f"submissions/submission_{i}.csv").is_file():
        i += 1
    submission_path = f"submissions/submission_{i}.csv"

    submission = pd.read_csv("test_answer_sample_.csv", index_col=False)
    submission["answer value"] = answer_value
    submission["answer value"].to_csv(submission_path, index=False)
    print("\nAll done.")

import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torch.nn.functional as F
from torchvision import transforms, datasets

# Constants
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
BATCH_SIZE = 32
EPOCHS = 10
MEAN = [0.5, 0.5, 0.5]
STD = [0.5, 0.5, 0.5]

print(f'Using PyTorch version: {torch.__version__}, Device: {DEVICE}')


def denormalize(tensor):
    mean_tensor = torch.tensor(MEAN).view(-1, 1, 1)
    std_tensor = torch.tensor(STD).view(-1, 1, 1)
    return tensor * std_tensor + mean_tensor


def get_dataloaders(batch_size):
    data_transform = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(MEAN, STD)
        ]),
        'val': transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(MEAN, STD)
        ])
    }

    # train/ and val/ splits live in separate subfolders under the dataset root
    image_datasets = {x: datasets.ImageFolder(f"./hymenoptera_data/{x}", data_transform[x]) for x in ['train', 'val']}
    return {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, num_workers=0, shuffle=True) for x in ['train', 'val']}
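# NOTE: assumes the standard hymenoptera_data layout (as in the PyTorch
# transfer-learning tutorial):
#   hymenoptera_data/
#     train/ants/*.jpg, train/bees/*.jpg
#     val/ants/*.jpg,   val/bees/*.jpg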


def display_images(dataloader):
    # Grab a single batch to inspect shapes and visualize a few samples
    X_train, y_train = next(iter(dataloader))
    print(f'X_train: {X_train.size()}, type: {X_train.type()}')
    print(f'y_train: {y_train.size()}, type: {y_train.type()}')

    plt.figure(figsize=(10, 1))
    for i in range(10):
        plt.subplot(1, 10, i + 1)
        plt.axis('off')
        image_to_display = denormalize(X_train[i]).detach().cpu().numpy()
        plt.imshow(np.transpose(image_to_display, (1, 2, 0)))
        plt.title(f'Class: {y_train[i].item()}')
    plt.show()


def train_and_evaluate(model, train_loader, val_loader, optimizer, epochs, save_path="best_model_weights.pth"):
    best_val_loss = float("inf")  # initialize with a high value

    for epoch in range(1, epochs + 1):
        # Training loop
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(DEVICE), target.to(DEVICE)
            optimizer.zero_grad()
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0:
                print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # Evaluation loop
        model.eval()
        test_loss, correct = 0, 0
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(DEVICE), target.to(DEVICE)
                output = model(data)
                test_loss += F.cross_entropy(output, target, reduction="sum").item()
                prediction = output.argmax(dim=1)
                correct += prediction.eq(target).sum().item()

        test_loss /= len(val_loader.dataset)
        test_accuracy = 100. * correct / len(val_loader.dataset)
        print(f"[{epoch}] Test Loss: {test_loss:.4f}, accuracy: {test_accuracy:.2f}%\n")

        # Save the model weights if this epoch has the best validation loss so far
        if test_loss < best_val_loss:
            best_val_loss = test_loss
            torch.save(model.state_dict(), save_path)
            print(f"Model weights saved to {save_path} with validation loss: {best_val_loss:.4f}")


def main():
    dataloaders = get_dataloaders(BATCH_SIZE)
    display_images(dataloaders['train'])

    model = models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, 2)  # Modify the final FC layer
    model = model.to(DEVICE)

    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    train_and_evaluate(model, dataloaders["train"], dataloaders["val"], optimizer, EPOCHS)


if __name__ == "__main__":
    main()

## Problem

### Requirement 1. Total number of customers

---

#### ✅   Implementation conditions
- Count the total number of customers present in the customer data.
#### ✅   Expected output

```json
// problem_1.json
{
  "total": 100
}
```

### Requirement 2. Dormant customer list

---

#### ✅   Implementation conditions
- We are planning an event for customers whose current status is dormant. Print the list of dormant customer IDs.

#### ✅   Constraints
- Print the IDs sorted in ascending order by customer ID.

#### ✅   Expected output

```json
// problem_2.json
[
    100,
    101,
    104,
    106,
    110,
    ...
]
```
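The solution below assumes `customer.json` is a JSON array of customer objects carrying at least `customer_id` and `status` fields (the two fields the code reads); a minimal hypothetical sample for illustration:

```json
// data/input/customer.json (hypothetical sample)
[
    {"customer_id": 100, "status": "dormant"},
    {"customer_id": 101, "status": "active"}
]
```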

import os
import json

class CustomerManager:

    def __init__(self, input_file_path, output_file_path):
        self.input_file_path = input_file_path
        self.output_file_path = output_file_path
        os.makedirs(output_file_path, exist_ok=True)
        with open(os.path.join(input_file_path, 'customer.json'), 'r', encoding='utf-8') as file:
            self.customers = json.load(file)

    def count_total_customers(self, output_file='problem_1.json'):
        total_customers = len(self.customers)
        print(f"전체 고객의 수: {total_customers}")
        self._write_to_json({'total': total_customers}, output_file)

    def check_status_customers(self, output_file='problem_2.json'):
        dormant_customers = [customer['customer_id'] for customer in self.customers if customer['status'] == 'dormant']
        dormant_customers.sort()
        print("휴먼 상태의 고객 ID 리스트:")
        for customer_id in dormant_customers:
            print(customer_id)
        self._write_to_json(dormant_customers, output_file)

    def _write_to_json(self, data, file_name):
        with open(os.path.join(self.output_file_path, file_name), 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)


if __name__ == '__main__':
    manager = CustomerManager(input_file_path='./data/input', output_file_path='./data/output')
    manager.count_total_customers()
    manager.check_status_customers()
