Data Onboarder - Part 2
Can I build a reliable data agent that identifies how to integrate (synthetic) JSON into SQL?
This is Part 2, a follow-up that tries to simplify how we generate synthetic data and actually runs DSPy evals.
TL;DR
- Objective: Improve how we generate synthetic JSON by simplifying the data (compared to Part 1), and use DSPy optimization to improve how well the data agent identifies JSON data issues against the SQL schema.
- Process: Simplified the JSON by removing severity and fix from the synthetic data, and used fewer data corruptions overall (structured so new corruption types are easier to add later). Then built quantitative and qualitative evaluations for use in DSPy to optimize few-shot examples and the data agent prompt. Finally, tried different model combinations and optimizers.
- Outcome: Despite trying to 'simplify' how I build synthetic data, the code is still more complex than I'd like. Additionally, creating evals for unlabelled, complex data in order to optimize prompts is an art, not a science. Optimizers like SIMBA and MIPROv2 looked promising for prompt optimization, but in practice they either overfit the prompts or were too expensive to run. Trying different models with and without labelled few-shot examples (in other words, with different optimizers) is important. None of this analysis would have been possible without generating synthetic data first.
- Conclusion: GPT-4.1 and Sonnet-4 are actually better than o3 at using a complex network of tools to compare JSON to SQL (bigger isn't always better); adding few-shot examples doesn't seem to help much when using a dspy.ReAct agent.
Intro
I ended the last notebook with a few takeaways, namely that we are asking the data agent (whose first goal is to identify gaps between incoming JSON and the existing SQL) to 'predict' too many things at once. This meant removing fix and severity as outputs from the data agent: we first focus on identifying the issues, and will rely on policy (handled by a separate agent) to determine those kinds of consequences.
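As a rough illustration of that simplification, here is a minimal sketch of what the slimmed-down agent signature might look like (the class and field names are assumptions for illustration, not the exact ones used later in this notebook):
# Illustrative only: the agent now outputs just the identified issues;
# fix and severity decisions are deferred to a separate policy agent.
class IdentifyDataIssues(dspy.Signature):
    """Compare incoming JSON against the SQL schema and list any data issues found."""
    json_payload = dspy.InputField(desc="Incoming JSON record(s) to onboard")
    schema_context = dspy.InputField(desc="Database schema and sample data")
    issues: list[str] = dspy.OutputField(desc="Identified gaps/violations (no fix, no severity)")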
I also wanted to have this notebook actually use DSPy for agent optimization. This meant building evaluations and applying them in a DSPy optimization process.
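As a rough sketch of that optimization loop (the metric and optimizer settings here are illustrative assumptions, not the exact ones used later): a metric compares the agent's predicted issues against the synthetic ground truth, and an optimizer such as MIPROv2 or SIMBA compiles the agent against a labelled trainset.
# Minimal sketch, assuming each example carries an `expected_issues` label and
# the prediction exposes an `issues` field; all names are placeholders.
def issue_overlap_metric(example, prediction, trace=None):
    expected = set(example.expected_issues)
    predicted = set(prediction.issues)
    return len(expected & predicted) / max(len(expected), 1)

# optimizer = dspy.MIPROv2(metric=issue_overlap_metric, auto="light")
# optimized_agent = optimizer.compile(data_agent, trainset=train_examples)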
Note that a lot of the setup code below is copied from Part 1. Ideally I'd like not to repeat all of this, but I didn't have time to find a way to keep it DRY.
Setup uv Environment
%%capture
!uv pip install -U sqlite-utils dspy pydantic
import sqlite3
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, TypedDict, Union
import dspy
import os
from enum import Enum
import random
from pydantic import BaseModel, Field
GENERATE_NEW_DATA = False  # Set to True to regenerate data
# Get OpenAI API key from environment variable
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
raise ValueError("OPENAI_API_KEY environment variable not found. Please set it with your OpenAI API key.")
# Configure DSPy with O3 model
lm = dspy.LM('openai/o3', api_key=openai_api_key, temperature=1.0, max_tokens=50000)
dspy.configure(lm=lm)
print("✅ DSPy configured with OpenAI O3 model")
print(f"Model: {lm.model}")
print("Ready to use DSPy signatures and modules!")
✅ DSPy configured with OpenAI O3 model
Model: openai/o3
Ready to use DSPy signatures and modules!
# test using dspy
# lm("Is Greenland in Europe or NA?")
Database Setup
Now let's create our SQLite database with the car parts schema and sample data:
# Create database file
db_path = Path("tmp/car_parts.db")
print(f"Creating database at: {db_path}")
# make sure the directory exists
db_path.parent.mkdir(exist_ok=True)
# Connect to database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Enable foreign key enforcement
cursor.execute("PRAGMA foreign_keys = ON")
print("Foreign key enforcement enabled")
Creating database at: tmp/car_parts.db
Foreign key enforcement enabled
# Drop existing tables if they exist
drop_statements = [
"DROP TABLE IF EXISTS car_model_parts",
"DROP TABLE IF EXISTS parts",
"DROP TABLE IF EXISTS part_categories",
"DROP TABLE IF EXISTS car_models",
"DROP TABLE IF EXISTS manufacturers"
]
for statement in drop_statements:
cursor.execute(statement)
print(f"Executed: {statement}")
conn.commit()
print("All tables dropped successfully")
Executed: DROP TABLE IF EXISTS car_model_parts
Executed: DROP TABLE IF EXISTS parts
Executed: DROP TABLE IF EXISTS part_categories
Executed: DROP TABLE IF EXISTS car_models
Executed: DROP TABLE IF EXISTS manufacturers
All tables dropped successfully
# Create all tables
create_statements = [
"""CREATE TABLE manufacturers (
manufacturer_id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE
)""",
"""CREATE TABLE car_models (
model_id INTEGER PRIMARY KEY,
manufacturer_id INTEGER NOT NULL,
model_name TEXT NOT NULL,
model_year INTEGER NOT NULL,
FOREIGN KEY (manufacturer_id) REFERENCES manufacturers(manufacturer_id)
ON DELETE CASCADE
)""",
"""CREATE TABLE part_categories (
category_id INTEGER PRIMARY KEY,
category_name TEXT NOT NULL UNIQUE
)""",
"""CREATE TABLE parts (
part_id INTEGER PRIMARY KEY,
category_id INTEGER NOT NULL,
part_number TEXT NOT NULL UNIQUE,
description TEXT NOT NULL,
FOREIGN KEY (category_id) REFERENCES part_categories(category_id)
ON DELETE RESTRICT
)""",
"""CREATE TABLE car_model_parts (
model_id INTEGER NOT NULL,
part_id INTEGER NOT NULL,
qty INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (model_id, part_id),
FOREIGN KEY (model_id) REFERENCES car_models(model_id)
ON DELETE CASCADE,
FOREIGN KEY (part_id) REFERENCES parts(part_id)
ON DELETE RESTRICT
)"""
]
for i, statement in enumerate(create_statements, 1):
cursor.execute(statement)
table_name = statement.split()[2] # Extract table name
print(f"{i}. Created table: {table_name}")
conn.commit()
print("All tables created successfully!")
1. Created table: manufacturers
2. Created table: car_models
3. Created table: part_categories
4. Created table: parts
5. Created table: car_model_parts
All tables created successfully!
Insert Sample Data
Now let's populate the tables with sample data:
# Insert manufacturers
manufacturers_data = [
('Toyota',),
('Honda',)
]
cursor.executemany("INSERT INTO manufacturers (name) VALUES (?)", manufacturers_data)
print(f"Inserted {len(manufacturers_data)} manufacturers")
# Insert car models
car_models_data = [
(1, 'Camry', 2022),
(1, 'Corolla', 2023),
(2, 'Accord', 2022),
(2, 'Civic', 2023)
]
cursor.executemany("INSERT INTO car_models (manufacturer_id, model_name, model_year) VALUES (?, ?, ?)", car_models_data)
print(f"Inserted {len(car_models_data)} car models")
# Insert part categories
part_categories_data = [
('Engine',),
('Brake',),
('Suspension',)
]
cursor.executemany("INSERT INTO part_categories (category_name) VALUES (?)", part_categories_data)
print(f"Inserted {len(part_categories_data)} part categories")
conn.commit()
print("Basic data inserted successfully!")
Inserted 2 manufacturers
Inserted 4 car models
Inserted 3 part categories
Basic data inserted successfully!
# Insert parts
parts_data = [
(1, 'ENG-2.5L-A25A', '2.5 L I4 engine (Toyota)'),
(1, 'ENG-1.5T-L15B', '1.5 L Turbo I4 engine (Honda)'),
(2, 'BRK-FR-TOY-22', 'Front brake pad set (Toyota 2022+)'),
(2, 'BRK-FR-HON-22', 'Front brake pad set (Honda 2022+)'),
(3, 'SUS-STRUT-CIV-23', 'Front strut (Civic 2023)')
]
cursor.executemany("INSERT INTO parts (category_id, part_number, description) VALUES (?, ?, ?)", parts_data)
print(f"Inserted {len(parts_data)} parts")
# Insert car model parts relationships
car_model_parts_data = [
# Camry 2022
(1, 1, 1), # Camry -> Toyota Engine
(1, 3, 2), # Camry -> Toyota Brake Pads (qty: 2)
# Corolla 2023
(2, 3, 2), # Corolla -> Toyota Brake Pads (qty: 2)
# Accord 2022
(3, 2, 1), # Accord -> Honda Engine
(3, 4, 2), # Accord -> Honda Brake Pads (qty: 2)
# Civic 2023
(4, 2, 1), # Civic -> Honda Engine
(4, 5, 2) # Civic -> Front Strut (qty: 2)
]
cursor.executemany("INSERT INTO car_model_parts (model_id, part_id, qty) VALUES (?, ?, ?)", car_model_parts_data)
print(f"Inserted {len(car_model_parts_data)} car model-part relationships")
conn.commit()
print("All sample data inserted successfully!")
Inserted 5 parts
Inserted 7 car model-part relationships
All sample data inserted successfully!
Verify the Data
Let's verify that our database was created correctly and query some data:
# Check table structure
print("=== DATABASE TABLES ===")
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = cursor.fetchall()
for table in tables:
print(f"✓ {table[0]}")
print(f"\nTotal tables created: {len(tables)}")
# Show row counts
print("\n=== ROW COUNTS ===")
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
count = cursor.fetchone()[0]
print(f"{table[0]}: {count} rows")
=== DATABASE TABLES ===
✓ car_model_parts
✓ car_models
✓ manufacturers
✓ part_categories
✓ parts
Total tables created: 5
=== ROW COUNTS ===
car_model_parts: 7 rows
car_models: 4 rows
manufacturers: 2 rows
part_categories: 3 rows
parts: 5 rows
# Example query: Get all car models with their manufacturers
query = """
SELECT
m.name as manufacturer,
cm.model_name,
cm.model_year
FROM car_models cm
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
ORDER BY m.name, cm.model_name
"""
df_models = pd.read_sql_query(query, conn)
print("=== CAR MODELS ===")
print(df_models.to_string(index=False))
=== CAR MODELS ===
manufacturer model_name model_year
Honda Accord 2022
Honda Civic 2023
Toyota Camry 2022
Toyota Corolla 2023
# Complex query: Show which parts fit which car models
query = """
SELECT
m.name as manufacturer,
cm.model_name,
cm.model_year,
pc.category_name as part_category,
p.part_number,
p.description,
cmp.qty
FROM car_model_parts cmp
JOIN car_models cm ON cmp.model_id = cm.model_id
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
JOIN parts p ON cmp.part_id = p.part_id
JOIN part_categories pc ON p.category_id = pc.category_id
ORDER BY m.name, cm.model_name, pc.category_name
"""
df_compatibility = pd.read_sql_query(query, conn)
print("=== PARTS COMPATIBILITY ===")
print(df_compatibility.to_string(index=False))
=== PARTS COMPATIBILITY ===
manufacturer model_name model_year part_category part_number description qty
Honda Accord 2022 Brake BRK-FR-HON-22 Front brake pad set (Honda 2022+) 2
Honda Accord 2022 Engine ENG-1.5T-L15B 1.5 L Turbo I4 engine (Honda) 1
Honda Civic 2023 Engine ENG-1.5T-L15B 1.5 L Turbo I4 engine (Honda) 1
Honda Civic 2023 Suspension SUS-STRUT-CIV-23 Front strut (Civic 2023) 2
Toyota Camry 2022 Brake BRK-FR-TOY-22 Front brake pad set (Toyota 2022+) 2
Toyota Camry 2022 Engine ENG-2.5L-A25A 2.5 L I4 engine (Toyota) 1
Toyota Corolla 2023 Brake BRK-FR-TOY-22 Front brake pad set (Toyota 2022+) 2
# Close database connection
conn.close()
print(f"Database connection closed. Database saved as: {db_path}")
print(f"Database size: {db_path.stat().st_size} bytes")
Database connection closed. Database saved as: tmp/car_parts.db
Database size: 40960 bytes
Define Tools
# Database connection helper
def get_db_connection(db_path: str = "tmp/car_parts.db") -> sqlite3.Connection:
"""Get a database connection with foreign keys enabled."""
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
# -----------------------------------------------------------------------------
# NEW SCHEMA DISCOVERY FUNCTION USING PRAGMA (For Data Generation)
# -----------------------------------------------------------------------------
def get_schema(tables: List[str] | None = None,
include_fks: bool = True,
max_cols: int | None = None,
format: str = "json") -> Union[dict, str]:
"""
Return schema metadata for the specified tables using SQLite PRAGMA commands.
This function replaces collate_schema() and is used by data generation agents
to understand the true database structure without hardcoded assumptions.
Args:
tables: List of table names to get schema for. None returns all tables.
include_fks: Whether to include foreign key relationships.
max_cols: Maximum number of columns to return per table (for context control).
format: "json" for structured dict, "text" for LLM-friendly string format
Returns:
If format="json": Dictionary with schema metadata
If format="text": Formatted string similar to collate_schema() output
"""
conn = get_db_connection()
cursor = conn.cursor()
# Get list of tables if not specified
if tables is None:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
schema_dict = {}
for table_name in tables:
table_info = {
"table": table_name,
"columns": [],
"foreign_keys": [],
"indexes": [],
"row_count": 0
}
# Get column information using PRAGMA table_info
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
# Get unique constraints using PRAGMA index_list
cursor.execute(f"PRAGMA index_list({table_name})")
indexes = cursor.fetchall()
unique_cols = set()
for idx in indexes:
if idx[2] == 1: # unique index
cursor.execute(f"PRAGMA index_info({idx[1]})")
idx_cols = cursor.fetchall()
for col in idx_cols:
unique_cols.add(col[2]) # column name
# Build column list with all metadata
for i, col in enumerate(columns):
if max_cols and i >= max_cols:
break
col_dict = {
"name": col[1], # column name
"type": col[2], # data type
"not_null": col[3], # not null constraint
"default": col[4], # default value
"pk": col[5], # primary key position (0 if not PK)
"unique": 1 if col[1] in unique_cols else 0
}
table_info["columns"].append(col_dict)
# Get foreign key information if requested
if include_fks:
cursor.execute(f"PRAGMA foreign_key_list({table_name})")
fks = cursor.fetchall()
for fk in fks:
fk_dict = {
"from": fk[3], # from column
"to_table": fk[2], # referenced table
"to_column": fk[4], # referenced column
"on_update": fk[5], # on update action
"on_delete": fk[6], # on delete action
"match": fk[7] # match type
}
table_info["foreign_keys"].append(fk_dict)
# Also mark the column as having FK
for col in table_info["columns"]:
if col["name"] == fk[3]:
col["fk"] = f"{fk[2]}.{fk[4]}"
# Get index information
cursor.execute(f"PRAGMA index_list({table_name})")
indexes = cursor.fetchall()
for idx in indexes:
cursor.execute(f"PRAGMA index_info({idx[1]})")
idx_cols = cursor.fetchall()
idx_dict = {
"name": idx[1],
"unique": idx[2],
"columns": [col[2] for col in idx_cols]
}
table_info["indexes"].append(idx_dict)
# Get row count
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
table_info["row_count"] = cursor.fetchone()[0]
schema_dict[table_name] = table_info
conn.close()
# Return structured dict if requested
if format == "json":
return schema_dict
# Otherwise, format as text similar to collate_schema output
output_lines = []
output_lines.append("=== DATABASE SCHEMA INFORMATION ===\n")
output_lines.append(f"Database contains {len(tables)} tables: {', '.join(tables)}\n")
# Foreign key relationships summary
all_fks = []
for table_name, info in schema_dict.items():
for fk in info["foreign_keys"]:
all_fks.append({
'from_table': table_name,
'from_column': fk['from'],
'to_table': fk['to_table'],
'to_column': fk['to_column'],
'on_delete': fk['on_delete'],
'on_update': fk['on_update']
})
if all_fks:
output_lines.append("--- FOREIGN KEY RELATIONSHIPS ---")
for fk in all_fks:
relationship_desc = (
f"{fk['from_table']}.{fk['from_column']} -> "
f"{fk['to_table']}.{fk['to_column']} "
f"(ON DELETE {fk['on_delete']}, ON UPDATE {fk['on_update']})"
)
output_lines.append(relationship_desc)
output_lines.append("")
# Detailed table schemas
output_lines.append("--- TABLE SCHEMAS ---")
for table_name in sorted(schema_dict.keys()):
info = schema_dict[table_name]
output_lines.append(f"\nTable: {table_name}")
output_lines.append("-" * (len(table_name) + 7))
# Format column information
for col in info["columns"]:
column_info = f" {col['name']}: {col['type']}"
# Add constraints
constraints = []
if col["pk"] > 0:
constraints.append("PRIMARY KEY")
if col["not_null"] == 1:
constraints.append("NOT NULL")
if col.get("unique") == 1 and col["pk"] == 0: # Don't show UNIQUE for PKs
constraints.append("UNIQUE")
if col["default"] is not None:
constraints.append(f"DEFAULT {col['default']}")
if "fk" in col:
constraints.append(f"REFERENCES {col['fk']}")
if constraints:
column_info += f" ({', '.join(constraints)})"
output_lines.append(column_info)
# Add foreign key info for this table
if info["foreign_keys"]:
output_lines.append(" Foreign Keys:")
for fk in info["foreign_keys"]:
fk_info = f" {fk['from']} references {fk['to_table']}.{fk['to_column']}"
output_lines.append(fk_info)
# Add unique indexes (excluding primary keys)
unique_indexes = [idx for idx in info["indexes"] if idx["unique"] == 1]
if unique_indexes:
output_lines.append(" Unique Indexes:")
for idx in unique_indexes:
output_lines.append(f" {idx['name']}: ({', '.join(idx['columns'])})")
# Add row count
output_lines.append(f" Current rows: {info['row_count']}")
return "\n".join(output_lines)
# Keep existing tools but NOT as DSPy tools - these are for the notebook's use
def list_tables() -> List[str]:
"""
Get all table names in the database.
Use this tool when: You need to know what tables exist in the database before
querying or inserting data.
Returns: List of table names as strings.
Example output: ['manufacturers', 'car_models', 'parts']
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
conn.close()
return tables
def get_table_schema(table_name: str) -> pd.DataFrame:
"""
Get detailed schema information for a specific table including column names,
types, constraints, and whether columns allow NULL values.
Use this tool when: You need to understand the structure of a table before
inserting data, including what columns exist, their data types, which are
required (NOT NULL), and which are primary keys.
Args:
table_name: Name of the table to inspect
Returns: DataFrame with columns: column_name, data_type, not_null, default_value, primary_key
Example output for 'manufacturers' table:
| column_name | data_type | not_null | default_value | primary_key |
|-----------------|-----------|----------|---------------|-------------|
| manufacturer_id | INTEGER | 0 | None | 1 |
| name | TEXT | 1 | None | 0 |
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
conn.close()
df = pd.DataFrame(columns, columns=['cid', 'column_name', 'data_type', 'not_null', 'default_value', 'primary_key'])
return df[['column_name', 'data_type', 'not_null', 'default_value', 'primary_key']]
def get_foreign_key_relationships() -> pd.DataFrame:
"""
Get all foreign key relationships in the database showing how tables are connected.
Use this tool when: You need to understand table relationships before inserting
data to ensure referential integrity. This shows which columns reference other
tables and what happens on delete/update.
Returns: DataFrame with columns: from_table, from_column, to_table, to_column, on_delete, on_update
Example output:
| from_table | from_column | to_table | to_column | on_delete | on_update |
|-------------|-----------------|---------------|-----------------|-----------|-----------|
| car_models | manufacturer_id | manufacturers | manufacturer_id | CASCADE | NO ACTION |
"""
conn = get_db_connection()
cursor = conn.cursor()
# Get all tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
relationships = []
for table in tables:
cursor.execute(f"PRAGMA foreign_key_list({table})")
fks = cursor.fetchall()
for fk in fks:
relationships.append({
'from_table': table,
'from_column': fk[3], # from column
'to_table': fk[2], # to table
'to_column': fk[4], # to column
'on_delete': fk[6], # on delete action
'on_update': fk[5] # on update action
})
conn.close()
return pd.DataFrame(relationships)
def sample_table_data(table_name: str, limit: int = 5) -> pd.DataFrame:
"""
Get a sample of rows from a table to understand the data format and content.
Use this tool when: You need to see example data in a table to understand
the format, typical values, and data patterns before inserting new records.
Args:
table_name: Name of the table to sample
limit: Maximum number of rows to return (default: 5)
Returns: DataFrame containing sample rows from the table
Example output for 'manufacturers' table:
| manufacturer_id | name |
|-----------------|--------|
| 1 | Toyota |
| 2 | Honda |
"""
conn = get_db_connection()
query = f"SELECT * FROM {table_name} LIMIT {limit}"
df = pd.read_sql_query(query, conn)
conn.close()
return df
def get_unique_values(table_name: str, column_name: str, limit: int = 50) -> List[Any]:
"""
Get unique values from a specific column in a table.
Use this tool when: You need to see what values already exist in a column
to avoid duplicates, understand naming conventions, or see valid options
for foreign key references.
Args:
table_name: Name of the table
column_name: Name of the column to get unique values from
limit: Maximum number of unique values to return (default: 50)
Returns: List of unique values from the column
Example output for manufacturers.name:
['Toyota', 'Honda']
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT DISTINCT {column_name} FROM {table_name} ORDER BY {column_name} LIMIT {limit}")
values = [row[0] for row in cursor.fetchall()]
conn.close()
return values
def get_table_row_count(table_name: str) -> int:
"""
Get the total number of rows in a table.
Use this tool when: You need to understand the size of a table or check
if it's empty before inserting data.
Args:
table_name: Name of the table to count
Returns: Integer count of rows in the table
Example output: 2 (for manufacturers table)
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
conn.close()
return count
def get_table_columns(table_name: str) -> List[str]:
"""
Get just the column names for a table (simpler than full schema).
Use this tool when: You need a quick list of column names for a table
without the full schema details.
Args:
table_name: Name of the table
Returns: List of column names as strings
Example output for 'manufacturers': ['manufacturer_id', 'name']
"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()] # row[1] is column name
conn.close()
return columns
def execute_query(query: str) -> pd.DataFrame:
"""
Execute a custom SQL query and return results as DataFrame.
Use this tool when: You need to run a specific SQL query to understand
data relationships, check for existing data, or perform complex lookups
that other tools don't cover.
Args:
query: SQL query string to execute
Returns: DataFrame with query results
Example usage: execute_query("SELECT COUNT(*) as total FROM manufacturers")
"""
conn = get_db_connection()
df = pd.read_sql_query(query, conn)
conn.close()
return df
def check_referential_integrity(table_name: str, column_name: str, value: Any) -> bool:
"""
Check if a foreign key value exists in the referenced table.
Use this tool when: You need to verify that a foreign key value exists
in the parent table before inserting a record.
Args:
table_name: Name of the table containing the foreign key
column_name: Name of the foreign key column
value: Value to check for existence
Returns: True if the value exists in the referenced table, False otherwise
Example: check_referential_integrity('car_models', 'manufacturer_id', 1)
Returns: True if manufacturer_id=1 exists in manufacturers table
"""
# First get the foreign key relationship
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"PRAGMA foreign_key_list({table_name})")
fks = cursor.fetchall()
target_table = None
target_column = None
for fk in fks:
if fk[3] == column_name: # from column matches
target_table = fk[2] # to table
target_column = fk[4] # to column
break
if not target_table:
conn.close()
return False # No foreign key relationship found
# Check if value exists in target table
cursor.execute(f"SELECT COUNT(*) FROM {target_table} WHERE {target_column} = ?", (value,))
exists = cursor.fetchone()[0] > 0
conn.close()
return exists
def validate_json_types_against_schema(json_data: Dict[str, Any], table_name: str) -> List[str]:
"""
Validate that JSON data types match the expected database schema types.
Use this tool when: You need to verify that incoming JSON data has compatible
types before attempting to insert into the database.
Args:
json_data: Dictionary containing the JSON data to validate
table_name: Name of the target table to validate against
Returns: List of validation error messages (empty list if all valid)
Example output: ['Field "model_year" expected INTEGER but got string "2022"']
"""
errors = []
schema_df = get_table_schema(table_name)
for _, row in schema_df.iterrows():
column_name = row['column_name']
expected_type = row['data_type']
is_required = row['not_null'] == 1
is_primary_key = row['primary_key'] == 1
# Skip auto-increment primary keys
if is_primary_key and expected_type == 'INTEGER':
continue
if column_name in json_data:
value = json_data[column_name]
# Check for null values in required fields
if value is None and is_required:
errors.append(f"Field '{column_name}' is required but got null")
continue
# Skip type checking for null values in optional fields
if value is None:
continue
# Type validation
if expected_type == 'INTEGER':
if not isinstance(value, int):
# Allow string numbers that can be converted
try:
int(value)
except (ValueError, TypeError):
errors.append(f"Field '{column_name}' expected INTEGER but got {type(value).__name__} '{value}'")
elif expected_type == 'TEXT':
if not isinstance(value, str):
errors.append(f"Field '{column_name}' expected TEXT but got {type(value).__name__} '{value}'")
elif expected_type == 'REAL':
if not isinstance(value, (int, float)):
try:
float(value)
except (ValueError, TypeError):
errors.append(f"Field '{column_name}' expected REAL but got {type(value).__name__} '{value}'")
elif is_required and not is_primary_key:
errors.append(f"Required field '{column_name}' is missing from JSON data")
return errors
def find_semantic_duplicates(new_data: Dict[str, Any], table_name: str, similarity_threshold: float = 0.8) -> List[Dict[str, Any]]:
"""
Find existing records that might be semantic duplicates of the new data.
Use this tool when: You need to detect records that aren't exact duplicates
but represent the same entity (e.g., "Toyota" vs "TOYOTA" vs "Toyota Motors").
Args:
new_data: Dictionary containing the new record data
table_name: Name of the table to search for duplicates
similarity_threshold: Minimum similarity score (0.0 to 1.0) to consider a match
Returns: List of potentially duplicate records with similarity scores
Example output: [{'record': {...}, 'similarity': 0.85, 'matched_fields': ['name']}]
"""
import difflib
# Get existing data from the table
existing_df = sample_table_data(table_name, limit=1000) # Limit for performance
if existing_df.empty:
return []
potential_duplicates = []
# Get text columns for comparison (skip IDs and numeric-only fields)
text_columns = []
schema_df = get_table_schema(table_name)
for _, row in schema_df.iterrows():
if row['data_type'] == 'TEXT' and row['column_name'] in new_data:
text_columns.append(row['column_name'])
if not text_columns:
return []
# Compare new data against each existing record
for idx, existing_row in existing_df.iterrows():
total_similarity = 0
matched_fields = []
comparison_count = 0
for column in text_columns:
if column in new_data and pd.notna(existing_row[column]):
new_value = str(new_data[column]).lower().strip()
existing_value = str(existing_row[column]).lower().strip()
if new_value and existing_value:
similarity = difflib.SequenceMatcher(None, new_value, existing_value).ratio()
total_similarity += similarity
comparison_count += 1
if similarity >= similarity_threshold:
matched_fields.append(column)
if comparison_count > 0:
avg_similarity = total_similarity / comparison_count
if avg_similarity >= similarity_threshold:
potential_duplicates.append({
'record': existing_row.to_dict(),
'similarity': round(avg_similarity, 3),
'matched_fields': matched_fields
})
# Sort by similarity score (highest first)
potential_duplicates.sort(key=lambda x: x['similarity'], reverse=True)
return potential_duplicates
def validate_business_rules_with_llm(json_data: Dict[str, Any], table_name: str, business_rules: str) -> Dict[str, Any]:
"""
Use an LLM to perform a "sense check" of the data against business rules.
Use this tool when: You need to validate complex business logic that can't be
expressed as simple database constraints. The LLM will analyze the data and
rules to identify potential violations or concerns.
Args:
json_data: Dictionary containing the data to validate
table_name: Name of the target table
business_rules: String describing the business rules to validate against
Returns: Dictionary with validation results
Example output: {
'valid': False,
'violations': ['Honda part assigned to Toyota model'],
'warnings': ['Unusual quantity: 10 engines for one car'],
'suggestions': ['Consider verifying part compatibility']
}
Note: This is a placeholder function that would integrate with an LLM service.
"""
# Get related data for context
schema_info = get_table_schema(table_name).to_dict('records')
sample_data = sample_table_data(table_name, limit=3).to_dict('records')
# For foreign keys, get some context from related tables
fk_context = {}
fk_relationships = get_foreign_key_relationships()
table_fks = fk_relationships[fk_relationships['from_table'] == table_name]
for _, fk in table_fks.iterrows():
if fk['from_column'] in json_data:
fk_value = json_data[fk['from_column']]
related_data = execute_query(f"SELECT * FROM {fk['to_table']} WHERE {fk['to_column']} = {fk_value}")
if not related_data.empty:
fk_context[fk['from_column']] = related_data.iloc[0].to_dict()
# Placeholder LLM prompt structure
validation_context = {
'table_name': table_name,
'new_data': json_data,
'schema': schema_info,
'sample_existing_data': sample_data,
'foreign_key_context': fk_context,
'business_rules': business_rules
}
# TODO: Replace with actual LLM call
# prompt = f"""
# Analyze this data for business rule violations:
#
# Table: {table_name}
# New Data: {json_data}
# Business Rules: {business_rules}
# Schema Context: {schema_info}
# Related Data: {fk_context}
#
# Identify any violations, warnings, or suggestions.
# """
#
# llm_response = call_llm(prompt)
# Placeholder response - in real implementation, this would be LLM output
placeholder_result = {
'valid': True,
'violations': [],
'warnings': [],
'suggestions': ['Business rule validation requires LLM integration'],
'context_used': validation_context
}
return placeholder_result
# Test the tools
print("✅ All database validation tools defined successfully!")
print(f"Available tables: {list_tables()}")
print("🆕 Added: get_schema() - PRAGMA-based schema discovery for data generation")
print("🆕 Added: validate_json_types_against_schema()")
print("🆕 Added: find_semantic_duplicates()")
print("🆕 Added: validate_business_rules_with_llm() [placeholder]")
✅ All database validation tools defined successfully!
Available tables: ['car_model_parts', 'car_models', 'manufacturers', 'part_categories', 'parts']
🆕 Added: get_schema() - PRAGMA-based schema discovery for data generation
🆕 Added: validate_json_types_against_schema()
🆕 Added: find_semantic_duplicates()
🆕 Added: validate_business_rules_with_llm() [placeholder]
# Demonstrate the new validation tools
print("=== DEMONSTRATING NEW VALIDATION TOOLS ===\n")
# 1. JSON Type Validation
print("1. JSON Type Validation Examples:")
# Valid data
valid_data = {
"manufacturer_id": 1,
"model_name": "Prius",
"model_year": 2024
}
errors = validate_json_types_against_schema(valid_data, "car_models")
print(f" Valid data errors: {errors}")
# Invalid data - wrong types
invalid_data = {
"manufacturer_id": "not_a_number", # Should be INTEGER
"model_name": 123, # Should be TEXT
"model_year": "2024" # Could be converted to INT
}
errors = validate_json_types_against_schema(invalid_data, "car_models")
print(f" Invalid data errors: {errors}")
# Missing required field
missing_data = {
"model_name": "Prius"
# Missing required manufacturer_id and model_year
}
errors = validate_json_types_against_schema(missing_data, "car_models")
print(f" Missing fields errors: {errors}")
print("\n" + "="*50 + "\n")
# 2. Semantic Duplicate Detection
print("2. Semantic Duplicate Detection:")
# Test with similar manufacturer name
new_manufacturer = {"name": "TOYOTA"} # Similar to existing "Toyota"
duplicates = find_semantic_duplicates(new_manufacturer, "manufacturers")
print(f" Potential duplicates for 'TOYOTA': {len(duplicates)} found")
if duplicates:
for dup in duplicates:
print(f" - Match: {dup['record']['name']} (similarity: {dup['similarity']})")
# Test with different manufacturer
new_manufacturer2 = {"name": "Ford"}
duplicates2 = find_semantic_duplicates(new_manufacturer2, "manufacturers")
print(f" Potential duplicates for 'Ford': {len(duplicates2)} found")
print("\n" + "="*50 + "\n")
# 3. Business Rules Validation (Placeholder)
print("3. Business Rules Validation:")
sample_business_rules = """
1. A car model cannot have more than one engine
2. Honda parts should not be used on Toyota vehicles
3. Model year must be within 10 years of current year
4. Brake parts are required for all car models
"""
sample_data = {
"model_id": 1,
"part_id": 2, # Honda engine
"qty": 1
}
validation_result = validate_business_rules_with_llm(sample_data, "car_model_parts", sample_business_rules)
print(f" Validation result: {validation_result['valid']}")
print(f" Suggestions: {validation_result['suggestions']}")
print(f" Context gathered: {list(validation_result['context_used'].keys())}")
print("\n✅ All new validation tools demonstrated!")
=== DEMONSTRATING NEW VALIDATION TOOLS ===
1. JSON Type Validation Examples:
Valid data errors: []
Invalid data errors: ["Field 'manufacturer_id' expected INTEGER but got str 'not_a_number'", "Field 'model_name' expected TEXT but got int '123'"]
Missing fields errors: ["Required field 'manufacturer_id' is missing from JSON data", "Required field 'model_year' is missing from JSON data"]
==================================================
2. Semantic Duplicate Detection:
Potential duplicates for 'TOYOTA': 1 found
- Match: Toyota (similarity: 1.0)
Potential duplicates for 'Ford': 0 found
==================================================
3. Business Rules Validation:
Validation result: True
Suggestions: ['Business rule validation requires LLM integration']
Context gathered: ['table_name', 'new_data', 'schema', 'sample_existing_data', 'foreign_key_context', 'business_rules']
✅ All new validation tools demonstrated!
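As a side note, the business-rules validator above is still a placeholder; one hedged way to fill in its TODO would be a small DSPy signature along these lines (the class and field names are assumptions, not part of this notebook's code):
# Hypothetical replacement for the TODO inside validate_business_rules_with_llm.
class BusinessRuleCheck(dspy.Signature):
    """Check a new record against free-text business rules and report violations."""
    table_name = dspy.InputField()
    new_data = dspy.InputField(desc="JSON record to validate")
    business_rules = dspy.InputField(desc="Free-text business rules")
    violations: list[str] = dspy.OutputField()
    warnings: list[str] = dspy.OutputField()

# checker = dspy.Predict(BusinessRuleCheck)
# result = checker(table_name="car_model_parts", new_data=str(sample_data),
#                  business_rules=sample_business_rules)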
# Test the new get_schema() function
print("🧪 Testing new PRAGMA-based get_schema() function...\n")
# Test 1: Get full schema in JSON format
schema_json = get_schema(format="json")
print("1. JSON format - Tables found:", list(schema_json.keys()))
print("\nSample table info (manufacturers):")
manufacturers_info = schema_json.get('manufacturers', {})
print(f" Columns: {len(manufacturers_info.get('columns', []))}")
print(f" Foreign keys: {len(manufacturers_info.get('foreign_keys', []))}")
print(f" Row count: {manufacturers_info.get('row_count', 0)}")
# Show column details with PRAGMA info
print("\n Column details:")
for col in manufacturers_info.get('columns', []):
constraints = []
if col['pk']: constraints.append('PK')
if col['not_null']: constraints.append('NOT NULL')
if col['unique'] and not col['pk']: constraints.append('UNIQUE')
if 'fk' in col: constraints.append(f"FK:{col['fk']}")
constraint_str = f" ({', '.join(constraints)})" if constraints else ""
print(f" - {col['name']}: {col['type']}{constraint_str}")
print("\n" + "="*70 + "\n")
# Test 2: Get schema in text format (replacing collate_schema)
schema_text = get_schema(format="text")
print("2. Text format (collate_schema replacement):")
print("-" * 70)
print(schema_text[:1000] + "..." if len(schema_text) > 1000 else schema_text)
print("\n" + "="*70 + "\n")
# Test 3: Context control - get schema for specific tables with limited columns
limited_schema = get_schema(tables=['parts', 'car_models'], max_cols=3, format="json")
print("3. Context control - Limited schema (parts & car_models, max 3 cols):")
for table, info in limited_schema.items():
print(f"\n {table}:")
print(f" Columns shown: {len(info['columns'])} (limited to 3)")
for col in info['columns']:
print(f" - {col['name']}: {col['type']}")
print("\n" + "="*70 + "\n")
# Test 4: Verify FK detection with PRAGMA
print("4. Foreign Key Detection via PRAGMA:")
all_fks = []
for table, info in schema_json.items():
for fk in info.get('foreign_keys', []):
all_fks.append(f"{table}.{fk['from']} -> {fk['to_table']}.{fk['to_column']}")
print(f" Total foreign keys found: {len(all_fks)}")
for fk in all_fks:
print(f" - {fk}")
print("\n✅ get_schema() function successfully retrieves schema using PRAGMA commands!")
🧪 Testing new PRAGMA-based get_schema() function...
1. JSON format - Tables found: ['car_model_parts', 'car_models', 'manufacturers', 'part_categories', 'parts']
Sample table info (manufacturers):
Columns: 2
Foreign keys: 0
Row count: 2
Column details:
- manufacturer_id: INTEGER (PK)
- name: TEXT (NOT NULL, UNIQUE)
======================================================================
2. Text format (collate_schema replacement):
----------------------------------------------------------------------
=== DATABASE SCHEMA INFORMATION ===
Database contains 5 tables: car_model_parts, car_models, manufacturers, part_categories, parts
--- FOREIGN KEY RELATIONSHIPS ---
car_model_parts.part_id -> parts.part_id (ON DELETE RESTRICT, ON UPDATE NO ACTION)
car_model_parts.model_id -> car_models.model_id (ON DELETE CASCADE, ON UPDATE NO ACTION)
car_models.manufacturer_id -> manufacturers.manufacturer_id (ON DELETE CASCADE, ON UPDATE NO ACTION)
parts.category_id -> part_categories.category_id (ON DELETE RESTRICT, ON UPDATE NO ACTION)
--- TABLE SCHEMAS ---
Table: car_model_parts
----------------------
model_id: INTEGER (PRIMARY KEY, NOT NULL, REFERENCES car_models.model_id)
part_id: INTEGER (PRIMARY KEY, NOT NULL, REFERENCES parts.part_id)
qty: INTEGER (NOT NULL, DEFAULT 1)
Foreign Keys:
part_id references parts.part_id
model_id references car_models.model_id
Unique Indexes:
sqlite_autoindex_car_model_parts_1: (model_id, part_id)
Current rows: 7
Table: car_models
-----...
======================================================================
3. Context control - Limited schema (parts & car_models, max 3 cols):
parts:
Columns shown: 3 (limited to 3)
- part_id: INTEGER
- category_id: INTEGER
- part_number: TEXT
car_models:
Columns shown: 3 (limited to 3)
- model_id: INTEGER
- manufacturer_id: INTEGER
- model_name: TEXT
======================================================================
4. Foreign Key Detection via PRAGMA:
Total foreign keys found: 4
- car_model_parts.part_id -> parts.part_id
- car_model_parts.model_id -> car_models.model_id
- car_models.manufacturer_id -> manufacturers.manufacturer_id
- parts.category_id -> part_categories.category_id
✅ get_schema() function successfully retrieves schema using PRAGMA commands!
Helper Functions
# helper functions to collate schema and current sample data
# NOTE: collate_schema() has been replaced by get_schema(format="text")
# which uses PRAGMA commands for accurate schema discovery
def collate_sample_data(max_rows_per_table: int = 3) -> str:
"""
Collate sample data from all tables into a single formatted string.
This function retrieves sample data from each table and formats it
in a way that's easy for an LLM to understand the data patterns,
formats, and relationships.
Args:
max_rows_per_table: Maximum number of sample rows to include per table
Returns: A formatted string containing sample data from all tables
"""
output_lines = []
output_lines.append("=== SAMPLE DATA FROM ALL TABLES ===\n")
# Get all tables
tables = list_tables()
for table_name in sorted(tables):
output_lines.append(f"Table: {table_name}")
output_lines.append("-" * (len(table_name) + 7))
# Get sample data
sample_df = sample_table_data(table_name, limit=max_rows_per_table)
if sample_df.empty:
output_lines.append(" (No data)")
else:
# Convert DataFrame to a more readable format
output_lines.append(f" Columns: {', '.join(sample_df.columns.tolist())}")
output_lines.append(" Sample rows:")
for idx, row in sample_df.iterrows():
row_data = []
for col in sample_df.columns:
value = row[col]
if pd.isna(value):
value = "NULL"
elif isinstance(value, str):
value = f"'{value}'"
row_data.append(f"{col}={value}")
output_lines.append(f" [{', '.join(row_data)}]")
output_lines.append("")
return "\n".join(output_lines)
Define Transformations
We categorize data issues into three types (an illustrative grouping follows this list):
Schema-Verifiable Violations: Issues detectable through database constraints
- Missing required fields
- Foreign key violations
- Type mismatches
Business Logic Violations: Issues requiring domain knowledge
- Invalid date ranges
- Semantic duplicates
- Workflow state violations
Data Transformations: Structural changes from source systems
- Renamed columns
- Flattened relationships
- Added noise fields
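To make the three categories concrete, here is an illustrative grouping of the injection symptom keys defined below (orientation only; this mapping is not consumed by any code, and the business-logic keys are hypothetical):
# Orientation only - not used by the notebook's code.
ISSUE_CATEGORIES = {
    "schema_verifiable": [
        "null_in_not_null_columns", "foreign_key_null_when_required",
        "data_type_mismatches", "primary_key_duplicates", "unique_constraint_violations",
    ],
    "business_logic": ["semantic_duplicates", "invalid_date_ranges"],  # hypothetical keys
    "data_transformations": ["rename_columns", "flatten_data", "drop_tables", "add_noise"],
}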
# -----------------------------------------------------------------------------
# DSPY MODULE FOR CONTEXT-AWARE INJECTIONS
# -----------------------------------------------------------------------------
class GenerateInjectionData(dspy.Signature):
"""
Generate context-aware data for injection operations based on database schema and existing data.
Use the schema and sample data to understand:
- Table relationships and foreign key constraints
- Data types and formats used in the database
- Naming conventions and patterns
- Business logic implied by the data
Based on the corruption_task, generate appropriate values that:
- Are plausible enough to not be immediately obvious as corruptions
- Follow similar patterns to existing data (but violate specific constraints)
- Make semantic sense in the domain (automotive parts database)
"""
current_schema = dspy.InputField(
desc="Complete database schema with table structures, relationships, and constraints"
)
sample_data = dspy.InputField(
desc="Sample of existing data showing patterns and formats"
)
corruption_task = dspy.InputField(
desc="Specific corruption task to perform (e.g., 'suggest noise columns for parts table', 'generate invalid type value for year field')"
)
generated_data: dict = dspy.OutputField(
desc="Generated data appropriate for the corruption task with keys: values (list), explanation (str)"
)
class InjectionHelper(dspy.Module):
"""Helper module for injections that need LLM assistance with schema context"""
def __init__(self):
# Use faster model for injection generation
self.lm = dspy.LM(
# "openai/gpt-4.1",
"openai/o3",
api_key=os.getenv("OPENAI_API_KEY"),
temperature=1.0,
max_tokens=50000,
)
self.sig = dspy.Predict(GenerateInjectionData)
# Cache schema and sample data
self._schema_cache = None
self._sample_cache = None
def get_schema(self):
if self._schema_cache is None:
# Use new PRAGMA-based get_schema function
self._schema_cache = get_schema(format="text")
return self._schema_cache
def get_sample_data(self):
if self._sample_cache is None:
self._sample_cache = collate_sample_data()
return self._sample_cache
def forward(self, corruption_task: str):
with dspy.context(lm=self.lm):
response = self.sig(
current_schema=self.get_schema(),
sample_data=self.get_sample_data(),
corruption_task=corruption_task,
)
return response.generated_data
# Create a global instance for use by injections
injection_helper = InjectionHelper()
# -----------------------------------------------------------------------------
# LIGHTWEIGHT HELPER CLASSES FOR DETERMINISTIC TRANSFORMATIONS
# -----------------------------------------------------------------------------
from typing import Dict, Any, Tuple, List
import pandas as pd
import random
class EvidenceKind(Enum):
SCHEMA_DIFF = "schema_diff"
RI_DIFF = "ri_diff"
MISSING_REQUIRED = "missing_required"
TYPE_MISMATCH = "type_mismatch"
MISSING_TABLE = "missing_table"
PRIMARY_KEY_DUPLICATE = "primary_key_duplicate"
UNIQUE_VIOLATION = "unique_violation"
UNEXPECTED_TABLE = "unexpected_table"
# TODO: Update this as some injections later on assign 'kind' explicitly, deviating from this Evidence setup below
class Evidence(dict):
"""Tiny wrapper so every transform returns verifiable facts."""
@staticmethod
def schema_diff(
table: str, missing_cols: List[str] = None, unexpected_cols: List[str] = None
):
return Evidence(
kind=EvidenceKind.SCHEMA_DIFF.value,
table=table,
missing_cols=missing_cols or [],
unexpected_cols=unexpected_cols or [],
)
@staticmethod
def ri_diff(child_tbl: str, parent_tbl: str, fk: str, orphan_row_ids: List[Any]):
return Evidence(
kind=EvidenceKind.RI_DIFF.value,
child_tbl=child_tbl,
parent_tbl=parent_tbl,
fk=fk,
orphan_row_ids=orphan_row_ids,
)
@staticmethod
def missing_required(table: str, row_id: Any, missing_fields: List[str]):
return Evidence(
kind=EvidenceKind.MISSING_REQUIRED.value,
table=table,
row_id=row_id,
missing_fields=missing_fields,
)
@staticmethod
def type_mismatch(
table: str, column: str, row_id: Any, expected_type: str, actual_type: str
):
return Evidence(
kind=EvidenceKind.TYPE_MISMATCH.value,
table=table,
column=column,
row_id=row_id,
expected_type=expected_type,
actual_type=actual_type,
)
# -----------------------------------------------------------------
# Allow dot-notation (e.kind) in addition to dict access (e['kind'])
# -----------------------------------------------------------------
def __getattr__(self, item):
try:
return self[item]
except KeyError:
raise AttributeError(item)
def __setattr__(self, key, value):
self[key] = value
class Injection:
symptom_key: str # must match pick_violation keys
root_cause: str # optional
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
return {}
def apply(
self, db: Dict[str, pd.DataFrame], **params
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
raise NotImplementedError
# -----------------------------------------------------------------------------
# IMPLEMENT 2-3 REPRESENTATIVE TRANSFORMATIONS
# -----------------------------------------------------------------------------
class RenameColumns(Injection):
symptom_key = "rename_columns"
root_cause = "schema_drift"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Pick a random table and column to rename
tables_with_columns = [t for t in db if len(db[t].columns) > 0]
if not tables_with_columns:
return dict(table="", mapping={})
table = random.choice(tables_with_columns)
old = random.choice(list(db[table].columns))
new = f"{old}_legacy"
return dict(table=table, mapping={old: new})
def apply(
self, db: Dict[str, pd.DataFrame], table: str, mapping: Dict[str, str]
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if table and table in db and mapping:
new_db[table] = db[table].rename(columns=mapping)
ev = Evidence.schema_diff(
table=table,
missing_cols=list(mapping.keys()),
unexpected_cols=list(mapping.values()),
)
else:
ev = Evidence()
return new_db, ev
class DropTables(Injection):
symptom_key = "drop_tables"
root_cause = "incomplete_migration"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Don't drop all tables
if len(db) <= 1:
return dict(table="")
# Pick a table to drop (preferably not the main one)
tables = list(db.keys())
# Try to avoid dropping tables with many relationships
table_to_drop = random.choice(tables)
return dict(table=table_to_drop)
def apply(
self, db: Dict[str, pd.DataFrame], table: str
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if table and table in new_db:
del new_db[table]
ev = Evidence(kind="missing_table", table=table)
else:
ev = Evidence()
return new_db, ev
class FlattenData(Injection):
symptom_key = "flatten_data"
root_cause = "denormalization"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Use PRAGMA-based schema to find actual foreign key relationships
schema_info = get_schema(format="json")
candidates = []
for child_table in db:
table_info = schema_info.get(child_table, {})
for fk in table_info.get('foreign_keys', []):
parent_table = fk['to_table']
fk_column = fk['from']
pk_column = fk['to_column']
if parent_table in db:
candidates.append({
"child_table": child_table,
"parent_table": parent_table,
"fk_column": fk_column,
"pk_column": pk_column
})
if candidates:
chosen = random.choice(candidates)
return chosen
else:
return dict(child_table="", parent_table="", fk_column="", pk_column="")
def apply(
self,
db: Dict[str, pd.DataFrame],
child_table: str,
parent_table: str,
fk_column: str,
pk_column: str,
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
# Validate all parameters exist
if not all([child_table, parent_table, fk_column, pk_column]):
return new_db, Evidence()
if child_table not in db or parent_table not in db:
return new_db, Evidence()
child_df = db[child_table]
parent_df = db[parent_table]
# Check if the required columns exist
if fk_column not in child_df.columns or pk_column not in parent_df.columns:
return new_db, Evidence()
# Rename child columns to avoid conflicts
child_renamed = child_df.rename(
columns={
col: f"{child_table}_{col}" if col != fk_column else col
for col in child_df.columns
}
)
# Merge
try:
merged = parent_df.merge(
child_renamed, left_on=pk_column, right_on=fk_column, how="left"
)
# Update parent table with flattened data
new_db[parent_table] = merged
# Remove child table
del new_db[child_table]
# Create evidence
new_cols = [
f"{child_table}_{col}" for col in child_df.columns if col != fk_column
]
ev = Evidence.schema_diff(
table=parent_table, missing_cols=[], unexpected_cols=new_cols
)
except Exception as e:
# If merge fails, return unchanged
return new_db, Evidence()
return new_db, ev
class ForeignKeyNull(Injection):
symptom_key = "foreign_key_null_when_required"
root_cause = "referential_integrity_violation"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Use PRAGMA-based schema to find actual foreign key columns
schema_info = get_schema(format="json")
candidates = []
for table in db:
table_info = schema_info.get(table, {})
for fk in table_info.get('foreign_keys', []):
fk_col = fk['from']
candidates.append((table, fk_col))
if not candidates:
return dict(table="", fk_column="", row_idx=0)
table, fk_col = random.choice(candidates)
# Pick a random row to null
if len(db[table]) == 0:
return dict(table=table, fk_column=fk_col, row_idx=0)
row_idx = random.randint(0, len(db[table]) - 1)
return dict(table=table, fk_column=fk_col, row_idx=row_idx)
def apply(
self, db: Dict[str, pd.DataFrame], table: str, fk_column: str, row_idx: int
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if (
not table
or table not in db
or not fk_column
or fk_column not in db[table].columns
):
return new_db, Evidence()
new_db[table] = db[table].copy()
# Get the primary key column and value for this row using PRAGMA info
schema_info = get_schema(tables=[table], format="json")
table_info = schema_info.get(table, {})
pk_cols = [col['name'] for col in table_info.get('columns', []) if col['pk'] > 0]
if pk_cols and row_idx < len(new_db[table]):
pk_col = pk_cols[0]
row_id = new_db[table].iloc[row_idx][pk_col]
else:
row_id = row_idx
# Set FK to null
if row_idx < len(new_db[table]):
new_db[table].loc[row_idx, fk_column] = None
# Get parent table from PRAGMA foreign key info
parent_table = None
for fk in table_info.get('foreign_keys', []):
if fk['from'] == fk_column:
parent_table = fk['to_table']
break
if not parent_table:
parent_table = fk_column.replace("_id", "s") # fallback
ev = Evidence.ri_diff(
child_tbl=table,
parent_tbl=parent_table,
fk=fk_column,
orphan_row_ids=[row_id],
)
return new_db, ev
class NullInNotNull(Injection):
symptom_key = "null_in_not_null_columns"
root_cause = "missing_value"
def sample_params(self, db):
# Use PRAGMA to find NOT NULL columns
schema_info = get_schema(format="json")
candidates = []
for table, df in db.items():
table_info = schema_info.get(table, {})
for col_info in table_info.get('columns', []):
if col_info['not_null'] == 1 and col_info['pk'] == 0: # NOT NULL but not PK
col_name = col_info['name']
if col_name in df.columns:
candidates.append((table, col_name))
if not candidates:
# Fallback to heuristic
candidates = [
(t, c)
for t, df in db.items()
for c in df.columns
if c.endswith("_id") or "name" in c
]
if not candidates:
return dict(table="", column="", row_idx=0)
tbl, col = random.choice(candidates)
row_idx = random.randrange(len(db[tbl])) if len(db[tbl]) > 0 else 0
return dict(table=tbl, column=col, row_idx=row_idx)
def apply(self, db, table, column, row_idx):
ndb = db.copy()
ndb[table] = db[table].copy()
ndb[table].loc[row_idx, column] = None
ev = Evidence.missing_required(
table=table,
row_id=row_idx,
missing_fields=[column],
)
return ndb, ev
class AddNoise(Injection):
symptom_key = "add_noise"
root_cause = "noise_columns"
def sample_params(self, db):
table = random.choice(list(db))
return dict(table=table, n_cols=random.randint(1, 3))
def apply(self, db, table, n_cols):
# Use injection helper for context-aware noise columns
task = f"Suggest {n_cols} plausible but irrelevant column names for the {table} table in a car parts database. These should sound reasonable but not actually belong."
result = injection_helper(task)
if "values" in result and result["values"]:
new_cols = result["values"][:n_cols]
else:
# Fallback to simple generation using the globally configured LM
lm = dspy.settings.lm
suggestions = lm(
f"Suggest {n_cols} plausible but irrelevant column names for a car-parts table."
)[0]
new_cols = [c.strip() for c in suggestions.split(",")][:n_cols]
# Clean column names
new_cols = [c.strip().lower().replace(" ", "_") for c in new_cols]
ndb = db.copy()
ndb[table] = db[table].copy()
# Add columns with appropriate data types based on names
for col in new_cols:
if "count" in col or "quantity" in col:
ndb[table][col] = random.choices(range(0, 100), k=len(ndb[table]))
elif "flag" in col or "is_" in col:
ndb[table][col] = random.choices([True, False], k=len(ndb[table]))
elif "date" in col:
ndb[table][col] = pd.Timestamp.now()
else:
ndb[table][col] = None # Default to null
ev = Evidence.schema_diff(
table=table, missing_cols=[], unexpected_cols=new_cols
)
return ndb, ev
# Create registry
INJECTION_LIBRARY = {cls.symptom_key: cls() for cls in Injection.__subclasses__()}
print(f"✅ Created injection library with {len(INJECTION_LIBRARY)} transformations:")
print("✅ Updated to use PRAGMA-based schema discovery!")
for key, inj in INJECTION_LIBRARY.items():
print(f" - {key} (root cause: {inj.root_cause})")
✅ Created injection library with 6 transformations:
✅ Updated to use PRAGMA-based schema discovery!
- rename_columns (root cause: schema_drift)
- drop_tables (root cause: incomplete_migration)
- flatten_data (root cause: denormalization)
- foreign_key_null_when_required (root cause: referential_integrity_violation)
- null_in_not_null_columns (root cause: missing_value)
- add_noise (root cause: noise_columns)
class PrimaryKeyDuplicates(Injection):
symptom_key = "primary_key_duplicates"
root_cause = "duplicate_rows"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Find a table with a primary key and at least one row
candidates = []
for table_name, df in db.items():
if len(df) > 0:
# Look for columns that might be primary keys (ending with _id)
pk_cols = [col for col in df.columns if col.endswith('_id') and table_name.rstrip('s') in col]
if pk_cols:
candidates.append((table_name, pk_cols[0]))
if not candidates:
return dict(table='', pk_column='', row_idx=0)
table, pk_col = random.choice(candidates)
# Pick a random row to duplicate
row_idx = random.randint(0, len(db[table]) - 1)
return dict(table=table, pk_column=pk_col, row_idx=row_idx)
def apply(self, db: Dict[str, pd.DataFrame], table: str, pk_column: str, row_idx: int
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if not table or table not in db or not pk_column or pk_column not in db[table].columns:
return new_db, Evidence()
if row_idx >= len(db[table]):
return new_db, Evidence()
# Copy the dataframe
new_df = db[table].copy()
# Get the row to duplicate
duplicate_row = new_df.iloc[row_idx].copy()
duplicate_pk_value = duplicate_row[pk_column]
# Append the duplicate row
new_df = pd.concat([new_df, pd.DataFrame([duplicate_row])], ignore_index=True)
new_db[table] = new_df
ev = Evidence.schema_diff(
table=table,
missing_cols=[],
unexpected_cols=[]
)
# Add custom field for duplicate info
ev['duplicate_pk'] = duplicate_pk_value
ev['kind'] = 'primary_key_duplicate'
return new_db, ev
class DataTypeMismatches(Injection):
symptom_key = "data_type_mismatches"
root_cause = "type_mismatch"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Find numeric columns to corrupt with strings
candidates = []
for table_name, df in db.items():
for col in df.columns:
# Target integer columns (often IDs or years)
if df[col].dtype in ['int64', 'int32', 'float64'] and len(df) > 0:
candidates.append((table_name, col))
if not candidates:
return dict(table='', column='', row_idx=0, bad_value='')
table, column = random.choice(candidates)
row_idx = random.randint(0, len(db[table]) - 1)
# Use injection helper to generate context-aware bad values
task = f"Generate a plausible but type-mismatched string value for the {column} field in {table} table (currently numeric). Should look like real data but be non-numeric."
result = injection_helper(task)
if 'values' in result and result['values']:
bad_value = result['values'][0]
else:
# Fallback to default bad values
bad_values = ['not_a_number', 'N/A', 'unknown', 'TBD', '???']
bad_value = random.choice(bad_values)
return dict(table=table, column=column, row_idx=row_idx, bad_value=bad_value)
def apply(self, db: Dict[str, pd.DataFrame], table: str, column: str,
row_idx: int, bad_value: str) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if not table or table not in db or not column or column not in db[table].columns:
return new_db, Evidence()
if row_idx >= len(db[table]):
return new_db, Evidence()
new_df = db[table].copy()
# Get the original type
original_dtype = str(new_df[column].dtype)
# Get row identifier for evidence
pk_cols = [col for col in new_df.columns if col.endswith('_id') and table.rstrip('s') in col]
if pk_cols and row_idx < len(new_df):
row_id = new_df.iloc[row_idx][pk_cols[0]]
else:
row_id = row_idx
# Convert column to object type to allow mixed types
new_df[column] = new_df[column].astype('object')
new_df.loc[row_idx, column] = bad_value
new_db[table] = new_df
ev = Evidence.type_mismatch(
table=table,
column=column,
row_id=row_id,
expected_type='INTEGER' if 'int' in original_dtype else 'REAL',
actual_type='TEXT'
)
return new_db, ev
class UniqueConstraintViolations(Injection):
symptom_key = "unique_constraint_violations"
root_cause = "unique_violation"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Find columns that should be unique (like part_number, name fields)
candidates = []
for table_name, df in db.items():
if len(df) > 0:
for col in df.columns:
# Target columns that typically have unique constraints
if any(keyword in col.lower() for keyword in ['number', 'name', 'code']):
if df[col].dtype == 'object': # String columns
candidates.append((table_name, col))
if not candidates:
return dict(table='', column='', row_idx=0)
table, column = random.choice(candidates)
# Pick a random existing value to duplicate
row_idx = random.randint(0, len(db[table]) - 1)
return dict(table=table, column=column, row_idx=row_idx)
def apply(self, db: Dict[str, pd.DataFrame], table: str, column: str, row_idx: int
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if not table or table not in db or not column or column not in db[table].columns:
return new_db, Evidence()
if row_idx >= len(db[table]) or len(db[table]) < 2:
return new_db, Evidence()
new_df = db[table].copy()
# Get the value to duplicate
value_to_duplicate = new_df.iloc[row_idx][column]
# Find another row to change (not the same row)
other_indices = [i for i in range(len(new_df)) if i != row_idx]
if not other_indices:
return new_db, Evidence()
target_row_idx = random.choice(other_indices)
# Set the duplicate value
new_df.loc[target_row_idx, column] = value_to_duplicate
new_db[table] = new_df
ev = Evidence.schema_diff(
table=table,
missing_cols=[],
unexpected_cols=[]
)
# Add custom fields for unique violation
ev['kind'] = 'unique_violation'
ev['column'] = column
ev['duplicate_value'] = value_to_duplicate
ev['row_indices'] = [row_idx, target_row_idx]
return new_db, ev
class AddRelatedData(Injection):
symptom_key = "add_related_data"
root_cause = "unexpected_data"
def sample_params(self, db: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
# Choose to either add columns to existing table or add a new table
action = random.choice(['add_columns', 'add_table'])
if action == 'add_columns' and db:
table = random.choice(list(db.keys()))
return dict(action=action, table=table, num_columns=random.randint(1, 3))
else:
return dict(action='add_table', table='', num_columns=0)
def apply(self, db: Dict[str, pd.DataFrame], action: str, table: str, num_columns: int
) -> Tuple[Dict[str, pd.DataFrame], Evidence]:
new_db = db.copy()
if action == 'add_columns' and table and table in db:
# Add columns to existing table
new_df = db[table].copy()
# Use injection helper to generate context-aware column names
task = f"Suggest {num_columns} plausible column names that might be mistakenly added to the {table} table in a car parts database. These should be related but not actually belong in this table."
result = injection_helper(task)
if 'values' in result and result['values']:
new_cols = result['values'][:num_columns]
else:
# Fallback
new_cols = [f"extra_col_{i}" for i in range(num_columns)]
# Ensure columns are properly formatted
new_cols = [c.strip().lower().replace(" ", "_") for c in new_cols]
# Add the columns with context-aware data
for col in new_cols:
# Generate appropriate data based on column name
if 'date' in col or 'time' in col:
new_df[col] = pd.Timestamp.now()
elif 'id' in col or 'number' in col:
new_df[col] = range(1000, 1000 + len(new_df))
elif 'flag' in col or 'is_' in col:
new_df[col] = random.choices([True, False], k=len(new_df))
elif 'price' in col or 'cost' in col:
new_df[col] = [round(random.uniform(10.0, 500.0), 2) for _ in range(len(new_df))]
else:
# Use injection helper to generate appropriate values
value_task = f"Generate {min(3, len(new_df))} sample values for a column named '{col}' in the {table} table"
value_result = injection_helper(value_task)
if 'values' in value_result and value_result['values']:
sample_values = value_result['values']
# Repeat or extend to match dataframe length
new_df[col] = [sample_values[i % len(sample_values)] for i in range(len(new_df))]
else:
new_df[col] = [f"{col}_value_{i}" for i in range(len(new_df))]
new_db[table] = new_df
ev = Evidence.schema_diff(
table=table,
missing_cols=[],
unexpected_cols=new_cols
)
else:
# Add a new table
task = "Suggest a plausible table name for a car parts database that would be related but shouldn't exist. Not manufacturers, car_models, parts, part_categories, or car_model_parts."
result = injection_helper(task)
if 'values' in result and result['values']:
table_name = result['values'][0].strip().lower().replace(" ", "_")
else:
table_name = "extra_table"
# Create a simple table with context-aware columns
column_task = f"Suggest 4 column names for a new '{table_name}' table in a car parts database"
column_result = injection_helper(column_task)
if 'values' in column_result and column_result['values']:
columns = column_result['values'][:4]
else:
columns = [f"{table_name}_id", "name", "created_date", "active"]
# Build table data
new_table_data = {}
for i, col in enumerate(columns):
col_clean = col.strip().lower().replace(" ", "_")
if i == 0 or 'id' in col_clean: # First column or ID column
new_table_data[col_clean] = range(1, 4)
elif 'date' in col_clean:
new_table_data[col_clean] = pd.Timestamp.now()
elif 'active' in col_clean or 'flag' in col_clean:
new_table_data[col_clean] = [True, True, False]
else:
new_table_data[col_clean] = [f"{col_clean}_{j}" for j in range(1, 4)]
new_db[table_name] = pd.DataFrame(new_table_data)
ev = Evidence.schema_diff(
table=table_name,
missing_cols=[],
unexpected_cols=list(new_table_data.keys())
)
ev['kind'] = 'unexpected_table'
ev['table'] = table_name
return new_db, ev
# Update the registry - this will automatically include all subclasses
INJECTION_LIBRARY = {cls.symptom_key: cls()
for cls in Injection.__subclasses__()}
print(f"✅ Updated injection library with {len(INJECTION_LIBRARY)} transformations:")
for key, inj in INJECTION_LIBRARY.items():
print(f" - {key} (root cause: {inj.root_cause})")
✅ Updated injection library with 10 transformations:
- rename_columns (root cause: schema_drift)
- drop_tables (root cause: incomplete_migration)
- flatten_data (root cause: denormalization)
- foreign_key_null_when_required (root cause: referential_integrity_violation)
- null_in_not_null_columns (root cause: missing_value)
- add_noise (root cause: noise_columns)
- primary_key_duplicates (root cause: duplicate_rows)
- data_type_mismatches (root cause: type_mismatch)
- unique_constraint_violations (root cause: unique_violation)
- add_related_data (root cause: unexpected_data)
# Test all injection classes
print("🧪 Testing all injection classes...")
# Create a sample database
test_db = {
'manufacturers': pd.DataFrame({
'manufacturer_id': [1, 2, 3],
'name': ['Toyota', 'Honda', 'Ford']
}),
'parts': pd.DataFrame({
'part_id': [1, 2, 3, 4],
'category_id': [1, 1, 2, 3],
'part_number': ['ENG-001', 'ENG-002', 'BRK-001', 'SUS-001'],
'description': ['Engine Part 1', 'Engine Part 2', 'Brake Part 1', 'Suspension Part 1']
}),
'car_models': pd.DataFrame({
'model_id': [1, 2, 3],
'manufacturer_id': [1, 2, 3],
'model_name': ['Camry', 'Accord', 'F-150'],
'model_year': [2023, 2023, 2023]
})
}
# Test each injection
test_results = []
for symptom_key, injection in INJECTION_LIBRARY.items():
try:
# Generate parameters
params = injection.sample_params(test_db)
# Apply injection
corrupted_db, evidence = injection.apply(test_db, **params)
# Check result
test_results.append({
'injection': symptom_key,
'root_cause': injection.root_cause,
'params': params,
'evidence': dict(evidence),
'status': '✅ Success'
})
print(f"✅ {symptom_key}: Success")
print(f" Root cause: {injection.root_cause}")
print(f" Evidence: {evidence.get('kind', 'N/A')}")
except Exception as e:
test_results.append({
'injection': symptom_key,
'root_cause': injection.root_cause,
'error': str(e),
'status': '❌ Failed'
})
print(f"❌ {symptom_key}: Failed - {e}")
print(f"\n📊 Test Summary: {sum(1 for r in test_results if r['status'] == '✅ Success')}/{len(test_results)} passed")
🧪 Testing all injection classes...
✅ rename_columns: Success
Root cause: schema_drift
Evidence: schema_diff
✅ drop_tables: Success
Root cause: incomplete_migration
Evidence: missing_table
✅ flatten_data: Success
Root cause: denormalization
Evidence: schema_diff
✅ foreign_key_null_when_required: Success
Root cause: referential_integrity_violation
Evidence: ri_diff
✅ null_in_not_null_columns: Success
Root cause: missing_value
Evidence: missing_required
✅ add_noise: Success
Root cause: noise_columns
Evidence: schema_diff
✅ primary_key_duplicates: Success
Root cause: duplicate_rows
Evidence: primary_key_duplicate
✅ data_type_mismatches: Success
Root cause: type_mismatch
Evidence: type_mismatch
✅ unique_constraint_violations: Success
Root cause: unique_violation
Evidence: unique_violation
✅ add_related_data: Success
Root cause: unexpected_data
Evidence: unexpected_table
📊 Test Summary: 10/10 passed
# Demonstrate InjectionHelper with schema context
print("🔍 Demonstrating InjectionHelper with schema context...")
# Test a few different tasks
test_tasks = [
"Suggest 3 plausible but irrelevant column names for the parts table",
"Generate a type-mismatched value for a year field that would look realistic but be invalid",
"Suggest a related table name that sounds plausible but shouldn't exist in this car parts database",
"Generate realistic but duplicate part numbers for testing unique constraints"
]
for task in test_tasks:
print(f"\n🎯 Task: {task}")
try:
result = injection_helper(task)
print(f" Generated values: {result.get('values', [])}")
print(f" Explanation: {result.get('explanation', 'No explanation provided')}")
except Exception as e:
print(f" ❌ Error: {e}")
print("\n✅ InjectionHelper demonstration complete!")
🔍 Demonstrating InjectionHelper with schema context...
🎯 Task: Suggest 3 plausible but irrelevant column names for the parts table
Generated values: ['recommended_torque_lbft', 'operating_temperature_c', 'recycling_fee_usd']
Explanation: The suggested columns sound technically credible for automotive components (torque specs, thermal range, environmental fee) and follow snake_case naming like existing part_number and category_id, yet they are not part of the current business logic for identifying or categorising a part in this schema, making them irrelevant noise.
🎯 Task: Generate a type-mismatched value for a year field that would look realistic but be invalid
Generated values: ['2022-23']
Explanation: ‘2022-23’ mimics a common model-year range notation (e.g., for cars produced over two calendar years). Visually it looks like a reasonable year, but the dash makes it a TEXT value rather than an INTEGER, so inserting it into the model_year column (declared INTEGER NOT NULL) will violate the type expectation while remaining plausible to a casual reviewer.
🎯 Task: Suggest a related table name that sounds plausible but shouldn't exist in this car parts database
Generated values: ['manufacturer_contacts']
Explanation: The name follows the existing plural-noun convention (e.g., manufacturers, car_models) and suggests a legitimate extension of the current domain—keeping track of contact information for each manufacturer—yet no such table is defined in the schema. It therefore looks believable while remaining invalid.
🎯 Task: Generate realistic but duplicate part numbers for testing unique constraints
Generated values: ['ENG-2.5L-A25A', 'ENG-2.5L-A25A', 'BRK-FR-TOY-22', 'BRK-FR-TOY-22', 'ENG-1.5T-L15B']
Explanation: Each value is a real part_number already present in the sample data. By submitting the same identifiers multiple times we trigger two levels of uniqueness violations: (1) duplicates within the batch itself, and (2) duplicates against the pre-existing UNIQUE index in the parts table. This makes the data appear legitimate (all part numbers follow the same naming convention) while still exercising the unique-constraint checks.
✅ InjectionHelper demonstration complete!
import uuid, json, pandas as pd, random
import numpy as np
from typing import Dict, Tuple
def convert_numpy_types(obj):
"""Convert NumPy types to native Python types for JSON serialization."""
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, pd.Timestamp):
return obj.isoformat()
elif isinstance(obj, dict):
return {key: convert_numpy_types(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [convert_numpy_types(item) for item in obj]
else:
return obj
def build_root_cause_map():
"""Build a mapping from symptom keys to root causes from the injection library."""
return {
symptom_key: getattr(injection, 'root_cause', symptom_key)
for symptom_key, injection in INJECTION_LIBRARY.items()
}
def check_constraints_after_injection(db_dict: Dict[str, pd.DataFrame]) -> List[str]:
"""
Lightweight constraint checker that uses PRAGMA-based schema info to validate
corrupted DataFrames against basic database constraints.
This helps catch impossible states like NaN in NOT NULL fields, invalid FKs, etc.
Args:
db_dict: Dictionary of table names to DataFrames
Returns:
List of constraint violation messages
"""
violations = []
try:
# Get schema info for all tables
schema_info = get_schema(format="json")
for table_name, df in db_dict.items():
if table_name not in schema_info:
continue
table_info = schema_info[table_name]
# Check NOT NULL constraints
for col_info in table_info.get('columns', []):
col_name = col_info['name']
if col_info['not_null'] == 1 and col_name in df.columns:
null_count = df[col_name].isna().sum()
if null_count > 0:
violations.append(f"{table_name}.{col_name}: {null_count} NULL values in NOT NULL column")
# Check data types (basic validation)
for col_info in table_info.get('columns', []):
col_name = col_info['name']
expected_type = col_info['type']
if col_name not in df.columns:
continue
# Skip null values for type checking
non_null_values = df[col_name].dropna()
if len(non_null_values) == 0:
continue
# Basic type validation
if expected_type == 'INTEGER':
# Check if any values can't be converted to int
invalid_values = []
for val in non_null_values:
try:
if isinstance(val, str):
int(val)
elif not isinstance(val, (int, np.integer)):
invalid_values.append(val)
except (ValueError, TypeError):
invalid_values.append(val)
if invalid_values:
violations.append(f"{table_name}.{col_name}: {len(invalid_values)} non-integer values in INTEGER column")
elif expected_type == 'TEXT':
non_string_count = sum(1 for val in non_null_values if not isinstance(val, str))
if non_string_count > 0:
violations.append(f"{table_name}.{col_name}: {non_string_count} non-text values in TEXT column")
# Check foreign key constraints (if we have the referenced table)
for fk_info in table_info.get('foreign_keys', []):
fk_col = fk_info['from']
ref_table = fk_info['to_table']
ref_col = fk_info['to_column']
if fk_col not in df.columns or ref_table not in db_dict:
continue
# Get non-null FK values
fk_values = df[fk_col].dropna()
if len(fk_values) == 0:
continue
# Check if referenced values exist
ref_df = db_dict[ref_table]
if ref_col in ref_df.columns:
valid_refs = set(ref_df[ref_col].dropna())
invalid_refs = [val for val in fk_values if val not in valid_refs]
if invalid_refs:
violations.append(f"{table_name}.{fk_col}: {len(invalid_refs)} invalid foreign key references")
# Check unique constraints (where we have index info)
for idx_info in table_info.get('indexes', []):
if idx_info.get('unique') == 1:
idx_cols = idx_info['columns']
# Check if all columns exist in the DataFrame
if all(col in df.columns for col in idx_cols):
# Check for duplicates in unique index columns
subset_df = df[idx_cols].dropna()
if len(subset_df) > 0:
duplicates = subset_df.duplicated().sum()
if duplicates > 0:
violations.append(f"{table_name}: {duplicates} duplicate rows in unique index ({', '.join(idx_cols)})")
except Exception as e:
violations.append(f"Constraint checker error: {str(e)}")
return violations
class ManifestBuilder:
def __init__(self, db_dict, *, allow_duplicates=False):
self.db = db_dict.copy()
self.rows = []
self.seen_root_causes = set()
self.allow_duplicates = allow_duplicates
def inject_one(self, symptom_key: str):
inj = INJECTION_LIBRARY[symptom_key]
if not self.allow_duplicates and inj.root_cause in self.seen_root_causes:
return # skip – root cause already represented
params = inj.sample_params(self.db)
self.db, evidence = inj.apply(self.db, **params)
# Run constraint checker after injection
constraint_violations = check_constraints_after_injection(self.db)
self.rows.append(
dict(
inj_id = str(uuid.uuid4()),
symptom_key = symptom_key,
root_cause = inj.root_cause,
evidence = json.dumps(convert_numpy_types(evidence)),
constraint_violations = constraint_violations, # Track constraint issues
)
)
self.seen_root_causes.add(inj.root_cause)
def random_inject(self, k:int=3) -> Tuple[Dict[str, pd.DataFrame], pd.DataFrame]:
for _ in range(k):
self.inject_one(random.choice(list(INJECTION_LIBRARY)))
return self.db, pd.DataFrame(self.rows)
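# Typical usage (illustrative only – the real pipeline below feeds in LLM-generated clean data):
#   builder = ManifestBuilder(db_dict)                       # db_dict: {table_name: DataFrame}
#   corrupted_db, manifest_df = builder.random_inject(k=3)   # applies up to 3 injections (duplicate root causes are skipped)
#   # manifest_df columns: inj_id, symptom_key, root_cause, evidence, constraint_violations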
Generate Data
# Module 1: Generate clean, valid synthetic data based on schema and sample data
class GenerateCleanSynthData(dspy.Signature):
"""
Generate *new, clean, valid* synthetic data that conforms to the database schema.
Use the sample data to understand patterns, naming conventions, and data formats.
IMPORTANT: Create realistic automotive industry data with natural variation:
- Use real car manufacturers beyond the samples (Ford, Chevrolet, Nissan, BMW, etc.)
- Generate authentic part numbers following automotive conventions
- Create realistic model names and years (recent but not current sample years)
- Use proper automotive terminology in descriptions
- Maintain referential integrity across all foreign key relationships
- Include variety in data formatting while staying within schema constraints
The output should be well-formed JSON data that could be successfully inserted
into the database without any violations. This clean data will later be corrupted
with realistic data quality issues for testing purposes.
Generate 3-5 records per table to provide good variety for testing.
"""
current_schema = dspy.InputField(
desc="Complete database schema with table structures, relationships, and constraints"
)
sample_data = dspy.InputField(
desc="Sample of existing data showing patterns and formats"
)
clean_synthetic_data: dict = dspy.OutputField(
desc="Valid JSON data organized by table name, ready for database insertion"
)
class GenerateCleanData(dspy.Module):
"""First module: Generate clean synthetic data that conforms to schema"""
def __init__(self):
# Use new PRAGMA-based get_schema function
self.schema = get_schema(format="text")
self.sample = collate_sample_data()
self.sig = dspy.Predict(GenerateCleanSynthData)
def forward(self):
response = self.sig(current_schema=self.schema, sample_data=self.sample)
return response
def convert_to_dataframes(data_dict: dict) -> Dict[str, pd.DataFrame]:
"""{table: [row, …]} → {table: DataFrame}"""
return {tbl: pd.DataFrame(rows) for tbl, rows in data_dict.items()}
def convert_to_json(db_dict: Dict[str, pd.DataFrame]) -> dict:
"""{table: DataFrame} → {table: [row, …]}"""
return {tbl: df.to_dict(orient="records") for tbl, df in db_dict.items()}
# -----------------------------------------------------------------------------
# SYNTHETIC DATA PIPELINE
# -----------------------------------------------------------------------------
print("🔧 Testing alternative ManifestBuilder pipeline for comparison...")
# Step 1: Generate clean data using the existing generator
clean_generator = GenerateCleanData()
# Use faster model for clean data generation
# faster_lm = dspy.LM('openai/gpt-4.1', api_key=os.getenv('OPENAI_API_KEY'), temperature=1.0, max_tokens=5000)
# with dspy.context(lm=faster_lm):
clean_response = clean_generator()
clean_data = clean_response.clean_synthetic_data
print(f"✅ Generated clean data with {len(clean_data)} tables")
# Convert clean data to DataFrames
db_dict = convert_to_dataframes(clean_data)
# Step 2: Apply injections using ManifestBuilder
builder = ManifestBuilder(db_dict)
corrupted_db, manifest_df = builder.random_inject(k=5)
# Convert back to JSON format
corrupted_data = convert_to_json(corrupted_db)
print("\n=== MANIFEST BUILDER RESULTS ===")
print(f"Applied {len(manifest_df)} injections")
print(f"Tables in corrupted data: {list(corrupted_data.keys())}")
print("\n--- MANIFEST ---")
for _, row in manifest_df.iterrows():
print(f"\nInjection: {row['symptom_key']}")
print(f"Root cause: {row['root_cause']}")
print(f"Evidence: {row['evidence']}")
# Store results for comparison with simplified approach
manifestbuilder_result = {
'clean_data': clean_data,
'corrupted_data': corrupted_data,
'manifest_df': manifest_df,
'expected_detections': len(manifest_df)
}
# Build the root cause map
ROOT_CAUSE_MAP = build_root_cause_map()
print(f"\n✅ Built ROOT_CAUSE_MAP with {len(ROOT_CAUSE_MAP)} mappings")
print(f"📝 Note: This alternative approach uses deterministic injections vs LLM-based corruption")
🔧 Testing alternative ManifestBuilder pipeline for comparison...
✅ Generated clean data with 5 tables
=== MANIFEST BUILDER RESULTS ===
Applied 5 injections
Tables in corrupted data: ['manufacturers', 'part_categories', 'car_models', 'car_model_parts', 'part_warranty_claims']
--- MANIFEST ---
Injection: foreign_key_null_when_required
Root cause: referential_integrity_violation
Evidence: {"kind": "ri_diff", "child_tbl": "parts", "parent_tbl": "part_categories", "fk": "category_id", "orphan_row_ids": [9]}
Injection: flatten_data
Root cause: denormalization
Evidence: {"kind": "schema_diff", "table": "part_categories", "missing_cols": [], "unexpected_cols": ["parts_part_id", "parts_part_number", "parts_description"]}
Injection: add_related_data
Root cause: unexpected_data
Evidence: {"kind": "unexpected_table", "table": "part_warranty_claims", "missing_cols": [], "unexpected_cols": ["claim_id", "part_id", "claim_date", "failure_description"]}
Injection: null_in_not_null_columns
Root cause: missing_value
Evidence: {"kind": "missing_required", "table": "manufacturers", "row_id": 2, "missing_fields": ["name"]}
Injection: unique_constraint_violations
Root cause: unique_violation
Evidence: {"kind": "unique_violation", "table": "car_models", "missing_cols": [], "unexpected_cols": [], "column": "model_name", "duplicate_value": "F-150", "row_indices": [3, 1]}
✅ Built ROOT_CAUSE_MAP with 10 mappings
📝 Note: This alternative approach uses deterministic injections vs LLM-based corruption
# Test the complete PRAGMA-based upgrade
print("🎉 Testing complete PRAGMA-based schema upgrade...\n")
# Test the new constraint checker
print("1. Testing constraint checker:")
test_db = {
"manufacturers": pd.DataFrame(
{
"manufacturer_id": [1, 2, None], # NULL in what should be PK
"name": ["Toyota", "Honda", "Ford"],
}
),
"parts": pd.DataFrame(
{
"part_id": [1, 2, 3],
"category_id": [1, 999, 2], # 999 doesn't exist in part_categories
"part_number": ["P001", "P002", "invalid_type"],
"description": ["Desc1", "Desc2", "Desc3"],
}
),
}
violations = check_constraints_after_injection(test_db)
print(f" Found {len(violations)} constraint violations:")
for violation in violations:
print(f" - {violation}")
print("\n" + "=" * 70 + "\n")
# Test the updated ManifestBuilder with constraint checking
print("2. Testing updated ManifestBuilder with PRAGMA-based injections:")
# Create clean test data
clean_test_db = {
"manufacturers": pd.DataFrame(
{"manufacturer_id": [1, 2], "name": ["Toyota", "Honda"]}
),
"parts": pd.DataFrame(
{
"part_id": [1, 2, 3],
"category_id": [1, 1, 2],
"part_number": ["P001", "P002", "P003"],
"description": ["Engine Part", "Brake Part", "Body Part"],
}
),
}
# Test ManifestBuilder with constraint checking
builder = ManifestBuilder(clean_test_db)
corrupted_db, manifest_df = builder.random_inject(k=2)
print(f" Applied {len(manifest_df)} injections")
print(f" Tables after corruption: {list(corrupted_db.keys())}")
# Show constraint violations detected
if "constraint_violations" in manifest_df.columns:
for _, row in manifest_df.iterrows():
violations = row["constraint_violations"]
if violations:
print(
f" {row['symptom_key']}: {len(violations)} constraint violations detected"
)
else:
print(f" {row['symptom_key']}: No constraint violations")
print("\n" + "=" * 70 + "\n")
# Test the data generation with new schema
print("3. Testing data generation with PRAGMA schema:")
clean_generator = GenerateCleanData()
print(f" Schema source: PRAGMA-based get_schema()")
print(f" Sample data source: collate_sample_data()")
print("\n✅ All PRAGMA-based upgrades working correctly!")
print("\n📋 Summary of improvements:")
print(" ✅ get_schema() replaces collate_schema() with PRAGMA commands")
print(" ✅ Foreign key detection uses actual FK relationships")
print(" ✅ Primary key detection uses PRAGMA table_info")
print(" ✅ NOT NULL constraints from schema, not heuristics")
print(" ✅ Unique constraints from PRAGMA index_list")
print(" ✅ Context filtering available (tables, max_cols parameters)")
print(" ✅ Constraint checker validates corrupted data")
🎉 Testing complete PRAGMA-based schema upgrade...
1. Testing constraint checker:
Found 1 constraint violations:
- manufacturers.manufacturer_id: 2 non-integer values in INTEGER column
======================================================================
2. Testing updated ManifestBuilder with PRAGMA-based injections:
Applied 2 injections
Tables after corruption: ['manufacturers', 'parts']
null_in_not_null_columns: 2 constraint violations detected
add_noise: 2 constraint violations detected
======================================================================
3. Testing data generation with PRAGMA schema:
Schema source: PRAGMA-based get_schema()
Sample data source: collate_sample_data()
✅ All PRAGMA-based upgrades working correctly!
📋 Summary of improvements:
✅ get_schema() replaces collate_schema() with PRAGMA commands
✅ Foreign key detection uses actual FK relationships
✅ Primary key detection uses PRAGMA table_info
✅ NOT NULL constraints from schema, not heuristics
✅ Unique constraints from PRAGMA index_list
✅ Context filtering available (tables, max_cols parameters)
✅ Constraint checker validates corrupted data
Generate Synthetic Test Data with Caching
Now let's generate multiple synthetic datasets with known corruptions for testing our data quality agent. We'll use caching to avoid regenerating data unnecessarily.
# Cache handling functions
def save_datasets_to_cache(datasets: list, cache_file: str):
"""Save generated datasets to cache file"""
cache_path = Path(cache_file)
cache_path.parent.mkdir(exist_ok=True)
with open(cache_path, 'w') as f:
json.dump(datasets, f, indent=2, default=convert_numpy_types)
print(f"💾 Saved {len(datasets)} datasets to {cache_file}")
def load_datasets_from_cache(cache_file: str) -> list:
"""Load datasets from cache file"""
cache_path = Path(cache_file)
if not cache_path.exists():
print(f"❌ Cache file {cache_file} not found")
return None
with open(cache_path, 'r') as f:
datasets = json.load(f)
print(f"📂 Loaded {len(datasets)} datasets from {cache_file}")
return datasets
import asyncio
import json
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
def generate_single_dataset_sync():
"""Synchronous function to generate one dataset"""
# Use faster model for clean data generation
# faster_lm = dspy.LM('openai/gpt-4.1', api_key=os.getenv('OPENAI_API_KEY'), temperature=1.0, max_tokens=5000, cache=False)
# with dspy.context(lm=faster_lm):
# Step 1: Generate clean data using existing generator
clean_generator = GenerateCleanData()
clean_response = clean_generator()
clean_data = clean_response.clean_synthetic_data
# Step 2: Convert to DataFrames
db_dict = convert_to_dataframes(clean_data)
# Step 3: Apply injections using ManifestBuilder
builder = ManifestBuilder(db_dict)
num_corruptions = random.randint(3, 7)
corrupted_db, manifest_df = builder.random_inject(k=num_corruptions)
# Convert back to JSON format
corrupted_data = convert_to_json(corrupted_db)
# Create result dictionary
result = {
'clean_data': clean_data,
'corrupted_data': corrupted_data,
'manifest': manifest_df.to_dict('records'),
'metadata': {
'num_corruptions': num_corruptions,
'timestamp': datetime.now().isoformat(),
'tables_affected': list(corrupted_data.keys())
}
}
return result
def generate_batch_datasets_concurrent(count: int = 2):
"""Generate multiple datasets concurrently using ThreadPoolExecutor"""
print(f"🚀 Generating {count} datasets concurrently...")
with ThreadPoolExecutor(max_workers=count) as executor:
# Submit all tasks to the thread pool
futures = [executor.submit(generate_single_dataset_sync) for _ in range(count)]
# Collect results as they complete
results = []
for future in futures:
results.append(future.result())
print(f"✅ Generated {count} datasets")
return results
# Configuration for synthetic data generation
# GENERATE_NEW_DATA = True # Set to True to regenerate data
CACHE_FILE = "tmp/synthetic_test_data_cache.json"
NUM_DATASETS = 10
print(f"🔧 Configuration:")
print(f" - Generate new data: {GENERATE_NEW_DATA}")
print(f" - Cache file: {CACHE_FILE}")
print(f" - Number of datasets: {NUM_DATASETS}")
# Ensure tmp directory exists
Path("tmp").mkdir(exist_ok=True)
🔧 Configuration:
- Generate new data: False
- Cache file: tmp/synthetic_test_data_cache.json
- Number of datasets: 10
# Main execution cell - Generate or load synthetic datasets
if GENERATE_NEW_DATA:
print("🔨 Generating new synthetic datasets...")
# Generate datasets in batches for efficiency
batch_size = 2
all_datasets = []
for i in range(0, NUM_DATASETS, batch_size):
remaining = min(batch_size, NUM_DATASETS - i)
print(f"\nBatch {i//batch_size + 1}: Generating {remaining} datasets...")
# Use ThreadPoolExecutor for concurrent generation
batch_results = generate_batch_datasets_concurrent(remaining)
all_datasets.extend(batch_results)
print(f"Progress: {len(all_datasets)}/{NUM_DATASETS} datasets completed")
# Save to cache
save_datasets_to_cache(all_datasets, CACHE_FILE)
synthetic_datasets = all_datasets
else:
print("📖 Loading synthetic datasets from cache...")
synthetic_datasets = load_datasets_from_cache(CACHE_FILE)
if synthetic_datasets is None:
print("⚠️ No cache found. Please set GENERATE_NEW_DATA=True to generate new data.")
synthetic_datasets = []
# Display summary
print(f"\n📊 Summary:")
print(f" - Total datasets available: {len(synthetic_datasets)}")
if synthetic_datasets:
print(f" - First dataset has {len(synthetic_datasets[0]['manifest'])} corruptions")
print(f" - Tables in first dataset: {synthetic_datasets[0]['metadata']['tables_affected']}")
📖 Loading synthetic datasets from cache...
📂 Loaded 10 datasets from tmp/synthetic_test_data_cache.json
📊 Summary:
- Total datasets available: 10
- First dataset has 4 corruptions
- Tables in first dataset: ['manufacturers', 'part_categories', 'parts', 'car_models', 'car_model_parts']
# Examine one dataset in detail
if synthetic_datasets:
print("🔍 Examining first synthetic dataset:")
first_dataset = synthetic_datasets[0]
print("\n--- Clean Data Tables ---")
for table, rows in first_dataset["clean_data"].items():
print(f"{table}: {len(rows)} rows")
print("\n--- Corrupted Data Tables ---")
for table, rows in first_dataset["corrupted_data"].items():
print(f"{table}: {len(rows)} rows")
print("\n--- Applied Corruptions ---")
for corruption in first_dataset["manifest"]:
print(f"\n{corruption['symptom_key']} (root cause: {corruption['root_cause']})")
evidence = json.loads(corruption["evidence"])
print(f"Evidence: {evidence}")
🔍 Examining first synthetic dataset:
--- Clean Data Tables ---
manufacturers: 3 rows
part_categories: 2 rows
parts: 5 rows
car_models: 4 rows
car_model_parts: 10 rows
--- Corrupted Data Tables ---
manufacturers: 3 rows
part_categories: 2 rows
parts: 5 rows
car_models: 4 rows
car_model_parts: 10 rows
--- Applied Corruptions ---
add_related_data (root cause: unexpected_data)
Evidence: {'kind': 'schema_diff', 'table': 'manufacturers', 'missing_cols': [], 'unexpected_cols': ['category_id']}
rename_columns (root cause: schema_drift)
Evidence: {'kind': 'schema_diff', 'table': 'manufacturers', 'missing_cols': ['name'], 'unexpected_cols': ['name_legacy']}
data_type_mismatches (root cause: type_mismatch)
Evidence: {'kind': 'type_mismatch', 'table': 'manufacturers', 'column': 'manufacturer_id', 'row_id': 5, 'expected_type': 'INTEGER', 'actual_type': 'TEXT'}
foreign_key_null_when_required (root cause: referential_integrity_violation)
Evidence: {'kind': 'ri_diff', 'child_tbl': 'car_model_parts', 'parent_tbl': 'parts', 'fk': 'part_id', 'orphan_row_ids': [4]}
Data Agent
# -----------------------------------------------------------------------------
# EVIDENCE-BASED DATA QUALITY AGENT WITH SPECIFIC DETAILS
# -----------------------------------------------------------------------------
import re, inspect
from typing import Literal
class AgentEvidence(BaseModel):
"""Evidence with structured table/column information for DSPy agent output"""
kind: str = Field(
description=f"The kind of evidence - should be one of: {[k.value for k in EvidenceKind]}"
)
table: str
columns: List[str] = Field(default_factory=list)
specific_details: str
row_ids: List[int] = Field(default_factory=list)
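# Illustrative example of the structure the agent should emit (values are made up):
#   AgentEvidence(
#       kind="type_mismatch",
#       table="parts",
#       columns=["part_id"],
#       specific_details="parts.part_id expected INTEGER but found 'A12'",
#       row_ids=[3],
#   )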
# DSPy Signature for Evidence-based Data Agent
class AnalyzeDataQuality(dspy.Signature):
"""
Analyze JSON data against any database schema to detect data quality issues.
CRITICAL REQUIREMENTS:
1. ALWAYS specify the exact table name and column names affected
2. NEVER use vague descriptions like "some fields missing"
3. Each issue MUST include:
- table: The specific table name (e.g., "users", "orders", "books", "employees")
- columns: List of specific column names affected (e.g., ["email", "order_id", "title", "salary"])
- specific_details: Precise description with table.column notation
Example GOOD evidence:
- table="users", columns=["email"], specific_details="users.email column is missing (NOT NULL constraint)"
- table="orders", columns=["order_id"], specific_details="orders.order_id has type mismatch: expected INTEGER but found 'A123'"
- table="books", columns=["title"], specific_details="books.title column is missing (required field)"
- table="employees", columns=["salary"], specific_details="employees.salary has type mismatch: expected FLOAT but found 'fifty thousand'"
- table="departments", columns=["manager_id"], specific_details="departments.manager_id violates foreign key constraint: references non-existent employee"
Example BAD evidence (too vague):
- "Some required fields are missing"
- "Data type issues found"
- "Foreign key problems detected"
Use the database tools systematically to:
1. List all tables in the database schema
2. For each table in JSON data, get its schema and check:
- Missing required columns
- Extra unexpected columns
- Type mismatches for each column
- Foreign key constraint violations
3. Check for missing tables from the schema
4. Check for unexpected tables in the JSON
Be exhaustive but precise. Every issue must specify table.column.
"""
json_data: dict = dspy.InputField(
desc="JSON data to analyze, organized by table name"
)
detected_evidence: List[AgentEvidence] = dspy.OutputField(
desc="List of AgentEvidence objects with specific table, columns, and details"
)
db_tools = [
get_schema,
get_table_schema,
get_foreign_key_relationships,
sample_table_data,
get_unique_values,
get_table_row_count,
get_table_columns,
execute_query,
check_referential_integrity,
validate_json_types_against_schema,
find_semantic_duplicates,
# validate_business_rules_with_llm,
]
# Create agent with the AgentEvidence class
data_quality_agent = dspy.ReAct(AnalyzeDataQuality, tools=db_tools)
print("✅ Created Data Quality Agent with AgentEvidence class")
# Test the agent
if synthetic_datasets:
print("\n🤖 Testing Data Agent...")
agent_response = data_quality_agent(json_data=first_dataset['corrupted_data'])
print(f"\n📊 Agent Results:")
print(f" Detected {len(agent_response.detected_evidence)} issues")
# Check specificity of evidence
specific_count = 0
for ev in agent_response.detected_evidence:
if ev.table and (ev.columns or "table" in ev.specific_details.lower()):
specific_count += 1
print(f" Specific evidence: {specific_count}/{len(agent_response.detected_evidence)}")
# Show first few evidence items
print("\n Sample evidence:")
for i, ev in enumerate(agent_response.detected_evidence[:3]):
print(f" {i+1}. Table: {ev.table}, Columns: {ev.columns}")
print(f" Details: {ev.specific_details}")
✅ Created Data Quality Agent with AgentEvidence class
🤖 Testing Data Agent...
📊 Agent Results:
Detected 7 issues
Specific evidence: 7/7
Sample evidence:
1. Table: manufacturers, Columns: ['name']
Details: manufacturers.name column is missing in all JSON records but is defined as TEXT NOT NULL in the schema
2. Table: manufacturers, Columns: ['name_legacy', 'category_id']
Details: manufacturers JSON records include unexpected columns manufacturers.name_legacy and manufacturers.category_id that are not present in the table schema
3. Table: manufacturers, Columns: ['manufacturer_id']
Details: manufacturers.manufacturer_id expects INTEGER but found string value "3A" in one JSON record
Evaluations
# -----------------------------------------------------------------------------
# INSTANCE-LEVEL EVALUATION WITH F1 SCORING (FIXED FOR BOTH TYPES)
# -----------------------------------------------------------------------------
def extract_instance_key(evidence_data: dict) -> str:
"""
Extract a unique key for an evidence instance that includes table/column info.
This allows matching specific issues rather than just kinds.
"""
kind = evidence_data.get('kind', 'unknown')
table = evidence_data.get('table', evidence_data.get('child_tbl', ''))
# Extract column information based on evidence type
if kind == 'schema_diff':
missing_cols = evidence_data.get('missing_cols', [])
unexpected_cols = evidence_data.get('unexpected_cols', [])
cols = missing_cols + unexpected_cols
col_str = ','.join(sorted(cols)) if cols else 'no_cols'
return f"{kind}:{table}:{col_str}"
elif kind == 'ri_diff':
fk = evidence_data.get('fk', '')
parent = evidence_data.get('parent_tbl', '')
return f"{kind}:{table}:{fk}:{parent}"
elif kind == 'missing_required':
fields = evidence_data.get('missing_fields', [])
field_str = ','.join(sorted(fields)) if fields else 'no_fields'
return f"{kind}:{table}:{field_str}"
elif kind == 'type_mismatch':
column = evidence_data.get('column', '')
return f"{kind}:{table}:{column}"
elif kind == 'unique_violation':
column = evidence_data.get('column', '')
return f"{kind}:{table}:{column}"
elif kind == 'primary_key_duplicate':
pk = evidence_data.get('duplicate_pk', '')
return f"{kind}:{table}:pk_{pk}"
else:
# For other kinds, use table if available
return f"{kind}:{table}" if table else kind
def analyze_by_evidence_type_detailed(examples: List, agent) -> Dict[str, Dict[str, int]]:
"""
Analyze agent performance by evidence type across multiple examples.
This function groups evidence by type and calculates detection rates for each type.
It's used in the comprehensive evaluation to understand which types of issues
the agent detects well vs poorly.
Args:
examples: List of DSPy examples with expected evidence
agent: The data quality agent to evaluate
Returns:
Dictionary mapping evidence types to performance metrics:
{
'schema_diff': {'found': 5, 'total': 8},
'type_mismatch': {'found': 2, 'total': 3},
...
}
"""
# Collect all expected evidence by type
expected_by_type = {}
for example in examples:
for evidence in example.detected_evidence:
evidence_kind = evidence.get('kind', 'unknown')
if evidence_kind not in expected_by_type:
expected_by_type[evidence_kind] = {'found': 0, 'total': 0}
expected_by_type[evidence_kind]['total'] += 1
# Run agent on each example and collect detected evidence by type
detected_by_type = {}
for example in examples:
try:
# Run the agent
prediction = agent(json_data=example.json_data)
# Build manifest from example's expected evidence for comparison
manifest = []
for i, evidence in enumerate(example.detected_evidence):
evidence_dict = {
'kind': evidence['kind'],
'table': evidence.get('table', f"unknown_table_{i}"),
}
# Add kind-specific details for matching
if evidence['kind'] == 'schema_diff':
evidence_dict.update({
'missing_cols': evidence.get('columns', []),
'unexpected_cols': evidence.get('columns', [])
})
elif evidence['kind'] == 'type_mismatch':
evidence_dict.update({
'column': evidence.get('columns', ['unknown'])[0] if evidence.get('columns', []) else 'unknown',
'expected_type': 'INTEGER',
'actual_type': 'TEXT'
})
elif evidence['kind'] == 'ri_diff':
evidence_dict.update({
'child_tbl': evidence_dict['table'],
'parent_tbl': f"{evidence_dict['table']}_parent",
'fk': f"{evidence_dict['table']}_id",
'orphan_row_ids': [1]
})
elif evidence['kind'] == 'missing_required':
evidence_dict.update({
'missing_fields': evidence.get('columns', ['unknown_field'])
})
details = evidence.get('specific_details', '')
if ': ' in details and details.count(': ') == 1:
symptom_key, root_cause = details.split(': ')
else:
symptom_key = f"{evidence['kind']}_issue"
root_cause = evidence['kind']
manifest.append({
'symptom_key': symptom_key,
'root_cause': root_cause,
'evidence': json.dumps(evidence_dict)
})
# Use instance-level evaluation to find matches
instance_metrics = instance_level_evaluation(manifest, prediction.detected_evidence)
# Count matches by evidence type
for matched_key in instance_metrics['matched_instances']:
evidence_type = matched_key.split(':')[0]
if evidence_type not in detected_by_type:
detected_by_type[evidence_type] = {'found': 0, 'total': 0}
detected_by_type[evidence_type]['found'] += 1
except Exception as e:
print(f"⚠️ Error analyzing example: {e}")
continue
# Combine results
all_types = set(expected_by_type.keys()) | set(detected_by_type.keys())
type_performance = {}
for evidence_type in all_types:
expected = expected_by_type.get(evidence_type, {'found': 0, 'total': 0})
detected = detected_by_type.get(evidence_type, {'found': 0, 'total': 0})
type_performance[evidence_type] = {
'found': detected['found'],
'total': expected['total']
}
return type_performance
def instance_level_evaluation(manifest: List[dict], detected_evidence: List, debug=False) -> dict:
"""
Instance-level evaluation comparing specific corruptions, not just kinds.
This prevents gaming where one vague statement gets perfect recall.
Handles both Evidence dicts and AgentEvidence Pydantic models.
"""
# Extract instance keys from manifest
manifest_instances = {}
for entry in manifest:
evidence_data = json.loads(entry['evidence'])
instance_key = extract_instance_key(evidence_data)
manifest_instances[instance_key] = {
'symptom_key': entry['symptom_key'],
'root_cause': entry['root_cause'],
'evidence': evidence_data
}
if debug:
print(f"\n🔍 DEBUG: Manifest instances:")
for key, value in manifest_instances.items():
print(f" {key} -> {value['symptom_key']}")
# Extract instance keys from detected evidence
detected_instances = {}
for i, ev in enumerate(detected_evidence):
# AgentEvidence ➜ dict
if hasattr(ev, "model_dump"): # AgentEvidence (pydantic)
ev_dict = ev.model_dump()
else: # in case a raw dict sneaks in
ev_dict = dict(ev)
evidence_dict = {
"kind" : ev_dict.get("kind"),
"table" : ev_dict.get("table", ""),
"columns" : ev_dict.get("columns", []),
"specific_details": ev_dict.get("specific_details",""),
}
# Extract table/column info from specific_details if not directly available
details = evidence_dict['specific_details'].lower()
# Try to parse table and column from details if not already set
if not evidence_dict['table'] and ' table ' in details:
# Extract table name from details
match = re.search(r'(\w+)\s+table', details)
if match:
evidence_dict['table'] = match.group(1)
if not evidence_dict['columns'] and ' column' in details:
# Extract column names from details
cols = re.findall(r'(\w+)\s+column', details)
if cols:
evidence_dict['missing_cols'] = cols
evidence_dict['unexpected_cols'] = cols
evidence_dict['column'] = cols[0] if len(cols) == 1 else None
# Add columns to evidence_dict for matching
if evidence_dict['columns']:
evidence_dict['missing_cols'] = evidence_dict['columns']
evidence_dict['unexpected_cols'] = evidence_dict['columns']
instance_key = extract_instance_key(evidence_dict)
detected_instances[instance_key] = evidence_dict
if debug:
print(f"\n🔍 DEBUG: Detected evidence {i}:")
print(f" Type: {type(ev)}")
if hasattr(ev, 'kind'): # Pydantic model
print(f" Raw evidence: kind={ev.kind}, table={getattr(ev, 'table', 'N/A')}, details={getattr(ev, 'specific_details', 'N/A')}")
else: # Dict
print(f" Raw evidence: {dict(ev)}")
print(f" Parsed evidence_dict: {evidence_dict}")
print(f" Instance key: {instance_key}")
if debug:
print(f"\n🔍 DEBUG: Detected instances:")
for key, value in detected_instances.items():
print(f" {key}")
# Calculate instance-level metrics
manifest_keys = set(manifest_instances.keys())
detected_keys = set(detected_instances.keys())
if debug:
print(f"\n🔍 DEBUG: Key comparison:")
print(f" Manifest keys: {manifest_keys}")
print(f" Detected keys: {detected_keys}")
print(f" Intersection: {manifest_keys & detected_keys}")
print(f" Manifest only: {manifest_keys - detected_keys}")
print(f" Detected only: {detected_keys - manifest_keys}")
true_positives = len(manifest_keys & detected_keys)
false_positives = len(detected_keys - manifest_keys)
false_negatives = len(manifest_keys - detected_keys)
precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
# Count difference penalty (normalized)
count_diff = abs(len(manifest) - len(detected_evidence))
max_count = max(len(manifest), len(detected_evidence))
count_penalty = 1.0 - (count_diff / max_count) if max_count > 0 else 1.0
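# Worked example: 4 manifest issues, 5 detections, 3 exact key matches
#   precision = 3/5 = 0.60, recall = 3/4 = 0.75, F1 ≈ 0.667
#   count_penalty = 1 - |4 - 5| / 5 = 0.80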
metrics = {
'manifest_count': len(manifest),
'detected_count': len(detected_evidence),
'count_difference': count_diff,
'count_penalty': count_penalty,
'true_positives': true_positives,
'false_positives': false_positives,
'false_negatives': false_negatives,
'precision': precision,
'recall': recall,
'f1_score': f1_score,
'manifest_instances': manifest_instances,
'detected_instances': detected_instances,
'matched_instances': manifest_keys & detected_keys,
'missed_instances': manifest_keys - detected_keys,
'extra_instances': detected_keys - manifest_keys
}
return metrics
def print_instance_level_results(metrics: dict):
"""Pretty print instance-level evaluation results"""
print("📊 INSTANCE-LEVEL EVALUATION RESULTS")
print("=" * 50)
print(f"\n📈 Overall Counts:")
print(f" Manifest issues: {metrics['manifest_count']}")
print(f" Detected issues: {metrics['detected_count']}")
print(f" Count difference: {metrics['count_difference']}")
print(f" Count penalty: {metrics['count_penalty']:.2%}")
print(f"\n🎯 Instance-Level Metrics:")
print(f" True Positives: {metrics['true_positives']}")
print(f" False Positives: {metrics['false_positives']}")
print(f" False Negatives: {metrics['false_negatives']}")
print(f" Precision: {metrics['precision']:.2%}")
print(f" Recall: {metrics['recall']:.2%}")
print(f" F1 Score: {metrics['f1_score']:.2%}")
if metrics['missed_instances']:
print(f"\n❌ Missed Instances:")
for key in list(metrics['missed_instances'])[:3]:
manifest_item = metrics['manifest_instances'][key]
print(f" - {key}")
print(f" Symptom: {manifest_item['symptom_key']}")
if metrics['extra_instances']:
print(f"\n⚠️ Extra Detections (False Positives):")
for key in list(metrics['extra_instances'])[:3]:
print(f" - {key}")
print("✅ Instance-level evaluation updated to handle both Evidence types")
print("✅ Added missing analyze_by_evidence_type_detailed function")
✅ Instance-level evaluation updated to handle both Evidence types
✅ Added missing analyze_by_evidence_type_detailed function
# -----------------------------------------------------------------------------
# LLM-AS-JUDGE WITH SPECIFIC DETAIL REQUIREMENTS
# -----------------------------------------------------------------------------
class EvidenceMatch(dspy.Signature):
"""
Qualitative judge that requires specific table/column details.
INPUTS
------
manifest_entry
One injected corruption with symptom_key, root_cause, and evidence JSON.
all_detected_evidence
Numbered list of ALL evidence items from the agent.
STRICT REQUIREMENTS
-------------------
1. Each evidence item MUST specify:
- The exact table name affected
- The specific column(s) involved (if applicable)
- The concrete issue (e.g., "manufacturer_id column missing", not "some fields missing")
2. VAGUE evidence should be penalized:
- "Some fields are missing" → MISS (too vague)
- "Missing field in clients table" → PARTIAL (table specified but not column)
- "Missing 'description' column in return_orders table" → COMPLETE (specific)
3. Match rating criteria:
- COMPLETE: Corruption identified with specific table AND column/detail
- PARTIAL: Corruption mentioned but lacks specific table OR column info
- MISS: No evidence relates to this corruption OR evidence is too vague
4. Evidence specificity check:
For each matched evidence line, verify it contains:
- Table name (e.g., "clients", "orders")
- Column name or specific detail (e.g., "description", "manufacturer_id")
Return matched_evidence_indices and a specificity_score (0-1) indicating
how specific the evidence is.
"""
manifest_entry = dspy.InputField(
desc="One corruption from manifest with specific table/column details"
)
all_detected_evidence = dspy.InputField(
desc="All evidence items from agent (should contain table/column specifics)"
)
match_rating: str = dspy.OutputField(
desc="COMPLETE (specific), PARTIAL (somewhat specific), or MISS (vague/none)"
)
matched_evidence_indices: List[int] = dspy.OutputField(
desc="Indices of evidence lines matching this corruption"
)
specificity_score: float = dspy.OutputField(
desc="0-1 score for how specific the evidence is (1.0 = table + column specified)"
)
reasoning: str = dspy.OutputField(
desc="Explanation focusing on specificity of table/column details"
)
class LLMJudge(dspy.Module):
"""Judge that penalizes vague evidence and rewards specificity."""
def __init__(self):
self.match_evaluator = dspy.Predict(EvidenceMatch)
def forward(self, manifest, detected_evidence):
results = {
"matches": [],
"summary": {
"complete": 0,
"partial": 0,
"miss": 0,
"false_positives": 0,
"avg_specificity": 0.0
},
}
matched_indices = set()
total_specificity = 0.0
for entry in manifest:
manifest_str = self._format_manifest_with_details(entry)
detected_str = self._format_detected_with_validation(detected_evidence)
match = self.match_evaluator(
manifest_entry=manifest_str,
all_detected_evidence=detected_str,
)
results["matches"].append({
"symptom_key": entry["symptom_key"],
"root_cause": entry["root_cause"],
"rating": match.match_rating,
"matched_indices": match.matched_evidence_indices,
"specificity_score": match.specificity_score,
"reasoning": match.reasoning,
})
results["summary"][match.match_rating.lower()] += 1
total_specificity += match.specificity_score
matched_indices.update(match.matched_evidence_indices)
# Calculate metrics
results["summary"]["false_positives"] = len(detected_evidence) - len(matched_indices)
results["summary"]["avg_specificity"] = (
total_specificity / len(manifest) if manifest else 0.0
)
# Enhanced scoring with specificity penalty
total_manifest = len(manifest)
if total_manifest == 0:
results["score"] = 0.0
return results
def weight(rating, specificity):
base = {"COMPLETE": 1.0, "PARTIAL": 0.5, "MISS": 0.0}.get(rating, 0.0)
# Apply specificity penalty - vague evidence gets reduced score
return base * specificity
score_sum = 0.0
for m in results["matches"]:
base_score = weight(m["rating"], m["specificity_score"])
# Duplication penalty
dup_penalty = 1 / max(1, len(m["matched_indices"]))
score_sum += base_score * dup_penalty
results["score"] = score_sum / total_manifest
return results
def _format_manifest_with_details(self, entry):
"""Format manifest entry emphasizing specific details."""
ev = json.loads(entry["evidence"])
# Extract specific details from evidence
details = []
if "table" in ev:
details.append(f"Table: {ev['table']}")
if "missing_cols" in ev:
details.append(f"Missing columns: {ev['missing_cols']}")
if "unexpected_cols" in ev:
details.append(f"Unexpected columns: {ev['unexpected_cols']}")
if "column" in ev:
details.append(f"Column: {ev['column']}")
if "from_column" in ev:
details.append(f"FK column: {ev['from_column']}")
return (
f"Corruption: {entry['symptom_key']} (root: {entry['root_cause']})\n"
f"Specific details required:\n"
+ "\n".join(f" - {d}" for d in details)
+ f"\nFull evidence: {json.dumps(ev, indent=2)}"
)
# Note: the formatting below must stay generic – it should never reference specific table or column names.
def _format_detected_with_validation(self, detected_evidence):
"""
Render each AgentEvidence line and tag it as
✓ SPECIFIC – table AND at least one column provided
⚠ PARTIAL – table OR column provided, but not both
✗ TOO VAGUE – neither table nor column provided
No domain-specific table names are referenced.
"""
lines = []
for i, ev in enumerate(detected_evidence):
# AgentEvidence is a pydantic model – fall back to dict if needed
ev_dict = ev.model_dump() if hasattr(ev, "model_dump") else dict(ev)
has_table = bool(ev_dict.get("table"))
has_col = bool(ev_dict.get("columns"))
if not has_col:
# Try to spot explicit column mentions in free-text details
has_col = bool(re.search(r"\b\w+\.\w+\b", ev_dict.get("specific_details", "")))
tag = "✓ SPECIFIC" if has_table and has_col else (
"⚠ PARTIAL" if has_table or has_col else
"✗ TOO VAGUE")
lines.append(f"{i}. [{tag}] {ev_dict.get('specific_details','').strip()}")
return "\n".join(lines)
# Test the judge with the first dataset
print("✅ Created LLMJudge with specificity requirements")
if synthetic_datasets and 'agent_response' in locals():
print("\n🧑⚖️ Testing LLM Judge...")
# Use a faster model for judging
judge_lm = dspy.LM('openai/gpt-4.1', api_key=os.getenv('OPENAI_API_KEY'), temperature=0.3, max_tokens=1000)
with dspy.context(lm=judge_lm):
judge = LLMJudge()
judge_output = judge(
manifest=first_dataset['manifest'],
detected_evidence=agent_response.detected_evidence
)
print(f"\n📊 Judge Results:")
print(f" Average Specificity: {judge_output['summary']['avg_specificity']:.2f}")
print(f" Score: {judge_output['score']:.2f}")
✅ Created LLMJudge with specificity requirements
🧑⚖️ Testing LLM Judge...
📊 Judge Results:
Average Specificity: 1.00
Score: 0.88
DSPy Optimizer
Now we'll create a DSPy optimizer that uses our 10 synthetic datasets to optimize the data quality agent's prompts and reasoning.
Example Preparations
# -----------------------------------------------------------------------------
# STEP 1: CREATE DSPY EXAMPLES FROM SYNTHETIC DATASETS (CONSOLIDATED)
# -----------------------------------------------------------------------------
def create_dspy_examples(synthetic_datasets: list) -> list:
"""Convert synthetic datasets into DSPy Example objects for training"""
examples = []
for i, dataset in enumerate(synthetic_datasets):
# Create Evidence objects from manifest entries
evidence_objects = []
for entry in dataset['manifest']:
evidence_data = json.loads(entry['evidence'])
# Extract kind from evidence data - now it's a string, not enum
kind_str = evidence_data.get('kind', 'unknown')
col_list = []
# schema_diff
col_list += evidence_data.get("missing_cols", [])
col_list += evidence_data.get("unexpected_cols", [])
# missing_required
col_list += evidence_data.get("missing_fields", [])
# type_mismatch / unique_violation etc.
if evidence_data.get("column"):
col_list.append(evidence_data["column"])
# remove duplicates while preserving order
seen = set()
columns = [c for c in col_list if not (c in seen or seen.add(c))]
# Create Evidence object with all required fields
evidence_obj = Evidence(
kind=kind_str, # Use string directly, not enum
table=evidence_data.get('table', evidence_data.get('child_tbl', 'unknown_table')),
columns=columns,
specific_details=f"{entry['symptom_key']}: {entry['root_cause']}",
row_ids=evidence_data.get('orphan_row_ids', [])
)
evidence_objects.append(evidence_obj)
# Create the example with inputs and expected outputs
example = dspy.Example(
json_data=dataset['corrupted_data'],
detected_evidence=evidence_objects
).with_inputs('json_data')
examples.append(example)
return examples
# Create examples from our synthetic datasets
dspy_examples = create_dspy_examples(synthetic_datasets)
print(f"✅ Created {len(dspy_examples)} DSPy examples from synthetic datasets")
# Show structure of first example
print("\n📋 First example structure:")
print(f" - Input tables: {list(dspy_examples[0].json_data.keys())}")
print(f" - Expected evidence count: {len(dspy_examples[0].detected_evidence)}")
# Now both training examples and agent output use the same Evidence class
print(f" - Evidence kinds: {[e['kind'] for e in dspy_examples[0].detected_evidence]}")
# Show first evidence example structure
if dspy_examples[0].detected_evidence:
first_evidence = dspy_examples[0].detected_evidence[0]
print(f"\n📋 First evidence object:")
print(f" - Type: {type(first_evidence).__name__}")
print(f" - Table: {first_evidence['table']}")
print(f" - Columns: {first_evidence['columns']}")
print(f" - Details: {first_evidence['specific_details']}")
✅ Created 10 DSPy examples from synthetic datasets
📋 First example structure:
- Input tables: ['manufacturers', 'part_categories', 'parts', 'car_models', 'car_model_parts']
- Expected evidence count: 4
- Evidence kinds: ['schema_diff', 'schema_diff', 'type_mismatch', 'ri_diff']
📋 First evidence object:
- Type: Evidence
- Table: manufacturers
- Columns: ['category_id']
- Details: add_related_data: unexpected_data
Evaluation Definitions
# -----------------------------------------------------------------------------
# IMPROVED EVALUATION FUNCTION WITH PROPER MANIFEST RECONSTRUCTION (FIXED)
# -----------------------------------------------------------------------------
def dspy_evaluation_metric(example, prediction, trace=None):
"""
Improved evaluation metric that properly reconstructs manifest from original data.
Returns a score between 0 and 1:
- 0.2 weight on instance-level F1 score
- 0.6 weight on qualitative LLM judge score with specificity
- 0.1 weight on count penalty (penalizes detecting too few/many issues)
- 0.1 weight on specificity bonus
"""
# Improved manifest reconstruction preserving actual evidence details
manifest = []
for i, evidence in enumerate(example.detected_evidence):
# Fixed: Evidence is a dict, access via dictionary keys
if 'table' in evidence:
table = evidence['table']
else:
table = f"unknown_table_{i}"
# Create evidence dict with proper structure
evidence_dict = {
'kind': evidence['kind'], # Now it's already a string
'table': table,
}
# Add kind-specific details
if evidence['kind'] == 'schema_diff':
evidence_dict.update({
'missing_cols': evidence.get('columns', []),
'unexpected_cols': evidence.get('columns', [])
})
elif evidence['kind'] == 'type_mismatch':
evidence_dict.update({
'column': evidence.get('columns', ['unknown'])[0] if evidence.get('columns', []) else 'unknown',
'expected_type': 'INTEGER',
'actual_type': 'TEXT'
})
elif evidence['kind'] == 'ri_diff':
evidence_dict.update({
'child_tbl': table,
'parent_tbl': f"{table}_parent",
'fk': f"{table}_id",
'orphan_row_ids': [1]
})
elif evidence['kind'] == 'missing_required':
evidence_dict.update({
'missing_fields': evidence.get('columns', ['unknown_field'])
})
# Extract symptom and root cause from specific_details
details = evidence.get('specific_details', '')
if ': ' in details and details.count(': ') == 1:
symptom_key, root_cause = details.split(': ')
else:
symptom_key = f"{evidence['kind']}_issue"
root_cause = evidence['kind']
manifest.append({
'symptom_key': symptom_key,
'root_cause': root_cause,
'evidence': json.dumps(evidence_dict)
})
# Get predicted evidence
predicted_evidence = prediction.detected_evidence
# 1. Instance-level evaluation with proper evidence reconstruction
instance_metrics = instance_level_evaluation(manifest, predicted_evidence)
f1_score = instance_metrics['f1_score']
count_penalty = instance_metrics['count_penalty']
# 2. Qualitative evaluation with improved LLM judge
try:
judge_lm = dspy.LM('openai/gpt-4.1', api_key=os.getenv('OPENAI_API_KEY'), temperature=0.7, max_tokens=5000)
with dspy.context(lm=judge_lm):
# Use ImprovedLLMJudge if available
try:
llm_judge = LLMJudge()
except NameError:
# Fallback to basic evaluation using the same weights as the final score
qual_score = f1_score
specificity_score = 0.5
return 0.2 * f1_score + 0.6 * qual_score + 0.1 * count_penalty + 0.1 * specificity_score
judge_output = llm_judge(
manifest=manifest,
detected_evidence=predicted_evidence,
)
qual_score = judge_output['score']
specificity_score = judge_output.get('summary', {}).get('avg_specificity', 0.8)
except Exception as e:
print(f"⚠️ LLM judge failed: {e}, using F1 score only")
qual_score = f1_score
specificity_score = 0.5
# Combine scores with weights
final_score = (
0.2 * f1_score + # Instance-level accuracy
0.6 * qual_score + # Qualitative assessment (weighted most heavily)
0.1 * count_penalty + # Penalize wrong counts
0.1 * specificity_score # Reward specific details
)
return final_score
def evaluate_with_improved_metric(agent, examples, verbose=False):
"""Evaluate agent using the improved metric with proper manifest reconstruction"""
scores = []
details = []
for i, example in enumerate(examples):
try:
# Run agent
prediction = agent(json_data=example.json_data)
# Calculate score using improved metric
score = dspy_evaluation_metric(example, prediction)
scores.append(score)
# Get detailed metrics for analysis with proper manifest reconstruction
manifest = []
expected_evidence = example.detected_evidence
for j, evidence in enumerate(expected_evidence):
# evidence is an Evidence (dict subclass) -----------------
kind_str = evidence["kind"] # Now it's already a string
manifest_item = {
"symptom_key": f"{kind_str}_{j}",
"root_cause" : kind_str,
"evidence" : json.dumps(evidence, default=str),
}
manifest.append(manifest_item)
instance_metrics = instance_level_evaluation(manifest, prediction.detected_evidence)
details.append({
'f1': instance_metrics['f1_score'],
'precision': instance_metrics['precision'],
'recall': instance_metrics['recall'],
'count_penalty': instance_metrics['count_penalty']
})
if verbose:
print(f"Example {i+1}: Score = {score:.3f} (F1={instance_metrics['f1_score']:.2f}, Count={instance_metrics['count_penalty']:.2f})")
except Exception as e:
print(f"❌ Error on example {i+1}: {e}")
scores.append(0.0)
details.append({'f1': 0, 'precision': 0, 'recall': 0, 'count_penalty': 0})
avg_score = sum(scores) / len(scores) if scores else 0.0
# Calculate average metrics
avg_details = {
'avg_f1': sum(d['f1'] for d in details) / len(details) if details else 0,
'avg_precision': sum(d['precision'] for d in details) / len(details) if details else 0,
'avg_recall': sum(d['recall'] for d in details) / len(details) if details else 0,
'avg_count_penalty': sum(d['count_penalty'] for d in details) / len(details) if details else 0
}
return avg_score, scores, avg_details
# Test the improved evaluation metric with proper manifest reconstruction
print("🧪 Testing Improved Evaluation Metric with Proper Manifest Reconstruction...")
# Test on first synthetic dataset if available
if synthetic_datasets and 'data_quality_agent' in locals():
# Use first dataset to test the improved evaluation
test_example = dspy_examples[0]
print("\n📈 Testing improved metric:")
try:
test_prediction = data_quality_agent(json_data=test_example.json_data)
test_score = dspy_evaluation_metric(test_example, test_prediction)
print(f" Test score: {test_score:.3f}")
# Show manifest reconstruction details
reconstructed_manifest = []
for i, evidence in enumerate(test_example.detected_evidence):
# Fixed: Evidence is a dict, access via dictionary keys
table = evidence.get('table', f"unknown_table_{i}")
evidence_dict = {'kind': evidence['kind'], 'table': table}
details_str = evidence.get('specific_details', '')
if ': ' in details_str and details_str.count(': ') == 1:
symptom_key, root_cause = details_str.split(': ')
else:
symptom_key = f"{evidence['kind']}_issue"
root_cause = evidence['kind']
reconstructed_manifest.append({
'symptom_key': symptom_key,
'root_cause': root_cause
})
print(f" Reconstructed manifest items: {len(reconstructed_manifest)}")
print(f" Detected evidence items: {len(test_prediction.detected_evidence)}")
# Test instance-level evaluation
instance_metrics = instance_level_evaluation(
[{'symptom_key': m['symptom_key'], 'root_cause': m['root_cause'], 'evidence': '{"kind":"test"}'}
for m in reconstructed_manifest],
test_prediction.detected_evidence
)
print(f" Instance-level F1: {instance_metrics['f1_score']:.3f}")
print(f" Count penalty: {instance_metrics['count_penalty']:.3f}")
except Exception as e:
print(f"❌ Test failed: {e}")
print("\n✅ Improved evaluation metric with manifest reconstruction implemented!")
🧪 Testing Improved Evaluation Metric with Proper Manifest Reconstruction...
📈 Testing improved metric:
Test score: 0.682
Reconstructed manifest items: 4
Detected evidence items: 7
Instance-level F1: 0.000
Count penalty: 0.571
✅ Improved evaluation metric with manifest reconstruction implemented!
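To make the weighting concrete, here is a tiny illustrative calculation of how the final score is assembled. The component values below are made up for demonstration; they are not taken from the run above.
# Illustrative only: hypothetical component scores, not from the run above
f1_score = 0.50          # instance-level F1
qual_score = 0.80        # LLM-judge score
count_penalty = 0.75     # penalty for detecting too few/many issues
specificity_score = 1.0  # average specificity from the judge
final_score = (
    0.2 * f1_score
    + 0.6 * qual_score
    + 0.1 * count_penalty
    + 0.1 * specificity_score
)
print(f"final_score = {final_score:.3f}")  # 0.755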
Optimize
Initially I had tried to use bootstrapped demos. The problem I eventually realized was that the demos I had curated were rooted in the database schema, so having a model generate bootstrapped examples without access to the underlying schema seemed to defeat the purpose.
RUN_BSFS_OPTIMIZER = False
# -----------------------------------------------------------------------------
# STEP 3: DSPY OPTIMIZER IMPLEMENTATION WITH FIXED FUNCTION CALLS
# -----------------------------------------------------------------------------
if RUN_BSFS_OPTIMIZER:
# Split examples into train and validation sets
train_examples = dspy_examples[:7] # 70% for training
val_examples = dspy_examples[7:] # 30% for validation
print(f"📊 Dataset split:")
print(f" - Training examples: {len(train_examples)}")
print(f" - Validation examples: {len(val_examples)}")
# Baseline evaluation using the correct function
print("\n📈 Baseline Performance (before optimization):")
baseline_score, baseline_scores, baseline_details = evaluate_with_improved_metric(data_quality_agent, val_examples, verbose=True)
print(f"\n📊 Baseline average score: {baseline_score:.3f}")
print(f" - Avg F1: {baseline_details['avg_f1']:.3f}")
print(f" - Avg Precision: {baseline_details['avg_precision']:.3f}")
print(f" - Avg Recall: {baseline_details['avg_recall']:.3f}")
# Configure optimizer
from dspy.teleprompt import BootstrapFewShot
# Create optimizer with our evaluation metric
optimizer = BootstrapFewShot(
metric=dspy_evaluation_metric,
max_bootstrapped_demos=0, # Do not use bootstrapped demos!! This is important - too much work has been done to get the exact right examples based on schema
# max_bootstrapped_demos=5, # Do not use bootstrapped demos!! This is important - too much work has been done to get the exact right examples based on schema
max_labeled_demos=5, # Max examples to try
max_rounds=2, # Number of optimization rounds
max_errors=5, # Max errors before stopping
)
# Compile the optimized agent
print("\n🔧 Optimizing data quality agent...")
print("This may take a few minutes...")
optimized_agent = optimizer.compile(
data_quality_agent,
trainset=train_examples
)
print("✅ Optimization complete!")
# Evaluate optimized agent using the correct function
print("\n📈 Optimized Performance:")
optimized_score, optimized_scores, optimized_details = evaluate_with_improved_metric(optimized_agent, val_examples, verbose=True)
print(f"\n📊 Optimized average score: {optimized_score:.3f}")
print(f" - Avg F1: {optimized_details['avg_f1']:.3f}")
print(f" - Avg Precision: {optimized_details['avg_precision']:.3f}")
print(f" - Avg Recall: {optimized_details['avg_recall']:.3f}")
print(f"\n🎯 Improvement: {(optimized_score - baseline_score):.3f} ({((optimized_score/baseline_score - 1)*100):.1f}%)")
# Compare detailed metrics
print(f"\n📈 Detailed Performance Comparison:")
print(f" F1 Score: {baseline_details['avg_f1']:.3f} → {optimized_details['avg_f1']:.3f} ({(optimized_details['avg_f1'] - baseline_details['avg_f1']):+.3f})")
print(f" Precision: {baseline_details['avg_precision']:.3f} → {optimized_details['avg_precision']:.3f} ({(optimized_details['avg_precision'] - baseline_details['avg_precision']):+.3f})")
print(f" Recall: {baseline_details['avg_recall']:.3f} → {optimized_details['avg_recall']:.3f} ({(optimized_details['avg_recall'] - baseline_details['avg_recall']):+.3f})")
print("\n✅ DSPy optimization evaluation complete!")
RUN_LFS_OPTIMIZER = True
if RUN_LFS_OPTIMIZER:
# -----------------------------------------------------------------------------
# STEP 3: DSPY OPTIMIZER IMPLEMENTATION WITH FIXED FUNCTION CALLS
# -----------------------------------------------------------------------------
# Split examples into train and validation sets
train_examples = dspy_examples[:7] # 70% for training
val_examples = dspy_examples[7:] # 30% for validation
print(f"📊 Dataset split:")
print(f" - Training examples: {len(train_examples)}")
print(f" - Validation examples: {len(val_examples)}")
# Baseline evaluation using the correct function
print("\n📈 Baseline Performance (before optimization):")
baseline_score, baseline_scores, baseline_details = evaluate_with_improved_metric(
data_quality_agent, val_examples, verbose=True
)
print(f"\n📊 Baseline average score: {baseline_score:.3f}")
print(f" - Avg F1: {baseline_details['avg_f1']:.3f}")
print(f" - Avg Precision: {baseline_details['avg_precision']:.3f}")
print(f" - Avg Recall: {baseline_details['avg_recall']:.3f}")
# Configure optimizer
from dspy.teleprompt import LabeledFewShot
# Create optimizer with our evaluation metric
optimizer = LabeledFewShot(5)
# Compile the optimized agent
print("\n🔧 Optimizing data quality agent...")
print("This may take a few minutes...")
optimized_agent = optimizer.compile(data_quality_agent, trainset=train_examples)
# Evaluate optimized agent using the correct function
print("\n📈 Optimized Performance:")
optimized_score, optimized_scores, optimized_details = (
evaluate_with_improved_metric(optimized_agent, val_examples, verbose=True)
)
print(f"\n📊 Optimized average score: {optimized_score:.3f}")
print(f" - Avg F1: {optimized_details['avg_f1']:.3f}")
print(f" - Avg Precision: {optimized_details['avg_precision']:.3f}")
print(f" - Avg Recall: {optimized_details['avg_recall']:.3f}")
print(
f"\n🎯 Improvement: {(optimized_score - baseline_score):.3f} ({((optimized_score/baseline_score - 1)*100):.1f}%)"
)
# Compare detailed metrics
print(f"\n📈 Detailed Performance Comparison:")
print(
f" F1 Score: {baseline_details['avg_f1']:.3f} → {optimized_details['avg_f1']:.3f} ({(optimized_details['avg_f1'] - baseline_details['avg_f1']):+.3f})"
)
print(
f" Precision: {baseline_details['avg_precision']:.3f} → {optimized_details['avg_precision']:.3f} ({(optimized_details['avg_precision'] - baseline_details['avg_precision']):+.3f})"
)
print(
f" Recall: {baseline_details['avg_recall']:.3f} → {optimized_details['avg_recall']:.3f} ({(optimized_details['avg_recall'] - baseline_details['avg_recall']):+.3f})"
)
print("\n✅ DSPy optimization evaluation complete!")
📊 Dataset split:
- Training examples: 7
- Validation examples: 3
📈 Baseline Performance (before optimization):
Example 1: Score = 0.355 (F1=0.22, Count=0.80)
Example 2: Score = 0.710 (F1=0.50, Count=0.60)
Example 3: Score = 0.694 (F1=0.44, Count=0.50)
📊 Baseline average score: 0.586
- Avg F1: 0.389
- Avg Precision: 0.311
- Avg Recall: 0.528
🔧 Optimizing data quality agent...
This may take a few minutes...
📈 Optimized Performance:
Example 1: Score = 0.432 (F1=0.40, Count=0.67)
Example 2: Score = 0.694 (F1=0.44, Count=0.50)
Example 3: Score = 0.694 (F1=0.44, Count=0.50)
📊 Optimized average score: 0.607
- Avg F1: 0.430
- Avg Precision: 0.333
- Avg Recall: 0.611
🎯 Improvement: 0.020 (3.5%)
📈 Detailed Performance Comparison:
F1 Score: 0.389 → 0.430 (+0.041)
Precision: 0.311 → 0.333 (+0.022)
Recall: 0.528 → 0.611 (+0.083)
✅ DSPy optimization evaluation complete!
def run_dspy_optimization(data_quality_agent, dspy_examples, run_optimizer=True, train_split=0.7):
"""
Run DSPy optimization evaluation on a data quality agent.
Args:
data_quality_agent: The DSPy agent to evaluate and optimize
dspy_examples: List of examples for training and validation
run_optimizer: Whether to run the optimizer (default: True)
train_split: Fraction of examples to use for training (default: 0.7)
Returns:
dict: Results containing baseline and optimized performance metrics
"""
# Split examples into train and validation sets
split_idx = int(len(dspy_examples) * train_split)
train_examples = dspy_examples[:split_idx]
val_examples = dspy_examples[split_idx:]
print(f"📊 Dataset split:")
print(f" - Training examples: {len(train_examples)}")
print(f" - Validation examples: {len(val_examples)}")
# Baseline evaluation using the correct function
print("\n📈 Baseline Performance (before optimization):")
baseline_score, baseline_scores, baseline_details = evaluate_with_improved_metric(
data_quality_agent, val_examples, verbose=True
)
print(f"\n📊 Baseline average score: {baseline_score:.3f}")
print(f" - Avg F1: {baseline_details['avg_f1']:.3f}")
print(f" - Avg Precision: {baseline_details['avg_precision']:.3f}")
print(f" - Avg Recall: {baseline_details['avg_recall']:.3f}")
results = {
'baseline_score': baseline_score,
'baseline_scores': baseline_scores,
'baseline_details': baseline_details,
'optimized_score': None,
'optimized_scores': None,
'optimized_details': None,
'improvement': None,
'improvement_percentage': None
}
if run_optimizer:
# Configure optimizer
from dspy.teleprompt import LabeledFewShot
# Create optimizer with our evaluation metric
optimizer = LabeledFewShot(5)
# Compile the optimized agent
print("\n🔧 Optimizing data quality agent...")
print("This may take a few minutes...")
optimized_agent = optimizer.compile(data_quality_agent, trainset=train_examples)
# Evaluate optimized agent using the correct function
print("\n📈 Optimized Performance:")
optimized_score, optimized_scores, optimized_details = (
evaluate_with_improved_metric(optimized_agent, val_examples, verbose=True)
)
print(f"\n📊 Optimized average score: {optimized_score:.3f}")
print(f" - Avg F1: {optimized_details['avg_f1']:.3f}")
print(f" - Avg Precision: {optimized_details['avg_precision']:.3f}")
print(f" - Avg Recall: {optimized_details['avg_recall']:.3f}")
improvement = optimized_score - baseline_score
improvement_percentage = (optimized_score/baseline_score - 1) * 100
print(f"\n🎯 Improvement: {improvement:.3f} ({improvement_percentage:.1f}%)")
# Compare detailed metrics
print(f"\n📈 Detailed Performance Comparison:")
print(
f" F1 Score: {baseline_details['avg_f1']:.3f} → {optimized_details['avg_f1']:.3f} ({(optimized_details['avg_f1'] - baseline_details['avg_f1']):+.3f})"
)
print(
f" Precision: {baseline_details['avg_precision']:.3f} → {optimized_details['avg_precision']:.3f} ({(optimized_details['avg_precision'] - baseline_details['avg_precision']):+.3f})"
)
print(
f" Recall: {baseline_details['avg_recall']:.3f} → {optimized_details['avg_recall']:.3f} ({(optimized_details['avg_recall'] - baseline_details['avg_recall']):+.3f})"
)
print("\n✅ DSPy optimization evaluation complete!")
# Update results
results.update({
'optimized_score': optimized_score,
'optimized_scores': optimized_scores,
'optimized_details': optimized_details,
'improvement': improvement,
'improvement_percentage': improvement_percentage,
'optimized_agent': optimized_agent
})
return results
lm = dspy.LM("openai/gpt-4.1")
with dspy.context(lm=lm, cache=False):
# Run the optimization evaluation
optimization_results = run_dspy_optimization(
data_quality_agent, dspy_examples, run_optimizer=RUN_LFS_OPTIMIZER, train_split=0.7
)
📊 Dataset split:
- Training examples: 7
- Validation examples: 3
📈 Baseline Performance (before optimization):
Example 1: Score = 0.657 (F1=0.57, Count=0.75)
Example 2: Score = 0.867 (F1=0.67, Count=1.00)
Example 3: Score = 0.442 (F1=0.29, Count=0.75)
📊 Baseline average score: 0.655
- Avg F1: 0.508
- Avg Precision: 0.528
- Avg Recall: 0.500
🔧 Optimizing data quality agent...
This may take a few minutes...
📈 Optimized Performance:
Example 1: Score = 0.475 (F1=0.50, Count=1.00)
Example 2: Score = 0.693 (F1=0.40, Count=0.67)
Example 3: Score = 0.732 (F1=0.57, Count=0.75)
📊 Optimized average score: 0.633
- Avg F1: 0.490
- Avg Precision: 0.500
- Avg Recall: 0.500
🎯 Improvement: -0.022 (-3.3%)
📈 Detailed Performance Comparison:
F1 Score: 0.508 → 0.490 (-0.017)
Precision: 0.528 → 0.500 (-0.028)
Recall: 0.500 → 0.500 (+0.000)
✅ DSPy optimization evaluation complete!
lm = dspy.LM("anthropic/claude-sonnet-4-20250514", cache=False)
with dspy.context(lm=lm, cache=False):
# Run the optimization evaluation
optimization_results = run_dspy_optimization(
data_quality_agent,
dspy_examples,
run_optimizer=RUN_LFS_OPTIMIZER,
train_split=0.7,
)
📊 Dataset split:
- Training examples: 7
- Validation examples: 3
📈 Baseline Performance (before optimization):
Example 1: Score = 0.649 (F1=0.44, Count=0.80)
Example 2: Score = 0.810 (F1=0.50, Count=0.60)
❌ Error on example 3: argument of type 'int' is not iterable
📊 Baseline average score: 0.486
- Avg F1: 0.315
- Avg Precision: 0.267
- Avg Recall: 0.389
🔧 Optimizing data quality agent...
This may take a few minutes...
📈 Optimized Performance:
Example 1: Score = 0.619 (F1=0.44, Count=0.80)
Example 2: Score = 0.889 (F1=0.57, Count=0.75)
❌ Error on example 3: argument of type 'int' is not iterable
📊 Optimized average score: 0.503
- Avg F1: 0.339
- Avg Precision: 0.300
- Avg Recall: 0.389
🎯 Improvement: 0.016 (3.3%)
📈 Detailed Performance Comparison:
F1 Score: 0.315 → 0.339 (+0.024)
Precision: 0.267 → 0.300 (+0.033)
Recall: 0.389 → 0.389 (+0.000)
✅ DSPy optimization evaluation complete!
Prompt Optimization
# RUN_MIPRO = False # Very expensive
RUN_MIPRO = False # Very expensive
# Alternative solution: Use a different model for optimization that supports temperature
# Configure optimizer with a different model
from dspy.teleprompt import MIPROv2
# Temporarily switch to a model that supports temperature variations for optimization
optimization_lm = dspy.LM('openai/gpt-4.1-mini', api_key=openai_api_key, temperature=0.7, max_tokens=5000)
# NOTE: If we are using an O-series model, set litellm.drop_params = True so that
# unsupported parameters (e.g. temperature) are dropped instead of raising errors
import litellm
# litellm.drop_params = True
if RUN_MIPRO:
# Compile the optimized agent
print("\n🔧 Optimizing data quality agent with GPT-4...")
print("This may take a few minutes...")
with dspy.context(lm=optimization_lm):
optimizer = MIPROv2(
metric=dspy_evaluation_metric, # Fixed function name
auto="medium",
max_bootstrapped_demos=0, # Do not
max_labeled_demos=5, # Max examples to try
max_errors=5, # Max errors before stopping
)
optimized_agent = optimizer.compile(
data_quality_agent,
trainset=train_examples, # Fixed agent name
requires_permission_to_run=False
)
print("✅ Optimization complete!")
# Evaluate optimized agent using correct function
print("\n📈 Optimized Performance:")
optimized_score, optimized_scores, optimized_details = evaluate_with_improved_metric(
optimized_agent, val_examples, verbose=True
)
print(f"\n📊 Optimized average score: {optimized_score:.3f}")
print(
f"🎯 Improvement: {(optimized_score - baseline_score):.3f} ({((optimized_score/baseline_score - 1)*100):.1f}%)"
)
SIMBA
A nice feature of DSPy is that we can optimize with a cheaper model and then run inference with a larger one; a sketch of this pattern follows the SIMBA cell below.
RUN_SIMBA = False # Set to False to skip SIMBA optimization
# RUN_SIMBA = True # Set to False to skip SIMBA optimization
if RUN_SIMBA:
with dspy.context(lm=optimization_lm):
simba = dspy.SIMBA(metric=dspy_evaluation_metric, max_steps=12, max_demos=5, bsize=5)
optimized_agent = simba.compile(data_quality_agent, trainset=train_examples, seed=6793115)
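As a minimal sketch of that cheap-optimize / large-infer pattern (it assumes RUN_SIMBA was enabled so that optimized_agent is the SIMBA-compiled agent, and it reuses the evaluation helper and validation split defined earlier):
if RUN_SIMBA:
    # Optimization ran under the cheaper optimization_lm context above;
    # inference now uses a larger model for the compiled agent.
    inference_lm = dspy.LM('openai/gpt-4.1', api_key=openai_api_key)
    with dspy.context(lm=inference_lm, cache=False):
        simba_score, simba_scores, simba_details = evaluate_with_improved_metric(
            optimized_agent, val_examples, verbose=True
        )
    print(f"SIMBA-optimized score (gpt-4.1 inference): {simba_score:.3f}")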
In the end, the SIMBA results overfit and are expensive to run. The analysis has been removed to keep this notebook as short as possible, but I can confidently say that both SIMBA and MIPROv2 are too much horsepower for the current synthetic data setup.
Conclusions
- SIMBA overfits given our minimal number of examples. This was clear from the optimized prompt performing worse. MIPROv2 took even longer, and I concluded it was simply too expensive to run for this setup.
- Adding labelled few-shot examples to a ReAct-based agent made a marginal difference and actually worsened performance for gpt-4.1
- GPT-4.1 is really impressive - better 0-shot performance than o3
- Sonnet-4 had trouble respecting output formats, but it scored well on the examples it did complete
It seems like o3, while technically the most 'intelligent' of the lot, isn't fully optimized for tool calling compared to gpt-4.1 and sonnet-4. My interpretation based on this experiment is that these other models are better for code-specific multi-tool analysis. o3 would be better for drawing deeply thought-out impacts (i.e. a nuanced conclusion or a report summary) from these other models' analyses, and possibly for generating a plan based on them. o3 is like the wise old oracle that you risk over-relying on.
This is an invaluable lesson: bigger is not always better. The further along we go on this AI journey, the more important it is to baseline all of your assumptions against evidence (LLM call outputs), ideally with quantitative evals.
Limitations
- Creating our own data corruptions and then building an agent to detect them may be 'grading our own homework', and I am unsure about the impact of this.
- We haven't varied any params like temperature
Notes
Our synthetic data covers the five structural / integrity issues represented by the demo injections, but as soon as you broaden the corruption library you will want more kinds. Typical data-quality categories you may bump into:
- duplicate_key / duplicate_row
- out_of_range (value outside min–max)
- invalid_enum / invalid_category
- null_violation (NULL where allowed? or use missing_required)
- business_rule_violation (e.g. start_date > end_date)
- inconsistent_format (e.g. date strings in multiple formats)
- inconsistent_units / mixed_units
- orphan_table (table has no referencing rows)
- semantic_duplicate (two rows represent the same real-world entity)
If we want to add new injections in the future:
- Any new Injection that emits Evidence should either a) reuse an existing kind that precisely describes it, or b) introduce a new kind and register it (Enum or dynamic discovery) - a rough sketch of option b) is shown below.
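A minimal sketch of what option b) might look like, assuming the Evidence model constructed earlier in this notebook (kind, table, columns, specific_details, row_ids). The inject_duplicate_key helper and the KNOWN_EVIDENCE_KINDS registry are hypothetical names, not part of the existing code:
# Hypothetical sketch: registering and emitting a new 'duplicate_key' corruption kind
KNOWN_EVIDENCE_KINDS = {
    "schema_diff", "type_mismatch", "ri_diff", "missing_required",
    "duplicate_key",  # newly registered kind
}

def inject_duplicate_key(table_name: str, rows: list, key_col: str):
    """Copy the first row's key into the second row and emit matching Evidence."""
    if len(rows) < 2:
        return rows, None
    rows[1][key_col] = rows[0][key_col]  # create the duplicate key value
    evidence = Evidence(
        kind="duplicate_key",
        table=table_name,
        columns=[key_col],
        specific_details=f"duplicate_key: two rows share {key_col}={rows[0][key_col]}",
        row_ids=[],
    )
    return rows, evidence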
Next Steps
- Better context engineering (decent summary here) - this is quite hard to do, but is important to understand
- Business rule injection
- Actual SQL scratchpad (copy db and run queries and updates) to ensure clean generated data can be inserted into SQL and injections are true violations - what about SDV?
- Feedback loop to incorporate replies or additional data provided by the user