18. Data Analysis Platform Project
Chapter 18 of 18 · 30 min
This final chapter integrates all previous concepts into a complete, production-ready data analysis platform. The system automates data ingestion, analysis, and reporting.
Project Structure
data_analysis_platform/
├── config/
│ └── sources.yaml
├── src/
│ ├── __init__.py
│ ├── ingestion.py
│ ├── analysis.py
│ ├── reporting.py
│ └── api.py
├── data/
│ └── processed/
├── reports/
├── tests/
├── main.py
└── requirements.txt
Configuration Management
# config/sources.yaml
# Top-level sections: `sources` (read by ingestion), `analysis` (read by the
# analysis module) and `reporting` (read by the report generator).
sources:
  - name: sales
    type: csv
    path: data/sales.csv
    schema:
      date_column: order_date
      numeric_columns:
        - revenue
        - quantity
      categorical_columns:
        - product_category
        - region
  - name: inventory
    type: json
    path: data/inventory.json
    schema:
      date_column: timestamp

analysis:
  anomaly_detection:
    method: isolation_forest
    contamination: 0.01
    features:
      - revenue
      - quantity
      # NOTE(review): unit_price is not declared in any source schema above;
      # confirm the data actually provides this column.
      - unit_price
  correlation:
    threshold: 0.7
    methods:
      - pearson
      - spearman
  time_series:
    frequency: D
    trend_window: 30
    seasonality_period: 7

reporting:
  schedule: daily
  formats:
    - markdown
    - html
  recipients:
    # Placeholder address; replace with the real report recipients.
    - reports@example.com
Main Application
# main.py
import argparse
import logging
from pathlib import Path
from datetime import datetime
from src.ingestion import DataIngestion
from src.analysis import DataAnalysis
from src.reporting import ReportGenerator
# Root logging setup for the command-line entry point: INFO and above, with
# timestamp, logger name and level in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by everything defined in main.py.
logger = logging.getLogger(__name__)
class DataAnalysisPlatform:
    """Wire ingestion, analysis and reporting together behind one object.

    All three collaborators are built from the same parsed YAML
    configuration, so a single config file drives the whole pipeline.
    """

    def __init__(self, config_path):
        """Load the YAML config at ``config_path`` and build the components."""
        self.config = self.load_config(config_path)
        self.ingestion = DataIngestion(self.config)
        self.analysis = DataAnalysis(self.config)
        self.reporting = ReportGenerator(self.config)

    def load_config(self, path):
        """Parse the YAML file at ``path`` and return its content as a dict.

        ``yaml.safe_load`` returns ``None`` for an empty file; that is
        normalised to ``{}`` so the components can call ``config.get``.
        """
        import yaml

        with open(path, encoding='utf-8') as f:
            return yaml.safe_load(f) or {}

    def run_full_pipeline(self):
        """Ingest every source, analyse the data and write a report.

        Returns:
            The results dict produced by ``self.analysis.run_all``.
        """
        logger.info("Starting data analysis pipeline")

        # Step 1: Ingest data from all sources
        logger.info("Ingesting data...")
        df = self.ingestion.load_all()
        logger.info(f"Loaded {len(df):,} records")

        # Step 2: Perform analysis
        logger.info("Running analysis...")
        results = self.analysis.run_all(df)
        # The count lives under results['anomalies']['count']; there is no
        # top-level 'anomaly_count' key, so indexing it raised KeyError.
        anomaly_count = results.get('anomalies', {}).get('count', 0)
        logger.info(f"Analysis complete. Found {anomaly_count} anomalies")

        # Step 3: Generate reports
        logger.info("Generating reports...")
        report_path = self.reporting.generate(df, results)
        logger.info(f"Report saved to {report_path}")
        return results

    def run_analysis_only(self, df):
        """Run the analysis step on an already-loaded DataFrame."""
        return self.analysis.run_all(df)
