17. Multi-Source Analysis
Chapter 17 of 18 · 20 min
Real-world analysis combines data from multiple sources—databases, APIs, files, and streams. This chapter covers patterns for integrating heterogeneous data.
Reading from Multiple Formats
def load_all_sources(config):
    """Load every configured source and stack them into one DataFrame.

    Args:
        config: Mapping with a ``'sources'`` list. Each entry is a dict with
            a ``'type'`` (``'csv'``, ``'excel'``, ``'json'``, ``'parquet'``
            or ``'sql'``), a ``'name'`` used to tag its rows, and the
            type-specific keys read below (``'path'``, optional
            ``'date_col'`` / ``'sheet'``, or ``'db'`` + ``'query'``).

    Returns:
        A single DataFrame with all rows from all sources and an extra
        ``'source'`` column holding each row's source name. An empty
        DataFrame is returned when no sources are configured.

    Raises:
        ValueError: If a source has an unsupported ``'type'``.
    """
    import sqlite3

    frames = []
    for source in config['sources']:
        kind = source['type']
        if kind == 'csv':
            date_col = source.get('date_col')
            # Only ask for date parsing when a date column is configured;
            # [None] is not a valid column reference.
            parse_dates = [date_col] if date_col is not None else False
            df = pd.read_csv(source['path'], parse_dates=parse_dates)
        elif kind == 'excel':
            # Default to the first sheet: sheet_name=None would return a
            # dict of every sheet instead of a single DataFrame.
            df = pd.read_excel(source['path'], sheet_name=source.get('sheet', 0))
        elif kind == 'json':
            df = pd.read_json(source['path'], orient='records')
        elif kind == 'parquet':
            df = pd.read_parquet(source['path'])
        elif kind == 'sql':
            conn = sqlite3.connect(source['db'])
            try:
                df = pd.read_sql(source['query'], conn)
            finally:
                # Release the connection even if the query fails.
                conn.close()
        else:
            # Without this branch `df` would be unbound, or would still point
            # at the previous source's frame and be appended a second time.
            raise ValueError(f"Unsupported source type: {kind!r}")
        df['source'] = source['name']
        frames.append(df)

    if not frames:
        # pd.concat raises on an empty list.
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
# One entry per input: 'type' selects the reader, 'name' tags the loaded rows.
config = {
    'sources': [
        {
            'type': 'csv',
            'path': 'sales.csv',
            'date_col': 'sale_date',
            'name': 'sales',
        },
        {
            'type': 'excel',
            'path': 'inventory.xlsx',
            'sheet': 0,
            'name': 'inventory',
        },
        {
            'type': 'json',
            'path': 'feedback.json',
            'name': 'feedback',
        },
    ],
}

# Read all three inputs and stack them into a single frame.
combined_df = load_all_sources(config)
Schema Alignment
Different sources often have different column names for the same data:
def align_schema(df, mapping):
    """Rename and transform columns to match a target schema.

    Args:
        df: Source DataFrame. It is not modified; a renamed copy is returned.
        mapping: Dict with two optional keys. ``'rename'`` maps source column
            names to target names. ``'transform'`` maps *target* column names
            to a callable that takes the column and returns its replacement.

    Returns:
        A new DataFrame with the renames applied, followed by the transforms.
        Transforms whose column is absent from this source are skipped, the
        same way ``rename`` ignores labels that are not present, so one
        mapping can be shared by sources with different column sets.
    """
    # rename() returns a new object, so the assignments below never touch
    # the caller's frame.
    aligned = df.rename(columns=mapping.get('rename', {}))
    for col, func in mapping.get('transform', {}).items():
        if col not in aligned.columns:
            continue
        aligned[col] = func(aligned[col])
    return aligned
# Source-specific column names on the left, target schema names on the right.
# Transforms are keyed by the *target* name because they run after renaming.
mapping = {
    'rename': {
        'sale_dt': 'date',
        'cust_id': 'customer_id',
        'sale_amt': 'revenue',
        'prod_category': 'category',
    },
    'transform': {
        'revenue': lambda series: series.fillna(0),
        'date': pd.to_datetime,
    },
}

