Data Onboarder - Part 1
Can I use DSPy to optimize converting JSON into an existing SQL database?
TL;DR
- Objective: Generate synthetic data with known manipulations (transformations and corruptions); optimize prompts so an agent can spot those manipulations and ultimately suggest how the JSON could be incorporated into the SQL database
- Process: Created a mock database, wrote Python tools for querying it, defined manipulations, generated synthetic data based on the existing schema, built an agent to predict these known data anomalies, and attempted to score its predictions
- Outcome: The agent did a good job of identifying issues in general, but we don't know how well yet, since scoring proved to be a challenge. It was too simplistic to assume the agent could identify root-cause data manipulations (i.e. delete a foreign key) from a series of symptoms (i.e. record with id 123 has no foreign key). This may be solvable through better prompting, LLM scoring, and improved manipulation definitions. Additionally, trying to 'pre-define' the impact of data manipulations added too much complexity.
Intro
I live in a world of mini-projects, where the (often small or non-existent) data seems easier to get as JSON. However, any serious project needs some form of data model (SQL, NoSQL). Therefore this post looks at building an agent that:
- Detects data quality issues in incoming JSON
- Classifies them by severity and remediation type
- Suggests fixes that respect your database constraints
The main hypothesis: by generating synthetic data with known corruptions, we can optimize our agent's prompts to better detect real-world data issues.
The Approach: Synthetic Data with Known Faults
Instead of manually crafting test cases, we:
- Generate clean synthetic data that respects our schema
- Apply realistic corruptions (the kinds we see in production)
- Train an agent to detect these known issues
- Optimize the detection prompts using DSPy
This approach lets us systematically improve our data quality checks.
Aside: I know I am not the first to try this, and real-world data is a lot messier and more complex. But the reality is we have to try, since 'indie' devs like me don't have the key ingredient for serious projects: large, clean datasets.
graph LR
A[Clean Synthetic<br/>Data Generator] --> B[Apply Known<br/>Corruptions]
B --> C[Corrupted Data<br/>+ Fault Labels]
C --> D[Data Quality<br/>Agent Analysis]
D --> E[Compare Results<br/>vs Fault Labels]
E --> F[Optimize Agent<br/>Prompts DSPy]
F --> A
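Before setting anything up, here is a minimal sketch of the loop in the diagram above. The names DataQualityAgent, detection_metric, and trainset are hypothetical placeholders for pieces built later in this series, and the metric shown is an assumption rather than the final scoring approach.
import dspy

def detection_metric(example, prediction, trace=None):
    """Hypothetical metric: fraction of injected fault labels the agent reported."""
    expected = set(example.fault_labels)
    found = set(getattr(prediction, "detected_issues", []))
    return len(expected & found) / max(len(expected), 1)

# Hypothetical wiring of the optimisation step (agent and trainset are built later):
# agent = DataQualityAgent()                                   # a dspy.Module that inspects incoming JSON
# optimizer = dspy.MIPROv2(metric=detection_metric, auto="light")
# optimized_agent = optimizer.compile(agent, trainset=trainset)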
Set Up uv Environment
%%capture
!uv pip install -U sqlite-utils dspy pydantic
import sqlite3
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, TypedDict, Union
import dspy
import os
from enum import Enum
import random
from pydantic import BaseModel, Field
# Get OpenAI API key from environment variable
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
raise ValueError("OPENAI_API_KEY environment variable not found. Please set it with your OpenAI API key.")
# Configure DSPy with O3 model
lm = dspy.LM('openai/o3', api_key=openai_api_key, temperature=1.0, max_tokens=50000)
dspy.configure(lm=lm)
print("✅ DSPy configured with OpenAI O3 model")
print(f"Model: {lm.model}")
print("Ready to use DSPy signatures and modules!")
✅ DSPy configured with OpenAI O3 model
Model: openai/o3
Ready to use DSPy signatures and modules!
# test using dspy
# lm("Is Greenland in Europe or NA?")
Database Setup
Now let's create our SQLite database with the car parts schema and sample data:
# Create database file
db_path = Path("tmp/car_parts.db")
print(f"Creating database at: {db_path}")
# make sure the directory exists
db_path.parent.mkdir(exist_ok=True)
# Connect to database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Enable foreign key enforcement
cursor.execute("PRAGMA foreign_keys = ON")
print("Foreign key enforcement enabled")
Creating database at: tmp/car_parts.db
Foreign key enforcement enabled
# Drop existing tables if they exist
drop_statements = [
"DROP TABLE IF EXISTS car_model_parts",
"DROP TABLE IF EXISTS parts",
"DROP TABLE IF EXISTS part_categories",
"DROP TABLE IF EXISTS car_models",
"DROP TABLE IF EXISTS manufacturers"
]
for statement in drop_statements:
cursor.execute(statement)
print(f"Executed: {statement}")
conn.commit()
print("All tables dropped successfully")
Executed: DROP TABLE IF EXISTS car_model_parts
Executed: DROP TABLE IF EXISTS parts
Executed: DROP TABLE IF EXISTS part_categories
Executed: DROP TABLE IF EXISTS car_models
Executed: DROP TABLE IF EXISTS manufacturers
All tables dropped successfully
# Create all tables
create_statements = [
"""CREATE TABLE manufacturers (
manufacturer_id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE
)""",
"""CREATE TABLE car_models (
model_id INTEGER PRIMARY KEY,
manufacturer_id INTEGER NOT NULL,
model_name TEXT NOT NULL,
model_year INTEGER NOT NULL,
FOREIGN KEY (manufacturer_id) REFERENCES manufacturers(manufacturer_id)
ON DELETE CASCADE
)""",
"""CREATE TABLE part_categories (
category_id INTEGER PRIMARY KEY,
category_name TEXT NOT NULL UNIQUE
)""",
"""CREATE TABLE parts (
part_id INTEGER PRIMARY KEY,
category_id INTEGER NOT NULL,
part_number TEXT NOT NULL UNIQUE,
description TEXT NOT NULL,
FOREIGN KEY (category_id) REFERENCES part_categories(category_id)
ON DELETE RESTRICT
)""",
"""CREATE TABLE car_model_parts (
model_id INTEGER NOT NULL,
part_id INTEGER NOT NULL,
qty INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (model_id, part_id),
FOREIGN KEY (model_id) REFERENCES car_models(model_id)
ON DELETE CASCADE,
FOREIGN KEY (part_id) REFERENCES parts(part_id)
ON DELETE RESTRICT
)"""
]
for i, statement in enumerate(create_statements, 1):
cursor.execute(statement)
table_name = statement.split()[2] # Extract table name
print(f"{i}. Created table: {table_name}")
conn.commit()
print("All tables created successfully!")
1. Created table: manufacturers
2. Created table: car_models
3. Created table: part_categories
4. Created table: parts
5. Created table: car_model_parts
All tables created successfully!
Insert Sample Data
Now let's populate the tables with sample data:
# Insert manufacturers
manufacturers_data = [
('Toyota',),
('Honda',)
]
cursor.executemany("INSERT INTO manufacturers (name) VALUES (?)", manufacturers_data)
print(f"Inserted {len(manufacturers_data)} manufacturers")
# Insert car models
car_models_data = [
(1, 'Camry', 2022),
(1, 'Corolla', 2023),
(2, 'Accord', 2022),
(2, 'Civic', 2023)
]
cursor.executemany("INSERT INTO car_models (manufacturer_id, model_name, model_year) VALUES (?, ?, ?)", car_models_data)
print(f"Inserted {len(car_models_data)} car models")
# Insert part categories
part_categories_data = [
('Engine',),
('Brake',),
('Suspension',)
]
cursor.executemany("INSERT INTO part_categories (category_name) VALUES (?)", part_categories_data)
print(f"Inserted {len(part_categories_data)} part categories")
conn.commit()
print("Basic data inserted successfully!")
Inserted 2 manufacturers
Inserted 4 car models
Inserted 3 part categories
Basic data inserted successfully!
# Insert parts
parts_data = [
(1, 'ENG-2.5L-A25A', '2.5 L I4 engine (Toyota)'),
(1, 'ENG-1.5T-L15B', '1.5 L Turbo I4 engine (Honda)'),
(2, 'BRK-FR-TOY-22', 'Front brake pad set (Toyota 2022+)'),
(2, 'BRK-FR-HON-22', 'Front brake pad set (Honda 2022+)'),
(3, 'SUS-STRUT-CIV-23', 'Front strut (Civic 2023)')
]
cursor.executemany("INSERT INTO parts (category_id, part_number, description) VALUES (?, ?, ?)", parts_data)
print(f"Inserted {len(parts_data)} parts")
# Insert car model parts relationships
car_model_parts_data = [
# Camry 2022
(1, 1, 1), # Camry -> Toyota Engine
(1, 3, 2), # Camry -> Toyota Brake Pads (qty: 2)
# Corolla 2023
(2, 3, 2), # Corolla -> Toyota Brake Pads (qty: 2)
# Accord 2022
(3, 2, 1), # Accord -> Honda Engine
(3, 4, 2), # Accord -> Honda Brake Pads (qty: 2)
# Civic 2023
(4, 2, 1), # Civic -> Honda Engine
(4, 5, 2) # Civic -> Front Strut (qty: 2)
]
cursor.executemany("INSERT INTO car_model_parts (model_id, part_id, qty) VALUES (?, ?, ?)", car_model_parts_data)
print(f"Inserted {len(car_model_parts_data)} car model-part relationships")
conn.commit()
print("All sample data inserted successfully!")
Inserted 5 parts
Inserted 7 car model-part relationships
All sample data inserted successfully!
Verify the Data
Let's verify that our database was created correctly and query some data:
# Check table structure
print("=== DATABASE TABLES ===")
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = cursor.fetchall()
for table in tables:
print(f"✓ {table[0]}")
print(f"\nTotal tables created: {len(tables)}")
# Show row counts
print("\n=== ROW COUNTS ===")
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
count = cursor.fetchone()[0]
print(f"{table[0]}: {count} rows")
=== DATABASE TABLES ===
✓ car_model_parts
✓ car_models
✓ manufacturers
✓ part_categories
✓ parts
Total tables created: 5
=== ROW COUNTS ===
car_model_parts: 7 rows
car_models: 4 rows
manufacturers: 2 rows
part_categories: 3 rows
parts: 5 rows
# Example query: Get all car models with their manufacturers
query = """
SELECT
m.name as manufacturer,
cm.model_name,
cm.model_year
FROM car_models cm
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
ORDER BY m.name, cm.model_name
"""
df_models = pd.read_sql_query(query, conn)
print("=== CAR MODELS ===")
print(df_models.to_string(index=False))
=== CAR MODELS ===
manufacturer model_name model_year
Honda Accord 2022
Honda Civic 2023
Toyota Camry 2022
Toyota Corolla 2023
# Complex query: Show which parts fit which car models
query = """
SELECT
m.name as manufacturer,
cm.model_name,
cm.model_year,
pc.category_name as part_category,
p.part_number,
p.description,
cmp.qty
FROM car_model_parts cmp
JOIN car_models cm ON cmp.model_id = cm.model_id
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
JOIN parts p ON cmp.part_id = p.part_id
JOIN part_categories pc ON p.category_id = pc.category_id
ORDER BY m.name, cm.model_name, pc.category_name
"""
df_compatibility = pd.read_sql_query(query, conn)
print("=== PARTS COMPATIBILITY ===")
print(df_compatibility.to_string(index=False))
=== PARTS COMPATIBILITY ===
manufacturer model_name model_year part_category part_number description qty
Honda Accord 2022 Brake BRK-FR-HON-22 Front brake pad set (Honda 2022+) 2
Honda Accord 2022 Engine ENG-1.5T-L15B 1.5 L Turbo I4 engine (Honda) 1
Honda Civic 2023 Engine ENG-1.5T-L15B 1.5 L Turbo I4 engine (Honda) 1
Honda Civic 2023 Suspension SUS-STRUT-CIV-23 Front strut (Civic 2023) 2
Toyota Camry 2022 Brake BRK-FR-TOY-22 Front brake pad set (Toyota 2022+) 2
Toyota Camry 2022 Engine ENG-2.5L-A25A 2.5 L I4 engine (Toyota) 1
Toyota Corolla 2023 Brake BRK-FR-TOY-22 Front brake pad set (Toyota 2022+) 2
# Close database connection
conn.close()
print(f"Database connection closed. Database saved as: {db_path}")
print(f"Database size: {db_path.stat().st_size} bytes")
Database connection closed. Database saved as: /Users/craig/Projects/cas/notebooks/nbs/tmp/car_parts.db
Database size: 40960 bytes
Define Tools
# Database connection helper
def get_db_connection(db_path: str = "tmp/car_parts.db") -> sqlite3.Connection:
"""Get a database connection with foreign keys enabled."""
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def list_tables() -> List[str]:
"""
Get all table names in the database.
Use this tool when: You need to know what tables exist in the database before
querying or inserting data.
Returns: List of table names as strings.
Example output: ['manufacturers', 'car_models', 'parts']
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
conn.close()
return tables
def get_table_schema(table_name: str) -> pd.DataFrame:
"""
Get detailed schema information for a specific table including column names,
types, constraints, and whether columns allow NULL values.
Use this tool when: You need to understand the structure of a table before
inserting data, including what columns exist, their data types, which are
required (NOT NULL), and which are primary keys.
Args:
table_name: Name of the table to inspect
Returns: DataFrame with columns: column_name, data_type, not_null, default_value, primary_key
Example output for 'manufacturers' table:
| column_name | data_type | not_null | default_value | primary_key |
|-----------------|-----------|----------|---------------|-------------|
| manufacturer_id | INTEGER | 0 | None | 1 |
| name | TEXT | 1 | None | 0 |
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
conn.close()
df = pd.DataFrame(columns, columns=['cid', 'column_name', 'data_type', 'not_null', 'default_value', 'primary_key'])
return df[['column_name', 'data_type', 'not_null', 'default_value', 'primary_key']]
def get_foreign_key_relationships() -> pd.DataFrame:
"""
Get all foreign key relationships in the database showing how tables are connected.
Use this tool when: You need to understand table relationships before inserting
data to ensure referential integrity. This shows which columns reference other
tables and what happens on delete/update.
Returns: DataFrame with columns: from_table, from_column, to_table, to_column, on_delete, on_update
Example output:
| from_table | from_column | to_table | to_column | on_delete | on_update |
|-------------|-----------------|---------------|-----------------|-----------|-----------|
| car_models | manufacturer_id | manufacturers | manufacturer_id | CASCADE | NO ACTION |
"""
conn = get_db_connection()
cursor = conn.cursor()
# Get all tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
relationships = []
for table in tables:
cursor.execute(f"PRAGMA foreign_key_list({table})")
fks = cursor.fetchall()
for fk in fks:
relationships.append({
'from_table': table,
'from_column': fk[3], # from column
'to_table': fk[2], # to table
'to_column': fk[4], # to column
'on_delete': fk[6], # on delete action
'on_update': fk[5] # on update action
})
conn.close()
return pd.DataFrame(relationships)
def sample_table_data(table_name: str, limit: int = 5) -> pd.DataFrame:
"""
Get a sample of rows from a table to understand the data format and content.
Use this tool when: You need to see example data in a table to understand
the format, typical values, and data patterns before inserting new records.
Args:
table_name: Name of the table to sample
limit: Maximum number of rows to return (default: 5)
Returns: DataFrame containing sample rows from the table
Example output for 'manufacturers' table:
| manufacturer_id | name |
|-----------------|--------|
| 1 | Toyota |
| 2 | Honda |
"""
conn = get_db_connection()
query = f"SELECT * FROM {table_name} LIMIT {limit}"
df = pd.read_sql_query(query, conn)
conn.close()
return df
def get_unique_values(table_name: str, column_name: str, limit: int = 50) -> List[Any]:
"""
Get unique values from a specific column in a table.
Use this tool when: You need to see what values already exist in a column
to avoid duplicates, understand naming conventions, or see valid options
for foreign key references.
Args:
table_name: Name of the table
column_name: Name of the column to get unique values from
limit: Maximum number of unique values to return (default: 50)
Returns: List of unique values from the column
Example output for manufacturers.name:
['Toyota', 'Honda']
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT DISTINCT {column_name} FROM {table_name} ORDER BY {column_name} LIMIT {limit}")
values = [row[0] for row in cursor.fetchall()]
conn.close()
return values
def get_table_row_count(table_name: str) -> int:
"""
Get the total number of rows in a table.
Use this tool when: You need to understand the size of a table or check
if it's empty before inserting data.
Args:
table_name: Name of the table to count
Returns: Integer count of rows in the table
Example output: 2 (for manufacturers table)
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
conn.close()
return count
def get_table_columns(table_name: str) -> List[str]:
"""
Get just the column names for a table (simpler than full schema).
Use this tool when: You need a quick list of column names for a table
without the full schema details.
Args:
table_name: Name of the table
Returns: List of column names as strings
Example output for 'manufacturers': ['manufacturer_id', 'name']
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()] # row[1] is column name
conn.close()
return columns
def execute_query(query: str) -> pd.DataFrame:
"""
Execute a custom SQL query and return results as DataFrame.
Use this tool when: You need to run a specific SQL query to understand
data relationships, check for existing data, or perform complex lookups
that other tools don't cover.
Args:
query: SQL query string to execute
Returns: DataFrame with query results
Example usage: execute_query("SELECT COUNT(*) as total FROM manufacturers")
"""
conn = get_db_connection()
df = pd.read_sql_query(query, conn)
conn.close()
return df
def check_referential_integrity(table_name: str, column_name: str, value: Any) -> bool:
"""
Check if a foreign key value exists in the referenced table.
Use this tool when: You need to verify that a foreign key value exists
in the parent table before inserting a record.
Args:
table_name: Name of the table containing the foreign key
column_name: Name of the foreign key column
value: Value to check for existence
Returns: True if the value exists in the referenced table, False otherwise
Example: check_referential_integrity('car_models', 'manufacturer_id', 1)
Returns: True if manufacturer_id=1 exists in manufacturers table
"""
# First get the foreign key relationship
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"PRAGMA foreign_key_list({table_name})")
fks = cursor.fetchall()
target_table = None
target_column = None
for fk in fks:
if fk[3] == column_name: # from column matches
target_table = fk[2] # to table
target_column = fk[4] # to column
break
if not target_table:
conn.close()
return False # No foreign key relationship found
# Check if value exists in target table
cursor.execute(f"SELECT COUNT(*) FROM {target_table} WHERE {target_column} = ?", (value,))
exists = cursor.fetchone()[0] > 0
conn.close()
return exists
def validate_json_types_against_schema(json_data: Dict[str, Any], table_name: str) -> List[str]:
"""
Validate that JSON data types match the expected database schema types.
Use this tool when: You need to verify that incoming JSON data has compatible
types before attempting to insert into the database.
Args:
json_data: Dictionary containing the JSON data to validate
table_name: Name of the target table to validate against
Returns: List of validation error messages (empty list if all valid)
Example output: ['Field "model_year" expected INTEGER but got string "2022"']
"""
errors = []
schema_df = get_table_schema(table_name)
for _, row in schema_df.iterrows():
column_name = row['column_name']
expected_type = row['data_type']
is_required = row['not_null'] == 1
is_primary_key = row['primary_key'] == 1
# Skip auto-increment primary keys
if is_primary_key and expected_type == 'INTEGER':
continue
if column_name in json_data:
value = json_data[column_name]
# Check for null values in required fields
if value is None and is_required:
errors.append(f"Field '{column_name}' is required but got null")
continue
# Skip type checking for null values in optional fields
if value is None:
continue
# Type validation
if expected_type == 'INTEGER':
if not isinstance(value, int):
# Allow string numbers that can be converted
try:
int(value)
except (ValueError, TypeError):
errors.append(f"Field '{column_name}' expected INTEGER but got {type(value).__name__} '{value}'")
elif expected_type == 'TEXT':
if not isinstance(value, str):
errors.append(f"Field '{column_name}' expected TEXT but got {type(value).__name__} '{value}'")
elif expected_type == 'REAL':
if not isinstance(value, (int, float)):
try:
float(value)
except (ValueError, TypeError):
errors.append(f"Field '{column_name}' expected REAL but got {type(value).__name__} '{value}'")
elif is_required and not is_primary_key:
errors.append(f"Required field '{column_name}' is missing from JSON data")
return errors
def find_semantic_duplicates(new_data: Dict[str, Any], table_name: str, similarity_threshold: float = 0.8) -> List[Dict[str, Any]]:
"""
Find existing records that might be semantic duplicates of the new data.
Use this tool when: You need to detect records that aren't exact duplicates
but represent the same entity (e.g., "Toyota" vs "TOYOTA" vs "Toyota Motors").
Args:
new_data: Dictionary containing the new record data
table_name: Name of the table to search for duplicates
similarity_threshold: Minimum similarity score (0.0 to 1.0) to consider a match
Returns: List of potentially duplicate records with similarity scores
Example output: [{'record': {...}, 'similarity': 0.85, 'matched_fields': ['name']}]
"""
import difflib
# Get existing data from the table
existing_df = sample_table_data(table_name, limit=1000) # Limit for performance
if existing_df.empty:
return []
potential_duplicates = []
# Get text columns for comparison (skip IDs and numeric-only fields)
text_columns = []
schema_df = get_table_schema(table_name)
for _, row in schema_df.iterrows():
if row['data_type'] == 'TEXT' and row['column_name'] in new_data:
text_columns.append(row['column_name'])
if not text_columns:
return []
# Compare new data against each existing record
for idx, existing_row in existing_df.iterrows():
total_similarity = 0
matched_fields = []
comparison_count = 0
for column in text_columns:
if column in new_data and pd.notna(existing_row[column]):
new_value = str(new_data[column]).lower().strip()
existing_value = str(existing_row[column]).lower().strip()
if new_value and existing_value:
similarity = difflib.SequenceMatcher(None, new_value, existing_value).ratio()
total_similarity += similarity
comparison_count += 1
if similarity >= similarity_threshold:
matched_fields.append(column)
if comparison_count > 0:
avg_similarity = total_similarity / comparison_count
if avg_similarity >= similarity_threshold:
potential_duplicates.append({
'record': existing_row.to_dict(),
'similarity': round(avg_similarity, 3),
'matched_fields': matched_fields
})
# Sort by similarity score (highest first)
potential_duplicates.sort(key=lambda x: x['similarity'], reverse=True)
return potential_duplicates
def validate_business_rules_with_llm(json_data: Dict[str, Any], table_name: str, business_rules: str) -> Dict[str, Any]:
"""
Use an LLM to perform a "sense check" of the data against business rules.
Use this tool when: You need to validate complex business logic that can't be
expressed as simple database constraints. The LLM will analyze the data and
rules to identify potential violations or concerns.
Args:
json_data: Dictionary containing the data to validate
table_name: Name of the target table
business_rules: String describing the business rules to validate against
Returns: Dictionary with validation results
Example output: {
'valid': False,
'violations': ['Honda part assigned to Toyota model'],
'warnings': ['Unusual quantity: 10 engines for one car'],
'suggestions': ['Consider verifying part compatibility']
}
Note: This is a placeholder function that would integrate with an LLM service.
"""
# Get related data for context
schema_info = get_table_schema(table_name).to_dict('records')
sample_data = sample_table_data(table_name, limit=3).to_dict('records')
# For foreign keys, get some context from related tables
fk_context = {}
fk_relationships = get_foreign_key_relationships()
table_fks = fk_relationships[fk_relationships['from_table'] == table_name]
for _, fk in table_fks.iterrows():
if fk['from_column'] in json_data:
fk_value = json_data[fk['from_column']]
related_data = execute_query(f"SELECT * FROM {fk['to_table']} WHERE {fk['to_column']} = {fk_value}")
if not related_data.empty:
fk_context[fk['from_column']] = related_data.iloc[0].to_dict()
# Placeholder LLM prompt structure
validation_context = {
'table_name': table_name,
'new_data': json_data,
'schema': schema_info,
'sample_existing_data': sample_data,
'foreign_key_context': fk_context,
'business_rules': business_rules
}
# TODO: Replace with actual LLM call
# prompt = f"""
# Analyze this data for business rule violations:
#
# Table: {table_name}
# New Data: {json_data}
# Business Rules: {business_rules}
# Schema Context: {schema_info}
# Related Data: {fk_context}
#
# Identify any violations, warnings, or suggestions.
# """
#
# llm_response = call_llm(prompt)
# Placeholder response - in real implementation, this would be LLM output
placeholder_result = {
'valid': True,
'violations': [],
'warnings': [],
'suggestions': ['Business rule validation requires LLM integration'],
'context_used': validation_context
}
return placeholder_result
# Test the tools
print("✅ All database validation tools defined successfully!")
print(f"Available tables: {list_tables()}")
print("🆕 Added: validate_json_types_against_schema()")
print("🆕 Added: find_semantic_duplicates()")
print("🆕 Added: validate_business_rules_with_llm() [placeholder]")
✅ All database validation tools defined successfully!
Available tables: ['car_model_parts', 'car_models', 'manufacturers', 'part_categories', 'parts']
🆕 Added: validate_json_types_against_schema()
🆕 Added: find_semantic_duplicates()
🆕 Added: validate_business_rules_with_llm() [placeholder]
# Demonstrate the new validation tools
print("=== DEMONSTRATING NEW VALIDATION TOOLS ===\n")
# 1. JSON Type Validation
print("1. JSON Type Validation Examples:")
# Valid data
valid_data = {
"manufacturer_id": 1,
"model_name": "Prius",
"model_year": 2024
}
errors = validate_json_types_against_schema(valid_data, "car_models")
print(f" Valid data errors: {errors}")
# Invalid data - wrong types
invalid_data = {
"manufacturer_id": "not_a_number", # Should be INTEGER
"model_name": 123, # Should be TEXT
"model_year": "2024" # Could be converted to INT
}
errors = validate_json_types_against_schema(invalid_data, "car_models")
print(f" Invalid data errors: {errors}")
# Missing required field
missing_data = {
"model_name": "Prius"
# Missing required manufacturer_id and model_year
}
errors = validate_json_types_against_schema(missing_data, "car_models")
print(f" Missing fields errors: {errors}")
print("\n" + "="*50 + "\n")
# 2. Semantic Duplicate Detection
print("2. Semantic Duplicate Detection:")
# Test with similar manufacturer name
new_manufacturer = {"name": "TOYOTA"} # Similar to existing "Toyota"
duplicates = find_semantic_duplicates(new_manufacturer, "manufacturers")
print(f" Potential duplicates for 'TOYOTA': {len(duplicates)} found")
if duplicates:
for dup in duplicates:
print(f" - Match: {dup['record']['name']} (similarity: {dup['similarity']})")
# Test with different manufacturer
new_manufacturer2 = {"name": "Ford"}
duplicates2 = find_semantic_duplicates(new_manufacturer2, "manufacturers")
print(f" Potential duplicates for 'Ford': {len(duplicates2)} found")
print("\n" + "="*50 + "\n")
# 3. Business Rules Validation (Placeholder)
print("3. Business Rules Validation:")
sample_business_rules = """
1. A car model cannot have more than one engine
2. Honda parts should not be used on Toyota vehicles
3. Model year must be within 10 years of current year
4. Brake parts are required for all car models
"""
sample_data = {
"model_id": 1,
"part_id": 2, # Honda engine
"qty": 1
}
validation_result = validate_business_rules_with_llm(sample_data, "car_model_parts", sample_business_rules)
print(f" Validation result: {validation_result['valid']}")
print(f" Suggestions: {validation_result['suggestions']}")
print(f" Context gathered: {list(validation_result['context_used'].keys())}")
print("\n✅ All new validation tools demonstrated!")
=== DEMONSTRATING NEW VALIDATION TOOLS ===
1. JSON Type Validation Examples:
Valid data errors: []
Invalid data errors: ["Field 'manufacturer_id' expected INTEGER but got str 'not_a_number'", "Field 'model_name' expected TEXT but got int '123'"]
Missing fields errors: ["Required field 'manufacturer_id' is missing from JSON data", "Required field 'model_year' is missing from JSON data"]
==================================================
2. Semantic Duplicate Detection:
Potential duplicates for 'TOYOTA': 1 found
- Match: Toyota (similarity: 1.0)
Potential duplicates for 'Ford': 0 found
==================================================
3. Business Rules Validation:
Validation result: True
Suggestions: ['Business rule validation requires LLM integration']
Context gathered: ['table_name', 'new_data', 'schema', 'sample_existing_data', 'foreign_key_context', 'business_rules']
✅ All new validation tools demonstrated!
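The business-rules check above is only a placeholder. As a hedged sketch, the TODO inside validate_business_rules_with_llm could be filled in with a DSPy signature along these lines (the class and field names here are assumptions, not part of the final implementation):
class BusinessRuleCheck(dspy.Signature):
    """Check a new record against natural-language business rules and report violations."""
    table_name = dspy.InputField(desc="Target table for the new data")
    new_data = dspy.InputField(desc="JSON record to validate")
    business_rules = dspy.InputField(desc="Business rules expressed in natural language")
    related_context = dspy.InputField(desc="Schema, sample rows, and foreign-key context")
    valid: bool = dspy.OutputField(desc="True if no rules appear to be violated")
    violations: List[str] = dspy.OutputField(desc="Specific rule violations found")
    warnings: List[str] = dspy.OutputField(desc="Concerns that do not block insertion")

# Example call, assuming dspy.configure(...) has already run as above:
# checker = dspy.Predict(BusinessRuleCheck)
# result = checker(table_name="car_model_parts",
#                  new_data=str(sample_data),
#                  business_rules=sample_business_rules,
#                  related_context=get_table_schema("car_model_parts").to_string())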
# Demonstrate the database inspection tools
print("=== DEMONSTRATING DATABASE INSPECTION TOOLS ===\n")
# 1. List all tables
print("1. Available tables:")
tables = list_tables()
for table in tables:
print(f" - {table}")
print("\n" + "="*50 + "\n")
# 2. Get schema for manufacturers table
print("2. Schema for 'manufacturers' table:")
schema = get_table_schema('manufacturers')
print(schema.to_string(index=False))
print("\n" + "="*50 + "\n")
# 3. Show foreign key relationships
print("3. Foreign key relationships:")
fk_df = get_foreign_key_relationships()
if not fk_df.empty:
print(fk_df.to_string(index=False))
else:
print("No foreign key relationships found")
print("\n" + "="*50 + "\n")
# 4. Sample data from manufacturers
print("4. Sample data from 'manufacturers' table:")
sample = sample_table_data('manufacturers', 3)
print(sample.to_string(index=False))
print("\n" + "="*50 + "\n")
# 5. Get unique values from a column
print("5. Unique manufacturer names:")
unique_names = get_unique_values('manufacturers', 'name')
print(f" {unique_names}")
print("\n" + "="*50 + "\n")
# 6. Row counts for all tables
print("6. Row counts for all tables:")
for table in tables:
count = get_table_row_count(table)
print(f" {table}: {count} rows")
print("\n" + "="*50 + "\n")
# 7. Column names for car_models table
print("7. Column names for 'car_models' table:")
columns = get_table_columns('car_models')
print(f" {columns}")
print("\n" + "="*50 + "\n")
# 8. Custom query example
print("8. Custom query - Car models with manufacturer names:")
query_result = execute_query("""
SELECT m.name as manufacturer, cm.model_name, cm.model_year
FROM car_models cm
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
ORDER BY m.name
""")
print(query_result.to_string(index=False))
print("\n" + "="*50 + "\n")
# 9. Check referential integrity
print("9. Referential integrity checks:")
print(f" manufacturer_id=1 exists: {check_referential_integrity('car_models', 'manufacturer_id', 1)}")
print(f" manufacturer_id=99 exists: {check_referential_integrity('car_models', 'manufacturer_id', 99)}")
print("\n✅ All tools working correctly!")
=== DEMONSTRATING DATABASE INSPECTION TOOLS ===
1. Available tables:
- car_model_parts
- car_models
- manufacturers
- part_categories
- parts
==================================================
2. Schema for 'manufacturers' table:
column_name data_type not_null default_value primary_key
manufacturer_id INTEGER 0 None 1
name TEXT 1 None 0
==================================================
3. Foreign key relationships:
from_table from_column to_table to_column on_delete on_update
car_models manufacturer_id manufacturers manufacturer_id CASCADE NO ACTION
parts category_id part_categories category_id RESTRICT NO ACTION
car_model_parts part_id parts part_id RESTRICT NO ACTION
car_model_parts model_id car_models model_id CASCADE NO ACTION
==================================================
4. Sample data from 'manufacturers' table:
manufacturer_id name
1 Toyota
2 Honda
==================================================
5. Unique manufacturer names:
['Honda', 'Toyota']
==================================================
6. Row counts for all tables:
car_model_parts: 7 rows
car_models: 4 rows
manufacturers: 2 rows
part_categories: 3 rows
parts: 5 rows
==================================================
7. Column names for 'car_models' table:
['model_id', 'manufacturer_id', 'model_name', 'model_year']
==================================================
8. Custom query - Car models with manufacturer names:
manufacturer model_name model_year
Honda Accord 2022
Honda Civic 2023
Toyota Camry 2022
Toyota Corolla 2023
==================================================
9. Referential integrity checks:
manufacturer_id=1 exists: True
manufacturer_id=99 exists: False
✅ All tools working correctly!
Define Transformations
We categorize data issues into three types:
Schema-Verifiable Violations: Issues detectable through database constraints
- Missing required fields
- Foreign key violations
- Type mismatches
Business Logic Violations: Issues requiring domain knowledge
- Invalid date ranges
- Semantic duplicates
- Workflow state violations
Data Transformations: Structural changes from source systems
- Renamed columns
- Flattened relationships
- Added noise fields
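To make these categories concrete, here is a small illustrative example (values invented for this post) of one issue from each category applied to a car_models-style record:
# Clean record that matches the car_models schema
clean_record = {"manufacturer_id": 1, "model_name": "Camry", "model_year": 2022}

# Schema-verifiable violation: foreign key points at a manufacturer that does not exist
fk_violation = {"manufacturer_id": 99, "model_name": "Camry", "model_year": 2022}

# Business logic violation: model_year is implausible, but nothing in the schema forbids it
business_violation = {"manufacturer_id": 1, "model_name": "Camry", "model_year": 1875}

# Data transformation: a legacy source renamed model_name and added a noise field
transformed_record = {"manufacturer_id": 1, "mdl_name": "Camry", "model_year": 2022, "source_batch": "LEGACY-07"}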
# -----------------------------------------------------------------------------
# FULL ADVANCED SYNTHETIC DATA PARAMETER DEFINITIONS
# - All violation keys (schema-verifiable & non-verifiable)
# - All transformation keys (shape & volume)
# - Helpers to sample according to desired probabilities
# -----------------------------------------------------------------------------
# Helper
def opts(*details):
"""Wrap detail strings in a uniform structure."""
return {"detail_options": list(details)}
# --- 1. Violations discoverable via DB schema & simple queries ---------------
schema_verifiable_params = {
# NOT-NULL / required
"required_fields_missing": opts(
"omit one required column",
"omit several required columns",
"omit every required column",
),
"null_in_not_null_columns": opts(
"set single NOT-NULL column to NULL",
"set multiple NOT-NULL columns to NULL",
),
# FK integrity
"foreign_key_references_missing": opts(
"one FK points at missing parent",
"many FKs point at missing parents",
),
"foreign_key_wrong_type": opts(
"string instead of integer FK",
"float instead of integer FK",
),
"foreign_key_circular_references": opts(
"simple two-table cycle",
"complex three-table cycle",
),
"orphaned_child_records": opts(
"single orphan child row",
"many orphan child rows",
),
"foreign_key_null_when_required": opts(
"one required FK column NULL",
"multiple required FK columns NULL",
),
# PK / UNIQUE
"primary_key_duplicates": opts(
"duplicate PK once",
"duplicate PK across several rows",
),
"primary_key_missing": opts(
"row with NULL PK",
"row missing PK column entirely",
),
"primary_key_wrong_type": opts(
"string PK where integer expected",
),
"unique_constraint_violations": opts(
"duplicate single-column UNIQUE value",
"duplicate composite UNIQUE value",
),
"composite_unique_violations": opts(
"same composite key duplicated twice",
"same composite key duplicated many times",
),
# Data-type & length
"data_type_mismatches": opts(
"string in integer column",
"boolean in text column",
"mixed types in one column",
),
"string_length_violations": opts(
"value exceeds max length",
"empty string in NOT-NULL text column",
),
# Composite keys & duplicates
"incomplete_composite_keys": opts(
"missing last column of composite PK",
"only first column supplied",
),
"exact_duplicate_rows": opts(
"insert identical row twice",
"insert identical row ten times",
),
"redundant_relationships": opts(
"duplicate association row",
"conflicting duplicate association rows",
),
# Multi-table referential chains
"referential_integrity_chains": opts(
"break chain at mid-level",
"multiple broken links in chain",
),
"cascade_delete_conflicts": opts(
"delete parent restricted by child",
"cascade loop across multiple tables",
),
}
# --- 2. Violations requiring business / semantic reasoning --------------------
non_schema_verifiable_params = {
"numeric_range_violations": opts(
"value below allowed minimum",
"value above allowed maximum",
),
"date_format_violations": opts(
"invalid date format",
"impossible calendar date",
),
"enum_value_violations": opts(
"single invalid enum value",
"multiple invalid enum values",
),
"business_rule_violations": opts(
"violate one business rule",
"violate several business rules",
),
"cross_column_check_failures": opts(
"date range invalid",
"amount inconsistent with total",
),
"calculated_field_mismatches": opts(
"derived field mismatch",
),
"partial_record_sets": opts(
"missing related lookup row",
"incomplete hierarchy levels",
),
"missing_required_relationships": opts(
"parent without children",
"children without parent",
),
"semantic_duplicates": opts(
"similar names differing in case",
"fuzzy duplicate across spelling variants",
),
"case_sensitivity_problems": opts(
"mixed-case duplicates",
"case inconsistency in column",
),
"special_character_handling": opts(
"suspicious SQL-injection pattern",
"unicode edge-case characters",
),
"temporal_consistency_violations": opts(
"events out of chronological order",
"overlapping validity periods",
),
"transaction_boundary_violations": opts(
"partial commit leaves inconsistent state",
"rolled-back transaction not isolated",
),
"concurrent_modification_conflicts": opts(
"version conflict detected",
"row-level lock conflict",
),
"workflow_state_violations": opts(
"invalid workflow transition",
"missing prerequisite state",
),
"temporal_logic_violations": opts(
"event occurs before creation date",
"end date earlier than start date",
),
"quantity_balance_violations": opts(
"negative inventory quantity",
"quantity exceeds capacity",
),
"version_compatibility_issues": opts(
"use deprecated field",
"reference new required field",
),
"schema_migration_conflicts": opts(
"incompatible type change",
"potential data loss on migration",
),
}
# --- 3a. JSON shape / structure transformations ------------------------------
shape_transformations = {
"add_related_data": opts(
"attach new related column to existing table",
"introduce entirely new related table",
),
"add_noise": opts(
"insert irrelevant columns",
"insert unrelated nested object",
),
"flatten_data": opts(
"denormalise child into parent table",
"merge nested keys into root object",
),
"rename_columns": opts(
"rename one column",
"rename multiple columns",
),
"rename_tables": opts(
"rename a single table",
"rename several tables",
),
"drop_columns": opts(
"drop one expected column",
"drop multiple expected columns",
),
"drop_tables": opts(
"drop one expected table",
"drop several expected tables",
),
}
# --- 3b. Volume / depth stress-test transformations --------------------------
output_transformation_params = {
# "record_volume": opts(1, 10, 100, 1000),
# "batch_size_variations": opts(
# "single_record",
# "small_batch (≤10)",
# "large_batch (≈100)",
# "oversized_batch (>1000)",
# ),
"nested_relationship_depth": opts(
"shallow (1 level)",
"moderate (2–3 levels)",
"deep (4–5 levels)",
"excessive (>5 levels)",
),
}
# Combine all transformations
transformation_params = {**shape_transformations, **output_transformation_params}
# =============================================================
# LIGHTWEIGHT FAULT-LABELLING + SAMPLING HELPERS (v2025-06-20)
# =============================================================
# ---------- 1. ENUMS & LABEL STRUCTURE -----------------------
class Severity(str, Enum):
BLOCKER = "blocker" # must fix before insert
RISKY = "risky" # can insert but follow-up needed
INFO = "info" # cosmetic
class FixType(str, Enum):
AUTO = "auto" # agent repairs silently
ASK_VALUE = "ask_value" # missing scalar / FK / range
ASK_MAP = "ask_map" # ambiguous rename / enum ext.
ASK_SCHEMA = "ask_schema" # requires DDL approval
class FaultLabel(TypedDict):
name: str # name of the violation or transformation
severity: Severity
fix: FixType
target: str # filled later by agent
# ---------- 2. RULES (key → severity / fix) ------------------
# Schema-verifiable: almost always BLOCKER or RISKY
_blocker_keys = {
"required_fields_missing",
"foreign_key_references_missing",
"primary_key_duplicates",
"foreign_key_circular_references",
"cascade_delete_conflicts",
}
_risky_value_keys = {
"null_in_not_null_columns",
"foreign_key_wrong_type",
"foreign_key_null_when_required",
"data_type_mismatches",
"string_length_violations",
"numeric_range_violations",
"enum_value_violations",
"date_format_violations",
}
# Shape transformations
_shape_ask_map = {"rename_columns", "rename_tables"}
_shape_ask_schema = {
"add_related_data",
"drop_columns",
"add_noise",
"drop_tables",
"flatten_data",
}
def severity_for(key: str) -> Severity:
if key in _blocker_keys:
return Severity.BLOCKER
if key in _risky_value_keys or key in _shape_ask_map or key in _shape_ask_schema:
return Severity.RISKY
return Severity.INFO
def fix_for(key: str) -> FixType:
if key in _shape_ask_schema:
return FixType.ASK_SCHEMA
if key in _shape_ask_map:
return FixType.ASK_MAP
if key in _risky_value_keys:
return FixType.ASK_VALUE
if key in _blocker_keys:
return FixType.ASK_VALUE
return FixType.AUTO
def classify_key(key: str) -> FaultLabel:
return {
"name": key,
"severity": severity_for(key),
"fix": fix_for(key),
"target": '', # agent fills once it inspects payload
}
# ---------- 3. UPDATED SAMPLERS ------------------------------
def pick_violation(pool: Dict, *, skip_probability: float = 0.05):
"""Return None or {type, detail, label}"""
if random.random() < skip_probability:
return None
key = random.choice(list(pool))
detail = random.choice(pool[key]["detail_options"])
return {"type": key, "detail": detail, "label": classify_key(key)}
def pick_transformations(
pool: Dict,
*,
none_probability: float = 0.10,
weights: tuple[float, float, float] = (0.60, 0.25, 0.15),
):
"""Return None or list[{type, detail, label}]"""
if random.random() < none_probability:
return None
n = random.choices([1, 2, 3], weights=weights, k=1)[0]
n = min(n, len(pool))
keys = random.sample(list(pool), n)
out = []
for k in keys:
detail = random.choice(pool[k]["detail_options"])
out.append({"type": k, "detail": detail, "label": classify_key(k)})
return out
# ---------- 4. QUICK SELF-TEST -------------------------------
print("Sample violation:")
print(pick_violation(schema_verifiable_params))
print("\nSample transformations:")
print(pick_transformations(transformation_params))
Sample violation:
{'type': 'composite_unique_violations', 'detail': 'same composite key duplicated many times', 'label': {'name': 'composite_unique_violations', 'severity': <Severity.INFO: 'info'>, 'fix': <FixType.AUTO: 'auto'>, 'target': ''}}
Sample transformations:
[{'type': 'add_related_data', 'detail': 'introduce entirely new related table', 'label': {'name': 'add_related_data', 'severity': <Severity.RISKY: 'risky'>, 'fix': <FixType.ASK_SCHEMA: 'ask_schema'>, 'target': ''}}]
Generate Data
# helper functions to collate schema and current sample data
def collate_schema() -> str:
"""
Collate all database schema information into a single formatted string.
This function combines table schemas, foreign key relationships, and
structural information into a comprehensive schema description suitable
for LLM processing.
Returns: A formatted string containing complete schema information
"""
output_lines = []
output_lines.append("=== DATABASE SCHEMA INFORMATION ===\n")
# Get all tables
tables = list_tables()
output_lines.append(f"Database contains {len(tables)} tables: {', '.join(tables)}\n")
# Get foreign key relationships first (for context)
fk_relationships = get_foreign_key_relationships()
if not fk_relationships.empty:
output_lines.append("--- FOREIGN KEY RELATIONSHIPS ---")
for _, fk in fk_relationships.iterrows():
relationship_desc = (
f"{fk['from_table']}.{fk['from_column']} -> "
f"{fk['to_table']}.{fk['to_column']} "
f"(ON DELETE {fk['on_delete']}, ON UPDATE {fk['on_update']})"
)
output_lines.append(relationship_desc)
output_lines.append("")
# Get detailed schema for each table
output_lines.append("--- TABLE SCHEMAS ---")
for table_name in sorted(tables):
output_lines.append(f"\nTable: {table_name}")
output_lines.append("-" * (len(table_name) + 7))
# Get schema info
schema_df = get_table_schema(table_name)
# Format schema information
for _, row in schema_df.iterrows():
column_info = f" {row['column_name']}: {row['data_type']}"
# Add constraints
constraints = []
if row['primary_key'] == 1:
constraints.append("PRIMARY KEY")
if row['not_null'] == 1:
constraints.append("NOT NULL")
if row['default_value'] is not None:
constraints.append(f"DEFAULT {row['default_value']}")
if constraints:
column_info += f" ({', '.join(constraints)})"
output_lines.append(column_info)
# Add foreign key info for this table
table_fks = fk_relationships[fk_relationships['from_table'] == table_name] if not fk_relationships.empty else []
if len(table_fks) > 0:
output_lines.append(" Foreign Keys:")
for _, fk in table_fks.iterrows():
fk_info = f" {fk['from_column']} references {fk['to_table']}.{fk['to_column']}"
output_lines.append(fk_info)
# Add row count
row_count = get_table_row_count(table_name)
output_lines.append(f" Current rows: {row_count}")
return "\n".join(output_lines)
def collate_sample_data(max_rows_per_table: int = 3) -> str:
"""
Collate sample data from all tables into a single formatted string.
This function retrieves sample data from each table and formats it
in a way that's easy for an LLM to understand the data patterns,
formats, and relationships.
Args:
max_rows_per_table: Maximum number of sample rows to include per table
Returns: A formatted string containing sample data from all tables
"""
output_lines = []
output_lines.append("=== SAMPLE DATA FROM ALL TABLES ===\n")
# Get all tables
tables = list_tables()
for table_name in sorted(tables):
output_lines.append(f"Table: {table_name}")
output_lines.append("-" * (len(table_name) + 7))
# Get sample data
sample_df = sample_table_data(table_name, limit=max_rows_per_table)
if sample_df.empty:
output_lines.append(" (No data)")
else:
# Convert DataFrame to a more readable format
output_lines.append(f" Columns: {', '.join(sample_df.columns.tolist())}")
output_lines.append(" Sample rows:")
for idx, row in sample_df.iterrows():
row_data = []
for col in sample_df.columns:
value = row[col]
if pd.isna(value):
value = "NULL"
elif isinstance(value, str):
value = f"'{value}'"
row_data.append(f"{col}={value}")
output_lines.append(f" [{', '.join(row_data)}]")
output_lines.append("")
return "\n".join(output_lines)
def natty_lang_violations(violation_output) -> List[str] | None:
"""
Convert violation output from pick_violation into natural language.
Args:
violation_output: Output from pick_violation function (None or dict with 'type' and 'detail' keys)
Returns: None if input is None, otherwise list of strings describing the violation
"""
if violation_output is None:
return None
violation_type = violation_output.get('type', 'unknown_violation')
violation_detail = violation_output.get('detail', 'no details provided')
# Convert snake_case to readable format
readable_type = violation_type.replace('_', ' ').title()
# Create natural language description
description = f"Data Violation: {readable_type} - {violation_detail}"
return [description]
def natty_lang_transformations(transformations_output) -> List[str] | None:
"""
Convert transformations output from pick_transformations into natural language.
Args:
transformations_output: Output from pick_transformations function (None or list of dicts with 'type' and 'detail' keys)
Returns: None if input is None, otherwise list of strings describing the transformations
"""
if transformations_output is None:
return None
descriptions = []
for transformation in transformations_output:
transformation_type = transformation.get('type', 'unknown_transformation')
transformation_detail = transformation.get('detail', 'no details provided')
# Convert snake_case to readable format
readable_type = transformation_type.replace('_', ' ').title()
# Create natural language description
description = f"Data Transformation: {readable_type} - {transformation_detail}"
descriptions.append(description)
return descriptions
# Module 1: Generate clean, valid synthetic data based on schema and sample data
class GenerateCleanSynthData(dspy.Signature):
"""
Generate *new, clean, valid* synthetic data that conforms to the database schema.
Use the sample data to understand patterns, naming conventions, and data formats.
IMPORTANT: Create realistic automotive industry data with natural variation:
- Use real car manufacturers beyond the samples (Ford, Chevrolet, Nissan, BMW, etc.)
- Generate authentic part numbers following automotive conventions
- Create realistic model names and years (recent but not current sample years)
- Use proper automotive terminology in descriptions
- Maintain referential integrity across all foreign key relationships
- Include variety in data formatting while staying within schema constraints
The output should be well-formed JSON data that could be successfully inserted
into the database without any violations. This clean data will later be corrupted
with realistic data quality issues for testing purposes.
Generate 3-5 records per table to provide good variety for testing.
"""
current_schema = dspy.InputField(
desc="Complete database schema with table structures, relationships, and constraints"
)
sample_data = dspy.InputField(
desc="Sample of existing data showing patterns and formats"
)
clean_synthetic_data: dict = dspy.OutputField(
desc="Valid JSON data organized by table name, ready for database insertion"
)
class GenerateCleanData(dspy.Module):
"""First module: Generate clean synthetic data that conforms to schema"""
def __init__(self):
self.schema = collate_schema()
self.sample = collate_sample_data()
self.sig = dspy.Predict(GenerateCleanSynthData)
def forward(self):
response = self.sig(current_schema=self.schema, sample_data=self.sample)
return response
class CorruptDataItem(BaseModel):
name: str = Field(description="The name of the applied violation or transformation as provided by the `pick_violation` or `pick_transformations` input")
detail: str = Field(description="An explicit, natural language description of the applied violation or transformation. Must include the names of the table, column, or value that was affected.")
# SIMPLIFIED Module 2: Apply violations and transformations AND output fault labels directly
class ApplyDataCorruptionSimplified(dspy.Signature):
"""
Take clean, valid synthetic data and apply specific violations and transformations
to create REALISTIC, ORGANIC test scenarios that mimic real-world data onboarding problems.
IMPORTANT: For each violation or transformation applied, output a `CorruptDataItem` object with the name of the violation or transformation and an explicit detail string.
❌ AVOID creating obvious artificial problems like:
- Tables named "noise", "junk", or "test_data"
- Completely unrelated data (like food items in a car parts database)
✅ CREATE realistic automotive industry data problems like:
- Manufacturer name variations: "Toyota" vs "TOYOTA MOTOR CORP" vs "Toyota Motors"
- Inconsistent part numbering: "BRK-FR-TOY-22" vs "BRK_FR_TOY_22" vs "BRKFRTOY22"
- Missing required fields, NULL values in NOT NULL columns
- Renamed columns from legacy systems: "mfg_name" instead of "name"
- Dropped tables/columns during data migration
"""
clean_data = dspy.InputField(desc="Clean, valid synthetic data organized by table")
schema_info = dspy.InputField(desc="Database schema information for constraint validation")
violation_instructions = dspy.InputField(desc="Natural language description of data violation to apply")
transformation_instructions = dspy.InputField(desc="Natural language description of data transformations to apply")
corrupted_data: dict = dspy.OutputField(desc="Realistically corrupted data that maintains automotive domain authenticity")
corruption_details: List[CorruptDataItem] = dspy.OutputField(desc="List of CorruptDataItem objects with the name of the violation or transformation and an explicit detail string")
class ApplyViolationsAndTransformationsSimplified(dspy.Module):
"""SIMPLIFIED module: Apply violations and transformations to clean data and output fault labels directly"""
def __init__(self, clean_data: dict):
self.clean_data = clean_data
self.schema = collate_schema()
# Generate random violations and transformations
self.violation = pick_violation(schema_verifiable_params)
self.transformations = pick_transformations(transformation_params)
self.sig = dspy.Predict(ApplyDataCorruptionSimplified)
def forward(self):
# Convert violations and transformations to natural language instructions
violation_instructions = natty_lang_violations(self.violation)
transformation_instructions = natty_lang_transformations(self.transformations)
# Convert to strings for LLM input
violation_str = "; ".join(violation_instructions) if violation_instructions else "No violations to apply"
transformation_str = "; ".join(transformation_instructions) if transformation_instructions else "No transformations to apply"
response = self.sig(
clean_data=self.clean_data,
schema_info=self.schema,
violation_instructions=violation_str,
transformation_instructions=transformation_str
)
# Process corruption_details into proper FaultLabel objects
# Use the original violation/transformation objects instead of relying on LLM output
processed_fault_labels = []
# Create fault labels from original violations and transformations
original_violations_and_transformations = []
if self.violation:
original_violations_and_transformations.append(self.violation)
if self.transformations:
original_violations_and_transformations.extend(self.transformations)
# Map corruption details to original violations/transformations
# Handle cases where LLM might return different number of items than expected
num_items = min(len(response.corruption_details), len(original_violations_and_transformations))
for i in range(num_items):
corruption_item = response.corruption_details[i]
original_item = original_violations_and_transformations[i]
original_key = original_item["type"]
# Use the original key for classification
fault_classification = classify_key(original_key)
# Create proper fault label with correct enums
processed_fault = {
"severity": fault_classification["severity"],
"fix": fault_classification["fix"],
"target": corruption_item.detail,
"violation_type": original_key, # Use original key
}
processed_fault_labels.append(processed_fault)
# If LLM returned more corruption details than we had original items,
# try to map them by creating a reverse mapping from title case back to snake_case
if len(response.corruption_details) > len(original_violations_and_transformations):
# Create reverse mapping dictionary
all_keys = {**schema_verifiable_params, **transformation_params}
reverse_mapping = {}
for key in all_keys.keys():
title_case_key = key.replace('_', ' ').title()
reverse_mapping[title_case_key] = key
reverse_mapping[key] = key # Also map original key to itself
# Process remaining items
for i in range(len(original_violations_and_transformations), len(response.corruption_details)):
corruption_item = response.corruption_details[i]
# Try to find original key using reverse mapping
original_key = reverse_mapping.get(corruption_item.name, corruption_item.name)
# Use the original key for classification
fault_classification = classify_key(original_key)
# Create proper fault label with correct enums
processed_fault = {
"severity": fault_classification["severity"],
"fix": fault_classification["fix"],
"target": corruption_item.detail,
"violation_type": original_key,
}
processed_fault_labels.append(processed_fault)
# Add fault_labels to response
response.fault_labels = processed_fault_labels
return response
# SIMPLIFIED combined pipeline
class GenerateSyntheticTestDataSimplified(dspy.Module):
"""SIMPLIFIED pipeline: Generate clean data, apply violations/transformations with direct fault label output"""
def __init__(self):
self.clean_generator = GenerateCleanData()
def forward(self):
# Create faster model for clean data generation
faster_lm = dspy.LM('openai/gpt-4.1', api_key=os.getenv('OPENAI_API_KEY'), temperature=1.0, max_tokens=5000)
# Step 1: Generate clean data (using faster model)
with dspy.context(lm=faster_lm):
clean_response = self.clean_generator()
clean_data = clean_response.clean_synthetic_data
# Step 2: Apply violations and transformations AND get fault labels directly (keep original model)
corruptor = ApplyViolationsAndTransformationsSimplified(clean_data)
corrupted_response = corruptor()
return {
'clean_data': clean_data,
'corrupted_data': corrupted_response.corrupted_data,
'fault_labels': corrupted_response.fault_labels, # Fault labels directly from ApplyViolationsAndTransformationsSimplified
'expected_detections': len(corrupted_response.fault_labels)
}
# Test the SIMPLIFIED pipeline
print("🔧 Testing SIMPLIFIED GenerateSyntheticTestData pipeline...")
# Create and run the simplified pipeline
simplified_pipeline = GenerateSyntheticTestDataSimplified()
simplified_result = simplified_pipeline()
print("\n=== SIMPLIFIED PIPELINE RESULTS ===")
print(f"Tables in corrupted data: {list(simplified_result['corrupted_data'].keys())}")
print(f"Number of fault labels: {simplified_result['expected_detections']}")
print("\n--- FAULT LABELS FOR EVALUATION ---")
for i, fault_label in enumerate(simplified_result['fault_labels'], 1):
print(f"\nFault {i}:")
print(f" Type: {fault_label.get('violation_type', 'unknown')}")
print(f" Severity: {fault_label['severity']}")
print(f" Fix: {fault_label['fix']}")
print(f" Target: {fault_label['target']}")
🔧 Testing SIMPLIFIED GenerateSyntheticTestData pipeline...
=== SIMPLIFIED PIPELINE RESULTS ===
Tables in corrupted data: ['manufacturers', 'part_categories', 'car_models', 'parts', 'car_model_parts']
Number of fault labels: 2
--- FAULT LABELS FOR EVALUATION ---
Fault 1:
Type: required_fields_missing
Severity: Severity.BLOCKER
Fix: FixType.ASK_VALUE
Target: Removed NOT NULL columns in several tables: manufacturers.name absent for manufacturer_id=4 and manufacturers.manufacturer_id missing for new row 'Toyota Motor Corp'; part_categories.category_name omitted for category_id=5; car_models.model_year omitted for model_id=5 and manufacturer_id omitted for model_id=7; parts.part_number missing for part_id=7 and parts.category_id missing for part_id=9; car_model_parts.qty omitted for (model_id=5, part_id=8) and car_model_parts.model_id omitted for row with part_id=10.
Fault 2:
Type: nested_relationship_depth
Severity: Severity.INFO
Fix: FixType.AUTO
Target: Replaced the flat TEXT value in parts.description for part_id=8 with a 5-level nested JSON structure (partDetails → fitment → vehicle and partDetails → package → dimensions → length/width/height) to introduce excessive nesting.
!uv pip install nbformat
Using Python 3.12.8 environment at: /Users/craig/miniforge3/envs/cas
Resolved 11 packages in 177ms
Prepared 2 packages in 34ms
Installed 2 packages in 10ms
+ fastjsonschema==2.21.1
+ nbformat==5.10.4
Data Agent
# Simplified FaultLabel without name field
class DetectedFault(BaseModel):
severity: str = Field(description="One of: blocker, risky, info")
fix: str = Field(description="One of: auto, ask_value, ask_map, ask_schema")
target: str = Field(
description="Detailed description of what was detected and where"
)
# DSPy Signature for the Data Agent
class AnalyzeDataQuality(dspy.Signature):
"""
Analyze provided JSON data against database schema to detect data quality issues.
You have access to database inspection tools. Use them systematically to:
1. Check schema compliance (data types, constraints, foreign keys)
2. Detect missing required fields or tables
3. Identify referential integrity violations
4. Find semantic issues (duplicates, invalid values)
5. Spot structural transformations (renamed/dropped columns)
For each issue found, determine:
- Severity: blocker (prevents insertion), risky (needs follow-up), info (cosmetic)
- Fix type: auto (can fix automatically), ask_value (need user input for values),
ask_map (need mapping info), ask_schema (need schema changes)
- Target: Specific description of the issue with table/column/value details
Be thorough but avoid false positives. Focus on actual data problems.
Use the tools provided to inspect the data and schema.
"""
json_data: dict = dspy.InputField(
desc="JSON data to analyze, organized by table name"
)
detected_faults: List[DetectedFault] = dspy.OutputField(
desc="List of detected data quality issues with severity, fix type, and details"
)
# Create a dictionary of all the database tools
db_tools = [list_tables, get_table_schema, get_foreign_key_relationships, sample_table_data, get_unique_values, get_table_row_count, get_table_columns, execute_query, check_referential_integrity, validate_json_types_against_schema, find_semantic_duplicates, validate_business_rules_with_llm, collate_schema, collate_sample_data]
# DSPy ReAct Agent
data_quality_agent = dspy.ReAct(AnalyzeDataQuality, tools=db_tools)
# Run the agent on the corrupted synthetic data
print("🔍 Analyzing corrupted data with Data Quality Agent...")
# Use the corrupted data from the previous pipeline run
corrupted_data = simplified_result["corrupted_data"]
# Run the agent
agent_result = data_quality_agent(json_data=corrupted_data)
print(f"\n✅ Agent analysis complete!")
🔍 Analyzing corrupted data with Data Quality Agent...
✅ Agent analysis complete!
simplified_result
{'clean_data': {'manufacturers': [{'manufacturer_id': 3, 'name': 'Ford'},
{'manufacturer_id': 4, 'name': 'BMW'},
{'manufacturer_id': 5, 'name': 'Chevrolet'}],
'part_categories': [{'category_id': 4, 'category_name': 'Electrical'},
{'category_id': 5, 'category_name': 'Transmission'},
{'category_id': 6, 'category_name': 'Exhaust'}],
'car_models': [{'model_id': 4,
'manufacturer_id': 3,
'model_name': 'F-150',
'model_year': 2021},
{'model_id': 5,
'manufacturer_id': 4,
'model_name': '3 Series',
'model_year': 2020},
{'model_id': 6,
'manufacturer_id': 5,
'model_name': 'Silverado',
'model_year': 2021},
{'model_id': 7,
'manufacturer_id': 3,
'model_name': 'Escape',
'model_year': 2019}],
'parts': [{'part_id': 6,
'category_id': 4,
'part_number': 'ELC-ALT-FD21',
'description': 'Alternator assembly (Ford F-150 2021-)'},
{'part_id': 7,
'category_id': 5,
'part_number': 'TRN-6SPD-GM21',
'description': '6-speed automatic transmission (Chevrolet Silverado 2021-)'},
{'part_id': 8,
'category_id': 6,
'part_number': 'EXH-MUF-BMW20',
'description': 'Muffler assembly (BMW 3 Series 2020-2023)'},
{'part_id': 9,
'category_id': 4,
'part_number': 'ELC-BAT-FD19',
'description': '12V battery (Ford Escape 2019-2021)'},
{'part_id': 10,
'category_id': 5,
'part_number': 'TRN-CVT-FD19',
'description': 'CVT transmission (Ford Escape 2019-2021)'}],
'car_model_parts': [{'model_id': 4, 'part_id': 6, 'qty': 1},
{'model_id': 4, 'part_id': 9, 'qty': 1},
{'model_id': 5, 'part_id': 8, 'qty': 1},
{'model_id': 6, 'part_id': 7, 'qty': 1},
{'model_id': 7, 'part_id': 9, 'qty': 1},
{'model_id': 7, 'part_id': 10, 'qty': 1},
{'model_id': 6, 'part_id': 7, 'qty': 2}]},
'corrupted_data': {'manufacturers': [{'manufacturer_id': 3, 'name': 'Ford'},
{'manufacturer_id': 4},
{'manufacturer_id': 5, 'name': 'Chevrolet'},
{'name': 'Toyota Motor Corp'}],
'part_categories': [{'category_id': 4, 'category_name': 'Electrical'},
{'category_id': 5},
{'category_id': 6, 'category_name': 'Exhaust'}],
'car_models': [{'model_id': 4,
'manufacturer_id': 3,
'model_name': 'F-150',
'model_year': 2021},
{'model_id': 5, 'manufacturer_id': 4, 'model_name': '3 Series'},
{'model_id': 6,
'manufacturer_id': 5,
'model_name': 'Silverado',
'model_year': 2021},
{'model_id': 7, 'model_name': 'Escape', 'model_year': 2019}],
'parts': [{'part_id': 6,
'category_id': 4,
'part_number': 'ELC-ALT-FD21',
'description': 'Alternator assembly (Ford F-150 2021-)'},
{'part_id': 7,
'category_id': 5,
'description': '6-speed automatic transmission (Chevrolet Silverado 2021-)'},
{'part_id': 8,
'category_id': 6,
'part_number': 'EXH-MUF-BMW20',
'description': {'partDetails': {'fitment': {'vehicle': {'make': 'BMW',
'model': '3 Series',
'year': 2020}},
'package': {'dimensions': {'length': {'value': 120, 'unit': 'cm'},
'width': {'value': 40, 'unit': 'cm'},
'height': {'value': 30, 'unit': 'cm'}}}}}},
{'part_id': 9,
'part_number': 'ELC-BAT-FD19',
'description': '12V battery (Ford Escape 2019-2021)'},
{'part_id': 10,
'category_id': 5,
'part_number': 'TRN-CVT-FD19',
'description': 'CVT transmission (Ford Escape 2019-2021)'}],
'car_model_parts': [{'model_id': 4, 'part_id': 6, 'qty': 1},
{'model_id': 4, 'part_id': 9, 'qty': 1},
{'model_id': 5, 'part_id': 8},
{'model_id': 6, 'part_id': 7, 'qty': 1},
{'model_id': 7, 'part_id': 9, 'qty': 1},
{'part_id': 10, 'qty': 1},
{'model_id': 6, 'part_id': 7, 'qty': 2}]},
'fault_labels': [{'severity': <Severity.BLOCKER: 'blocker'>,
'fix': <FixType.ASK_VALUE: 'ask_value'>,
'target': "Removed NOT NULL columns in several tables: manufacturers.name absent for manufacturer_id=4 and manufacturers.manufacturer_id missing for new row 'Toyota Motor Corp'; part_categories.category_name omitted for category_id=5; car_models.model_year omitted for model_id=5 and manufacturer_id omitted for model_id=7; parts.part_number missing for part_id=7 and parts.category_id missing for part_id=9; car_model_parts.qty omitted for (model_id=5, part_id=8) and car_model_parts.model_id omitted for row with part_id=10.",
'violation_type': 'required_fields_missing'},
{'severity': <Severity.INFO: 'info'>,
'fix': <FixType.AUTO: 'auto'>,
'target': 'Replaced the flat TEXT value in parts.description for part_id=8 with a 5-level nested JSON structure (partDetails → fitment → vehicle and partDetails → package → dimensions → length/width/height) to introduce excessive nesting.',
'violation_type': 'nested_relationship_depth'}],
'expected_detections': 2}
agent_result['detected_faults']
[DetectedFault(severity='blocker', fix='ask_value', target='manufacturers: row with manufacturer_id=4 is missing required field `name` (NOT NULL).'),
DetectedFault(severity='blocker', fix='ask_value', target='part_categories: row with category_id=5 is missing required field `category_name` (NOT NULL).'),
DetectedFault(severity='blocker', fix='ask_value', target='car_models: row with model_id=5 is missing required field `model_year` (NOT NULL).'),
DetectedFault(severity='blocker', fix='ask_value', target='car_models: row with model_id=7 is missing required field `manufacturer_id` (NOT NULL / FK to manufacturers).'),
DetectedFault(severity='blocker', fix='ask_value', target='parts: row with part_id=7 is missing required field `part_number` (NOT NULL).'),
DetectedFault(severity='blocker', fix='ask_value', target='parts: row with part_id=9 is missing required field `category_id` (NOT NULL / FK to part_categories).'),
DetectedFault(severity='risky', fix='ask_value', target='parts: row with part_id=8 has `description` as an object, expected TEXT.'),
DetectedFault(severity='risky', fix='ask_value', target='car_model_parts: duplicate combination (model_id=6, part_id=7) appears with qty=1 and qty=2.'),
DetectedFault(severity='blocker', fix='ask_value', target='car_model_parts: row {"part_id": 10, "qty": 1} is missing required field `model_id` (NOT NULL / FK).')]
Summary
This is a very abrupt end to the notebook. It was at this point, seeing the valid yet ill-formatted response from our agent, that I realized this notebook was going to be longer than I anticipated and needed some rework. We haven't even gotten into the meat of using DSPy yet - that will come.
Current Known Limitations
- When generating the 'clean' data in step 1, we are assuming the model maintains referential integrity. This may work for small tables and datasets, but we would need some 'verifier' to really trust the cleanliness of the output. You could do this a couple of ways, e.g. actually try to load the JSON into SQL (and then delete it) - see the sketch below.
- The database schema itself that we start with is static. In the next iteration, we would add a pre step to generate different schemas with different constraints.
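A minimal sketch of that verifier idea (the function name and database path here are illustrative, not part of the notebook): insert the generated rows inside a throwaway transaction with foreign keys enabled, collect any integrity errors, and roll back so nothing persists.
import sqlite3

def verify_clean_data(db_path: str, clean_data: dict) -> list:
    """Dry-run insert of generated rows; any IntegrityError means the 'clean' data isn't clean."""
    errors = []
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA foreign_keys = ON")  # enforce FK checks during the dry run
    try:
        # Assumes clean_data lists parent tables before child tables, as our pipeline outputs them
        for table, rows in clean_data.items():
            for row in rows:
                cols = ", ".join(row.keys())
                placeholders = ", ".join("?" for _ in row)
                try:
                    conn.execute(
                        f"INSERT INTO {table} ({cols}) VALUES ({placeholders})",
                        list(row.values()),
                    )
                except sqlite3.IntegrityError as e:
                    errors.append(f"{table}: {row} -> {e}")
    finally:
        conn.rollback()  # never persist the synthetic rows
        conn.close()
    return errors

# e.g. verify_clean_data("car_parts.db", simplified_result["clean_data"])
In practice you would run this against an empty copy of the schema so that rows already in the database don't trigger spurious primary-key conflicts.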
Issues with Current Code
- The ground-truth labels are root-cause (“orphaned_child_records”, “flatten_data”).
- The agent sees only the symptoms in the JSON: FK violations, duplicate columns, etc.
- Nothing in the prompt tells it to invent a higher-level abstraction (“orphaned_child_records”) or to coalesce many row-level violations into one label.
- We expect the agent to emit severity and fix type as well, but those are policy decisions that require extra domain knowledge – they are not directly observable from the data. The sketch after this list makes the label/symptom mismatch concrete.
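To make the mismatch concrete, here is a toy example (hypothetical data, not taken from the run above): one root-cause label on the ground-truth side, several symptom-level detections on the agent side, and a naive string-match metric has nothing sensible to count.
# Hypothetical illustration of the scoring gap between root-cause labels and symptom-level detections
ground_truth = [
    {"violation_type": "required_fields_missing", "severity": "blocker"},  # one root cause
]
detections = [
    "manufacturers: row manufacturer_id=4 is missing `name`",
    "part_categories: row category_id=5 is missing `category_name`",
    "car_models: row model_id=5 is missing `model_year`",
]

def naive_recall(truth, found):
    # Count a ground-truth label as a 'hit' only if its name appears verbatim in some detection
    hits = sum(1 for t in truth if any(t["violation_type"] in f for f in found))
    return hits / len(truth)

print(naive_recall(ground_truth, detections))  # 0.0 - the symptoms never mention the root-cause label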
NOTE: We also need to consider how we accept new data. I.e., instead of classifying something as 'out of scope', we assume it's new data that needs to be included in the database and may require schema changes.
Other Considerations
After reviewing this code again, I entertained the thought that maybe our corrupted_data generation was actually too broad. If we delete a foreign-key reference column from the JSON data, that affects every row in that JSON table. Conversely, the data inspection agent looks at each 'item' of JSON and reports the error for each row, without considering a larger 'theme' for why the error exists. Therefore, if we corrupted the data such that each 'affected' line was output as its own 'fault' for the data agent to predict, scoring might become easier.
In practice, this becomes more complicated: the corruption exercise needs to take a broad corruption or transformation, apply it, and then go through the result to output each data point impacted. That is a lot of 'napkin math' (i.e., context) we are relying on the LLM to keep track of, which is not its strong suit (hence the idea of 'context engineering'). Instead, we should stick with a two-stage data agent: the first step identifies all data issues, and the second rolls them up into broad issues we can score against. This is still 'inefficient' in that I'd love for it to be a single step. A rough sketch of that two-stage shape follows.
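This sketch reuses the db_tools list defined above; the signature and class names are hypothetical, not code that exists in this notebook.
# Hypothetical two-stage data agent: detect row-level issues, then roll them up into scoreable groups
class DetectRowLevelIssues(dspy.Signature):
    """Stage 1: enumerate every concrete, mechanically verifiable defect in the JSON."""
    json_data: dict = dspy.InputField(desc="JSON data to analyze, organized by table name")
    row_level_issues: List[str] = dspy.OutputField(desc="One entry per affected table/row/column")

class RollUpIssues(dspy.Signature):
    """Stage 2: cluster row-level issues that share a root cause into broad, scoreable labels."""
    row_level_issues: List[str] = dspy.InputField(desc="Output of stage 1")
    grouped_issues: List[str] = dspy.OutputField(desc="e.g. 'required_fields_missing', 'orphaned_child_records'")

class TwoStageDataAgent(dspy.Module):
    def __init__(self):
        self.detect = dspy.ReAct(DetectRowLevelIssues, tools=db_tools)
        self.roll_up = dspy.Predict(RollUpIssues)
    def forward(self, json_data):
        issues = self.detect(json_data=json_data).row_level_issues
        return self.roll_up(row_level_issues=issues)
Only the second stage produces output in the same vocabulary as the fault labels, so that is where a DSPy metric (and few-shot examples) would attach.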
Next Steps
- Remove trying to pre-define the severity and impact. These are policy decisions - when the policy is defined, we can pass it along with the data and schema to an LLM and reasonably expect it to get these right. o3 was correct in pointing this out. This means the data generation step is simply to introduce the errors as we have done, just without the severity and fix - so not too much wasted code.
- Break out the data agent into 3 steps:
- Detection (objective, data-driven)
- Task: enumerate every concrete defect that can be verified mechanically (missing parent row, unexpected column, duplicate PK, etc.).
- Output: DetectedIssue{table, column/row reference, description}.
- NOTE: unless we change something in the corruption generation code, this step won't have a metric (making DSPy less helpful). However, if we updated the corruption module to break down the changes into their component parts, then we could score this step.
- Root-cause grouping (heuristic / LLM)
- Input: the Stage A list.
- Task: cluster issues that share a causal pattern (e.g. “all these FK errors stem from missing part_categories.category_id=5”) and assign a human-readable group label.
- This is where terms like “orphaned_child_records” or “flatten_data” live, and thus where we can use metrics to improve prompts.
- Good place for 'few-shot' examples
- Impact & remediation classification (policy)
- Input: grouped root causes from Stage B.
- Task: map to Severity ∈ {blocker, risky, …}, FixType ∈ {auto, ask_value, …}.
- Can be a rule table or another small model; decoupling lets you tweak policy without retraining detection (see the sketch after this list).
- See above note about outlining our policy about how we would want a model to handle this
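For step 3, a plain rule table may be all we need. This sketch assumes the Severity and FixType enums defined earlier cover all the categories mentioned in the signature (blocker/risky/info and auto/ask_value/ask_map/ask_schema); the specific mappings are illustrative policy choices, not fixed rules.
# Hypothetical policy table mapping grouped root causes to severity and fix type
POLICY_TABLE = {
    "required_fields_missing":   (Severity.BLOCKER, FixType.ASK_VALUE),
    "orphaned_child_records":    (Severity.BLOCKER, FixType.ASK_MAP),
    "nested_relationship_depth": (Severity.INFO,    FixType.AUTO),
    "flatten_data":              (Severity.RISKY,   FixType.ASK_SCHEMA),
}

def classify_group(group_label: str):
    # Fall back to the most conservative policy when a group label is unknown
    return POLICY_TABLE.get(group_label, (Severity.RISKY, FixType.ASK_SCHEMA))
Because this is just data, tweaking the policy never touches the detection or grouping prompts.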
Future Directions
- Improving the scoring
- Feedback loop (incorporate human responses to new plan)
- Vary the database schema and introduce more tables for added complexity; load the 'clean' generated data into SQL to ensure it was defined properly.
Notes on Synthetic SQL Data Limitations
Notes from an HN post that describes gotchas when generating synthetic data:
b0a04gl:
seen this pattern a before too. faker holds shape without flow. real tables come from actions : retry, decline, manual review, all that. you just set col types, you might miss why the row even happened. gen needs to simulate behavior, not format
Replies
ajd555:
Was looking for this exact comment. I completely agree with this method, especially if you're testing an entire flow, and not just a UI tool. You want to test the service that interfaces between the API and the database.
I've been writing custom simulation agents (just simple go programs) that simulate different users of my system. I can scale appropriately and see test data flow in. If metabase could generate these simulation agents based on a schema and some instructions, now that would be quite neat! Good job on this first version of the tool, though!
matthewhefferon:
That's a solid callout, appreciate you pointing it out. I'll definitely dig into that more.
tomrod:
The best synthetic data are those that capture ingestion and action, instead of just relationship.
Relationship is important, but your data structure might capture a virtually infinite number of unexpected behaviors that you would preferably call errors or bugs.