ML Pipeline Tutorial: Build Your First Production ML Pipeline (2026)
Stop duct-taping scripts together. This step-by-step tutorial takes you from raw data to a live, containerized, monitored model — in under 60 minutes.
An ML pipeline is an automated, end-to-end sequence of steps that takes raw data and produces a deployed, monitored machine learning model. Think of it as a factory assembly line: each station does one job, passes the result forward, and nothing is left to human memory or one-off scripts.
If you’ve ever re-run a notebook three times because you forgot to re-run a cell — or discovered your production model was trained on un-normalized data — you already know the pain. This ML pipeline tutorial exists to end that.
Figure 1: A complete end-to-end ML pipeline — from data ingestion to production deployment and monitoring
- Automated data ingestion + validation
- Preprocessing module (clean, transform, split)
- MLflow-tracked model training with logged metrics
- Validation gate with accuracy thresholds
- Model registration workflow (Staging → Production)
- Dockerized FastAPI serving endpoint
Figure 2: The ML pipeline lifecycle — from data collection to continuous monitoring and maintenance
Install Dependencies
# Install the packages the pipeline stages import directly.
# NOTE(review): later stages also import joblib, numpy and pydantic and read/write Parquet files;
# those are presumably pulled in transitively by the packages below — confirm in a clean environment.
pip install pandas scikit-learn mlflow fastapi uvicorn
ingest.py — Load & Validate
# ingest.py — Stage 1: Data Ingestion
import pandas as pd
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataValidationError(ValueError, AssertionError):
    """Raised when an ingested dataset fails a validation check.

    It also derives from ``AssertionError`` so that callers written against
    the earlier ``assert``-based checks keep catching the same failures.
    """


def load_data(path: str, max_null_ratio: float = 0.1) -> pd.DataFrame:
    """Load a CSV dataset and run basic validation.

    Args:
        path: Location of the CSV file to read.
        max_null_ratio: Fraction of null cells (over all cells) at or above
            which the dataset is rejected. Defaults to 0.1 (10%).

    Returns:
        The loaded DataFrame, unchanged.

    Raises:
        DataValidationError: If the dataset has no rows, has too many null
            cells, or lacks a ``target`` column.
    """
    logger.info("Loading data from %s", path)
    df = pd.read_csv(path)

    # Validation checks use explicit raises rather than `assert`: assert
    # statements are removed when Python runs with -O, which would silently
    # disable the whole validation stage.
    if len(df) == 0:
        raise DataValidationError("Dataset is empty")

    null_ratio = df.isnull().sum().sum() / df.size
    if null_ratio >= max_null_ratio:
        raise DataValidationError(f"Too many nulls (>{max_null_ratio:.0%})")

    if "target" not in df.columns:
        raise DataValidationError("Missing 'target' column")

    logger.info("✓ Loaded %d rows, %d cols", len(df), len(df.columns))
    return df
if __name__ == "__main__":
    from pathlib import Path

    # Script entry point: validate the raw CSV, then persist it as Parquet
    # for the preprocessing stage.
    df = load_data("data/raw/dataset.csv")

    # The Parquet writer does not create missing parent directories, so make
    # sure the output folder exists on a fresh checkout.
    output_path = Path("data/ingested/dataset.parquet")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)
Preprocessing is where most ML pipelines fall apart. Transformations get applied inconsistently between training and inference, train/test leakage sneaks in, and one-off fixes pile up. The fix: a reusable preprocessing module that is fit on the training data only, saved as an artifact, and then applied unchanged to everything else — the test split and, later, live inference requests.
# preprocess.py — Stage 2: Preprocessing
import pandas as pd
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
def build_preprocessor() -> Pipeline:
    """Return an unfitted pipeline: median imputation followed by standard scaling."""
    steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
    return Pipeline(steps)
