KEY INSIGHT
Building an AI SaaS MVP requires integrating all previously covered components—multi-tenant architecture, billing, compliance, and monitoring—into a cohesive system that can serve Nigerian businesses reliably.
This final chapter synthesizes all previous learning into a complete MVP implementation, demonstrating how the components work together in a production-ready system.
```python
"""
AI SaaS MVP - Complete Implementation
=====================================
This module demonstrates the complete integration of:
- Multi-tenant architecture (Chapter 2-5)
- Authentication and authorization (Chapter 6-8)
- AI integration (Chapter 9-10)
- Database design (Chapter 11-12)
- Billing and payments (Chapter 13-15)
- Monitoring and compliance (Chapter 16-23)
"""
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize and shutdown application resources."""
logger.info("Starting AI SaaS MVP")
await initialize_services()
await run_migrations()
await verify_configuration()
logger.info("Application started successfully")
yield
logger.info("Shutting down AI SaaS MVP")
await cleanup_resources()
app = FastAPI(
title="AI SaaS MVP",
description="Multi-tenant AI SaaS platform for Nigerian market",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from api.routes import router as api_router
from billing.routes import router as billing_router
from admin.routes import router as admin_router
app.include_router(api_router, prefix="/api/v1")
app.include_router(billing_router, prefix="/api/v1/billing")
app.include_router(admin_router, prefix="/api/v1/admin")
```
**Complete API Implementation:**
```python
# api/routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
router = APIRouter(prefix="/ai", tags=["AI Services"])
class AIRequest(BaseModel):
prompt: str
model: str = "gpt-3.5-turbo"
max_tokens: Optional[int] = 1000
temperature: Optional[float] = 0.7
class AIResponse(BaseModel):
request_id: str
completion: str
usage: dict
model: str
processing_time_ms: int
@router.post("/complete", response_model=AIResponse)
async def ai_complete(
request: AIRequest,
tenant: Tenant = Depends(get_current_tenant)
):
"""Process AI completion request with full tracking."""
start_time = datetime.utcnow()
allowed, quota_status = quota_manager.check_quota(
tenant.id,
QuotaType.AI_TOKENS,
estimate_tokens(request.prompt, request.max_tokens)
)
if not allowed:
raise HTTPException(
status_code=429,
detail={
'error': 'quota_exceeded',
'current_usage': quota_status['current_usage'],
'limit': quota_status['hard_limit']
}
)
try:
response = await openai_client.complete(
prompt=request.prompt,
model=request.model,
max_tokens=request.max_tokens,
temperature=request.temperature
)
processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
await quota_manager.consume_quota(
tenant.id,
QuotaType.AI_TOKENS,
response.usage.total_tokens,
metadata={
'model': request.model,
'request_id': response.id
}
)
await metrics_collector.record_request(
tenant.id,
tenant.plan,
'/ai/complete',
200,
processing_time / 1000
)
return AIResponse(
request_id=response.id,
completion=response.choices[0].text,
usage={
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens
},
model=request.model,
processing_time_ms=int(processing_time)
)
except Exception as e:
logger.error(f"AI completion failed: {str(e)}")
await metrics_collector.record_request(
tenant.id,
tenant.plan,
'/ai/complete',
500,
(datetime.utcnow() - start_time).total_seconds() * 1000
)
raise HTTPException(status_code=500, detail="AI processing failed")
```
**Complete Billing Implementation:**
```python
# billing/routes.py
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional
router = APIRouter(prefix="/subscriptions", tags=["Billing"])
class SubscriptionCreate(BaseModel):
plan: str
payment_method: str
billing_email: str
class SubscriptionResponse(BaseModel):
subscription_id: str
plan: str
status: str
next_billing_date: datetime
amount_ngn: float
@router.post("/", response_model=SubscriptionResponse)
async def create_subscription(
request: SubscriptionCreate,
tenant: Tenant = Depends(get_current_tenant)
):
"""Create new subscription with payment processing."""
plan = get_plan_config(request.plan)
payment_processor = get_payment_processor(request.payment_method)
payment = await payment_processor.initiate_payment(
tenant_id=tenant.id,
amount_ngn=plan['price_ngn'],
payment_method=request.payment_method,
description=f"Subscription to {request.plan} plan"
)
subscription = Subscription(
tenant_id=tenant.id,
plan=request.plan,
status='pending_payment',
payment_reference=payment['reference'],
billing_email=request.billing_email,
created_at=datetime.utcnow()
)
db.add(subscription)
db.commit()
return SubscriptionResponse(
subscription_id=subscription.id,
plan=request.plan,
status='pending_payment',
next_billing_date=subscription.next_billing_date,
amount_ngn=plan['price_ngn']
)
@router.get("/{subscription_id}/invoices")
async def list_invoices(
subscription_id: str,
tenant: Tenant = Depends(get_current_tenant)
):
"""List all invoices for a subscription."""
invoices = db.query(Invoice).filter(
Invoice.tenant_id == tenant.id,
Invoice.subscription_id == subscription_id
).order_by(Invoice.created_at.desc()).all()
return [
{
'id': inv.id,
'invoice_number': inv.invoice_number,
'amount_ngn': inv.total,
'status': inv.status.value,
'created_at': inv.created_at,
'due_date': inv.due_date,
'payment_url': inv.payment_url
}
for inv in invoices
]
```
**Production Deployment Configuration:**
```python
# docker-compose.yml for production
version: '3.8'
services:
api:
build:
context: .
dockerfile: Dockerfile
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/saas
- REDIS_URL=redis://redis:6379
- PAYSTACK_SECRET_KEY=${PAYSTACK_SECRET}
- FLUTTERWAVE_SECRET=${FLUTTERWAVE_SECRET}
- OPENAI_API_KEY=${OPENAI_KEY}
- ENVIRONMENT=production
depends_on:
- db
- redis
deploy:
replicas: 3
resources:
limits:
cpus: '1'
memory: 2G
reservations:
cpus: '0.5'
memory: 1G
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
db:
image: postgres:15
environment:
- POSTGRES_DB=saas
- POSTGRES_USER=user
- POSTGRES_PASSWORD=${DB_PASSWORD}
volumes:
- pgdata:/var/lib/postgresql/data
- ./migrations:/docker-entrypoint-initdb.d
deploy:
resources:
limits:
cpus: '2'
memory: 4G
redis:
image: redis:7-alpine
deploy:
resources:
limits:
cpus: '0.5'
memory: 1G
volumes:
pgdata:
```
**Monitoring Stack:**
```python
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
rule_files:
- "/etc/prometheus/rules/*.yml"
scrape_configs:
- job_name: 'ai-saas-api'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_name]
action: keep
regex: api-.*
metrics_path: /metrics
- job_name: 'ai-saas-services'
static_configs:
- targets:
- billing-service:9090
- ai-service:9090
- notification-service:9090
# alerting rules
groups:
- name: saas_alerts
rules:
- alert: HighErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
- alert: QuotaExceeded
expr: quota_remaining < 0
for: 1m
labels:
severity: warning
annotations:
summary: "Tenant quota exceeded"
```
**MVP Testing:**
```python
# tests/test_mvp_complete.py
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client():
from main import app
return TestClient(app)
@pytest.fixture
def authenticated_tenant():
return create_test_tenant(plan='starter')
def test_full_user_flow(client, authenticated_tenant):
"""Test complete user flow from signup to AI usage."""
response = client.post(
"/api/v1/auth/token",
data={"username": authenticated_tenant.email, "password": "test123"}
)
assert response.status_code == 200
token = response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
response = client.post(
"/api/v1/ai/complete",
headers=headers,
json={"prompt": "Hello, world!", "model": "gpt-3.5-turbo"}
)
assert response.status_code == 200
assert "completion" in response.json()
response = client.get(
"/api/v1/subscriptions/current",
headers=headers
)
assert response.status_code == 200
subscription = response.json()
assert subscription["plan"] == "starter"
def test_quota_enforcement(client, authenticated_tenant):
"""Test quota limiting works correctly."""
tenant = authenticated_tenant
tenant.plan = "free"
db.commit()
set_quota(tenant.id, QuotaType.AI_TOKENS, 100)
headers = {"Authorization": f"Bearer {get_token(tenant)}"}
for i in range(5):
response = client.post(
"/api/v1/ai/complete",
headers=headers,
json={"prompt": "Test prompt", "max_tokens": 100}
)
response = client.post(
"/api/v1/ai/complete",
headers=headers,
json={"prompt": "Test", "max_tokens": 50}
)
assert response.status_code == 429
assert "quota_exceeded" in response.json()["detail"]["error"]
def test_nigerian_payment_flow(client, authenticated_tenant):
"""Test payment processing with Nigerian methods."""
headers = {"Authorization": f"Bearer {get_token(authenticated_tenant)}"}
response = client.post(
"/api/v1/billing/subscriptions",
headers=headers,
json={
"plan": "professional",
"payment_method": "paystack_transfer",
"billing_email": authenticated_tenant.billing_email
}
)
assert response.status_code == 200
subscription = response.json()
assert subscription["status"] == "pending_payment"
assert subscription["amount_ngn"] == 45000
```