This is a data engineering pipeline built on Databricks + Delta Lake + PySpark that ingests travel booking and customer master data, applies SCD Type 2 logic, and delivers analytics-ready tables. It includes data quality enforcement, dimension versioning, fact aggregation, and performance tuning.
```
CSV Files  →  Bronze Layer   →  Data Quality   →  Silver Layer  →  Analytics
    ↓             ↓                  ↓                 ↓               ↓
Raw Data     Raw Ingestion     DQ Validation     SCD2 Dims       Fact Tables
Customer     + Metadata        + PyDeequ         + Surrogate     + Aggregations
Booking      + Audit Trail     + Logging           Keys          + Analytics
```
- Bronze Layer: Raw data ingestion with metadata enrichment
- Silver Layer: SCD2 dimensions and aggregated fact tables
- Gold Layer: Analytics and reporting tables (via SQL queries)
```
Travel_Booking_SCD2_Merge_Project/
├── notebooks/
│   ├── 01_input_validation.ipynb        # Input validation and audit logging
│   ├── 02_ingest_bookings_bronze.ipynb  # Booking data ingestion
│   ├── 03_ingest_customers_bronze.ipynb # Customer data ingestion
│   ├── 04_bookings_dq.ipynb             # Booking data quality checks
│   ├── 05_customers_dq.ipynb            # Customer data quality checks
│   ├── 06_customers_dim_scd2.ipynb      # SCD2 customer dimension
│   ├── 07_booking_fact.ipynb            # Booking fact table
│   ├── 08_zorder_optimize.ipynb         # Table optimization
│   └── 09_analyze_stats.ipynb           # Statistics analysis
├── queries/
│   ├── travel_booking_init.sql          # Schema initialization
│   ├── customer360.sql                  # Customer analytics
│   ├── daily_revenue.sql                # Revenue analytics
│   ├── data_quality_summary.sql         # DQ reporting
│   └── log_completion_flow.sql          # Pipeline completion
├── booking_data/                        # Sample booking CSV files
├── customer_data/                       # Sample customer CSV files
├── imgs/
│   └── orchestration_screenshots/
└── README.md                            # This file
```
Booking data:
- Files: `bookings_YYYY-MM-DD.csv`
- Purpose: Daily booking transaction records
- Schema: `booking_id,customer_id,booking_type,amount,discount,quantity,booking_date`

Customer data:
- Files: `customers_YYYY-MM-DD.csv`
- Purpose: Customer master data with potential attribute changes
- Schema: `customer_id,customer_name,customer_address,email`
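For illustration, each feed is plain CSV and a row can be inspected with Python's `csv` module before it ever reaches Spark. The sample values below are invented, not taken from the project data:

```python
import csv
import io

# A hypothetical booking row matching the documented schema (values are made up).
booking_csv = (
    "booking_id,customer_id,booking_type,amount,discount,quantity,booking_date\n"
    "B001,123,flight,450.00,25.00,2,2025-01-15\n"
)

reader = csv.DictReader(io.StringIO(booking_csv))
rows = list(reader)
# csv yields strings; Spark's schema inference later casts amount/quantity to numerics
print(rows[0]["customer_id"])
```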
- Purpose: Validates input parameters and file existence
- Features:
- Parameter extraction with defaults
- File existence validation
- Audit logging setup
- Run tracking
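The validation step can be sketched in plain Python. The notebook itself reads these values from Databricks widgets; `resolve_params` and `validate_files` are illustrative helpers, and the path layout mirrors the Volumes structure documented below:

```python
import os

# Defaults mirror the widget defaults used elsewhere in this README.
DEFAULTS = {
    "arrival_date": "2025-01-15",
    "catalog": "travel_bookings",
    "schema": "default",
    "base_volume": "/Volumes/travel_bookings/default/data",
}

def resolve_params(overrides=None):
    """Merge user-supplied parameters over the defaults."""
    params = dict(DEFAULTS)
    params.update(overrides or {})
    return params

def validate_files(params):
    """Build the expected input paths and raise if either file is missing."""
    paths = [
        f"{params['base_volume']}/booking_data/bookings_{params['arrival_date']}.csv",
        f"{params['base_volume']}/customer_data/customers_{params['arrival_date']}.csv",
    ]
    missing = [p for p in paths if not os.path.exists(p)]
    if missing:
        raise FileNotFoundError(f"Missing input files: {missing}")
    return paths
```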
- Purpose: Raw booking data ingestion
- Features:
- CSV parsing with schema inference
- Metadata enrichment (ingestion_time, business_date)
- Delta table storage with append mode
- Purpose: Raw customer data ingestion with SCD2 preparation
- Features:
- SCD2 temporal columns (valid_from, valid_to, is_current)
- Business date partitioning
- Delta table storage
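The SCD2 preparation amounts to stamping each incoming customer row with its temporal columns. A minimal plain-Python sketch (the notebook does this with PySpark `withColumn` calls; the open-ended `valid_to` sentinel of 9999-12-31 is an assumption):

```python
from datetime import date

OPEN_ENDED = date(9999, 12, 31)  # assumed sentinel for "no end date yet"

def prepare_scd2(row: dict, business_date: date) -> dict:
    """Add the SCD2 temporal columns to a raw customer record."""
    return {
        **row,
        "valid_from": business_date,  # version starts on the business date
        "valid_to": OPEN_ENDED,       # open-ended until a change closes it
        "is_current": True,           # newly ingested rows are the current version
    }
```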
- Purpose: Comprehensive booking data quality checks
- Checks:
- Data existence (row count > 0)
- Completeness (customer_id, amount)
- Non-negativity (amount, quantity, discount)
- Framework: PyDeequ with error-level validation
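PyDeequ runs these checks against the Spark DataFrame; the following is a plain-Python sketch of what each rule asserts, not the PyDeequ API:

```python
def run_dq_checks(rows):
    """Evaluate the three booking DQ rules; returns {check_name: passed}."""
    results = {}
    # Data existence: at least one row ingested for the business date
    results["has_rows"] = len(rows) > 0
    # Completeness: customer_id and amount must never be null
    results["complete"] = all(
        r.get("customer_id") is not None and r.get("amount") is not None
        for r in rows
    )
    # Non-negativity: amount, quantity, discount must all be >= 0
    results["non_negative"] = all(
        r.get(col, 0) >= 0
        for r in rows
        for col in ("amount", "quantity", "discount")
    )
    return results
```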
- Purpose: Customer data quality validation
- Checks:
- Data existence and completeness
- Email format validation
- Required field validation
- Purpose: Implements SCD2 for customer dimension
- Features:
- Surrogate key generation (IDENTITY column)
- Historical version tracking
- Merge logic for attribute changes
- Temporal column management
- Purpose: Creates aggregated booking fact table
- Features:
- Daily grain aggregation
- Customer surrogate key integration
- Financial calculations (amount - discount)
- Idempotent merge operations
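The daily-grain aggregation with net amount (amount − discount) can be sketched as follows, with plain Python standing in for the PySpark `groupBy`; the output column names follow the `booking_fact` schema:

```python
from collections import defaultdict

def aggregate_bookings(rows):
    """Roll bookings up to (booking_type, customer_id, booking_date) grain."""
    totals = defaultdict(lambda: {"total_amount_sum": 0.0, "total_quantity_sum": 0})
    for r in rows:
        key = (r["booking_type"], r["customer_id"], r["booking_date"])
        # Net financial amount: gross amount minus discount
        totals[key]["total_amount_sum"] += r["amount"] - r["discount"]
        totals[key]["total_quantity_sum"] += r["quantity"]
    return dict(totals)
```

In the notebook the result is then joined to the current dimension rows to attach `customer_sk` and merged idempotently into the fact table.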
- Purpose: Optimizes table performance
- Features:
- Z-ORDER BY clustering
- VACUUM operations
- Performance tuning
- Purpose: Updates table statistics
- Features:
- ANALYZE TABLE commands
- Query optimization support
- Historical Tracking: Maintains all versions of customer attributes
- Surrogate Keys: Auto-generated identity columns for performance
- Temporal Columns: valid_from, valid_to, is_current for time-based queries
- Merge Logic: Closes old versions and creates new ones for changes
- PyDeequ Integration: Comprehensive DQ checks with configurable levels
- Audit Logging: All DQ results stored for monitoring and reporting
- Error Handling: Pipeline stops on DQ failures to prevent bad data
- ACID Properties: Transactional consistency across operations
- Schema Evolution: Automatic schema updates with mergeSchema
- Time Travel: Historical data access capabilities
- Optimization: Z-ORDER and VACUUM for performance
- Widget-based: Configurable parameters via Databricks widgets
- Default Values: Sensible defaults for all parameters
- Flexibility: Easy customization for different environments
- Databricks workspace with Unity Catalog
- PyDeequ library installed
- Appropriate permissions for table creation
```python
# Upload sample CSV files to Volumes
# Structure: /Volumes/{catalog}/{schema}/data/
# ├── booking_data/
# │   └── bookings_YYYY-MM-DD.csv
# └── customer_data/
#     └── customers_YYYY-MM-DD.csv

# Set widget parameters (optional - defaults provided)
dbutils.widgets.text("arrival_date", "2025-01-15")
dbutils.widgets.text("catalog", "travel_bookings")
dbutils.widgets.text("schema", "default")
dbutils.widgets.text("base_volume", "/Volumes/travel_bookings/default/data")

# Run notebooks in sequence:
# 1. 01_input_validation.ipynb
# 2. 02_ingest_bookings_bronze.ipynb
# 3. 03_ingest_customers_bronze.ipynb
# 4. 04_bookings_dq.ipynb
# 5. 05_customers_dq.ipynb
# 6. 06_customers_dim_scd2.ipynb
# 7. 07_booking_fact.ipynb
# 8. 08_zorder_optimize.ipynb
# 9. 09_analyze_stats.ipynb
```

```sql
CREATE TABLE customer_dim (
  customer_sk BIGINT GENERATED ALWAYS AS IDENTITY, -- Surrogate key
  customer_id INT,                                 -- Natural key
  customer_name STRING,
  customer_address STRING,
  email STRING,
  valid_from DATE,                                 -- SCD2 start date
  valid_to DATE,                                   -- SCD2 end date
  is_current BOOLEAN                               -- Current version flag
)
```

```sql
CREATE TABLE booking_fact (
  booking_type STRING,
  customer_id INT,           -- Natural key
  customer_sk BIGINT,        -- Surrogate key
  business_date DATE,        -- Daily grain
  total_amount_sum DOUBLE,   -- Aggregated amount
  total_quantity_sum BIGINT  -- Aggregated quantity
)
```

- Compares current dimension records with incoming data
- Identifies changes in: customer_name, customer_address, email
- Ignores changes in: customer_id (natural key)
- Close Old Version: Update valid_to and is_current for changed records
- Create New Version: Insert new record with updated attributes
- Surrogate Key: Auto-generated for new versions
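The close-and-insert steps above can be sketched in plain Python. The notebook implements this as a Delta `MERGE`; here a simple counter stands in for the IDENTITY surrogate key, and closing the old version at `business_date - 1 day` is an assumption:

```python
from datetime import date, timedelta

TRACKED = ("customer_name", "customer_address", "email")

def scd2_merge(dim, incoming, business_date, next_sk):
    """Apply SCD2: close changed current rows and insert new versions."""
    current = {r["customer_id"]: r for r in dim if r["is_current"]}
    for row in incoming:
        old = current.get(row["customer_id"])
        if old and all(old[c] == row[c] for c in TRACKED):
            continue  # no tracked attribute changed; keep the current version
        if old:
            # Close the old version the day before the new one starts
            old["valid_to"] = business_date - timedelta(days=1)
            old["is_current"] = False
        dim.append({
            "customer_sk": next_sk,  # stands in for the IDENTITY column
            **{c: row[c] for c in ("customer_id",) + TRACKED},
            "valid_from": business_date,
            "valid_to": date(9999, 12, 31),
            "is_current": True,
        })
        next_sk += 1
    return dim
```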
```sql
-- Current customer data
SELECT * FROM customer_dim WHERE is_current = true;

-- Historical customer data as of a date
SELECT * FROM customer_dim
WHERE customer_id = 123
  AND '2025-01-15' BETWEEN valid_from AND valid_to;

-- Customer changes over time
SELECT customer_id, customer_name, valid_from, valid_to
FROM customer_dim
WHERE customer_id = 123
ORDER BY valid_from;
```

```sql
CREATE TABLE ops.dq_results (
  business_date DATE,
  dataset STRING,
  check_name STRING,
  status STRING,
  `constraint` STRING,  -- backticked: CONSTRAINT is a reserved word
  message STRING,
  recorded_at TIMESTAMP
)
```

```sql
-- DQ failure summary
SELECT business_date, dataset, COUNT(*) AS failed_checks
FROM ops.dq_results
WHERE status = 'Failure'
GROUP BY business_date, dataset;

-- DQ trend analysis
SELECT business_date,
       SUM(CASE WHEN status = 'Success' THEN 1 ELSE 0 END) AS passed,
       SUM(CASE WHEN status = 'Failure' THEN 1 ELSE 0 END) AS failed
FROM ops.dq_results
GROUP BY business_date
ORDER BY business_date;
```

```sql
-- Optimize fact table for common query patterns
OPTIMIZE booking_fact ZORDER BY (business_date, customer_sk);

-- Optimize dimension table for lookups
OPTIMIZE customer_dim ZORDER BY (customer_id, is_current);
```

```sql
-- Update statistics for query optimization
ANALYZE TABLE booking_fact COMPUTE STATISTICS;
ANALYZE TABLE customer_dim COMPUTE STATISTICS;
```

The pipeline is designed to be executed as Databricks workflows:
Workflow 1 (pipeline):
- Tasks: Sequential execution of all notebooks
- Dependencies: Each notebook depends on the previous task's completion
- Error Handling: Stop on failure with detailed logging

Workflow 2 (analytics):
- Tasks: Execution of SQL queries for reporting
- Trigger: Workflow 1 completion
- Output: Analytics tables and reports
```json
{
  "name": "travel_booking_pipeline",
  "tasks": [
    {
      "task_key": "validate_inputs",
      "notebook_task": {
        "notebook_path": "/notebooks/01_input_validation"
      }
    },
    {
      "task_key": "ingest_bronze",
      "depends_on": [{"task_key": "validate_inputs"}],
      "notebook_task": {
        "notebook_path": "/notebooks/02_ingest_bookings_bronze"
      }
    }
  ]
}
```

- Comprehensive validation at multiple stages
- Automated DQ monitoring and alerting
- Graceful error handling and logging
- Z-ORDER clustering for common query patterns
- Statistics updates for query optimization
- Efficient merge operations with Delta Lake
- Clear naming conventions and documentation
- Parameterized notebooks for flexibility
- Modular design for easy updates
- Incremental processing by business date
- Delta Lake for large-scale data processing
- Optimized storage and query performance
- Symptom: `FileNotFoundError` in `validate_inputs`
- Solution: Verify file paths and ensure the CSV files exist
- Symptom: Merge operations fail in customer_dim
- Solution: Check for duplicate customer_ids or schema mismatches
- Symptom: PyDeequ checks fail
- Solution: Review source data quality and adjust DQ rules
- Check run_log table for pipeline status
- Review DQ results for data quality issues
- Validate input data format and content
- Check Unity Catalog permissions
- Machine learning integration for booking predictions
- Customer segmentation and behavior analysis
- Revenue forecasting and trend analysis
- Streaming data ingestion with Delta Live Tables
- Real-time SCD2 processing
- Event-driven architecture
- Column-level security and masking
- Data lineage tracking
- Compliance and audit frameworks
This Travel Booking SCD2 Merge Project demonstrates a production-ready data engineering pipeline with comprehensive SCD2 implementation, data quality validation, and performance optimization. The pipeline provides a solid foundation for travel booking data processing with built-in monitoring, audit trails, and scalability features.
For questions or issues with this pipeline, please refer to the Databricks documentation or contact the data engineering team.