Monday, February 17, 2025

Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions, scalable metadata handling, and unified batch + streaming processing to big data workloads on top of Apache Spark and cloud object storage (like Azure Data Lake Storage, AWS S3, or GCS).


Key Features of Delta Lake

Feature | Description
ACID Transactions | Guarantees data consistency through commits and rollbacks.
Schema Evolution | Automatically handles changes in the data schema during writes.
Time Travel | Query older versions of the data using table versioning.
Unified Batch + Streaming | Use the same Delta table for both batch and real-time processing.
Scalable Metadata | Handles billions of files and petabytes of data efficiently.
Data Reliability | Enforces schema and prevents partial or dirty writes.
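
A quick illustration of ACID writes and Time Travel in PySpark. This is only a sketch; the table path is hypothetical and the snippet is separate from the pipeline below.

# Write a small Delta table, append to it, then read an earlier version back
df = spark.range(5).toDF("value")
df.write.format("delta").mode("overwrite").save("/mnt/datalake/demo/numbers")   # creates version 0

spark.range(5, 10).toDF("value") \
    .write.format("delta").mode("append").save("/mnt/datalake/demo/numbers")    # creates version 1

# Time Travel: read the table as of version 0
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/datalake/demo/numbers")
v0.show()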


🏗️ Delta Architecture: Bronze, Silver, Gold

  1. Bronze Layer: Raw ingestion (streaming or batch from sources like Kafka, Event Hubs)

  2. Silver Layer: Cleaned and transformed data

  3. Gold Layer: Aggregated and business-ready data (used in dashboards)


Bronze Layer (Raw / Ingest)

Goal: Capture source data as received, immutably and with minimal transformation, to establish lineage and replayability.

Characteristics

  • Schema: Often semi-structured (JSON, CSV, Avro) stored as ingested; may include _ingest_ts, _source_file, _source_system.

  • Data Quality: Not validated beyond basic ingestion success.

  • Storage Pattern: Partition by ingest date/time (e.g., ingest_date=YYYY-MM-DD) to simplify retention & replay.

  • Use Cases: Reprocessing, audit, debugging, forensic analysis, schema drift detection.

  • Write Pattern: Append-only writes from streaming sources (readStream from Event Hubs/Kafka; Auto Loader for files), as sketched below.
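
A minimal Bronze ingestion sketch using Databricks Auto Loader. The landing, schema, and checkpoint paths are assumptions, as are the JSON source format and the _source_system value.

from pyspark.sql.functions import current_timestamp, to_date, input_file_name, lit

raw_path = "/mnt/datalake/landing/events"        # hypothetical landing zone for raw files
bronze_path = "/mnt/datalake/bronze/events"

bronze_ingest = (
  spark.readStream
       .format("cloudFiles")                                             # Auto Loader
       .option("cloudFiles.format", "json")
       .option("cloudFiles.schemaLocation", "/mnt/datalake/schemas/bronze/events")
       .load(raw_path)
       .withColumn("_ingest_ts", current_timestamp())
       .withColumn("_source_file", input_file_name())
       .withColumn("_source_system", lit("events_api"))                  # assumed source name
       .withColumn("ingest_date", to_date(current_timestamp()))
)

(bronze_ingest.writeStream
     .format("delta")
     .outputMode("append")
     .option("checkpointLocation", "/mnt/datalake/checkpoints/bronze/events")
     .partitionBy("ingest_date")                                         # partition by ingest date for retention/replay
     .start(bronze_path))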


Silver Layer (Clean / Conform / Enrich)

Goal: Turn raw data into analytics-grade canonical entities that are trustworthy and joinable across domains.

Transforms Typically Applied

Step | Examples
Data Cleansing | Drop corrupt rows; parse JSON; enforce data types.
Deduplication | Use event IDs, hashes, or window-based dedupe.
Normalization | Explode arrays; flatten nested structures.
Conformance | Standardize units, currencies, time zones (UTC), enums.
Joins / Enrichment | Look up dimension tables (users, products, geo).
Watermark + Late Data Handling | Structured Streaming with withWatermark to discard or flag very late events.
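
A sketch of the watermark plus deduplication pattern. The silver_stream DataFrame and the event_time column are assumptions; eventId matches the pipeline code further down.

# Bound the dedupe state with an event-time watermark
deduped = (
  silver_stream
    .withWatermark("event_time", "2 hours")          # tolerate up to 2 hours of lateness
    .dropDuplicates(["eventId", "event_time"])       # include the watermark column so expired state is dropped
)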

Table Modeling

  • Often entity-level (e.g., silver.sales_orders, silver.user_profile).

  • Partition on a business date (event_date, transaction_date) for high-volume tables.

  • Use MERGE INTO for CDC updates (upsert, SCD Type 1/2 patterns); see the sketch below.
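
A minimal SCD Type 1 upsert sketch using the Delta Lake Python API. The target table name, join key, and updates_df are illustrative.

from delta.tables import DeltaTable

# updates_df: a DataFrame holding the latest CDC records for each order (illustrative)
target = DeltaTable.forName(spark, "refined.silver.sales_orders")

(target.alias("t")
   .merge(updates_df.alias("s"), "t.order_id = s.order_id")
   .whenMatchedUpdateAll()        # SCD Type 1: overwrite the existing row
   .whenNotMatchedInsertAll()     # insert brand-new keys
   .execute())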


Gold Layer (Curated / Business & Analytics)

Goal: High-trust, consumption-optimized data products: aggregates, KPIs, dimensional models, ML feature tables.

Patterns

Consumption Style | Modeling Approach | Notes
BI Reporting | Star/Snowflake (fact + dimension tables) | Fast ad hoc BI (Power BI / Synapse).
Metrics/KPIs | Pre-aggregated summary tables | Daily/hourly rollups, incremental refresh.
ML Features | Feature store–style Delta tables | Point-in-time correctness; training vs. inference views.
Data Sharing | Cleaned, governed shareable tables | Unity Catalog + Delta Sharing.

Example Incremental ETL Flow (PySpark)

The Delta tables below are physically stored in an ADLS Gen2 container mounted under /mnt/datalake/.

# Bronze -> Silver incremental clean
from pyspark.sql.functions import from_json

bronze_path = "/mnt/datalake/bronze/events"
silver_table = "refined.silver.events_clean"

# event_schema (a StructType describing the JSON payload) must be defined beforehand
bronze_stream = (
  spark.readStream
       .format("delta")
       .load(bronze_path)
)

cleaned = (
  bronze_stream
    .filter("body IS NOT NULL")                                            # drop empty payloads
    .selectExpr("cast(body as string) as json_str", "ingest_ts")
    .select(from_json("json_str", event_schema).alias("e"), "ingest_ts")   # parse the JSON payload
    .select("e.*", "ingest_ts")
    .dropDuplicates(["eventId"])                                           # note: state is unbounded without a watermark
)

(cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/datalake/checkpoints/silver/events_clean")
    .toTable(silver_table))


Silver → Gold Aggregation (Triggered Batch Refresh)

gold_table = "curated.gold.daily_event_metrics"
from pyspark.sql.functions import count, countDistinct

silver_df = spark.table("refined.silver.events_clean")

daily = (
  silver_df
    .groupBy("event_date", "channel")
    .agg(count("*").alias("event_count"),
         countDistinct("userId").alias("unique_users"))
)

(daily.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(gold_table))

Apache Iceberg

Work in Progress