def main(argv=None):
    """Parse command-line arguments and run the selected pipeline mode.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='Data Analysis Platform')
    parser.add_argument('--config', default='config/sources.yaml', help='Config file path')
    parser.add_argument('--mode', choices=['full', 'analysis-only'], default='full')
    parser.add_argument('--input', help='Input CSV for analysis-only mode')
    args = parser.parse_args(argv)

    # Fail fast with a usage message instead of handing None to read_csv.
    if args.mode == 'analysis-only' and not args.input:
        parser.error('--input is required when --mode is analysis-only')

    platform = DataAnalysisPlatform(args.config)
    if args.mode == 'full':
        platform.run_full_pipeline()
        return

    # pandas is only needed for this mode and is not imported at the top of
    # main.py, so it is imported here.
    import pandas as pd

    df = pd.read_csv(args.input)
    results = platform.run_analysis_only(df)
    print(results)


if __name__ == '__main__':
    main()
Ingestion Module
# src/ingestion.py
import pandas as pd
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
class DataIngestion:
    """Load every configured source and combine them into one DataFrame.

    Each entry of ``config['sources']`` is a dict with at least ``name`` and
    ``type``; every loader tags its rows with a ``source`` column holding
    the source name.
    """

    def __init__(self, config):
        """Keep the config and the list of source definitions it declares."""
        self.config = config
        self.sources = config.get('sources') or []

    def load_csv(self, source):
        """Read the CSV at ``source['path']``.

        The schema's ``date_column``, when declared, is parsed as dates.

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        path = Path(source['path'])
        if not path.exists():
            raise FileNotFoundError(f"Data file not found: {path}")

        read_kwargs = {'low_memory': False}
        date_column = (source.get('schema') or {}).get('date_column')
        if date_column:
            read_kwargs['parse_dates'] = [date_column]

        df = pd.read_csv(path, **read_kwargs)
        df['source'] = source['name']
        return df

    def load_json(self, source):
        """Read the JSON file at ``source['path']``.

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        path = Path(source['path'])
        if not path.exists():
            raise FileNotFoundError(f"Data file not found: {path}")
        df = pd.read_json(path)
        # Tag rows like the CSV and SQL loaders do, so the combined frame
        # has a 'source' value for every row.
        df['source'] = source['name']
        return df

    def load_sql(self, source):
        """Run ``source['query']`` against the SQLite file ``source['db_path']``."""
        import sqlite3
        from contextlib import closing

        # closing() guarantees the connection is released even if the query
        # raises.
        with closing(sqlite3.connect(source['db_path'])) as conn:
            df = pd.read_sql(source['query'], conn)
        df['source'] = source['name']
        return df

    def load_all(self):
        """Load all sources and concatenate them.

        A source that fails to load, or whose ``type`` is not supported, is
        logged and skipped. Returns an empty DataFrame when nothing could be
        loaded.
        """
        loaders = {
            'csv': self.load_csv,
            'json': self.load_json,
            'sql': self.load_sql,
        }
        data_frames = []
        for source in self.sources:
            name = source.get('name', '<unnamed>')
            source_type = source.get('type')
            loader = loaders.get(source_type)
            if loader is None:
                # Without this guard an unknown type would re-append the
                # frame left over from the previous iteration.
                logger.error(f"Failed to load {name}: unsupported source type {source_type!r}")
                continue
            try:
                df = loader(source)
            except Exception as e:
                # Best effort: one broken source must not stop the others.
                logger.error(f"Failed to load {name}: {e}")
                continue
            data_frames.append(df)
            logger.info(f"Loaded {len(df)} records from {name}")

        if not data_frames:
            # pd.concat raises ValueError on an empty list.
            logger.warning("No data sources could be loaded")
            return pd.DataFrame()

        combined = pd.concat(data_frames, ignore_index=True)
        logger.info(f"Total records after merge: {len(combined)}")
        return combined
Analysis Module
# src/analysis.py
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from scipy import stats
import logging
logger = logging.getLogger(__name__)
class DataAnalysis:
    """Run anomaly detection, correlation and trend analysis on a DataFrame.

    Behaviour is driven by the ``analysis`` section of the config dict. Each
    analysis returns an empty or zero result when it cannot run, so
    ``run_all`` always produces a complete results dict.
    """

    # Used when the config does not set analysis.correlation.threshold.
    DEFAULT_CORRELATION_THRESHOLD = 0.7
    # Columns that analyze_trends looks at when they are present.
    TREND_COLUMNS = ('revenue', 'quantity')
    # A trend is only fitted when more than this many smoothed points exist.
    MIN_TREND_POINTS = 10

    def __init__(self, config):
        """Keep the full config and its ``analysis`` section."""
        self.config = config
        self.analysis_config = config.get('analysis') or {}

    @staticmethod
    def _empty_anomaly_result(features=()):
        """Return the anomaly summary used when detection did not run."""
        return {
            'count': 0,
            'percentage': 0.0,
            'features_used': list(features),
            'details': [],
        }

    def detect_anomalies(self, df):
        """Flag outlier rows with an Isolation Forest.

        Returns:
            ``(frame, summary)``. When detection runs, ``frame`` is a copy of
            ``df`` with a boolean ``anomaly`` column; the caller's frame is
            never modified. When detection is not configured, or there are
            no usable features or rows, ``df`` is returned unchanged with a
            zero summary. ``summary`` always has ``count``, ``percentage``
            and ``features_used``.
        """
        if 'anomaly_detection' not in self.analysis_config:
            return df, self._empty_anomaly_result()

        cfg = self.analysis_config['anomaly_detection'] or {}
        requested = cfg.get('features')
        if requested is None:
            requested = df.select_dtypes(include=[np.number]).columns.tolist()

        # Configured features may not exist in the loaded data; indexing
        # with a missing name would raise KeyError.
        features = [name for name in requested if name in df.columns]
        missing = [name for name in requested if name not in df.columns]
        if missing:
            logger.warning(f"Skipping anomaly features missing from data: {missing}")
        if not features or df.empty:
            logger.warning("Anomaly detection skipped: no usable features or rows")
            return df, self._empty_anomaly_result(features)

        X = df[features].fillna(0)
        X_scaled = StandardScaler().fit_transform(X)
        iso = IsolationForest(
            contamination=cfg.get('contamination', 0.01),
            n_estimators=100,
            random_state=42,
        )
        is_anomaly = iso.fit_predict(X_scaled) == -1

        flagged = df.copy()
        flagged['anomaly'] = is_anomaly
        # Plain int so the summary is not tied to a NumPy scalar type.
        anomaly_count = int(is_anomaly.sum())
        logger.info(f"Detected {anomaly_count} anomalies")
        return flagged, {
            'count': anomaly_count,
            'percentage': anomaly_count / len(flagged) * 100,
            'features_used': features,
        }

    def calculate_correlations(self, df):
        """Correlate the numeric columns and list the strongly related pairs.

        A pair is reported when the absolute correlation exceeds
        ``analysis.correlation.threshold`` (0.7 when not configured).

        Returns:
            ``{'matrix': DataFrame, 'high_correlations': [dict, ...]}``.
        """
        cfg = self.analysis_config.get('correlation') or {}
        threshold = cfg.get('threshold', self.DEFAULT_CORRELATION_THRESHOLD)

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        corr_matrix = df[numeric_cols].corr()
        columns = corr_matrix.columns

        # Walk the upper triangle only, so each pair is reported once.
        high_corr = []
        for i, first in enumerate(columns):
            for j in range(i + 1, len(columns)):
                value = corr_matrix.iloc[i, j]
                if abs(value) > threshold:
                    high_corr.append({
                        'col1': first,
                        'col2': columns[j],
                        'correlation': value,
                    })
        return {'matrix': corr_matrix, 'high_correlations': high_corr}

    def _resolve_date_column(self, df, cfg):
        """Pick the date column to index trends by, or ``None`` if absent.

        Order: ``time_series.date_column``, then ``'date'``, then the
        ``schema.date_column`` of each configured source.
        """
        candidates = [cfg.get('date_column'), 'date']
        for source in self.config.get('sources') or []:
            candidates.append((source.get('schema') or {}).get('date_column'))
        for name in candidates:
            if name and name in df.columns:
                return name
        return None

    def analyze_trends(self, df):
        """Fit a linear trend to the moving average of each trend column.

        Values are resampled to ``time_series.frequency`` (sum), smoothed
        with a ``trend_window`` rolling mean, and a line is fitted when more
        than ``MIN_TREND_POINTS`` smoothed points exist.

        Returns:
            ``{column: {...}}``; empty when trends are not configured or no
            date column can be found.
        """
        if 'time_series' not in self.analysis_config:
            return {}

        cfg = self.analysis_config['time_series'] or {}
        freq = cfg.get('frequency', 'D')
        window = cfg.get('trend_window', 30)

        date_col = self._resolve_date_column(df, cfg)
        if date_col is None:
            logger.warning("Trend analysis skipped: no date column found in data")
            return {}

        # Rows without a parseable date cannot be placed on the time axis.
        timestamps = pd.to_datetime(df[date_col], errors='coerce')
        valid = timestamps.notna()
        time_index = pd.DatetimeIndex(timestamps[valid])

        results = {}
        for col in self.TREND_COLUMNS:
            if col not in df.columns:
                continue
            values = pd.Series(df.loc[valid, col].to_numpy(), index=time_index)
            series = values.resample(freq).sum()
            rolling = series.rolling(window).mean()
            smoothed = rolling.dropna()

            # Calculate trend direction
            if len(smoothed) > self.MIN_TREND_POINTS:
                x = np.arange(len(smoothed))
                slope, _, r_value, _, _ = stats.linregress(x, smoothed.to_numpy())
                results[col] = {
                    'slope': slope,
                    'r_squared': r_value**2,
                    'trend_direction': 'up' if slope > 0 else 'down',
                    'latest_value': rolling.iloc[-1],
                    'moving_average': rolling.tolist(),
                }
        return results

    def run_all(self, df):
        """Run every analysis and collect the results in one dict.

        Returns:
            Dict with ``record_count``, ``columns``, ``timestamp``,
            ``anomalies``, ``correlations``, ``trends`` and ``summary``.
        """
        logger.info("Starting analysis pipeline")
        results = {
            'record_count': len(df),
            'columns': df.columns.tolist(),
            'timestamp': pd.Timestamp.now().isoformat(),
        }

        # Run all analyses
        df, anomaly_results = self.detect_anomalies(df)
        results['anomalies'] = anomaly_results
        results['correlations'] = self.calculate_correlations(df)
        results['trends'] = self.analyze_trends(df)

        # Summary statistics (describe() rejects a frame with no columns)
        results['summary'] = df.describe().to_dict() if len(df.columns) else {}
        return results
Reporting Module
# src/reporting.py
import pandas as pd
from pathlib import Path
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class ReportGenerator:
    """Render analysis results as a Markdown report under ``reports/``."""

    def __init__(self, config):
        """Keep the config and make sure the output directory exists."""
        self.config = config
        self.report_config = config.get('reporting') or {}
        self.output_dir = Path('reports')
        self.output_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _summary_table(df):
        """Return ``df.describe()`` as Markdown, or as plain text in a fence.

        ``DataFrame.to_markdown`` needs the optional ``tabulate`` package;
        without it the table falls back to ``to_string`` in a code fence.
        """
        if not len(df.columns):
            # describe() rejects a frame without columns.
            return ""
        summary = df.describe()
        try:
            return summary.to_markdown()
        except ImportError:
            return f"```\n{summary.to_string()}\n```"

    def generate_markdown(self, df, results):
        """Build the report text from the data frame and the results dict.

        Missing optional pieces (no ``source`` column, no ``percentage`` in
        the anomaly summary, no trends) are rendered as empty or zero values
        instead of raising ``KeyError``.
        """
        anomalies = results.get('anomalies', {})
        anomaly_count = anomalies.get('count', 0)
        anomaly_pct = anomalies.get('percentage', 0.0)
        high_correlations = results.get('correlations', {}).get('high_correlations', [])
        trends = results.get('trends', {})
        sources = df['source'].dropna().unique().tolist() if 'source' in df.columns else []

        header = [
            "# Data Analysis Report",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "## Dataset Overview",
            f"- Records: {len(df):,}",
            f"- Columns: {len(df.columns)}",
            f"- Sources: {sources}",
            "## Key Metrics",
            "| Metric | Value |",
            "|--------|-------|",
            f"| Total Rows | {len(df):,} |",
            f"| Anomalies | {anomaly_count} ({anomaly_pct:.2f}%) |",
            f"| High Correlations | {len(high_correlations)} |",
            "## Anomaly Details",
        ]

        # Collect chunks and join once instead of repeated string +=.
        chunks = ["\n".join(header) + "\n"]
        if anomaly_count > 0:
            chunks.append(
                f"\nFeatures used for anomaly detection: {anomalies.get('features_used', [])}\n"
            )

        chunks.append("\n## High Correlations\n")
        chunks.extend(
            f"- {corr['col1']} ↔ {corr['col2']}: {corr['correlation']:.3f}\n"
            for corr in high_correlations
        )

        chunks.append("\n## Trend Analysis\n")
        for col, trend in trends.items():
            chunks.append(f"\n### {col}\n")
            chunks.append(f"- Direction: {trend['trend_direction']}\n")
            chunks.append(f"- Slope: {trend['slope']:.4f}\n")
            chunks.append(f"- R²: {trend['r_squared']:.4f}\n")

        chunks.append("\n## Summary Statistics\n")
        chunks.append(self._summary_table(df))
        return "".join(chunks)

    def generate(self, df, results):
        """Write the Markdown report to ``output_dir`` and return its path.

        The file name carries a minute-resolution timestamp.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M')

        # Generate Markdown
        markdown_report = self.generate_markdown(df, results)
        md_path = self.output_dir / f"report_{timestamp}.md"
        # The report contains non-ASCII characters, so the encoding is fixed
        # rather than left to the platform default.
        md_path.write_text(markdown_report, encoding='utf-8')
        logger.info(f"Report saved to {md_path}")
        return md_path
Testing
# tests/test_analysis.py
import pytest
import pandas as pd
import numpy as np
from src.analysis import DataAnalysis
@pytest.fixture
def sample_data():
    """Return 100 daily rows of seeded random revenue, quantity and category."""
    np.random.seed(42)
    n_rows = 100
    # The draws stay in this order so the seeded values are reproducible.
    revenue = np.random.randn(n_rows).cumsum() + 1000
    quantity = np.random.randint(1, 100, n_rows)
    category = np.random.choice(['A', 'B', 'C'], n_rows)
    return pd.DataFrame({
        'date': pd.date_range('2024-01-01', periods=n_rows),
        'revenue': revenue,
        'quantity': quantity,
        'category': category,
    })
def test_anomaly_detection(sample_data):
    """Detection adds an ``anomaly`` column and reports a count within bounds."""
    detection_cfg = {'features': ['revenue', 'quantity'], 'contamination': 0.05}
    analysis = DataAnalysis({'analysis': {'anomaly_detection': detection_cfg}})

    df_result, results = analysis.detect_anomalies(sample_data)

    assert 'anomaly' in df_result.columns
    assert 0 <= results['count'] <= len(sample_data)
def test_correlation_calculation(sample_data):
    """The correlation result exposes both the matrix and the high pairs."""
    results = DataAnalysis({}).calculate_correlations(sample_data)
    for expected_key in ('matrix', 'high_correlations'):
        assert expected_key in results
