
MLflow

by estela19 2022. 3. 8.

This post is based on the Pseudo Lab (가짜연구소) MLflow seminar and references https://github.com/vhrehfdl/MLflow_tutorial.

 

MLflow

MLflow is an open-source platform for managing the end-to-end machine learning lifecycle.

 

Features of MLflow

  • Tracking : records experiment results so that parameters and outcomes can be compared
  • Projects : packages machine learning code in a reusable, reproducible form
  • Models : manages and deploys models from a variety of ML libraries, including serving and inference

 

Why MLflow is needed

Finding the best model means running a great many experiments, and recording accuracy, loss, parameters, and so on by hand every time is painful. Being able to visualize how they change would also be a big improvement!

 

(wandb also handles parameter logging very well, but its enterprise license is quite expensive)

 

MLflow UI

[Screenshots: MLflow UI overview, experiment detail view, and train accuracy chart]

Tutorial setup

# Required libraries (torchtext 0.11.x pairs with torch 1.10.x)
pip install torch==1.10.1
pip install torchtext==0.11.1
pip install pandas
pip install scikit-learn

# Example code
git clone https://github.com/vhrehfdl/MLflow_tutorial

 

 

MLflow Tracking

  • log_param : records a parameter
  • log_metric : records an evaluation metric (unlike parameters, metrics can be recorded as a time series)
  • log_artifact : records files related to the model, such as images, CSVs, and documents
  • log_model : records the trained model itself
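
Before the full example, a minimal sketch of just these four calls (all values are hypothetical; log_model is shown commented out since it needs a trained model):

import mlflow

with mlflow.start_run():
    mlflow.log_param("lr", 0.001)                      # a parameter: logged once per run
    for step in range(3):                              # a metric: can form a time series
        mlflow.log_metric("train loss", 1.0 / (step + 1), step=step)

    with open("note.txt", "w") as f:                   # an artifact: any file attached to the run
        f.write("hypothetical artifact")
    mlflow.log_artifact("note.txt")

    # mlflow.pytorch.log_model(model, "model")         # the trained model itself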

<Example code>

import time
import torch
import mlflow
import mlflow.pytorch  # explicit import: the pytorch flavor is not auto-loaded in older MLflow versions
import pickle
import collections
import pandas as pd
import torch.nn.functional as F

from torch import nn
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient


def load_data(train_dir):
    df_train = pd.read_csv(train_dir)
    train_x, train_y = df_train["sentence"].tolist(), df_train["label"].tolist()

    encoder = LabelEncoder()
    train_y = encoder.fit_transform(train_y)
    
    return train_x, train_y, encoder


def collate_batch(batch):
    text_pipeline = lambda x: vocab(x)
    label_list, text_list = [], []
    for (_label, _text) in batch:
        label_list.append(_label)
        text_list.append(text_pipeline(_text))

    label_list, text_list = torch.tensor(label_list, dtype=torch.int64), torch.tensor(text_list)
    return label_list.to(device), text_list.to(device)


def text_padding(pos_x, y, max_len, pad_token):
    iterator = []
    for i in range(0, len(pos_x)):
        if len(pos_x[i]) > max_len:
            iterator.append((y[i], pos_x[i][0:max_len]))
        else:
            iterator.append((y[i], pos_x[i][0:] + ([pad_token]*(max_len-len(pos_x[i])))))

    return iterator


class TextCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class, max_len):
        super(TextCNN, self).__init__()
        num_channels = 100
        kernel_size = [3, 4, 5]
        dropout_keep = 0.5

        self.embeddings = nn.Embedding(vocab_size, embed_dim)
        
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=num_channels, kernel_size=kernel_size[0]),
            nn.ReLU(),
            nn.MaxPool1d(max_len - kernel_size[0] + 1)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=num_channels, kernel_size=kernel_size[1]),
            nn.ReLU(),
            nn.MaxPool1d(max_len - kernel_size[1] + 1)
        )
        self.conv3 = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=num_channels, kernel_size=kernel_size[2]),
            nn.ReLU(),
            nn.MaxPool1d(max_len - kernel_size[2] + 1)
        )

        self.dropout = nn.Dropout(dropout_keep)
        self.fc = nn.Linear(num_channels * len(kernel_size), num_class)
        # note: F.cross_entropy in train() already applies log-softmax internally, so this
        # softmax is redundant for the loss; dim=1 avoids PyTorch's implicit-dim warning
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        embedded_sent = self.embeddings(x).permute(0, 2, 1)
        
        conv_out1 = self.conv1(embedded_sent).squeeze(2)
        conv_out2 = self.conv2(embedded_sent).squeeze(2)
        conv_out3 = self.conv3(embedded_sent).squeeze(2)

        all_out = torch.cat((conv_out1, conv_out2, conv_out3), 1)
        final_feature_map = self.dropout(all_out)
        final_out = self.fc(final_feature_map)

        return self.softmax(final_out)


def train(dataloader, model):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 10

    for idx, (label, text) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(text)
        loss = F.cross_entropy(predicted_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)

        if idx % log_interval == 0 and idx > 0:
            print('| epoch {:3d} | {:5d}/{:5d} batches | accuracy {:8.3f}'.format(epoch, idx, len(dataloader), total_acc/total_count))
            mlflow.log_metric("train loss", loss.item())
            mlflow.log_metric("train acc", total_acc/total_count)
            
            total_acc, total_count = 0, 0


def test(dataloader, model):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (label, text) in enumerate(dataloader):
            predicted_label = model(text)
            loss = F.cross_entropy(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    return total_acc / total_count


if __name__ == "__main__":
    mlflow.set_experiment('nlp')
    with mlflow.start_run() as run:
        # Hyper Parameters
        train_dir = "train.csv"
        epochs = 3
        max_len = 30
        hidden_dim = 300
        lr = 0.001
        batch_size = 4
        total_acc = None
        device = torch.device("cpu")

        # Flow
        print("1. Load Data")
        train_x, train_y, encoder = load_data(train_dir)
        train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, test_size=0.1, random_state=321, stratify=train_y)

        print("2. Pre Processing")
        train_x = [sentence.split(" ") for sentence in train_x]
        val_x = [sentence.split(" ") for sentence in val_x]

        vocab = build_vocab_from_iterator(train_x, specials=["<unk>"])
        vocab.set_default_index(vocab["<unk>"])

        with open('vocab.pickle', 'wb') as f:
            pickle.dump(vocab, f, pickle.HIGHEST_PROTOCOL)

        train_iter = text_padding(train_x, train_y, max_len, pad_token="<unk>")
        val_iter = text_padding(val_x, val_y, max_len, pad_token="<unk>")

        train_dataloader = DataLoader(train_iter, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)
        val_dataloader = DataLoader(val_iter, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

        print("3. Build Model")
        model = TextCNN(len(vocab), hidden_dim, num_class=len(list(set(train_y))), max_len=max_len).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)

        print("4. Train")
        for epoch in range(1, epochs + 1):
            start_time = time.time()
            train(train_dataloader, model)
            acc_val = test(val_dataloader, model)
            if total_acc is not None and total_acc > acc_val:
                scheduler.step()
            else:
                total_acc = acc_val

            print('-' * 59)
            print('| end of epoch {:3d} | time: {:5.2f}s | valid accuracy {:8.3f} '.format(epoch, time.time()-start_time, total_acc))
            print('-' * 59)

        # MLflow
        import random
        random_no = random.randrange(0, len(train_x))
        train_y = encoder.inverse_transform(train_y)

        mlflow.log_param("train", train_dir)
        mlflow.log_param("train num", len(train_x))
        mlflow.log_param("class num", len(set(train_y)))
        mlflow.log_param("class", collections.Counter(train_y))
        mlflow.log_param("train example", train_x[random_no])
        mlflow.log_param("train text max length", max([len(x) for x in train_x]))
        mlflow.log_param("train text average length", sum([len(x) for x in train_x])/len(train_x))
        mlflow.log_param("epochs", epochs)

        mlflow.pytorch.log_model(model, "model", pip_requirements=[f"torch=={torch.__version__}"])
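
Once the script has run, the logged parameters and metrics can also be read back programmatically. A minimal sketch using MlflowClient, assuming the default local ./mlruns store:

from mlflow.tracking import MlflowClient

client = MlflowClient()
exp = client.get_experiment_by_name("nlp")
for run in client.search_runs([exp.experiment_id]):
    # each Run carries its logged params and metrics as plain dicts
    print(run.info.run_id, run.data.params.get("epochs"), run.data.metrics.get("train acc"))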

 

The UI can be launched with the following command (by default it reads the ./mlruns folder in the current working directory):

mlflow ui -h 0.0.0.0 -p 1010

 

MLflow Inference

The code below loads the model saved in the run's artifacts and performs inference with it, and the mlflow models serve command that follows exposes the same model as an API server.

 

<Example code>

import mlflow
import pickle
import numpy as np


if __name__ == "__main__":
    max_len = 30
    test_x = "좋은 영화 입니다"

    with open('vocab.pickle', 'rb') as f:
        vocab = pickle.load(f)

    test_x = test_x.split(" ")
    text_pipeline = lambda x: vocab(x)
    test_x = text_pipeline(test_x)
    
    if len(test_x) > max_len:
        test_x = test_x[0:max_len]
    else:
        test_x = test_x[0:] + ([0]*(max_len-len(test_x)))

    print(test_x)
    test_x = np.array([test_x])
    # 'runs:/<run ID>/model' — replace the run ID with the one from your own run
    loaded_model = mlflow.pyfunc.load_model('runs:/1830630787574f2b9192ffcf7f28052f/model')
    print(loaded_model.predict(test_x))

# Run the inference server (if the UI from above is still on port 1010, pick a different port;
# the artifact path after the run ID must match the name passed to log_model — "model" in this tutorial)
mlflow models serve -h 0.0.0.0 -p 1010 -m runs:/f1abc2a1e94145cb813e99f0063aa099/titanic_model --no-conda
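
The running server can then be queried over HTTP. A hedged sketch using the requests library: the token IDs are hypothetical, and the payload format assumes an MLflow 1.x scoring server (newer versions expect e.g. {"inputs": ...} instead):

import requests

# Hypothetical token IDs, padded to max_len=30 exactly as in the inference code above.
tokens = [5, 12, 7] + [0] * 27

# MLflow 1.x accepts pandas split-orient JSON on the /invocations endpoint.
payload = {"columns": list(range(30)), "data": [tokens]}

resp = requests.post(
    "http://0.0.0.0:1010/invocations",  # host/port used with `mlflow models serve`
    json=payload,
    headers={"Content-Type": "application/json; format=pandas-split"},
)
print(resp.text)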

 

MLflow Registry

Used when several modelers work together and want to share results for the same model.

After a tracking server is brought up, each person sends their experiment results to it.

The code is almost the same as above, except that log_artifact and log_model are not used: against a remote (Linux) server they need additional setup, so they are disabled here.
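
For reference, this tutorial brings the server up with mlflow ui; a dedicated tracking server can also be started with the mlflow server command. A sketch, assuming local file-based stores:

# A sketch of a dedicated tracking server (local file-based stores assumed)
mlflow server -h 0.0.0.0 -p 1010 \
    --backend-store-uri ./mlruns \
    --default-artifact-root ./mlruns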

 

import time
import torch
import mlflow
import pickle
import collections
import pandas as pd
import torch.nn.functional as F

from torch import nn
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient


def load_data(train_dir):
    df_train = pd.read_csv(train_dir)
    train_x, train_y = df_train["sentence"].tolist(), df_train["label"].tolist()

    encoder = LabelEncoder()
    train_y = encoder.fit_transform(train_y)
    
    return train_x, train_y, encoder


def collate_batch(batch):
    text_pipeline = lambda x: vocab(x)
    label_list, text_list = [], []
    for (_label, _text) in batch:
        label_list.append(_label)
        text_list.append(text_pipeline(_text))

    label_list, text_list = torch.tensor(label_list, dtype=torch.int64), torch.tensor(text_list)
    return label_list.to(device), text_list.to(device)


def text_padding(pos_x, y, max_len, pad_token):
    iterator = []
    for i in range(0, len(pos_x)):
        if len(pos_x[i]) > max_len:
            iterator.append((y[i], pos_x[i][0:max_len]))
        else:
            iterator.append((y[i], pos_x[i][0:] + ([pad_token]*(max_len-len(pos_x[i])))))

    return iterator


class TextCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class, max_len):
        super(TextCNN, self).__init__()
        num_channels = 100
        kernel_size = [3, 4, 5]
        dropout_keep = 0.5

        self.embeddings = nn.Embedding(vocab_size, embed_dim)
        
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=num_channels, kernel_size=kernel_size[0]),
            nn.ReLU(),
            nn.MaxPool1d(max_len - kernel_size[0] + 1)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=num_channels, kernel_size=kernel_size[1]),
            nn.ReLU(),
            nn.MaxPool1d(max_len - kernel_size[1] + 1)
        )
        self.conv3 = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=num_channels, kernel_size=kernel_size[2]),
            nn.ReLU(),
            nn.MaxPool1d(max_len - kernel_size[2] + 1)
        )

        self.dropout = nn.Dropout(dropout_keep)
        self.fc = nn.Linear(num_channels * len(kernel_size), num_class)
        # note: F.cross_entropy in train() already applies log-softmax internally, so this
        # softmax is redundant for the loss; dim=1 avoids PyTorch's implicit-dim warning
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        embedded_sent = self.embeddings(x).permute(0, 2, 1)
        
        conv_out1 = self.conv1(embedded_sent).squeeze(2)
        conv_out2 = self.conv2(embedded_sent).squeeze(2)
        conv_out3 = self.conv3(embedded_sent).squeeze(2)

        all_out = torch.cat((conv_out1, conv_out2, conv_out3), 1)
        final_feature_map = self.dropout(all_out)
        final_out = self.fc(final_feature_map)

        return self.softmax(final_out)


def train(dataloader, model):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 10

    for idx, (label, text) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(text)
        loss = F.cross_entropy(predicted_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)

        if idx % log_interval == 0 and idx > 0:
            print('| epoch {:3d} | {:5d}/{:5d} batches | accuracy {:8.3f}'.format(epoch, idx, len(dataloader), total_acc/total_count))
            mlflow.log_metric("train loss", loss.item())
            mlflow.log_metric("train acc", total_acc/total_count)
            
            total_acc, total_count = 0, 0


def test(dataloader, model):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (label, text) in enumerate(dataloader):
            predicted_label = model(text)
            loss = F.cross_entropy(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    return total_acc / total_count


if __name__ == "__main__":
    mlflow.set_tracking_uri("http://127.0.0.1:1010")
    exp_info = MlflowClient().get_experiment_by_name("nlp")
    exp_id = exp_info.experiment_id if exp_info else MlflowClient().create_experiment("nlp")
    # Start the run explicitly rather than in a `with` block, so the rest of the
    # script can stay at this indentation level; it is closed by end_run() below.
    mlflow.start_run(experiment_id=exp_id)

    # Hyper Parameters
    train_dir = "train.csv"
    epochs = 3
    max_len = 30
    hidden_dim = 300
    lr = 0.001
    batch_size = 4
    total_acc = None
    device = torch.device("cpu")

    # Flow
    print("1. Load Data")
    train_x, train_y, encoder = load_data(train_dir)
    train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, test_size=0.1, random_state=321, stratify=train_y)

    print("2. Pre Processing")
    train_x = [sentence.split(" ") for sentence in train_x]
    val_x = [sentence.split(" ") for sentence in val_x]

    vocab = build_vocab_from_iterator(train_x, specials=["<unk>"])
    vocab.set_default_index(vocab["<unk>"])

    with open('vocab.pickle', 'wb') as f:
        pickle.dump(vocab, f, pickle.HIGHEST_PROTOCOL)

    train_iter = text_padding(train_x, train_y, max_len, pad_token="<unk>")
    val_iter = text_padding(val_x, val_y, max_len, pad_token="<unk>")

    train_dataloader = DataLoader(train_iter, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)
    val_dataloader = DataLoader(val_iter, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

    print("3. Build Model")
    model = TextCNN(len(vocab), hidden_dim, num_class=len(list(set(train_y))), max_len=max_len).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)

    print("4. Train")
    for epoch in range(1, epochs + 1):
        start_time = time.time()
        train(train_dataloader, model)
        acc_val = test(val_dataloader, model)
        if total_acc is not None and total_acc > acc_val:
            scheduler.step()
        else:
            total_acc = acc_val

        print('-' * 59)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid accuracy {:8.3f} '.format(epoch, time.time()-start_time, total_acc))
        print('-' * 59)

    # MLflow
    import random
    random_no = random.randrange(0, len(train_x))
    train_y = encoder.inverse_transform(train_y)

    mlflow.log_param("train", train_dir)
    mlflow.log_param("train num", len(train_x))
    mlflow.log_param("class num", len(set(train_y)))
    mlflow.log_param("class", collections.Counter(train_y))
    mlflow.log_param("train example", train_x[random_no])
    mlflow.log_param("train text max length", max([len(x) for x in train_x]))
    mlflow.log_param("train text average length", sum([len(x) for x in train_x])/len(train_x))
    mlflow.log_param("epochs", epochs)

    # log_model is intentionally left disabled here (see the note above about
    # artifact logging against a remote tracking server)
    # mlflow.pytorch.log_model(model, "model", pip_requirements=[f"torch=={torch.__version__}"])

    mlflow.end_run()

To send results to the server, first run mlflow ui once in the project folder; then, when recording to the server, reference the model's run ID from inside the mlruns folder.

 

 

[Screenshot: the mlruns folder, showing the run ID directory ('f1abc~')]

Copy the run ID ('f1abc~') from the mlruns folder above and run the command in the form runs:/<run ID>/<model name>.

# First launch (bring up the tracking server / UI)
mlflow ui -h 0.0.0.0 -p 1010

# Serve a logged model (run from the directory containing mlruns; stop the UI first
# or give this command a different port)
mlflow models serve -h 0.0.0.0 -p 1010 -m runs:/f1abc2a1e94145cb813e99f0063aa099/titanic_model --no-conda
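
Beyond shared tracking, the Model Registry proper can promote a logged model to a named, versioned entry. A hedged sketch (the name 'nlp-textcnn' is hypothetical, the artifact path must match what was passed to log_model, and the registry requires a database-backed tracking server):

import mlflow

mlflow.set_tracking_uri("http://127.0.0.1:1010")

# Registers the model logged under the given run as a new version of 'nlp-textcnn'.
result = mlflow.register_model("runs:/f1abc2a1e94145cb813e99f0063aa099/model", "nlp-textcnn")
print(result.name, result.version)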

 
