pyos_data_validation
Overview
pyos_data_validation is a lightweight Python package for defining, validating, and comparing data contracts for tabular datasets. A data contract captures assumptions about a dataset’s schema, data types, missingness, numeric ranges, and categorical values.
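To make the idea of a data contract concrete, here is a minimal sketch of the kind of information one might hold. The `ColumnRule` and `Contract` classes and all of their field names are hypothetical, chosen only to mirror the assumptions listed above; they are not the package's actual types.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ColumnRule:
    """Hypothetical per-column expectations (not the package's real type)."""
    dtype: str                                   # e.g. "int64" or "object"
    max_missing_frac: float = 0.0                # tolerated share of missing values
    min_value: Optional[float] = None            # numeric lower bound, if any
    max_value: Optional[float] = None            # numeric upper bound, if any
    allowed_values: Optional[frozenset] = None   # permitted categories, if any


@dataclass
class Contract:
    """Hypothetical container mapping column names to their rules."""
    columns: dict = field(default_factory=dict)


example = Contract(columns={
    "age": ColumnRule(dtype="int64", min_value=25, max_value=45),
    "department": ColumnRule(
        dtype="object",
        allowed_values=frozenset({"HR", "Engineering", "Sales"}),
    ),
})

print(sorted(example.columns))
```

Keeping one rule object per column means each check (type, missingness, range, categories) can be evaluated and reported independently.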
The package supports a simple, reproducible workflow:
- Infer a contract from a reference dataset
- Validate new data against the contract
- Compare contracts to detect schema or distribution drift
- Summarize validation failures for debugging and CI use
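The first two steps can be pictured with a standard-library sketch: record the observed minimum and maximum of a reference column, then flag new values that fall outside that range. The helper names `infer_range` and `check_range` are hypothetical, and treating the inferred rule as a strict min/max is an assumption; this is an illustration, not the package's implementation.

```python
def infer_range(values):
    """Record the observed min and max of a reference column."""
    return {"min": min(values), "max": max(values)}


def check_range(values, rule):
    """Return (index, value) pairs that fall outside the recorded range."""
    return [
        (i, v) for i, v in enumerate(values)
        if v < rule["min"] or v > rule["max"]
    ]


reference_ages = [25, 40, 35, 28, 45]
rule = infer_range(reference_ages)        # {"min": 25, "max": 45}

new_ages = [30, 55, 22]
violations = check_range(new_ages, rule)
print(violations)                         # [(1, 55), (2, 22)]
```

Returning the offending positions alongside the values, rather than a bare pass/fail flag, is what makes a later summary step useful for debugging.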
Installation
Install from TestPyPI:
pip install -i https://test.pypi.org/simple/ pyos-data-validation
Or install from source for development:
git clone https://github.com/UBC-MDS/DSCI_524_G26_Data_Validation.git
cd DSCI_524_G26_Data_Validation
pip install -e .
Quick Start
The walkthrough below exercises all four core functions, starting with contract inference:
import pandas as pd
from pyos_data_validation import (
    infer_contract,
    validate_contract,
    compare_contracts,
    summarize_violations,
)
# Step 1: Create training data
training_data = pd.DataFrame({
    "age": [25, 40, 35, 28, 45],
    "salary": [50000, 75000, 62000, 55000, 80000],
    "department": ["HR", "Engineering", "HR", "Sales", "Engineering"]
})
# Step 2: Infer a contract from the training data
contract = infer_contract(training_data)
print("Contract inferred successfully!")
print(f"Columns in contract: {list(contract.columns.keys())}")
Validating Data
Validate new data against the contract:
# Valid data - passes all checks
valid_data = pd.DataFrame({
    "age": [30, 42],
    "salary": [58000, 72000],
    "department": ["HR", "Sales"]
})
result = validate_contract(valid_data, contract)
print(f"Validation passed: {result.ok}")
print(f"Issues found: {len(result.issues)}")
Now let’s see what happens with invalid data:
# Invalid data - multiple violations
invalid_data = pd.DataFrame({
    "age": [30, 55, 22],                         # 55 and 22 outside range
    "salary": [58000, 72000, 45000],             # 45000 below minimum
    "department": ["HR", "Sales", "Marketing"],  # Marketing not in contract
    "bonus": [5000, 8000, 3000]                  # Extra column
})
result = validate_contract(invalid_data, contract)
print(f"Validation passed: {result.ok}")
print(f"Issues found: {len(result.issues)}")
# Show first few issues
for issue in result.issues[:3]:
    print(f" - {issue.column}: {issue.kind}")
Summarizing Violations
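As a rough mental model, summarizing comes down to counting issues by kind and keeping the first few after ranking them. The sketch below uses only the standard library; the `Issue` tuple, its `severity` field, the kind names, and the ranking rule are all assumptions made for illustration and are not taken from the package.

```python
from collections import Counter, namedtuple

# Hypothetical issue record; the severity field is an assumption for this sketch.
Issue = namedtuple("Issue", ["column", "kind", "severity"])

issues = [
    Issue("age", "range", 2),
    Issue("salary", "range", 2),
    Issue("department", "category", 2),
    Issue("bonus", "extra_column", 3),
]


def summarize(issues, top_k=3):
    """Count issues per kind and keep the top_k most severe ones."""
    counts_by_kind = Counter(issue.kind for issue in issues)
    # sorted() is stable, so ties keep their original order.
    top_issues = sorted(issues, key=lambda i: i.severity, reverse=True)[:top_k]
    return top_issues, dict(counts_by_kind)


top_issues, counts_by_kind = summarize(issues, top_k=2)
print([i.column for i in top_issues])   # ['bonus', 'age']
print(counts_by_kind)                   # {'range': 2, 'category': 1, 'extra_column': 1}
```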
Get an actionable summary of the most critical issues:
summary = summarize_violations(result, top_k=3)
print(f"Overall status: {'PASS' if summary.ok else 'FAIL'}")
print(f"\nTop {len(summary.top_issues)} critical issues:")
for issue in summary.top_issues:
    print(f" - {issue.column}: {issue.kind}")
print("\nIssue counts by type:")
for kind, count in summary.counts_by_kind.items():
    print(f" {kind}: {count}")
Comparing Contracts (Drift Detection)
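Conceptually, schema drift can be pictured as a set difference over column names, and one simple form of distribution drift as a change in recorded numeric ranges. The sketch below illustrates both with plain dictionaries; the `diff_contracts` helper and the dictionary layout are hypothetical, and the package's own comparison logic may differ.

```python
def diff_contracts(old, new):
    """Compare two {column: (min, max)} dicts; None marks a non-numeric column."""
    old_cols, new_cols = set(old), set(new)
    added = sorted(new_cols - old_cols)      # columns only in the new contract
    removed = sorted(old_cols - new_cols)    # columns only in the old contract
    # Shared columns whose recorded range differs between the two contracts.
    changed = {
        col: (old[col], new[col])
        for col in sorted(old_cols & new_cols)
        if old[col] != new[col]
    }
    return added, removed, changed


old = {"age": (25, 45), "salary": (50000, 80000), "department": None}
new = {
    "age": (25, 60),
    "salary": (50000, 110000),
    "department": None,
    "location": None,
}

added, removed, changed = diff_contracts(old, new)
print(added)            # ['location']
print(removed)          # []
print(sorted(changed))  # ['age', 'salary']
```

This sketch tracks only numeric ranges, so a change in categorical values (such as a new department) would go unnoticed here.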
Detect changes between data versions:
# New data with different characteristics
new_data = pd.DataFrame({
    "age": [25, 40, 50, 60],                                        # Expanded age range
    "salary": [50000, 75000, 90000, 110000],                        # Higher salaries
    "department": ["HR", "Engineering", "Finance", "Engineering"],  # New dept
    "location": ["NYC", "SF", "NYC", "Austin"]                      # New column
})
new_contract = infer_contract(new_data)
drift_report = compare_contracts(contract, new_contract)
print(f"Schema drift detected: {drift_report.has_schema_drift}")
print(f"Distribution drift detected: {drift_report.has_distribution_drift}")
if drift_report.added_columns:
    print(f"New columns: {drift_report.added_columns}")
if drift_report.distribution_changes:
    print(f"Distribution changes in: {list(drift_report.distribution_changes.keys())}")
Detailed Documentation
For complete documentation of each function, including all parameters, return types, and additional examples, see:
- infer_contract - Learn data contracts from DataFrames
- validate_contract - Check data against contracts
- compare_contracts - Detect schema and distribution drift
- summarize_violations - Prioritize validation failures
See the complete API Reference for all functions and types.
Use Cases
CI/CD Integration
Use in automated testing:
# reference_data and new_data are placeholders for DataFrames loaded elsewhere
def test_data_quality():
    expected_contract = infer_contract(reference_data)
    result = validate_contract(new_data, expected_contract)
    if not result.ok:
        summary = summarize_violations(result)
        raise AssertionError(
            f"Data validation failed with {len(summary.top_issues)} critical issues"
        )
Data Pipeline Monitoring
Monitor data drift over time:
# baseline_data, data_batches, alert_team, and log_drift_metrics are placeholders
# for your own data sources and alerting/logging hooks
baseline_contract = infer_contract(baseline_data)
for batch in data_batches:
    current_contract = infer_contract(batch)
    drift = compare_contracts(baseline_contract, current_contract)
    if drift.has_schema_drift:
        alert_team("Schema drift detected!")
    if drift.has_distribution_drift:
        log_drift_metrics(drift.distribution_changes)
Contributing
We welcome contributions! Check out:
License
MIT License - see LICENSE for details.