How to use AI for data quality validation within an ETL pipeline to flag anomalies in ingested records
Prerequisites: a running ETL pipeline and access to an AI model for classification or anomaly detection.
What this does
Traditional data quality checks rely on static rules such as range constraints, null thresholds, and regex patterns. These rules fail to catch semantic anomalies—records that pass all structural checks but contain implausible or contradictory data. This guide shows how to add an AI-powered validation step within an ETL pipeline that examines each record's fields in context and flags semantic anomalies, such as a transaction amount that is inconsistent with the vendor category or a customer location that contradicts their shipping address.
Steps
Define the data quality dimensions to check: completeness (missing values), consistency (contradicting values across fields), accuracy (implausible values), and timeliness (stale records). Choose which dimensions the AI will validate.
Build a few-shot prompt with 3-5 examples of both valid and anomalous records, each annotated with a quality judgment and a one-line explanation. Include the expected value ranges and business rules in the prompt preamble.
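One way to assemble such a prompt is sketched below. The field names (vendor_category, amount, customer_country, shipping_country), the business rules, and the example records are all hypothetical placeholders chosen for illustration; substitute the rules and records from your own domain.

```python
import json

# Hypothetical preamble: the rules and ranges here are illustrative only.
PREAMBLE = (
    "You are a data quality validator for transaction records.\n"
    "Business rules:\n"
    "- amount must be plausible for the vendor_category\n"
    "- customer_country should match shipping_country\n"
    'Respond only with JSON: {"valid": true|false, "reason": "<one line>"}'
)

# Three hypothetical annotated examples covering valid and anomalous records.
FEW_SHOT_EXAMPLES = [
    (
        {"vendor_category": "coffee_shop", "amount": 4.5,
         "customer_country": "DE", "shipping_country": "DE"},
        {"valid": True, "reason": "amount is typical for a coffee shop"},
    ),
    (
        {"vendor_category": "coffee_shop", "amount": 4800.0,
         "customer_country": "DE", "shipping_country": "DE"},
        {"valid": False, "reason": "amount far exceeds typical coffee shop purchases"},
    ),
    (
        {"vendor_category": "electronics", "amount": 320.0,
         "customer_country": "DE", "shipping_country": "BR"},
        {"valid": False, "reason": "customer_country contradicts shipping_country"},
    ),
]


def build_prompt(record: dict) -> str:
    """Return the preamble, the annotated examples, and the record to judge."""
    parts = [PREAMBLE, ""]
    for example, judgment in FEW_SHOT_EXAMPLES:
        parts.append("Record: " + json.dumps(example, sort_keys=True))
        parts.append("Judgment: " + json.dumps(judgment))
        parts.append("")
    # The prompt ends with an open "Judgment:" slot for the model to complete.
    parts.append("Record: " + json.dumps(record, sort_keys=True))
    parts.append("Judgment:")
    return "\n".join(parts)
```

Keeping the examples in a list separate from the preamble makes it easier to add or replace examples during later prompt reviews without touching the rules text.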
Write the validation function. It takes one record at a time (or a batch of up to 10 for efficiency), sends it to the AI with the few-shot prompt, and parses the response. The AI must return a structured JSON object with two fields: "valid": true|false and "reason": "explanation".
Set a confidence threshold. If the AI indicates the record is anomalous but the reason is vague ("seems wrong"), treat it as a warning. If the AI provides a specific semantic contradiction ("amount exceeds 3x the average for this vendor"), treat it as an anomaly and route the record to an anomaly queue.
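The parsing and warning-versus-anomaly logic might look like the following minimal sketch. It assumes a caller-supplied call_model function that takes a prompt string and returns the model's raw text reply; that function, the VAGUE_PHRASES list, and the four-word cutoff are illustrative assumptions, not part of any specific AI SDK or of the guide itself.

```python
import json
from typing import Callable

# Illustrative heuristic: phrases that mark a reason as too vague to act on.
VAGUE_PHRASES = ("seems wrong", "looks odd", "not sure", "something is off")


def parse_judgment(raw: str) -> dict:
    """Parse the model reply; mark it as a parse error if it is not the expected JSON."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {"valid": False, "reason": "", "parse_error": True}
    if not isinstance(data, dict) or not isinstance(data.get("valid"), bool):
        return {"valid": False, "reason": "", "parse_error": True}
    return {"valid": data["valid"], "reason": str(data.get("reason", "")),
            "parse_error": False}


def triage(judgment: dict) -> str:
    """Return 'valid', 'warning', or 'anomaly' for a parsed judgment."""
    if judgment["parse_error"]:
        return "warning"  # unparseable replies are surfaced, not silently dropped
    if judgment["valid"]:
        return "valid"
    reason = judgment["reason"].strip().lower()
    # Assumed cutoff: very short or hedged reasons count as vague.
    if len(reason.split()) < 4 or any(p in reason for p in VAGUE_PHRASES):
        return "warning"
    return "anomaly"


def validate_record(record: dict,
                    call_model: Callable[[str], str],
                    build_prompt: Callable[[dict], str] = json.dumps) -> dict:
    """Send one record to the model and classify the reply.

    build_prompt defaults to json.dumps as a placeholder; pass the real
    few-shot prompt builder in practice.
    """
    judgment = parse_judgment(call_model(build_prompt(record)))
    return {"record": record, "status": triage(judgment),
            "reason": judgment["reason"]}
```

Passing call_model as a parameter keeps the function testable with a stub, for example validate_record(record, lambda prompt: '{"valid": true, "reason": "ok"}').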
Integrate the validation step after schema validation but before the load phase. This ensures only structurally valid records reach the AI, reducing unnecessary API costs.
Accumulate anomalies in a dedicated table or file with the record, anomaly reason, and timestamp. Route valid records to the warehouse as normal.
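A minimal sketch of this routing step, using SQLite from the Python standard library as a stand-in for the dedicated anomaly table. The table name, column names, and the load_to_warehouse callback are hypothetical. The sketch also assumes each result is a dict with record, status, and reason keys, and that warnings are held back together with anomalies; loading warnings while logging them is an equally reasonable policy.

```python
import json
import sqlite3
from datetime import datetime, timezone


def open_anomaly_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the anomaly table; ':memory:' is convenient for tests."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS anomalies ("
        "record_json TEXT NOT NULL, reason TEXT NOT NULL, "
        "status TEXT NOT NULL, flagged_at TEXT NOT NULL)"
    )
    return conn


def route(results, conn: sqlite3.Connection, load_to_warehouse) -> tuple:
    """Load valid records; persist every other record with its reason and timestamp."""
    loaded = flagged = 0
    for result in results:
        if result["status"] == "valid":
            load_to_warehouse(result["record"])
            loaded += 1
        else:
            conn.execute(
                "INSERT INTO anomalies VALUES (?, ?, ?, ?)",
                (
                    json.dumps(result["record"], sort_keys=True),
                    result["reason"],
                    result["status"],
                    datetime.now(timezone.utc).isoformat(),
                ),
            )
            flagged += 1
    conn.commit()
    return loaded, flagged
```

Storing the status next to the reason lets reviewers filter warnings from confirmed anomalies in the same table.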
Periodically review the anomaly queue to tune the AI prompt. False positives (valid records flagged as anomalies) indicate the prompt needs more context; false negatives (anomalies that pass) indicate the examples need updating.
Verification
# Verify that known anomalies are correctly flagged and valid records pass
python3 -c "
import subprocess, json
# Run the validation script; check=True raises if it exits with a non-zero status
result = subprocess.run(
    ['python3', 'scripts/validate_data_quality.py'],
    capture_output=True, text=True, timeout=60, check=True
)
# The script is expected to print a single JSON summary object to stdout
output = json.loads(result.stdout)
assert output['anomalies_detected'] >= output['expected_anomalies'], \
    f'Expected >= {output[\"expected_anomalies\"]} anomalies, detected {output[\"anomalies_detected\"]}'
assert output['false_positives'] <= 2, f'Too many false positives: {output[\"false_positives\"]}'
print(f'Verification passed: {output[\"anomalies_detected\"]} anomalies detected, {output[\"false_positives\"]} false positives')
"
# Expected: Verification passed: <N> anomalies detected, <M> false positives
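The verification command reads a JSON summary from scripts/validate_data_quality.py, which this guide does not show. The sketch below is one hypothetical way such a script could produce the three keys the command checks. It assumes a labeled fixture set of (record, is_known_anomaly) pairs and counts anomalies_detected as known anomalies that were flagged; the fixture records and the stand-in rule-based validator are placeholders for the real AI-backed one.

```python
import json


def summarize(labeled_records, validate) -> dict:
    """Compare validator output against labels.

    labeled_records: iterable of (record, is_known_anomaly) pairs.
    validate: callable returning True when the record is judged valid.
    """
    expected = detected = false_positives = 0
    for record, is_known_anomaly in labeled_records:
        flagged = not validate(record)
        if is_known_anomaly:
            expected += 1
            if flagged:
                detected += 1  # known anomaly correctly flagged
        elif flagged:
            false_positives += 1  # valid record wrongly flagged
    return {
        "expected_anomalies": expected,
        "anomalies_detected": detected,
        "false_positives": false_positives,
    }


if __name__ == "__main__":
    # Hypothetical fixtures; replace with records from your own review history.
    fixtures = [
        ({"vendor_category": "coffee_shop", "amount": 4.5}, False),
        ({"vendor_category": "coffee_shop", "amount": 4800.0}, True),
    ]
    # Stand-in validator so the sketch runs without an AI endpoint.
    print(json.dumps(summarize(fixtures, lambda r: r["amount"] < 100)))
```

Under this definition, the first assertion in the verification command passes only when every known anomaly in the fixture set is flagged.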
Common failures
- The AI flags too many false positives because the prompt lacks domain-specific context. Add a constraints section to the prompt listing acceptable ranges, known valid patterns, and business rules. Update this section whenever a false positive pattern is identified during review.
- Batch processing slows the pipeline because each AI validation call takes 1-3 seconds. Process records in parallel using asyncio.gather with a concurrency limit of 5-10, matching the AI API rate limit. At 1-3 seconds per call, 10 parallel workers complete roughly 200-600 calls per minute, so a pipeline processing 1,000 records per minute stays within the same time budget only if each call carries a batch; with 10 records per call, throughput is roughly 2,000-6,000 records per minute.
- The AI returns "valid": true for every record when the prompt is too permissive. Test the prompt against known bad records before deploying. If the AI misses more than 20% of known anomalies, rewrite the prompt with more specific examples of what constitutes an anomaly.
- The anomaly queue grows unbounded because no one monitors it. Set up a daily summary report that counts anomalies by reason category. If any category exceeds 50 records per day, trigger an alert for manual investigation of a potential systemic data quality issue.
- Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
- Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
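The parallel-processing fix mentioned in the list above (asyncio.gather with a concurrency limit) can be sketched as follows. The validate_async coroutine is a hypothetical stand-in for whatever async call your AI client exposes; the semaphore caps the number of in-flight requests.

```python
import asyncio


async def validate_all(records, validate_async, max_concurrency: int = 10):
    """Validate records concurrently with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(record):
        # Each task waits here until one of the concurrency slots is free.
        async with semaphore:
            return await validate_async(record)

    # gather returns results in the same order as the input records.
    return await asyncio.gather(*(guarded(r) for r in records))
```

A typical call would be results = asyncio.run(validate_all(batch, validate_async, max_concurrency=5)). Set max_concurrency at or below the rate limit of the AI API so the pipeline does not trade latency for throttling errors.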