def preprocess(
    parquet_path: str,
    splits_path: str = "data/processed/splits.pkl",
    preprocessor_path: str = "artifacts/preprocessor.pkl",
):
    """Split the ingested dataset, fit the preprocessor on train only, and save both.

    Args:
        parquet_path: Parquet file to read; it must contain a ``target`` column.
        splits_path: Where the processed train/test splits are written.
        preprocessor_path: Where the fitted preprocessor is written.

    Returns:
        A tuple ``(X_train_proc, X_test_proc, y_train, y_test)``.
    """
    from pathlib import Path

    df = pd.read_parquet(parquet_path)
    X = df.drop(columns=["target"])
    y = df["target"]

    # Split BEFORE fitting — no leakage
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    preprocessor = build_preprocessor()
    X_train_proc = preprocessor.fit_transform(X_train)
    X_test_proc = preprocessor.transform(X_test)

    # Writing fails if the parent directory is missing, so create the output
    # folders first; on a fresh checkout neither exists yet.
    for destination in (splits_path, preprocessor_path):
        Path(destination).parent.mkdir(parents=True, exist_ok=True)

    # Save processed splits + fitted preprocessor
    joblib.dump({
        "X_train": X_train_proc, "X_test": X_test_proc,
        "y_train": y_train, "y_test": y_test,
    }, splits_path)
    joblib.dump(preprocessor, preprocessor_path)
    return X_train_proc, X_test_proc, y_train, y_test
if __name__ == "__main__":
    # Script entry point: preprocess the Parquet file written by the ingestion stage.
    preprocess(parquet_path="data/ingested/dataset.parquet")
Figure 3: MLflow tracking UI — compare experiment runs, parameters, and metrics side-by-side
Start MLflow Tracking Server
# Start the MLflow tracking server: the backend store is the SQLite file mlflow.db,
# the default artifact root is ./mlruns, and the server listens on port 5000.
# NOTE(review): --host 0.0.0.0 binds every network interface; use 127.0.0.1 if the
# server should only be reachable from this machine.
mlflow server --backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 --port 5000
train.py — Experiment Tracking
# train.py — Stage 3: MLflow-tracked training
import joblib, mlflow, mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
# Point the MLflow client at the tracking server on localhost:5000 and select the
# experiment that training runs are logged under. Both calls run at import time.
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("ml-pipeline-tutorial")
def train(n_estimators: int = 100, max_depth: int = 8):
    """Train a random forest on the saved splits and log the run to MLflow.

    Reads ``data/processed/splits.pkl``, fits the classifier on the train
    split, evaluates it on the test split, and logs the hyperparameters,
    accuracy, weighted F1 and the fitted model to a new MLflow run.

    Args:
        n_estimators: Number of trees in the forest.
        max_depth: Maximum depth of each tree.

    Returns:
        A tuple ``(run_id, accuracy)`` for the logged run.
    """
    data = joblib.load("data/processed/splits.pkl")
    X_train, y_train = data["X_train"], data["y_train"]
    X_test, y_test = data["X_test"], data["y_test"]

    with mlflow.start_run():
        # Hyperparameters are logged before fitting.
        hyperparams = {
            "n_estimators": n_estimators,
            "max_depth": max_depth,
            "model_type": "RandomForest",
        }
        mlflow.log_params(hyperparams)

        # Fit the classifier with a fixed seed.
        forest = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42,
            n_jobs=-1,
        )
        forest.fit(X_train, y_train)

        # Score on the held-out split and log the metrics.
        y_pred = forest.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average="weighted")
        mlflow.log_metrics({"accuracy": acc, "f1_score": f1})

        # Store the fitted model under the run's "model" artifact path.
        mlflow.sklearn.log_model(forest, artifact_path="model")

        run_id = mlflow.active_run().info.run_id
        print(f"✓ Run ID: {run_id} | Accuracy: {acc:.4f} | F1: {f1:.4f}")
        return run_id, acc
if __name__ == "__main__":
    # Script entry point: one training run with a larger, deeper forest than the defaults.
    train(max_depth=10, n_estimators=200)
# validate.py — Stage 4: Model Validation Gate
import mlflow
ACCURACY_THRESHOLD = 0.85  # candidate must meet or beat this (checked with >=)
BASELINE_ACCURACY = 0.78  # current production baseline; candidate must strictly exceed it


