This is part two of a series of blogs on Databricks Delta Live Tables. In part one, we discussed the basic concepts and terminology related to Delta Live Tables. In this blog, we will learn how to implement Databricks Delta Live Tables in three easy steps.
We will implement the different concepts and terminology used in Delta Live Tables: Pipelines, Settings, Datasets, Pipeline Modes, and the different editions of Delta Live Pipelines.
Databricks Delta Live Tables can be implemented in three easy steps. We will take an example and go through sample code that you can copy and run in your own Databricks environment.
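Before diving into the steps, it is worth seeing where the settings, pipeline mode, and edition live. A Delta Live Tables pipeline is described by a JSON specification in its settings. Below is a minimal sketch, assuming a notebook-based pipeline whose target database is retail_sales (to match the queries used later); the pipeline name and notebook path are placeholders, and the exact fields may vary in your workspace.
{
  "name": "retail_sales_dlt",
  "edition": "ADVANCED",
  "continuous": false,
  "development": true,
  "libraries": [
    { "notebook": { "path": "<path-to-your-DLT-notebook>" } }
  ],
  "target": "retail_sales"
}
Here, continuous controls the pipeline mode (triggered when false, continuous when true) and edition selects the product edition (Core, Pro, or Advanced).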
Step 1: Design the Lakehouse zones
First, we need to design all the layers of the Lakehouse platform:
- Bronze: Contains the raw data as it is received, kept for audit purposes so it can be traced back to the data sources.
- Silver: Filters and cleans the data from the Bronze zone. Essentially, it handles missing data and standardizes fields, converts nested objects into flat structures for ease of querying, and renames columns to friendly, well-understood names.
- Gold: Builds the business-specific model and aggregates the data into dimensions and facts. It also provides business-friendly names and creates views that business users can access.
The code below creates these zones.
-- Mapping Zone: the Databricks customer dataset, ingested from a CSV file
CREATE INCREMENTAL LIVE TABLE customers
COMMENT "The customers buying finished products, ingested from /databricks-datasets."
TBLPROPERTIES ("myCompanyPipeline.quality" = "mapping")
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");
-- Bronze Zone: the raw sales orders from the Databricks JSON dataset
CREATE INCREMENTAL LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
TBLPROPERTIES ("myCompanyPipeline.quality" = "bronze")
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"));
-- Silver Zone: cleaned sales orders, created by dropping invalid sales orders (where order_number is NULL)
-- This dataset is partitioned by order_date and built by joining the customers and sales orders datasets created earlier
CREATE INCREMENTAL LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s), partitioned by order_date."
TBLPROPERTIES ("myCompanyPipeline.quality" = "silver")
AS
SELECT
  f.customer_id,
  f.customer_name,
  f.number_of_line_items,
  TIMESTAMP(from_unixtime(cast(f.order_datetime as long))) as order_datetime,
  DATE(from_unixtime(cast(f.order_datetime as long))) as order_date,
  f.order_number,
  f.ordered_products,
  c.state,
  c.city,
  c.lon,
  c.lat,
  c.units_purchased,
  c.loyalty_segment
FROM STREAM(LIVE.sales_orders_raw) f
LEFT JOIN LIVE.customers c
  ON c.customer_id = f.customer_id AND c.customer_name = f.customer_name
-- Gold Zone: sales orders from Los Angeles (filter condition)
-- Built from the cleaned sales orders dataset
CREATE LIVE TABLE sales_order_in_la
COMMENT "Sales orders in LA."
TBLPROPERTIES ("myCompanyPipeline.quality" = "gold")
AS
SELECT
  city,
  order_date,
  customer_id,
  customer_name,
  ordered_products_explode.curr,
  SUM(ordered_products_explode.price) as sales,
  SUM(ordered_products_explode.qty) as quantity,
  COUNT(ordered_products_explode.id) as product_count
FROM (
  SELECT
    city,
    order_date,
    customer_id,
    customer_name,
    EXPLODE(ordered_products) as ordered_products_explode
  FROM LIVE.sales_orders_cleaned
  WHERE city = 'Los Angeles'
)
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr
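The verification queries in Step 3 also reference a second Gold table for Chicago, whose definition is not shown here. A sketch, assuming it mirrors the Los Angeles table with only the city filter changed:
-- Gold Zone: sales orders from Chicago (assumed to mirror the LA table above)
CREATE LIVE TABLE sales_order_in_chicago
COMMENT "Sales orders in Chicago."
TBLPROPERTIES ("myCompanyPipeline.quality" = "gold")
AS
SELECT
  city,
  order_date,
  customer_id,
  customer_name,
  ordered_products_explode.curr,
  SUM(ordered_products_explode.price) as sales,
  SUM(ordered_products_explode.qty) as quantity,
  COUNT(ordered_products_explode.id) as product_count
FROM (
  SELECT
    city,
    order_date,
    customer_id,
    customer_name,
    EXPLODE(ordered_products) as ordered_products_explode
  FROM LIVE.sales_orders_cleaned
  WHERE city = 'Chicago'
)
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr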
Step 2: Implement Quality Rules
The purpose of the Silver zone is to clean the data coming from the Raw (Bronze) zone. Delta Live Tables provides a very easy mechanism for this: the EXPECT clause, together with an instruction for what to do when the condition is not met. Upon finding an invalid record, you may take any of these actions (each is sketched in the snippet after this list):
- Drop the invalid record
- Fail the pipeline update
- Retain the invalid record
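A minimal sketch of the three constraint variants, reusing the valid_order_number expectation from this pipeline (only the DROP ROW form is actually used below; the other two are shown for comparison). Each clause goes inside the CREATE LIVE TABLE definition:
-- Retain invalid records in the target and only record the violation in the event log (default)
CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL)

-- Drop invalid records from the target dataset
CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW

-- Stop the pipeline update as soon as an invalid record is found
CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION FAIL UPDATE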
In the full example below, we drop the invalid record when the order number is NULL.
-- Silver Zone: cleaned sales orders, created by dropping invalid sales orders (where order_number is NULL)
-- This dataset is partitioned by order_date and built by joining the customers and sales orders datasets created earlier
CREATE INCREMENTAL LIVE TABLE sales_orders_cleaned(
  CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s), partitioned by order_date."
TBLPROPERTIES ("myCompanyPipeline.quality" = "silver")
AS
SELECT
  f.customer_id,
  f.customer_name,
  f.number_of_line_items,
  TIMESTAMP(from_unixtime(cast(f.order_datetime as long))) as order_datetime,
  DATE(from_unixtime(cast(f.order_datetime as long))) as order_date,
  f.order_number,
  f.ordered_products,
  c.state,
  c.city,
  c.lon,
  c.lat,
  c.units_purchased,
  c.loyalty_segment
FROM STREAM(LIVE.sales_orders_raw) f
LEFT JOIN LIVE.customers c
  ON c.customer_id = f.customer_id AND c.customer_name = f.customer_name
Step 3: Deploy, Test, and Monitor
Finally, once we deploy the Delta Live pipeline, we need to test whether everything is working as expected. The queries below check each of the output tables and verify that they were populated.
-- Verify that the customers table is processed in the Mapping zone
SELECT * FROM retail_sales.customers

-- Verify that the raw sales orders table is processed in the Bronze zone
SELECT * FROM retail_sales.sales_orders_raw

-- Verify that the cleaned sales orders table is processed in the Silver zone
SELECT * FROM retail_sales.sales_orders_cleaned

-- Verify that the Chicago sales orders table is processed in the Gold zone
SELECT * FROM retail_sales.sales_order_in_chicago

-- Verify that the LA sales orders table is processed in the Gold zone
SELECT * FROM retail_sales.sales_order_in_la
Now that we have tested the code and promoted it to the production environment, we need to operationalize it, which requires monitoring. Delta Live Tables provides monitoring capabilities through its event log. The code below shows how to query the event log and customize it for your own monitoring requirements.
%python
from pyspark.sql.functions import *
from pyspark.sql.types import *

# We have not defined a storage location for this pipeline, so events are stored in
# /pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events,
# where 9da89b4f-4786-4bd0-aff2-6054843affdc is the pipeline id
event_log = spark.read.format('delta').load("/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")
event_log.createOrReplaceTempView("dltevents")
-- Verify that we can see these events from the temp view created earlier
SELECT * FROM dltevents

-- Filter the logs where the event type is flow_progress
SELECT details FROM dltevents WHERE event_type = 'flow_progress'

-- Filter the flow_progress events whose status is COMPLETED
SELECT details FROM dltevents WHERE event_type = 'flow_progress' AND details:flow_progress.status = 'COMPLETED'
%python
# Filter the COMPLETED flow_progress events with a select expression.
# Note how the nested elements are referenced in the expressions.
from pyspark.sql.functions import *
from pyspark.sql.types import *

event_log = spark.read.format('delta').load("/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")
runHistorydf = (event_log
  .selectExpr("id",
              "timestamp",
              "origin.flow_name",
              "details:flow_progress.metrics.num_output_rows",
              "details:flow_progress.data_quality.dropped_records",
              "*")
  .filter("event_type = 'flow_progress' and details:flow_progress.status = 'COMPLETED'"))
display(runHistorydf)
%python
# Retrieve the expectations along with the other fields
from pyspark.sql.functions import *
from pyspark.sql.types import *

event_log = spark.read.format('delta').load("/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")
runHistorydf = (event_log
  .selectExpr("id",
              "timestamp",
              "origin.flow_name",
              "details:flow_progress.metrics.num_output_rows",
              "details:flow_progress.data_quality.dropped_records",
              "details:flow_progress.data_quality.expectations")
  .filter("event_type = 'flow_progress' and details:flow_progress.status = 'COMPLETED'"))
display(runHistorydf)
%python
# For each expectation, find the number of records that passed and failed
from pyspark.sql.functions import *
from pyspark.sql.types import *

event_log = spark.read.format('delta').load("/pipelines/9da89b4f-4786-4bd0-aff2-6054843affdc/system/events")
expectationsdf = (event_log
  .selectExpr("id", "details:flow_progress.data_quality.expectations")
  .filter("event_type = 'flow_progress' and details:flow_progress.status = 'COMPLETED'"))

# The expectations field is a JSON array; define its schema so it can be exploded
schema = ArrayType(StructType([
  StructField('name', StringType()),
  StructField('dataset', StringType()),
  StructField('passed_records', LongType()),
  StructField('failed_records', LongType())
]))

expectationsdf = expectationsdf.withColumn("expectations", explode(from_json("expectations", schema)))
expectationsdf = expectationsdf.selectExpr("id",
                                           "expectations.name",
                                           "expectations.dataset",
                                           "expectations.passed_records",
                                           "expectations.failed_records")
expectationsdf.display()
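As one further customization, not part of the original example, the sketch below aggregates the per-event expectation results from the previous cell into an overall pass rate per expectation and dataset. It assumes expectationsdf from the cell above is still available.
%python
# Aggregate the per-event expectation results into an overall pass rate
# per expectation name and dataset (uses expectationsdf from the previous cell)
from pyspark.sql.functions import col, sum as _sum, round as _round

qualitydf = (expectationsdf
  .groupBy("name", "dataset")
  .agg(_sum("passed_records").alias("passed_records"),
       _sum("failed_records").alias("failed_records")))

# Pass rate = passed / (passed + failed), rounded to four decimal places
qualitydf = qualitydf.withColumn(
  "pass_rate",
  _round(col("passed_records") / (col("passed_records") + col("failed_records")), 4))

display(qualitydf)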
I hope you enjoyed this article.