Deploy machine learning models to production using Flask, FastAPI, Docker, cloud platforms (AWS, GCP, Azure), and model serving frameworks
Model Deployment
Overview
Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.
When to Use
When productionizing trained models for real-world inference and predictions
When building REST APIs or web services for model serving
When scaling predictions to serve multiple users or applications
When deploying models to cloud platforms, edge devices, or containers
When implementing CI/CD pipelines for ML model updates
When creating batch processing systems for large-scale predictions
Deployment Approaches
REST APIs: Flask, FastAPI for synchronous inference
Batch Processing: Scheduled jobs for large-scale predictions
Real-time Streaming: Kafka, Spark Streaming for continuous data
Serverless: AWS Lambda, Google Cloud Functions
Edge Deployment: TensorFlow Lite, ONNX for edge devices
Model Serving: TensorFlow Serving, Seldon Core, BentoML
Key Considerations
Model Format: Pickle, SavedModel, ONNX, PMML
Scalability: Load balancing, auto-scaling
Latency: Response time requirements
Monitoring: Model drift, performance metrics
Versioning: Multiple model versions in production
Python Implementation
import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib
# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn
# For model serving
import mlflow.pyfunc
import mlflow.sklearn
# Docker and deployment
import logging
import time
from typing import List, Dict
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Train and Save Model ===")
# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)
# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")
# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")
class ModelPredictor:
def __init__(self, model_path, scaler_path):
self.model = joblib.load(model_path)
self.scaler = joblib.load(scaler_path)
self.load_time = time.time()
self.predictions_count = 0
logger.info("Model loaded successfully")
def predict(self, features: List[List[float]]) -> Dict:
try:
X = np.array(features)
X_scaled = self.scaler.transform(X)
predictions = self.model.predict(X_scaled)
probabilities = self.model.predict_proba(X_scaled)
self.predictions_count += len(X)
return {
'predictions': predictions.tolist(),
'probabilities': probabilities.tolist(),
'count': len(X),
'timestamp': time.time()
}
except Exception as e:
logger.error(f"Prediction error: {str(e)}")
raise
def health_check(self) -> Dict:
return {
'status': 'healthy',
'uptime': time.time() - self.load_time,
'predictions': self.predictions_count
}
# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)
# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")
app = FastAPI(
title="ML Model API",
description="Production ML model serving API",
version="1.0.0"
)
class PredictionRequest(BaseModel):
features: List[List[float]] = Field(..., example=[[1.0, 2.0, 3.0]])
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[List[float]]
count: int
timestamp: float
class HealthResponse(BaseModel):
status: str
uptime: float
predictions: int
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint"""
return predictor.health_check()
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Make predictions"""
try:
result = predictor.predict(request.features)
return result
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
"""Batch prediction with background processing"""
all_features = []
for req in requests:
all_features.extend(req.features)
result = predictor.predict(all_features)
background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
return result
@app.get("/stats")
async def get_stats():
"""Get model statistics"""
return {
'model_type': type(predictor.model).__name__,
'n_estimators': predictor.model.n_estimators,
'max_depth': predictor.model.max_depth,
'feature_importance': predictor.model.feature_importances_.tolist(),
'total_predictions': predictor.predictions_count
}
# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")
dockerfile_content = '''FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.pkl .
COPY scaler.pkl .
COPY app.py .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print("Dockerfile content:")
print(dockerfile_content)
# 5. Requirements file
print("\n=== 5. Requirements.txt ===")
requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""
print("Requirements:")
print(requirements)
# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")
docker_compose = '''version: '3.8'
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- LOG_LEVEL=info
- WORKERS=4
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 10s
timeout: 5s
retries: 3
ml-monitor:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
command:
- "--config.file=/etc/prometheus/prometheus.yml"
ml-dashboard:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''
print("Docker Compose content:")
print(docker_compose)
# 7. Testing the API
print("\n=== 7. Testing the API ===")
def test_predictor():
# Test single prediction
test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]
result = predictor.predict(test_features)
print(f"Prediction result: {result}")
# Health check
health = predictor.health_check()
print(f"Health status: {health}")
# Batch predictions
batch_features = [
[1.0] * 20,
[2.0] * 20,
[3.0] * 20,
]
batch_result = predictor.predict(batch_features)
print(f"Batch prediction: {batch_result['count']} samples processed")
test_predictor()
# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")
# Log model to MLflow
with mlflow.start_run():
mlflow.sklearn.log_model(model, "model")
mlflow.log_param("max_depth", 10)
mlflow.log_param("n_estimators", 100)
mlflow.log_metric("accuracy", 0.95)
model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
print(f"Model logged to MLflow: {model_uri}")
# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")
class ModelMonitor:
def __init__(self):
self.predictions = []
self.latencies = []
def log_prediction(self, features, prediction, latency):
self.predictions.append({
'timestamp': time.time(),
'features_mean': np.mean(features),
'prediction': prediction,
'latency_ms': latency * 1000
})
def check_model_drift(self):
if len(self.predictions) < 100:
return {'drift_detected': False}
recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
recent_mean = np.mean(recent_predictions)
drift = abs(recent_mean - historical_mean) > 0.1
return {
'drift_detected': drift,
'historical_mean': float(historical_mean),
'recent_mean': float(recent_mean),
'threshold': 0.1
}
def get_stats(self):
if not self.latencies:
return {}
return {
'avg_latency_ms': np.mean(self.latencies) * 1000,
'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
'total_predictions': len(self.predictions)
}
monitor = ModelMonitor()
print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")
Deployment Checklist
Model format and serialization
Input/output validation
Error handling and logging
Authentication and security
Rate limiting and throttling
Health check endpoints
Monitoring and alerting
Version management
Rollback procedures
Cloud Deployment Options
AWS: SageMaker, Lambda, EC2
GCP: Vertex AI, Cloud Run, App Engine
Azure: Machine Learning, App Service
Kubernetes: Self-managed on-premises
Performance Optimization
Model quantization for smaller size
Caching predictions
Batch processing
GPU acceleration
Request pooling
Deliverables
Deployed model endpoint
API documentation
Docker configuration
Monitoring dashboard
Deployment guide
Performance benchmarks
Scaling recommendationsdon't have the plugin yet? install it then click "run inline in claude" again.