MLFlow nâng cao

High-level Architecture

image.png

Mối quan hệ giữa các thành phần

  1. Training Pipeline tạo ra mô hình → đẩy vào Model Registry.

  2. Deployment Pipeline lấy mô hình từ registry → triển khai trên AWS → cung cấp API inference.

  3. Traffic Data cập nhật ground truth → dùng để cải thiện mô hình qua feedback loop.

Dưới đây là giải thích chi tiết từng thành phần trong kiến trúc bạn cung cấp, được chia theo 3 pipeline chính:

1. Training Pipeline (Quy trình huấn luyện mô hình)

Dataset
Data processing
Training
Evaluation
Registration

2. Deployment Pipeline (Quy trình triển khai mô hình)

Data capturing
Inference endpoint
AWS
Model
Model Registry

3. Traffic Data (Dữ liệu giao thông)

Ground truth
Traffic data and labels
Testing
Reports & Metrics and reports

Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft

Cài đặt môi trường

Đảm bảo trên máy đã cài Python 3.12 hoặc cao hơn

python3 -m venv .venv
source .venv/bin/activate
pip3 install -U pip && pip3 install -r requirements.txt

file requirements.txt download tại https://github.com/vhtsoft/machine-learning.git

Tại thời điểm này, bạn sẽ có một môi trường Python đang hoạt động với tất cả các phụ thuộc cần thiết. Bước cuối cùng là tạo một tệp .env bên trong thư mục gốc của kho lưu trữ. Chúng ta sẽ sử dụng tệp này để xác định các biến môi trường cần thiết để chạy các pipeline

echo "KERAS_BACKEND=jax" >> .env

Chạy MLflow

mlflow server --host 127.0.0.1 --port 5000

image.png

Tại Browser

http://localhost:5000/

image.png

Theo mặc định, MLflow theo dõi các thử nghiệm và lưu trữ dữ liệu trong các tệp bên trong thư mục ./mlruns cục bộ. Bạn có thể thay đổi vị trí của thư mục theo dõi hoặc sử dụng cơ sở dữ liệu SQLite bằng tham số --backend-store-uri. Ví dụ sau sử dụng cơ sở dữ liệu SQLite để lưu trữ dữ liệu theo dõi:

mlflow server --host 127.0.0.1 --port 5000 --backend-store-uri sqlite:///mlflow.db

Để biết thêm thông tin, hãy kiểm tra một số cách phổ biến để thiết lập MLflow. Bạn cũng có thể chạy lệnh sau để biết thêm thông tin về máy chủ:

mlflow server --help

Sau khi máy chủ chạy, hãy sửa đổi tệp .env bên trong thư mục gốc của kho lưu trữ để thêm biến môi trường MLFLOW_TRACKING_URI trỏ đến URI theo dõi của máy chủ MLflow. Lệnh sau sẽ thêm biến vào tệp và xuất nó trong shell hiện tại của bạn:

export $((echo "MLFLOW_TRACKING_URI=http://127.0.0.1:5000" >> .env; cat .env) | xargs)

image.png

Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft

 

Tổng Quan Qui Trình Pipeline Huấn Luyện Mô Hình

image.png

Đây là một quy trình (pipeline) huấn luyện mô hình học máy theo phương pháp Cross-Validation (kiểm tra chéo), thường được sử dụng để đánh giá hiệu suất mô hình một cách ổn định. Dưới đây là giải thích chi tiết từng bước:


1. Tổng quan

Pipeline này mô tả quy trình K-Fold Cross-Validation, trong đó:


2. Giải thích từng thành phần

a. Split dataset
b. For each fold...
c. Transform dataset
d. Train model
e. Evaluate model
f. Model assets
g. Register model
h. Model registry

3. Luồng hoạt động của pipeline

  1. Chia dữ liệu → K folds.

  2. Với mỗi fold:

    • Transform dữ liệu train/validation.

    • Train mô hình trên train set.

    • Evaluate trên validation set.

  3. Tổng hợp kết quả từ K lần evaluate để tính độ ổn định của mô hình (ví dụ: mean accuracy ± độ lệch chuẩn).

  4. Lưu mô hình tốt nhất vào Model Registry để triển khai.