# NOTE: `df` is assumed to be a frame loaded earlier from one of the sources.
aligned_df = align_schema(df=df, mapping=mapping)
Handling Duplicates from Multiple Sources
def merge_with_deduplication(primary_df, secondary_df, on_columns, strategy='update',
                             timestamp_col='updated_at'):
    """Merge secondary data into primary, handling duplicate keys.

    Args:
        primary_df: The preferred DataFrame.
        secondary_df: The DataFrame merged into it.
        on_columns: Column name(s) that identify a duplicate record.
        strategy: How duplicates are resolved.
            ``'update'`` keeps the primary row for each key,
            ``'append'`` keeps every row from both frames,
            ``'latest'`` keeps the row with the newest ``timestamp_col``.
        timestamp_col: Column used to order rows for ``'latest'``.

    Returns:
        The combined DataFrame with a fresh 0..n-1 index. With ``'latest'``
        the rows are ordered by ``timestamp_col``.

    Raises:
        ValueError: If ``strategy`` is not one of the three supported values.
        KeyError: If ``strategy='latest'`` and ``timestamp_col`` is missing.
    """
    strategies = ('update', 'append', 'latest')
    if strategy not in strategies:
        # An unrecognised name used to fall through and behave like 'append'.
        raise ValueError(
            f"Unknown strategy {strategy!r}; expected one of {strategies}"
        )

    combined = pd.concat([primary_df, secondary_df], ignore_index=True)

    if strategy == 'latest':
        if timestamp_col not in combined.columns:
            raise KeyError(
                f"strategy='latest' requires a {timestamp_col!r} column"
            )
        # na_position='first': rows without a timestamp must not outrank
        # dated rows under keep='last'. The stable sort makes ties
        # deterministic (on equal timestamps the secondary row wins).
        ordered = combined.sort_values(
            timestamp_col, kind='mergesort', na_position='first'
        )
        combined = ordered.drop_duplicates(subset=on_columns, keep='last')
    elif strategy == 'update':
        # Primary rows come first in the concat, so keep='first' prefers them.
        combined = combined.drop_duplicates(subset=on_columns, keep='first')

    return combined.reset_index(drop=True)
# Example: combine customer records from the CRM and the marketing platform;
# rows sharing the same email and customer_id are resolved with 'latest'.
merged = merge_with_deduplication(
    crm_data,
    marketing_data,
    on_columns=['email', 'customer_id'],
    strategy='latest',
)
Time-Based Data Alignment
def align_by_time(df1, df2, time_col, tolerance='1h'):
    """Attach to each row of ``df1`` the ``df2`` row nearest in time.

    Args:
        df1: Frame whose timestamps define the output rows.
        df2: Frame to look up; its columns are returned with a
            ``'_secondary'`` suffix.
        time_col: Name of the timestamp column present in both frames.
        tolerance: Maximum allowed time gap, as anything ``pd.Timedelta``
            accepts. Rows of ``df1`` with no ``df2`` row within this gap
            get missing values in the ``'_secondary'`` columns.

    Returns:
        A DataFrame with ``time_col``, the columns of ``df1``, and the
        suffixed columns of ``df2``. Neither input is modified.
    """
    # set_index returns a new frame, so no explicit copy is needed.
    left = df1.set_index(time_col)

    # A nearest-match reindex needs a sorted, duplicate-free index to search.
    # Rows without a timestamp cannot be matched, and for repeated timestamps
    # the first occurrence (in original row order) is kept.
    right = df2.set_index(time_col)
    right = right.loc[right.index.notna()].sort_index(kind='mergesort')
    right = right.loc[~right.index.duplicated(keep='first')]

    # Reindex df2 onto df1's timestamps.
    matched = right.reindex(
        left.index, method='nearest', tolerance=pd.Timedelta(tolerance)
    )
    return pd.concat([left, matched.add_suffix('_secondary')], axis=1).reset_index()
# Pair each sensor reading with the nearest environmental record,
# accepting matches no more than 30 minutes apart.
aligned = align_by_time(sensor_df, env_df, 'timestamp', tolerance='30min')
Multi-Source Aggregation
def aggregate_across_sources(df, group_cols, agg_funcs):
    """Aggregate metrics per group and source, then pivot sources side by side.

    Args:
        df: Combined DataFrame containing a ``'source'`` column.
        group_cols: Column name, or list/tuple of column names, to group by.
        agg_funcs: Dict mapping a column name to one of ``'sum'``,
            ``'mean'``, ``'count'`` or ``'first'``.

    Returns:
        A DataFrame indexed by ``group_cols`` whose columns are a
        (metric, source) MultiIndex, one column per metric and source.

    Raises:
        ValueError: If ``agg_funcs`` requests an unsupported aggregation.
    """
    supported = ('sum', 'mean', 'count', 'first')
    unsupported = {
        col: func for col, func in agg_funcs.items() if func not in supported
    }
    if unsupported:
        # These were previously dropped silently and only surfaced later as
        # a missing-column error in the pivot.
        raise ValueError(
            f"Unsupported aggregation(s) {unsupported}; expected one of {supported}"
        )

    keys = [group_cols] if isinstance(group_cols, str) else list(group_cols)
    agg_dict = dict(agg_funcs)

    result = df.groupby(keys + ['source']).agg(agg_dict).reset_index()

    # Pivot for source comparison. Every (group, source) pair is already a
    # single row, so 'first' just carries the value across; the default
    # 'mean' would fail on non-numeric results such as a 'first' of labels.
    pivot = result.pivot_table(
        index=keys, columns='source', values=list(agg_dict), aggfunc='first'
    )
    return pivot
EXERCISE
Create a pipeline that loads data from a CSV file, a JSON file, and a SQLite database, aligns their schemas, merges duplicates using the 'latest' strategy, and produces a single unified dataset ready for analysis.