Do you have a big data workload that needs to be managed efficiently and effectively? Are your current SQL workflows falling short? Life as a developer can be hectic, especially when you are struggling to optimize your workflow so that you maximize efficiency while reducing errors and bugs along the way. Writing robust Databricks SQL workflows is key to getting the most out of your data. Databricks Workflows is a scalable, integrated platform designed to boost productivity by streamlining data engineering processes. Getting started with these powerful workflows can appear daunting, but it doesn't have to be. This blog post introduces the capabilities of Databricks SQL in your workflow and equips you with best practices for developing powerful Databricks SQL workflows, so that developers like you can build optimized projects whose quality rivals even your most time-intensive efforts.
Introduction to Databricks Workflows
As data-driven strategies become more prevalent in the development space, developers need tools that help them take full advantage of those opportunities. Databricks Workflows provides a powerful platform for creating and managing complex, multi-step workflows. It makes it easy to orchestrate notebooks, Spark jobs, big data jobs, and other scripts, while also letting users store custom metadata with every step of a workflow run. Databricks Workflows is an integrated tool that offers developers a rich set of features:
- Fully-managed orchestration service.
- Deeply integrated with the Databricks Lakehouse Platform.
- No need to create complex infra in the cloud.
- Can be used in Data Engineering and ML workflows.
- Provides lineage.
- Works with Delta Live Tables.
- Combines notebooks, Spark jobs, ML models, and dbt tasks in a single job workflow.
- Tasks and workflows are isolated from each other, enabling collaboration.
- Automatically manages your resources, which means more power in your hands.
- Simple workflow authoring experience with drag-and-drop capability.
- Supports multiple languages.
- Inbuilt monitoring.
This is what its easy-to-use graphical user interface looks like:
Workflow monitoring
Developers need more than just a development platform – they need comprehensive solutions that enable them to quickly and efficiently create applications with accuracy, agility and scalability. Databricks Workflow provides the flexibility for developers to do exactly that. With its built-in monitoring capabilities and workflow management tools, it enables teams to focus on building best-in-class data pipelines faster and with fewer errors, even in complex environments.
Workflow lineage
As a developer, the ability to actively track data workflows is essential for optimizing performance and understanding data flow. This can be especially important in large-scale organizations where datasets become increasingly complex. Databricks Workflow Lineage provides developers with an effective way to keep up with their organization’s data workflows by providing an easy-to-use tool that explains how different tasks within a workflow are connected. With this helpful visualization tool, developers can quickly recognize patterns and identify areas where improvements need to be made for successful execution of the entire process.
Workflow scheduling
Databricks Workflows scheduling is a powerful capability for coordinating and managing data engineering pipelines. With Databricks Workflows, users can define tasks with dependencies, set up automated alerts for when tasks fail or take longer than expected, and manage job scheduling across different environments. Users can also create custom schedules for their jobs through the Jobs API, giving them better control over when tasks run and the order in which they execute. This makes it easier to keep track of the status of data processing tasks and to optimize workflow performance. Additionally, Databricks Workflows supports advanced job settings such as maximum concurrency and retry parameters, which helps ensure that workflows are configured for optimal performance and efficiency.
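To make this concrete, here is a minimal sketch of creating a scheduled job programmatically through the Jobs REST API (version 2.1). The workspace URL, token, notebook path, cluster id, and email address are placeholders, not values from this demo, and the field names should be verified against the Jobs API reference for your workspace.

import requests

# Hypothetical placeholders -- replace with values from your workspace
host = "https://<your-workspace>.cloud.databricks.com"
token = "<personal-access-token>"

job_spec = {
    "name": "github-analysis-workflow",
    "max_concurrent_runs": 1,                      # only one run at a time
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",   # run daily at 06:00
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED"
    },
    "email_notifications": {
        "on_failure": ["team@example.com"]         # alert when a run fails
    },
    "tasks": [
        {
            "task_key": "ingest_commits",
            "existing_cluster_id": "<cluster-id>",
            "notebook_task": {"notebook_path": "/Repos/demo/github-api"},
            "max_retries": 2,                      # retry a failed task twice
            "min_retry_interval_millis": 60000,    # wait a minute between retries
            "timeout_seconds": 3600                # fail the task after one hour
        }
    ]
}

resp = requests.post(f"{host}/api/2.1/jobs/create",
                     headers={"Authorization": f"Bearer {token}"},
                     json=job_spec)
resp.raise_for_status()
print("Created job:", resp.json()["job_id"])

The same schedule, retry, and concurrency settings can of course be configured from the workflow UI instead of the API.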
Spark UI, Metrics, and Workflow Logs
Databricks provides many features for the efficient and effective execution of workflows.
Spark UI
The Spark UI helps you check a job's status and inspect its execution. It shows the event timeline from start to finish and how executors are added and removed over time.
Workflow Logs
Knowing where to find workflow logs is a must-have skill in the developer's toolbelt for debugging and diagnosing workflow execution issues.
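Besides browsing logs in the UI, the output of a finished task run can also be pulled programmatically. The sketch below uses the Jobs API runs/get-output endpoint; the host, token, and run id are placeholders, and the available output fields depend on the task type, so treat this as an illustration rather than a complete logging solution.

import requests

host = "https://<your-workspace>.cloud.databricks.com"   # placeholder
token = "<personal-access-token>"                        # placeholder

# Fetch the output of one finished task run by its run_id (hypothetical id)
resp = requests.get(f"{host}/api/2.1/jobs/runs/get-output",
                    headers={"Authorization": f"Bearer {token}"},
                    params={"run_id": 123456})
resp.raise_for_status()
output = resp.json()

print(output.get("metadata", {}).get("state"))   # run lifecycle/result state
print(output.get("logs"))                        # task stdout, when available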
Metrics
Workflow metrics provide valuable insight into how a workflow affects CPU, memory, and other critical system resources over time.
Demo scenario
The entire demo is recorded in the YouTube video above. We first collect GitHub information through the GitHub REST API with Python code that produces JSON files. We then process the JSON files with a Delta Live Tables pipeline to create three tables in the SQL warehouse. Finally, we develop three queries to retrieve the data from these tables and plot the results as bar charts, which are used to build the dashboard.
Databricks secret
The following Databricks CLI commands create a secret scope and store the GitHub token value as a secret:
-----Databricks secret related commands-----------
databricks secrets create-scope --scope databricksgithubscope
databricks secrets put --scope databricksgithubscope --key githubtoken --string-value <github-token-value>
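Once the secret is stored, the token never has to appear in plain text. Inside a notebook or job it is read back at runtime with the secrets utility, exactly as the extraction script in the next section does:

# Read the stored GitHub token back at runtime inside a notebook or job
token = dbutils.secrets.get(scope="databricksgithubscope", key="githubtoken")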
Python code to extract the GitHub data via REST API calls
This Python code uses REST API calls to retrieve the GitHub commit data and contributor details. The call pattern is the same in both cases; the only difference is the request string passed to the script, which determines which method is invoked.
import json
import requests
import sys

api_url = "https://api.github.com"


def get_commits(owner, repo, token, path):
    request_url = f"{api_url}/repos/{owner}/{repo}/commits"
    get_response(request_url, f"{path}/commits", token)


def get_contributors(owner, repo, token, path):
    request_url = f"{api_url}/repos/{owner}/{repo}/contributors"
    get_response(request_url, f"{path}/contributors", token)


def get_response(request_url, path, token):
    # Page through the GitHub REST API until an empty page is returned,
    # writing each page of results to its own JSON file.
    page = 1
    more = True
    while more:
        response = requests.get(request_url, params={'page': page},
                                headers={'Authorization': "token " + token})
        if response.text != "[]":
            write(path + "/records-" + str(page) + ".json", response.text)
            page += 1
        else:
            more = False


def write(filename, contents):
    dbutils.fs.put(filename, contents)


def main():
    args = sys.argv[1:]
    if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)

    owner = sys.argv[1]
    repo = sys.argv[2]
    request = sys.argv[3]
    output_path = sys.argv[4]
    secret_scope = sys.argv[5]
    secret_key = sys.argv[6]

    # Read the GitHub token from the Databricks secret scope
    token = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    if request == "commits":
        get_commits(owner, repo, token, output_path)
    elif request == "contributors":
        get_contributors(owner, repo, token, output_path)


if __name__ == "__main__":
    main()
The arguments passed to the script differentiate the commits and contributors tasks. For example, the contributors task uses the following values:
-----Values in contrib task--------------------------
get_contributors
["databrickslabs","overwatch","contributors","dbfs:/FileStore/output/contribs","databricksgithubscope","githubtoken"]
------------------------------------------------------
The commits and contributors tasks also write to different output paths:
commits-path    dbfs:/FileStore/output/commits
contribs-path   dbfs:/FileStore/output/contribs
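To show how these arguments are wired into the workflow, here is a hypothetical sketch of the two ingestion tasks as they could appear in the job's task list. The task keys, cluster id, and the upload location of github-api.py are assumptions for illustration; only the parameter values and output paths come from the demo above.

# Hypothetical task definitions for the two ingestion steps of the workflow;
# both run the same script (github-api.py) with different arguments.
ingestion_tasks = [
    {
        "task_key": "commits_task",
        "existing_cluster_id": "<cluster-id>",   # placeholder
        "spark_python_task": {
            "python_file": "dbfs:/FileStore/scripts/github-api.py",   # assumed upload location
            "parameters": ["databrickslabs", "overwatch", "commits",
                           "dbfs:/FileStore/output/commits",
                           "databricksgithubscope", "githubtoken"]
        }
    },
    {
        "task_key": "contribs_task",
        "existing_cluster_id": "<cluster-id>",
        "spark_python_task": {
            "python_file": "dbfs:/FileStore/scripts/github-api.py",
            "parameters": ["databrickslabs", "overwatch", "contributors",
                           "dbfs:/FileStore/output/contribs",
                           "databricksgithubscope", "githubtoken"]
        }
    },
]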
Delta Live Tables
A Delta Live Tables pipeline is used to read the JSON data and store it in tables that can be queried from the SQL warehouse.
import dlt
from pyspark.sql.functions import *


def parse(df):
    return (df
        .withColumn("author_date", to_timestamp(col("commit.author.date")))
        .withColumn("author_email", col("commit.author.email"))
        .withColumn("author_name", col("commit.author.name"))
        .withColumn("comment_count", col("commit.comment_count"))
        .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
        .withColumn("committer_email", col("commit.committer.email"))
        .withColumn("committer_name", col("commit.committer.name"))
        .withColumn("message", col("commit.message"))
        .withColumn("sha", col("commit.tree.sha"))
        .withColumn("tree_url", col("commit.tree.url"))
        .withColumn("url", col("commit.url"))
        .withColumn("verification_payload", col("commit.verification.payload"))
        .withColumn("verification_reason", col("commit.verification.reason"))
        .withColumn("verification_signature", col("commit.verification.signature"))
        .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
        .drop("commit")
    )


@dlt.table(
    comment="Raw GitHub commits"
)
def github_commits_raw():
    df = spark.read.json(spark.conf.get("commits-path"))
    return parse(df.select("commit"))


@dlt.table(
    comment="Info on the author of a commit"
)
def commits_by_author():
    return (
        dlt.read("github_commits_raw")
        .withColumnRenamed("author_date", "date")
        .withColumnRenamed("author_email", "email")
        .withColumnRenamed("author_name", "name")
        .select("sha", "date", "email", "name")
    )


@dlt.table(
    comment="GitHub repository contributors"
)
def github_contributors_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(spark.conf.get("contribs-path"))
    )
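The pipeline code reads commits-path and contribs-path through spark.conf.get, so those keys must be supplied in the pipeline's configuration settings. As a rough illustration, the sketch below creates such a pipeline through the Delta Live Tables Pipelines REST API; the workspace URL, token, pipeline name, target schema, and notebook path are placeholders, so check the payload against the Pipelines API reference before relying on it.

import requests

host = "https://<your-workspace>.cloud.databricks.com"    # placeholder
token = "<personal-access-token>"                         # placeholder

pipeline_spec = {
    "name": "github-dlt-pipeline",                        # hypothetical pipeline name
    "target": "github_demo",                              # schema for the published tables
    "libraries": [
        {"notebook": {"path": "/Repos/demo/github-dlt"}}  # assumed notebook location
    ],
    # These keys are what the DLT code above reads via spark.conf.get(...)
    "configuration": {
        "commits-path": "dbfs:/FileStore/output/commits",
        "contribs-path": "dbfs:/FileStore/output/contribs"
    }
}

resp = requests.post(f"{host}/api/2.0/pipelines",
                     headers={"Authorization": f"Bearer {token}"},
                     json=pipeline_spec)
resp.raise_for_status()
print("Created pipeline:", resp.json()["pipeline_id"])

The same configuration key/value pairs can also be entered in the pipeline settings UI when the pipeline is created there.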
Queries used in the Dashboard
We have used three queries in the dashboard.
1. Total commits by author
SELECT name, count(1) AS commits
FROM commits_by_author
GROUP BY name
ORDER BY commits DESC
LIMIT 10
2. Top 20 contributors
SELECT login, cast(contributions AS INTEGER)
FROM github_contributors_raw
ORDER BY contributions DESC
LIMIT 20
3. Top 10 contributors by month
SELECT date_part('YEAR', date) AS year,
       date_part('MONTH', date) AS month,
       name,
       count(1)
FROM commits_by_author
WHERE name IN (
    SELECT name
    FROM commits_by_author
    GROUP BY name
    ORDER BY count(name) DESC
    LIMIT 10
  )
  AND date_part('YEAR', date) >= 2022
GROUP BY name, year, month
ORDER BY year, month, name
4. Dashboard created from the three queries. Here is the look and feel of the dashboard:
We hope you find this blog post and the video helpful.