4. Ứng dụng thực tế

Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft

Xây Dựng Hàm Phụ Trợ PipeLine

Trong bài này sử dụng tập dữ liệu Penguins để đào tạo một mô hình phân loại các loài chim cánh cụt.

I. DatasetMixin Class

tại thư mục gốc tạo pipelines/common.py

import logging
import logging.config
import sys
import time
from io import StringIO
from pathlib import Path

import pandas as pd
from metaflow import IncludeFile, current

PYTHON = "3.12.8"

PACKAGES = {
    "keras": "3.8.0",
    "scikit-learn": "1.6.1",
    "mlflow": "2.20.2",
    "tensorflow": "2.18.0",
}


class DatasetMixin:
    """A mixin for loading and preparing a dataset.

    This mixin is designed to be combined with any pipeline that requires accessing
    a dataset.
    """

    dataset = IncludeFile(
        "dataset",
        is_text=True,
        help="Dataset that will be used to train the model.",
        default="data/penguins.csv",
    )

    def load_dataset(self):
        """Load and prepare the dataset."""
        import numpy as np

        # The raw data is passed as a string, so we need to convert it into a DataFrame.
        data = pd.read_csv(StringIO(self.dataset))

        # Replace extraneous values in the sex column with NaN. We can handle missing
        # values later in the pipeline.
        data["sex"] = data["sex"].replace(".", np.nan)

        # We want to shuffle the dataset. For reproducibility, we can fix the seed value
        # when running in development mode. When running in production mode, we can use
        # the current time as the seed to ensure a different shuffle each time the
        # pipeline is executed.
        seed = int(time.time() * 1000) if current.is_production else 42
        generator = np.random.default_rng(seed=seed)
        data = data.sample(frac=1, random_state=generator)

        logging.info("Loaded dataset with %d samples", len(data))

        return data


def packages(*names: str):
    """Return a dictionary of the specified packages and their corresponding version.

    This function is useful to set up the different pipelines while keeping the
    package versions consistent and centralized in a single location.

    Any packages that should be locked to a specific version will be part of the
    `PACKAGES` dictionary. If a package is not present in the dictionary, it will be
    installed using the latest version available.
    """
    return {name: PACKAGES.get(name, "") for name in names}


def configure_logging():
    """Configure logging handlers and return a logger instance."""
    if Path("logging.conf").exists():
        logging.config.fileConfig("logging.conf")
    else:
        logging.basicConfig(
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[logging.StreamHandler(sys.stdout)],
            level=logging.INFO,
        )


def build_features_transformer():
    """Build a Scikit-Learn transformer to preprocess the feature columns."""
    from sklearn.compose import ColumnTransformer, make_column_selector
    from sklearn.impute import SimpleImputer
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import OneHotEncoder, StandardScaler

    numeric_transformer = make_pipeline(
        SimpleImputer(strategy="mean"),
        StandardScaler(),
    )

    categorical_transformer = make_pipeline(
        SimpleImputer(strategy="most_frequent"),
        # We can use the `handle_unknown="ignore"` parameter to ignore unseen categories
        # during inference. When encoding an unknown category, the transformer will
        # return an all-zero vector.
        OneHotEncoder(handle_unknown="ignore"),
    )

    return ColumnTransformer(
        transformers=[
            (
                "numeric",
                numeric_transformer,
                # We'll apply the numeric transformer to all columns that are not
                # categorical (object).
                make_column_selector(dtype_exclude="object"),
            ),
            (
                "categorical",
                categorical_transformer,
                # We want to make sure we ignore the target column which is also a
                # categorical column. To accomplish this, we can specify the column
                # names we only want to encode.
                ["island", "sex"],
            ),
        ],
    )


def build_target_transformer():
    """Build a Scikit-Learn transformer to preprocess the target column."""
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import OrdinalEncoder

    return ColumnTransformer(
        transformers=[("species", OrdinalEncoder(), ["species"])],
    )