def test_trend_analysis(sample_data):
    """Both trend columns get a fitted trend on 100 days with a 7-day window.

    100 daily rows smoothed over 7 days leave 94 points, above the
    ten-point minimum, so ``revenue`` and ``quantity`` must both appear.
    """
    config = {
        'analysis': {
            'time_series': {
                'frequency': 'D',
                'trend_window': 7
            }
        }
    }
    analysis = DataAnalysis(config)
    results = analysis.analyze_trends(sample_data)

    # The previous check ("... or len(results) >= 0") could never fail.
    for column in ('revenue', 'quantity'):
        assert column in results
        trend = results[column]
        assert trend['trend_direction'] in ('up', 'down')
        assert 0.0 <= trend['r_squared'] <= 1.0
        assert len(trend['moving_average']) == len(sample_data)
Deployment
# Run full pipeline
python main.py --config config/sources.yaml --mode full
# Run analysis only
# (--input is the CSV file that analysis-only mode reads)
python main.py --config config/sources.yaml --mode analysis-only --input data/new_data.csv
# Run tests
pytest tests/ -v
# Schedule with cron (run at 8 AM daily)
# NOTE: output is appended to logs/cron.log, so the logs/ directory must
# already exist inside the project directory.
0 8 * * * cd /path/to/project && python main.py --config config/sources.yaml >> logs/cron.log 2>&1
EXERCISE
Extend the platform by adding: (1) email notification on anomaly detection, (2) visualization generation integrated into reports, (3) a CLI command for querying specific analysis results. This completes Data Analysis with Local AI (I014). Continue with other courses in the runlocalai curriculum for additional topics.