pyos_data_validation

Overview

pyos_data_validation is a lightweight Python package for defining, validating, and comparing data contracts for tabular datasets. A data contract captures assumptions about a dataset’s schema, data types, missingness, numeric ranges, and categorical values.

The package supports a simple, reproducible workflow:

  1. Infer a contract from a reference dataset
  2. Validate new data against the contract
  3. Compare contracts to detect schema or distribution drift
  4. Summarize validation failures for debugging and CI use
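
To make the idea of a contract concrete, here is a minimal, dependency-free sketch of the kind of per-column information described above (type, missingness, numeric range, allowed categories). `ColumnRule` and `sketch_infer_rules` are hypothetical names invented for this illustration, and plain lists stand in for a DataFrame; this is not the package's internal representation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnRule:
    """Hypothetical per-column record; not the package's actual type."""
    dtype: str
    max_missing_frac: float
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    allowed_values: Optional[frozenset] = None


def sketch_infer_rules(table):
    """Derive one ColumnRule per column from a dict of column name -> list of values."""
    rules = {}
    for name, values in table.items():
        present = [v for v in values if v is not None]
        # Fraction of entries that are missing (None) in the reference data.
        missing_frac = (len(values) - len(present)) / len(values) if values else 0.0
        is_numeric = bool(present) and all(
            isinstance(v, (int, float)) and not isinstance(v, bool) for v in present
        )
        if is_numeric:
            # Numeric columns record the observed range.
            rules[name] = ColumnRule("number", missing_frac, min(present), max(present))
        else:
            # Everything else is treated as categorical in this sketch.
            rules[name] = ColumnRule(
                "category", missing_frac, allowed_values=frozenset(present)
            )
    return rules


reference = {
    "age": [25, 40, 35, 28, 45],
    "department": ["HR", "Engineering", "HR", "Sales", None],
}
rules = sketch_infer_rules(reference)
print(rules["age"].min_value, rules["age"].max_value)
print(sorted(rules["department"].allowed_values))
```

In practice `infer_contract` does this work for you; the sketch only shows why a reference dataset is enough to derive the assumptions a contract records.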

Installation

Install from TestPyPI:

pip install -i https://test.pypi.org/simple/ pyos-data-validation

Or install from source for development:

git clone https://github.com/UBC-MDS/DSCI_524_G26_Data_Validation.git
cd DSCI_524_G26_Data_Validation
pip install -e .

Quick Start

The walkthrough below covers all four core functions, starting with inferring a contract from training data:

import pandas as pd
from pyos_data_validation import (
    infer_contract,
    validate_contract,
    compare_contracts,
    summarize_violations,
)

# Step 1: Create training data
training_data = pd.DataFrame({
    "age": [25, 40, 35, 28, 45],
    "salary": [50000, 75000, 62000, 55000, 80000],
    "department": ["HR", "Engineering", "HR", "Sales", "Engineering"]
})

# Step 2: Infer a contract from the training data
contract = infer_contract(training_data)
print("Contract inferred successfully!")
print(f"Columns in contract: {list(contract.columns.keys())}")

Validating Data

Validate new data against the contract:

# Valid data - passes all checks
valid_data = pd.DataFrame({
    "age": [30, 42],
    "salary": [58000, 72000],
    "department": ["HR", "Sales"]
})

result = validate_contract(valid_data, contract)
print(f"Validation passed: {result.ok}")
print(f"Issues found: {len(result.issues)}")

Now let’s see what happens with invalid data:

# Invalid data - multiple violations
invalid_data = pd.DataFrame({
    "age": [30, 55, 22],  # 55 and 22 outside range
    "salary": [58000, 72000, 45000],  # 45000 below minimum
    "department": ["HR", "Sales", "Marketing"],  # Marketing not in contract
    "bonus": [5000, 8000, 3000]  # Extra column
})

result = validate_contract(invalid_data, contract)
print(f"Validation passed: {result.ok}")
print(f"Issues found: {len(result.issues)}")

# Show first few issues
for issue in result.issues[:3]:
    print(f"  - {issue.column}: {issue.kind}")
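
For intuition about what triggers these issues, the following is a rough sketch of range, category, and schema checks written in plain Python. `SketchIssue`, `sketch_validate`, the dictionary-based rules, and the `kind` strings are all assumptions made for this illustration; the issue kinds that `validate_contract` actually reports may be named differently.

```python
from dataclasses import dataclass


@dataclass
class SketchIssue:
    """Hypothetical issue record with the `column` and `kind` fields printed above."""
    column: str
    kind: str


def sketch_validate(table, rules):
    """Check a dict of column -> values against simple range/category rules."""
    issues = []
    # Schema checks: columns present in the data but not in the rules, and vice versa.
    for name in table:
        if name not in rules:
            issues.append(SketchIssue(name, "extra_column"))
    for name, rule in rules.items():
        if name not in table:
            issues.append(SketchIssue(name, "missing_column"))
            continue
        values = table[name]
        if "range" in rule:
            low, high = rule["range"]
            if any(v < low or v > high for v in values):
                issues.append(SketchIssue(name, "out_of_range"))
        if "allowed" in rule:
            if any(v not in rule["allowed"] for v in values):
                issues.append(SketchIssue(name, "unseen_category"))
    return issues


# Rules matching the ranges and categories observed in the training data.
rules = {
    "age": {"range": (25, 45)},
    "salary": {"range": (50000, 80000)},
    "department": {"allowed": {"HR", "Engineering", "Sales"}},
}
batch = {
    "age": [30, 55, 22],
    "salary": [58000, 72000, 45000],
    "department": ["HR", "Sales", "Marketing"],
    "bonus": [5000, 8000, 3000],
}
issues = sketch_validate(batch, rules)
for issue in issues:
    print(f"  - {issue.column}: {issue.kind}")
```

The sketch reports at most one issue per column and check, which keeps the output short; a real validator may report more detail per violation.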

Summarizing Violations

Get an actionable summary of the most critical issues:

summary = summarize_violations(result, top_k=3)

print(f"Overall status: {'PASS' if summary.ok else 'FAIL'}")
print(f"\nTop {len(summary.top_issues)} critical issues:")
for issue in summary.top_issues:
    print(f"  - {issue.column}: {issue.kind}")

print("\nIssue counts by type:")
for kind, count in summary.counts_by_kind.items():
    print(f"  {kind}: {count}")
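
As an illustration of how a summary like this could be built, the sketch below counts issues by kind and picks the top `top_k` by a severity ranking. The `SEVERITY` ordering, the `sketch_summarize` helper, and the issue kind strings are hypothetical; the source does not state how `summarize_violations` ranks issues.

```python
from collections import Counter

# Hypothetical severity ranking (lower sorts first); the package's ordering may differ.
SEVERITY = {
    "missing_column": 0,
    "extra_column": 1,
    "out_of_range": 2,
    "unseen_category": 3,
}


def sketch_summarize(issues, top_k=3):
    """Summarize (column, kind) pairs into a status, top issues, and per-kind counts."""
    counts = Counter(kind for _, kind in issues)
    # sorted() is stable, so issues of equal severity keep their original order.
    ranked = sorted(issues, key=lambda item: SEVERITY.get(item[1], len(SEVERITY)))
    return {
        "ok": not issues,
        "top_issues": ranked[:top_k],
        "counts_by_kind": dict(counts),
    }


issues = [
    ("age", "out_of_range"),
    ("salary", "out_of_range"),
    ("department", "unseen_category"),
    ("bonus", "extra_column"),
]
summary_sketch = sketch_summarize(issues, top_k=3)
print("PASS" if summary_sketch["ok"] else "FAIL")
for column, kind in summary_sketch["top_issues"]:
    print(f"  - {column}: {kind}")
print(summary_sketch["counts_by_kind"])
```

Capping the list at `top_k` keeps CI logs readable while the per-kind counts still show the overall scale of the failure.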

Comparing Contracts (Drift Detection)

Detect changes between data versions:

# New data with different characteristics
new_data = pd.DataFrame({
    "age": [25, 40, 50, 60],  # Expanded age range
    "salary": [50000, 75000, 90000, 110000],  # Higher salaries
    "department": ["HR", "Engineering", "Finance", "Engineering"],  # New dept
    "location": ["NYC", "SF", "NYC", "Austin"]  # New column
})

new_contract = infer_contract(new_data)
drift_report = compare_contracts(contract, new_contract)

print(f"Schema drift detected: {drift_report.has_schema_drift}")
print(f"Distribution drift detected: {drift_report.has_distribution_drift}")

if drift_report.added_columns:
    print(f"New columns: {drift_report.added_columns}")
if drift_report.distribution_changes:
    print(f"Distribution changes in: {list(drift_report.distribution_changes.keys())}")
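
The comparison can be pictured as set arithmetic on column names plus a per-column comparison of recorded ranges or categories. The sketch below assumes a simplified contract (a dict mapping each column to a range tuple or a set of categories) and treats any difference as drift; `sketch_compare` and the `removed_columns` key are hypothetical, and `compare_contracts` may apply different rules or tolerances.

```python
def sketch_compare(old, new):
    """Compare two simplified contracts: dicts of column -> range tuple or category set."""
    old_cols, new_cols = set(old), set(new)
    added = sorted(new_cols - old_cols)
    removed = sorted(old_cols - new_cols)
    # For columns present in both, record any change in range or categories.
    changed = {
        name: {"old": old[name], "new": new[name]}
        for name in sorted(old_cols & new_cols)
        if old[name] != new[name]
    }
    return {
        "added_columns": added,
        "removed_columns": removed,
        "distribution_changes": changed,
        "has_schema_drift": bool(added or removed),
        "has_distribution_drift": bool(changed),
    }


old = {
    "age": (25, 45),
    "salary": (50000, 80000),
    "department": frozenset({"HR", "Engineering", "Sales"}),
}
new = {
    "age": (25, 60),
    "salary": (50000, 110000),
    "department": frozenset({"HR", "Engineering", "Finance"}),
    "location": frozenset({"NYC", "SF", "Austin"}),
}
report = sketch_compare(old, new)
print(report["added_columns"])
print(list(report["distribution_changes"].keys()))
```

Separating schema drift (columns added or removed) from distribution drift (changed ranges or categories) lets a pipeline treat the former as a hard failure and the latter as a warning.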

Detailed Documentation

For complete documentation of each function, including all parameters, return types, and additional examples, see the API Reference.


Use Cases

CI/CD Integration

Use in automated testing:

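from pyos_data_validation import (
    infer_contract,
    validate_contract,
    summarize_violations,
)

def test_data_quality():
    # reference_data and new_data are placeholders for pandas DataFrames
    # loaded elsewhere (for example, from fixtures or files).
    expected_contract = infer_contract(reference_data)
    result = validate_contract(new_data, expected_contract)

    if not result.ok:
        summary = summarize_violations(result)
        raise AssertionError(
            f"Data validation failed with {len(summary.top_issues)} critical issues"
        )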

Data Pipeline Monitoring

Monitor data drift over time:

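from pyos_data_validation import infer_contract, compare_contracts

# baseline_data and data_batches are placeholders for your own DataFrames;
# alert_team and log_drift_metrics are placeholders for your own alerting
# and logging functions. None of them are provided by this package.
baseline_contract = infer_contract(baseline_data)

for batch in data_batches:
    current_contract = infer_contract(batch)
    drift = compare_contracts(baseline_contract, current_contract)

    if drift.has_schema_drift:
        alert_team("Schema drift detected!")
    if drift.has_distribution_drift:
        log_drift_metrics(drift.distribution_changes)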

Contributing

We welcome contributions! Check out:

License

MIT License - see LICENSE for details.