def build_model(input_shape, learning_rate=0.01):
    """Build and compile the neural network to predict the species of a penguin."""
    from keras import Input, layers, models, optimizers

    model = models.Sequential(
        [
            Input(shape=(input_shape,)),
            layers.Dense(10, activation="relu"),
            layers.Dense(8, activation="relu"),
            layers.Dense(3, activation="softmax"),
        ],
    )

    model.compile(
        optimizer=optimizers.SGD(learning_rate=learning_rate),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    return model

Class DatasetMixin

Là một mixin — class phụ dùng để thêm khả năng load_dataset() vào các pipeline Metaflow.

dataset = IncludeFile(
    "dataset",
    is_text=True,
    help="Dataset that will be used to train the model.",
    default="data/penguins.csv",
)

load_dataset()

def load_dataset(self):
    ...

Hàm packages(*names)

def packages(*names: str):
    return {name: PACKAGES.get(name, "") for name in names}

Khi bạn viết Metaflow @conda_base hoặc @conda decorator, bạn có thể truyền gọn:

@conda(packages=packages("keras", "scikit-learn"))

Hàm configure_logging()

def configure_logging():
    ...

build_features_transformer()

def build_features_transformer():
    ...

Trả về một ColumnTransformer để xử lý:

["island", "sex"]  # là các cột categorical cụ thể

handle_unknown="ignore" giúp model không crash khi gặp category mới trong inference.

build_target_transformer()

def build_target_transformer():
    ...

build_model(input_shape, learning_rate)

def build_model(input_shape, learning_rate=0.01):
    ...

Tổng Kết

Phần Mục đích
DatasetMixin Load CSV dataset dùng IncludeFile, shuffle, xử lý giá trị thiếu
packages() Tập trung quản lý version package
configure_logging() Ghi log ra terminal hoặc file
build_features_transformer() Chuẩn hóa, encode dữ liệu đầu vào
build_target_transformer() Encode target sang số
build_model() Khởi tạo mô hình MLP với Keras

Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft

Bắt đầu với Pipeline

 @card
    @step
    def start(self):
        """Start and prepare the Training pipeline."""
        import mlflow

        mlflow.set_tracking_uri(self.mlflow_tracking_uri)
        logging.info("MLflow tracking server: %s", self.mlflow_tracking_uri)

        self.mode = "production" if current.is_production else "development"
        logging.info("Running flow in %s mode.", self.mode)

        self.data = self.load_dataset()

        try:
            # Let's start a new MLflow run to track the execution of this flow. We want
            # to set the name of the MLflow run to the Metaflow run ID so we can easily
            # recognize how they relate to each other.
            run = mlflow.start_run(run_name=current.run_id)
            self.mlflow_run_id = run.info.run_id
        except Exception as e:
            message = f"Failed to connect to MLflow server {self.mlflow_tracking_uri}."
            raise RuntimeError(message) from e

        # Now that everything is set up, we want to run a cross-validation process
        # to evaluate the model and train a final model on the entire dataset. Since
        # these two steps are independent, we can run them in parallel.
        self.next(self.cross_validation, self.transform)

1. Decorators

@card
@step
def start(self):

2. Khởi tạo MLflow Tracking

 

import mlflow
mlflow.set_tracking_uri(self.mlflow_tracking_uri)
logging.info("MLflow tracking server: %s", self.mlflow_tracking_uri)

3. Xác định chế độ chạy

self.mode = "production" if current.is_production else "development"
logging.info("Running flow in %s mode.", self.mode)

4. Tải dữ liệu

self.data = self.load_dataset()
def load_dataset(self):
    return pd.read_csv("penguins.csv")

5. Thiết lập MLflow Run

try:
    run = mlflow.start_run(run_name=current.run_id)
    self.mlflow_run_id = run.info.run_id
except Exception as e:
    message = f"Failed to connect to MLflow server {self.mlflow_tracking_uri}."
    raise RuntimeError(message) from e

6. Chia nhánh pipeline

self.next(self.cross_validation, self.transform)

Tóm tắt luồng xử lý

  1. Thiết lập MLflow Tracking Server.

  2. Xác định chế độ chạy (production/development).

  3. Tải dữ liệu từ nguồn (CSV, database, API...).

  4. Bắt đầu MLflow Run và liên kết với Metaflow Run.

  5. Chia thành 2 nhánh song song: Cross-Validation và Transform.

 

Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft