RUNLOCALAIv38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.coIndependently operated
Data Analysis with Local AI

18. Data Analysis Platform Project

Chapter 18 of 18 · 30 min
KEY INSIGHT

Production platforms separate concerns: ingestion, analysis, and reporting are independent modules that communicate through well-defined interfaces and data structures.

This final chapter integrates all previous concepts into a complete, production-ready data analysis platform. The system automates data ingestion, analysis, and reporting.
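
One way to picture the "well-defined interfaces" from the key insight is with typed protocols. The sketch below is illustrative only: `Ingestor`, `Analyzer`, `Reporter`, `run_pipeline`, and the three stand-in classes are hypothetical names that do not appear in the chapter's modules, and plain lists of dicts stand in for DataFrames.

```python
from typing import Any, Protocol


class Ingestor(Protocol):
    def load_all(self) -> list[dict[str, Any]]: ...


class Analyzer(Protocol):
    def run_all(self, records: list[dict[str, Any]]) -> dict[str, Any]: ...


class Reporter(Protocol):
    def generate(self, records: list[dict[str, Any]], results: dict[str, Any]) -> str: ...


def run_pipeline(ingestor: Ingestor, analyzer: Analyzer, reporter: Reporter) -> str:
    """Wire the three stages together; each stage only sees the others' outputs."""
    records = ingestor.load_all()
    results = analyzer.run_all(records)
    return reporter.generate(records, results)


# Tiny stand-in implementations (hypothetical, for illustration only).
class ListIngestor:
    def __init__(self, records):
        self.records = records

    def load_all(self):
        return list(self.records)


class CountAnalyzer:
    def run_all(self, records):
        return {"record_count": len(records)}


class TextReporter:
    def generate(self, records, results):
        return f"Records: {results['record_count']}"


summary = run_pipeline(
    ListIngestor([{"revenue": 10}, {"revenue": 20}]),
    CountAnalyzer(),
    TextReporter(),
)
print(summary)
```

Because `run_pipeline` depends only on the method signatures, any stage can be swapped for a test double or a different backend without touching the other two.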

Project Structure

data_analysis_platform/
├── config/
│   └── sources.yaml
├── src/
│   ├── __init__.py
│   ├── ingestion.py
│   ├── analysis.py
│   ├── reporting.py
│   └── api.py
├── data/
│   └── processed/
├── reports/
├── tests/
│   └── test_analysis.py
├── main.py
└── requirements.txt
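
The tree lists `requirements.txt`, but the chapter does not show its contents. Judging only from the imports in the listings, the project appears to need pandas, NumPy, scikit-learn, SciPy, PyYAML, and pytest, plus `tabulate`, which pandas requires for `DataFrame.to_markdown`. The sketch below is one possible way to check for them before running the pipeline; `missing_modules` and `REQUIRED_MODULES` are hypothetical names, not part of the platform.

```python
import importlib.util

# Import names taken from the chapter's listings; 'tabulate' is included
# because pandas' DataFrame.to_markdown depends on it.
REQUIRED_MODULES = ["pandas", "numpy", "sklearn", "scipy", "yaml", "tabulate", "pytest"]


def missing_modules(names):
    """Return the top-level module names that cannot be found on this system."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable")
```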

Configuration Management

# config/sources.yaml
sources:
  - name: sales
    type: csv
    path: data/sales.csv
    schema:
      date_column: order_date
      numeric_columns:
        - revenue
        - quantity
      categorical_columns:
        - product_category
        - region

  - name: inventory
    type: json
    path: data/inventory.json
    schema:
      date_column: timestamp

analysis:
  anomaly_detection:
    method: isolation_forest
    contamination: 0.01
    features:
      - revenue
      - quantity
      - unit_price

  correlation:
    threshold: 0.7
    methods:
      - pearson
      - spearman

  time_series:
    frequency: D
    trend_window: 30
    seasonality_period: 7

reporting:
  schedule: daily
  formats:
    - markdown
    - html
  recipients:
    - [email protected]
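
A configuration error is cheaper to catch at startup than halfway through a run. The following is a minimal sketch of a validator for the dict that `yaml.safe_load` returns. `validate_config` is a hypothetical helper; the rules it checks are inferred from how the chapter's modules read the config (CSV and JSON sources use `path`, SQL sources use `db_path` and `query`), and the contamination bound follows scikit-learn's documented range for `IsolationForest`.

```python
def validate_config(config):
    """Return a list of human-readable problems found in a loaded config dict."""
    problems = []
    supported_types = {"csv", "json", "sql"}  # types handled by DataIngestion.load_all

    sources = config.get("sources") or []
    if not sources:
        problems.append("no sources defined")
    for i, source in enumerate(sources):
        label = source.get("name", f"sources[{i}]")
        if "name" not in source:
            problems.append(f"{label}: missing 'name'")
        if source.get("type") not in supported_types:
            problems.append(f"{label}: unsupported type {source.get('type')!r}")
        # CSV and JSON sources read 'path'; SQL sources read 'db_path' and 'query'.
        required = ("db_path", "query") if source.get("type") == "sql" else ("path",)
        for key in required:
            if key not in source:
                problems.append(f"{label}: missing '{key}'")

    contamination = (
        config.get("analysis", {}).get("anomaly_detection", {}).get("contamination")
    )
    # Numeric values must lie in (0, 0.5]; the string 'auto' is left unchecked.
    if isinstance(contamination, (int, float)) and not 0 < contamination <= 0.5:
        problems.append("analysis.anomaly_detection.contamination must be in (0, 0.5]")

    return problems
```

Calling this right after loading the YAML and aborting when the list is non-empty keeps bad settings from surfacing later as a `KeyError` deep inside a module.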

Main Application

# main.py
import argparse
import logging

import pandas as pd  # needed by the analysis-only branch below

from src.ingestion import DataIngestion
from src.analysis import DataAnalysis
from src.reporting import ReportGenerator

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class DataAnalysisPlatform:
    def __init__(self, config_path):
        self.config = self.load_config(config_path)
        self.ingestion = DataIngestion(self.config)
        self.analysis = DataAnalysis(self.config)
        self.reporting = ReportGenerator(self.config)
        
    def load_config(self, path):
        import yaml
        with open(path) as f:
            return yaml.safe_load(f)
    
    def run_full_pipeline(self):
        logger.info("Starting data analysis pipeline")
        
        # Step 1: Ingest data from all sources
        logger.info("Ingesting data...")
        df = self.ingestion.load_all()
        logger.info(f"Loaded {len(df):,} records")
        
        # Step 2: Perform analysis
        logger.info("Running analysis...")
        results = self.analysis.run_all(df)
        logger.info(f"Analysis complete. Found {results['anomalies']['count']} anomalies")
        
        # Step 3: Generate reports
        logger.info("Generating reports...")
        report_path = self.reporting.generate(df, results)
        logger.info(f"Report saved to {report_path}")
        
        return results
    
    def run_analysis_only(self, df):
        return self.analysis.run_all(df)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Data Analysis Platform')
    parser.add_argument('--config', default='config/sources.yaml', help='Config file path')
    parser.add_argument('--mode', choices=['full', 'analysis-only'], default='full')
    parser.add_argument('--input', help='Input CSV for analysis-only mode')
    
    args = parser.parse_args()
    
    platform = DataAnalysisPlatform(args.config)
    
    if args.mode == 'full':
        platform.run_full_pipeline()
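    else:
        # analysis-only mode reads its data from --input, not from the configured sources
        if not args.input:
            parser.error('--input is required when --mode is analysis-only')
        df = pd.read_csv(args.input)
        results = platform.run_analysis_only(df)
        print(results)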
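
When the script runs unattended, the process exit code is often the only signal a scheduler sees. The sketch below shows one way to turn an unhandled exception into a non-zero exit code while still logging the traceback. `run_with_exit_code` is a hypothetical helper that is not part of the chapter's `main.py`.

```python
import logging

logger = logging.getLogger(__name__)


def run_with_exit_code(pipeline):
    """Call a zero-argument callable; return 0 on success and 1 on failure."""
    try:
        pipeline()
    except Exception:
        # logger.exception records the full traceback alongside the message.
        logger.exception("Pipeline failed")
        return 1
    return 0
```

Under these assumptions, the full-mode branch could call `sys.exit(run_with_exit_code(platform.run_full_pipeline))` so that a failed run is visible to whatever launched it.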

Ingestion Module

# src/ingestion.py
import pandas as pd
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

class DataIngestion:
    def __init__(self, config):
        self.config = config
        self.sources = config.get('sources', [])
    
    def load_csv(self, source):
        path = Path(source['path'])
        if not path.exists():
            raise FileNotFoundError(f"Data file not found: {path}")
        
        # A source may omit 'schema' or its date_column; fall back to no date parsing
        schema = source.get('schema', {})
        parse_dates = [schema['date_column']] if 'date_column' in schema else None
        
        df = pd.read_csv(
            path,
            parse_dates=parse_dates,
            low_memory=False
        )
        
        df['source'] = source['name']
        return df
    
    def load_json(self, source):
        df = pd.read_json(source['path'])
        # Tag rows with their origin, as load_csv and load_sql do
        df['source'] = source['name']
        return df
    
    def load_sql(self, source):
        import sqlite3
        conn = sqlite3.connect(source['db_path'])
        try:
            df = pd.read_sql(source['query'], conn)
        finally:
            # Close the connection even if the query fails
            conn.close()
        df['source'] = source['name']
        return df
    
    def load_all(self):
        data_frames = []
        
        for source in self.sources:
            try:
                if source['type'] == 'csv':
                    df = self.load_csv(source)
                elif source['type'] == 'json':
                    df = self.load_json(source)
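                elif source['type'] == 'sql':
                    df = self.load_sql(source)
                else:
                    # Without this branch, df would be undefined or left over from the previous source
                    raise ValueError(f"Unsupported source type: {source['type']}")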
                
                data_frames.append(df)
                logger.info(f"Loaded {len(df)} records from {source['name']}")
                
            except Exception as e:
                logger.error(f"Failed to load {source['name']}: {e}")
                continue
        
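        if not data_frames:
            # pd.concat raises ValueError on an empty list, so fail with a clearer message
            raise RuntimeError("No data sources could be loaded")

        combined = pd.concat(data_frames, ignore_index=True)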
        logger.info(f"Total records after merge: {len(combined)}")
        
        return combined
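
The `if`/`elif` chain in `load_all` can also be written as a lookup table, which makes adding a new source type a one-line change. The sketch below illustrates that pattern under simplifying assumptions: it uses the standard library's `csv` and `json` modules and returns lists of dicts instead of DataFrames, and `LOADERS`, `load_csv_rows`, `load_json_rows`, and `load_source` are hypothetical names.

```python
import csv
import json


def load_csv_rows(source):
    with open(source["path"], newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


def load_json_rows(source):
    # Assumes the JSON file holds a list of objects.
    with open(source["path"], encoding="utf-8") as f:
        return json.load(f)


# Map each 'type' value from the config to the function that handles it.
LOADERS = {"csv": load_csv_rows, "json": load_json_rows}


def load_source(source):
    """Pick the loader for a source and tag every row with the source name."""
    source_type = source.get("type")
    if source_type not in LOADERS:
        raise ValueError(f"Unsupported source type: {source_type!r}")
    rows = LOADERS[source_type](source)
    return [{**row, "source": source["name"]} for row in rows]
```

Tagging rows in one place, after dispatch, also guarantees that every loader produces the `source` field the report depends on.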

Analysis Module

# src/analysis.py
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from scipy import stats
import logging

logger = logging.getLogger(__name__)

class DataAnalysis:
    def __init__(self, config):
        self.config = config
        self.analysis_config = config.get('analysis', {})
    
    def detect_anomalies(self, df):
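        # Always return the keys the report reads, even when detection is skipped
        empty = {'count': 0, 'percentage': 0.0, 'features_used': []}
        if 'anomaly_detection' not in self.analysis_config:
            return df, empty

        cfg = self.analysis_config['anomaly_detection']
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        # Drop configured features that are absent from the data (e.g. unit_price)
        features = [f for f in cfg.get('features', numeric_cols) if f in df.columns]
        if not features or df.empty:
            return df, empty

        X = df[features].fillna(0)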
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        iso = IsolationForest(
            contamination=cfg.get('contamination', 0.01),
            n_estimators=100,
            random_state=42
        )
        
        predictions = iso.fit_predict(X_scaled)
        df['anomaly'] = predictions == -1
        
        anomaly_count = int((predictions == -1).sum())  # plain int rather than numpy.int64
        logger.info(f"Detected {anomaly_count} anomalies")
        
        return df, {
            'count': anomaly_count,
            'percentage': anomaly_count / len(df) * 100,
            'features_used': features
        }
    
    def calculate_correlations(self, df):
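        # Read the threshold from the config instead of hard-coding 0.7
        threshold = self.analysis_config.get('correlation', {}).get('threshold', 0.7)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        corr_matrix = df[numeric_cols].corr()

        # Find high correlations (upper triangle only, so each pair appears once)
        high_corr = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i+1, len(corr_matrix.columns)):
                if abs(corr_matrix.iloc[i, j]) > threshold: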
                    high_corr.append({
                        'col1': corr_matrix.columns[i],
                        'col2': corr_matrix.columns[j],
                        'correlation': corr_matrix.iloc[i, j]
                    })
        
        return {'matrix': corr_matrix, 'high_correlations': high_corr}
    
    def analyze_trends(self, df):
        if 'time_series' not in self.analysis_config:
            return {}
        
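        cfg = self.analysis_config['time_series']
        freq = cfg.get('frequency', 'D')
        window = cfg.get('trend_window', 30)
        # Assumption: an optional time_series.date_column key names the date column.
        # It defaults to 'date', the name this method originally hard-coded.
        date_col = cfg.get('date_column', 'date')
        if date_col not in df.columns:
            logger.warning(f"Date column '{date_col}' not found; skipping trend analysis")
            return {}

        results = {}
        for col in ['revenue', 'quantity']:
            if col not in df.columns:
                continue

            # Rows without a date cannot be resampled, so drop them first
            dated = df.dropna(subset=[date_col]).set_index(date_col)
            series = dated[col].resample(freq).sum()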
            rolling = series.rolling(window).mean()
            
            # Calculate trend direction
            if len(rolling.dropna()) > 10:
                x = np.arange(len(rolling.dropna()))
                slope, _, r_value, _, _ = stats.linregress(x, rolling.dropna())
                
                results[col] = {
                    'slope': slope,
                    'r_squared': r_value**2,
                    'trend_direction': 'up' if slope > 0 else 'down',
                    'latest_value': rolling.iloc[-1],
                    'moving_average': rolling.tolist()
                }
        
        return results
    
    def run_all(self, df):
        logger.info("Starting analysis pipeline")
        
        results = {
            'record_count': len(df),
            'columns': df.columns.tolist(),
            'timestamp': pd.Timestamp.now().isoformat()
        }
        
        # Run all analyses
        df, anomaly_results = self.detect_anomalies(df)
        results['anomalies'] = anomaly_results
        
        correlation_results = self.calculate_correlations(df)
        results['correlations'] = correlation_results
        
        trend_results = self.analyze_trends(df)
        results['trends'] = trend_results
        
        # Summary statistics
        results['summary'] = df.describe().to_dict()
        
        return results
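
`analyze_trends` smooths each series with a rolling mean and then fits a straight line to the smoothed values; the sign of the slope gives the trend direction. The arithmetic behind those two steps can be sketched without pandas or SciPy. `rolling_mean` and `ols_slope` are hypothetical helpers written for illustration, and the revenue figures are made up.

```python
def rolling_mean(values, window):
    """Mean of each full trailing window; incomplete leading windows are skipped."""
    return [
        sum(values[i - window + 1 : i + 1]) / window
        for i in range(window - 1, len(values))
    ]


def ols_slope(ys):
    """Least-squares slope of ys against x = 0, 1, 2, ... (needs at least 2 points)."""
    n = len(ys)
    x_mean = (n - 1) / 2
    y_mean = sum(ys) / n
    numerator = sum((x - x_mean) * (y - y_mean) for x, y in enumerate(ys))
    denominator = sum((x - x_mean) ** 2 for x in range(n))
    return numerator / denominator


daily_revenue = [100, 102, 101, 105, 107, 110, 108, 112]  # illustrative values
smoothed = rolling_mean(daily_revenue, window=3)
slope = ols_slope(smoothed)
direction = "up" if slope > 0 else "down"
print(direction, round(slope, 3))
```

Smoothing before fitting keeps a single noisy day from flipping the reported direction. The cost is that the first `window - 1` points produce no smoothed value, which is why the module requires more than 10 non-null points before fitting.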

Reporting Module

# src/reporting.py
import pandas as pd
from pathlib import Path
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class ReportGenerator:
    def __init__(self, config):
        self.config = config
        self.report_config = config.get('reporting', {})
        self.output_dir = Path('reports')
        self.output_dir.mkdir(exist_ok=True)
    
    def generate_markdown(self, df, results):
        report = f"""# Data Analysis Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Dataset Overview
- Records: {len(df):,}
- Columns: {len(df.columns)}
- Sources: {df['source'].unique().tolist()}

## Key Metrics

| Metric | Value |
|--------|-------|
| Total Rows | {len(df):,} |
| Anomalies | {results['anomalies']['count']} ({results['anomalies'].get('percentage', 0.0):.2f}%) |
| High Correlations | {len(results['correlations']['high_correlations'])} |

## Anomaly Details
"""
        
        if results['anomalies']['count'] > 0:
            report += f"\nFeatures used for anomaly detection: {results['anomalies']['features_used']}\n"
        
        report += "\n## High Correlations\n"
        for corr in results['correlations']['high_correlations']:
            report += f"- {corr['col1']} ↔ {corr['col2']}: {corr['correlation']:.3f}\n"
        
        report += "\n## Trend Analysis\n"
        for col, trend in results['trends'].items():
            report += f"\n### {col}\n"
            report += f"- Direction: {trend['trend_direction']}\n"
            report += f"- Slope: {trend['slope']:.4f}\n"
            report += f"- R²: {trend['r_squared']:.4f}\n"
        
        report += "\n## Summary Statistics\n"
        report += df.describe().to_markdown()  # requires the optional 'tabulate' package
        
        return report
    
    def generate(self, df, results):
        timestamp = datetime.now().strftime('%Y%m%d_%H%M')
        
        # Generate Markdown
        markdown_report = self.generate_markdown(df, results)
        md_path = self.output_dir / f"report_{timestamp}.md"
        
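        # The report contains non-ASCII characters (↔, ²), so set the encoding explicitly
        with open(md_path, 'w', encoding='utf-8') as f: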
            f.write(markdown_report)
        
        logger.info(f"Report saved to {md_path}")
        
        return md_path
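
The configuration lists both `markdown` and `html` under `reporting.formats`, while `generate` above writes only the Markdown file. One minimal way to honor the `html` entry is sketched below, assuming that an escaped, preformatted copy of the Markdown text is acceptable. `write_html_report` is a hypothetical helper, and it does not convert Markdown to HTML markup.

```python
import html
from pathlib import Path


def write_html_report(markdown_text, output_dir, timestamp):
    """Wrap the report text in a minimal HTML page and return the file path."""
    page = (
        "<!DOCTYPE html>\n"
        '<html><head><meta charset="utf-8"><title>Data Analysis Report</title></head>\n'
        # html.escape keeps characters such as < and & from being read as markup
        f"<body><pre>{html.escape(markdown_text)}</pre></body></html>\n"
    )
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    html_path = output_dir / f"report_{timestamp}.html"
    html_path.write_text(page, encoding="utf-8")
    return html_path
```

A real Markdown-to-HTML conversion would need a third-party renderer, which is outside what the chapter covers.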

Testing

# tests/test_analysis.py
import pytest
import pandas as pd
import numpy as np
from src.analysis import DataAnalysis

@pytest.fixture
def sample_data():
    np.random.seed(42)
    dates = pd.date_range('2024-01-01', periods=100)
    df = pd.DataFrame({
        'date': dates,
        'revenue': np.random.randn(100).cumsum() + 1000,
        'quantity': np.random.randint(1, 100, 100),
        'category': np.random.choice(['A', 'B', 'C'], 100)
    })
    return df

def test_anomaly_detection(sample_data):
    config = {
        'analysis': {
            'anomaly_detection': {
                'features': ['revenue', 'quantity'],
                'contamination': 0.05
            }
        }
    }
    
    analysis = DataAnalysis(config)
    df_result, results = analysis.detect_anomalies(sample_data)
    
    assert 'anomaly' in df_result.columns
    assert results['count'] >= 0
    assert results['count'] <= len(sample_data)

def test_correlation_calculation(sample_data):
    analysis = DataAnalysis({})
    results = analysis.calculate_correlations(sample_data)
    
    assert 'matrix' in results
    assert 'high_correlations' in results

def test_trend_analysis(sample_data):
    config = {
        'analysis': {
            'time_series': {
                'frequency': 'D',
                'trend_window': 7
            }
        }
    }
    
    analysis = DataAnalysis(config)
    results = analysis.analyze_trends(sample_data)
    
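    # 100 daily points with a 7-day window leave 94 smoothed values, so a trend must be reported
    assert 'revenue' in results
    assert results['revenue']['trend_direction'] in ('up', 'down')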

Deployment

# Run full pipeline
python main.py --config config/sources.yaml --mode full

# Run analysis only
python main.py --config config/sources.yaml --mode analysis-only --input data/new_data.csv

# Run tests
pytest tests/ -v

# Schedule with cron (run at 8 AM daily); create logs/ first, because >> does not create the directory
0 8 * * * cd /path/to/project && python main.py --config config/sources.yaml >> logs/cron.log 2>&1
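
The cron line redirects output into `logs/cron.log`, which grows without bound and fails if `logs/` is missing. An alternative, sketched here under the assumption that the application may manage its own log file, is to attach a rotating file handler at startup. `add_file_logging` is a hypothetical helper, and the size and backup limits are arbitrary illustrative values.

```python
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


def add_file_logging(log_dir="logs", filename="pipeline.log"):
    """Create log_dir if needed and attach a rotating file handler to the root logger."""
    log_path = Path(log_dir)
    log_path.mkdir(parents=True, exist_ok=True)
    handler = RotatingFileHandler(
        log_path / filename,
        maxBytes=1_000_000,  # rotate after roughly 1 MB (illustrative)
        backupCount=5,       # keep five rotated files (illustrative)
        encoding="utf-8",
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logging.getLogger().addHandler(handler)
    return handler
```

The format string matches the one passed to `logging.basicConfig` in `main.py`, so console and file output stay consistent.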
EXERCISE

Extend the platform by adding: (1) email notification on anomaly detection, (2) visualization generation integrated into reports, (3) a CLI command for querying specific analysis results. This completes Data Analysis with Local AI (I014). Continue with other courses in the runlocalai curriculum for additional topics.
