Writing powerful data ingestion pipelines with Azure Databricks Autoloader

I have been using Azure Data Factory (ADF) to ingest files into ADLS Gen2 for processing. Lately I have run into several challenges when using ADF for file ingestion:

  1. We need to store the state of each file from the time it is ingested until the time it is processed, so that no file is missed and the processing life cycle of every file can be tracked from start to finish.
  2. A lot of extra code (beyond the file processing code itself) has to be written to track the files, and it requires extra infrastructure, for example storing the state of each file in a SQL database and retrieving it when needed.
  3. The ADF file ingestion approach may not scale as the number of files increases.
  4. If the file schema changes, you need to revisit your code.
  5. Even though ADF provides a no-code development environment, its interface is clumsy, with lots of drill-down windows, and writing more complex logic means creating expressions and digging deeper into them. Developers who are comfortable writing code may not like this approach because it is a new learning curve for them.

Azure Databricks Auto Loader is great in terms of its capabilities:

  • Scalability: Auto Loader can discover millions of files in an efficient and optimal way. It can discover files asynchronously, which avoids wasting compute resources.
  • Performance: The cost of discovering files depends on the number of files being ingested rather than the number of directories the files may be dumped into. For example, if you have ten directories but only three files land in them, you are charged for discovering three files instead of listing ten directories.
  • Schema inference and evolution support: Do not worry if your schema changes. Auto Loader can detect schema changes, notify you when they happen, and rescue the data that would otherwise have been ignored or lost (see the sketch after this list).
  • Cost: Auto Loader uses native cloud APIs to list files in storage, which saves cost. Auto Loader's file notification mode can reduce cloud costs further by avoiding directory listing altogether: Auto Loader sets up file notification services on the storage account to make file discovery much cheaper.
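
To make the schema handling concrete, here is a minimal sketch (not the full pipeline built later in this post) of an Auto Loader read that infers the schema, evolves it, and rescues non-conforming data. The paths /mnt/file/ and /mnt/schema/ are placeholders for your own locations.

# Minimal sketch: Auto Loader infers the schema, tracks it under cloudFiles.schemaLocation,
# evolves it when new columns appear, and keeps non-conforming values in _rescued_data.
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", "/mnt/schema/")         # inferred schema is stored here
      .option("cloudFiles.schemaEvolutionMode", "addNewColumns")   # evolve when new columns show up
      .option("cloudFiles.rescuedDataColumn", "_rescued_data")     # capture data that does not match the schema
      .load("/mnt/file/"))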

In the traditional approach, if you implement file ingestion yourself, you have to set up multiple components:

  1. Storage Queue
  2. Event Grid
  3. Notification subscriptions for the blob-specific events
  4. Custom code to process the file once the events are received

With Auto Loader you do not have to write any of this custom plumbing, because Auto Loader sets it up automatically for you.
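
For example, in file notification mode a few options are all that is needed, and Auto Loader provisions the Event Grid subscription and the storage queue behind the scenes. This is only a sketch: the angle-bracket values stand for the IDs and secret of the service principal that the script below creates.

# Sketch of file notification mode: Auto Loader creates the Event Grid subscription
# and storage queue for you; no custom event-handling code is required.
stream = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.useNotifications", "true")          # use notifications instead of directory listing
          .option("cloudFiles.resourceGroup", "<resource-group>")
          .option("cloudFiles.subscriptionId", "<subscription-id>")
          .option("cloudFiles.tenantId", "<tenant-id>")
          .option("cloudFiles.clientId", "<client-id>")
          .option("cloudFiles.clientSecret", "<client-secret>")
          .load("<input-path>"))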

You need to follow these steps for Auto Loader to work in Azure:

  1. Create a resource group.
  2. Create an ADLS Gen2 storage account in the resource group.
  3. Create a container in the storage account.
  4. Create folders inside the container.
  5. Create a service principal in Azure AD.
  6. Grant the "EventGrid EventSubscription Contributor" role to the service principal on the resource group.
  7. Grant these roles to the service principal on the storage account: Storage Blob Data Contributor, Contributor, Storage Queue Data Contributor.
  8. Write Python/Scala code specifying the service principal and storage account.
  9. Mount the ADLS Gen2 data lake on Databricks.
  10. Run the Auto Loader code to verify that it ingests the file and captures the data from it.

All these steps are tedious to perform and error prone. So I wrote a PowerShell script which not only performs these configuration steps but also generates the Python code. The Python code is written to a text file (code.txt) that you can copy into a Python notebook to execute the steps. You can change the schema definition based on your needs.

# Please run the module install cmdlets with admin privileges
# Run the install commands only when you do not have these modules installed
# Install the Az.Resources module (the script also uses the Az.Storage and AzureAD modules)
Install-Module -Name Az.Resources -RequiredVersion 4.4.1 -AllowClobber
Install-Module -Name Az.Storage -AllowClobber
Install-Module -Name AzureAD -AllowClobber
# If a module is not imported automatically, import it after installation
Import-Module Az.Storage


# Set these variables according to your environment.
$resourceGroup="RG0111"
$tenantID=$null   # populated below after Connect-AzAccount
$location="eastus"
$storageAccountName="autoloaderdemo777"
$appName="autoloader777"
$uri="api://"+$appName
$homePageURL="http://www.microsoft.com"
$filesystemName = "incoming"
$incomingDirName = "file/"
$checkpointDirName = "StreamCheckpoint/"
$deltaDirName = "delta/"
$schemaDirName = "schema/"
$mntPoint="mnt/Live Network "   # replaces "mnt" in the Python template, so the mount point becomes /mnt/Live Network /

#First create a resource group
Connect-AzAccount
New-AzResourceGroup -Name $resourceGroup -Location $location
$tenantID=(Get-AzTenant | Select-Object -First 1).Id

# Create Datalake Gen2
$storageAccount =New-AzStorageAccount -ResourceGroupName $resourceGroup `
  -Name $storageAccountName `
  -Location $location `
  -SkuName Standard_LRS `
  -Kind StorageV2 `
  -EnableHierarchicalNamespace $True

$accessKey=(Get-AzStorageAccountKey -ResourceGroupName $resourceGroup -AccountName $storageAccountName).GetValue(0)
$Context = $storageAccount.Context

# Create Container in Datalake Gen2
New-AzStorageContainer -Context $Context -Name $filesystemName -Permission Off

#Create a directory named file in Datalake Gen2
New-AzDataLakeGen2Item -Context $Context -FileSystem $filesystemName -Path $incomingDirName -Directory


#Create a directory named StreamCheckpoint in Datalake Gen2
New-AzDataLakeGen2Item -Context $Context -FileSystem $filesystemName -Path $checkpointDirName -Directory

#Create a directory named delta in Datalake Gen2
New-AzDataLakeGen2Item -Context $Context -FileSystem $filesystemName -Path $deltaDirName -Directory

#Create a directory named schema in Datalake Gen2
New-AzDataLakeGen2Item -Context $Context -FileSystem $filesystemName -Path $schemaDirName -Directory

#Create a service principal
Connect-AzureAD -TenantId $tenantID
$azureADAppReg=New-AzADApplication  -DisplayName $appName -HomePage $homePageURL -IdentifierUris $uri
$secret = (New-AzureADApplicationPasswordCredential  -ObjectId $azureADAppReg.ObjectId  -CustomKeyIdentifier mysecret -StartDate (Get-Date) -EndDate ((Get-Date).AddYears(6))).value


$clientID=$azureADAppReg.ApplicationId
$servicePrincipal=New-AzureADServicePrincipal -AppId $clientID
$subscriptionID=(Get-AzSubscription -TenantId $tenantID | Select-Object -First 1).Id
$storageAccountResourceId=(Get-AzResource -Name $storageAccountName -ResourceGroupName $resourceGroup).ResourceId


#Add Role assignment for resource Group
New-AzRoleAssignment  `
-ApplicationId $servicePrincipal.AppId  `
-RoleDefinitionName "EventGrid EventSubscription Contributor" `
-ResourceGroupName $resourceGroup



#Grant Storage Blob Data Contributor on the storage account
New-AzRoleAssignment  `
-ApplicationId $servicePrincipal.AppId  `
-RoleDefinitionName "Storage Blob Data Contributor" `
-Scope $storageAccountResourceId

#Grant Contributor on the storage account
New-AzRoleAssignment  `
-ApplicationId $servicePrincipal.AppId  `
-RoleDefinitionName "Contributor" `
-Scope $storageAccountResourceId

#Grant Storage Queue Data Contributor on the storage account
New-AzRoleAssignment  `
-ApplicationId $servicePrincipal.AppId  `
-RoleDefinitionName "Storage Queue Data Contributor" `
-Scope $storageAccountResourceId

#Python code generation starts from here
$PythonCodeBlock = @'

configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "clientID",
          "fs.azure.account.oauth2.client.secret":"clsecret",
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/tenantID/oauth2/token"}

# Please run the mount command only once; running it again will throw an error because the path is already mounted
dbutils.fs.mount(
  source = "abfss://filesystemName@storageAccountName.dfs.core.windows.net/",
  mount_point = "/mnt/",
  extra_configs = configs)



from pyspark.sql.types import StructType, StructField, StringType, DateType


#Input directory, checkpoint, Delta output, and schema locations
SourceFilePath = "/mnt/file/"
CheckpointPath = "/mnt/StreamCheckpoint/"
WritePath = "/mnt/delta/"
schemaLocation= "/mnt/schema/"

#Define Schema for the Incoming files
schema = StructType([StructField('Employee_First_Name', StringType(), True),
                     StructField('Employee_Last_Name', StringType(), True),
                     StructField('Date_Of_Joining', DateType(), True)]
                       )
# To tear down the mount later (not while the stream is reading from it), run: dbutils.fs.unmount("/mnt/")
readquery = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("cloudFiles.useNotifications" , "true")
  .option("cloudFiles.includeExistingFiles", "true")
  .option("cloudFiles.resourceGroup", "ResourceGroup")
  .option("cloudFiles.subscriptionId", "subscriptionID")
  .option("cloudFiles.tenantId", "tenantID")
  .option("cloudFiles.clientId","clientID")
  .option("cloudFiles.clientSecret","clsecret")
  .schema(schema)
  .option("mergeSchema", "true")
  .option("cloudFiles.schemaLocation",schemaLocation)
  .load(SourceFilePath)
            )

readquery.writeStream.trigger(once=True).format('delta').option('checkpointLocation', CheckpointPath).start(WritePath)

#You can put the lines below into another cell of the Databricks notebook
import time
time.sleep(15)
df = spark.read.format('delta').load(WritePath)
df.count()
'@
# Substitute the placeholders in the Python template with the real values
$FinalCodeBlock=$PythonCodeBlock.Replace("clientID",$clientID)
$FinalCodeBlock=$FinalCodeBlock.Replace("ResourceGroup",$resourceGroup)
$FinalCodeBlock=$FinalCodeBlock.Replace("clsecret",$secret)
$FinalCodeBlock=$FinalCodeBlock.Replace("tenantID",$tenantID)
$FinalCodeBlock=$FinalCodeBlock.Replace("subscriptionID",$subscriptionID)
$FinalCodeBlock=$FinalCodeBlock.Replace("filesystemName",$filesystemName)
$FinalCodeBlock=$FinalCodeBlock.Replace("incomingDirName",$incomingDirName)
$FinalCodeBlock=$FinalCodeBlock.Replace("checkpointDirName",$checkpointDirName)
$FinalCodeBlock=$FinalCodeBlock.Replace("deltaDirName",$deltaDirName)
$FinalCodeBlock=$FinalCodeBlock.Replace("schemaDirName",$schemaDirName)
$FinalCodeBlock=$FinalCodeBlock.Replace("accessKey",$accessKey.Value)
$FinalCodeBlock=$FinalCodeBlock.Replace("storageAccountName",$storageAccountName)
$FinalCodeBlock=$FinalCodeBlock.Replace("mnt",$mntPoint)

$FinalCodeBlock | out-file code.txt

Now upload a CSV file into the folder named file and run the Auto Loader code; you will see the record count change. Then upload another CSV file with the same schema, run the streaming code above again, and verify the count: it will show the increased count.
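
If you just need a quick test file, here is a small sketch that writes a sample CSV matching the schema above into the incoming folder from a notebook cell. The path and file name are placeholders; adjust them to the mount point the script generated for you.

# Sketch only: create a tiny CSV that matches the Employee schema in the incoming folder.
sample_csv = "Employee_First_Name,Employee_Last_Name,Date_Of_Joining\nJohn,Doe,2021-01-15\nJane,Smith,2021-02-01\n"
dbutils.fs.put("/mnt/file/employees_sample.csv", sample_csv, True)  # True = overwrite if the file exists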

The Databricks notebook code generated by the PowerShell script is written to code.txt; it is the Python template shown in the script with the placeholders filled in.
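
For reference, the beginning of the generated code looks roughly like this; the angle-bracket values stand in for the client ID, secret, and tenant ID produced by your run, while the container and storage account names come from the variables at the top of the script.

configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "<application-client-id>",
          "fs.azure.account.oauth2.client.secret": "<client-secret>",
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token"}

dbutils.fs.mount(
  source = "abfss://incoming@autoloaderdemo777.dfs.core.windows.net/",
  mount_point = "/mnt/Live Network /",
  extra_configs = configs)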

I hope this helps.
