How to implement Databricks Delta Live Tables in three easy steps?

This is part two of a series of blogs on Databricks Delta Live Tables. In part one we discussed the basic concepts and terminology related to Databricks Delta Live Tables. In this blog, we will learn how to implement a Databricks Delta Live Table pipeline in three easy steps.

We will put into practice the different concepts and terminology used in Delta Live Tables: pipelines, settings, datasets, pipeline modes, and the different editions of Delta Live Table pipelines.

Databricks Delta Live Tables can be implemented in three easy steps. We will take an example and go through sample code that you can copy and run in your own Databricks environment.
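Before we create any tables, it helps to recall where the settings, pipeline mode, and edition live: they are part of the pipeline configuration, which you set when you create the pipeline in the Workflows UI or through the Delta Live Tables Pipelines REST API. The sketch below is only illustrative; the workspace URL, token, and notebook path are placeholders, and the exact set of fields supported may vary with your workspace version.

%python
import requests

# Placeholders -- replace with your own workspace URL, access token, and pipeline notebook path.
WORKSPACE_URL = "https://<your-workspace>.cloud.databricks.com"
TOKEN = "<personal-access-token>"

# Pipeline settings: name, target database, edition, and pipeline mode (triggered vs. continuous).
pipeline_settings = {
    "name": "retail_sales_dlt",
    "target": "retail_sales",        # database where the live tables are published
    "edition": "ADVANCED",           # CORE | PRO | ADVANCED
    "continuous": False,             # False = triggered mode, True = continuous mode
    "libraries": [{"notebook": {"path": "/Repos/demo/dlt_retail_sales"}}],
}

# Create the pipeline through the Delta Live Tables Pipelines REST API.
response = requests.post(
    f"{WORKSPACE_URL}/api/2.0/pipelines",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=pipeline_settings,
)
print(response.json())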

Step 1: Design the Lakehouse zones

First, we need to design all the layers for the Lakehouse platform:

  • Bronze: It contains the raw data exactly as it is received, kept for audit purposes and to trace back to the data sources.
  • Silver: This zone filters and cleans the data from the Bronze zone. Essentially, it handles missing data and standardizes the cleaned fields. If there are nested objects, it flattens them for ease of querying. It also renames columns to friendly names so they are easily understood.
  • Gold: When the data reaches the Gold zone, it is modeled for the business and aggregated into dimensions and facts. This zone also provides business-friendly names and creates views that business users can access.

The code below creates these zones.

--The Databricks customers dataset, ingested from csv files
--Mapping Zone
CREATE INCREMENTAL LIVE TABLE customers
COMMENT "The customers buying finished products, ingested from /databricks-datasets."
TBLPROPERTIES ("myCompanyPipeline.quality" = "mapping")
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");
-- Sales order Databricks JSON dataset
--Bronze Zone
CREATE INCREMENTAL LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
TBLPROPERTIES ("myCompanyPipeline.quality" = "bronze")
AS
SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"))
-- Sales orders cleaned: Silver zone table created after dropping invalid sales orders (where order_number is NULL)
-- This dataset is partitioned by order_date and built by joining the customers and sales orders datasets created earlier
CREATE INCREMENTAL LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
TBLPROPERTIES ("myCompanyPipeline.quality" = "silver")
AS
SELECT f.customer_id, f.customer_name, f.number_of_line_items,
  TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
  DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
  f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
  FROM STREAM(LIVE.sales_orders_raw) f
  LEFT JOIN LIVE.customers c
      ON c.customer_id = f.customer_id
     AND c.customer_name = f.customer_name
--Gold Zone where we are retrieving the sales orders from Chicago (filter condition)
-- We are using the cleaned sales orders dataset to build it
CREATE LIVE TABLE sales_order_in_chicago
COMMENT "Sales orders in Chicago."
TBLPROPERTIES ("myCompanyPipeline.quality" = "gold")
AS
SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr,
  SUM(ordered_products_explode.price) as sales,
  SUM(ordered_products_explode.qty) as quantity,
  COUNT(ordered_products_explode.id) as product_count
FROM (
  SELECT city, order_date, customer_id, customer_name, EXPLODE(ordered_products) as ordered_products_explode
  FROM LIVE.sales_orders_cleaned
  WHERE city = 'Chicago'
  )
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr
--Gold Zone where we are retrieving the sales orders from Los Angeles (filter condition)
-- We are using the cleaned sales orders dataset to build it
CREATE LIVE TABLE sales_order_in_la
COMMENT "Sales orders in LA."
TBLPROPERTIES ("myCompanyPipeline.quality" = "gold")
AS
SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr,
  SUM(ordered_products_explode.price) as sales,
  SUM(ordered_products_explode.qty) as quantity,
  COUNT(ordered_products_explode.id) as product_count
FROM (
  SELECT city, order_date, customer_id, customer_name, EXPLODE(ordered_products) as ordered_products_explode
  FROM LIVE.sales_orders_cleaned
  WHERE city = 'Los Angeles'
  )
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr
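The same tables can also be declared with the Delta Live Tables Python API in a Python notebook attached to the pipeline. As a minimal sketch (not part of the SQL pipeline above), here is how the Bronze ingestion with Auto Loader would look, assuming the notebook imports the dlt module:

%python
import dlt

# Bronze zone: ingest the raw sales orders with Auto Loader ("cloudFiles"),
# mirroring the CREATE INCREMENTAL LIVE TABLE sales_orders_raw statement above.
@dlt.table(
    comment="The raw sales orders, ingested from /databricks-datasets.",
    table_properties={"myCompanyPipeline.quality": "bronze"},
)
def sales_orders_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/databricks-datasets/retail-org/sales_orders/")
    )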

Step 2: Implement Quality Rules

The purpose of the Silver zone is to clean the data coming from the Bronze (raw) zone. Delta Live Tables provides a very easy mechanism for this: the EXPECT clause declares a data quality constraint together with the action to take when a record violates it. For an invalid record you may take any of these actions:

  • Drop the invalid record
  • Fail the pipeline update
  • Retain the invalid record (the default; the violation is still counted in the data quality metrics)

In the example below we drop the invalid record when the order number is NULL.

-- Sales orders cleaned: Silver zone table created after dropping invalid sales orders (where order_number is NULL)
-- This dataset is partitioned by order_date and built by joining the customers and sales orders datasets created earlier
CREATE INCREMENTAL LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
TBLPROPERTIES ("myCompanyPipeline.quality" = "silver")
AS
SELECT f.customer_id, f.customer_name, f.number_of_line_items,
  TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
  DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
  f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
  FROM STREAM(LIVE.sales_orders_raw) f
  LEFT JOIN LIVE.customers c
      ON c.customer_id = f.customer_id
     AND c.customer_name = f.customer_name
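For completeness, the same three actions are also available in the Python API as decorators. The sketch below uses a hypothetical sales_orders_checked table purely to illustrate how each action is declared; it is not part of the pipeline above:

%python
import dlt

# Each decorator declares an expectation and the action taken on violation:
#   expect          -> retain the invalid record, but track it in the data quality metrics
#   expect_or_drop  -> drop the invalid record
#   expect_or_fail  -> fail the pipeline update
@dlt.table(comment="Illustration only: the three expectation actions.")
@dlt.expect("valid_timestamp", "order_datetime IS NOT NULL")
@dlt.expect_or_drop("valid_order_number", "order_number IS NOT NULL")
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
def sales_orders_checked():
    return dlt.read_stream("sales_orders_raw")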

Step 3: Deploy, Test, and Monitor

Finally, once we deploy the Delta Live Table pipeline, we need to test whether everything is working as expected. The queries below check each of the output tables.

-- Verify if Customer table is processed in the Mapping zone
SELECT * from retail_sales.customers
-- Verify if Sales Order table is processed in the Bronze zone
SELECT * from retail_sales.sales_orders_raw
-- Verify if Sales Order Cleaned table is processed in the Silver zone
SELECT * from retail_sales.sales_orders_cleaned
-- Verify if Sales Order in Chicago table is processed in the Gold Zone
SELECT * from retail_sales.sales_order_in_chicago
-- Verify if Sales Order in LA table is processed in the Gold Zone
SELECT * from retail_sales.sales_order_in_la
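If you prefer a quick programmatic check over running each query by hand, a small sketch like the one below counts the rows in every output table. It assumes the pipeline publishes to the retail_sales database, as in the queries above:

%python
# Row counts for every table produced by the pipeline (target database assumed to be retail_sales).
tables = [
    "customers",
    "sales_orders_raw",
    "sales_orders_cleaned",
    "sales_order_in_chicago",
    "sales_order_in_la",
]
for t in tables:
    print(t, spark.table(f"retail_sales.{t}").count())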

Now that the code is tested, it is promoted to the production environment. To maintain it there we need to operationalize it, which requires monitoring. Delta Live Tables records pipeline activity in an event log, and the code below shows how to query that log and tailor it to your own monitoring requirements.

%python
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Since we have not defined a storage location, the event log is stored under /pipelines/<pipeline-id>/system/events; here the pipeline id is 9da89b4f-4786-4bd0-aff2-6054843affdc
event_log = spark.read.format('delta').load(f"/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")
event_log.createOrReplaceTempView("dltevents")
-- Verify that we can see the events through the temp view created above
SELECT * from dltevents
-- Filter the logs where the event type is flow_progress
select details from dltevents where event_type = 'flow_progress'
-- Filter the flow_progress events whose status is COMPLETED
select details from dltevents where event_type = 'flow_progress' and details:flow_progress.status ='COMPLETED'
%python
# Let's select the flow_progress events whose status is COMPLETED, using a select expression. Note how the nested elements are referenced in the code.
from pyspark.sql.functions import *
from pyspark.sql.types import *
event_log = spark.read.format('delta').load(f"/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")

runHistorydf = (event_log
      .selectExpr("id",
                  "timestamp",
                  "origin.flow_name",
                  "details:flow_progress.metrics.num_output_rows",
                  "details:flow_progress.data_quality.dropped_records", "*")
     .filter("event_type = 'flow_progress' and details:flow_progress.status ='COMPLETED'")
 )

display(runHistorydf)
%python
# Extract the expectation results in addition to the other fields
from pyspark.sql.functions import *
from pyspark.sql.types import *
event_log = spark.read.format('delta').load(f"/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")

runHistorydf = (event_log
      .selectExpr("id",
                  "timestamp",
                  "origin.flow_name",
                  "details:flow_progress.metrics.num_output_rows",
                  "details:flow_progress.data_quality.dropped_records",
                  "details:flow_progress.data_quality.expectations")
     .filter("event_type = 'flow_progress' and details:flow_progress.status ='COMPLETED'")
 )

display(runHistorydf)
%python
# For each expectation, extract the number of records that passed and failed
from pyspark.sql.functions import *
from pyspark.sql.types import *
event_log = spark.read.format('delta').load(f"/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")

expectationsdf = (event_log
       .selectExpr("id",
                   "details:flow_progress.data_quality.expectations")
       .filter("event_type = 'flow_progress' and details:flow_progress.status ='COMPLETED'")
)

schema = ArrayType(StructType([
    StructField('name', StringType()),
    StructField('dataset', StringType()),
    StructField('passed_records', LongType()),
    StructField('failed_records', LongType())
]))
expectationsdf = expectationsdf.withColumn("expectations", explode(from_json("expectations", schema)))

expectationsdf = (
  expectationsdf.selectExpr("id",
                            "expectations.name",
                            "expectations.dataset",
                            "expectations.passed_records",
                            "expectations.failed_records"))
expectationsdf.display()
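The dataframe above has one row per pipeline update event. If you want a single pass/fail summary per expectation instead, you can aggregate it; a small sketch:

%python
from pyspark.sql import functions as F

# Roll up the per-event metrics from the dataframe built above into one row per dataset and expectation.
summary = (expectationsdf
    .groupBy("dataset", "name")
    .agg(F.sum("passed_records").alias("passed_records"),
         F.sum("failed_records").alias("failed_records")))
display(summary)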

I hope you enjoyed this article.