def validate_run(
    run_id: str,
    accuracy_threshold: float = ACCURACY_THRESHOLD,
    baseline_accuracy: float = BASELINE_ACCURACY,
) -> bool:
    """Check a logged MLflow run against the accuracy gates.

    Args:
        run_id: ID of the MLflow run whose metrics are validated.
        accuracy_threshold: Minimum accuracy the run must reach (inclusive).
        baseline_accuracy: Accuracy the run must strictly exceed.

    Returns:
        True when every gate passes. The function never returns False;
        a failed gate raises instead.

    Raises:
        ValueError: If any gate fails; the message names each failed gate.
        KeyError: If the run has no ``accuracy`` or ``f1_score`` metric.
    """
    client = mlflow.MlflowClient()
    metrics = client.get_run(run_id).data.metrics
    acc = metrics["accuracy"]
    f1 = metrics["f1_score"]

    print(f"Accuracy : {acc:.4f} (threshold: {accuracy_threshold})")
    print(f"F1 score : {f1:.4f}")
    print(f"Baseline : {baseline_accuracy}")

    # The checks are written as `not (passes)` so a NaN accuracy fails both
    # gates rather than slipping through inverted comparisons.
    failures = []
    if not acc >= accuracy_threshold:
        failures.append(f"accuracy {acc:.4f} below threshold {accuracy_threshold}")
    if not acc > baseline_accuracy:
        failures.append(f"accuracy {acc:.4f} does not beat baseline {baseline_accuracy}")

    if failures:
        raise ValueError("✗ FAILED — " + "; ".join(failures))

    print("✓ PASSED — model clears all gates")
    return True
Why Thresholds Matter
| Scenario | Accuracy | Gate | Action |
|---|---|---|---|
| Model improves | 0.91 | PASS | Continue to registration |
| Below threshold | 0.79 | FAIL | Pipeline halts, alert fires |
| Worse than baseline | 0.75 | FAIL | Pipeline halts, retrain triggered |
# register.py — Stage 5: Model Registration
import mlflow
# Registry name that every version of this model is registered under.
MODEL_NAME = "ml-pipeline-classifier"


def register_model(run_id: str, *, promote_to_production: bool = True) -> str:
    """Register a run's model and move the new version through the registry stages.

    The model logged under the run's ``model`` artifact path is registered as
    a new version of ``MODEL_NAME`` and transitioned to Staging. By default it
    is then transitioned to Production immediately, archiving the versions
    already in that stage.

    Args:
        run_id: ID of the MLflow run that logged the model.
        promote_to_production: When False, stop after Staging so that
            integration tests can run before a separate promotion step.
            Defaults to True, which promotes unconditionally.

    Returns:
        The version assigned by the registry.
    """
    # NOTE(review): registry stages are reportedly deprecated in recent MLflow
    # releases in favour of aliases — confirm against the installed version.
    model_uri = f"runs:/{run_id}/model"
    client = mlflow.MlflowClient()

    # Register → creates version N in MLflow Registry
    result = mlflow.register_model(model_uri, MODEL_NAME)
    version = result.version
    print(f"✓ Registered as version {version}")

    # Promote to Staging; existing Staging versions are left in place.
    client.transition_model_version_stage(
        name=MODEL_NAME, version=version, stage="Staging",
        archive_existing_versions=False
    )
    print(f"✓ Version {version} → Staging")

    if not promote_to_production:
        return version

    # Promote to Production. Nothing in this function runs integration tests,
    # so callers that need that gate should pass promote_to_production=False.
    client.transition_model_version_stage(
        name=MODEL_NAME, version=version, stage="Production",
        archive_existing_versions=True
    )
    print(f"✓ Version {version} → Production 🚀")
    return version
