RUNLOCALAI · v38
© 2026 runlocalai.co · Independently operated
COURSE · OPS · A016

AI-Powered SaaS Products

Learn AI-powered SaaS products through RunLocalAI's practical lens: SaaS architecture, multi-tenancy, billing and Paystack, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.

24 chapters·18h·Operator track·By Fredoline Eruo
PREREQUISITES
  • A009

Why this course matters

AI-Powered SaaS Products is for operators making local AI reliable, measurable and cheaper to run. It connects SaaS design, multi-tenancy, billing, Paystack and Naira pricing to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, what setting changes the result, and how do you verify the answer instead of trusting a demo?

What you will be able to do

By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.

How to use this course

Start at chapter one if the topic is new. If you already have a working stack, scan for chapters such as SaaS Architecture, Multi-Tenant Design, Tenant Isolation and User Management and use those lessons as a quality-control pass before changing a workstation, team workflow or production-like local deployment.

CHAPTERS
  1. 01 · SaaS Architecture · 15 min

SaaS architecture separates concerns into frontend clients, API services, and data stores, with authentication and billing as core cross-cutting concerns.

A software-as-a-service product serving multiple customers requires a deliberate architectural approach. Unlike single-tenant applications where resources can be statically allocated, AI-powered SaaS must handle variable demand, usage-based billing, and strict data isolation across tenants.

The foundational architecture consists of three layers. The presentation layer handles user interfaces: dashboards, API key management UIs, and usage visualization. It communicates with the service layer via REST or GraphQL APIs. The service layer contains the core business logic: AI model routing, token counting, rate limiting, and subscription management. The data layer stores everything from tenant records to individual API call logs.

```python
# Typical SaaS request flow
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()


def extract_subdomain(host: str | None) -> str | None:
    # Illustrative helper (not a FastAPI API): "acme.example.com" -> "acme"
    parts = (host or "").split(".")
    return parts[0] if len(parts) > 2 else None


@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
    # Extract tenant from the header, falling back to the subdomain
    tenant_id = request.headers.get("X-Tenant-ID")
    if not tenant_id:
        tenant_id = extract_subdomain(request.url.hostname)
    if not tenant_id:
        # Fail closed rather than continue without a tenant
        return JSONResponse({"detail": "Unknown tenant"}, status_code=400)
    request.state.tenant_id = tenant_id
    response = await call_next(request)
    response.headers["X-Tenant-ID"] = tenant_id
    return response
```

One common failure: hardcoding tenant identification logic. When tenant resolution is scattered across multiple files, debugging billing disputes becomes painful. Centralize tenant context in middleware and inject it into route handlers via dependency injection.

The API gateway pattern works well here. Instead of each microservice handling authentication, a gateway layer validates API keys, identifies the tenant, and attaches tenant context to downstream requests. This reduces repeated auth logic and provides a single point for rate limiting.

```python
# Gateway context propagation
from contextvars import ContextVar

from fastapi import Depends, Header, HTTPException

tenant_context: ContextVar[str] = ContextVar("tenant_id")


def get_current_tenant() -> str:
    # Raises LookupError if no tenant was set for this request
    return tenant_context.get()


def get_tenant_from_header(x_tenant_id: str | None = Header(default=None)) -> str:
    # Illustrative dependency: reads the X-Tenant-ID header set by the gateway
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing X-Tenant-ID")
    return x_tenant_id


@app.get("/api/v1/models")
async def list_models(tenant_id: str = Depends(get_tenant_from_header)):
    tenant_context.set(tenant_id)
    # All downstream calls inherit tenant context
    return model_service.list_available()  # model_service is defined elsewhere
```

State management matters. Context variables prevent tenant leakage in async code but require discipline. Functions that don't receive `tenant_id` as a parameter should explicitly fetch it from context. Missing this step causes cross-tenant data access bugs that are difficult to reproduce in testing.
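The claim in chapter 01 that context variables prevent tenant leakage in async code can be checked without a web framework. The sketch below is a minimal illustration, not part of the course code: `handle_request` is a hypothetical stand-in for a route handler, and the delays only force the two tasks to interleave.

```python
import asyncio
from contextvars import ContextVar

tenant_context: ContextVar[str] = ContextVar("tenant_id")


async def handle_request(tenant_id: str, delay: float) -> tuple[str, str]:
    # Hypothetical handler: set the tenant, yield to other tasks, read it back.
    tenant_context.set(tenant_id)
    await asyncio.sleep(delay)
    return tenant_id, tenant_context.get()


async def main() -> list[tuple[str, str]]:
    # gather() wraps each coroutine in its own task, and each task runs in a
    # copy of the current context, so the two set() calls do not collide.
    return await asyncio.gather(
        handle_request("tenant-a", 0.02),
        handle_request("tenant-b", 0.01),
    )


results = asyncio.run(main())
```

Each tuple in `results` pairs the tenant a task set with the tenant it later read back; the two should always match, even though the tasks overlap in time.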
  2. 02 · Multi-Tenant Design · 15 min

Multi-tenant data models trade off isolation, performance, and operational complexity: choose row-level isolation for flexibility, schema separation for compliance-heavy use cases.

Multi-tenancy means multiple customers share infrastructure while each sees what appears to be a dedicated service. The degree of sharing determines the complexity. Three common patterns exist: a shared database with a tenant discriminator column, separate schemas per tenant, and completely separate databases.

Row-level tenancy uses a `tenant_id` column on every table. This approach maximizes resource utilization: one database instance can serve thousands of customers. Query performance depends on proper indexing. Without composite indexes on `(tenant_id, target_column)`, queries that filter on both columns can degrade into scans over every row a tenant owns, or over the whole table.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Index, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class UsageLog(Base):
    __tablename__ = "usage_logs"

    id = Column(Integer, primary_key=True)
    tenant_id = Column(String(36), nullable=False, index=True)
    model_name = Column(String(100), nullable=False)
    tokens_used = Column(Integer, nullable=False)
    cost_kobo = Column(Integer, nullable=False)  # Kobo for Naira precision
    created_at = Column(DateTime, default=datetime.utcnow)

    __table_args__ = (
        Index("ix_usage_tenant_created", "tenant_id", "created_at"),
    )
```

Schema-per-tenant provides stronger isolation. Each tenant gets its own schema (PostgreSQL) or database (MySQL). The application connects to the correct tenant schema at runtime. This pattern simplifies compliance with data residency requirements and makes tenant deletion straightforward: drop the schema. However, schema migrations become complex, because each upgrade must run across all tenant schemas.

```python
# Schema-per-tenant connection
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine

BASE_URL = "postgresql://user:password@localhost:5432"  # placeholder
TENANT_SCHEMA_PREFIX = "tenant_"


def get_tenant_engine(tenant_id: str) -> Engine:
    # In production, connection pooling per tenant becomes expensive.
    # Note: this URL form selects a per-tenant *database*; for PostgreSQL
    # schemas you would keep one database and set search_path instead.
    connection_string = f"{BASE_URL}/{TENANT_SCHEMA_PREFIX}{tenant_id}"
    return create_engine(connection_string)


# Migrations across all tenant schemas
def migrate_all_tenants(engine: Engine):
    with engine.connect() as conn:
        result = conn.execute(text("SELECT tenant_id FROM tenants"))
        tenant_ids = [row[0] for row in result]

    for tenant_id in tenant_ids:
        tenant_engine = get_tenant_engine(tenant_id)
        run_migrations(tenant_engine)  # wrapper around your migration tool
```

The hybrid approach works for many AI SaaS products: row-level isolation for development and small customers, schema separation for enterprise tiers. This adds routing complexity but handles the full customer spectrum.
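The indexing advice in chapter 02 can be verified locally before it reaches production. The sketch below uses an in-memory SQLite database as a stand-in (an assumption; the chapter targets PostgreSQL or MySQL, whose `EXPLAIN` output looks different) with a cut-down `usage_logs` table and made-up rows, then asks the planner whether the composite index is used.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE usage_logs (
    id INTEGER PRIMARY KEY,
    tenant_id TEXT NOT NULL,
    tokens_used INTEGER NOT NULL,
    created_at TEXT NOT NULL
);
CREATE INDEX ix_usage_tenant_created ON usage_logs (tenant_id, created_at);
""")
conn.executemany(
    "INSERT INTO usage_logs (tenant_id, tokens_used, created_at) VALUES (?, ?, ?)",
    [
        ("tenant-a", 120, "2026-01-05"),
        ("tenant-a", 80, "2026-02-01"),
        ("tenant-b", 999, "2026-01-10"),
    ],
)

QUERY = (
    "SELECT SUM(tokens_used) FROM usage_logs "
    "WHERE tenant_id = ? AND created_at >= ?"
)
params = ("tenant-a", "2026-01-01")

# Tenant-scoped total: rows belonging to tenant-b must not be counted.
total = conn.execute(QUERY, params).fetchone()[0]

# The fourth column of each EXPLAIN QUERY PLAN row is the readable detail.
plan = " ".join(row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + QUERY, params))
```

If `plan` does not mention `ix_usage_tenant_created`, the query is not using the composite index and deserves a closer look.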
  3. 03 · Tenant Isolation · 15 min

Tenant isolation goes beyond database separation: cache keys, file storage, external API calls, and logs must all respect tenant boundaries.

Data isolation failures typically occur at system boundaries. The database query is properly filtered, but the cache returns another tenant's data, or the S3 bucket contains mixed tenant files. A defense-in-depth approach treats every data access point as a potential breach vector.

Cache isolation prevents tenant data leakage in Redis or Memcached. Every cache key must include the tenant identifier.

```python
import inspect
import json
from functools import wraps

import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)


def tenant_cache(key_template: str, ttl: int = 300):
    """Decorator that prefixes cache keys with tenant ID."""
    def decorator(func):
        sig = inspect.signature(func)

        @wraps(func)
        def wrapper(tenant_id: str, *args, **kwargs):
            # Bind arguments by name so a template such as "{model_id}" works
            # whether the caller passes values positionally or by keyword.
            bound = sig.bind(tenant_id, *args, **kwargs)
            bound.apply_defaults()
            suffix = key_template.format(**bound.arguments)
            cache_key = f"tenant:{tenant_id}:{suffix}"

            cached = redis_client.get(cache_key)
            if cached is not None:
                return json.loads(cached)

            result = func(tenant_id, *args, **kwargs)
            redis_client.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator


@tenant_cache("models:{model_id}", ttl=600)
def get_model_config(tenant_id: str, model_id: str) -> dict | None:
    row = db.query(ModelConfig).filter_by(
        tenant_id=tenant_id, id=model_id
    ).first()
    # ORM objects are not JSON-serializable; to_dict() is an assumed helper
    return row.to_dict() if row else None
```

File storage isolation matters when tenants upload files or when the system generates outputs. Object stores like S3 or MinIO require careful bucket and key design.

```python
from typing import BinaryIO

import boto3


class TenantFileStore:
    def __init__(self, bucket_name: str):
        self.s3 = boto3.client("s3")
        self.bucket = bucket_name

    def _validate_path(self, tenant_id: str, key: str) -> str:
        """Prevent path traversal and return the tenant-prefixed object key."""
        if not key or ".." in key or key.startswith("/"):
            raise ValueError("Invalid path")
        # Enforce tenant prefix: callers pass a relative key only, and the
        # prefix is added here so it cannot be skipped or doubled.
        return f"tenants/{tenant_id}/{key}"

    def upload(self, tenant_id: str, key: str, file_obj: BinaryIO):
        s3_key = self._validate_path(tenant_id, key)
        self.s3.upload_fileobj(file_obj, self.bucket, s3_key)
```

Failure mode: omitting validation in file operations allows one tenant to overwrite another tenant's files or access sensitive data. The `_validate_path` function prevents directory traversal, but a missing check anywhere in the upload/download chain still creates a vulnerability.
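Path validation from chapter 03 is easy to unit-test in isolation, without an object store. The following is a minimal sketch under stated assumptions: `tenant_object_key` is a hypothetical helper name, tenant IDs are assumed to be validated elsewhere (for example as UUIDs), and keys use `/` as the separator.

```python
import posixpath


def tenant_object_key(tenant_id: str, key: str) -> str:
    """Build the object key for a tenant, rejecting paths that escape its prefix."""
    if not key or key.startswith("/") or "\\" in key:
        raise ValueError("Invalid path")
    prefix = f"tenants/{tenant_id}/"
    # normpath collapses "." and ".." segments, so an escape attempt shows up
    # as a normalized path that no longer starts with the tenant prefix.
    full = posixpath.normpath(prefix + key)
    if not full.startswith(prefix):
        raise ValueError("Path escapes tenant prefix")
    return full
```

Normalizing first and checking the prefix afterwards means the same function can guard uploads, downloads and deletes, which is the "check everywhere in the chain" requirement the chapter describes.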
  4. 04 · User Management · 15 min

User management in SaaS requires a three-tier hierarchy of organizations, workspaces, and individual users, with role assignments at each level.

AI SaaS products serve different customer types. A freelance developer and a corporation with 500 employees both need accounts. The data model must accommodate both patterns without forcing a rigid structure.

The recommended model has three entities: `Organization` (the paying customer), `Workspace` (logical grouping for projects or teams), and `User` (individual accounts that may belong to multiple organizations).

```python
import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, String
from sqlalchemy.orm import relationship

# Base is the declarative base shared by all models


class Organization(Base):
    __tablename__ = "organizations"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    name = Column(String(255), nullable=False)
    subscription_tier = Column(String(50), nullable=False, default="free")
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    workspaces = relationship("Workspace", back_populates="organization")
    memberships = relationship("OrganizationMembership", back_populates="organization")


class Workspace(Base):
    __tablename__ = "workspaces"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    organization_id = Column(String(36), ForeignKey("organizations.id"), nullable=False)
    name = Column(String(255), nullable=False)

    organization = relationship("Organization", back_populates="workspaces")
    api_keys = relationship("ApiKey", back_populates="workspace")


class User(Base):
    __tablename__ = "users"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    email = Column(String(255), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    memberships = relationship("OrganizationMembership", back_populates="user")


class OrganizationMembership(Base):
    __tablename__ = "organization_memberships"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    user_id = Column(String(36), ForeignKey("users.id"), nullable=False)
    organization_id = Column(String(36), ForeignKey("organizations.id"), nullable=False)
    role = Column(String(50), nullable=False)  # 'owner', 'admin', 'member', 'viewer'

    user = relationship("User", back_populates="memberships")
    organization = relationship("Organization", back_populates="memberships")
```

Role hierarchies need careful definition. Organization-level roles control billing, user management, and subscription changes. Workspace-level roles control API key creation, usage monitoring, and model access. A user might be an organization owner but a workspace viewer. The model above stores only organization-level roles; workspace-level roles need a similar membership table keyed by user and workspace.
  5. 05 · Authentication · 20 min

SaaS authentication requires balancing security with usability: email/password for humans, API keys for machines, and careful handling of token lifecycle across organization membership changes.

Authentication in AI SaaS serves two distinct clients: humans accessing dashboards and programmatic clients using APIs. Each has different security requirements and threat models.

Email/password authentication for humans should use established patterns: bcrypt or Argon2 for password hashing, secure session management, and protection against timing attacks.

```python
import secrets

from itsdangerous import BadData, URLSafeTimedSerializer
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class AuthService:
    def __init__(self, secret_key: str):
        self.serializer = URLSafeTimedSerializer(secret_key)

    def hash_password(self, password: str) -> str:
        return pwd_context.hash(password)

    def verify_password(self, plain_password: str, hashed: str) -> bool:
        return pwd_context.verify(plain_password, hashed)

    def create_session_token(self, user_id: str, tenant_id: str) -> str:
        """Create a signed session token with embedded claims."""
        payload = {
            "sub": user_id,
            "tenant_id": tenant_id,
            "jti": secrets.token_urlsafe(16),  # Token ID for revocation
            "type": "session",
        }
        return self.serializer.dumps(payload)

    def validate_session(self, token: str, max_age: int = 86400) -> dict:
        """Validate and decode session token."""
        try:
            payload = self.serializer.loads(token, max_age=max_age)
        except BadData as e:
            # Covers bad signatures, expired tokens, and malformed payloads
            raise ValueError(f"Token validation failed: {e}") from e
        if payload.get("type") != "session":
            raise ValueError("Invalid token type")
        return payload
```

API key authentication for machine clients needs special handling. API keys must be stored as hashed values with the plaintext returned only once during creation.

```python
import hashlib
import hmac
import secrets
from datetime import datetime

from sqlalchemy.orm import Session


class ApiKeyService:
    def __init__(self, db: Session):
        self.db = db

    def generate_api_key(self, workspace_id: str, name: str) -> tuple[str, str]:
        """Generate a new API key. Returns (full_key, secret_hash).

        The plaintext key is returned ONLY here; store the hash.
        """
        key_id = f"sk_{secrets.token_urlsafe(8)}"
        secret = secrets.token_urlsafe(32)
        secret_hash = hashlib.sha256(secret.encode()).hexdigest()

        api_key = ApiKey(
            id=key_id,
            workspace_id=workspace_id,
            name=name,
            key_hash=secret_hash,
            created_at=datetime.utcnow(),
        )
        self.db.add(api_key)
        self.db.commit()

        # Return the full key for the user to copy. "." never appears in
        # token_urlsafe output, so it is a safe separator; "_" is not, because
        # both the "sk_" prefix and the random parts can contain it.
        return f"{key_id}.{secret}", secret_hash

    def validate_api_key(self, full_key: str) -> ApiKey:
        """Validate an API key and return the associated record."""
        key_id, sep, secret = full_key.partition(".")
        if not sep:
            raise ValueError("Invalid API key format")
        secret_hash = hashlib.sha256(secret.encode()).hexdigest()

        api_key = self.db.query(ApiKey).filter_by(id=key_id, is_active=True).first()
        # Constant-time comparison of the stored and presented hashes
        if not api_key or not hmac.compare_digest(api_key.key_hash, secret_hash):
            raise ValueError("Invalid or inactive API key")
        return api_key
```

A common failure: not invalidating sessions when a user's organization membership changes. If an employee is removed from an organization, their session tokens must be revoked. Track revoked token IDs in a denylist or use short-lived tokens with refresh mechanisms.
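The "store only the hash, show the plaintext once" rule from chapter 05 can be exercised end to end with the standard library. This is a minimal sketch, not the course's service: `KEY_STORE` is a hypothetical in-memory stand-in for the `api_keys` table, and the `key_id.secret` layout with a `.` separator is an assumption chosen because `secrets.token_urlsafe` never emits that character.

```python
import hashlib
import hmac
import secrets

# Hypothetical in-memory stand-in for the api_keys table: key_id -> SHA-256 hex
KEY_STORE: dict[str, str] = {}


def issue_key() -> str:
    """Create a key, persist only its hash, and return the plaintext once."""
    key_id = f"sk_{secrets.token_urlsafe(8)}"
    secret = secrets.token_urlsafe(32)
    KEY_STORE[key_id] = hashlib.sha256(secret.encode()).hexdigest()
    return f"{key_id}.{secret}"


def verify_key(full_key: str) -> bool:
    """Return True only if the key ID exists and the secret hashes to the stored value."""
    key_id, sep, secret = full_key.partition(".")
    stored = KEY_STORE.get(key_id)
    if not sep or stored is None:
        return False
    candidate = hashlib.sha256(secret.encode()).hexdigest()
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(candidate, stored)
```

A plain SHA-256 is reasonable here because the secret is 32 random bytes; user-chosen passwords still need a slow hash such as bcrypt or Argon2, as the chapter says.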
  6. 06 · Authorization · 15 min

Authorization in multi-tenant SaaS requires checking tenant membership AND permission scope at every data access point; a missing check at either level creates a security vulnerability.

Authorization answers "what can this user do?" Authentication answers "who is this user?" Both are necessary, neither is sufficient. A common mistake is authenticating users but skipping authorization checks, allowing users to access data outside their permissions.

Role-Based Access Control (RBAC) defines permissions by role. Permissions are granular actions: `billing:read`, `api_keys:create`, `users:invite`. Roles are collections of permissions.

```python
from enum import Enum
from functools import wraps


class Permission(Enum):
    BILLING_READ = "billing:read"
    BILLING_MANAGE = "billing:manage"
    API_KEYS_CREATE = "api_keys:create"
    API_KEYS_READ = "api_keys:read"
    API_KEYS_DELETE = "api_keys:delete"
    USERS_INVITE = "users:invite"
    USERS_MANAGE = "users:manage"
    MODELS_READ = "models:read"


ROLE_PERMISSIONS = {
    "owner": list(Permission),  # All permissions
    "admin": [
        Permission.BILLING_READ,
        Permission.API_KEYS_CREATE,
        Permission.API_KEYS_READ,
        Permission.API_KEYS_DELETE,
        Permission.USERS_INVITE,
        Permission.USERS_MANAGE,
        Permission.MODELS_READ,
    ],
    "member": [
        Permission.API_KEYS_CREATE,
        Permission.API_KEYS_READ,
        Permission.MODELS_READ,
    ],
    "viewer": [
        Permission.API_KEYS_READ,
        Permission.MODELS_READ,
    ],
}


def require_permission(permission: Permission):
    """Decorator that checks permission before allowing access."""
    def decorator(func):
        @wraps(func)
        async def wrapper(tenant_id: str, user_id: str, *args, **kwargs):
            # First: verify user belongs to this tenant
            # (db is the request's SQLAlchemy session, provided elsewhere)
            membership = db.query(OrganizationMembership).filter_by(
                user_id=user_id, organization_id=tenant_id
            ).first()
            if not membership:
                raise PermissionError("User does not belong to this organization")

            # Second: check permission
            user_permissions = ROLE_PERMISSIONS.get(membership.role, [])
            if permission not in user_permissions:
                raise PermissionError(f"Missing permission: {permission.value}")

            return await func(tenant_id, user_id, *args, **kwargs)
        return wrapper
    return decorator
```

Using the decorator:

```python
@require_permission(Permission.API_KEYS_CREATE)
async def create_api_key(tenant_id: str, user_id: str, workspace_id: str, name: str):
    # Implementation only runs if authorization passes.
    # Keys belong to a workspace, so also confirm that workspace_id is owned
    # by tenant_id before creating the key.
    return api_key_service.generate_api_key(workspace_id, name)
```

The two-step check, membership verification followed by a permission check, prevents both cross-tenant access and unauthorized actions within a tenant. Skipping the membership check allows users to manipulate resources by guessing IDs. Skipping the permission check allows users to perform admin actions.
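The two-step check from chapter 06 is worth covering with tests that hit each failure separately. The sketch below strips the idea down to plain data so it runs without a database: `MEMBERSHIPS` is a hypothetical lookup table standing in for the membership query, and only a subset of the chapter's permissions is reproduced.

```python
from enum import Enum


class Permission(Enum):
    BILLING_MANAGE = "billing:manage"
    API_KEYS_CREATE = "api_keys:create"
    API_KEYS_READ = "api_keys:read"


ROLE_PERMISSIONS = {
    "owner": frozenset(Permission),
    "member": frozenset({Permission.API_KEYS_CREATE, Permission.API_KEYS_READ}),
    "viewer": frozenset({Permission.API_KEYS_READ}),
}

# Hypothetical stand-in for the membership table: (org_id, user_id) -> role
MEMBERSHIPS = {("org-1", "alice"): "owner", ("org-1", "bob"): "viewer"}


def authorize(tenant_id: str, user_id: str, permission: Permission) -> None:
    """Raise PermissionError unless the user is a member AND holds the permission."""
    role = MEMBERSHIPS.get((tenant_id, user_id))
    if role is None:
        raise PermissionError("User does not belong to this organization")
    if permission not in ROLE_PERMISSIONS.get(role, frozenset()):
        raise PermissionError(f"Missing permission: {permission.value}")
```

Testing the cross-tenant case (`alice` against `org-2`) separately from the in-tenant case (`bob` creating keys) makes it obvious which of the two checks a regression has removed.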
  7. 07 · Usage Metering · 15 min

Usage metering tracks resource consumption at the API call level, enabling accurate billing and preventing abuse before it impacts margins.

AI API costs are measured in tokens. Every request to an AI model consumes tokens, which translates directly to costs from model providers plus your margin. Without accurate metering, billing becomes guesswork and margin erosion goes unnoticed.

The metering system records each API call with sufficient granularity to reconstruct billing reports and detect anomalies.

```python
import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import Session


class UsageRecord(Base):
    __tablename__ = "usage_records"

    id = Column(String(36), primary_key=True)
    tenant_id = Column(String(36), nullable=False, index=True)
    workspace_id = Column(String(36), nullable=False, index=True)
    api_key_id = Column(String(36), nullable=False, index=True)

    # Request details
    model_name = Column(String(100), nullable=False)
    request_tokens = Column(Integer, nullable=False, default=0)
    response_tokens = Column(Integer, nullable=False, default=0)
    total_tokens = Column(Integer, nullable=False)

    # Cost tracking (in kobo for Naira)
    cost_kobo = Column(Integer, nullable=False)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, index=True)

    # Request hash for deduplication; the unique constraint rejects a
    # duplicate even when two retries race past the lookup in record_usage
    request_hash = Column(String(64), nullable=False, unique=True)


class UsageMeter:
    def __init__(self, db: Session):
        self.db = db

    def record_usage(
        self,
        tenant_id: str,
        workspace_id: str,
        api_key_id: str,
        model_name: str,
        request_tokens: int,
        response_tokens: int,
        cost_kobo: int,
        request_hash: str,
    ) -> UsageRecord:
        """Record usage for a single API call."""
        # Check for duplicate (idempotency)
        existing = self.db.query(UsageRecord).filter_by(
            request_hash=request_hash
        ).first()
        if existing:
            return existing

        record = UsageRecord(
            id=str(uuid.uuid4()),
            tenant_id=tenant_id,
            workspace_id=workspace_id,
            api_key_id=api_key_id,
            model_name=model_name,
            request_tokens=request_tokens,
            response_tokens=response_tokens,
            total_tokens=request_tokens + response_tokens,
            cost_kobo=cost_kobo,
            request_hash=request_hash,
        )
        self.db.add(record)
        self.db.commit()
        return record
```

Deduplication matters. Network failures cause clients to retry requests. Without deduplication via request hashing, retry attempts get billed multiple times. The `request_hash` should be a hash of the unique request identifier sent by the client. For clients that do not send one, fall back to a hash of (timestamp + model + truncated prompt), where the timestamp is the one the client attached to the original request; a server-side receive time differs on every retry and defeats deduplication.

```python
import hashlib
import json
from datetime import datetime


def generate_request_hash(
    api_key_id: str,
    model_name: str,
    prompt: str,
    timestamp: datetime,
) -> str:
    """Generate deterministic hash for deduplication."""
    # Use truncated prompt to limit hash input size
    payload = {
        "key": api_key_id,
        "model": model_name,
        "prompt": prompt[:500],  # First 500 chars
        "ts": timestamp.isoformat(),
    }
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
```
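The deduplication rule in chapter 07 can be pushed down into the database so that retries are ignored even when two of them arrive at the same moment. The sketch below is an illustration under assumptions: it uses in-memory SQLite and its `INSERT OR IGNORE` syntax (PostgreSQL would use `ON CONFLICT DO NOTHING`), a cut-down table, and a hypothetical `record_usage` function.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE usage_records (
        request_hash TEXT PRIMARY KEY,   -- uniqueness enforces idempotency
        tenant_id    TEXT NOT NULL,
        cost_kobo    INTEGER NOT NULL
    )
    """
)


def record_usage(request_hash: str, tenant_id: str, cost_kobo: int) -> bool:
    """Insert a usage row once; return False when the hash was already billed."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO usage_records VALUES (?, ?, ?)",
        (request_hash, tenant_id, cost_kobo),
    )
    conn.commit()
    return cur.rowcount == 1


first = record_usage("req-123", "tenant-a", 735)
retry = record_usage("req-123", "tenant-a", 735)  # client retried after a timeout
billed = conn.execute("SELECT SUM(cost_kobo) FROM usage_records").fetchone()[0]
```

Letting the constraint decide, instead of a read followed by a write, closes the window in which two concurrent retries both see "no existing record".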
  8. 08 · Token Tracking · 15 min

Token tracking extends metering to include model-specific costs, enabling tiered pricing and accurate margin calculation across different AI providers.

Different AI models have different costs. GPT-4 tokens cost more than GPT-3.5 tokens, and Claude tokens are priced differently from OpenAI tokens. Token tracking captures this granularity so pricing can be model-aware.

Token counting happens at two points: input tokens (from the prompt) and output tokens (from the response). Most AI providers return token counts in the response metadata.

```python
from dataclasses import dataclass
from enum import Enum

from sqlalchemy.orm import Session


class ModelTier(Enum):
    STANDARD = "standard"
    PREMIUM = "premium"
    ENTERPRISE = "enterprise"


@dataclass
class ModelPricing:
    name: str
    provider: str
    input_cost_per_1k_kobo: int   # Kobo cost per 1,000 input tokens
    output_cost_per_1k_kobo: int  # Kobo cost per 1,000 output tokens
    tier: ModelTier


# Current pricing (simplified; verify against actual provider pricing)
MODEL_CATALOG = {
    "gpt-4o": ModelPricing(
        name="gpt-4o",
        provider="openai",
        input_cost_per_1k_kobo=350,    # ₦3.50 per 1K input
        output_cost_per_1k_kobo=1050,  # ₦10.50 per 1K output
        tier=ModelTier.PREMIUM,
    ),
    "gpt-4o-mini": ModelPricing(
        name="gpt-4o-mini",
        provider="openai",
        input_cost_per_1k_kobo=22,   # ₦0.22 per 1K input
        output_cost_per_1k_kobo=88,  # ₦0.88 per 1K output
        tier=ModelTier.STANDARD,
    ),
    "gpt-3.5-turbo": ModelPricing(
        name="gpt-3.5-turbo",
        provider="openai",
        input_cost_per_1k_kobo=11,   # ₦0.11 per 1K input
        output_cost_per_1k_kobo=33,  # ₦0.33 per 1K output
        tier=ModelTier.STANDARD,
    ),
}


class TokenTracker:
    def __init__(self, meter: "UsageMeter | None" = None):
        # UsageMeter from the metering chapter; only track_request needs it
        self.meter = meter

    def calculate_cost(
        self,
        model_name: str,
        input_tokens: int,
        output_tokens: int,
    ) -> int:
        """Calculate cost in kobo for a single request."""
        pricing = MODEL_CATALOG.get(model_name)
        if not pricing:
            raise ValueError(f"Unknown model: {model_name}")

        # Integer arithmetic avoids float error: tokens x (kobo per 1K) is
        # the cost in thousandths of a kobo
        millikobo = (
            input_tokens * pricing.input_cost_per_1k_kobo
            + output_tokens * pricing.output_cost_per_1k_kobo
        )
        # Round half up to the nearest kobo
        return (millikobo + 500) // 1000

    def track_request(
        self,
        db: Session,
        api_key: ApiKey,
        model_name: str,
        input_tokens: int,
        output_tokens: int,
        response_id: str,
    ) -> UsageRecord:
        """Track a completed request and record costs."""
        cost_kobo = self.calculate_cost(model_name, input_tokens, output_tokens)

        # Update running totals
        api_key.total_usage_kobo += cost_kobo
        api_key.total_tokens += input_tokens + output_tokens

        return self.meter.record_usage(
            tenant_id=api_key.workspace.organization_id,
            workspace_id=api_key.workspace_id,
            api_key_id=api_key.id,
            model_name=model_name,
            request_tokens=input_tokens,
            response_tokens=output_tokens,
            cost_kobo=cost_kobo,
            request_hash=response_id,  # Use provider response ID
        )
```

A critical failure mode: provider changes that affect token counting or pricing. If OpenAI changes its tokenizer or its prices, hardcoded values become stale. Build a pricing configuration table that can be updated without a code deployment.
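A worked example makes the arithmetic in chapter 08 concrete and exposes one margin risk: rounding each request to a whole kobo discards the cost of very small requests. The sketch below uses the chapter's illustrative rates; tracking cost in thousandths of a kobo (called millikobo here) and rounding once per invoice is an assumption, not something the course prescribes.

```python
# (input, output) kobo per 1,000 tokens, copied from the chapter's sample catalog
RATES = {"gpt-4o": (350, 1050), "gpt-3.5-turbo": (11, 33)}


def cost_millikobo(model: str, input_tokens: int, output_tokens: int) -> int:
    """Exact cost in thousandths of a kobo: tokens x (kobo per 1K tokens)."""
    in_rate, out_rate = RATES[model]
    return input_tokens * in_rate + output_tokens * out_rate


def to_kobo(millikobo: int) -> int:
    """Round half up to whole kobo."""
    return (millikobo + 500) // 1000


# One larger request: 1,200 input and 300 output tokens on gpt-4o
large = to_kobo(cost_millikobo("gpt-4o", 1200, 300))

# 1,000 tiny requests: 10 input and 10 output tokens each on gpt-3.5-turbo
tiny = [cost_millikobo("gpt-3.5-turbo", 10, 10) for _ in range(1000)]
rounded_per_request = sum(to_kobo(c) for c in tiny)
rounded_per_invoice = to_kobo(sum(tiny))
```

The large request comes to 735 kobo (₦7.35). The tiny requests each cost 0.44 kobo, so rounding per request bills nothing while rounding once per invoice bills 440 kobo.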
  9. 09 · API Key Management · 15 min

API keys are the primary authentication mechanism for programmatic access: treat them like passwords, implement rotation, and provide fine-grained access controls.

API key management in multi-tenant SaaS requires balancing security with developer experience. Keys should be easy to create and rotate but difficult to compromise and easy to revoke.

Key scopes limit what an API key can access. A key for development might only access test models; a production key might access all models but have IP restrictions.

```python
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta

from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import Session, relationship


class ApiKey(Base):
    __tablename__ = "api_keys"

    id = Column(String(50), primary_key=True)
    workspace_id = Column(String(36), ForeignKey("workspaces.id"), nullable=False)
    name = Column(String(255), nullable=False)
    key_hash = Column(String(64), nullable=False)

    # Scoping
    allowed_models = Column(JSON, nullable=True)  # None = all models
    allowed_ips = Column(JSON, nullable=True)     # None = any IP
    max_requests_per_day = Column(Integer, nullable=True)

    # Status
    is_active = Column(Boolean, default=True)
    expires_at = Column(DateTime, nullable=True)
    last_used_at = Column(DateTime, nullable=True)

    # Rate tracking
    total_usage_kobo = Column(Integer, default=0)
    total_tokens = Column(Integer, default=0)

    workspace = relationship("Workspace", back_populates="api_keys")


class ApiKeyManager:
    def __init__(self, db: Session):
        self.db = db
        self.token_tracker = TokenTracker()

    def create_key(
        self,
        workspace_id: str,
        name: str,
        allowed_models: list[str] | None = None,
        allowed_ips: list[str] | None = None,
        expires_in_days: int | None = 365,
    ) -> tuple[str, ApiKey]:
        """Create a new API key. Returns (full_key, database_record).

        The full_key is shown ONLY once; after this, only the hash exists.
        """
        key_id = f"sk_{secrets.token_urlsafe(12)}"
        secret = secrets.token_urlsafe(32)
        key_hash = hashlib.sha256(secret.encode()).hexdigest()

        expires_at = None
        if expires_in_days:
            expires_at = datetime.utcnow() + timedelta(days=expires_in_days)

        api_key = ApiKey(
            id=key_id,
            workspace_id=workspace_id,
            name=name,
            key_hash=key_hash,
            allowed_models=allowed_models,
            allowed_ips=allowed_ips,
            expires_at=expires_at,
        )
        self.db.add(api_key)
        self.db.commit()

        # Return full key for user to copy. "." is used as the separator
        # because token_urlsafe output can contain "_" but never "."
        return f"{key_id}.{secret}", api_key

    def validate_key(
        self,
        full_key: str,
        client_ip: str | None = None,
    ) -> ApiKey:
        """Validate API key and check scope restrictions."""
        key_id, sep, secret = full_key.partition(".")
        if not sep:
            raise ValueError("Invalid API key format")
        secret_hash = hashlib.sha256(secret.encode()).hexdigest()

        api_key = self.db.query(ApiKey).filter_by(id=key_id).first()
        if (
            not api_key
            or not api_key.is_active
            or not hmac.compare_digest(api_key.key_hash, secret_hash)
        ):
            raise ValueError("Invalid or inactive API key")

        if api_key.expires_at and api_key.expires_at < datetime.utcnow():
            raise ValueError("API key has expired")

        if api_key.allowed_ips and client_ip not in api_key.allowed_ips:
            raise ValueError(f"IP {client_ip} is not allowed for this key")

        # Update last used timestamp
        api_key.last_used_at = datetime.utcnow()
        self.db.commit()
        return api_key
```

A failure scenario: keys created for testing in development environments end up being used in production. If `allowed_models` isn't set, any model is accessible. Consider requiring an explicit model allowlist for production workspaces.
  10. 10 · Subscription Billing · 15 min

Subscription billing requires mapping customer tiers to entitlements, tracking plan changes mid-cycle, and calculating prorated charges accurately.

Billing transforms usage data into invoices. A well-designed billing system handles recurring subscriptions, usage-based charges, plan changes, and credits.

Subscription tiers define what customers get access to. Each tier has a base price, included usage, and overage rates.

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

from sqlalchemy.orm import Session


class SubscriptionTier(Enum):
    FREE = "free"
    STARTER = "starter"
    PROFESSIONAL = "professional"
    ENTERPRISE = "enterprise"


@dataclass
class TierConfig:
    name: str
    monthly_price_kobo: int        # Monthly subscription in kobo
    included_tokens: int           # Tokens included in base price
    overage_rate_per_1k_kobo: int  # Rate for tokens beyond included
    max_api_keys: int
    allowed_models: list[str]
    support_level: str


SUBSCRIPTION_TIERS = {
    SubscriptionTier.FREE: TierConfig(
        name="Free",
        monthly_price_kobo=0,
        included_tokens=10000,        # 10K tokens free
        overage_rate_per_1k_kobo=50,  # ₦0.50 per 1K overage
        max_api_keys=2,
        allowed_models=["gpt-3.5-turbo"],
        support_level="community",
    ),
    SubscriptionTier.STARTER: TierConfig(
        name="Starter",
        monthly_price_kobo=150000,  # ₦1,500/month
        included_tokens=100000,     # 100K tokens
        overage_rate_per_1k_kobo=30,
        max_api_keys=5,
        allowed_models=["gpt-3.5-turbo", "gpt-4o-mini"],
        support_level="email",
    ),
    SubscriptionTier.PROFESSIONAL: TierConfig(
        name="Professional",
        monthly_price_kobo=500000,  # ₦5,000/month
        included_tokens=500000,
        overage_rate_per_1k_kobo=20,
        max_api_keys=20,
        allowed_models=["gpt-3.5-turbo", "gpt-4o-mini", "gpt-4o"],
        support_level="priority",
    ),
}


class BillingService:
    def __init__(self, db: Session):
        self.db = db

    def calculate_monthly_invoice(
        self,
        organization_id: str,
        period_start: datetime,
        period_end: datetime,
    ) -> dict:
        """Calculate monthly invoice for an organization."""
        org = self.db.query(Organization).filter_by(id=organization_id).first()
        if org is None:
            raise ValueError(f"Unknown organization: {organization_id}")

        # subscription_tier is stored as a string such as "starter"
        tier = SubscriptionTier(org.subscription_tier)
        tier_config = SUBSCRIPTION_TIERS.get(tier)
        if tier_config is None:
            raise ValueError(f"No pricing configured for tier: {tier.value}")

        # Get usage for the period
        usage = self.db.query(UsageRecord).filter(
            UsageRecord.tenant_id == organization_id,
            UsageRecord.created_at >= period_start,
            UsageRecord.created_at < period_end,
        ).all()

        total_tokens = sum(u.total_tokens for u in usage)
        total_cost = sum(u.cost_kobo for u in usage)

        # Calculate base subscription cost
        base_price = tier_config.monthly_price_kobo

        # Calculate overage
        overage_tokens = max(0, total_tokens - tier_config.included_tokens)
        overage_cost = round((overage_tokens / 1000) * tier_config.overage_rate_per_1k_kobo)

        return {
            "organization_id": organization_id,
            "period_start": period_start,
            "period_end": period_end,
            "tier": tier.value,
            "base_price_kobo": base_price,
            "usage": {
                "total_tokens": total_tokens,
                "included_tokens": tier_config.included_tokens,
                "overage_tokens": overage_tokens,
                "usage_cost_kobo": total_cost,
            },
            "overage_cost_kobo": overage_cost,
            "total_kobo": base_price + overage_cost,
            "line_items": [
                {"description": f"{tier_config.name} Plan", "amount_kobo": base_price},
                {"description": "Usage Overage", "amount_kobo": overage_cost},
            ],
        }
```

Proration handles mid-cycle upgrades and downgrades. When upgrading from Starter to Professional mid-month, charge the prorated difference for the remaining days.
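The proration rule that closes chapter 10 reduces to one line of arithmetic: (new price − old price) × days remaining ÷ days in the cycle. The sketch below is a minimal version under stated assumptions: it covers upgrades only, prorates by whole days, and `prorated_upgrade_charge` is a hypothetical name. Downgrades usually become a credit and need their own policy.

```python
def prorated_upgrade_charge(
    old_price_kobo: int,
    new_price_kobo: int,
    days_remaining: int,
    days_in_cycle: int,
) -> int:
    """Kobo to charge now when upgrading with `days_remaining` left in the cycle."""
    if days_in_cycle <= 0 or not 0 <= days_remaining <= days_in_cycle:
        raise ValueError("days_remaining must be between 0 and days_in_cycle")
    difference = new_price_kobo - old_price_kobo
    if difference < 0:
        raise ValueError("This sketch handles upgrades only")
    # Integer round-half-up of difference * days_remaining / days_in_cycle
    return (2 * difference * days_remaining + days_in_cycle) // (2 * days_in_cycle)


# Starter (₦1,500) to Professional (₦5,000) with 12 of 30 days left:
# 350,000 kobo difference x 12 / 30 = 140,000 kobo, i.e. ₦1,400
charge = prorated_upgrade_charge(150000, 500000, 12, 30)
```

Keeping the calculation in integer kobo avoids the float rounding surprises that tend to surface in billing disputes.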
  11. Paystack Integration

Paystack provides Nigerian payment infrastructure with Naira support, recurring billing via authorization codes, and webhook-driven payment confirmation.

Paystack integration enables Nigerian customers to pay with local payment methods: cards, bank transfers, and USSD. The integration requires handling payment initialization, webhook verification, and subscription lifecycle management.

Payment flow: initialize transaction → redirect customer to Paystack → receive webhook confirmation → update subscription status.

```python
import uuid
from datetime import datetime

import requests
from sqlalchemy.orm import Session

# Payment is the application's ORM model, defined elsewhere.


class PaystackError(Exception):
    """Raised when a Paystack API call fails."""


class PaystackService:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.base_url = "https://api.paystack.co"
        self.headers = {
            "Authorization": f"Bearer {secret_key}",
            "Content-Type": "application/json"
        }

    def initialize_transaction(
        self,
        email: str,
        amount_kobo: int,
        reference: str,
        callback_url: str,
        metadata: dict
    ) -> dict:
        """Initialize a Paystack payment."""
        payload = {
            "email": email,
            "amount": amount_kobo,
            "reference": reference,
            "callback_url": callback_url,
            "metadata": metadata
        }
        response = requests.post(
            f"{self.base_url}/transaction/initialize",
            json=payload,
            headers=self.headers,
            timeout=30
        )
        if response.status_code != 200:
            raise PaystackError(f"Initialization failed: {response.text}")

        data = response.json()
        if not data["status"]:
            raise PaystackError(f"Paystack error: {data['message']}")
        return data["data"]

    def verify_transaction(self, reference: str) -> dict:
        """Verify a transaction by reference."""
        response = requests.get(
            f"{self.base_url}/transaction/verify/{reference}",
            headers=self.headers,
            timeout=30
        )
        if response.status_code != 200:
            raise PaystackError(f"Verification failed: {response.text}")

        data = response.json()
        if not data["status"]:
            raise PaystackError(f"Verification failed: {data['message']}")
        return data["data"]

    def charge_authorization(
        self,
        authorization_code: str,
        email: str,
        amount_kobo: int,
        reference: str,
        metadata: dict
    ) -> dict:
        """Charge a previously authorized payment method (recurring)."""
        payload = {
            "authorization_code": authorization_code,
            "email": email,
            "amount": amount_kobo,
            "reference": reference,
            "metadata": metadata
        }
        response = requests.post(
            f"{self.base_url}/transaction/charge_authorization",
            json=payload,
            headers=self.headers,
            timeout=30
        )
        data = response.json()
        if data["status"]:
            return data["data"]
        return {"status": "failed", "message": data.get("message")}


class PaystackWebhookHandler:
    def __init__(self, db: Session, paystack: PaystackService):
        self.db = db
        self.paystack = paystack

    def handle_event(self, event: dict) -> None:
        """Process a Paystack webhook event.

        The caller must verify the signature against the raw request body
        before parsing it into `event`; the parsed dict cannot be used to
        recompute the HMAC.
        """
        event_type = event.get("event")
        payload = event.get("data", {})

        if event_type == "charge.success":
            self._handle_successful_charge(payload)
        elif event_type == "subscription.create":
            self._handle_subscription_created(payload)
        elif event_type == "subscription.disable":
            self._handle_subscription_disabled(payload)
        elif event_type == "subscription.not_renew":
            self._handle_subscription_expiring(payload)

    def _handle_successful_charge(self, data: dict) -> None:
        """Handle successful payment."""
        reference = data["reference"]
        amount = data["amount"]  # Already in kobo
        metadata = data.get("metadata", {})
        organization_id = metadata.get("organization_id")
        description = metadata.get("description", "Payment")

        # Skip events already recorded (webhooks can be delivered more than once)
        if self.db.query(Payment).filter_by(reference=reference).first():
            return

        # Record payment. A trailing "Z" is normalized because
        # datetime.fromisoformat() rejects it before Python 3.11.
        payment = Payment(
            id=str(uuid.uuid4()),
            organization_id=organization_id,
            amount_kobo=amount,
            reference=reference,
            status="completed",
            description=description,
            paid_at=datetime.fromisoformat(data["paid_at"].replace("Z", "+00:00"))
        )
        self.db.add(payment)
        self._update_subscription_if_needed(organization_id, amount)
        self.db.commit()

    def _update_subscription_if_needed(self, organization_id: str, amount: int) -> None:
        """Extend subscription period based on payment amount."""
        # Implementation depends on pricing structure
        pass
```

Webhook security is critical. Paystack webhooks can be replayed or spoofed. Always verify the signature and check for duplicate event processing.

```python
# Webhook signature verification
import hashlib
import hmac


class PaystackWebhookVerifier:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def verify(self, payload: bytes, headers: dict) -> bool:
        """Verify Paystack webhook signature over the raw request body."""
        signature = headers.get("x-paystack-signature")
        if not signature:
            return False

        expected = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha512
        ).hexdigest()
        return hmac.compare_digest(signature, expected)
```

20 min
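The advice to verify the signature and check for duplicates can be sketched as one small, self-contained flow. This is an illustration under stated assumptions: `WebhookProcessor` is a hypothetical name, the in-memory set stands in for a unique constraint on the payment reference in a real database, and the secret key shown is a placeholder.

```python
import hashlib
import hmac
import json


class WebhookProcessor:
    """Verify the HMAC-SHA512 signature, then process each reference once."""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode()
        # Stand-in for a unique DB column on the payment reference
        self.seen_references: set[str] = set()

    def process(self, raw_body: bytes, signature: str) -> str:
        expected = hmac.new(self.secret_key, raw_body, hashlib.sha512).hexdigest()
        if not hmac.compare_digest(signature or "", expected):
            return "rejected"  # spoofed or corrupted payload

        event = json.loads(raw_body)
        reference = event.get("data", {}).get("reference")
        if reference in self.seen_references:
            return "duplicate"  # replayed or redelivered event
        self.seen_references.add(reference)
        return "processed"


secret = "sk_test_placeholder"  # hypothetical key for the example
body = json.dumps(
    {"event": "charge.success", "data": {"reference": "ref_001"}}
).encode()
signature = hmac.new(secret.encode(), body, hashlib.sha512).hexdigest()

processor = WebhookProcessor(secret)
print(processor.process(body, signature))  # processed
print(processor.process(body, signature))  # duplicate
print(processor.process(body, "bad"))      # rejected
```

The signature is computed over the raw bytes before any JSON parsing, which is why the check comes first; in production the duplicate check would likely be enforced by the database rather than process memory.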
  12. Naira Pricing Tiers

Nigerian market pricing requires Naira-denominated tiers, awareness of payment method availability, and pricing that reflects local purchasing power.

Pricing for the Nigerian market involves different considerations than USD pricing. Naira pricing tiers should align with local payment capabilities, competitor positioning, and sustainable unit economics.

Pricing should be straightforward and avoid currency confusion. ₦1,500/month is more comprehensible than $2.00/month for most Nigerian customers.

```python
from dataclasses import dataclass


@dataclass
class NairaPricingTier:
    id: str
    name: str
    price_naira: int
    included_tokens: int
    features: list[str]
    priority_support: bool
    api_key_limit: int


NAIRA_TIERS = [
    NairaPricingTier(
        id="free",
        name="Free",
        price_naira=0,
        included_tokens=10_000,
        features=[
            "Access to GPT-3.5 Turbo",
            "Up to 2 API keys",
            "Community support",
            "Basic analytics"
        ],
        priority_support=False,
        api_key_limit=2
    ),
    NairaPricingTier(
        id="starter",
        name="Starter",
        price_naira=1_500,
        included_tokens=100_000,
        features=[
            "Access to GPT-3.5 Turbo, GPT-4o Mini",
            "Up to 5 API keys",
            "Email support",
            "Standard analytics",
            "Webhooks"
        ],
        priority_support=False,
        api_key_limit=5
    ),
    NairaPricingTier(
        id="professional",
        name="Professional",
        price_naira=5_000,
        included_tokens=500_000,
        features=[
            "Access to all models",
            "Up to 20 API keys",
            "Priority support",
            "Advanced analytics",
            "Webhooks",
            "Team seats (up to 5)"
        ],
        priority_support=True,
        api_key_limit=20
    ),
    NairaPricingTier(
        id="enterprise",
        name="Enterprise",
        price_naira=25_000,
        included_tokens=2_000_000,
        features=[
            "Everything in Professional",
            "Unlimited API keys",
            "Dedicated support",
            "Custom integrations",
            "SLA guarantee",
            "Unlimited team seats",
            "Custom models"
        ],
        priority_support=True,
        api_key_limit=-1  # Unlimited
    ),
]


def format_naira(amount_kobo: int) -> str:
    """Format an amount in kobo as a Naira string."""
    naira = amount_kobo / 100
    return f"₦{naira:,.2f}"
```

Pricing communication matters.
Display prices as "₦1,500/month", not "150,000 kobo/month". Internal calculations use kobo for integer precision, but user-facing displays use Naira.

```python
class PricingDisplay:
    def format_tier_card(self, tier: NairaPricingTier) -> dict:
        """Format tier for display in pricing page."""
        return {
            "id": tier.id,
            "name": tier.name,
            "price_display": f"₦{tier.price_naira:,}" if tier.price_naira > 0 else "Free",
            "period": "/month",
            "included_tokens_display": self._format_tokens(tier.included_tokens),
            "features": tier.features,
            "cta": "Get Started" if tier.id != "enterprise" else "Contact Sales"
        }

    def _format_tokens(self, count: int) -> str:
        """Format token count for display (truncates to whole K or M)."""
        if count >= 1_000_000:
            return f"{count // 1_000_000}M"
        elif count >= 1_000:
            return f"{count // 1_000}K"
        return str(count)
```

Overage pricing needs careful consideration. Nigerian customers may be surprised by bills that exceed their subscription. Consider flat-rate overage with a notification threshold.

15 min
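One way to read "flat-rate overage with a notification threshold" is a fixed price per block of extra tokens plus a warning once usage crosses a percentage of the allowance. The sketch below follows that reading; the function name, the 100K-token block size, the ₦500 block price, and the 80% threshold are all illustrative assumptions, not figures from the course.

```python
from dataclasses import dataclass


@dataclass
class OverageStatus:
    notify: bool              # usage has reached the warning threshold
    overage_blocks: int       # number of flat-rate blocks to bill
    overage_charge_kobo: int  # total overage charge in kobo


def evaluate_overage(
    used_tokens: int,
    included_tokens: int,
    block_size_tokens: int = 100_000,  # assumed block size
    block_price_kobo: int = 50_000,    # assumed flat price (₦500) per block
    notify_at_percent: int = 80,       # assumed warning threshold
) -> OverageStatus:
    """Bill extra usage in whole blocks and flag when to warn the customer."""
    # Integer comparison avoids float rounding at the threshold boundary
    notify = used_tokens * 100 >= included_tokens * notify_at_percent
    extra = max(0, used_tokens - included_tokens)
    blocks = -(-extra // block_size_tokens)  # ceiling division
    return OverageStatus(notify, blocks, blocks * block_price_kobo)


status = evaluate_overage(used_tokens=250_001, included_tokens=100_000)
print(status.overage_blocks, status.overage_charge_kobo)  # 2 100000
```

Billing in whole blocks keeps the overage line predictable for the customer, and the early notification gives them a chance to upgrade before any block is charged.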
  13. Invoice Generation

Nigerian SaaS requires Naira-native invoicing with Paystack integration, where invoice generation must handle timezone differences, Nigerian public holidays, and recurring billing edge cases that break Western-built systems.

Invoice generation in a Nigerian AI SaaS environment extends beyond simple PDF creation. The system must account for NGN currency formatting, tax compliance with FIRS requirements, and integration with local payment processors that have different settlement timelines than Stripe or PayPal.

```python
from datetime import datetime, timedelta
from decimal import Decimal
from enum import Enum

import jinja2
from weasyprint import HTML  # used by _render_pdf (not shown)

# Tenant, Invoice and InvoiceLine are the application's ORM models.
# _load_holidays, _render_pdf and _send_email_notification are helper
# methods that are not shown in this excerpt.


class InvoiceStatus(Enum):
    DRAFT = "draft"
    PENDING = "pending"
    PAID = "paid"
    OVERDUE = "overdue"
    CANCELLED = "cancelled"


class InvoiceGenerator:
    """Handles invoice generation with Nigerian market specifics."""

    def __init__(self, db_session, paystack_client, template_dir: str):
        self.db = db_session
        self.paystack = paystack_client
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_dir)
        )
        self.nigerian_holidays = self._load_holidays()

    def generate_invoice(
        self,
        tenant_id: str,
        billing_period_start: datetime,
        billing_period_end: datetime,
        line_items: list[dict]
    ) -> str:
        """Generate invoice for tenant with NGN formatting."""
        tenant = self.db.query(Tenant).filter(
            Tenant.id == tenant_id
        ).first()
        if not tenant:
            raise ValueError(f"Tenant {tenant_id} not found")

        subtotal = sum(
            (Decimal(str(item['amount'])) for item in line_items),
            Decimal('0')
        )
        vat_rate = Decimal('0.075')  # 7.5% VAT for professional services
        vat_amount = subtotal * vat_rate
        total = subtotal + vat_amount

        invoice = Invoice(
            tenant_id=tenant_id,
            invoice_number=self._generate_invoice_number(tenant),
            billing_period_start=billing_period_start,
            billing_period_end=billing_period_end,
            subtotal=subtotal,
            vat_amount=vat_amount,
            total=total,
            currency='NGN',
            status=InvoiceStatus.DRAFT,
            due_date=datetime.utcnow() + timedelta(days=30)
        )
        self.db.add(invoice)
        self.db.flush()  # assigns invoice.id for the line items below

        for item in line_items:
            invoice_line = InvoiceLine(
                invoice_id=invoice.id,
                description=item['description'],
                quantity=item.get('quantity', 1),
                unit_price=Decimal(str(item['unit_price'])),
                total=Decimal(str(item['amount']))
            )
            self.db.add(invoice_line)
        self.db.commit()

        pdf_path = self._render_pdf(invoice, tenant)
        invoice.pdf_path = pdf_path
        self.db.commit()
        return invoice.id

    def _generate_invoice_number(self, tenant: Tenant) -> str:
        """Generate sequential invoice numbers with tenant prefix."""
        year = datetime.utcnow().year
        prefix = tenant.invoice_prefix or f"INV-{tenant.id[:4].upper()}"

        last_invoice = self.db.query(Invoice).filter(
            Invoice.tenant_id == tenant.id,
            Invoice.invoice_number.like(f"{prefix}-{year}%")
        ).order_by(Invoice.invoice_number.desc()).first()

        if last_invoice:
            sequence = int(last_invoice.invoice_number.split('-')[-1]) + 1
        else:
            sequence = 1
        return f"{prefix}-{year}-{sequence:04d}"

    def send_invoice(self, invoice_id: str, recipient_email: str):
        """Send invoice via Paystack invoice or email."""
        invoice = self.db.query(Invoice).filter(
            Invoice.id == invoice_id
        ).first()
        if invoice is None:
            raise ValueError(f"Invoice {invoice_id} not found")
        if invoice.status != InvoiceStatus.DRAFT:
            raise ValueError(f"Cannot send invoice with status {invoice.status}")

        payment_link = self.paystack.create_invoice(
            amount=int(invoice.total * 100),  # kobo
            currency='NGN',
            customer_email=recipient_email,
            description=f"Invoice {invoice.invoice_number}",
            due_date=invoice.due_date.isoformat()
        )

        invoice.paystack_invoice_id = payment_link['id']
        invoice.payment_url = payment_link['url']
        invoice.status = InvoiceStatus.PENDING
        invoice.sent_at = datetime.utcnow()
        self.db.commit()

        self._send_email_notification(invoice, recipient_email)
```

**Common Failure Modes:** Mismatched timezone handling between Lagos and a UTC server causes invoices to show the wrong due date. Nigeria operates on West Africa Time (WAT, UTC+1), so due dates must be explicitly converted before they are displayed to users.
```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

LAGOS_TZ = ZoneInfo("Africa/Lagos")


def calculate_due_date_ngn(
    invoice_date: datetime,
    due_days: int = 30
) -> datetime:
    """Calculate the due date in Lagos time, moving weekend dates to Monday."""
    # Naive datetimes are treated as UTC, matching utcnow() timestamps
    if invoice_date.tzinfo is None:
        invoice_date = invoice_date.replace(tzinfo=timezone.utc)

    # Convert first, so the weekday check uses the Lagos calendar date
    due_date = invoice_date.astimezone(LAGOS_TZ) + timedelta(days=due_days)

    # Adjust for weekend due dates (shift to Monday)
    if due_date.weekday() == 5:  # Saturday
        due_date += timedelta(days=2)
    elif due_date.weekday() == 6:  # Sunday
        due_date += timedelta(days=1)
    return due_date
```

Recurring invoice generation requires careful handling of mid-cycle upgrades or downgrades. When a tenant upgrades mid-month, prorated calculations must be accurate to the day, which many billing libraries handle poorly.

```python
from decimal import Decimal


def calculate_proration(
    days_in_period: int,
    days_used: int,
    monthly_price: Decimal
) -> Decimal:
    """Calculate prorated amount for mid-cycle changes."""
    daily_rate = monthly_price / Decimal(days_in_period)
    return daily_rate * Decimal(days_used)
```

20 min
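Multiplying a subtotal by the 7.5% VAT rate used in Invoice Generation can produce fractions of a kobo, which then have to be rounded before the amount is sent to a payment processor. The sketch below shows one possible policy: the function name and the choice of half-up rounding are assumptions, and the rounding rule your tax adviser or FIRS guidance requires may differ.

```python
from decimal import Decimal, ROUND_HALF_UP

KOBO = Decimal("0.01")       # smallest NGN unit
VAT_RATE = Decimal("0.075")  # 7.5%, as in the invoice code


def compute_invoice_totals(line_amounts: list[Decimal]) -> dict:
    """Sum line amounts, add VAT, and round each figure to the kobo."""
    subtotal = sum(line_amounts, Decimal("0")).quantize(KOBO, rounding=ROUND_HALF_UP)
    vat_amount = (subtotal * VAT_RATE).quantize(KOBO, rounding=ROUND_HALF_UP)
    total = subtotal + vat_amount
    return {
        "subtotal": subtotal,
        "vat_amount": vat_amount,
        "total": total,
        "total_kobo": int(total * 100),  # exact, because total has 2 decimals
    }


totals = compute_invoice_totals([Decimal("1500.00"), Decimal("333.33")])
# 1833.33 * 0.075 = 137.49975, which rounds to 137.50
print(totals["vat_amount"], totals["total"], totals["total_kobo"])
# 137.50 1970.83 197083
```

Rounding VAT once on the subtotal, rather than per line, keeps the invoice total equal to the sum of the figures printed on it.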
  14. API Rate Limiting

Rate limiting in a Nigerian SaaS must balance fair usage across different tiers while handling the bursty traffic patterns common in Lagos and Abuja internet connections, where connection quality varies significantly.

Rate limiting serves two purposes: protecting system resources and enforcing tier boundaries. For a Nigerian AI SaaS, the implementation must handle variance in client behavior while providing clear feedback when limits are approached.

```python
import time

from redis import Redis


class RateLimiter:
    """Fixed-window rate limiter with Redis backend."""

    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.default_limits = {
            'free': {'requests': 100, 'window': 60},  # 100/min
            'starter': {'requests': 1000, 'window': 60},  # 1000/min
            'professional': {'requests': 5000, 'window': 60},  # 5000/min
            'enterprise': {'requests': 20000, 'window': 60}
        }

    def check_rate_limit(
        self,
        tenant_id: str,
        endpoint: str,
        tier: str
    ) -> dict:
        """Check and update rate limit, returning limit status."""
        limits = self.default_limits.get(tier, self.default_limits['free'])
        max_requests = limits['requests']
        window_seconds = limits['window']

        # Read the clock once so the key, reset time and retry hint agree
        now = int(time.time())
        window_index = now // window_seconds
        key = f"ratelimit:{tenant_id}:{endpoint}:{window_index}"

        current = self.redis.incr(key)
        if current == 1:
            # First request in this window: let the counter expire with it
            self.redis.expire(key, window_seconds)

        remaining = max(0, max_requests - current)
        reset_time = (window_index + 1) * window_seconds

        return {
            'allowed': current <= max_requests,
            'limit': max_requests,
            'remaining': remaining,
            'reset': reset_time,
            'retry_after': reset_time - now if current > max_requests else None
        }
```

For AI endpoints specifically, rate limiting must account for token consumption, not just request counts.
A tenant making 50 small requests consumes far fewer resources than one making 50 large requests.

```python
from datetime import datetime

from redis import Redis


class AIEndpointRateLimiter:
    """Rate limiter for AI endpoints with token awareness."""

    def __init__(self, redis_client: Redis, openai_client):
        self.redis = redis_client
        self.openai = openai_client
        self.limits = {
            'free': {'monthly_tokens': 100000},
            'starter': {'monthly_tokens': 1000000},
            'professional': {'monthly_tokens': 5000000},
            'enterprise': {'monthly_tokens': 50000000}
        }

    def check_and_consume(
        self,
        tenant_id: str,
        tier: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> tuple[bool, dict]:
        """Check token budget and consume from allocation."""
        total_tokens = prompt_tokens + completion_tokens
        monthly_limit = self.limits.get(tier, {}).get('monthly_tokens', 0)

        month_start = datetime.utcnow().replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        key = f"ai_tokens:{tenant_id}:{month_start.isoformat()}"

        # Note: this read-then-increment is not atomic, so concurrent
        # requests can slightly overshoot the limit.
        current_usage = int(self.redis.get(key) or 0)
        new_usage = current_usage + total_tokens

        if new_usage > monthly_limit:
            return False, {
                'current_usage': current_usage,
                'monthly_limit': monthly_limit,
                'requested': total_tokens,
                'shortage': new_usage - monthly_limit
            }

        self.redis.incrby(key, total_tokens)
        self.redis.expire(key, 86400 * 35)  # Keep for full billing period
        return True, {
            'current_usage': new_usage,
            'monthly_limit': monthly_limit,
            'remaining': monthly_limit - new_usage
        }
```

**Handling Burst Traffic:** Nigerian internet connections often have inconsistent latency, leading to clients retrying failed requests multiple times. This creates artificial spikes in rate limit consumption.
```python
import random


def handle_burst_gracefully(
    limiter: RateLimiter,
    tenant_id: str,
    endpoint: str,
    tier: str,
    request_count: int  # currently unused; kept for caller compatibility
) -> dict:
    """Handle burst requests with jitter-aware limiting."""
    result = limiter.check_rate_limit(tenant_id, endpoint, tier)

    if not result['allowed']:
        # Apply jitter to spread retry attempts
        jitter = random.uniform(1.1, 1.5)
        base_wait = result['retry_after'] or 60
        suggested_wait = min(int(base_wait * jitter), 300)  # Cap at 5 minutes
        return {
            'allowed': False,
            'retry_after': suggested_wait,
            'message': f"Rate limit exceeded. Retry after {suggested_wait} seconds."
        }
    return result
```

**Common Failure Modes:** Redis rate limiting fails silently when Redis is unavailable, causing all checks to pass and overwhelming the system. Always implement a fallback that denies requests when the rate limiter cannot confirm allowance.

```python
import logging

from redis.exceptions import ConnectionError as RedisConnectionError

logger = logging.getLogger(__name__)


def check_rate_limit_with_fallback(
    limiter: RateLimiter,
    tenant_id: str,
    endpoint: str,
    tier: str
) -> dict:
    """Check rate limit, failing closed when Redis is unreachable."""
    try:
        return limiter.check_rate_limit(tenant_id, endpoint, tier)
    except RedisConnectionError:
        logger.error(f"Redis unavailable for rate limit check, tenant {tenant_id}")
        # Fail closed: deny request when limiter unavailable
        return {
            'allowed': False,
            'error': 'rate_limiter_unavailable',
            'retry_after': 5
        }
```

25 min
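The Redis limiter in API Rate Limiting counts requests per fixed window. A token bucket is a different technique that tolerates short bursts (such as clustered client retries) while still enforcing an average rate. The sketch below is a minimal in-memory version for illustration only: the class name and the injectable `clock` parameter are assumptions, and a multi-process deployment would need shared state such as Redis.

```python
import time
from typing import Callable


class TokenBucket:
    """In-memory token bucket: bursts up to `capacity`, steady refill after."""

    def __init__(
        self,
        capacity: int,
        refill_per_second: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.clock = clock
        self.tokens = float(capacity)  # start full
        self.updated_at = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Refill for the elapsed time, then spend `cost` tokens if available."""
        now = self.clock()
        elapsed = now - self.updated_at
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# A fake clock makes the behaviour deterministic for the example
fake_now = [0.0]
bucket = TokenBucket(capacity=5, refill_per_second=1.0, clock=lambda: fake_now[0])

print([bucket.allow() for _ in range(6)])  # [True, True, True, True, True, False]
fake_now[0] = 2.0                          # two seconds later: two tokens refilled
print([bucket.allow() for _ in range(3)])  # [True, True, False]
```

The `cost` argument also lets a large AI request spend more tokens than a small one, which fits the point that request counts alone do not reflect resource use.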
  15. Quota Management

Quota management in multi-tenant AI SaaS requires tracking both hard limits (absolute caps) and soft limits (warning thresholds), with Nigerian billing cycles requiring alignment between usage tracking and Naira payments.

Quota management extends beyond simple counters. It must handle partial usage, rollover policies, and the complex logic of resetting quotas while maintaining historical accuracy for billing disputes.

```python
import logging
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)

# Tenant and UsageRecord are ORM models and QuotaExceededError is an
# application exception; _load_quota_config, _get_quota_key and
# _send_warning_notification are helper methods not shown here.


class QuotaType(Enum):
    API_CALLS = "api_calls"
    AI_TOKENS = "ai_tokens"
    STORAGE_GB = "storage_gb"
    TEAM_MEMBERS = "team_members"
    PROJECTS = "projects"


@dataclass
class QuotaLimit:
    """Represents a quota limit configuration."""
    quota_type: QuotaType
    hard_limit: int
    soft_limit_percentage: float = 0.8
    warning_enabled: bool = True


class QuotaManager:
    """Manages tenant quotas with tracking and enforcement."""

    def __init__(self, db_session, redis_client, notification_service):
        self.db = db_session
        self.redis = redis_client
        self.notifications = notification_service
        self.quota_config = self._load_quota_config()

    def check_quota(
        self,
        tenant_id: str,
        quota_type: QuotaType,
        requested_amount: int = 1
    ) -> tuple[bool, dict]:
        """Check if tenant can consume quota, return status."""
        tenant = self.db.query(Tenant).filter(
            Tenant.id == tenant_id
        ).first()
        if tenant is None:
            raise ValueError(f"Tenant {tenant_id} not found")

        plan = self._get_plan_limits(tenant.plan)
        limit = plan.get(quota_type)
        if not limit:
            return True, {'unlimited': True}

        key = self._get_quota_key(tenant_id, quota_type)
        current_usage = int(self.redis.get(key) or 0)

        would_exceed = current_usage + requested_amount > limit.hard_limit
        soft_limit = int(limit.hard_limit * limit.soft_limit_percentage)
        approaching_limit = soft_limit <= current_usage < limit.hard_limit

        if approaching_limit and limit.warning_enabled:
            self._send_warning_notification(
                tenant_id, quota_type, current_usage, limit.hard_limit
            )

        return not would_exceed, {
            'current_usage': current_usage,
            'hard_limit': limit.hard_limit,
            'remaining': max(0, limit.hard_limit - current_usage - requested_amount),
            'approaching': approaching_limit,
            'exceeded': would_exceed
        }

    def consume_quota(
        self,
        tenant_id: str,
        quota_type: QuotaType,
        amount: int = 1,
        metadata: Optional[dict] = None
    ) -> bool:
        """Consume quota and record usage."""
        allowed, status = self.check_quota(tenant_id, quota_type, amount)

        if not allowed:
            logger.warning(f"Quota exceeded for tenant {tenant_id}, type {quota_type.value}")
            raise QuotaExceededError(
                f"Quota limit reached for {quota_type.value}",
                current=status['current_usage'],
                limit=status['hard_limit']
            )

        key = self._get_quota_key(tenant_id, quota_type)
        self.redis.incrby(key, amount)

        # SQLAlchemy's Declarative API reserves the attribute name `metadata`,
        # so the column is assumed here to be mapped as `usage_metadata`.
        usage_record = UsageRecord(
            tenant_id=tenant_id,
            quota_type=quota_type.value,
            amount=amount,
            usage_metadata=metadata,
            created_at=datetime.utcnow()
        )
        self.db.add(usage_record)
        self.db.commit()
        return True

    def reset_quota(
        self,
        tenant_id: str,
        quota_type: QuotaType,
        billing_cycle_start: datetime
    ) -> dict:
        """Reset quota for new billing cycle with archive."""
        key = self._get_quota_key(tenant_id, quota_type)
        current = int(self.redis.get(key) or 0)

        # Keep the closing usage figure for 90 days for billing disputes
        archive_key = (
            f"quota_archive:{tenant_id}:{quota_type.value}:"
            f"{billing_cycle_start.isoformat()}"
        )
        self.redis.set(archive_key, current, ex=86400 * 90)
        self.redis.delete(key)

        return {
            'archived_usage': current,
            'reset_at': datetime.utcnow(),
            'cycle_start': billing_cycle_start
        }
```

**Nigerian Billing Cycle Alignment:** Nigerian businesses often operate on monthly cycles aligned with calendar months, but some prefer to align with their fiscal year or contract start date.
```python
import calendar
from datetime import datetime, timedelta


def _anchor_date(year: int, month: int, anchor_day: int) -> datetime:
    """Return the anchor day in the given month, clamped to its last day."""
    last_day = calendar.monthrange(year, month)[1]
    return datetime(year, month, min(anchor_day, last_day))


def get_billing_cycle_dates(tenant: Tenant) -> tuple[datetime, datetime]:
    """Determine billing cycle start and end dates."""
    today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    # No anchor day means calendar-month billing (anchor on the 1st)
    anchor = tenant.billing_anchor_day or 1

    cycle_start = _anchor_date(today.year, today.month, anchor)
    if today < cycle_start:
        # This month's anchor has not arrived yet: the cycle began last month
        if today.month == 1:
            cycle_start = _anchor_date(today.year - 1, 12, anchor)
        else:
            cycle_start = _anchor_date(today.year, today.month - 1, anchor)

    # The cycle ends the day before the next anchor date
    if cycle_start.month == 12:
        next_start = _anchor_date(cycle_start.year + 1, 1, anchor)
    else:
        next_start = _anchor_date(cycle_start.year, cycle_start.month + 1, anchor)
    cycle_end = next_start - timedelta(days=1)

    return cycle_start, cycle_end


# Method of QuotaManager (shown on its own here)
def _get_plan_limits(self, plan: str) -> dict[QuotaType, QuotaLimit]:
    """Get quota limits for a plan."""
    limits = {
        'free': {
            QuotaType.API_CALLS: QuotaLimit(QuotaType.API_CALLS, 1000),
            QuotaType.AI_TOKENS: QuotaLimit(QuotaType.AI_TOKENS, 100000),
            QuotaType.STORAGE_GB: QuotaLimit(QuotaType.STORAGE_GB, 1),
            QuotaType.TEAM_MEMBERS: QuotaLimit(QuotaType.TEAM_MEMBERS, 3),
            QuotaType.PROJECTS: QuotaLimit(QuotaType.PROJECTS, 2),
        },
        'starter': {
            QuotaType.API_CALLS: QuotaLimit(QuotaType.API_CALLS, 50000),
            QuotaType.AI_TOKENS: QuotaLimit(QuotaType.AI_TOKENS, 1000000),
            QuotaType.STORAGE_GB: QuotaLimit(QuotaType.STORAGE_GB, 10),
            QuotaType.TEAM_MEMBERS: QuotaLimit(QuotaType.TEAM_MEMBERS, 10),
            QuotaType.PROJECTS: QuotaLimit(QuotaType.PROJECTS, 10),
        },
        'professional': {
            QuotaType.API_CALLS: QuotaLimit(QuotaType.API_CALLS, 500000),
            QuotaType.AI_TOKENS: QuotaLimit(QuotaType.AI_TOKENS, 10000000),
            QuotaType.STORAGE_GB: QuotaLimit(QuotaType.STORAGE_GB, 100),
            QuotaType.TEAM_MEMBERS: QuotaLimit(QuotaType.TEAM_MEMBERS, 50),
            QuotaType.PROJECTS: QuotaLimit(QuotaType.PROJECTS, 100),
        },
        'enterprise': {},  # no entries: every quota is treated as unlimited
    }
    return limits.get(plan, limits['free'])
```

**Common Failure Modes:** Quota consumption in high-concurrency scenarios causes race conditions where multiple requests pass the check simultaneously, exceeding the limit before any consumption is recorded.
```python
# Method of QuotaManager
def consume_quota_atomic(
    self,
    tenant_id: str,
    quota_type: QuotaType,
    amount: int = 1
) -> tuple[bool, dict]:
    """Atomic quota consumption using Lua script."""
    # The script checks and increments in one step, so no other request
    # can slip in between the comparison and the INCRBY.
    script = """
    local key = KEYS[1]
    local limit = tonumber(ARGV[1])
    local requested = tonumber(ARGV[2])
    local current = tonumber(redis.call('GET', key) or '0')
    local new_total = current + requested
    if new_total > limit then
        return {0, current, limit, limit - current}
    end
    redis.call('INCRBY', key, requested)
    return {1, new_total, limit, limit - new_total}
    """

    # _get_plan_limits takes a plan name and returns a dict of QuotaLimit
    tenant = self.db.query(Tenant).filter(Tenant.id == tenant_id).first()
    limit = self._get_plan_limits(tenant.plan).get(quota_type)
    if not limit:
        return True, {'unlimited': True}

    result = self.redis.eval(
        script,
        1,
        self._get_quota_key(tenant_id, quota_type),
        limit.hard_limit,
        amount
    )

    allowed = bool(result[0])
    return allowed, {
        'current_usage': result[1],
        'hard_limit': result[2],
        'remaining': result[3]
    }
```

25 min
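Soft limits in Quota Management are warning thresholds, and a check that warns on every request above the threshold would flood the tenant with notifications. The sketch below shows one way to warn once per billing cycle. It is an illustration under assumptions: the class name and the `cycle_id` argument are hypothetical, and the in-memory set stands in for a shared flag (for example a Redis key set with `NX`) in a multi-process deployment.

```python
class SoftLimitNotifier:
    """Decide whether a soft-limit warning should fire, at most once per cycle."""

    def __init__(self, soft_limit_percentage: float = 0.8):
        self.soft_limit_percentage = soft_limit_percentage
        # (tenant_id, quota_type, cycle_id) combinations already warned
        self._warned: set[tuple[str, str, str]] = set()

    def should_warn(
        self,
        tenant_id: str,
        quota_type: str,
        cycle_id: str,
        current_usage: int,
        hard_limit: int,
    ) -> bool:
        soft_limit = int(hard_limit * self.soft_limit_percentage)
        if current_usage < soft_limit:
            return False
        key = (tenant_id, quota_type, cycle_id)
        if key in self._warned:
            return False  # already notified in this cycle
        self._warned.add(key)
        return True


notifier = SoftLimitNotifier()
print(notifier.should_warn("t1", "ai_tokens", "2025-01", 799, 1000))  # False
print(notifier.should_warn("t1", "ai_tokens", "2025-01", 800, 1000))  # True
print(notifier.should_warn("t1", "ai_tokens", "2025-01", 900, 1000))  # False
print(notifier.should_warn("t1", "ai_tokens", "2025-02", 850, 1000))  # True
```

Keying the flag on the billing cycle means the warning re-arms automatically when the quota resets, without a separate cleanup step.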
  16. Analytics Dashboard

Multi-tenant analytics in Nigerian SaaS must segment by tenant while providing aggregate insights for platform operations, with dashboard loading optimized for varying internet speeds in Lagos and Port Harcourt.

Analytics dashboards for AI SaaS serve two audiences: tenants viewing their own usage patterns, and platform operators monitoring overall health. The architecture must efficiently serve both without creating performance bottlenecks.

```python
from dataclasses import dataclass
from typing import Any

from sqlalchemy import func

# UsageRecord and APIRequest are ORM models; the _parse_period, _get_* and
# _calculate_* helpers that are called but not defined here are not shown.


@dataclass
class DashboardMetric:
    name: str
    value: Any
    unit: str
    change_pct: float
    trend: list


class TenantAnalyticsService:
    """Analytics service for tenant-specific metrics."""

    def __init__(self, db_session, cache_service):
        self.db = db_session
        self.cache = cache_service

    def get_dashboard_metrics(
        self,
        tenant_id: str,
        period: str = '30d'
    ) -> dict:
        """Get the full set of dashboard metrics for a tenant."""
        cache_key = f"dashboard:{tenant_id}:{period}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        date_range = self._parse_period(period)
        metrics = {
            'api_usage': self._get_api_usage(tenant_id, date_range),
            'ai_tokens': self._get_ai_token_usage(tenant_id, date_range),
            'response_times': self._get_response_time_stats(tenant_id, date_range),
            'error_rates': self._get_error_rates(tenant_id, date_range),
            'top_endpoints': self._get_top_endpoints(tenant_id, date_range),
            'usage_trend': self._get_usage_trend(tenant_id, date_range),
            'quota_status': self._get_quota_status(tenant_id),
            'cost_breakdown': self._get_cost_breakdown(tenant_id, date_range)
        }
        self.cache.set(cache_key, metrics, ttl=300)  # cache for 5 minutes
        return metrics

    def _get_ai_token_usage(self, tenant_id: str, date_range: tuple) -> dict:
        """Get AI token usage breakdown by model."""
        results = self.db.query(
            UsageRecord.quota_type,
            func.sum(UsageRecord.amount).label('total'),
            func.count(UsageRecord.id).label('requests')
        ).filter(
            UsageRecord.tenant_id == tenant_id,
            # Only token records; other quota types are not models
            UsageRecord.quota_type.like('ai_tokens%'),
            UsageRecord.created_at >= date_range[0],
            UsageRecord.created_at <= date_range[1]
        ).group_by(
            UsageRecord.quota_type
        ).all()

        by_model = {}
        total_tokens = 0
        for row in results:
            # quota_type is expected to look like "ai_tokens_<model>"
            model = row.quota_type.replace('ai_tokens_', '')
            by_model[model] = {
                'tokens': row.total,
                'requests': row.requests,
                'avg_tokens_per_request': row.total / row.requests if row.requests > 0 else 0
            }
            total_tokens += row.total

        return {
            'total': total_tokens,
            'by_model': by_model,
            'period': {
                'start': date_range[0].isoformat(),
                'end': date_range[1].isoformat()
            }
        }

    def _get_response_time_stats(self, tenant_id: str, date_range: tuple) -> dict:
        """Get response time statistics."""
        # percentile_cont is an ordered-set aggregate, so it needs
        # WITHIN GROUP (ORDER BY ...), spelled within_group() in SQLAlchemy
        results = self.db.query(
            func.avg(APIRequest.duration_ms).label('avg'),
            func.percentile_cont(0.5).within_group(
                APIRequest.duration_ms
            ).label('p50'),
            func.percentile_cont(0.95).within_group(
                APIRequest.duration_ms
            ).label('p95'),
            func.percentile_cont(0.99).within_group(
                APIRequest.duration_ms
            ).label('p99')
        ).filter(
            APIRequest.tenant_id == tenant_id,
            APIRequest.created_at >= date_range[0],
            APIRequest.created_at <= date_range[1]
        ).first()

        return {
            'avg_ms': round(float(results.avg), 2) if results.avg else 0,
            'p50_ms': round(float(results.p50), 2) if results.p50 else 0,
            'p95_ms': round(float(results.p95), 2) if results.p95 else 0,
            'p99_ms': round(float(results.p99), 2) if results.p99 else 0
        }

    def _get_cost_breakdown(self, tenant_id: str, date_range: tuple) -> dict:
        """Calculate cost breakdown in NGN."""
        ai_costs = self._calculate_ai_costs(tenant_id, date_range)
        api_costs = self._calculate_api_costs(tenant_id, date_range)
        total_ngn = ai_costs['total'] + api_costs['total']

        return {
            'total_ngn': total_ngn,
            'breakdown': {
                'ai_tokens': ai_costs,
                'api_calls': api_costs
            },
            'currency': 'NGN'
        }
```

**Optimizing for Nigerian Network Conditions:** Dashboard loading must account for variable internet speeds.
Large datasets should be paginated and visualizations should use progressive loading to avoid timeout issues.

```python
from datetime import datetime, timedelta

from sqlalchemy import text

# PostgreSQL to_char patterns
POSTGRES_FORMATS = {
    'hourly': 'YYYY-MM-DD HH24:00',
    'daily': 'YYYY-MM-DD',
    'weekly': 'IYYY-IW',
    'monthly': 'YYYY-MM'
}

# ClickHouse formatDateTime takes strftime-style patterns instead
CLICKHOUSE_FORMATS = {
    'hourly': '%Y-%m-%d %H:00',
    'daily': '%Y-%m-%d',
    'weekly': '%G-%V',
    'monthly': '%Y-%m'
}


class DashboardDataService:
    """Service for optimized dashboard data retrieval."""

    def __init__(self, db_session, analytics_db):
        self.db = db_session
        self.analytics = analytics_db  # backend name, e.g. 'clickhouse'

    def get_trend_data(
        self,
        tenant_id: str,
        metric: str,
        period: str = '30d',
        granularity: str = 'daily'
    ) -> list:
        """Get time-series trend data with appropriate granularity."""
        # Periods are expected in the form '<days>d', e.g. '7d' or '30d'
        days = int(period.rstrip('d'))
        end_date = datetime.utcnow()
        params = {
            'tenant_id': tenant_id,
            'metric': metric,
            'start_date': end_date - timedelta(days=days),
            'end_date': end_date
        }

        if self.analytics == 'clickhouse':
            # The format comes from the fixed map above, never from user input.
            # How literal '%' characters and the %(name)s placeholders must be
            # escaped depends on the ClickHouse driver behind self.db.
            date_format = CLICKHOUSE_FORMATS.get(granularity, '%Y-%m-%d')
            query = f"""
                SELECT
                    formatDateTime(created_at, '{date_format}') as period,
                    sum(amount) as total,
                    count() as count
                FROM usage_records
                WHERE tenant_id = %(tenant_id)s
                    AND created_at >= %(start_date)s
                    AND created_at <= %(end_date)s
                    AND quota_type = %(metric)s
                GROUP BY period
                ORDER BY period
            """
            return self.db.execute(query, params).fetchall()

        params['date_format'] = POSTGRES_FORMATS.get(granularity, 'YYYY-MM-DD')
        query = text("""
            SELECT
                to_char(created_at, :date_format) as period,
                sum(amount) as total,
                count(*) as count
            FROM usage_records
            WHERE tenant_id = :tenant_id
                AND created_at >= :start_date
                AND created_at <= :end_date
                AND quota_type = :metric
            GROUP BY period
            ORDER BY period
        """)
        return self.db.execute(query, params).fetchall()
```

**Platform-Wide Analytics:**

```python
from sqlalchemy import func

# Tenant and Subscription are ORM models; _parse_period and the other
# underscore-prefixed helpers called below are not shown.


class PlatformAnalyticsService:
    """Analytics for platform operators."""

    def __init__(self, db_session):
        self.db = db_session

    def get_platform_summary(self, period: str = '7d') -> dict:
        """Get platform-wide analytics summary."""
        date_range = self._parse_period(period)

        mrr = self._calculate_mrr()
        active_tenants = self._count_active_tenants(date_range)
        usage_growth = self._calculate_usage_growth(date_range)
        top_tenants = self._get_top_tenants_by_usage(date_range, limit=20)

        return {
            'mrr_ngn': mrr,
            'active_tenants': active_tenants,
            'usage_growth_pct': usage_growth,
            'top_tenants': top_tenants,
            'period': period
        }

    def _calculate_mrr(self) -> dict:
        """Calculate Monthly Recurring Revenue."""
        subscriptions = self.db.query(Subscription).filter(
            Subscription.status == 'active'
        ).all()
        total = sum(s.monthly_amount for s in subscriptions)

        breakdown = self.db.query(
            Tenant.plan,
            func.count(Tenant.id).label('count'),
            func.sum(Subscription.monthly_amount).label('revenue')
        ).join(
            Subscription, Subscription.tenant_id == Tenant.id
        ).filter(
            Subscription.status == 'active'
        ).group_by(
            Tenant.plan
        ).all()

        return {
            'total': float(total),
            'currency': 'NGN',
            'by_plan': [
                {'plan': r.plan, 'count': r.count, 'revenue': float(r.revenue)}
                for r in breakdown
            ]
        }
```

**Common Failure Modes:** Dashboard queries against large datasets without proper indexes cause timeouts. Always create composite indexes on (tenant_id, created_at) for usage tables and implement query result caching with invalidation.

```python
# Migrations for analytics performance
def upgrade_analytics():
    """Add indexes for dashboard performance."""
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so
    # execute_migration (not shown) must run each statement in autocommit mode.
    migrations = [
        ("CREATE INDEX CONCURRENTLY idx_usage_tenant_date ON usage_records(tenant_id, created_at)",
         "Index for tenant time-series queries"),
        ("CREATE INDEX CONCURRENTLY idx_requests_tenant_duration ON api_requests(tenant_id, duration_ms)",
         "Index for response time analytics"),
        ("CREATE INDEX CONCURRENTLY idx_subscriptions_active ON subscriptions(tenant_id) WHERE status = 'active'",
         "Partial index for active subscriptions"),
    ]
    for sql, description in migrations:
        execute_migration(sql, description)
```

25 min
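The Analytics Dashboard advice to cache query results "with invalidation" can be sketched with a small TTL cache that also drops a tenant's entries when new usage arrives. This is an in-memory illustration under assumptions: the class name, the `invalidate_tenant` method, and the injectable `clock` are hypothetical, and the key layout copies the `dashboard:{tenant_id}:{period}` format used in the chapter.

```python
import time
from typing import Any, Callable, Optional


class DashboardCache:
    """TTL cache keyed like 'dashboard:<tenant_id>:<period>'."""

    def __init__(self, ttl_seconds: float = 300, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # expired: drop and report a miss
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (self.clock() + self.ttl_seconds, value)

    def invalidate_tenant(self, tenant_id: str) -> int:
        """Remove every cached period for one tenant; returns entries removed."""
        prefix = f"dashboard:{tenant_id}:"
        stale = [key for key in self._entries if key.startswith(prefix)]
        for key in stale:
            del self._entries[key]
        return len(stale)


fake_now = [0.0]
cache = DashboardCache(ttl_seconds=300, clock=lambda: fake_now[0])
cache.set("dashboard:t1:30d", {"total": 10})
cache.set("dashboard:t1:7d", {"total": 3})
cache.set("dashboard:t2:30d", {"total": 99})

print(cache.invalidate_tenant("t1"))  # 2
print(cache.get("dashboard:t2:30d"))  # {'total': 99}
fake_now[0] = 301.0
print(cache.get("dashboard:t2:30d"))  # None (expired)
```

Calling `invalidate_tenant` after recording significant usage keeps dashboards fresh for that tenant without shortening the TTL for everyone else.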
  17. 17DevOps AutomationDevOps automation for Nigerian SaaS must account for limited bandwidth between Lagos and international cloud regions, with deployment pipelines optimized to minimize data transfer while maintaining security standards. DevOps automation in a Nigerian SaaS context requires careful consideration of infrastructure costs, local compliance requirements, and the operational realities of managing systems from Lagos. ```python from dataclasses import dataclass from typing import Optional import subprocess import json @dataclass class DeploymentConfig: environment: str region: str instance_type: str min_instances: int max_instances: int autoscaling_threshold: float class DeploymentAutomation: """Automated deployment system for SaaS infrastructure.""" def __init__(self, config_service, secrets_manager, notification_service): self.config = config_service self.secrets = secrets_manager self.notifications = notification_service self.environments = { 'staging': DeploymentConfig( environment='staging', region='eu-west-1', instance_type='t3.medium', min_instances=1, max_instances=2, autoscaling_threshold=0.7 ), 'production': DeploymentConfig( environment='production', region='eu-west-1', instance_type='t3.large', min_instances=2, max_instances=10, autoscaling_threshold=0.6 ) } def deploy( self, environment: str, version: str, skip_tests: bool = False ) -> dict: """Execute deployment with validation and rollback.""" config = self.environments.get(environment) if not config: raise ValueError(f"Unknown environment: {environment}") deployment_id = self._initialize_deployment(environment, version) try: if not skip_tests: test_result = self._run_tests(version) if not test_result['passed']: self._fail_deployment(deployment_id, "Tests failed") return {'status': 'failed', 'reason': 'tests_failed'} build_result = self._build_docker_image(version) self._deploy_to_environment( config, build_result['image_tag'], deployment_id ) self._run_smoke_tests(environment) 
            self._finalize_deployment(deployment_id)

            return {
                'status': 'success',
                'deployment_id': deployment_id,
                'version': version,
                'environment': environment
            }

        except Exception as e:
            # Any failure after initialization triggers a rollback.
            self._rollback(deployment_id, config)
            return {'status': 'failed', 'reason': str(e)}

    def _build_docker_image(self, version: str) -> dict:
        """Build Docker image with caching for bandwidth optimization."""
        # Reference Dockerfile; the build below expects this content in ./Dockerfile.
        dockerfile = """
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN python -m compileall -q .
EXPOSE 8000
CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:app"]
"""

        build_cmd = [
            'docker', 'build',
            # Reuse layers from the previously published image to limit data transfer.
            '--cache-from', 'registry.example.com/saas:latest',
            '--tag', f'registry.example.com/saas:{version}',
            '--tag', 'registry.example.com/saas:latest',
            '--build-arg', f'BUILD_VERSION={version}',
            '.'  # build context; `docker build` requires it
        ]

        result = subprocess.run(
            build_cmd,
            capture_output=True,
            text=True
        )

        if result.returncode != 0:
            raise RuntimeError(f"Docker build failed: {result.stderr}")

        return {'image_tag': f'registry.example.com/saas:{version}'}
```

**Infrastructure as Code:**

```python
import boto3

class InfrastructureManager:
    """Manages AWS infrastructure with Nigerian market optimizations."""

    def __init__(self, aws_client):
        self.ec2 = boto3.client('ec2')
        self.ecs = boto3.client('ecs')
        self.rds = boto3.client('rds')

    def setup_production_infrastructure(self) -> dict:
        """Create production infrastructure with proper configuration."""
        vpc = self._create_vpc()

        private_subnets = self._create_subnets(
            vpc['VpcId'],
            availability_zones=['eu-west-1a', 'eu-west-1b'],
            public=False
        )

        nat_gateways = self._setup_nat_gateways(vpc, private_subnets)

        ecs_cluster = self._create_ecs_cluster(
            'saas-production',
            instance_type='t3.large',
            min_capacity=2,
            max_capacity=10
        )

        rds_instance = self._create_database(
            instance_class='db.t3.medium',
            allocated_storage=100,
            multi_az=True
        )

        elasticache = self._create_redis_cluster(
            node_type='cache.t3.medium',
            num_nodes=2
        )
        load_balancer = self._create_alb(
            vpc['VpcId'],
            subnets=private_subnets
        )

        return {
            'vpc': vpc,
            'ecs_cluster': ecs_cluster,
            'rds': rds_instance,
            'elasticache': elasticache,
            'load_balancer': load_balancer
        }

    def _create_database(self, instance_class: str, allocated_storage: int, multi_az: bool) -> dict:
        """Create RDS database with Nigerian compliance considerations."""
        db_instance = self.rds.create_db_instance(
            DBInstanceIdentifier='saas-production-db',
            DBInstanceClass=instance_class,
            Engine='postgres',
            EngineVersion='15.3',
            AllocatedStorage=allocated_storage,
            MultiAZ=multi_az,
            StorageEncrypted=True,
            KmsKeyId='arn:aws:kms:eu-west-1:123456789:key/production-db',
            BackupRetentionPeriod=30,
            PreferredBackupWindow='03:00-04:00',
            PreferredMaintenanceWindow='mon:04:00-mon:05:00',
            # boto3 spells these parameters PubliclyAccessible and VpcSecurityGroupIds.
            PubliclyAccessible=False,
            VpcSecurityGroupIds=[self._get_db_security_group()],
            DBParameterGroupName='saas-production-params',
            Tags=[
                {'Key': 'Environment', 'Value': 'production'},
                {'Key': 'Compliance', 'Value': 'soc2'}
            ]
        )
        return db_instance
```

**Cost Optimization for Nigerian Operations:**

```python
from datetime import datetime

import pytz

class CostOptimizationScheduler:
    """Schedule non-production resources to reduce costs."""

    def __init__(self, ecs_client, ec2_client):
        self.ecs = ecs_client
        self.ec2 = ec2_client

    def schedule_staging_shutdown(self):
        """Keep staging up during Lagos business hours and scale it down otherwise."""
        lagos_tz = pytz.timezone('Africa/Lagos')
        now = datetime.now(lagos_tz)

        business_hours = (8, 18)  # 8 AM to 6 PM Lagos time
        weekdays = (0, 1, 2, 3, 4)  # Monday to Friday

        should_run = (
            now.weekday() in weekdays and
            business_hours[0] <= now.hour < business_hours[1]
        )

        if should_run:
            self._scale_up_staging()
        else:
            self._scale_down_staging()

    def _scale_down_staging(self):
        """Scale staging to minimum for cost savings."""
        self.ecs.update_service(
            cluster='saas-staging',
            service='saas-staging-api',
            desiredCount=1
        )

        self.ec2.modify_instance_attribute(
            InstanceId='staging-instance-id',
            # boto3 expects this attribute as a {'Value': ...} structure.
            InstanceInitiatedShutdownBehavior={'Value': 'terminate'}
        )
```

**Common Failure Modes:** Deployment automation that doesn't account for Lagos bandwidth limitations fails during large image pulls. Always implement local registry caching and use multi-stage builds to minimize image sizes.

```python
import json
import subprocess

def optimize_for_bandwidth():
    """Configure Docker for low-bandwidth environments."""
    config = {
        'registry-mirrors': ['https://registry.example.com'],
        'max-concurrent-downloads': 3,
        'max-concurrent-uploads': 2
    }

    daemon_json = json.dumps(config, indent=2)

    # Pipe the JSON through `sudo tee` rather than `echo "..."`:
    # the double quotes inside the JSON would otherwise be consumed by the shell.
    subprocess.run(
        ['sudo', 'tee', '/etc/docker/daemon.json'],
        input=daemon_json,
        text=True,
        stdout=subprocess.DEVNULL,
        check=True
    )
    subprocess.run(['sudo', 'systemctl', 'restart', 'docker'], check=True)
```
25 min
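The business-hours rule used for the staging schedule in this chapter is easier to test when it is a pure function of the current time. The sketch below is one way to do that; the function name `staging_should_run` and its parameters are illustrative assumptions, and it uses a fixed UTC+1 offset instead of `pytz`, which is safe only because Lagos observes West Africa Time all year with no daylight saving.

```python
from datetime import datetime, timedelta, timezone

# Lagos is on West Africa Time (UTC+1) all year round.
WAT = timezone(timedelta(hours=1), 'WAT')


def staging_should_run(now_utc: datetime, open_hour: int = 8, close_hour: int = 18) -> bool:
    """Return True when staging should be up: Monday to Friday, 08:00-18:00 Lagos time.

    `now_utc` must be timezone-aware; a naive datetime would be interpreted
    in the machine's local zone by astimezone().
    """
    local = now_utc.astimezone(WAT)
    is_weekday = local.weekday() < 5  # Monday is 0, Friday is 4
    return is_weekday and open_hour <= local.hour < close_hour
```

A scheduler job would then only decide between scaling up and scaling down based on the returned boolean, and the edge cases (Sunday night in UTC that is already Monday in Lagos, the 18:00 cutoff) can be covered by unit tests without touching AWS.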
  18. CI/CD for SaaS

CI/CD pipelines for Nigerian SaaS must handle the unique requirements of multi-tenant deployments while accounting for payment processor integration testing and varying tenant configurations.

CI/CD for SaaS requires more than simple build and deploy. The pipeline must handle multi-tenant configuration, payment processor webhooks, and compliance requirements specific to Nigerian operations.

```python
import os
import subprocess
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import yaml

@dataclass
class PipelineStage:
    name: str
    commands: list[str]
    timeout: int = 600
    allow_failure: bool = False

class SaaSCICDPipeline:
    """CI/CD pipeline for multi-tenant SaaS."""

    def __init__(self, config_dir: str):
        self.config_dir = config_dir
        self.stages = self._load_pipeline_config()

    def execute_pipeline(
        self,
        environment: str,
        skip_stages: Optional[list] = None
    ) -> dict:
        """Execute full CI/CD pipeline."""
        results = []
        skip_stages = skip_stages or []

        for stage_config in self.stages:
            stage = PipelineStage(**stage_config)

            if stage.name in skip_stages:
                results.append({
                    'stage': stage.name,
                    'status': 'skipped'
                })
                continue

            # security-scan and integration-tests are required for production and
            # run like any other stage; penetration tests are skipped in staging.
            if environment == 'staging' and stage.name == 'penetration-test':
                continue

            try:
                result = self._execute_stage(stage, environment)
                results.append(result)

                if not stage.allow_failure and result['status'] == 'failed':
                    return {'status': 'failed', 'results': results}

            except Exception as e:
                results.append({
                    'stage': stage.name,
                    'status': 'failed',
                    'error': str(e)
                })
                return {'status': 'failed', 'results': results}

        return {'status': 'success', 'results': results}

    def _execute_stage(self, stage: PipelineStage, environment: str) -> dict:
        """Execute a single pipeline stage."""
        start_time = datetime.utcnow()

        for command in stage.commands:
            env_vars = self._get_environment_vars(environment)

            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                timeout=stage.timeout,
                env=env_vars
            )

            if result.returncode != 0:
                return {
                    'stage': stage.name,
                    'status': 'failed',
                    'error': result.stderr.decode(),
                    # total_seconds() also counts durations longer than a day.
                    'duration': (datetime.utcnow() - start_time).total_seconds()
                }

        return {
            'stage': stage.name,
            'status': 'success',
            'duration': (datetime.utcnow() - start_time).total_seconds()
        }

    def _get_environment_vars(self, environment: str) -> dict:
        """Get environment-specific variables."""
        secrets = self._load_secrets(environment)
        return {
            # Inherit PATH and the rest of the process environment so that
            # stage commands can still be resolved by the shell.
            **os.environ,
            'DATABASE_URL': secrets['database_url'],
            'REDIS_URL': secrets['redis_url'],
            'PAYSTACK_SECRET_KEY': secrets['paystack_key'],
            'FLUTTERWAVE_SECRET_KEY': secrets['flutterwave_key'],
            'ENVIRONMENT': environment,
            'SENTRY_DSN': secrets.get('sentry_dsn', '')
        }
```

**Testing Multi-Tenant Scenarios:**

```python
class MultiTenantTestRunner:
    """Run tests across multiple tenant configurations."""

    def __init__(self, test_client, db_fixtures):
        self.client = test_client
        self.fixtures = db_fixtures

    def run_tenant_matrix_tests(self) -> dict:
        """Test across all plan combinations."""
        plans = ['free', 'starter', 'professional', 'enterprise']
        features = ['api_access', 'ai_tokens', 'webhooks', 'sso']

        results = []
        for plan in plans:
            for features_subset in self._generate_feature_combinations(features):
                tenant = self._create_test_tenant(plan, features_subset)
                test_results = self._run_tenant_tests(tenant)
                results.append({
                    'plan': plan,
                    'features': features_subset,
                    'passed': test_results['passed'],
                    'failed': test_results['failed']
                })

        return {
            'total': len(results),
            'passed': sum(1 for r in results if r['failed'] == 0),
            'failed': sum(1 for r in results if r['failed'] > 0),
            'details': results
        }

    def _run_tenant_tests(self, tenant: Tenant) -> dict:
        """Run tests for a specific tenant configuration."""
        passed = 0
        failed = 0

        if tenant.has_feature('api_access'):
            if not self._test_api_access(tenant):
                failed += 1
            else:
                passed += 1

        if tenant.has_feature('ai_tokens'):
            if not self._test_ai_integration(tenant):
                failed += 1
            else:
                passed += 1

        if tenant.has_feature('webhooks'):
            if not self._test_webhook_delivery(tenant):
                failed += 1
            else:
                passed += 1

        return {'passed': passed, 'failed': failed}
```

**Payment Integration Testing:**

```python
class PaymentIntegrationTest:
    """Test payment processor integrations."""

    def __init__(self, paystack_client, flutterwave_client):
        self.paystack = paystack_client
        self.flutterwave = flutterwave_client

    def test_nigerian_payment_flow(self) -> dict:
        """Test complete payment flow with Nigerian payment methods."""
        results = {}

        results['paystack_card'] = self._test_paystack_card()
        results['paystack_transfer'] = self._test_paystack_transfer()
        results['flutterwave_card'] = self._test_flutterwave_card()
        results['flutterwave_ussd'] = self._test_flutterwave_ussd()

        return results

    def _test_paystack_transfer(self) -> dict:
        """Test Paystack bank transfer payment."""
        test_txn = {
            # Paystack amounts are expressed in kobo, so 45000 is NGN 450.
            'amount': 45000,
            'email': '[email protected]',
            'currency': 'NGN',
            'payment_type': 'bank_transfer'
        }

        initialization = self.paystack.initialize_transaction(test_txn)

        authorization = self.paystack.verify_authorization(
            initialization['reference'],
            mock_verification=True
        )

        settlement = self._verify_settlement(
            'paystack',
            authorization['reference']
        )

        return {
            'initialized': initialization['status'] == 'success',
            'authorized': authorization['status'] == 'success',
            'settled': settlement,
            'currency_handled': 'NGN'
        }
```

**Common Failure Modes:** CI/CD pipelines that run full test suites against every commit create unnecessary costs and delays. Implement smart test selection that runs only relevant tests based on changed files.
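```python
def determine_relevant_tests(changed_files: list) -> list:
    """Determine which tests to run based on changes."""
    test_mapping = {
        'services/billing': ['tests/billing/*', 'tests/payments/*'],
        'services/ai': ['tests/ai/*', 'tests/models/*'],
        'services/api': ['tests/api/*'],
        'frontend': ['tests/frontend/*', 'tests/e2e/*'],
        'shared': ['tests/*']
    }

    tests = set()
    for changed_file in changed_files:
        for module, module_tests in test_mapping.items():
            if changed_file.startswith(module):
                tests.update(module_tests)

    # Adding the shared suite unconditionally would select every test on every
    # commit. Fall back to the full suite only when a changed file matches no
    # known module, because such a change could affect anything.
    known_prefixes = tuple(test_mapping)
    if any(not changed_file.startswith(known_prefixes) for changed_file in changed_files):
        tests.update(test_mapping['shared'])

    return sorted(tests)
```
25 min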
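The tenant matrix in this chapter relies on a `_generate_feature_combinations` helper that is never shown. A minimal sketch of what it could look like follows, assuming it is meant to return every subset of the feature list; the standalone function name and the module-level `PLANS` and `FEATURES` constants are illustrative only.

```python
from itertools import combinations


def generate_feature_combinations(features: list[str]) -> list[tuple[str, ...]]:
    """Return every subset of `features`, from the empty set to the full set."""
    return [
        subset
        for size in range(len(features) + 1)
        for subset in combinations(features, size)
    ]


PLANS = ['free', 'starter', 'professional', 'enterprise']
FEATURES = ['api_access', 'ai_tokens', 'webhooks', 'sso']

# Every (plan, feature subset) pair the matrix runner would create a tenant for.
matrix = [
    (plan, subset)
    for plan in PLANS
    for subset in generate_feature_combinations(FEATURES)
]
print(len(matrix))  # 4 plans x 2**4 subsets = 64 tenant configurations
```

The count doubles with each added feature flag, which is worth keeping in mind alongside the cost warning above: a full matrix is probably better suited to a nightly job than to every commit.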
  19. Monitoring Multi-Tenant

Multi-tenant monitoring requires tenant-context isolation in metrics while providing platform-wide views, with alerting thresholds calibrated for Nigerian network conditions where latency spikes are common.

Monitoring a multi-tenant SaaS requires balancing detailed per-tenant visibility with system-wide health monitoring, all while keeping costs manageable for Nigerian operations.

```python
from dataclasses import dataclass
from typing import Optional
import time
from prometheus_client import Counter, Histogram, Gauge

@dataclass
class TenantMetrics:
    """Metrics container shared by all tenants; tenants are separated by labels."""
    requests_total: Counter
    requests_errors: Counter
    request_duration: Histogram
    ai_tokens_used: Counter
    quota_remaining: Gauge

class MultiTenantMetricsCollector:
    """Collect and manage metrics across tenants."""

    def __init__(self):
        self._metrics = None
        # Label names must match the keys passed to .labels() in record_request.
        self._tenant_labels = ['tenant_id', 'plan', 'endpoint']

    def get_tenant_metrics(self, tenant_id: str, plan: str) -> TenantMetrics:
        """Get or create the metric families used for every tenant."""
        # prometheus_client rejects a second metric registered under the same
        # name, so the families are created once and reused for each tenant.
        if self._metrics is None:
            self._metrics = TenantMetrics(
                requests_total=Counter(
                    'saas_requests_total',
                    'Total requests',
                    self._tenant_labels
                ),
                requests_errors=Counter(
                    'saas_requests_errors_total',
                    'Total errors',
                    self._tenant_labels
                ),
                request_duration=Histogram(
                    'saas_request_duration_seconds',
                    'Request duration',
                    self._tenant_labels,
                    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0]
                ),
                ai_tokens_used=Counter(
                    'saas_ai_tokens_total',
                    'AI tokens consumed',
                    self._tenant_labels
                ),
                quota_remaining=Gauge(
                    'saas_quota_remaining',
                    'Remaining quota',
                    self._tenant_labels
                )
            )
        return self._metrics

    def record_request(
        self,
        tenant_id: str,
        plan: str,
        endpoint: str,
        status: int,
        duration: float
    ):
        """Record metrics for a request."""
        metrics = self.get_tenant_metrics(tenant_id, plan)
        labels = {'tenant_id': tenant_id, 'plan': plan, 'endpoint': endpoint}
        metrics.requests_total.labels(**labels).inc()
        if status >= 400:
            metrics.requests_errors.labels(**labels).inc()
        metrics.request_duration.labels(**labels).observe(duration)
```

**Alert Configuration for Nigerian Conditions:**

```python
from typing import Callable

class AlertingManager:
    """Manage alerts with tenant-aware thresholds."""

    def __init__(self, alert_manager_client):
        self.client = alert_manager_client
        self.default_thresholds = {
            'error_rate_percent': 5.0,
            'latency_p95_ms': 2000,
            'latency_p99_ms': 5000,
            'quota_usage_percent': 90,
            'payment_failure_rate_percent': 10
        }

    def configure_alerts(self, environment: str):
        """Configure alert rules for environment."""
        common_rules = [
            {
                'name': 'high_error_rate',
                'expr': 'rate(saas_requests_errors_total[5m]) / rate(saas_requests_total[5m]) > 0.05',
                'severity': 'critical',
                'annotations': {
                    'summary': 'High error rate detected',
                    'description': 'Error rate exceeds 5% for 5 minutes'
                }
            },
            {
                'name': 'high_latency',
                'expr': 'histogram_quantile(0.95, rate(saas_request_duration_seconds_bucket[5m])) > 2',
                'severity': 'warning',
                'annotations': {
                    'summary': 'High API latency',
                    'description': 'P95 latency exceeds 2 seconds'
                }
            },
            {
                'name': 'payment_webhook_failures',
                'expr': 'rate(payment_webhook_failures_total[5m]) > 0.1',
                'severity': 'critical',
                'annotations': {
                    'summary': 'Payment webhook failures',
                    'description': 'Payment processor webhooks failing'
                }
            }
        ]

        for rule in common_rules:
            self._create_alert_rule(rule)

    def configure_tenant_alerts(self, tenant_id: str, plan: str):
        """Configure tenant-specific alerts."""
        thresholds = self._get_tenant_thresholds(plan)

        tenant_rules = [
            {
                'name': f'tenant_quota_exceeded_{tenant_id}',
                'expr': f'saas_quota_remaining{{tenant_id="{tenant_id}"}} < 0',
                'severity': 'warning',
                'labels': {'tenant_id': tenant_id},
                'annotations': {
                    'description': f'Tenant {tenant_id} has exceeded quota'
                }
            },
            {
                'name': f'tenant_high_usage_{tenant_id}',
                # increase() returns the request count over the hour; rate() would
                # return a per-second average and never reach an hourly threshold.
                'expr': f'increase(saas_requests_total{{tenant_id="{tenant_id}"}}[1h]) > {thresholds["max_requests_per_hour"]}',
                'severity': 'warning',
                'labels': {'tenant_id': tenant_id},
                'annotations': {
                    'description': f'Tenant {tenant_id} approaching rate limit'
                }
            }
        ]

        for rule in tenant_rules:
            self._create_alert_rule(rule)
```

**Real-Time Monitoring Dashboard:**

```python
class MonitoringDashboard:
    """Generate monitoring dashboard configuration."""

    def __init__(self, grafana_client):
        self.grafana = grafana_client

    def create_platform_dashboard(self) -> dict:
        """Create platform-wide monitoring dashboard."""
        dashboard = {
            'title': 'SaaS Platform Overview',
            'tags': ['platform', 'production'],
            'timezone': 'Africa/Lagos',
            'refresh': '30s',
            'panels': [
                self._create_overview_row(),
                self._create_tenant_health_row(),
                self._create_payment_status_row(),
                self._create_infrastructure_row()
            ]
        }

        return self.grafana.create_dashboard(dashboard)

    def _create_overview_row(self) -> dict:
        """Create overview metrics row."""
        return {
            'title': 'Platform Overview',
            'gridPos': {'h': 8, 'w': 24, 'x': 0, 'y': 0},
            'targets': [
                {
                    'expr': 'sum(rate(saas_requests_total[5m]))',
                    'legendFormat': 'Total RPS',
                    'refId': 'A'
                },
                {
                    'expr': 'sum(rate(saas_requests_errors_total[5m])) / sum(rate(saas_requests_total[5m])) * 100',
                    'legendFormat': 'Error Rate %',
                    'refId': 'B'
                },
                {
                    'expr': 'histogram_quantile(0.95, sum(rate(saas_request_duration_seconds_bucket[5m])) by (le))',
                    'legendFormat': 'P95 Latency',
                    'refId': 'C'
                }
            ]
        }

    def _create_tenant_health_row(self) -> dict:
        """Create tenant health monitoring row."""
        return {
            'title': 'Tenant Health',
            'gridPos': {'h': 10, 'w': 24, 'x': 0, 'y': 8},
            'targets': [
                {
                    'expr': 'topk(10, sum(rate(saas_requests_total[5m])) by (tenant_id))',
                    'legendFormat': '{{tenant_id}}',
                    'refId': 'A'
                },
                {
                    'expr': 'topk(10, sum(rate(saas_requests_errors_total[5m])) by (tenant_id))',
                    'legendFormat': 'Errors - {{tenant_id}}',
                    'refId': 'B'
                }
            ]
        }
```

**Common Failure Modes:** Monitoring systems that collect excessive metrics quickly become expensive and difficult to query.
Implement metric cardinality limits and aggregate tenant metrics into bucketed categories (small/medium/large) for platform-level views.

```python
def aggregate_tenant_metrics():
    """Aggregate tenant metrics for efficient querying."""
    aggregation_queries = [
        # Hourly roll-up by tenant size category (small/medium/large).
        """
        SELECT
            CASE
                WHEN tenant_plan IN ('free', 'starter') THEN 'small'
                WHEN tenant_plan = 'professional' THEN 'medium'
                ELSE 'large'
            END as tenant_category,
            date_trunc('hour', created_at) as hour,
            sum(requests) as total_requests,
            sum(errors) as total_errors,
            avg(avg_latency) as avg_latency
        FROM tenant_metrics
        GROUP BY 1, 2
        """,
        # Per-tenant hourly usage, precomputed as a materialized view.
        """
        CREATE MATERIALIZED VIEW tenant_metrics_hourly AS
        SELECT
            tenant_id,
            date_trunc('hour', created_at) as hour,
            sum(amount) as total_usage,
            count(*) as request_count
        FROM usage_records
        GROUP BY 1, 2
        """
    ]
    return aggregation_queries
```
25 min
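The "metric cardinality limits" recommended in this chapter can be enforced before a label value ever reaches the metrics library. The sketch below is one simple approach under stated assumptions: `TenantLabelLimiter` and the `'other'` overflow label are hypothetical names, and the first-come policy is only an illustration (a production setup might instead reserve individual labels for the highest-paying or busiest tenants).

```python
class TenantLabelLimiter:
    """Cap the number of distinct tenant_id label values exposed to the metrics backend."""

    def __init__(self, max_tenants: int, overflow_label: str = 'other'):
        self._max_tenants = max_tenants
        self._overflow_label = overflow_label
        self._tracked: set[str] = set()

    def label_for(self, tenant_id: str) -> str:
        """Return the tenant's own label while under the cap, else the overflow label."""
        if tenant_id in self._tracked:
            return tenant_id
        if len(self._tracked) < self._max_tenants:
            self._tracked.add(tenant_id)
            return tenant_id
        return self._overflow_label


limiter = TenantLabelLimiter(max_tenants=2)
print([limiter.label_for(t) for t in ['acme', 'globex', 'initech', 'acme']])
# ['acme', 'globex', 'other', 'acme']
```

Passing `limiter.label_for(tenant_id)` as the `tenant_id` label keeps the number of time series bounded by the cap times the number of plans and endpoints, regardless of how many tenants sign up.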
  20. Cost Allocation

Cost allocation in Nigerian SaaS must account for NGN/USD exchange rate volatility, with infrastructure costs tracked per-tenant while aggregating shared platform expenses for accurate unit economics.

Accurate cost allocation enables proper pricing decisions and helps identify unprofitable tenants, critical for sustainable operations in markets with significant currency fluctuation.

```python
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Optional
import requests

class CostAllocationService:
    """Track and allocate costs across tenants."""

    def __init__(self, db_session, aws_cost_client, azure_cost_client):
        self.db = db_session
        self.aws_costs = aws_cost_client
        self.azure_costs = azure_cost_client
        self.exchange_rate_service = ExchangeRateService()

    def calculate_monthly_costs(self, billing_month: datetime) -> dict:
        """Calculate total platform costs for a billing month."""
        # The period runs from the first day of the month to the first day of the next.
        start_date = billing_month.replace(day=1)
        if billing_month.month == 12:
            end_date = billing_month.replace(year=billing_month.year + 1, month=1, day=1)
        else:
            end_date = billing_month.replace(month=billing_month.month + 1, day=1)

        aws_costs = self._get_aws_costs(start_date, end_date)
        azure_costs = self._get_azure_costs(start_date, end_date)
        ai_costs = self._get_ai_provider_costs(start_date, end_date)
        third_party = self._get_third_party_costs(start_date, end_date)

        ngn_rate = self.exchange_rate_service.get_rate('USD', 'NGN', start_date)

        total_usd = aws_costs + azure_costs + ai_costs + third_party
        total_ngn = Decimal(str(total_usd)) * Decimal(str(ngn_rate))

        return {
            'period': {'start': start_date, 'end': end_date},
            'costs_usd': {
                'aws': aws_costs,
                'azure': azure_costs,
                'ai_providers': ai_costs,
                'third_party': third_party,
                'total': total_usd
            },
            'costs_ngn': float(total_ngn),
            'exchange_rate': ngn_rate
        }

    def allocate_costs_to_tenants(self, billing_month: datetime) -> list[dict]:
        """Allocate platform costs to individual tenants."""
        platform_costs = self.calculate_monthly_costs(billing_month)

        tenants = self.db.query(Tenant).filter(
            Tenant.status == 'active'
        ).all()

        allocations = []

        shared_costs = self._calculate_shared_costs(platform_costs)
        direct_costs = self._calculate_direct_costs(tenants, billing_month)

        for tenant in tenants:
            tenant_direct = direct_costs.get(tenant.id, {})
            tenant_share = self._calculate_tenant_share(
                tenant,
                shared_costs,
                platform_costs
            )

            allocations.append({
                'tenant_id': tenant.id,
                'tenant_name': tenant.name,
                'plan': tenant.plan,
                'direct_costs': tenant_direct,
                'allocated_shared': tenant_share,
                'total_costs_ngn': sum(tenant_direct.values()) + tenant_share
            })

        return allocations
```

**Per-Tenant Cost Tracking:**

```python
class TenantCostTracker:
    """Track detailed costs per tenant."""

    def __init__(self, db_session, metrics_collector):
        self.db = db_session
        self.metrics = metrics_collector

    def track_computation_cost(
        self,
        tenant_id: str,
        service: str,
        resource_usage: dict,
        pricing: dict
    ) -> dict:
        """Track and calculate computation cost for tenant."""
        cost_breakdown = {}
        total_cost = Decimal('0')

        if 'compute_seconds' in resource_usage:
            compute_cost = (
                Decimal(str(resource_usage['compute_seconds']))
                * Decimal(str(pricing['compute_per_second']))
            )
            cost_breakdown['compute'] = float(compute_cost)
            total_cost += compute_cost

        if 'storage_gb_months' in resource_usage:
            storage_cost = (
                Decimal(str(resource_usage['storage_gb_months']))
                * Decimal(str(pricing['storage_per_gb']))
            )
            cost_breakdown['storage'] = float(storage_cost)
            total_cost += storage_cost

        if 'api_calls' in resource_usage:
            api_cost = (
                Decimal(str(resource_usage['api_calls']))
                * Decimal(str(pricing['api_per_call']))
            )
            cost_breakdown['api_calls'] = float(api_cost)
            total_cost += api_cost

        if 'ai_tokens' in resource_usage:
            tokens_cost = self._calculate_ai_cost(resource_usage['ai_tokens'], pricing)
            cost_breakdown['ai_tokens'] = float(tokens_cost)
            total_cost += tokens_cost

        cost_record = TenantCostRecord(
            tenant_id=tenant_id,
            service=service,
            billing_month=datetime.utcnow().replace(day=1),
            costs=cost_breakdown,
            total_ngn=float(total_cost),
            exchange_rate=self._get_current_rate(),
            created_at=datetime.utcnow()
        )

        self.db.add(cost_record)
        self.db.commit()

        return cost_breakdown

    def _calculate_ai_cost(self, token_usage: dict, pricing: dict) -> Decimal:
        """Calculate AI provider costs by model, converted to NGN."""
        total_usd = Decimal('0')

        model_prices = {
            'gpt-4': {'prompt': 0.03, 'completion': 0.06},  # per 1K tokens USD
            'gpt-3.5-turbo': {'prompt': 0.0015, 'completion': 0.002}
        }

        for model, usage in token_usage.items():
            if model in model_prices:
                prices = model_prices[model]
                cost = (
                    Decimal(str(usage.get('prompt_tokens', 0))) * Decimal(str(prices['prompt'])) / 1000
                    + Decimal(str(usage.get('completion_tokens', 0))) * Decimal(str(prices['completion'])) / 1000
                )
                total_usd += cost

        # Model prices are in USD while the record stores total_ngn, so convert here.
        # Assumes _get_current_rate() returns the NGN-per-USD rate.
        return total_usd * Decimal(str(self._get_current_rate()))
```

**Unit Economics Calculation:**

```python
class UnitEconomicsCalculator:
    """Calculate unit economics for SaaS operations."""

    def __init__(self, db_session):
        self.db = db_session

    def calculate_unit_economics(self, period_start: datetime, period_end: datetime) -> dict:
        """Calculate thorough unit economics."""
        tenants = self._get_active_tenants(period_start, period_end)

        revenue_per_tenant = self._calculate_revenue_per_tenant(tenants)
        cost_per_tenant = self._calculate_cost_per_tenant(tenants, period_start, period_end)

        gross_margin = (
            (revenue_per_tenant - cost_per_tenant) / revenue_per_tenant
            if revenue_per_tenant > 0 else 0
        )

        cac = self._calculate_customer_acquisition_cost(period_start, period_end)
        ltv = self._calculate_lifetime_value(tenants)

        ltv_cac_ratio = ltv / cac if cac > 0 else 0

        # revenue_per_tenant is already monthly (see the key below), so the
        # payback period in months is CAC divided by it, without a further / 12.
        months_to_recover = cac / revenue_per_tenant if revenue_per_tenant > 0 else 0

        return {
            'revenue_per_tenant_monthly': revenue_per_tenant,
            'cost_per_tenant_monthly': cost_per_tenant,
            'gross_margin_percent': gross_margin * 100,
            'cac_ngn': cac,
            'ltv_ngn': ltv,
            'ltv_cac_ratio': ltv_cac_ratio,
            'months_to_recover': months_to_recover
        }

    def _calculate_cost_per_tenant(self, tenants: list, period_start: datetime, period_end: datetime) -> Decimal:
        """Calculate average cost per tenant."""
        total_costs = Decimal('0')

        for tenant in tenants:
            costs = self.db.query(TenantCostRecord).filter(
                TenantCostRecord.tenant_id == tenant.id,
                TenantCostRecord.billing_month >= period_start,
                TenantCostRecord.billing_month <= period_end
            ).all()

            tenant_total = sum(Decimal(str(c.total_ngn)) for c in costs)
            total_costs += tenant_total

        return total_costs / Decimal(str(len(tenants))) if tenants else Decimal('0')
```

**Common Failure Modes:** Cost allocation that doesn't account for currency fluctuations creates billing disputes. Always lock exchange rates at the time of billing cycle creation and store historical rates with each cost record.

```python
def store_cost_with_exchange_rate(tenant_id: str, cost_usd: float, billing_month: datetime):
    """Store costs with exchange rate for future reconciliation."""
    rate = get_exchange_rate('USD', 'NGN', billing_month)

    cost_record = {
        'tenant_id': tenant_id,
        'cost_usd': cost_usd,
        'exchange_rate': rate,
        'cost_ngn': cost_usd * rate,
        'rate_locked_at': billing_month,
        'billing_month': billing_month
    }

    return cost_record
```
25 min
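The allocation code in this chapter calls a `_calculate_tenant_share` helper without showing how shared costs are split. One common approach, sketched here as an assumption rather than the course's method, is to divide the shared pool in proportion to usage and push any rounding remainder onto the largest consumer so the shares always add up to the pool. The function name, the usage unit and the kobo-level rounding are all illustrative.

```python
from decimal import Decimal, ROUND_HALF_UP

KOBO = Decimal('0.01')  # smallest NGN unit used for rounding


def allocate_shared_cost(total_ngn: Decimal, usage_by_tenant: dict[str, int]) -> dict[str, Decimal]:
    """Split `total_ngn` across tenants in proportion to their usage.

    Falls back to an even split when nobody recorded usage. The rounding
    remainder goes to the largest consumer so the shares sum to `total_ngn`.
    """
    if not usage_by_tenant:
        return {}

    total_usage = sum(usage_by_tenant.values())
    shares: dict[str, Decimal] = {}
    for tenant_id, usage in usage_by_tenant.items():
        if total_usage:
            weight = Decimal(usage) / Decimal(total_usage)
        else:
            weight = Decimal(1) / Decimal(len(usage_by_tenant))
        shares[tenant_id] = (total_ngn * weight).quantize(KOBO, rounding=ROUND_HALF_UP)

    remainder = total_ngn - sum(shares.values())
    largest = max(usage_by_tenant, key=usage_by_tenant.get)
    shares[largest] += remainder
    return shares
```

Working in `Decimal` end to end avoids the drift that appears when per-tenant float amounts are summed back up and compared against the invoice from the cloud provider.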
  21. Scaling Strategy

Scaling Nigerian SaaS requires balancing cost optimization with performance requirements, where strategies differ significantly between low-cost development environments and production systems handling real Naira transactions.

Scaling strategy must account for the unpredictable traffic patterns common in Nigerian markets, where viral content or a marketing campaign can create sudden traffic spikes that overwhelm unprepared systems.

```python
from dataclasses import dataclass
from typing import Optional
import boto3

@dataclass
class ScalingConfig:
    min_instances: int
    max_instances: int
    target_cpu_utilization: float
    scale_up_cooldown: int
    scale_down_cooldown: int

class ScalingStrategy:
    """Implement multi-tier scaling strategy for SaaS."""

    def __init__(self, ecs_client, cloudwatch_client):
        self.ecs = ecs_client
        self.cloudwatch = cloudwatch_client
        # register_scalable_target belongs to Application Auto Scaling, not to the ECS client.
        self.autoscaling = boto3.client('application-autoscaling')
        self.strategies = {
            'aggressive': ScalingConfig(2, 20, 50, 60, 300),
            'conservative': ScalingConfig(2, 8, 70, 120, 600),
            'balanced': ScalingConfig(2, 12, 60, 90, 450)
        }

    def apply_scaling_strategy(
        self,
        service_name: str,
        strategy: str,
        cluster_name: str = 'saas-production'
    ) -> dict:
        """Apply scaling strategy to an ECS service."""
        config = self.strategies.get(strategy, self.strategies['balanced'])

        # ECS services are addressed as service/<cluster>/<service> with the
        # ecs:service:DesiredCount dimension.
        target = self.autoscaling.register_scalable_target(
            ServiceNamespace='ecs',
            ResourceId=f'service/{cluster_name}/{service_name}',
            ScalableDimension='ecs:service:DesiredCount',
            MinCapacity=config.min_instances,
            MaxCapacity=config.max_instances
        )

        cpu_scaling = self._create_step_scaling_policy(
            'CpuScaling',
            'AverageCPUUtilization',
            config.target_cpu_utilization,
            config.scale_up_cooldown,
            config.scale_down_cooldown
        )

        return {
            'target': target,
            'policies': [cpu_scaling]
        }
```

**Horizontal Pod Autoscaling for Kubernetes:**

```python
from kubernetes import client, config

class KubernetesScalingManager:
    """Manage K8s autoscaling for SaaS workloads."""

    def __init__(self):
        config.load_kube_config()
        self.autoscaling = client.AutoscalingV2Api()

    def deploy_hpa(self, service_name: str, hpa_config: dict) -> dict:
        """Deploy Horizontal Pod Autoscaler."""
        # The Python client prefixes autoscaling/v2 models with "V2", and metric
        # specs take *Source objects (the *Status classes describe observed state).
        hpa = client.V2HorizontalPodAutoscaler(
            api_version='autoscaling/v2',
            kind='HorizontalPodAutoscaler',
            metadata=client.V1ObjectMeta(
                name=f'{service_name}-hpa',
                namespace='default',
                labels={'app': service_name}
            ),
            spec=client.V2HorizontalPodAutoscalerSpec(
                scale_target_ref=client.V2CrossVersionObjectReference(
                    api_version='apps/v1',
                    kind='Deployment',
                    name=service_name
                ),
                min_replicas=hpa_config.get('min_replicas', 2),
                max_replicas=hpa_config.get('max_replicas', 10),
                metrics=[
                    client.V2MetricSpec(
                        type='Resource',
                        resource=client.V2ResourceMetricSource(
                            name='cpu',
                            target=client.V2MetricTarget(
                                type='Utilization',
                                average_utilization=hpa_config.get('target_cpu_percent', 60)
                            )
                        )
                    ),
                    client.V2MetricSpec(
                        type='Pods',
                        pods=client.V2PodsMetricSource(
                            metric=client.V2MetricIdentifier(
                                name='requests_per_second'
                            ),
                            target=client.V2MetricTarget(
                                type='AverageValue',
                                average_value='100'  # quantities are passed as strings
                            )
                        )
                    )
                ],
                behavior=client.V2HorizontalPodAutoscalerBehavior(
                    # Scale up immediately, doubling at most every 15 seconds.
                    scale_up=client.V2HPAScalingRules(
                        stabilization_window_seconds=0,
                        policies=[
                            client.V2HPAScalingPolicy(
                                type='Percent',
                                value=100,
                                period_seconds=15
                            )
                        ]
                    ),
                    # Scale down slowly: wait 5 minutes, then remove at most 10% per minute.
                    scale_down=client.V2HPAScalingRules(
                        stabilization_window_seconds=300,
                        policies=[
                            client.V2HPAScalingPolicy(
                                type='Percent',
                                value=10,
                                period_seconds=60
                            )
                        ]
                    )
                )
            )
        )

        return self.autoscaling.create_namespaced_horizontal_pod_autoscaler(
            namespace='default',
            body=hpa
        )
```

**Database Scaling Strategy:**

```python
class DatabaseScalingManager:
    """Manage database scaling with read replicas and connection pooling."""

    def __init__(self, rds_client, proxy_client):
        self.rds = rds_client
        self.proxy = proxy_client

    def setup_read_replica(self, primary_db_arn: str, region: str) -> dict:
        """Create read replica for read scaling."""
        replica = self.rds.create_db_instance_read_replica(
            DBInstanceIdentifier=f'saas-read-replica-{region}',
            SourceDBInstanceIdentifier=primary_db_arn,
            DBInstanceClass='db.t3.medium',
            AvailabilityZone=f'{region}a',
            PubliclyAccessible=False,
            Tags=[
                {'Key': 'Purpose', 'Value': 'read-replica'},
                {'Key': 'Region', 'Value': region}
            ]
        )
        return replica

    def configure_connection_pool(
        self,
        pool_size: int,
        max_connections: int
    ) -> dict:
        """Configure PgBouncer connection pooling."""
        pooler_config = {
            'pool_size': pool_size,
            'max_connections': max_connections,
            'server_idle_timeout': 600,
            'server_lifetime': 3600,
            'query_timeout': 30,
            'pool_mode': 'transaction'
        }

        return self.proxy.update_pool_configuration(pooler_config)
```

**Predictive Scaling with ML:**

```python
from sklearn.linear_model import LinearRegression
from datetime import datetime, timedelta
import numpy as np
import pandas as pd

class PredictiveScalingService:
    """Predict traffic and scale proactively."""

    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self.model = LinearRegression()

    def train_model(self, historical_data: pd.DataFrame):
        """Train scaling prediction model."""
        X = historical_data[['hour', 'day_of_week', 'is_business_day', 'marketing_campaign_active']]
        y = historical_data['requests_per_second']

        self.model.fit(X, y)

    def predict_load(self, target_datetime: datetime) -> dict:
        """Predict load for a future time."""
        features = self._prepare_features(target_datetime)

        predicted_load = self.model.predict([features])[0]

        confidence = self._calculate_confidence(features)

        recommended_instances = self._calculate_instances(
            predicted_load,
            confidence
        )

        return {
            'predicted_requests_per_second': predicted_load,
            'confidence': confidence,
            'recommended_instances': recommended_instances,
            'predicted_at': datetime.utcnow()
        }

    def _calculate_instances(self, predicted_load: float, confidence: float) -> int:
        """Calculate recommended instance count."""
        base_instances = 2
        load_factor = predicted_load / 1000
        # Add more headroom when the prediction is less certain.
        safety_factor = 1.5 if confidence < 0.7 else 1.2

        return int(base_instances + load_factor * safety_factor)
```

**Common Failure Modes:** Auto-scaling that responds to short traffic spikes causes thrashing. Always implement stabilization windows and minimum scale-down thresholds.

```python
def configure_safe_autoscaling():
    """Configure autoscaling with safety measures."""
    rules = {
        'scale_up': {
            'stabilization_window': 0,
            'policies': [{'type': 'percent', 'value': 100, 'period': 15}]
        },
        'scale_down': {
            'stabilization_window': 300,
            'policies': [
                {'type': 'percent', 'value': 10, 'period': 60},
                {'type': 'absolute', 'value': 1, 'period': 300}
            ]
        }
    }
    return rules
```
25 min
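To make the stabilization-window idea from this chapter concrete, here is a small simulation of the rule: scale up as soon as more capacity is requested, but scale down only to the highest replica count recommended during the window. It is a simplified sketch of the concept, loosely modelled on how the Kubernetes autoscaler treats scale-down, and `ScaleDownStabilizer` is a hypothetical name, not part of any library.

```python
from collections import deque


class ScaleDownStabilizer:
    """Delay scale-down until low demand has persisted for a whole window."""

    def __init__(self, window_seconds: float):
        self._window = window_seconds
        self._history: deque[tuple[float, int]] = deque()  # (timestamp, desired replicas)

    def recommend(self, now: float, desired: int, current: int) -> int:
        """Return the replica count to apply at time `now`."""
        self._history.append((now, desired))
        # Forget recommendations that have fallen out of the window.
        while self._history and self._history[0][0] < now - self._window:
            self._history.popleft()

        if desired >= current:
            return desired  # scale up (or hold) without delay

        # Scale down only as far as the highest recent recommendation allows.
        highest_recent = max(d for _, d in self._history)
        return min(current, highest_recent)
```

With a 300-second window, a one-minute dip in traffic leaves the replica count untouched, while five minutes of sustained low demand lets the service shrink. That is the behaviour that prevents thrashing during short spikes.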
  22. Nigerian Market Strategy

Nigerian market strategy for AI SaaS requires deep understanding of local payment preferences, business practices, and the specific challenges of reaching Lagos and Abuja enterprise clients.

Nigerian market strategy must address payment preferences (bank transfers dominant over cards), currency handling (NGN/USD volatility), and local business relationship building.

```python
from dataclasses import dataclass
from typing import Optional
from datetime import datetime

@dataclass
class NigerianMarketConfig:
    # Fields without defaults must come before fields with defaults,
    # otherwise @dataclass raises a TypeError at class creation.
    payment_preferences: list[str]
    preferred_billing_cycle: str
    enterprise_decision_makers: list[str]
    primary_currency: str = 'NGN'
    secondary_currency: str = 'USD'

class NigerianMarketStrategy:
    """Implement market strategy for Nigerian SaaS."""

    def __init__(self, db_session, payment_service, crm_service):
        self.db = db_session
        self.payment = payment_service
        self.crm = crm_service
        self.config = self._load_market_config()

    def configure_for_nigerian_market(self) -> dict:
        """Configure SaaS for Nigerian market requirements."""
        pricing_config = self._configure_pricing()
        payment_config = self._configure_payments()
        compliance_config = self._configure_compliance()

        return {
            'pricing': pricing_config,
            'payments': payment_config,
            'compliance': compliance_config
        }

    def _configure_pricing(self) -> dict:
        """Configure pricing for Nigerian market."""
        exchange_rate = self._get_current_ngn_usd_rate()

        # NGN is the source of truth; USD prices are derived from the current rate.
        pricing_tiers = [
            {
                'name': 'Starter',
                'price_ngn': 15000,
                'price_usd': round(15000 / exchange_rate, 2),
                'features': ['100K API calls', '1M AI tokens', 'Email support'],
                'target': 'Startups and small agencies'
            },
            {
                'name': 'Professional',
                'price_ngn': 45000,
                'price_usd': round(45000 / exchange_rate, 2),
                'features': ['500K API calls', '10M AI tokens', 'Priority support', 'Webhooks'],
                'target': 'Growing businesses and agencies'
            },
            {
                'name': 'Enterprise',
                'price_ngn': 150000,
                'price_usd': round(150000 / exchange_rate, 2),
                'features': ['Unlimited API', '50M AI tokens', 'Dedicated support', 'SLA', 'Custom integrations'],
                'target': 'Large enterprises and government'
            }
        ]

        return {
            'currency': 'NGN',
            'exchange_rate_lock_days': 30,
            'tiers': pricing_tiers
        }

    def _configure_payments(self) -> dict:
        """Configure payment preferences for Nigerian market."""
        return {
            'payment_methods': [
                {'type': 'paystack_transfer', 'priority': 1, 'label': 'Bank Transfer (Nigerian Naira)'},
                {'type': 'paystack_card', 'priority': 2, 'label': 'Card Payment'},
                {'type': 'flutterwave_transfer', 'priority': 3, 'label': 'Flutterwave Transfer'},
                {'type': 'flutterwave_ussd', 'priority': 4, 'label': 'USSD Payment'},
                {'type': 'international_card', 'priority': 5, 'label': 'International Card (USD)'}
            ],
            'invoice_terms': {
                'default_days': 30,
                'enterprise_days': 45,
                'grace_period_days': 7
            },
            'recurring_billing': {
                'enabled': True,
                'default_cycle': 'monthly',
                'supported_cycles': ['monthly', 'quarterly', 'annually'],
                'annual_discount_percent': 20
            }
        }
```

**Local Payment Integration:**

```python
from datetime import datetime, timedelta

class NigerianPaymentProcessor:
    """Handle Nigerian payment methods."""

    def __init__(self, db_session, paystack_client, flutterwave_client):
        # initiate_payment looks tenants up through self.db, so the session is required.
        self.db = db_session
        self.paystack = paystack_client
        self.flutterwave = flutterwave_client

    def initiate_payment(
        self,
        tenant_id: str,
        amount_ngn: float,
        payment_method: str,
        description: str
    ) -> dict:
        """Initiate payment with preferred Nigerian method."""
        tenant = self.db.query(Tenant).filter(Tenant.id == tenant_id).first()

        if payment_method == 'bank_transfer':
            return self._initiate_paystack_transfer(amount_ngn, tenant, description)
        elif payment_method == 'ussd':
            return self._initiate_ussd_payment(amount_ngn, tenant, description)
        else:
            return self._initiate_card_payment(amount_ngn, tenant, description)

    def _initiate_paystack_transfer(self, amount_ngn: float, tenant: Tenant, description: str) -> dict:
        """Initiate bank transfer payment via Paystack."""
        transfer_recipient = self._get_or_create_transfer_recipient(tenant)

        transaction = self.paystack.transactions.create(
amount=int(amount_ngn * 100), currency='NGN', email=tenant.billing_email, description=description, metadata={ 'tenant_id': tenant.id, 'plan': tenant.plan, 'invoice_number': self._generate_invoice_number(tenant) } ) return { 'provider': 'paystack', 'reference': transaction['reference'], 'status': 'pending', 'bank_details': transfer_recipient, 'amount_ngn': amount_ngn, 'expires_at': datetime.utcnow() + timedelta(hours=48) } def _initiate_ussd_payment(self, amount_ngn: float, tenant: Tenant, description: str) -> dict: """Initiate USSD payment via Flutterwave.""" if amount_ngn > 500000: raise ValueError("USSD payments limited to ₦500,000") flutterwave_txn = self.flutterwave.charges.create( amount=amount_ngn, currency='NGN', email=tenant.billing_email, tx_ref=f"USS-{tenant.id[:8]}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", channel='ussd', meta={ 'tenant_id': tenant.id, 'description': description }, ussd={ 'endurl': 'https://app.example.com/payment/complete' } ) return { 'provider': 'flutterwave', 'reference': flutterwave_txn['reference'], 'status': 'pending', 'ussd_code': flutterwave_txn['meta']['flutterwave_ussd_code'], 'amount_ngn': amount_ngn, 'expires_at': datetime.utcnow() + timedelta(minutes=10) } ``` **Enterprise Sales Strategy:** ```python class NigerianEnterpriseSales: """Handle enterprise sales in Nigerian market.""" def __init__(self, crm_client, email_service): self.crm = crm_client self.email = email_service def qualify_lead(self, company_info: dict) -> dict: """Qualify enterprise lead based on Nigerian market criteria.""" qualification_score = 0 if company_info.get('headquarters') in ['Lagos', 'Abuja', 'Port Harcourt']: qualification_score += 20 if company_info.get('employee_count', 0) > 50: qualification_score += 25 if company_info.get('annual_revenue_ngn', 0) > 50000000: qualification_score += 30 if company_info.get('industry') in ['fintech', 'banking', 'government', 'telecommunications']: qualification_score += 15 decision_makers = 
self._identify_decision_makers(company_info) if len(decision_makers) >= 2: qualification_score += 10 return { 'score': qualification_score, 'tier': self._get_lead_tier(qualification_score), 'decision_makers': decision_makers, 'recommended_approach': self._get_sales_approach(qualification_score) } def _identify_decision_makers(self, company_info: dict) -> list[dict]: """Identify key decision makers for enterprise sales.""" common_roles = [ {'title_pattern': 'Chief', 'weight': 3}, {'title_pattern': 'Director', 'weight': 2}, {'title_pattern': 'Head of', 'weight': 2}, {'title_pattern': 'Manager', 'weight': 1} ] decision_makers = [] for contact in company_info.get('contacts', []): for role in common_roles: if role['title_pattern'].lower() in contact.get('title', '').lower(): decision_makers.append({ 'name': contact.get('name'), 'title': contact.get('title'), 'weight': role['weight'], 'linkedin': contact.get('linkedin_url') }) return sorted(decision_makers, key=lambda x: x['weight'], reverse=True)[:5] ``` **Common Failure Modes:** Pricing that doesn't account for NGN/USD volatility creates margin erosion. Always implement pricing with automatic exchange rate adjustments and build in currency risk buffers. ```python def calculate_nigerian_pricing(base_usd_price: float, margin_buffer: float = 0.15) -> dict: """Calculate Nigerian pricing with volatility buffer.""" current_rate = get_exchange_rate('USD', 'NGN') base_ngn = base_usd_price * current_rate volatility_buffer = current_rate * 0.05 recommended_ngn = base_ngn + volatility_buffer return { 'base_usd': base_usd_price, 'current_rate': current_rate, 'base_ngn': round(base_ngn, 2), 'with_buffer_ngn': round(recommended_ngn, 2), 'buffer_amount': round(volatility_buffer, 2), 'updated_at': datetime.utcnow() } ```25 min
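To make the NGN/USD buffer arithmetic concrete, here is a minimal self-contained sketch. The function names `ngn_price` and `needs_reprice` are hypothetical, the exchange rates are made-up illustrative numbers rather than market data, and the rate is assumed to be quoted as NGN per 1 USD. It uses `Decimal` instead of `float` so currency amounts round predictably.

```python
from decimal import Decimal, ROUND_HALF_UP


def ngn_price(base_usd: Decimal, rate: Decimal,
              buffer_pct: Decimal = Decimal("0.15")) -> Decimal:
    """Convert a USD price to NGN and add a proportional risk buffer."""
    base_ngn = base_usd * rate
    buffered = base_ngn * (1 + buffer_pct)
    return buffered.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def needs_reprice(locked_rate: Decimal, current_rate: Decimal,
                  buffer_pct: Decimal = Decimal("0.15")) -> bool:
    """True once the naira has weakened by more than the buffer covers."""
    return current_rate > locked_rate * (1 + buffer_pct)


# Illustrative numbers only: a $10 plan priced at a locked rate of 1500.
locked = Decimal("1500")
print(ngn_price(Decimal("10"), locked))          # 15000 plus a 15% buffer
print(needs_reprice(locked, Decimal("1600")))    # move is inside the buffer
print(needs_reprice(locked, Decimal("1800")))    # move exceeds the buffer
```

A check like `needs_reprice` could run on a schedule alongside the `exchange_rate_lock_days` setting, so prices change either when the lock expires or when the buffer is exhausted, whichever comes first.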
  23. Compliance

Operating a SaaS in Nigeria requires adherence to the Nigeria Data Protection Regulation (NDPR) for data protection, Nigeria Deposit Insurance Corporation (NDIC) guidelines for financial transactions, and Nigerian Communications Commission (NCC) regulations for telecommunications. Documentation and audit trails are essential to operating legally. Compliance in this context extends beyond technical security to regulatory requirements for data handling, payment processing, and telecommunications.

```python
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import hashlib


@dataclass
class ComplianceRequirement:
    regulation: str
    requirement: str
    implementation: str
    audit_frequency: str
    documentation_required: list[str]


class NigerianComplianceManager:
    """Manage compliance requirements for Nigerian SaaS."""

    # _implement_ndic_compliance, _implement_ncc_compliance and the
    # _setup_* helpers other than _setup_data_classification are not
    # shown in this chapter; they must exist before this class is used.

    def __init__(self, db_session, audit_service):
        self.db = db_session
        self.audit = audit_service
        self.requirements = self._load_requirements()

    def _load_requirements(self) -> dict:
        """Load compliance requirements for Nigerian regulations."""
        return {
            'ndpr': {
                'regulation': 'Nigeria Data Protection Regulation',
                'key_requirements': [
                    'Lawful basis for processing',
                    'Data minimization',
                    'Purpose limitation',
                    'Storage limitation',
                    'Accuracy',
                    'Integrity and confidentiality',
                    'Accountability'
                ],
                'implementation_guide': self._implement_ndpr
            },
            'ndic': {
                'regulation': 'NDIC Guidelines',
                'key_requirements': [
                    'KYC compliance',
                    'Transaction monitoring',
                    'Fraud detection',
                    'Customer verification'
                ],
                'implementation_guide': self._implement_ndic_compliance
            },
            'ncc': {
                'regulation': 'NCC Regulations',
                'key_requirements': [
                    'Service availability',
                    'Quality of service',
                    'Consumer protection',
                    'Data retention'
                ],
                'implementation_guide': self._implement_ncc_compliance
            }
        }

    def _implement_ndpr(self) -> dict:
        """Implement NDPR compliance requirements."""
        implementation = {
            'data_classification': self._setup_data_classification(),
            'consent_management': self._setup_consent_management(),
            'data_subject_rights': self._setup_data_subject_rights(),
            'dpoc_appointment': self._setup_dpoc(),
            'breach_notification': self._setup_breach_notification()
        }
        return implementation

    def _setup_data_classification(self) -> dict:
        """Implement data classification for NDPR."""
        classification_levels = [
            {
                'level': 'public',
                'description': 'Information intended for public disclosure',
                'handling': 'No restrictions',
                'encryption': False
            },
            {
                'level': 'internal',
                'description': 'Internal business information',
                'handling': 'Internal access only',
                'encryption': True
            },
            {
                'level': 'confidential',
                'description': 'Sensitive business or customer data',
                'handling': 'Restricted access, audit required',
                'encryption': True
            },
            {
                'level': 'restricted',
                'description': 'Highly sensitive personal or financial data',
                'handling': 'Maximum protection, encrypted at rest and in transit',
                'encryption': True,
                'additional_controls': ['mfa', 'ip_whitelist', 'audit_trail']
            }
        ]
        return {
            'levels': classification_levels,
            'classification_policy': 'Data classification required for all new data assets',
            'review_frequency': 'Quarterly'
        }
```

**Data Subject Rights Implementation:**

```python
from datetime import datetime, timedelta


class NDPRDataSubjectRights:
    """Implement NDPR data subject rights."""

    # DataSubject, DataAccessRequest, DeletionRequest, LegalHold,
    # UsageRecord, APILog and Payment are ORM models assumed from
    # earlier chapters; the _schedule_* and _compile_* helpers are not shown.

    def __init__(self, db_session, storage_service):
        self.db = db_session
        self.storage = storage_service

    def handle_access_request(self, data_subject_id: str, request_id: str) -> dict:
        """Handle data access request within 30 days."""
        data_subject = self.db.query(DataSubject).filter(
            DataSubject.id == data_subject_id
        ).first()
        if not data_subject:
            raise ValueError("Data subject not found")

        request = DataAccessRequest(
            id=request_id,
            data_subject_id=data_subject_id,
            request_type='access',
            received_at=datetime.utcnow(),
            deadline=datetime.utcnow() + timedelta(days=30),
            status='in_progress'
        )
        # Register the request with the session so commit() persists it.
        self.db.add(request)

        tenant_data = self._collect_subject_data(data_subject)
        data_package = self._compile_data_package(tenant_data)
        secure_link = self.storage.create_secure_link(
            data_package,
            expiry_seconds=86400 * 7
        )

        request.completed_at = datetime.utcnow()
        request.data_link = secure_link
        request.status = 'completed'
        self.db.commit()

        self._send_completion_notification(data_subject.email, secure_link)
        return {
            'request_id': request_id,
            'status': 'completed',
            'data_link': secure_link,
            'expires_in': '7 days'
        }

    def handle_deletion_request(self, data_subject_id: str, request_id: str) -> dict:
        """Handle data deletion request."""
        data_subject = self.db.query(DataSubject).filter(
            DataSubject.id == data_subject_id
        ).first()
        if not data_subject:
            raise ValueError("Data subject not found")

        # Deletion is blocked while any legal hold is active.
        legal_holds = self.db.query(LegalHold).filter(
            LegalHold.data_subject_id == data_subject_id,
            LegalHold.status == 'active'
        ).all()
        if legal_holds:
            return {
                'request_id': request_id,
                'status': 'blocked',
                'reason': 'Legal hold exists',
                'hold_ids': [h.id for h in legal_holds]
            }

        deletion_tasks = []
        deletion_tasks.append(self._schedule_database_deletion(data_subject_id))
        deletion_tasks.append(self._schedule_file_deletion(data_subject_id))
        deletion_tasks.append(self._schedule_backup_deletion(data_subject_id))

        request = DeletionRequest(
            id=request_id,
            data_subject_id=data_subject_id,
            received_at=datetime.utcnow(),
            tasks=deletion_tasks,
            status='scheduled',
            scheduled_completion=datetime.utcnow() + timedelta(days=30)
        )
        self.db.add(request)
        self.db.commit()
        return {
            'request_id': request_id,
            'status': 'scheduled',
            'tasks': deletion_tasks,
            'estimated_completion': request.scheduled_completion
        }

    def _collect_subject_data(self, data_subject: DataSubject) -> dict:
        """Collect all data for a data subject."""
        # Caution: these queries filter by tenant_id, so they return
        # tenant-wide records. If a tenant holds more than one data
        # subject, filter on the subject's own identifier instead.
        data = {
            'profile': data_subject.to_dict(),
            'usage_records': self.db.query(UsageRecord).filter(
                UsageRecord.tenant_id == data_subject.tenant_id
            ).all(),
            'api_logs': self.db.query(APILog).filter(
                APILog.tenant_id == data_subject.tenant_id
            ).all(),
            'payments': self.db.query(Payment).filter(
                Payment.tenant_id == data_subject.tenant_id
            ).all()
        }
        return data
```

**Audit Trail Implementation:**

```python
from datetime import datetime, timedelta
from typing import Optional


class ComplianceAuditTrail:
    """Implement thorough audit trail for compliance."""

    # AuditEvent is an ORM model assumed from earlier chapters; the
    # _report_*, _categorize_events and _trigger_security_alert helpers
    # are not shown.

    def __init__(self, audit_db):
        self.audit_db = audit_db

    def log_security_event(
        self,
        event_type: str,
        actor_id: str,
        resource_type: str,
        resource_id: str,
        action: str,
        outcome: str,
        metadata: Optional[dict] = None
    ) -> str:
        """Log security event for compliance audit."""
        meta = metadata or {}
        event = AuditEvent(
            id=self._generate_event_id(),
            timestamp=datetime.utcnow(),
            event_type=event_type,
            actor_id=actor_id,
            actor_type='user',
            resource_type=resource_type,
            resource_id=resource_id,
            action=action,
            outcome=outcome,
            ip_address=meta.get('ip_address'),
            user_agent=meta.get('user_agent'),
            metadata=metadata,
            compliance_tags=['ndpr', 'ndic']
        )
        self.audit_db.add(event)
        self.audit_db.commit()

        # Only security-relevant event types feed the anomaly check.
        if event_type in ['login_failed', 'permission_denied', 'data_access']:
            self._check_anomaly(event)
        return event.id

    def generate_compliance_report(
        self,
        start_date: datetime,
        end_date: datetime,
        regulations: list[str]
    ) -> dict:
        """Generate compliance report for specified regulations."""
        events = self.audit_db.query(AuditEvent).filter(
            AuditEvent.timestamp >= start_date,
            AuditEvent.timestamp <= end_date,
            AuditEvent.compliance_tags.overlap(regulations)
        ).all()
        report = {
            'period': {'start': start_date, 'end': end_date},
            'regulations': regulations,
            'total_events': len(events),
            'event_breakdown': self._categorize_events(events),
            'access_requests': self._report_access_requests(events),
            'deletion_requests': self._report_deletion_requests(events),
            'security_incidents': self._report_security_incidents(events),
            'generated_at': datetime.utcnow()
        }
        return report

    def _check_anomaly(self, event: AuditEvent):
        """Check for anomalous patterns."""
        # Count this actor's events over the last hour.
        recent_events = self.audit_db.query(AuditEvent).filter(
            AuditEvent.actor_id == event.actor_id,
            AuditEvent.timestamp >= datetime.utcnow() - timedelta(hours=1)
        ).count()
        if recent_events > 10:
            self._trigger_security_alert(event.actor_id, 'high_event_frequency')
```

**Common Failure Modes:** Compliance documentation that isn't updated when systems change creates liability gaps. Implement automated compliance checks that verify the documentation matches the actual system configuration.

```python
def verify_compliance_state():
    """Verify current compliance state against requirements."""
    # check_rds_encryption, verify_mfa_enabled and check_policy_exists are
    # assumed helpers that return a truthy value when the control is in place.
    checks = [
        {
            'requirement': 'Encryption at rest',
            'verify': lambda: check_rds_encryption(),
            'evidence': 'AWS RDS encryption screenshot'
        },
        {
            'requirement': 'MFA for all admin access',
            'verify': lambda: verify_mfa_enabled('admin'),
            'evidence': 'IAM policy screenshot'
        },
        {
            'requirement': 'Data retention policy documented',
            'verify': lambda: check_policy_exists('retention'),
            'evidence': 'Policy document location'
        }
    ]
    results = []
    for check in checks:
        results.append({
            'requirement': check['requirement'],
            'status': 'pass' if check['verify']() else 'fail',
            'evidence': check['evidence']
        })
    return results
```

25 min
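The audit trail in this chapter stores events as ordinary database rows, which can be edited after the fact. One way to make such edits detectable is to chain each entry to the previous one with a hash. The sketch below is an illustration of that general technique, not part of the course's `ComplianceAuditTrail`: the function names `append_event` and `verify_chain` and the in-memory list are hypothetical stand-ins for a real audit store.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def _digest(prev_hash: str, payload: dict) -> str:
    """Hash the previous entry's hash together with a canonical payload."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def append_event(chain: list, payload: dict) -> dict:
    """Append an audit entry linked to the entry before it."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    entry = {
        "payload": payload,
        "prev_hash": prev_hash,
        "hash": _digest(prev_hash, payload),
    }
    chain.append(entry)
    return entry


def verify_chain(chain: list) -> bool:
    """Recompute every hash; any edited or reordered entry breaks the chain."""
    prev_hash = GENESIS_HASH
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != _digest(prev_hash, entry["payload"]):
            return False
        prev_hash = entry["hash"]
    return True


log = []
append_event(log, {"actor": "u1", "action": "login_failed"})
append_event(log, {"actor": "u1", "action": "data_access"})
print(verify_chain(log))                     # untouched log verifies
log[0]["payload"]["action"] = "login_ok"     # simulate tampering
print(verify_chain(log))                     # tampering is detected
```

This only detects changes; it does not prevent them. Someone who can rewrite the whole log can also recompute every hash, so the latest hash would need to be stored somewhere the application cannot modify.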
  24. AI SaaS MVP Project

Building an AI SaaS MVP means integrating the components covered so far (multi-tenant architecture, billing, compliance, and monitoring) into a cohesive system that can serve Nigerian businesses reliably. This final chapter brings that material together in a complete MVP implementation and shows how the components work together in a production-ready system.

```python
"""
AI SaaS MVP - Complete Implementation
=====================================
This module demonstrates the complete integration of:
- Multi-tenant architecture (Chapters 2-5)
- Authentication and authorization (Chapters 6-8)
- AI integration (Chapters 9-10)
- Database design (Chapters 11-12)
- Billing and payments (Chapters 13-15)
- Monitoring and compliance (Chapters 16-23)
"""
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize and shutdown application resources."""
    # initialize_services, run_migrations, verify_configuration and
    # cleanup_resources are application helpers that are not shown here.
    logger.info("Starting AI SaaS MVP")
    await initialize_services()
    await run_migrations()
    await verify_configuration()
    logger.info("Application started successfully")
    yield
    logger.info("Shutting down AI SaaS MVP")
    await cleanup_resources()


app = FastAPI(
    title="AI SaaS MVP",
    description="Multi-tenant AI SaaS platform for Nigerian market",
    version="1.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

from api.routes import router as api_router
from billing.routes import router as billing_router
from admin.routes import router as admin_router

# Each router adds its own prefix, so the final paths are
# /api/v1/ai/... and /api/v1/billing/subscriptions/...
app.include_router(api_router, prefix="/api/v1")
app.include_router(billing_router, prefix="/api/v1/billing")
app.include_router(admin_router, prefix="/api/v1/admin")
```

**Complete API Implementation:**

```python
# api/routes.py
import logging
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

# Tenant, get_current_tenant, quota_manager, QuotaType, estimate_tokens,
# openai_client and metrics_collector come from earlier chapters and
# must be imported here in a real project.
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/ai", tags=["AI Services"])


class AIRequest(BaseModel):
    prompt: str
    model: str = "gpt-3.5-turbo"
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7


class AIResponse(BaseModel):
    request_id: str
    completion: str
    usage: dict
    model: str
    processing_time_ms: int


@router.post("/complete", response_model=AIResponse)
async def ai_complete(
    request: AIRequest,
    tenant: Tenant = Depends(get_current_tenant)
):
    """Process AI completion request with full tracking."""
    start_time = datetime.utcnow()

    # Reject the request up front if the estimated cost exceeds the quota.
    allowed, quota_status = quota_manager.check_quota(
        tenant.id,
        QuotaType.AI_TOKENS,
        estimate_tokens(request.prompt, request.max_tokens)
    )
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                'error': 'quota_exceeded',
                'current_usage': quota_status['current_usage'],
                'limit': quota_status['hard_limit']
            }
        )

    try:
        response = await openai_client.complete(
            prompt=request.prompt,
            model=request.model,
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )
        processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000

        # Charge the tokens actually used, not the estimate.
        await quota_manager.consume_quota(
            tenant.id,
            QuotaType.AI_TOKENS,
            response.usage.total_tokens,
            metadata={
                'model': request.model,
                'request_id': response.id
            }
        )
        # Metrics are recorded in seconds.
        await metrics_collector.record_request(
            tenant.id,
            tenant.plan,
            '/ai/complete',
            200,
            processing_time / 1000
        )
        return AIResponse(
            request_id=response.id,
            completion=response.choices[0].text,
            usage={
                'prompt_tokens': response.usage.prompt_tokens,
                'completion_tokens': response.usage.completion_tokens,
                'total_tokens': response.usage.total_tokens
            },
            model=request.model,
            processing_time_ms=int(processing_time)
        )
    except Exception as e:
        logger.error(f"AI completion failed: {str(e)}")
        # Record the failure in seconds, matching the success path.
        await metrics_collector.record_request(
            tenant.id,
            tenant.plan,
            '/ai/complete',
            500,
            (datetime.utcnow() - start_time).total_seconds()
        )
        raise HTTPException(status_code=500, detail="AI processing failed") from e
```

**Complete Billing Implementation:**

```python
# billing/routes.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

# Tenant, get_current_tenant, get_plan_config, get_payment_processor,
# Subscription, Invoice and the db session come from earlier chapters.

router = APIRouter(prefix="/subscriptions", tags=["Billing"])


class SubscriptionCreate(BaseModel):
    plan: str
    payment_method: str
    billing_email: str


class SubscriptionResponse(BaseModel):
    subscription_id: str
    plan: str
    status: str
    # Not set until the first payment clears, so it may be None.
    next_billing_date: Optional[datetime] = None
    amount_ngn: float


@router.post("/", response_model=SubscriptionResponse)
async def create_subscription(
    request: SubscriptionCreate,
    tenant: Tenant = Depends(get_current_tenant)
):
    """Create new subscription with payment processing."""
    plan = get_plan_config(request.plan)
    payment_processor = get_payment_processor(request.payment_method)

    # initiate_payment as defined in chapter 22 is synchronous, so it is
    # called without await.
    payment = payment_processor.initiate_payment(
        tenant_id=tenant.id,
        amount_ngn=plan['price_ngn'],
        payment_method=request.payment_method,
        description=f"Subscription to {request.plan} plan"
    )

    subscription = Subscription(
        tenant_id=tenant.id,
        plan=request.plan,
        status='pending_payment',
        payment_reference=payment['reference'],
        billing_email=request.billing_email,
        created_at=datetime.utcnow()
    )
    db.add(subscription)
    db.commit()

    return SubscriptionResponse(
        subscription_id=subscription.id,
        plan=request.plan,
        status='pending_payment',
        next_billing_date=subscription.next_billing_date,
        amount_ngn=plan['price_ngn']
    )


@router.get("/{subscription_id}/invoices")
async def list_invoices(
    subscription_id: str,
    tenant: Tenant = Depends(get_current_tenant)
):
    """List all invoices for a subscription."""
    # Filtering on tenant.id keeps one tenant from reading another's invoices.
    invoices = db.query(Invoice).filter(
        Invoice.tenant_id == tenant.id,
        Invoice.subscription_id == subscription_id
    ).order_by(Invoice.created_at.desc()).all()
    return [
        {
            'id': inv.id,
            'invoice_number': inv.invoice_number,
            'amount_ngn': inv.total,
            'status': inv.status.value,
            'created_at': inv.created_at,
            'due_date': inv.due_date,
            'payment_url': inv.payment_url
        }
        for inv in invoices
    ]
```

**Production Deployment Configuration:**

```yaml
# docker-compose.yml for production
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      # The password comes from the same variable the db service uses.
      - DATABASE_URL=postgresql://user:${DB_PASSWORD}@db:5432/saas
      - REDIS_URL=redis://redis:6379
      - PAYSTACK_SECRET_KEY=${PAYSTACK_SECRET}
      - FLUTTERWAVE_SECRET=${FLUTTERWAVE_SECRET}
      - OPENAI_API_KEY=${OPENAI_KEY}
      - ENVIRONMENT=production
    depends_on:
      - db
      - redis
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 1G
    # Requires curl to be installed in the API image.
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=saas
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./migrations:/docker-entrypoint-initdb.d
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G

  redis:
    image: redis:7-alpine
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 1G

volumes:
  pgdata:
```

**Monitoring Stack:**

```yaml
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

rule_files:
  - "/etc/prometheus/rules/*.yml"

scrape_configs:
  - job_name: 'ai-saas-api'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_name]
        action: keep
        regex: api-.*
    metrics_path: /metrics
  - job_name: 'ai-saas-services'
    static_configs:
      - targets:
          - billing-service:9090
          - ai-service:9090
          - notification-service:9090

# alerting rules: these belong in a separate file matched by rule_files
# above (for example /etc/prometheus/rules/saas_alerts.yml), not in
# prometheus.yml itself.
groups:
  - name: saas_alerts
    rules:
      - alert: HighErrorRate
        # Share of requests returning 5xx over 5 minutes, above 5%.
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
            / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
      - alert: QuotaExceeded
        expr: quota_remaining < 0
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Tenant quota exceeded"
```

**MVP Testing:**

```python
# tests/test_mvp_complete.py
import pytest
from fastapi.testclient import TestClient

# create_test_tenant, get_token, set_quota, QuotaType and db are test
# helpers assumed from earlier chapters. These tests call the AI endpoint,
# so the OpenAI client should be mocked when running them in CI.


@pytest.fixture
def client():
    from main import app
    return TestClient(app)


@pytest.fixture
def authenticated_tenant():
    return create_test_tenant(plan='starter')


def test_full_user_flow(client, authenticated_tenant):
    """Test complete user flow from signup to AI usage."""
    response = client.post(
        "/api/v1/auth/token",
        data={"username": authenticated_tenant.email, "password": "test123"}
    )
    assert response.status_code == 200
    token = response.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}

    response = client.post(
        "/api/v1/ai/complete",
        headers=headers,
        json={"prompt": "Hello, world!", "model": "gpt-3.5-turbo"}
    )
    assert response.status_code == 200
    assert "completion" in response.json()

    # The billing router is mounted under /api/v1/billing; a /current
    # route is assumed to exist alongside the routes shown in this chapter.
    response = client.get(
        "/api/v1/billing/subscriptions/current",
        headers=headers
    )
    assert response.status_code == 200
    subscription = response.json()
    assert subscription["plan"] == "starter"


def test_quota_enforcement(client, authenticated_tenant):
    """Test quota limiting works correctly."""
    tenant = authenticated_tenant
    tenant.plan = "free"
    db.commit()
    set_quota(tenant.id, QuotaType.AI_TOKENS, 100)
    headers = {"Authorization": f"Bearer {get_token(tenant)}"}

    # Burn through the 100-token quota.
    for _ in range(5):
        response = client.post(
            "/api/v1/ai/complete",
            headers=headers,
            json={"prompt": "Test prompt", "max_tokens": 100}
        )

    response = client.post(
        "/api/v1/ai/complete",
        headers=headers,
        json={"prompt": "Test", "max_tokens": 50}
    )
    assert response.status_code == 429
    assert "quota_exceeded" in response.json()["detail"]["error"]


def test_nigerian_payment_flow(client, authenticated_tenant):
    """Test payment processing with Nigerian methods."""
    headers = {"Authorization": f"Bearer {get_token(authenticated_tenant)}"}
    response = client.post(
        "/api/v1/billing/subscriptions",
        headers=headers,
        json={
            "plan": "professional",
            "payment_method": "paystack_transfer",
            "billing_email": authenticated_tenant.billing_email
        }
    )
    assert response.status_code == 200
    subscription = response.json()
    assert subscription["status"] == "pending_payment"
    assert subscription["amount_ngn"] == 45000
```

30 min
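The `/ai/complete` endpoint in this chapter checks the quota first and records usage only after the model call returns, so two concurrent requests can both pass the check and overshoot the limit together. One way to close that gap is to reserve the estimated tokens up front and settle the difference afterwards. The sketch below is a minimal in-memory illustration under that assumption: the class `InMemoryQuota` and its `reserve` and `settle` methods are hypothetical and are not the course's `quota_manager`. A real deployment would need the same logic in a shared store such as Redis or the database.

```python
import threading


class InMemoryQuota:
    """Hypothetical per-tenant token quota with reserve-then-settle."""

    def __init__(self, hard_limit: int):
        self.hard_limit = hard_limit
        self.used = 0
        self._lock = threading.Lock()

    def reserve(self, estimated_tokens: int) -> bool:
        """Atomically claim the estimate; False means the quota is exhausted."""
        with self._lock:
            if self.used + estimated_tokens > self.hard_limit:
                return False
            self.used += estimated_tokens
            return True

    def settle(self, estimated_tokens: int, actual_tokens: int) -> None:
        """Replace the reservation with the tokens actually consumed."""
        with self._lock:
            self.used = max(self.used + actual_tokens - estimated_tokens, 0)


quota = InMemoryQuota(hard_limit=100)
print(quota.reserve(60))   # first request fits
print(quota.reserve(60))   # second request would overshoot, so it is refused
quota.settle(60, 20)       # the first request used fewer tokens than estimated
print(quota.used)
print(quota.reserve(60))   # capacity freed by settling is available again
```

If the model call fails, calling `settle(estimated, 0)` releases the whole reservation, so failed requests do not consume quota.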