Writing robust Databricks SQL workflows for maximum efficiency

Do you have a big data workload that needs to be managed efficiently and effectively? Are the current SQL workflows falling short? Life as a developer can be hectic especially when you struggle to find ways to optimize your workflow to ensure that you are maximizing efficiency while also reducing errors and bugs along the way. Writing robust Databricks SQL workflows is key to getting the most out of your data and ensuring maximum efficiency. Databricks workflow, a scalable integrated platform, designed to enhance productivity by streamlining data engineering processes. Getting started with writing these powerful workflows can appear daunting, but it doesn’t have to be. This blog post will provide an introduction into leveraging the capabilities of Databricks SQL in your workflow and equip you with best practices for developing powerful Databricks SQL workflows. We will discuss best practices for creating superior SQL workflows with Databricks, allowing developers like you to create optimized projects whose level of quality rivals even their most time-intensive efforts!

Introduction to Databricks Workflows

As data-driven strategies continue to become more prevalent in the development space, developers require tools that can help them effectively take advantage of those opportunities. Databricks Workflows provides a powerful platform for creating and managing complex workflows with multiple steps. It enables easy orchestration of notebooks, spark jobs, big data jobs and other scripts while also offering users the ability to store custom metadata associated with every step in the workflow run. Databricks workflow is a integrated tool which provides great features for developers:

  • Fully-managed orchestration service. 
  • Deeply integrated with the Databricks Lakehouse Platform.
  • No need to create complex infra in the cloud.
  • Can be used in Data Engineering and ML workflows.
  • Provides Lineage.
  • Works with Delta Live tables.
  • Combination of Notebook, Spark, ML Model, and dbt as Job workflow
  • Task and workflow are isolated from each other enabling the collaboration.
  • Workflow automatically managed your resources which means more power in your hand.
  • Simple workflow authoring experience with drag-and-drop capability.
  • Supports multiple languages.
  • Inbuilt monitoring.

This is how its easy to use graphical user interface looks like:

Workflow monitoring

Developers need more than just a development platform – they need comprehensive solutions that enable them to quickly and efficiently create applications with accuracy, agility and scalability. Databricks Workflow provides the flexibility for developers to do exactly that. With its built-in monitoring capabilities and workflow management tools, it enables teams to focus on building best-in-class data pipelines faster and with fewer errors, even in complex environments.

Workflow lineage

As a developer, the ability to actively track data workflows is essential for optimizing performance and understanding data flow. This can be especially important in large-scale organizations where datasets become increasingly complex. Databricks Workflow Lineage provides developers with an effective way to keep up with their organization’s data workflows by providing an easy-to-use tool that explains how different tasks within a workflow are connected. With this helpful visualization tool, developers can quickly recognize patterns and identify areas where improvements need to be made for successful execution of the entire process.

Workflow scheduling

DatabricksWorkflow scheduling is a powerful tool that can be used to efficiently coordinate and manage workflows for data engineering pipelines. With DatabricksWorkflow, users can define tasks with dependencies, set up automated alerts for when tasks fail or take longer than expected, and manage job scheduling across different environments. Users can also use the Scheduler API to create custom schedules for their jobs, which allows them to better control when tasks are run and in what order they will be executed. This makes it easier to keep track of the status of data processing tasks and optimizes the workflow performance. Additionally, DatabricksWorkflow provides support for configuring advanced job settings like maximum concurrency and retry parameters. This helps users ensure that their workflows are configured for optimal performance and efficiency.

SparkUI, Metrics, and workflow logs

Databricks provides many features for the efficient and effective execution of workflows.

SparkUI

Spark UI helps to find out the job status and its execution. It helps to understand the event timeline from start to finish and how executors are added and removed.

Workflow Logs

Finding workflow logs is an important must-have tool under the developer’s belt to debug and diagnose workflow execution issues.

Metrics

Metrics pertaining to Workflow provide incredible insight into how workflow affects the CPU memory and other critical system resources in a timely fashion.

Demo scenario

The entire demo is recorded in the above youtube video. Here we are collecting the Github info (Github Rest API) via python code which produces the JSON files. Then we process the JSON files with Delta live pipelines to create three tables in the SQL data warehouse. Finally, we will develop three queries to retrieve the data from these tables and plot the results in a bar chart. These Bar charts will be used to create the dashboard.

Databricks secret

The following Databricks CLI command can be used to store the Github secret

-----Databricks secret related commands-----------

databricks secrets create-scope --scope databricksgithubscope

databricks secrets put --scope databricksgithubscope --key githubtoken --string-value Store the github token value

