ML Pipeline Tutorial: Build Your First Production ML Pipeline (2026)

Stop duct-taping scripts together. This step-by-step tutorial takes you from raw data to a live, containerized, monitored model — in under 60 minutes.

⏱ 60 min read & build · Intermediate level · MLflow · Docker · FastAPI
01
What Is an ML Pipeline — And Why Manual Steps Are Killing Your Models

An ML pipeline is an automated, end-to-end sequence of steps that takes raw data and produces a deployed, monitored machine learning model. Think of it as a factory assembly line: each station does one job, passes the result forward, and nothing is left to human memory or one-off scripts.

If you’ve ever re-run a notebook three times because you forgot to re-run a cell — or discovered your production model was trained on un-normalized data — you already know the pain. This ML pipeline tutorial exists to end that.


Figure 1: A complete end-to-end ML pipeline — from data ingestion to production deployment and monitoring

Problem: manual, error-prone steps → Solution: an automated ML pipeline → Result: reproducible production models.
📖 What You’ll Build:
  • Automated data ingestion + validation
  • Preprocessing module (clean, transform, split)
  • MLflow-tracked model training with logged metrics
  • Validation gate with accuracy thresholds
  • Model registration workflow (Staging → Production)
  • Dockerized FastAPI serving endpoint
✅ Prerequisites: Python 3.10+, pip, Docker Desktop. Basic scikit-learn knowledge helps. All code is provided.

02
Data Ingestion — Load & Validate Your Dataset
~10 min

Figure 2: The ML pipeline lifecycle — from data collection to continuous monitoring and maintenance

Install Dependencies

pip install pandas pyarrow scikit-learn joblib mlflow fastapi uvicorn
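The scripts in this tutorial write into a handful of directories; creating them up front (a sketch matching the paths used throughout) avoids a FileNotFoundError on first run:

```shell
# Directories used by the ingestion, preprocessing, and serving stages
mkdir -p data/raw data/ingested data/processed artifacts
```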

ingest.py — Load & Validate

# ingest.py — Stage 1: Data Ingestion
import pandas as pd
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_data(path: str) -> pd.DataFrame:
    """Load CSV dataset and run basic validation."""
    logger.info(f"Loading data from {path}")
    df = pd.read_csv(path)

    # Validation checks
    assert len(df) > 0, "Dataset is empty"
    assert df.isnull().sum().sum() / df.size < 0.1, "Too many nulls (>10%)"
    assert "target" in df.columns, "Missing 'target' column"

    logger.info(f"✓ Loaded {len(df)} rows, {len(df.columns)} cols")
    return df

if __name__ == "__main__":
    df = load_data("data/raw/dataset.csv")
    df.to_parquet("data/ingested/dataset.parquet", index=False)
✦ Pro tip: Always save ingested data as Parquet, not CSV. It’s 3–10× smaller, reads faster, and preserves dtypes exactly.

03
Data Preprocessing — Clean, Transform, Split
~10 min

Preprocessing is where most ML pipelines fall apart. Transformations get applied inconsistently between training and inference, train/test leakage sneaks in, and one-off fixes pile up. The fix: a single, reusable preprocessing module that is fit on training data only, then applied unchanged to everything else.

# preprocess.py — Stage 2: Preprocessing
import pandas as pd
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

def build_preprocessor() -> Pipeline:
    return Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler",  StandardScaler()),
    ])

def preprocess(parquet_path: str):
    df = pd.read_parquet(parquet_path)

    X = df.drop(columns=["target"])
    y = df["target"]

    # Split BEFORE fitting — no leakage
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    preprocessor = build_preprocessor()
    X_train_proc = preprocessor.fit_transform(X_train)
    X_test_proc  = preprocessor.transform(X_test)

    # Save processed splits + fitted preprocessor
    joblib.dump({
        "X_train": X_train_proc, "X_test": X_test_proc,
        "y_train": y_train,      "y_test": y_test,
    }, "data/processed/splits.pkl")
    joblib.dump(preprocessor, "artifacts/preprocessor.pkl")

    return X_train_proc, X_test_proc, y_train, y_test

if __name__ == "__main__":
    preprocess("data/ingested/dataset.parquet")
⚠ Train/test leakage is the #1 silent killer of ML pipelines. Always fit your preprocessor on training data only, then use transform() for test/inference.
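A minimal sketch of why this matters, using synthetic data: fitting the scaler on all rows lets test-set statistics shape the transform applied to the training set.

```python
# Demonstrate leakage: a scaler fit on ALL data learns different
# statistics than one fit on the training split alone.
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))
X_train, X_test = train_test_split(X, test_size=0.2, random_state=42)

leaky = StandardScaler().fit(X)        # WRONG: sees the test rows
clean = StandardScaler().fit(X_train)  # RIGHT: training rows only

# The fitted means differ: the test set influenced `leaky`
print(np.allclose(leaky.mean_, clean.mean_))  # False
```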

04
Model Training with MLflow Tracking
~15 min

Figure 3: MLflow tracking UI — compare experiment runs, parameters, and metrics side-by-side

Start MLflow Tracking Server

mlflow server --backend-store-uri sqlite:///mlflow.db \
              --default-artifact-root ./mlruns \
              --host 0.0.0.0 --port 5000

train.py — Experiment Tracking

# train.py — Stage 3: MLflow-tracked training
import joblib, mlflow, mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("ml-pipeline-tutorial")

def train(n_estimators: int = 100, max_depth: int = 8):
    splits = joblib.load("data/processed/splits.pkl")
    X_train = splits["X_train"]; y_train = splits["y_train"]
    X_test  = splits["X_test"];  y_test  = splits["y_test"]

    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params({
            "n_estimators": n_estimators,
            "max_depth":    max_depth,
            "model_type":   "RandomForest",
        })

        # Train
        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42, n_jobs=-1
        )
        model.fit(X_train, y_train)

        # Evaluate & log metrics
        preds = model.predict(X_test)
        acc = accuracy_score(y_test, preds)
        f1  = f1_score(y_test, preds, average="weighted")

        mlflow.log_metrics({"accuracy": acc, "f1_score": f1})

        # Log model artifact
        mlflow.sklearn.log_model(model, artifact_path="model")
        run_id = mlflow.active_run().info.run_id

    print(f"✓ Run ID: {run_id} | Accuracy: {acc:.4f} | F1: {f1:.4f}")
    return run_id, acc

if __name__ == "__main__":
    train(n_estimators=200, max_depth=10)
✦ MLflow UI: After running, open http://localhost:5000 to see every experiment, compare runs side-by-side, and inspect artifacts.

05
Model Validation — Enforce Accuracy Thresholds
~5 min
# validate.py — Stage 4: Model Validation Gate
import mlflow

ACCURACY_THRESHOLD = 0.85   # must beat this
BASELINE_ACCURACY  = 0.78   # current production baseline

def validate_run(run_id: str) -> bool:
    client = mlflow.MlflowClient()
    run = client.get_run(run_id)
    acc = run.data.metrics["accuracy"]
    f1  = run.data.metrics["f1_score"]

    print(f"Accuracy : {acc:.4f}  (threshold: {ACCURACY_THRESHOLD})")
    print(f"F1 score : {f1:.4f}")
    print(f"Baseline : {BASELINE_ACCURACY}")

    passed = (acc >= ACCURACY_THRESHOLD) and (acc > BASELINE_ACCURACY)

    if passed:
        print("✓ PASSED — model clears all gates")
    else:
        raise ValueError(f"✗ FAILED — accuracy {acc:.4f} below threshold")

    return passed

Why Thresholds Matter

| Scenario | Accuracy | Gate | Action |
| --- | --- | --- | --- |
| Model improves | 0.91 | PASS | Continue to registration |
| Below threshold | 0.79 | FAIL | Pipeline halts, alert fires |
| Worse than baseline | 0.76 | FAIL | Pipeline halts, retrain triggered |

06
Model Registration — Staging → Production
~5 min
# register.py — Stage 5: Model Registration
import mlflow

MODEL_NAME = "ml-pipeline-classifier"

def register_model(run_id: str) -> str:
    model_uri = f"runs:/{run_id}/model"
    client = mlflow.MlflowClient()

    # Register → creates version N in MLflow Registry
    result = mlflow.register_model(model_uri, MODEL_NAME)
    version = result.version
    print(f"✓ Registered as version {version}")

    # Promote to Staging
    client.transition_model_version_stage(
        name=MODEL_NAME, version=version, stage="Staging",
        archive_existing_versions=False
    )
    print(f"✓ Version {version} → Staging")

    # Promote to Production (in a real pipeline, gate this on passing integration tests)
    client.transition_model_version_stage(
        name=MODEL_NAME, version=version, stage="Production",
        archive_existing_versions=True
    )
    print(f"✓ Version {version} → Production 🚀")
    return version
ℹ Note: Setting archive_existing_versions=True automatically moves the previously active Production model to Archived.

07
Deployment — Containerize with Docker, Serve with FastAPI
~15 min

Figure 4: FastAPI auto-generates interactive API documentation at /docs — test predictions without writing any code

serve.py — FastAPI Inference Endpoint

# serve.py — Stage 6: FastAPI Model Server
import mlflow, joblib, numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List

app = FastAPI(
    title="ML Pipeline Model API",
    description="Production ML model serving endpoint",
    version="1.0.0"
)

# Load model & preprocessor at startup
MODEL_URI      = "models:/ml-pipeline-classifier/Production"
PREPROCESSOR   = "artifacts/preprocessor.pkl"

model        = mlflow.sklearn.load_model(MODEL_URI)
preprocessor = joblib.load(PREPROCESSOR)

class PredictRequest(BaseModel):
    features: List[List[float]]

class PredictResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]

@app.get("/health")
async def health():
    return {"status": "healthy", "model": MODEL_URI}

@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
    try:
        X = preprocessor.transform(np.array(req.features))
        preds  = model.predict(X).tolist()
        probas = model.predict_proba(X).tolist()
        return PredictResponse(predictions=preds, probabilities=probas)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Dockerfile

# Dockerfile — Production-ready ML serving container
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY serve.py .
COPY artifacts/ artifacts/

EXPOSE 8000

CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
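The Dockerfile above copies a requirements.txt that this tutorial never creates. A minimal one for the serving container (versions unpinned here; pin them for real deployments) can be generated like this:

```shell
# requirements.txt for the serving image: everything serve.py imports,
# plus scikit-learn, which mlflow.sklearn needs to deserialize the model
cat > requirements.txt <<'EOF'
mlflow
scikit-learn
joblib
numpy
fastapi
uvicorn
EOF
```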

Build & Run

docker build -t ml-pipeline:v1 .

# serve.py pulls the model from the MLflow registry, so the container
# must reach the tracking server (on Docker Desktop, host.docker.internal
# resolves to the host machine)
docker run -p 8000:8000 \
  -e MLFLOW_TRACKING_URI=http://host.docker.internal:5000 \
  ml-pipeline:v1

curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"features": [[5.1, 3.5, 1.4, 0.2]]}'

# → {"predictions":[0],"probabilities":[[0.97,0.02,0.01]]}

Figure 5: Docker containers enable consistent deployment across any environment — from laptop to production cluster

✦ Auto-generated docs: Your FastAPI server includes interactive Swagger UI at http://localhost:8000/docs.

08
The Complete ML Pipeline — All 6 Stages
📥 Data Ingestion → ⚙️ Preprocessing → 🧠 Model Training → 🎯 Validation → 📦 Registration → 🚀 Deployment

09
Level Up Your ML Pipeline
• 🔄 Add Orchestration: use Apache Airflow or Kubeflow Pipelines to schedule and monitor your pipeline as a DAG.
• 📊 Add Model Monitoring: detect data drift and performance degradation with Evidently AI or Whylogs.
• ☁️ Cloud Deployment: push your Docker image to AWS ECR or GCP Artifact Registry; deploy to ECS or GKE.
• 🔁 CI/CD for ML: wire your pipeline into GitHub Actions for automatic retraining and deployment.

🔗 Related Tutorials: See Article #9 — MLflow Tutorial for hyperparameter sweeps, and Article #7 — Deploy ML Models with Docker & MLflow for deeper coverage.

Continue Reading

• Article #7 · Deploy ML Models with FastAPI and Docker: deep dive into model serving, Docker containers, and production deployment.
• Article #8 · Kubeflow vs Airflow: Which Pipeline Tool Should You Use? Choose the right orchestration engine for your ML pipelines.
• Article #9 · MLflow Tutorial: Track ML Experiments in 20 Minutes. Master experiment tracking, run comparison, and model logging with MLflow.
• Article #12 · MLflow vs Kubeflow: Which MLOps Platform Should You Choose? Compare the two leading MLOps platforms side-by-side.

ML Pipeline Tutorial · Updated April 2026 · Built with MLflow · scikit-learn · FastAPI · Docker