Figure 4: FastAPI auto-generates interactive API documentation at /docs — test predictions without writing any code
serve.py — FastAPI Inference Endpoint
# serve.py — Stage 6: FastAPI Model Server
import mlflow, joblib, numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
# FastAPI application object; title, description and version are passed as API metadata.
app = FastAPI(
title="ML Pipeline Model API",
description="Production ML model serving endpoint",
version="1.0.0"
)
# Load model & preprocessor at startup
# Both loads run at import time, so a failure here prevents the module from importing.
# NOTE(review): this file never calls mlflow.set_tracking_uri, so the models:/ URI is
# presumably resolved through environment configuration (e.g. MLFLOW_TRACKING_URI) — confirm.
MODEL_URI = "models:/ml-pipeline-classifier/Production"
PREPROCESSOR = "artifacts/preprocessor.pkl"
model = mlflow.sklearn.load_model(MODEL_URI)
# NOTE(review): joblib.load unpickles the file; only load artifacts produced by your own pipeline.
preprocessor = joblib.load(PREPROCESSOR)
class PredictRequest(BaseModel):
    """Request body for /predict: a batch of feature rows."""

    # One inner list per sample, one float per feature.
    features: List[List[float]]
class PredictResponse(BaseModel):
    """Response body for /predict: labels and class probabilities."""

    # Output of model.predict for the submitted rows.
    predictions: List[int]
    # Output of model.predict_proba for the submitted rows.
    probabilities: List[List[float]]
@app.get("/health")
async def health():
    """Report a fixed healthy status together with the model URI being served."""
    payload = {"status": "healthy", "model": MODEL_URI}
    return payload
@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
    """Preprocess a batch of feature rows and return predictions with probabilities.

    Args:
        req: Request whose ``features`` holds one list of floats per sample.

    Returns:
        A PredictResponse with the predicted labels and class probabilities.

    Raises:
        HTTPException: 422 when the batch is empty or its shape is rejected
            by the preprocessor or model; 500 for any other failure.
    """
    # NOTE(review): the preprocessing and model calls below are synchronous and
    # run inside this coroutine; consider a plain `def` handler if event-loop
    # blocking becomes a problem.
    if not req.features:
        raise HTTPException(
            status_code=422, detail="'features' must contain at least one row"
        )

    try:
        X = preprocessor.transform(np.array(req.features))
        preds = model.predict(X).tolist()
        probas = model.predict_proba(X).tolist()
    except ValueError as e:
        # Ragged rows or a wrong feature count surface as ValueError; that is
        # a problem with the request, not a server fault.
        raise HTTPException(status_code=422, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Building the response stays outside the block above so that a
    # response-validation failure is still reported as a server error.
    try:
        return PredictResponse(predictions=preds, probabilities=probas)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
Dockerfile
# Dockerfile — Production-ready ML serving container
FROM python:3.11-slim
WORKDIR /app
# Dependencies are copied and installed before the application code.
# NOTE(review): requirements.txt must exist in the build context; the install step
# earlier in this tutorial uses a bare `pip install` and does not create one — confirm.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Application code plus the artifacts/ directory, which holds the fitted preprocessor
# that serve.py loads from artifacts/preprocessor.pkl.
COPY serve.py .
COPY artifacts/ artifacts/
EXPOSE 8000
# Serve the FastAPI app with uvicorn on port 8000 using two worker processes.
CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
Build & Run
# Build the serving image from the Dockerfile in the current directory.
docker build -t ml-pipeline:v1 .
# Run the container, publishing container port 8000 on host port 8000.
# NOTE(review): serve.py loads the model from a models:/ registry URI at import time, so the
# container presumably needs network access to the MLflow server and its artifacts — confirm.
docker run -p 8000:8000 ml-pipeline:v1
# Send one four-feature row to the prediction endpoint.
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{"features": [[5.1, 3.5, 1.4, 0.2]]}'
# → {"predictions":[0],"probabilities":[[0.97,0.02,0.01]]}
Figure 5: Docker containers enable consistent deployment across any environment — from laptop to production cluster
Continue Reading
Deploy ML Models with FastAPI and Docker
Deep dive into model serving, Docker containers, and production deployment.
Kubeflow vs Airflow: Which Pipeline Tool Should You Use?
Choose the right orchestration engine for your ML pipelines.
MLflow Tutorial: Track ML Experiments in 20 Minutes
Master experiment tracking, run comparison, and model logging with MLflow.
MLflow vs Kubeflow: Which MLOps Platform Should You Choose?
Compare the two leading MLOps platforms side-by-side.