{cas} a journal by Cas Stantonius

Data Onboarder - Part 1


Can I use DSPy to optimize converting JSON into existing SQL

TL;DR

  • Objective: Generate synthetic data with known data manipulations (transformations and corruptions), then optimize prompts so an agent can spot said manipulations and ultimately suggest how the JSON could be incorporated into the SQL database
  • Process: Created a mock database and Python tools for querying it, defined manipulations, generated synthetic data based on the existing schema, built an agent to predict these known data anomalies, and attempted to score its predictions
  • Outcome: The agent generally did a good job of identifying issues, but we don't know how well yet, since scoring proved to be a challenge. It was too simplistic to assume the agent could identify root-cause data manipulations (i.e. a deleted foreign key) from a series of symptoms (i.e. record with id 123 has no foreign key). This may be solvable through better prompting, LLM-based scoring, and improved manipulation definitions. Additionally, trying to 'pre-define' the impact of data manipulations added too much complexity.

Intro

I live in a world of mini-projects, where the (often small or non-existent) data seems easier to get as JSON. However, any serious project needs some form of data model (SQL, NoSQL). This post therefore looks at building an agent that:

  1. Detects data quality issues in incoming JSON
  2. Classifies them by severity and remediation type
  3. Suggests fixes that respect your database constraints

The main hypothesis: by generating synthetic data with known corruptions, we can optimize our agent's prompts to better detect real-world data issues.
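
To make this concrete, a detection signature in DSPy might look roughly like the sketch below. This is illustrative only; the class and field names here are mine, not the ones built later in this post.

import dspy

# Illustrative sketch of a detection signature (not the one built later in this post)
class SpotDataIssues(dspy.Signature):
    """Inspect an incoming JSON payload against a known SQL schema and report data quality issues."""

    json_payload: str = dspy.InputField(desc="Incoming JSON data to onboard")
    schema_info: str = dspy.InputField(desc="Target database schema and constraints")

    issues: list[str] = dspy.OutputField(desc="Detected data quality issues")
    severity: str = dspy.OutputField(desc="Overall severity: blocker / risky / info")
    suggested_fixes: list[str] = dspy.OutputField(desc="Fixes that respect the schema constraints")

detector = dspy.Predict(SpotDataIssues)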

The Approach: Synthetic Data with Known Faults

Instead of manually crafting test cases, we:

  1. Generate clean synthetic data that respects our schema
  2. Apply realistic corruptions (the kinds we see in production)
  3. Train an agent to detect these known issues
  4. Optimize the detection prompts using DSPy

This approach lets us systematically improve our data quality checks.
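
Step 4 is where DSPy earns its keep: with (corrupted payload, known fault labels) pairs in hand, a metric can score the agent's predictions and an optimizer can tune the prompts. A rough sketch under assumptions follows; `example.fault_labels`, `prediction.issues`, `detection_agent`, and `trainset` are placeholders, and `dspy.BootstrapFewShot` is just one possible optimizer choice.

# Rough sketch of the optimization step, assuming we already have
# (corrupted payload, fault label) pairs from the synthetic generator.
def detection_metric(example, prediction, trace=None):
    """Fraction of the injected faults that the agent actually named."""
    expected = set(example.fault_labels)          # placeholder field on the example
    found = {issue for issue in prediction.issues if issue in expected}  # placeholder field on the prediction
    return len(found) / max(len(expected), 1)

# trainset = [dspy.Example(json_payload=..., schema_info=..., fault_labels=[...])
#                 .with_inputs("json_payload", "schema_info"), ...]
# optimizer = dspy.BootstrapFewShot(metric=detection_metric)
# optimized_agent = optimizer.compile(detection_agent, trainset=trainset)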

Aside: I know I am not the first to try this, and real-world data is a lot messier and more complex. But the reality is we have to try, since 'indie' devs like me don't have the key ingredient for serious projects: large, clean datasets.

graph LR
				A[Clean Synthetic<br/>Data Generator] --> B[Apply Known<br/>Corruptions]
				B --> C[Corrupted Data<br/>+ Fault Labels]
				C --> D[Data Quality<br/>Agent Analysis]
				D --> E[Compare Results<br/>vs Fault Labels]
				E --> F[Optimize Agent<br/>Prompts DSPy]
				F --> A

Set Up the uv Environment

%%capture

!uv pip install -U sqlite-utils dspy pydantic
import sqlite3
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, TypedDict, Union
import dspy
import os
from enum import Enum
import random
from pydantic import BaseModel, Field


# Get OpenAI API key from environment variable
openai_api_key = os.getenv('OPENAI_API_KEY')

if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable not found. Please set it with your OpenAI API key.")

# Configure DSPy with O3 model
lm = dspy.LM('openai/o3', api_key=openai_api_key, temperature=1.0, max_tokens=50000)
dspy.configure(lm=lm)

print("✅ DSPy configured with OpenAI O3 model")
print(f"Model: {lm.model}")
print("Ready to use DSPy signatures and modules!")
✅ DSPy configured with OpenAI O3 model
Model: openai/o3
Ready to use DSPy signatures and modules!
# test using dspy

# lm("Is Greenland in Europe or NA?")

Database Setup

Now let's create our SQLite database with the car parts schema and sample data:

# Create database file
db_path = Path("tmp/car_parts.db")
print(f"Creating database at: {db_path}")

# make sure the directory exists
db_path.parent.mkdir(exist_ok=True)

# Connect to database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Enable foreign key enforcement
cursor.execute("PRAGMA foreign_keys = ON")
print("Foreign key enforcement enabled")
Creating database at: tmp/car_parts.db
Foreign key enforcement enabled
# Drop existing tables if they exist
drop_statements = [
    "DROP TABLE IF EXISTS car_model_parts",
    "DROP TABLE IF EXISTS parts", 
    "DROP TABLE IF EXISTS part_categories",
    "DROP TABLE IF EXISTS car_models",
    "DROP TABLE IF EXISTS manufacturers"
]

for statement in drop_statements:
    cursor.execute(statement)
    print(f"Executed: {statement}")

conn.commit()
print("All tables dropped successfully")
Executed: DROP TABLE IF EXISTS car_model_parts
Executed: DROP TABLE IF EXISTS parts
Executed: DROP TABLE IF EXISTS part_categories
Executed: DROP TABLE IF EXISTS car_models
Executed: DROP TABLE IF EXISTS manufacturers
All tables dropped successfully
# Create all tables
create_statements = [
    """CREATE TABLE manufacturers (
        manufacturer_id INTEGER PRIMARY KEY,
        name            TEXT NOT NULL UNIQUE
    )""",
    
    """CREATE TABLE car_models (
        model_id        INTEGER PRIMARY KEY,
        manufacturer_id INTEGER NOT NULL,
        model_name      TEXT NOT NULL,
        model_year      INTEGER NOT NULL,
        FOREIGN KEY (manufacturer_id) REFERENCES manufacturers(manufacturer_id)
            ON DELETE CASCADE
    )""",
    
    """CREATE TABLE part_categories (
        category_id   INTEGER PRIMARY KEY,
        category_name TEXT NOT NULL UNIQUE
    )""",
    
    """CREATE TABLE parts (
        part_id      INTEGER PRIMARY KEY,
        category_id  INTEGER NOT NULL,
        part_number  TEXT NOT NULL UNIQUE,
        description  TEXT NOT NULL,
        FOREIGN KEY (category_id) REFERENCES part_categories(category_id)
            ON DELETE RESTRICT
    )""",
    
    """CREATE TABLE car_model_parts (
        model_id INTEGER NOT NULL,
        part_id  INTEGER NOT NULL,
        qty      INTEGER NOT NULL DEFAULT 1,
        PRIMARY KEY (model_id, part_id),
        FOREIGN KEY (model_id) REFERENCES car_models(model_id)
            ON DELETE CASCADE,
        FOREIGN KEY (part_id)  REFERENCES parts(part_id)
            ON DELETE RESTRICT
    )"""
]

for i, statement in enumerate(create_statements, 1):
    cursor.execute(statement)
    table_name = statement.split()[2]  # Extract table name
    print(f"{i}. Created table: {table_name}")

conn.commit()
print("All tables created successfully!")
1. Created table: manufacturers
2. Created table: car_models
3. Created table: part_categories
4. Created table: parts
5. Created table: car_model_parts
All tables created successfully!

Insert Sample Data

Now let's populate the tables with sample data:

# Insert manufacturers
manufacturers_data = [
    ('Toyota',),
    ('Honda',)
]

cursor.executemany("INSERT INTO manufacturers (name) VALUES (?)", manufacturers_data)
print(f"Inserted {len(manufacturers_data)} manufacturers")

# Insert car models
car_models_data = [
    (1, 'Camry', 2022),
    (1, 'Corolla', 2023),
    (2, 'Accord', 2022),
    (2, 'Civic', 2023)
]

cursor.executemany("INSERT INTO car_models (manufacturer_id, model_name, model_year) VALUES (?, ?, ?)", car_models_data)
print(f"Inserted {len(car_models_data)} car models")

# Insert part categories
part_categories_data = [
    ('Engine',),
    ('Brake',),
    ('Suspension',)
]

cursor.executemany("INSERT INTO part_categories (category_name) VALUES (?)", part_categories_data)
print(f"Inserted {len(part_categories_data)} part categories")

conn.commit()
print("Basic data inserted successfully!")
Inserted 2 manufacturers
Inserted 4 car models
Inserted 3 part categories
Basic data inserted successfully!
# Insert parts
parts_data = [
    (1, 'ENG-2.5L-A25A', '2.5 L I4 engine (Toyota)'),
    (1, 'ENG-1.5T-L15B', '1.5 L Turbo I4 engine (Honda)'),
    (2, 'BRK-FR-TOY-22', 'Front brake pad set (Toyota 2022+)'),
    (2, 'BRK-FR-HON-22', 'Front brake pad set (Honda 2022+)'),
    (3, 'SUS-STRUT-CIV-23', 'Front strut (Civic 2023)')
]

cursor.executemany("INSERT INTO parts (category_id, part_number, description) VALUES (?, ?, ?)", parts_data)
print(f"Inserted {len(parts_data)} parts")

# Insert car model parts relationships
car_model_parts_data = [
    # Camry 2022
    (1, 1, 1),  # Camry -> Toyota Engine
    (1, 3, 2),  # Camry -> Toyota Brake Pads (qty: 2)
    # Corolla 2023
    (2, 3, 2),  # Corolla -> Toyota Brake Pads (qty: 2)
    # Accord 2022
    (3, 2, 1),  # Accord -> Honda Engine
    (3, 4, 2),  # Accord -> Honda Brake Pads (qty: 2)
    # Civic 2023
    (4, 2, 1),  # Civic -> Honda Engine
    (4, 5, 2)   # Civic -> Front Strut (qty: 2)
]

cursor.executemany("INSERT INTO car_model_parts (model_id, part_id, qty) VALUES (?, ?, ?)", car_model_parts_data)
print(f"Inserted {len(car_model_parts_data)} car model-part relationships")

conn.commit()
print("All sample data inserted successfully!")
Inserted 5 parts
Inserted 7 car model-part relationships
All sample data inserted successfully!

Verify the Data

Let's verify that our database was created correctly and query some data:

# Check table structure
print("=== DATABASE TABLES ===")
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = cursor.fetchall()
for table in tables:
    print(f"✓ {table[0]}")

print(f"\nTotal tables created: {len(tables)}")

# Show row counts
print("\n=== ROW COUNTS ===")
for table in tables:
    cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
    count = cursor.fetchone()[0]
    print(f"{table[0]}: {count} rows")
=== DATABASE TABLES ===
✓ car_model_parts
✓ car_models
✓ manufacturers
✓ part_categories
✓ parts

Total tables created: 5

=== ROW COUNTS ===
car_model_parts: 7 rows
car_models: 4 rows
manufacturers: 2 rows
part_categories: 3 rows
parts: 5 rows
# Example query: Get all car models with their manufacturers
query = """
SELECT 
    m.name as manufacturer,
    cm.model_name,
    cm.model_year
FROM car_models cm
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
ORDER BY m.name, cm.model_name
"""

df_models = pd.read_sql_query(query, conn)
print("=== CAR MODELS ===")
print(df_models.to_string(index=False))
=== CAR MODELS ===
manufacturer model_name  model_year
       Honda     Accord        2022
       Honda      Civic        2023
      Toyota      Camry        2022
      Toyota    Corolla        2023

# Complex query: Show which parts fit which car models
query = """
SELECT 
    m.name as manufacturer,
    cm.model_name,
    cm.model_year,
    pc.category_name as part_category,
    p.part_number,
    p.description,
    cmp.qty
FROM car_model_parts cmp
JOIN car_models cm ON cmp.model_id = cm.model_id
JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id
JOIN parts p ON cmp.part_id = p.part_id
JOIN part_categories pc ON p.category_id = pc.category_id
ORDER BY m.name, cm.model_name, pc.category_name
"""

df_compatibility = pd.read_sql_query(query, conn)
print("=== PARTS COMPATIBILITY ===")
print(df_compatibility.to_string(index=False))
=== PARTS COMPATIBILITY ===
manufacturer model_name  model_year part_category      part_number                        description  qty
       Honda     Accord        2022         Brake    BRK-FR-HON-22  Front brake pad set (Honda 2022+)    2
       Honda     Accord        2022        Engine    ENG-1.5T-L15B      1.5 L Turbo I4 engine (Honda)    1
       Honda      Civic        2023        Engine    ENG-1.5T-L15B      1.5 L Turbo I4 engine (Honda)    1
       Honda      Civic        2023    Suspension SUS-STRUT-CIV-23           Front strut (Civic 2023)    2
      Toyota      Camry        2022         Brake    BRK-FR-TOY-22 Front brake pad set (Toyota 2022+)    2
      Toyota      Camry        2022        Engine    ENG-2.5L-A25A           2.5 L I4 engine (Toyota)    1
      Toyota    Corolla        2023         Brake    BRK-FR-TOY-22 Front brake pad set (Toyota 2022+)    2
# Close database connection
conn.close()
print(f"Database connection closed. Database saved as: {db_path}")
print(f"Database size: {db_path.stat().st_size} bytes")
Database connection closed. Database saved as: /Users/craig/Projects/cas/notebooks/nbs/tmp/car_parts.db
Database size: 40960 bytes

Define Tools

# Database connection helper
def get_db_connection(db_path: str = "tmp/car_parts.db") -> sqlite3.Connection:
    """Get a database connection with foreign keys enabled."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA foreign_keys = ON")
    return conn

def list_tables() -> List[str]:
    """
    Get all table names in the database.
    
    Use this tool when: You need to know what tables exist in the database before 
    querying or inserting data.
    
    Returns: List of table names as strings.
    
    Example output: ['manufacturers', 'car_models', 'parts']
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
    tables = [row[0] for row in cursor.fetchall()]
    conn.close()
    return tables

def get_table_schema(table_name: str) -> pd.DataFrame:
    """
    Get detailed schema information for a specific table including column names,
    types, constraints, and whether columns allow NULL values.
    
    Use this tool when: You need to understand the structure of a table before
    inserting data, including what columns exist, their data types, which are
    required (NOT NULL), and which are primary keys.
    
    Args:
        table_name: Name of the table to inspect
        
    Returns: DataFrame with columns: column_name, data_type, not_null, default_value, primary_key
    
    Example output for 'manufacturers' table:
    | column_name     | data_type | not_null | default_value | primary_key |
    |-----------------|-----------|----------|---------------|-------------|
    | manufacturer_id | INTEGER   | 0        | None          | 1           |
    | name            | TEXT      | 1        | None          | 0           |
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = cursor.fetchall()
    conn.close()
    
    df = pd.DataFrame(columns, columns=['cid', 'column_name', 'data_type', 'not_null', 'default_value', 'primary_key'])
    return df[['column_name', 'data_type', 'not_null', 'default_value', 'primary_key']]

def get_foreign_key_relationships() -> pd.DataFrame:
    """
    Get all foreign key relationships in the database showing how tables are connected.
    
    Use this tool when: You need to understand table relationships before inserting
    data to ensure referential integrity. This shows which columns reference other
    tables and what happens on delete/update.
    
    Returns: DataFrame with columns: from_table, from_column, to_table, to_column, on_delete, on_update
    
    Example output:
    | from_table  | from_column     | to_table      | to_column       | on_delete | on_update |
    |-------------|-----------------|---------------|-----------------|-----------|-----------|
    | car_models  | manufacturer_id | manufacturers | manufacturer_id | CASCADE   | NO ACTION |
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    
    # Get all tables
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = [row[0] for row in cursor.fetchall()]
    
    relationships = []
    for table in tables:
        cursor.execute(f"PRAGMA foreign_key_list({table})")
        fks = cursor.fetchall()
        for fk in fks:
            relationships.append({
                'from_table': table,
                'from_column': fk[3],  # from column
                'to_table': fk[2],     # to table
                'to_column': fk[4],    # to column
                'on_delete': fk[6],    # on delete action
                'on_update': fk[5]     # on update action
            })
    
    conn.close()
    return pd.DataFrame(relationships)

def sample_table_data(table_name: str, limit: int = 5) -> pd.DataFrame:
    """
    Get a sample of rows from a table to understand the data format and content.
    
    Use this tool when: You need to see example data in a table to understand
    the format, typical values, and data patterns before inserting new records.
    
    Args:
        table_name: Name of the table to sample
        limit: Maximum number of rows to return (default: 5)
        
    Returns: DataFrame containing sample rows from the table
    
    Example output for 'manufacturers' table:
    | manufacturer_id | name   |
    |-----------------|--------|
    | 1               | Toyota |
    | 2               | Honda  |
    """
    conn = get_db_connection()
    query = f"SELECT * FROM {table_name} LIMIT {limit}"
    df = pd.read_sql_query(query, conn)
    conn.close()
    return df

def get_unique_values(table_name: str, column_name: str, limit: int = 50) -> List[Any]:
    """
    Get unique values from a specific column in a table.
    
    Use this tool when: You need to see what values already exist in a column
    to avoid duplicates, understand naming conventions, or see valid options
    for foreign key references.
    
    Args:
        table_name: Name of the table
        column_name: Name of the column to get unique values from
        limit: Maximum number of unique values to return (default: 50)
        
    Returns: List of unique values from the column
    
    Example output for manufacturers.name:
    ['Toyota', 'Honda']
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(f"SELECT DISTINCT {column_name} FROM {table_name} ORDER BY {column_name} LIMIT {limit}")
    values = [row[0] for row in cursor.fetchall()]
    conn.close()
    return values

def get_table_row_count(table_name: str) -> int:
    """
    Get the total number of rows in a table.
    
    Use this tool when: You need to understand the size of a table or check
    if it's empty before inserting data.
    
    Args:
        table_name: Name of the table to count
        
    Returns: Integer count of rows in the table
    
    Example output: 2 (for manufacturers table)
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    count = cursor.fetchone()[0]
    conn.close()
    return count

def get_table_columns(table_name: str) -> List[str]:
    """
    Get just the column names for a table (simpler than full schema).
    
    Use this tool when: You need a quick list of column names for a table
    without the full schema details.
    
    Args:
        table_name: Name of the table
        
    Returns: List of column names as strings
    
    Example output for 'manufacturers': ['manufacturer_id', 'name']
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = [row[1] for row in cursor.fetchall()]  # row[1] is column name
    conn.close()
    return columns

def execute_query(query: str) -> pd.DataFrame:
    """
    Execute a custom SQL query and return results as DataFrame.
    
    Use this tool when: You need to run a specific SQL query to understand
    data relationships, check for existing data, or perform complex lookups
    that other tools don't cover.
    
    Args:
        query: SQL query string to execute
        
    Returns: DataFrame with query results
    
    Example usage: execute_query("SELECT COUNT(*) as total FROM manufacturers")
    """
    conn = get_db_connection()
    df = pd.read_sql_query(query, conn)
    conn.close()
    return df

def check_referential_integrity(table_name: str, column_name: str, value: Any) -> bool:
    """
    Check if a foreign key value exists in the referenced table.
    
    Use this tool when: You need to verify that a foreign key value exists
    in the parent table before inserting a record.
    
    Args:
        table_name: Name of the table containing the foreign key
        column_name: Name of the foreign key column
        value: Value to check for existence
        
    Returns: True if the value exists in the referenced table, False otherwise
    
    Example: check_referential_integrity('car_models', 'manufacturer_id', 1)
    Returns: True if manufacturer_id=1 exists in manufacturers table
    """
    # First get the foreign key relationship
    conn = get_db_connection()
    cursor = conn.cursor()
    
    cursor.execute(f"PRAGMA foreign_key_list({table_name})")
    fks = cursor.fetchall()
    
    target_table = None
    target_column = None
    
    for fk in fks:
        if fk[3] == column_name:  # from column matches
            target_table = fk[2]   # to table
            target_column = fk[4]  # to column
            break
    
    if not target_table:
        conn.close()
        return False  # No foreign key relationship found
    
    # Check if value exists in target table
    cursor.execute(f"SELECT COUNT(*) FROM {target_table} WHERE {target_column} = ?", (value,))
    exists = cursor.fetchone()[0] > 0
    conn.close()
    return exists

def validate_json_types_against_schema(json_data: Dict[str, Any], table_name: str) -> List[str]:
    """
    Validate that JSON data types match the expected database schema types.
    
    Use this tool when: You need to verify that incoming JSON data has compatible
    types before attempting to insert into the database.
    
    Args:
        json_data: Dictionary containing the JSON data to validate
        table_name: Name of the target table to validate against
        
    Returns: List of validation error messages (empty list if all valid)
    
    Example output: ['Field "model_year" expected INTEGER but got string "2022"']
    """
    errors = []
    schema_df = get_table_schema(table_name)
    
    for _, row in schema_df.iterrows():
        column_name = row['column_name']
        expected_type = row['data_type']
        is_required = row['not_null'] == 1
        is_primary_key = row['primary_key'] == 1
        
        # Skip auto-increment primary keys
        if is_primary_key and expected_type == 'INTEGER':
            continue
            
        if column_name in json_data:
            value = json_data[column_name]
            
            # Check for null values in required fields
            if value is None and is_required:
                errors.append(f"Field '{column_name}' is required but got null")
                continue
            
            # Skip type checking for null values in optional fields
            if value is None:
                continue
                
            # Type validation
            if expected_type == 'INTEGER':
                if not isinstance(value, int):
                    # Allow string numbers that can be converted
                    try:
                        int(value)
                    except (ValueError, TypeError):
                        errors.append(f"Field '{column_name}' expected INTEGER but got {type(value).__name__} '{value}'")
            
            elif expected_type == 'TEXT':
                if not isinstance(value, str):
                    errors.append(f"Field '{column_name}' expected TEXT but got {type(value).__name__} '{value}'")
            
            elif expected_type == 'REAL':
                if not isinstance(value, (int, float)):
                    try:
                        float(value)
                    except (ValueError, TypeError):
                        errors.append(f"Field '{column_name}' expected REAL but got {type(value).__name__} '{value}'")
        
        elif is_required and not is_primary_key:
            errors.append(f"Required field '{column_name}' is missing from JSON data")
    
    return errors

def find_semantic_duplicates(new_data: Dict[str, Any], table_name: str, similarity_threshold: float = 0.8) -> List[Dict[str, Any]]:
    """
    Find existing records that might be semantic duplicates of the new data.
    
    Use this tool when: You need to detect records that aren't exact duplicates
    but represent the same entity (e.g., "Toyota" vs "TOYOTA" vs "Toyota Motors").
    
    Args:
        new_data: Dictionary containing the new record data
        table_name: Name of the table to search for duplicates
        similarity_threshold: Minimum similarity score (0.0 to 1.0) to consider a match
        
    Returns: List of potentially duplicate records with similarity scores
    
    Example output: [{'record': {...}, 'similarity': 0.85, 'matched_fields': ['name']}]
    """
    import difflib
    
    # Get existing data from the table
    existing_df = sample_table_data(table_name, limit=1000)  # Limit for performance
    
    if existing_df.empty:
        return []
    
    potential_duplicates = []
    
    # Get text columns for comparison (skip IDs and numeric-only fields)
    text_columns = []
    schema_df = get_table_schema(table_name)
    for _, row in schema_df.iterrows():
        if row['data_type'] == 'TEXT' and row['column_name'] in new_data:
            text_columns.append(row['column_name'])
    
    if not text_columns:
        return []
    
    # Compare new data against each existing record
    for idx, existing_row in existing_df.iterrows():
        total_similarity = 0
        matched_fields = []
        comparison_count = 0
        
        for column in text_columns:
            if column in new_data and pd.notna(existing_row[column]):
                new_value = str(new_data[column]).lower().strip()
                existing_value = str(existing_row[column]).lower().strip()
                
                if new_value and existing_value:
                    similarity = difflib.SequenceMatcher(None, new_value, existing_value).ratio()
                    total_similarity += similarity
                    comparison_count += 1
                    
                    if similarity >= similarity_threshold:
                        matched_fields.append(column)
        
        if comparison_count > 0:
            avg_similarity = total_similarity / comparison_count
            
            if avg_similarity >= similarity_threshold:
                potential_duplicates.append({
                    'record': existing_row.to_dict(),
                    'similarity': round(avg_similarity, 3),
                    'matched_fields': matched_fields
                })
    
    # Sort by similarity score (highest first)
    potential_duplicates.sort(key=lambda x: x['similarity'], reverse=True)
    
    return potential_duplicates

def validate_business_rules_with_llm(json_data: Dict[str, Any], table_name: str, business_rules: str) -> Dict[str, Any]:
    """
    Use an LLM to perform a "sense check" of the data against business rules.
    
    Use this tool when: You need to validate complex business logic that can't be
    expressed as simple database constraints. The LLM will analyze the data and
    rules to identify potential violations or concerns.
    
    Args:
        json_data: Dictionary containing the data to validate
        table_name: Name of the target table
        business_rules: String describing the business rules to validate against
        
    Returns: Dictionary with validation results
    
    Example output: {
        'valid': False,
        'violations': ['Honda part assigned to Toyota model'],
        'warnings': ['Unusual quantity: 10 engines for one car'],
        'suggestions': ['Consider verifying part compatibility']
    }
    
    Note: This is a placeholder function that would integrate with an LLM service.
    """
    
    # Get related data for context
    schema_info = get_table_schema(table_name).to_dict('records')
    sample_data = sample_table_data(table_name, limit=3).to_dict('records')
    
    # For foreign keys, get some context from related tables
    fk_context = {}
    fk_relationships = get_foreign_key_relationships()
    table_fks = fk_relationships[fk_relationships['from_table'] == table_name]
    
    for _, fk in table_fks.iterrows():
        if fk['from_column'] in json_data:
            fk_value = json_data[fk['from_column']]
            related_data = execute_query(f"SELECT * FROM {fk['to_table']} WHERE {fk['to_column']} = {fk_value}")
            if not related_data.empty:
                fk_context[fk['from_column']] = related_data.iloc[0].to_dict()
    
    # Placeholder LLM prompt structure
    validation_context = {
        'table_name': table_name,
        'new_data': json_data,
        'schema': schema_info,
        'sample_existing_data': sample_data,
        'foreign_key_context': fk_context,
        'business_rules': business_rules
    }
    
    # TODO: Replace with actual LLM call
    # prompt = f"""
    # Analyze this data for business rule violations:
    # 
    # Table: {table_name}
    # New Data: {json_data}
    # Business Rules: {business_rules}
    # Schema Context: {schema_info}
    # Related Data: {fk_context}
    # 
    # Identify any violations, warnings, or suggestions.
    # """
    # 
    # llm_response = call_llm(prompt)
    
    # Placeholder response - in real implementation, this would be LLM output
    placeholder_result = {
        'valid': True,
        'violations': [],
        'warnings': [],
        'suggestions': ['Business rule validation requires LLM integration'],
        'context_used': validation_context
    }
    
    return placeholder_result

# Test the tools
print("✅ All database validation tools defined successfully!")
print(f"Available tables: {list_tables()}")
print("🆕 Added: validate_json_types_against_schema()")
print("🆕 Added: find_semantic_duplicates()")  
print("🆕 Added: validate_business_rules_with_llm() [placeholder]")
✅ All database validation tools defined successfully!
Available tables: ['car_model_parts', 'car_models', 'manufacturers', 'part_categories', 'parts']
🆕 Added: validate_json_types_against_schema()
🆕 Added: find_semantic_duplicates()
🆕 Added: validate_business_rules_with_llm() [placeholder]
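
The business-rules checker above is deliberately a placeholder. Since DSPy is already configured in this notebook, one way the TODO could eventually be filled in is sketched below; the signature and helper are mine, not the implementation this post ends up using.

# Sketch only: a possible LLM-backed replacement for the placeholder above.
class BusinessRuleCheck(dspy.Signature):
    """Check a candidate record against free-text business rules."""

    new_data: str = dspy.InputField(desc="JSON record to validate")
    business_rules: str = dspy.InputField(desc="Natural-language business rules")
    context: str = dspy.InputField(desc="Schema, sample rows, and foreign key context")

    valid: bool = dspy.OutputField()
    violations: list[str] = dspy.OutputField()
    warnings: list[str] = dspy.OutputField()

def llm_business_rule_check(json_data: dict, business_rules: str, context: dict) -> dict:
    """Run the sense check through the configured DSPy LM and return a plain dict."""
    checker = dspy.Predict(BusinessRuleCheck)
    result = checker(new_data=str(json_data), business_rules=business_rules, context=str(context))
    return {"valid": result.valid, "violations": result.violations, "warnings": result.warnings}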
# Demonstrate the new validation tools
print("=== DEMONSTRATING NEW VALIDATION TOOLS ===\n")

# 1. JSON Type Validation
print("1. JSON Type Validation Examples:")

# Valid data
valid_data = {
    "manufacturer_id": 1,
    "model_name": "Prius", 
    "model_year": 2024
}
errors = validate_json_types_against_schema(valid_data, "car_models")
print(f"   Valid data errors: {errors}")

# Invalid data - wrong types
invalid_data = {
    "manufacturer_id": "not_a_number",  # Should be INTEGER
    "model_name": 123,  # Should be TEXT
    "model_year": "2024"  # Could be converted to INT
}
errors = validate_json_types_against_schema(invalid_data, "car_models")
print(f"   Invalid data errors: {errors}")

# Missing required field
missing_data = {
    "model_name": "Prius"
    # Missing required manufacturer_id and model_year
}
errors = validate_json_types_against_schema(missing_data, "car_models")
print(f"   Missing fields errors: {errors}")

print("\n" + "="*50 + "\n")

# 2. Semantic Duplicate Detection
print("2. Semantic Duplicate Detection:")

# Test with similar manufacturer name
new_manufacturer = {"name": "TOYOTA"}  # Similar to existing "Toyota"
duplicates = find_semantic_duplicates(new_manufacturer, "manufacturers")
print(f"   Potential duplicates for 'TOYOTA': {len(duplicates)} found")
if duplicates:
    for dup in duplicates:
        print(f"     - Match: {dup['record']['name']} (similarity: {dup['similarity']})")

# Test with different manufacturer
new_manufacturer2 = {"name": "Ford"}
duplicates2 = find_semantic_duplicates(new_manufacturer2, "manufacturers")
print(f"   Potential duplicates for 'Ford': {len(duplicates2)} found")

print("\n" + "="*50 + "\n")

# 3. Business Rules Validation (Placeholder)
print("3. Business Rules Validation:")

sample_business_rules = """
1. A car model cannot have more than one engine
2. Honda parts should not be used on Toyota vehicles
3. Model year must be within 10 years of current year
4. Brake parts are required for all car models
"""

sample_data = {
    "model_id": 1,
    "part_id": 2,  # Honda engine
    "qty": 1
}

validation_result = validate_business_rules_with_llm(sample_data, "car_model_parts", sample_business_rules)
print(f"   Validation result: {validation_result['valid']}")
print(f"   Suggestions: {validation_result['suggestions']}")
print(f"   Context gathered: {list(validation_result['context_used'].keys())}")

print("\n✅ All new validation tools demonstrated!")
=== DEMONSTRATING NEW VALIDATION TOOLS ===

1. JSON Type Validation Examples:
   Valid data errors: []
   Invalid data errors: ["Field 'manufacturer_id' expected INTEGER but got str 'not_a_number'", "Field 'model_name' expected TEXT but got int '123'"]
   Missing fields errors: ["Required field 'manufacturer_id' is missing from JSON data", "Required field 'model_year' is missing from JSON data"]

==================================================

2. Semantic Duplicate Detection:
   Potential duplicates for 'TOYOTA': 1 found
     - Match: Toyota (similarity: 1.0)
   Potential duplicates for 'Ford': 0 found

==================================================

3. Business Rules Validation:
   Validation result: True
   Suggestions: ['Business rule validation requires LLM integration']
   Context gathered: ['table_name', 'new_data', 'schema', 'sample_existing_data', 'foreign_key_context', 'business_rules']

✅ All new validation tools demonstrated!
# Demonstrate the database inspection tools
print("=== DEMONSTRATING DATABASE INSPECTION TOOLS ===\n")

# 1. List all tables
print("1. Available tables:")
tables = list_tables()
for table in tables:
    print(f"   - {table}")

print("\n" + "="*50 + "\n")

# 2. Get schema for manufacturers table
print("2. Schema for 'manufacturers' table:")
schema = get_table_schema('manufacturers')
print(schema.to_string(index=False))

print("\n" + "="*50 + "\n")

# 3. Show foreign key relationships
print("3. Foreign key relationships:")
fk_df = get_foreign_key_relationships()
if not fk_df.empty:
    print(fk_df.to_string(index=False))
else:
    print("No foreign key relationships found")

print("\n" + "="*50 + "\n")

# 4. Sample data from manufacturers
print("4. Sample data from 'manufacturers' table:")
sample = sample_table_data('manufacturers', 3)
print(sample.to_string(index=False))

print("\n" + "="*50 + "\n")

# 5. Get unique values from a column
print("5. Unique manufacturer names:")
unique_names = get_unique_values('manufacturers', 'name')
print(f"   {unique_names}")

print("\n" + "="*50 + "\n")

# 6. Row counts for all tables
print("6. Row counts for all tables:")
for table in tables:
    count = get_table_row_count(table)
    print(f"   {table}: {count} rows")

print("\n" + "="*50 + "\n")

# 7. Column names for car_models table
print("7. Column names for 'car_models' table:")
columns = get_table_columns('car_models')
print(f"   {columns}")

print("\n" + "="*50 + "\n")

# 8. Custom query example
print("8. Custom query - Car models with manufacturer names:")
query_result = execute_query("""
    SELECT m.name as manufacturer, cm.model_name, cm.model_year 
    FROM car_models cm 
    JOIN manufacturers m ON cm.manufacturer_id = m.manufacturer_id 
    ORDER BY m.name
""")
print(query_result.to_string(index=False))

print("\n" + "="*50 + "\n")

# 9. Check referential integrity
print("9. Referential integrity checks:")
print(f"   manufacturer_id=1 exists: {check_referential_integrity('car_models', 'manufacturer_id', 1)}")
print(f"   manufacturer_id=99 exists: {check_referential_integrity('car_models', 'manufacturer_id', 99)}")

print("\n✅ All tools working correctly!")
=== DEMONSTRATING DATABASE INSPECTION TOOLS ===

1. Available tables:
   - car_model_parts
   - car_models
   - manufacturers
   - part_categories
   - parts

==================================================

2. Schema for 'manufacturers' table:
    column_name data_type  not_null default_value  primary_key
manufacturer_id   INTEGER         0          None            1
           name      TEXT         1          None            0

==================================================

3. Foreign key relationships:
     from_table     from_column        to_table       to_column on_delete on_update
     car_models manufacturer_id   manufacturers manufacturer_id   CASCADE NO ACTION
          parts     category_id part_categories     category_id  RESTRICT NO ACTION
car_model_parts         part_id           parts         part_id  RESTRICT NO ACTION
car_model_parts        model_id      car_models        model_id   CASCADE NO ACTION

==================================================

4. Sample data from 'manufacturers' table:
 manufacturer_id   name
               1 Toyota
               2  Honda

==================================================

5. Unique manufacturer names:
   ['Honda', 'Toyota']

==================================================

6. Row counts for all tables:
   car_model_parts: 7 rows
   car_models: 4 rows
   manufacturers: 2 rows
   part_categories: 3 rows
   parts: 5 rows

==================================================

7. Column names for 'car_models' table:
   ['model_id', 'manufacturer_id', 'model_name', 'model_year']

==================================================

8. Custom query - Car models with manufacturer names:
manufacturer model_name  model_year
       Honda     Accord        2022
       Honda      Civic        2023
      Toyota      Camry        2022
      Toyota    Corolla        2023

==================================================

9. Referential integrity checks:
   manufacturer_id=1 exists: True
   manufacturer_id=99 exists: False

✅ All tools working correctly!

Define Transformations

We categorize data issues into three types:

Schema-Verifiable Violations: Issues detectable through database constraints

  • Missing required fields
  • Foreign key violations
  • Type mismatches

Business Logic Violations: Issues requiring domain knowledge

  • Invalid date ranges
  • Semantic duplicates
  • Workflow state violations

Data Transformations: Structural changes from source systems

  • Renamed columns
  • Flattened relationships
  • Added noise fields
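
To make the three categories concrete, here is a hand-written example of what a single corrupted payload might contain. The values are illustrative only, with one issue from each category:

# Illustrative corrupted payload - one issue from each category (values are made up)
corrupted_example = {
    "car_models": [
        # Schema-verifiable: manufacturer_id 99 has no parent row in manufacturers
        {"manufacturer_id": 99, "model_name": "Altima", "model_year": 2023},
    ],
    "manufacturers": [
        # Business logic: semantic duplicate of the existing 'Toyota' row
        {"name": "TOYOTA MOTOR CORP"},
    ],
    "parts": [
        # Transformation: legacy column name 'part_no' instead of 'part_number'
        {"category_id": 2, "part_no": "BRK_FR_NIS_23", "description": "Front brake pad set (Nissan 2023+)"},
    ],
}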
# -----------------------------------------------------------------------------
# FULL ADVANCED SYNTHETIC DATA PARAMETER DEFINITIONS
#   - All violation keys (schema-verifiable & non-verifiable)
#   - All transformation keys (shape & volume)
#   - Helpers to sample according to desired probabilities
# -----------------------------------------------------------------------------

# Helper


def opts(*details):
    """Wrap detail strings in a uniform structure."""
    return {"detail_options": list(details)}


# --- 1. Violations discoverable via DB schema & simple queries ---------------
schema_verifiable_params = {
    # NOT-NULL / required
    "required_fields_missing": opts(
        "omit one required column",
        "omit several required columns",
        "omit every required column",
    ),
    "null_in_not_null_columns": opts(
        "set single NOT-NULL column to NULL",
        "set multiple NOT-NULL columns to NULL",
    ),
    # FK integrity
    "foreign_key_references_missing": opts(
        "one FK points at missing parent",
        "many FKs point at missing parents",
    ),
    "foreign_key_wrong_type": opts(
        "string instead of integer FK",
        "float instead of integer FK",
    ),
    "foreign_key_circular_references": opts(
        "simple two-table cycle",
        "complex three-table cycle",
    ),
    "orphaned_child_records": opts(
        "single orphan child row",
        "many orphan child rows",
    ),
    "foreign_key_null_when_required": opts(
        "one required FK column NULL",
        "multiple required FK columns NULL",
    ),
    # PK / UNIQUE
    "primary_key_duplicates": opts(
        "duplicate PK once",
        "duplicate PK across several rows",
    ),
    "primary_key_missing": opts(
        "row with NULL PK",
        "row missing PK column entirely",
    ),
    "primary_key_wrong_type": opts(
        "string PK where integer expected",
    ),
    "unique_constraint_violations": opts(
        "duplicate single-column UNIQUE value",
        "duplicate composite UNIQUE value",
    ),
    "composite_unique_violations": opts(
        "same composite key duplicated twice",
        "same composite key duplicated many times",
    ),
    # Data-type & length
    "data_type_mismatches": opts(
        "string in integer column",
        "boolean in text column",
        "mixed types in one column",
    ),
    "string_length_violations": opts(
        "value exceeds max length",
        "empty string in NOT-NULL text column",
    ),
    # Composite keys & duplicates
    "incomplete_composite_keys": opts(
        "missing last column of composite PK",
        "only first column supplied",
    ),
    "exact_duplicate_rows": opts(
        "insert identical row twice",
        "insert identical row ten times",
    ),
    "redundant_relationships": opts(
        "duplicate association row",
        "conflicting duplicate association rows",
    ),
    # Multi-table referential chains
    "referential_integrity_chains": opts(
        "break chain at mid-level",
        "multiple broken links in chain",
    ),
    "cascade_delete_conflicts": opts(
        "delete parent restricted by child",
        "cascade loop across multiple tables",
    ),
}

# --- 2. Violations requiring business / semantic reasoning --------------------
non_schema_verifiable_params = {
    "numeric_range_violations": opts(
        "value below allowed minimum",
        "value above allowed maximum",
    ),
    "date_format_violations": opts(
        "invalid date format",
        "impossible calendar date",
    ),
    "enum_value_violations": opts(
        "single invalid enum value",
        "multiple invalid enum values",
    ),
    "business_rule_violations": opts(
        "violate one business rule",
        "violate several business rules",
    ),
    "cross_column_check_failures": opts(
        "date range invalid",
        "amount inconsistent with total",
    ),
    "calculated_field_mismatches": opts(
        "derived field mismatch",
    ),
    "partial_record_sets": opts(
        "missing related lookup row",
        "incomplete hierarchy levels",
    ),
    "missing_required_relationships": opts(
        "parent without children",
        "children without parent",
    ),
    "semantic_duplicates": opts(
        "similar names differing in case",
        "fuzzy duplicate across spelling variants",
    ),
    "case_sensitivity_problems": opts(
        "mixed-case duplicates",
        "case inconsistency in column",
    ),
    "special_character_handling": opts(
        "suspicious SQL-injection pattern",
        "unicode edge-case characters",
    ),
    "temporal_consistency_violations": opts(
        "events out of chronological order",
        "overlapping validity periods",
    ),
    "transaction_boundary_violations": opts(
        "partial commit leaves inconsistent state",
        "rolled-back transaction not isolated",
    ),
    "concurrent_modification_conflicts": opts(
        "version conflict detected",
        "row-level lock conflict",
    ),
    "workflow_state_violations": opts(
        "invalid workflow transition",
        "missing prerequisite state",
    ),
    "temporal_logic_violations": opts(
        "event occurs before creation date",
        "end date earlier than start date",
    ),
    "quantity_balance_violations": opts(
        "negative inventory quantity",
        "quantity exceeds capacity",
    ),
    "version_compatibility_issues": opts(
        "use deprecated field",
        "reference new required field",
    ),
    "schema_migration_conflicts": opts(
        "incompatible type change",
        "potential data loss on migration",
    ),
}

# --- 3a. JSON shape / structure transformations ------------------------------
shape_transformations = {
    "add_related_data": opts(
        "attach new related column to existing table",
        "introduce entirely new related table",
    ),
    "add_noise": opts(
        "insert irrelevant columns",
        "insert unrelated nested object",
    ),
    "flatten_data": opts(
        "denormalise child into parent table",
        "merge nested keys into root object",
    ),
    "rename_columns": opts(
        "rename one column",
        "rename multiple columns",
    ),
    "rename_tables": opts(
        "rename a single table",
        "rename several tables",
    ),
    "drop_columns": opts(
        "drop one expected column",
        "drop multiple expected columns",
    ),
    "drop_tables": opts(
        "drop one expected table",
        "drop several expected tables",
    ),
}

# --- 3b. Volume / depth stress-test transformations --------------------------
output_transformation_params = {
    # "record_volume": opts(1, 10, 100, 1000),
    # "batch_size_variations": opts(
    #     "single_record",
    #     "small_batch (≤10)",
    #     "large_batch (≈100)",
    #     "oversized_batch (>1000)",
    # ),
    "nested_relationship_depth": opts(
        "shallow (1 level)",
        "moderate (2–3 levels)",
        "deep (4–5 levels)",
        "excessive (>5 levels)",
    ),
}

# Combine all transformations
transformation_params = {**shape_transformations, **output_transformation_params}
# =============================================================
#  LIGHTWEIGHT FAULT-LABELLING + SAMPLING HELPERS (v2025-06-20)
# =============================================================

# ---------- 1. ENUMS & LABEL STRUCTURE -----------------------


class Severity(str, Enum):
    BLOCKER = "blocker"  # must fix before insert
    RISKY = "risky"  # can insert but follow-up needed
    INFO = "info"  # cosmetic


class FixType(str, Enum):
    AUTO = "auto"  # agent repairs silently
    ASK_VALUE = "ask_value"  # missing scalar / FK / range
    ASK_MAP = "ask_map"  # ambiguous rename / enum ext.
    ASK_SCHEMA = "ask_schema"  # requires DDL approval


class FaultLabel(TypedDict):
    name: str   # name of the violation or transformation
    severity: Severity
    fix: FixType
    target: str  # filled later by agent



# ---------- 2. RULES (key → severity / fix) ------------------

# Schema-verifiable: almost always BLOCKER or RISKY
_blocker_keys = {
    "required_fields_missing",
    "foreign_key_references_missing",
    "primary_key_duplicates",
    "foreign_key_circular_references",
    "cascade_delete_conflicts",
}
_risky_value_keys = {
    "null_in_not_null_columns",
    "foreign_key_wrong_type",
    "foreign_key_null_when_required",
    "data_type_mismatches",
    "string_length_violations",
    "numeric_range_violations",
    "enum_value_violations",
    "date_format_violations",
}

# Shape transformations
_shape_ask_map = {"rename_columns", "rename_tables"}
_shape_ask_schema = {
    "add_related_data",
    "drop_columns",
    "add_noise",
    "drop_tables",
    "flatten_data",
}


def severity_for(key: str) -> Severity:
    if key in _blocker_keys:
        return Severity.BLOCKER
    if key in _risky_value_keys or key in _shape_ask_map or key in _shape_ask_schema:
        return Severity.RISKY
    return Severity.INFO


def fix_for(key: str) -> FixType:
    if key in _shape_ask_schema:
        return FixType.ASK_SCHEMA
    if key in _shape_ask_map:
        return FixType.ASK_MAP
    if key in _risky_value_keys:
        return FixType.ASK_VALUE
    if key in _blocker_keys:
        return FixType.ASK_VALUE
    return FixType.AUTO


def classify_key(key: str) -> FaultLabel:
    return {
        "name": key,
        "severity": severity_for(key),
        "fix": fix_for(key),
        "target": '',  # agent fills once it inspects payload
    }


# ---------- 3. UPDATED SAMPLERS ------------------------------


def pick_violation(pool: Dict, *, skip_probability: float = 0.05):
    """Return None or {type, detail, label}"""
    if random.random() < skip_probability:
        return None
    key = random.choice(list(pool))
    detail = random.choice(pool[key]["detail_options"])
    return {"type": key, "detail": detail, "label": classify_key(key)}


def pick_transformations(
    pool: Dict,
    *,
    none_probability: float = 0.10,
    weights: tuple[float, float, float] = (0.60, 0.25, 0.15),
):
    """Return None or list[{type, detail, label}]"""
    if random.random() < none_probability:
        return None
    n = random.choices([1, 2, 3], weights=weights, k=1)[0]
    n = min(n, len(pool))
    keys = random.sample(list(pool), n)
    out = []
    for k in keys:
        detail = random.choice(pool[k]["detail_options"])
        out.append({"type": k, "detail": detail, "label": classify_key(k)})
    return out


# ---------- 4. QUICK SELF-TEST -------------------------------

print("Sample violation:")
print(pick_violation(schema_verifiable_params))
print("\nSample transformations:")
print(pick_transformations(transformation_params))
Sample violation:
{'type': 'composite_unique_violations', 'detail': 'same composite key duplicated many times', 'label': {'name': 'composite_unique_violations', 'severity': <Severity.INFO: 'info'>, 'fix': <FixType.AUTO: 'auto'>, 'target': ''}}

Sample transformations:
[{'type': 'add_related_data', 'detail': 'introduce entirely new related table', 'label': {'name': 'add_related_data', 'severity': <Severity.RISKY: 'risky'>, 'fix': <FixType.ASK_SCHEMA: 'ask_schema'>, 'target': ''}}]

Generate Data

# helper functions to collate schema and current sample data

def collate_schema() -> str:
    """
    Collate all database schema information into a single formatted string.
    
    This function combines table schemas, foreign key relationships, and 
    structural information into a comprehensive schema description suitable
    for LLM processing.
    
    Returns: A formatted string containing complete schema information
    """
    output_lines = []
    output_lines.append("=== DATABASE SCHEMA INFORMATION ===\n")
    
    # Get all tables
    tables = list_tables()
    output_lines.append(f"Database contains {len(tables)} tables: {', '.join(tables)}\n")
    
    # Get foreign key relationships first (for context)
    fk_relationships = get_foreign_key_relationships()
    
    if not fk_relationships.empty:
        output_lines.append("--- FOREIGN KEY RELATIONSHIPS ---")
        for _, fk in fk_relationships.iterrows():
            relationship_desc = (
                f"{fk['from_table']}.{fk['from_column']} -> "
                f"{fk['to_table']}.{fk['to_column']} "
                f"(ON DELETE {fk['on_delete']}, ON UPDATE {fk['on_update']})"
            )
            output_lines.append(relationship_desc)
        output_lines.append("")
    
    # Get detailed schema for each table
    output_lines.append("--- TABLE SCHEMAS ---")
    for table_name in sorted(tables):
        output_lines.append(f"\nTable: {table_name}")
        output_lines.append("-" * (len(table_name) + 7))
        
        # Get schema info
        schema_df = get_table_schema(table_name)
        
        # Format schema information
        for _, row in schema_df.iterrows():
            column_info = f"  {row['column_name']}: {row['data_type']}"
            
            # Add constraints
            constraints = []
            if row['primary_key'] == 1:
                constraints.append("PRIMARY KEY")
            if row['not_null'] == 1:
                constraints.append("NOT NULL")
            if row['default_value'] is not None:
                constraints.append(f"DEFAULT {row['default_value']}")
            
            if constraints:
                column_info += f" ({', '.join(constraints)})"
            
            output_lines.append(column_info)
        
        # Add foreign key info for this table
        table_fks = fk_relationships[fk_relationships['from_table'] == table_name] if not fk_relationships.empty else []
        if len(table_fks) > 0:
            output_lines.append("  Foreign Keys:")
            for _, fk in table_fks.iterrows():
                fk_info = f"    {fk['from_column']} references {fk['to_table']}.{fk['to_column']}"
                output_lines.append(fk_info)
        
        # Add row count
        row_count = get_table_row_count(table_name)
        output_lines.append(f"  Current rows: {row_count}")
    
    return "\n".join(output_lines)


def collate_sample_data(max_rows_per_table: int = 3) -> str:
    """
    Collate sample data from all tables into a single formatted string.
    
    This function retrieves sample data from each table and formats it
    in a way that's easy for an LLM to understand the data patterns,
    formats, and relationships.
    
    Args:
        max_rows_per_table: Maximum number of sample rows to include per table
        
    Returns: A formatted string containing sample data from all tables
    """
    output_lines = []
    output_lines.append("=== SAMPLE DATA FROM ALL TABLES ===\n")
    
    # Get all tables
    tables = list_tables()
    
    for table_name in sorted(tables):
        output_lines.append(f"Table: {table_name}")
        output_lines.append("-" * (len(table_name) + 7))
        
        # Get sample data
        sample_df = sample_table_data(table_name, limit=max_rows_per_table)
        
        if sample_df.empty:
            output_lines.append("  (No data)")
        else:
            # Convert DataFrame to a more readable format
            output_lines.append(f"  Columns: {', '.join(sample_df.columns.tolist())}")
            output_lines.append("  Sample rows:")
            
            for idx, row in sample_df.iterrows():
                row_data = []
                for col in sample_df.columns:
                    value = row[col]
                    if pd.isna(value):
                        value = "NULL"
                    elif isinstance(value, str):
                        value = f"'{value}'"
                    row_data.append(f"{col}={value}")
                
                output_lines.append(f"    [{', '.join(row_data)}]")
        
        output_lines.append("")
    
    return "\n".join(output_lines)


def natty_lang_violations(violation_output) -> List[str] | None:
    """
    Convert violation output from pick_violation into natural language.
    
    Args:
        violation_output: Output from pick_violation function (None or dict with 'type' and 'detail' keys)
        
    Returns: None if input is None, otherwise list of strings describing the violation
    """
    if violation_output is None:
        return None
    
    violation_type = violation_output.get('type', 'unknown_violation')
    violation_detail = violation_output.get('detail', 'no details provided')
    
    # Convert snake_case to readable format
    readable_type = violation_type.replace('_', ' ').title()
    
    # Create natural language description
    description = f"Data Violation: {readable_type} - {violation_detail}"
    
    return [description]


def natty_lang_transformations(transformations_output) -> List[str] | None:
    """
    Convert transformations output from pick_transformations into natural language.
    
    Args:
        transformations_output: Output from pick_transformations function (None or list of dicts with 'type' and 'detail' keys)
        
    Returns: None if input is None, otherwise list of strings describing the transformations
    """
    if transformations_output is None:
        return None
    
    descriptions = []
    
    for transformation in transformations_output:
        transformation_type = transformation.get('type', 'unknown_transformation')
        transformation_detail = transformation.get('detail', 'no details provided')
        
        # Convert snake_case to readable format
        readable_type = transformation_type.replace('_', ' ').title()
        
        # Create natural language description
        description = f"Data Transformation: {readable_type} - {transformation_detail}"
        descriptions.append(description)
    
    return descriptions

# Module 1: Generate clean, valid synthetic data based on schema and sample data
class GenerateCleanSynthData(dspy.Signature):
    """
    Generate *new, clean, valid* synthetic data that conforms to the database schema.
    Use the sample data to understand patterns, naming conventions, and data formats.

    IMPORTANT: Create realistic automotive industry data with natural variation:
    - Use real car manufacturers beyond the samples (Ford, Chevrolet, Nissan, BMW, etc.)
    - Generate authentic part numbers following automotive conventions
    - Create realistic model names and years (recent but not current sample years)
    - Use proper automotive terminology in descriptions
    - Maintain referential integrity across all foreign key relationships
    - Include variety in data formatting while staying within schema constraints

    The output should be well-formed JSON data that could be successfully inserted
    into the database without any violations. This clean data will later be corrupted
    with realistic data quality issues for testing purposes.

    Generate 3-5 records per table to provide good variety for testing.
    """

    current_schema = dspy.InputField(
        desc="Complete database schema with table structures, relationships, and constraints"
    )
    sample_data = dspy.InputField(
        desc="Sample of existing data showing patterns and formats"
    )

    clean_synthetic_data: dict = dspy.OutputField(
        desc="Valid JSON data organized by table name, ready for database insertion"
    )


class GenerateCleanData(dspy.Module):
    """First module: Generate clean synthetic data that conforms to schema"""

    def __init__(self):
        super().__init__()
        self.schema = collate_schema()
        self.sample = collate_sample_data()
        self.sig = dspy.Predict(GenerateCleanSynthData)

    def forward(self):
        response = self.sig(current_schema=self.schema, sample_data=self.sample)

        return response


class CorruptDataItem(BaseModel):
    name: str = Field(description="The name of the applied violation or transformation as provided by the `pick_violation` or `pick_transformations` input")
    detail: str = Field(description="An explicit, natural language description of the applied violation or transformation. It must name the table, column, or value that was affected.")

# SIMPLIFIED Module 2: Apply violations and transformations AND output fault labels directly
class ApplyDataCorruptionSimplified(dspy.Signature):
    """
    Take clean, valid synthetic data and apply specific violations and transformations 
    to create REALISTIC, ORGANIC test scenarios that mimic real-world data onboarding problems.
    
    IMPORTANT: For each violation or transformation applied, output a `CorruptDataItem` object with the name of the violation or transformation and an explicit detail string.
    
    ❌ AVOID creating obvious artificial problems like:
    - Tables named "noise", "junk", or "test_data"
    - Completely unrelated data (like food items in a car parts database)
    
    ✅ CREATE realistic automotive industry data problems like:
    - Manufacturer name variations: "Toyota" vs "TOYOTA MOTOR CORP" vs "Toyota Motors"
    - Inconsistent part numbering: "BRK-FR-TOY-22" vs "BRK_FR_TOY_22" vs "BRKFRTOY22"
    - Missing required fields, NULL values in NOT NULL columns
    - Renamed columns from legacy systems: "mfg_name" instead of "name"
    - Dropped tables/columns during data migration
    """
    clean_data = dspy.InputField(desc="Clean, valid synthetic data organized by table")
    schema_info = dspy.InputField(desc="Database schema information for constraint validation") 
    violation_instructions = dspy.InputField(desc="Natural language description of data violation to apply")
    transformation_instructions = dspy.InputField(desc="Natural language description of data transformations to apply")
    
    corrupted_data: dict = dspy.OutputField(desc="Realistically corrupted data that maintains automotive domain authenticity")
    corruption_details: List[CorruptDataItem] = dspy.OutputField(desc="List of CorruptDataItem objects with the name of the violation or transformation and an explicit detail string")


class ApplyViolationsAndTransformationsSimplified(dspy.Module):
    """SIMPLIFIED module: Apply violations and transformations to clean data and output fault labels directly"""

    def __init__(self, clean_data: dict):
        super().__init__()
        self.clean_data = clean_data
        self.schema = collate_schema()
        # Generate random violations and transformations
        self.violation = pick_violation(schema_verifiable_params)
        self.transformations = pick_transformations(transformation_params)
        self.sig = dspy.Predict(ApplyDataCorruptionSimplified)

    def forward(self):
        # Convert violations and transformations to natural language instructions
        violation_instructions = natty_lang_violations(self.violation)
        transformation_instructions = natty_lang_transformations(self.transformations)

        # Convert to strings for LLM input
        violation_str = "; ".join(violation_instructions) if violation_instructions else "No violations to apply"
        transformation_str = "; ".join(transformation_instructions) if transformation_instructions else "No transformations to apply"

        response = self.sig(
            clean_data=self.clean_data,
            schema_info=self.schema,
            violation_instructions=violation_str,
            transformation_instructions=transformation_str
        )

        # Process corruption_details into proper FaultLabel objects
        # Use the original violation/transformation objects instead of relying on LLM output
        processed_fault_labels = []
        
        # Create fault labels from original violations and transformations
        original_violations_and_transformations = []
        if self.violation:
            original_violations_and_transformations.append(self.violation)
        if self.transformations:
            original_violations_and_transformations.extend(self.transformations)
        
        # Map corruption details to original violations/transformations
        # Handle cases where LLM might return different number of items than expected
        num_items = min(len(response.corruption_details), len(original_violations_and_transformations))
        
        for i in range(num_items):
            corruption_item = response.corruption_details[i]
            original_item = original_violations_and_transformations[i]
            original_key = original_item["type"]
            
            # Use the original key for classification
            fault_classification = classify_key(original_key)
            
            # Create proper fault label with correct enums
            processed_fault = {
                "severity": fault_classification["severity"],
                "fix": fault_classification["fix"],
                "target": corruption_item.detail,
                "violation_type": original_key,  # Use original key
            }
            processed_fault_labels.append(processed_fault)
        
        # If LLM returned more corruption details than we had original items,
        # try to map them by creating a reverse mapping from title case back to snake_case
        if len(response.corruption_details) > len(original_violations_and_transformations):
            # Create reverse mapping dictionary
            all_keys = {**schema_verifiable_params, **transformation_params}
            reverse_mapping = {}
            for key in all_keys.keys():
                title_case_key = key.replace('_', ' ').title()
                reverse_mapping[title_case_key] = key
                reverse_mapping[key] = key  # Also map original key to itself
            
            # Process remaining items
            for i in range(len(original_violations_and_transformations), len(response.corruption_details)):
                corruption_item = response.corruption_details[i]
                
                # Try to find original key using reverse mapping
                original_key = reverse_mapping.get(corruption_item.name, corruption_item.name)
                
                # Use the original key for classification
                fault_classification = classify_key(original_key)
                
                # Create proper fault label with correct enums
                processed_fault = {
                    "severity": fault_classification["severity"],
                    "fix": fault_classification["fix"],
                    "target": corruption_item.detail,
                    "violation_type": original_key,
                }
                processed_fault_labels.append(processed_fault)

        # Add fault_labels to response
        response.fault_labels = processed_fault_labels

        return response


# SIMPLIFIED combined pipeline
class GenerateSyntheticTestDataSimplified(dspy.Module):
    """SIMPLIFIED pipeline: Generate clean data, apply violations/transformations with direct fault label output"""
    
    def __init__(self):
        super().__init__()
        self.clean_generator = GenerateCleanData()
        
    def forward(self):
        # Create faster model for clean data generation
        faster_lm = dspy.LM('openai/gpt-4.1', api_key=os.getenv('OPENAI_API_KEY'), temperature=1.0, max_tokens=5000)
        
        # Step 1: Generate clean data (using faster model)
        with dspy.context(lm=faster_lm):
            clean_response = self.clean_generator()
            clean_data = clean_response.clean_synthetic_data
        
        # Step 2: Apply violations and transformations AND get fault labels directly (keep original model)
        corruptor = ApplyViolationsAndTransformationsSimplified(clean_data)
        corrupted_response = corruptor()
        
        return {
            'clean_data': clean_data,
            'corrupted_data': corrupted_response.corrupted_data,
            'fault_labels': corrupted_response.fault_labels,  # Fault labels directly from ApplyDataCorruption
            'expected_detections': len(corrupted_response.fault_labels)
        }


# Test the SIMPLIFIED pipeline
print("🔧 Testing SIMPLIFIED GenerateSyntheticTestData pipeline...")

# Create and run the simplified pipeline
simplified_pipeline = GenerateSyntheticTestDataSimplified()
simplified_result = simplified_pipeline()

print("\n=== SIMPLIFIED PIPELINE RESULTS ===")
print(f"Tables in corrupted data: {list(simplified_result['corrupted_data'].keys())}")
print(f"Number of fault labels: {simplified_result['expected_detections']}")

print("\n--- FAULT LABELS FOR EVALUATION ---")
for i, fault_label in enumerate(simplified_result['fault_labels'], 1):
    print(f"\nFault {i}:")
    print(f"  Type: {fault_label.get('violation_type', 'unknown')}")
    print(f"  Severity: {fault_label['severity']}")
    print(f"  Fix: {fault_label['fix']}")
    print(f"  Target: {fault_label['target']}")
    
🔧 Testing SIMPLIFIED GenerateSyntheticTestData pipeline...

=== SIMPLIFIED PIPELINE RESULTS ===
Tables in corrupted data: ['manufacturers', 'part_categories', 'car_models', 'parts', 'car_model_parts']
Number of fault labels: 2

--- FAULT LABELS FOR EVALUATION ---

Fault 1:
  Type: required_fields_missing
  Severity: Severity.BLOCKER
  Fix: FixType.ASK_VALUE
  Target: Removed NOT NULL columns in several tables: manufacturers.name absent for manufacturer_id=4 and manufacturers.manufacturer_id missing for new row 'Toyota Motor Corp'; part_categories.category_name omitted for category_id=5; car_models.model_year omitted for model_id=5 and manufacturer_id omitted for model_id=7; parts.part_number missing for part_id=7 and parts.category_id missing for part_id=9; car_model_parts.qty omitted for (model_id=5, part_id=8) and car_model_parts.model_id omitted for row with part_id=10.

Fault 2:
  Type: nested_relationship_depth
  Severity: Severity.INFO
  Fix: FixType.AUTO
  Target: Replaced the flat TEXT value in parts.description for part_id=8 with a 5-level nested JSON structure (partDetails → fitment → vehicle and partDetails → package → dimensions → length/width/height) to introduce excessive nesting.
!uv pip install nbformat
Using Python 3.12.8 environment at: /Users/craig/miniforge3/envs/cas
Resolved 11 packages in 177ms                                        
Prepared 2 packages in 34ms                                                  
Installed 2 packages in 10ms                                
 + fastjsonschema==2.21.1
 + nbformat==5.10.4

Data Agent

# Simplified FaultLabel without name field
class DetectedFault(BaseModel):
    severity: str = Field(description="One of: blocker, risky, info")
    fix: str = Field(description="One of: auto, ask_value, ask_map, ask_schema")
    target: str = Field(
        description="Detailed description of what was detected and where"
    )


# DSPy Signature for the Data Agent
class AnalyzeDataQuality(dspy.Signature):
    """
    Analyze provided JSON data against database schema to detect data quality issues.

    You have access to database inspection tools. Use them systematically to:
    1. Check schema compliance (data types, constraints, foreign keys)
    2. Detect missing required fields or tables
    3. Identify referential integrity violations
    4. Find semantic issues (duplicates, invalid values)
    5. Spot structural transformations (renamed/dropped columns)

    For each issue found, determine:
    - Severity: blocker (prevents insertion), risky (needs follow-up), info (cosmetic)
    - Fix type: auto (can fix automatically), ask_value (need user input for values),
               ask_map (need mapping info), ask_schema (need schema changes)
    - Target: Specific description of the issue with table/column/value details

    Be thorough but avoid false positives. Focus on actual data problems.
    
    Use the tools provided to inspect the data and schema.
    """

    json_data: dict = dspy.InputField(
        desc="JSON data to analyze, organized by table name"
    )

    detected_faults: List[DetectedFault] = dspy.OutputField(
        desc="List of detected data quality issues with severity, fix type, and details"
    )

# Create a list of all the database tools
db_tools = [list_tables, get_table_schema, get_foreign_key_relationships, sample_table_data, get_unique_values, get_table_row_count, get_table_columns, execute_query, check_referential_integrity, validate_json_types_against_schema, find_semantic_duplicates, validate_business_rules_with_llm, collate_schema, collate_sample_data]


# DSPy ReAct Agent
data_quality_agent = dspy.ReAct(AnalyzeDataQuality, tools=db_tools)

# Run the agent on the corrupted synthetic data
print("🔍 Analyzing corrupted data with Data Quality Agent...")

# Use the corrupted data from the previous pipeline run
corrupted_data = simplified_result["corrupted_data"]
# Run the agent
agent_result = data_quality_agent(json_data=corrupted_data)

print(f"\n✅ Agent analysis complete!")
🔍 Analyzing corrupted data with Data Quality Agent...

✅ Agent analysis complete!
simplified_result
{'clean_data': {'manufacturers': [{'manufacturer_id': 3, 'name': 'Ford'},
   {'manufacturer_id': 4, 'name': 'BMW'},
   {'manufacturer_id': 5, 'name': 'Chevrolet'}],
  'part_categories': [{'category_id': 4, 'category_name': 'Electrical'},
   {'category_id': 5, 'category_name': 'Transmission'},
   {'category_id': 6, 'category_name': 'Exhaust'}],
  'car_models': [{'model_id': 4,
    'manufacturer_id': 3,
    'model_name': 'F-150',
    'model_year': 2021},
   {'model_id': 5,
    'manufacturer_id': 4,
    'model_name': '3 Series',
    'model_year': 2020},
   {'model_id': 6,
    'manufacturer_id': 5,
    'model_name': 'Silverado',
    'model_year': 2021},
   {'model_id': 7,
    'manufacturer_id': 3,
    'model_name': 'Escape',
    'model_year': 2019}],
  'parts': [{'part_id': 6,
    'category_id': 4,
    'part_number': 'ELC-ALT-FD21',
    'description': 'Alternator assembly (Ford F-150 2021-)'},
   {'part_id': 7,
    'category_id': 5,
    'part_number': 'TRN-6SPD-GM21',
    'description': '6-speed automatic transmission (Chevrolet Silverado 2021-)'},
   {'part_id': 8,
    'category_id': 6,
    'part_number': 'EXH-MUF-BMW20',
    'description': 'Muffler assembly (BMW 3 Series 2020-2023)'},
   {'part_id': 9,
    'category_id': 4,
    'part_number': 'ELC-BAT-FD19',
    'description': '12V battery (Ford Escape 2019-2021)'},
   {'part_id': 10,
    'category_id': 5,
    'part_number': 'TRN-CVT-FD19',
    'description': 'CVT transmission (Ford Escape 2019-2021)'}],
  'car_model_parts': [{'model_id': 4, 'part_id': 6, 'qty': 1},
   {'model_id': 4, 'part_id': 9, 'qty': 1},
   {'model_id': 5, 'part_id': 8, 'qty': 1},
   {'model_id': 6, 'part_id': 7, 'qty': 1},
   {'model_id': 7, 'part_id': 9, 'qty': 1},
   {'model_id': 7, 'part_id': 10, 'qty': 1},
   {'model_id': 6, 'part_id': 7, 'qty': 2}]},
 'corrupted_data': {'manufacturers': [{'manufacturer_id': 3, 'name': 'Ford'},
   {'manufacturer_id': 4},
   {'manufacturer_id': 5, 'name': 'Chevrolet'},
   {'name': 'Toyota Motor Corp'}],
  'part_categories': [{'category_id': 4, 'category_name': 'Electrical'},
   {'category_id': 5},
   {'category_id': 6, 'category_name': 'Exhaust'}],
  'car_models': [{'model_id': 4,
    'manufacturer_id': 3,
    'model_name': 'F-150',
    'model_year': 2021},
   {'model_id': 5, 'manufacturer_id': 4, 'model_name': '3 Series'},
   {'model_id': 6,
    'manufacturer_id': 5,
    'model_name': 'Silverado',
    'model_year': 2021},
   {'model_id': 7, 'model_name': 'Escape', 'model_year': 2019}],
  'parts': [{'part_id': 6,
    'category_id': 4,
    'part_number': 'ELC-ALT-FD21',
    'description': 'Alternator assembly (Ford F-150 2021-)'},
   {'part_id': 7,
    'category_id': 5,
    'description': '6-speed automatic transmission (Chevrolet Silverado 2021-)'},
   {'part_id': 8,
    'category_id': 6,
    'part_number': 'EXH-MUF-BMW20',
    'description': {'partDetails': {'fitment': {'vehicle': {'make': 'BMW',
        'model': '3 Series',
        'year': 2020}},
      'package': {'dimensions': {'length': {'value': 120, 'unit': 'cm'},
        'width': {'value': 40, 'unit': 'cm'},
        'height': {'value': 30, 'unit': 'cm'}}}}}},
   {'part_id': 9,
    'part_number': 'ELC-BAT-FD19',
    'description': '12V battery (Ford Escape 2019-2021)'},
   {'part_id': 10,
    'category_id': 5,
    'part_number': 'TRN-CVT-FD19',
    'description': 'CVT transmission (Ford Escape 2019-2021)'}],
  'car_model_parts': [{'model_id': 4, 'part_id': 6, 'qty': 1},
   {'model_id': 4, 'part_id': 9, 'qty': 1},
   {'model_id': 5, 'part_id': 8},
   {'model_id': 6, 'part_id': 7, 'qty': 1},
   {'model_id': 7, 'part_id': 9, 'qty': 1},
   {'part_id': 10, 'qty': 1},
   {'model_id': 6, 'part_id': 7, 'qty': 2}]},
 'fault_labels': [{'severity': <Severity.BLOCKER: 'blocker'>,
   'fix': <FixType.ASK_VALUE: 'ask_value'>,
   'target': "Removed NOT NULL columns in several tables: manufacturers.name absent for manufacturer_id=4 and manufacturers.manufacturer_id missing for new row 'Toyota Motor Corp'; part_categories.category_name omitted for category_id=5; car_models.model_year omitted for model_id=5 and manufacturer_id omitted for model_id=7; parts.part_number missing for part_id=7 and parts.category_id missing for part_id=9; car_model_parts.qty omitted for (model_id=5, part_id=8) and car_model_parts.model_id omitted for row with part_id=10.",
   'violation_type': 'required_fields_missing'},
  {'severity': <Severity.INFO: 'info'>,
   'fix': <FixType.AUTO: 'auto'>,
   'target': 'Replaced the flat TEXT value in parts.description for part_id=8 with a 5-level nested JSON structure (partDetails → fitment → vehicle and partDetails → package → dimensions → length/width/height) to introduce excessive nesting.',
   'violation_type': 'nested_relationship_depth'}],
 'expected_detections': 2}
agent_result['detected_faults']
[DetectedFault(severity='blocker', fix='ask_value', target='manufacturers: row with manufacturer_id=4 is missing required field `name` (NOT NULL).'),
 DetectedFault(severity='blocker', fix='ask_value', target='part_categories: row with category_id=5 is missing required field `category_name` (NOT NULL).'),
 DetectedFault(severity='blocker', fix='ask_value', target='car_models: row with model_id=5 is missing required field `model_year` (NOT NULL).'),
 DetectedFault(severity='blocker', fix='ask_value', target='car_models: row with model_id=7 is missing required field `manufacturer_id` (NOT NULL / FK to manufacturers).'),
 DetectedFault(severity='blocker', fix='ask_value', target='parts: row with part_id=7 is missing required field `part_number` (NOT NULL).'),
 DetectedFault(severity='blocker', fix='ask_value', target='parts: row with part_id=9 is missing required field `category_id` (NOT NULL / FK to part_categories).'),
 DetectedFault(severity='risky', fix='ask_value', target='parts: row with part_id=8 has `description` as an object, expected TEXT.'),
 DetectedFault(severity='risky', fix='ask_value', target='car_model_parts: duplicate combination (model_id=6, part_id=7) appears with qty=1 and qty=2.'),
 DetectedFault(severity='blocker', fix='ask_value', target='car_model_parts: row {"part_id": 10, "qty": 1} is missing required field `model_id` (NOT NULL / FK).')]

Summary

This is a very abrupt end to the notebook. It was at this point, seeing the valid yet ill-formatted response from our agent, that I realized this notebook was going to be longer than I anticipated and would need some rework. We haven't even gotten into the meat of using DSPy yet - that will come.

Current Known Limitations

  • When generating the 'clean' data in step 1, we are assuming the model maintains referential integrity. This may work for small tables and datasets, but we would need some 'verifier' to really trust the cleanliness of the output. You could do this a couple of ways: actually try to load the JSON into SQL (and then delete it), or run the existing integrity checks (e.g. check_referential_integrity) against it - see the sketch after this list.
  • The database schema itself that we start with is static. In the next iteration, we would add a preliminary step to generate different schemas with different constraints.
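
A minimal sketch of such a verifier - assuming the mock database lives in a SQLite file (the path and function name below are hypothetical): insert every generated row inside a transaction, collect any constraint errors, and roll back so nothing persists.

import sqlite3

def verify_clean_data(db_path: str, clean_data: dict) -> list:
    """Dry-run insert of the generated rows: collect constraint errors, then roll back.
    Assumes parent tables appear before child tables in clean_data."""
    errors = []
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA foreign_keys = ON")  # enforce FK constraints during the dry run
    try:
        for table, rows in clean_data.items():
            for row in rows:
                cols = ", ".join(row.keys())
                placeholders = ", ".join("?" for _ in row)
                try:
                    conn.execute(f"INSERT INTO {table} ({cols}) VALUES ({placeholders})",
                                 list(row.values()))
                except sqlite3.Error as e:
                    errors.append(f"{table}: {row} -> {e}")
    finally:
        conn.rollback()  # never persist the synthetic rows
        conn.close()
    return errors

# e.g. verify_clean_data("car_parts.db", simplified_result["clean_data"])  # hypothetical path; [] means clean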

Issues with Current Code

  • The ground-truth labels are root-cause (“orphaned_child_records”, “flatten_data”).
  • The agent sees only the symptoms in the JSON: FK violations, duplicate columns, etc.
    • Nothing in the prompt tells it to invent a higher-level abstraction (“orphaned_child_records”) or to coalesce many row-level violations into one label (illustrated right after this list).
  • We expect the agent to emit severity and fix type as well, but those are policy decisions that require extra domain knowledge – they are not directly observable from the data.
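
To make the scoring gap concrete, here is a toy illustration using the labels from the run above: the ground truth is one root-cause label, while the agent reports many row-level symptoms, so naive exact matching scores the agent at zero even when it clearly found the problem.

# One root-cause label from the corruption step...
ground_truth = {"required_fields_missing"}

# ...versus the symptom-level findings the agent actually emitted
agent_findings = [
    "manufacturers: row with manufacturer_id=4 is missing required field `name`",
    "part_categories: row with category_id=5 is missing required field `category_name`",
]

# Naive exact-match recall: 0/1. Any useful metric has to bridge the
# root-cause <-> symptom gap (better prompting, an LLM judge, or both).
hits = sum(any(label in finding for finding in agent_findings) for label in ground_truth)
print(f"naive recall: {hits}/{len(ground_truth)}")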

NOTE: We also need to consider how we accept new data. I.e. instead of classifying something as 'out of scope', we assume it's new data that needs to be included in the database and may require a schema change (the ask_schema fix type).

Other Considerations

After reviewing this code again, I entertained the thought that maybe our corrupted_data generation was actually too broad. If we delete a foreign key reference column from the JSON data, that affects every row in the JSON table. The data inspection agent, on the other hand, looks at each 'item' of JSON and flags the error for each row without considering the larger 'theme' for why the error exists. Therefore, if we corrupted the data such that each 'affected' row was output as its own 'fault' for the data agent to predict, scoring might become easier.

In practice this becomes more complicated: the corruption step has to take a broad corruption or transformation, apply it, then go back through the result and output every data point impacted. That is a lot of 'napkin math' (a.k.a. context) we are relying on the LLM to keep track of, which is not its strong suit (hence the concept of 'context engineering'). Instead, we should stick with a 2-stage data agent: the first step IDs all data issues, the second rolls them up into the broad issues we can score against. This is still 'inefficient', as I'd love for this to be a single step.
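
As a rough sketch of that 2-stage split (the signature names and fields below are hypothetical, not the final design):

class EnumerateIssues(dspy.Signature):
    """Stage 1: list every concrete, mechanically verifiable defect in the JSON data."""
    json_data: dict = dspy.InputField(desc="JSON data organized by table name")
    schema_info = dspy.InputField(desc="Database schema text")
    issues: List[str] = dspy.OutputField(desc="One entry per affected table/column/row")


class RollUpIssues(dspy.Signature):
    """Stage 2: cluster row-level issues that share a causal pattern into broad labels
    that can be scored against the corruption step's fault labels."""
    issues: List[str] = dspy.InputField(desc="Row-level issues from stage 1")
    root_causes: List[str] = dspy.OutputField(desc="e.g. 'required_fields_missing'")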

Next Steps

  1. Stop trying to pre-define the severity and impact. These are policy decisions - once the policy is defined, we can pass it along with the data and schema to an LLM and reasonably expect it to get these right. o3 was correct in pointing this out. This means the data generation step simply introduces the errors as we have done, just without the severity and fix - so not too much wasted code.
  2. Break out the data agent into 3 steps:
    1. Detection (objective, data-driven)
      • Task: enumerate every concrete defect that can be verified mechanically (missing parent row, unexpected column, duplicate PK, etc.).
      • Output: DetectedIssue{table, column/row reference, description}.
      • NOTE: unless we change something in the corruption generation code, this step won't have a metric (making DSPy less helpful). However, if we updated the corruption module to break the changes down into their component parts, then we could compute a score here.
    2. Root-cause grouping (heuristic / LLM)
      • Input: the Stage A list.
      • Task: cluster issues that share a causal pattern (e.g. “all these FK errors stem from missing part_categories.category_id=5”) and assign a human-readable group label.
      • This is where terms like “orphaned_child_records” or “flatten_data” live, and thus where we can use metrics to improve prompts.
      • Good place for 'few-shot' examples
    3. Impact & remediation classification (policy)
      • Input: grouped root causes from Stage B.
      • Task: map to Severity ∈ {blocker, risky, …}, FixType ∈ {auto, ask_value, …}.
      • Can be a rule table or another small model (a sketch follows this list); decoupling lets you tweak policy without retraining detection.
      • See above note about outlining our policy about how we would want a model to handle this
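
For step 3, the 'rule table' could be as simple as a dictionary keyed on the root-cause labels from step 2. A minimal sketch, reusing the Severity and FixType enums from earlier (assuming they expose RISKY / ASK_MAP / ASK_SCHEMA members matching the values listed above); the entries themselves are placeholder policy, not a recommendation:

# Hypothetical policy table: root-cause label -> (severity, fix type).
# Keeping policy out of the prompts means it can change without retraining detection.
POLICY = {
    "required_fields_missing":   (Severity.BLOCKER, FixType.ASK_VALUE),
    "nested_relationship_depth": (Severity.INFO,    FixType.AUTO),
    "orphaned_child_records":    (Severity.RISKY,   FixType.ASK_MAP),  # made-up entry
}

def classify_root_cause(label: str) -> tuple:
    # Unknown labels default to asking for a schema decision
    return POLICY.get(label, (Severity.RISKY, FixType.ASK_SCHEMA))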

Future Directions

  • Improving the scoring
  • Feedback loop (incorporate human responses to new plan)
  • Vary the database schema and introduce more tables for added complexity; load the 'clean' generated data into SQL to ensure it was defined properly

Notes on Synthetic SQL Data Limitations

Notes from an HN post that describes gotchas when generating synthetic data:

b0a04gl:

seen this pattern a before too. faker holds shape without flow. real tables come from actions : retry, decline, manual review, all that. you just set col types, you might miss why the row even happened. gen needs to simulate behavior, not format

Replies

ajd555:

Was looking for this exact comment. I completely agree with this method, especially if you're testing an entire flow, and not just a UI tool. You want to test the service that interfaces between the API and the database.

I've been writing custom simulation agents (just simple go programs) that simulate different users of my system. I can scale appropriately and see test data flow in. If metabase could generate these simulation agents based on a schema and some instructions, now that would be quite neat! Good job on this first version of the tool, though!

matthewhefferon:

That's a solid callout, appreciate you pointing it out. I'll definitely dig into that more.

tomrod:

The best synthetic data are those that capture ingestion and action, instead of just relationship.

Relationship is important, but your data structure might capture a virtually infinite number of unexpected behaviors that you would preferably call errors or bugs.