Python code to extract the Github data via Rest API call

This python code uses a REST API call to retrieve the GitHub commit data and contributor details. The method call is the same for both methods only difference is the string passed based on which method call is differentiated.

import json
import requests
import sys

api_url = "https://api.github.com"

def get_commits(owner, repo, token, path):
  page = 1
  request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
  more = True

  get_response(request_url, f"{path}/commits", token)

def get_contributors(owner, repo, token, path):
  page = 1
  request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
  more = True

  get_response(request_url, f"{path}/contributors", token)

def get_response(request_url, path, token):
  page = 1
  more = True

  while more:
    response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
    if response.text != "[]":
      write(path + "/records-" + str(page) + ".json", response.text)
      page += 1
    else:
      more = False

def write(filename, contents):
  dbutils.fs.put(filename, contents)

def main():
  args = sys.argv[1:]
  if len(args) < 6:
    print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
    sys.exit(1)

  owner = sys.argv[1]
  repo = sys.argv[2]
  request = sys.argv[3]
  output_path = sys.argv[4]
  secret_scope = sys.argv[5]
  secret_key = sys.argv[6]

  token = dbutils.secrets.get(scope=secret_scope, key=secret_key)

  if (request == "commits"):
    get_commits(owner, repo, token, output_path)
  elif (request == "contributors"):
    get_contributors(owner, repo, token, output_path)

if __name__ == "__main__":
    main()

These method calls differentiate the commits and contributor tasks.

-----Values in contrib task--------------------------

get_contributors
["databrickslabs","overwatch","contributors","dbfs:/FileStore/output/contribs","databricksgithubscope","githubtoken"]
--------------------------

There is a difference between the commit and contributor path

commits-path dbfs:/FileStore/output/commits
contribs-path dbfs:/FileStore/output/contribs

Delta live table

Delta live pipeline is used to fetch the JSON data and store it in the SQL data warehouse table.

import dlt
from pyspark.sql.functions import *

def parse(df):
   return (df
     .withColumn("author_date", to_timestamp(col("commit.author.date")))
     .withColumn("author_email", col("commit.author.email"))
     .withColumn("author_name", col("commit.author.name"))
     .withColumn("comment_count", col("commit.comment_count"))
     .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
     .withColumn("committer_email", col("commit.committer.email"))
     .withColumn("committer_name", col("commit.committer.name"))
     .withColumn("message", col("commit.message"))
     .withColumn("sha", col("commit.tree.sha"))
     .withColumn("tree_url", col("commit.tree.url"))
     .withColumn("url", col("commit.url"))
     .withColumn("verification_payload", col("commit.verification.payload"))
     .withColumn("verification_reason", col("commit.verification.reason"))
     .withColumn("verification_signature", col("commit.verification.signature"))
     .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
     .drop("commit")
   )

@dlt.table(
   comment="Raw GitHub commits"
)
def github_commits_raw():
  df = spark.read.json(spark.conf.get("commits-path"))
  return parse(df.select("commit"))

@dlt.table(
  comment="Info on the author of a commit"
)
def commits_by_author():
  return (
    dlt.read("github_commits_raw")
      .withColumnRenamed("author_date", "date")
      .withColumnRenamed("author_email", "email")
      .withColumnRenamed("author_name", "name")
      .select("sha", "date", "email", "name")
  )

@dlt.table(
  comment="GitHub repository contributors"
)
def github_contributors_raw():
  return(
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(spark.conf.get("contribs-path"))
  )

Queries used in the Dashboard

We have used three queries in the dashboard.

  1. Total commit by Author
SELECT
  name,
  count(1) commits
FROM
  commits_by_author
GROUP BY
  name
ORDER BY
  commits DESC
LIMIT 10

2. Top 20 contributors

SELECT
  login,
  cast(contributions AS INTEGER)
FROM
  github_contributors_raw
ORDER BY
  contributions DESC
LIMIT 20

3. Top 10 contributors by month

SELECT
  date_part('YEAR', date) AS year,
  date_part('MONTH', date) AS month,
  name,
  count(1)
FROM
  commits_by_author
WHERE
  name IN (
    SELECT
      name
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      count(name) DESC
    LIMIT 10
  )
  AND
    date_part('YEAR', date) >= 2022
GROUP BY
  name, year, month
ORDER BY
  year, month, name

4. Dashboard created from the three queries: Here is the look and feel of the dashboard

Hope you find this blog and video to be helpful.

Leave a Reply

Your email address will not be published. Required fields are marked *