KEY INSIGHT
Multi-tenant analytics in Nigerian SaaS must segment by tenant while providing aggregate insights for platform operations, with dashboard loading optimized for varying internet speeds in Lagos and Port Harcourt.
Analytics dashboards for AI SaaS serve two audiences: tenants viewing their own usage patterns, and platform operators monitoring overall health. The architecture must efficiently serve both without creating performance bottlenecks.
```python
from datetime import datetime, timedelta
from typing import Optional
import pandas as pd
from sqlalchemy import func, case
from dataclasses import dataclass
@dataclass
class DashboardMetric:
name: str
value: any
unit: str
change_pct: float
trend: list
class TenantAnalyticsService:
"""Analytics service for tenant-specific metrics."""
def __init__(self, db_session, cache_service):
self.db = db_session
self.cache = cache_service
def get_dashboard_metrics(
self,
tenant_id: str,
period: str = '30d'
) -> dict:
"""Get thorough dashboard metrics for tenant."""
cache_key = f"dashboard:{tenant_id}:{period}"
cached = self.cache.get(cache_key)
if cached:
return cached
date_range = self._parse_period(period)
metrics = {
'api_usage': self._get_api_usage(tenant_id, date_range),
'ai_tokens': self._get_ai_token_usage(tenant_id, date_range),
'response_times': self._get_response_time_stats(tenant_id, date_range),
'error_rates': self._get_error_rates(tenant_id, date_range),
'top_endpoints': self._get_top_endpoints(tenant_id, date_range),
'usage_trend': self._get_usage_trend(tenant_id, date_range),
'quota_status': self._get_quota_status(tenant_id),
'cost_breakdown': self._get_cost_breakdown(tenant_id, date_range)
}
self.cache.set(cache_key, metrics, ttl=300)
return metrics
def _get_ai_token_usage(self, tenant_id: str, date_range: tuple) -> dict:
"""Get AI token usage breakdown by model."""
results = self.db.query(
UsageRecord.quota_type,
func.sum(UsageRecord.amount).label('total'),
func.count(UsageRecord.id).label('requests')
).filter(
UsageRecord.tenant_id == tenant_id,
UsageRecord.created_at >= date_range[0],
UsageRecord.created_at <= date_range[1]
).group_by(
UsageRecord.quota_type
).all()
by_model = {}
total_tokens = 0
for row in results:
model = row.quota_type.replace('ai_tokens_', '')
by_model[model] = {
'tokens': row.total,
'requests': row.requests,
'avg_tokens_per_request': row.total / row.requests if row.requests > 0 else 0
}
total_tokens += row.total
return {
'total': total_tokens,
'by_model': by_model,
'period': {
'start': date_range[0].isoformat(),
'end': date_range[1].isoformat()
}
}
def _get_response_time_stats(self, tenant_id: str, date_range: tuple) -> dict:
"""Get response time statistics."""
results = self.db.query(
func.avg(APIRequest.duration_ms).label('avg'),
func.percentile_cont(0.5).within(
APIRequest.duration_ms
).label('p50'),
func.percentile_cont(0.95).within(
APIRequest.duration_ms
).label('p95'),
func.percentile_cont(0.99).within(
APIRequest.duration_ms
).label('p99')
).filter(
APIRequest.tenant_id == tenant_id,
APIRequest.created_at >= date_range[0],
APIRequest.created_at <= date_range[1]
).first()
return {
'avg_ms': round(float(results.avg), 2) if results.avg else 0,
'p50_ms': round(float(results.p50), 2) if results.p50 else 0,
'p95_ms': round(float(results.p95), 2) if results.p95 else 0,
'p99_ms': round(float(results.p99), 2) if results.p99 else 0
}
def _get_cost_breakdown(self, tenant_id: str, date_range: tuple) -> dict:
"""Calculate cost breakdown in NGN."""
ai_costs = self._calculate_ai_costs(tenant_id, date_range)
api_costs = self._calculate_api_costs(tenant_id, date_range)
total_ngn = ai_costs['total'] + api_costs['total']
return {
'total_ngn': total_ngn,
'breakdown': {
'ai_tokens': ai_costs,
'api_calls': api_costs
},
'currency': 'NGN'
}
```
**Optimizing for Nigerian Network Conditions:**
Dashboard loading must account for variable internet speeds. Large datasets should be paginated and visualizations should use progressive loading to avoid timeout issues.
```python
class DashboardDataService:
"""Service for optimized dashboard data retrieval."""
def __init__(self, db_session, analytics_db):
self.db = db_session
self.analytics = analytics_db
def get_trend_data(
self,
tenant_id: str,
metric: str,
period: str = '30d',
granularity: str = 'daily'
) -> list:
"""Get time-series trend data with appropriate granularity."""
granularity_map = {
'hourly': 'YYYY-MM-DD HH24:00',
'daily': 'YYYY-MM-DD',
'weekly': 'IYYY-IW',
'monthly': 'YYYY-MM'
}
date_format = granularity_map.get(granularity, 'YYYY-MM-DD')
if self.analytics == 'clickhouse':
query = f"""
SELECT
formatDateTime(created_at, '{date_format}') as period,
sum(amount) as total,
count() as count
FROM usage_records
WHERE tenant_id = %(tenant_id)s
AND created_at >= %(start_date)s
AND created_at <= %(end_date)s
AND quota_type = %(metric)s
GROUP BY period
ORDER BY period
"""
else:
query = """
SELECT
to_char(created_at, :date_format) as period,
sum(amount) as total,
count(*) as count
FROM usage_records
WHERE tenant_id = :tenant_id
AND created_at >= :start_date
AND created_at <= :end_date
AND quota_type = :metric
GROUP BY period
ORDER BY period
"""
return self.db.execute(query, {
'tenant_id': tenant_id,
'metric': metric,
'date_format': date_format,
'start_date': datetime.utcnow() - timedelta(days=30),
'end_date': datetime.utcnow()
}).fetchall()
```
**Platform-Wide Analytics:**
```python
class PlatformAnalyticsService:
"""Analytics for platform operators."""
def __init__(self, db_session):
self.db = db_session
def get_platform_summary(self, period: str = '7d') -> dict:
"""Get platform-wide analytics summary."""
date_range = self._parse_period(period)
mrr = self._calculate_mrr()
active_tenants = self._count_active_tenants(date_range)
usage_growth = self._calculate_usage_growth(date_range)
top_tenants = self._get_top_tenants_by_usage(date_range, limit=20)
return {
'mrr_ngn': mrr,
'active_tenants': active_tenants,
'usage_growth_pct': usage_growth,
'top_tenants': top_tenants,
'period': period
}
def _calculate_mrr(self) -> dict:
"""Calculate Monthly Recurring Revenue."""
subscriptions = self.db.query(Subscription).filter(
Subscription.status == 'active'
).all()
total = sum(s.monthly_amount for s in subscriptions)
breakdown = self.db.query(
Tenant.plan,
func.count(Tenant.id).label('count'),
func.sum(Subscription.monthly_amount).label('revenue')
).join(
Subscription, Subscription.tenant_id == Tenant.id
).filter(
Subscription.status == 'active'
).group_by(
Tenant.plan
).all()
return {
'total': float(total),
'currency': 'NGN',
'by_plan': [{'plan': r.plan, 'count': r.count, 'revenue': float(r.revenue)} for r in breakdown]
}
```
**Common Failure Modes:**
Dashboard queries against large datasets without proper indexes cause timeouts. Always create composite indexes on (tenant_id, created_at) for usage tables and implement query result caching with invalidation.
```python
# Migrations for analytics performance
def upgrade_analytics():
"""Add indexes for dashboard performance."""
migrations = [
("CREATE INDEX CONCURRENTLY idx_usage_tenant_date ON usage_records(tenant_id, created_at)",
"Index for tenant time-series queries"),
("CREATE INDEX CONCURRENTLY idx_requests_tenant_duration ON api_requests(tenant_id, duration_ms)",
"Index for response time analytics"),
("CREATE INDEX CONCURRENTLY idx_subscriptions_active ON subscriptions(tenant_id) WHERE status = 'active'",
"Partial index for active subscriptions"),
]
for sql, description in migrations:
execute_migration(sql, description)
```