RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Data Analysis with Local AI
  6. /Ch. 17
Data Analysis with Local AI

17. Multi-Source Analysis

Chapter 17 of 18 · 20 min
KEY INSIGHT

Define a canonical schema before loading data. Map all sources to this schema during load rather than patching mismatches later.

Real analysis combines data from multiple sources—databases, APIs, files, streams. This chapter covers patterns for integrating heterogeneous data.

Reading from Multiple Formats

def load_all_sources(config):
    data_frames = []
    
    for source in config['sources']:
        if source['type'] == 'csv':
            df = pd.read_csv(source['path'], parse_dates=[source.get('date_col')])
        elif source['type'] == 'excel':
            df = pd.read_excel(source['path'], sheet_name=source.get('sheet'))
        elif source['type'] == 'json':
            df = pd.read_json(source['path'], orient='records')
        elif source['type'] == 'parquet':
            df = pd.read_parquet(source['path'])
        elif source['type'] == 'sql':
            import sqlite3
            conn = sqlite3.connect(source['db'])
            df = pd.read_sql(source['query'], conn)
            conn.close()
        
        df['source'] = source['name']
        data_frames.append(df)
    
    return pd.concat(data_frames, ignore_index=True)

config = {
    'sources': [
        {'type': 'csv', 'path': 'sales.csv', 'date_col': 'sale_date', 'name': 'sales'},
        {'type': 'excel', 'path': 'inventory.xlsx', 'sheet': 0, 'name': 'inventory'},
        {'type': 'json', 'path': 'feedback.json', 'name': 'feedback'}
    ]
}

combined_df = load_all_sources(config)

Schema Alignment

Different sources often have different column names for the same data:

def align_schema(df, mapping):
    """Rename and transform columns to match target schema."""
    df = df.rename(columns=mapping['rename'])
    
    for col, func in mapping.get('transform', {}).items():
        df[col] = func(df[col])
    
    return df

mapping = {
    'rename': {
        'sale_dt': 'date',
        'cust_id': 'customer_id',
        'sale_amt': 'revenue',
        'prod_category': 'category'
    },
    'transform': {
        'revenue': lambda x: x.fillna(0),
        'date': lambda x: pd.to_datetime(x)
    }
}

aligned_df = align_schema(df, mapping)

Handling Duplicates from Multiple Sources

def merge_with_deduplication(primary_df, secondary_df, on_columns, strategy='update'):
    """
    Merge secondary data into primary, handling duplicates.
    strategy: 'update' (prioritize primary), 'append' (keep all), 'latest' (newest wins)
    """
    combined = pd.concat([primary_df, secondary_df], ignore_index=True)
    
    if strategy == 'latest':
        combined = combined.sort_values('updated_at').drop_duplicates(
            subset=on_columns, keep='last'
        )
    elif strategy == 'update':
        combined = combined.drop_duplicates(subset=on_columns, keep='first')
    
    return combined.reset_index(drop=True)

# Example: merge customer data from CRM and marketing platform
merged = merge_with_deduplication(
    primary_df=crm_data,
    secondary_df=marketing_data,
    on_columns=['email', 'customer_id'],
    strategy='latest'
)

Time-Based Data Alignment

def align_by_time(df1, df2, time_col, tolerance='1h'):
    """Align two DataFrames within time tolerance."""
    df1 = df1.copy().set_index(time_col)
    df2 = df2.copy().set_index(time_col)
    
    # Reindex df1 to df2's timestamps
    df2_aligned = df2.reindex(df1.index, method='nearest', tolerance=pd.Timedelta(tolerance))
    
    return pd.concat([df1, df2_aligned.add_suffix('_secondary')], axis=1).reset_index()

# Align sensor readings with environmental data
aligned = align_by_time(sensor_df, env_df, time_col='timestamp', tolerance='30min')

Multi-Source Aggregation

def aggregate_across_sources(df, group_cols, agg_funcs):
    """Aggregate metrics across all sources."""
    agg_dict = {}
    for col, func in agg_funcs.items():
        if func == 'sum':
            agg_dict[col] = 'sum'
        elif func == 'mean':
            agg_dict[col] = 'mean'
        elif func == 'count':
            agg_dict[col] = 'count'
        elif func == 'first':
            agg_dict[col] = 'first'
    
    result = df.groupby(group_cols + ['source']).agg(agg_dict).reset_index()
    
    # Pivot for source comparison
    pivot = result.pivot_table(index=group_cols, columns='source', values=list(agg_funcs.keys()))
    
    return pivot
EXERCISE

Create a pipeline that loads data from CSV, JSON, and a SQLite database, aligns schemas, merges duplicates using 'latest' strategy, and produces a unified analysis dataset.

← Chapter 16
Report Generation
Chapter 18 →
Data Analysis Platform Project