Delta table operations on Azure Databricks

July 10, 2023

Introduction

Delta tables, combined with the power of PySpark, offer a comprehensive solution for managing big data workloads. In this post, we will explore the fundamentals of Delta tables and understand how we can interact with them using PySpark. We will reuse some code from my previous post Ingest data from an API with Databricks and PySpark and explore some use cases I found particularly interesting in the official Delta cheat sheet[1] and the documentation[2].

Delta tables

Delta tables are an essential component of Delta Lake, an open-source storage layer that brings ACID transactions, schema evolution, and time travel capabilities to big data workloads. They are built on top of Parquet files, a columnar storage format that provides efficient compression and encoding of data.

Delta tables provide a unified storage solution that combines the scalability of big data processing engines with the reliability and efficiency of transactional systems. They allow you to perform atomic, consistent, isolated, and durable (ACID) operations on your data, ensuring data integrity and enabling efficient data ingestion, processing, and analytics workflows.

Initial setup

  • Sign up for a free account on weatherapi.com, copy the API_KEY and create a secret in your Key Vault named weather-api-key
  • Generate a SAS token and save it in your Key Vault with the name sas-token-sta-d-001 (both secrets are read back in the notebooks, as sketched after this list)
  • Create a container called dls in your storage account. If you haven’t done it before, you can learn how to do it here.
  • Clone the repository https://github.com/AStefanachi/delta-tables-tutorial.git  into your Databricks workspace
  • Schedule the current.py notebook to be executed every 15 minutes. After 1 hour remove the schedule and delete the job: you don’t want to spend more than necessary, right?
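
Inside the notebooks, the two secrets created above can be read back with the Databricks secret utilities. The snippet below is a minimal sketch: the secret scope name kv-scope is an assumption, so replace it with the name of the secret scope backed by your Key Vault.

# Minimal sketch, assuming a secret scope named "kv-scope" backed by your Key Vault
WEATHER_API_KEY = dbutils.secrets.get(scope="kv-scope", key="weather-api-key")
STORAGE_ACCOUNT_SAS_TOKEN = dbutils.secrets.get(scope="kv-scope", key="sas-token-sta-d-001")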

current.py

The current.py notebook contains the code to perform an API call to the current.json endpoint of the WeatherAPI in a similar way to what was presented in my previous post, except for a few modifications.

# Define the Delta table name and path
TABLE_NAME =  "current"
TABLE_PATH = f"abfss://{DLS_CONTAINER}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/" \
    f"weather_api/current/{TABLE_NAME}"

# Setting SAS Configuration
spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", STORAGE_ACCOUNT_SAS_TOKEN)

# Select the columns explicitly to keep a consistent column order across runs
cities_df_union = cities_df_union.select(cities_df_union.columns)

# Write the DataFrame as a Delta table to the external storage account
cities_df_union.write \
  .option('path', TABLE_PATH) \
  .mode('append') \
  .saveAsTable('bronze_weather.current')

 

  • Even though Databricks strongly recommends the use of Unity Catalog external locations or managed identities, for simplicity we will set the Spark properties directly in the notebook[3]
  • Databricks deprecated the Windows Azure Storage Blob driver (wasbs)[4] in favor of the Azure Blob File System driver (abfss)[5], hence its use in this post
  • In the last code block we write our cities_df_union data frame to the external storage account in the Delta format and save it as a table in Databricks. In this way we just created an external table: a table that references an existing storage path in an external location and provides an abstraction layer that hides the storage and credential details from users who are granted access.
  • If this is the first time you execute this code and save the JSON response as a Delta table, my recommendation is to use .mode('overwrite') for the first run (as sketched below) and switch back to .mode('append') from the second run onwards
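
For reference, a first-run variant could look like the sketch below. It assumes the target database bronze_weather may not exist yet; switch back to append from the second run onwards.

# First-run sketch: create the database if needed and overwrite on the initial write
spark.sql("CREATE DATABASE IF NOT EXISTS bronze_weather")

cities_df_union.write \
  .option('path', TABLE_PATH) \
  .mode('overwrite') \
  .saveAsTable('bronze_weather.current')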

delta_table_operations.py

Let’s dissect the individual commands and build up our knowledge of Delta tables.

The anatomy of a delta table

# Depending on the cluster configuration, Spark creates snappy-compressed parquet files.
# In my case, with 4 cores and a limited dataset, the number of files is a multiple of 4
fi_path = [fi.path for fi in dbutils.fs.ls(TABLE_PATH)]
fi_path

In this simple list comprehension we loop through the files contained at our TABLE_PATH. Printing the result reveals a number of snappy-compressed .parquet files (a multiple of the number of cores in your cluster) and a _delta_log folder, which forms the first layer of our delta table.

_delta_log folder: transaction logs

# The _delta_log folder contains transaction information and metadata about the table
# every 10 commits a checkpoint file will be generated
# more info here: https://www.databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
fi_path_delta_log = [fi.path for fi in dbutils.fs.ls(TABLE_PATH + "/_delta_log")]
fi_path_delta_log

Looping through the objects contained in the _delta_log folder we reveal the heart of our delta table.

When you create a delta table, it automatically keeps a record of all the changes made in a special folder called the _delta_log. Every change, or commit, is saved as a JSON file with a number, starting with 000000.json and increasing numerically.

Imagine you add some records to your table from files called 1.parquet and 2.parquet. These changes are saved in the first commit, 000000.json. Later, you change your mind and remove those files, adding a new file called 3.parquet. This will be recorded in the next commit (000001.json).

# What's inside a transaction file?
spark.read.json(f"abfss://dls@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/weather_api/current/current/_delta_log/00000000000000000018.json").display()

In our case, every time we run the current.py notebook we record new weather information about our favorite cities. Since we set the ingestion to append mode, we can see, under the add column, all the information that was committed.

Even if you remove files like 1.parquet and 2.parquet from the delta table, their history is still stored in the transaction log. This allows you to review previous table states and see what your table looked like at any given point in time.
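
As a quick illustration (a sketch, not part of the original notebook), you can drill into the add actions recorded in a single commit; commit_json_path is a placeholder for one of the numbered JSON files listed above.

# Sketch: inspect the "add" actions recorded in one commit file
# commit_json_path is assumed to point at one of the numbered .json files above
commit_df = spark.read.json(commit_json_path)
commit_df.select("add.path", "add.size", "add.modificationTime").display()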

Important things to keep in mind:

  • Spark won’t automatically delete files from your storage, even if they’re removed from the table. If you want to get rid of those old files, you can either use the VACUUM command (sketched after this list) or delete them manually from your storage location
  • If for any reason your table is lost in your Databricks workspace (a delete or drop operation went wrong), you can always recreate it with all of its history by re-registering a table on top of the existing Delta files in the external location; the history lives in the _delta_log folder, not in the workspace
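
A minimal sketch of both points, assuming the default 7-day retention for VACUUM and that the Delta files still live at TABLE_PATH:

# Physically remove data files no longer referenced by the table and older than
# the retention period (default: 7 days / 168 hours)
spark.sql("VACUUM bronze_weather.current")

# Re-register an external table on top of the existing Delta files; the history
# in _delta_log is preserved because no data is rewritten
spark.sql(f"CREATE TABLE IF NOT EXISTS bronze_weather.current USING DELTA LOCATION '{TABLE_PATH}'")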

Checkpoint files

# What's inside a checkpoint file?
spark.read.parquet(f"abfss://dls@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/weather_api/current/current/_delta_log/00000000000000000010.checkpoint.parquet").display()

After making several changes to a table, Databricks creates a checkpoint file in the _delta_log folder. This file is stored in Parquet format, which Spark can read quickly and efficiently. Checkpoint files act as a shortcut, capturing the table’s state at a specific moment, so Spark doesn’t need to replay the entire JSON transaction log, which would hinder its performance.

To update the table quickly, Spark looks at the latest checkpoint file and only processes the new changes since that file was made.

Imagine you’ve made changes to a table, saved as commits from 000000.json through 000007.json. Spark is up-to-date with these changes and has them stored in memory. But, other people made more changes, creating commits up to 000012.json.

To catch up with these new changes, Spark first looks at the checkpoint file, which has the table’s state until commit number 10. Instead of processing all JSON files from the beginning, Spark can just focus on the new ones (000011.json and 000012.json). This helps Databricks keep the table’s state updated efficiently at all times.
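
You can see this mechanism in the _delta_log folder itself: alongside the commit and checkpoint files there is a small _last_checkpoint file that points readers to the most recent checkpoint. A quick sketch:

# The _last_checkpoint file tells readers which checkpoint to start from
spark.read.json(TABLE_PATH + "/_delta_log/_last_checkpoint").display()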

Reading a delta table

There are several ways to read a delta table; here we will explore a couple of options: reading the data from the already existing external Spark table or using the DeltaTable class.

Read an external table

# Reading a delta table into a spark dataframe
current_forecast = spark.read.table('bronze_weather.current')
current_forecast.display()

Read an external table using the DeltaTable class

# Read the delta table as a DeltaTable object
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'bronze_weather.current')
# Convert the Delta Table into a spark data frame and display the content
deltaTable.toDF().display()

These two code snippets are self-explanatory. In the first one we read the external table we created in the current.py notebook into a Spark DataFrame; in the second one we do the same by initializing the DeltaTable class and using the .toDF() method so we can easily visualize its content.
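
As a third option (a sketch based on the table path used earlier), you can also read the Delta files directly by path, bypassing the metastore entirely:

# Read the Delta files directly from the external location, without the metastore
current_by_path = spark.read.format("delta").load(TABLE_PATH)
current_by_path.display()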

Describe detail

# information about the table such as size, name, location (for external tables)
deltaTable.detail().display()

The detail method[7] retrieves attributes such as the table’s size, name, and location (very useful in our case, as we are managing an external table), and display() renders this information in an easily readable format.
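
If you only care about a handful of attributes, you can select them from the DataFrame that detail() returns; the column names below come from the standard DESCRIBE DETAIL output.

# Keep only the attributes mentioned above
deltaTable.detail().select("name", "location", "numFiles", "sizeInBytes").display()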

Time travel

We now know from the previous sections that delta tables maintain the state of our data over time so that we can recover it. In simple terms, if I make modifications of any sort to my table at a certain point in time, I can always recover my data from previous versions of the table, as long as I have a healthy and complete _delta_log folder.
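
Besides the version-based reads shown later in this post, time travel also works with timestamps. The snippet below is a sketch: the timestamp is an example value and must fall within the history retained for your table.

# Read the table as it was at a given point in time (example timestamp)
spark.read \
  .option("timestampAsOf", "2023-07-10 08:00:00") \
  .table("bronze_weather.current") \
  .display()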

Human error

# We simulate a human error, which will delete all records
deltaTable.delete()

Be careful with this command: it is used here intentionally for educational purposes. If you are feeling adventurous and want to experiment with the .delete() method of the DeltaTable class, I recommend reading more in the documentation[6].

With all that said, let’s simulate a human error: for some reason all the records of our table are lost.

Describe history

# Display table version history to identify transactions
deltaTable.history().display()

history() is a method[8] that retrieves a complete log of all transactions made to this specific table. It returns information such as the version of the table at the time of the transaction, the time the transaction was performed, the user who performed it, and the type of change (like adding data, updating existing data, or deleting data).
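
To spot the offending commit faster, you can keep only the most relevant columns of the history DataFrame; the column names below are part of the standard history() output.

# Narrow the history down to what matters for finding the bad commit
deltaTable.history().select("version", "timestamp", "operation", "operationParameters").display()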

Important note:

  • In my case the delete operation was performed in version 19; your version number will be different
  • The following code snippets and examples are based on the versions of my delta table, so adjust them accordingly

Comparing delta table versions

# In my case, version 18 is the most recent one with all the data I lost.
# Version 18: before delete
# Version 19: after delete
version19 = spark.read.option('versionAsOf', 19).table('bronze_weather.current')
version18 = spark.read.option('versionAsOf', 18).table('bronze_weather.current')

print("Records which have been deleted between version 19 and 18 \n\r")
version18.exceptAll(version19).display()

Even though this step is completely optional, it holds high educational value, so I recommend not skipping it.

Finally we get to one of my favorite features of Delta tables: time travelling which allows you to access and recover historical versions of the data.

Here’s a breakdown of the code:

  • The first spark.read line tells Spark to read data from the bronze_weather.current table as it existed at version 19, after the data was deleted, and assigns it to version19
  • The second spark.read line reads the table as it was at version 18, before the data was deleted, and assigns it to version18
  • The exceptAll line compares the two versions: it finds all rows in version18 (before the delete) that aren’t in version19 (after the delete). Basically, it identifies the data that got deleted, and display() then shows it

So, to put it simply, this code lets you see the data that was accidentally removed between the 18th and the 19th versions of your Delta table.
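
As an optional sanity check (a small sketch on top of the DataFrames defined above), you can also count how many records were lost:

# Count the records deleted between the two versions
deleted_count = version18.exceptAll(version19).count()
print(f"{deleted_count} records were deleted in version 19")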

Restoring our data back to a previous version

# Restore the table to a previous version
deltaTable.restoreToVersion(18)

This line literally “turns back time” on your deltaTable. The restoreToVersion method takes the version number as input (18 in this case) and restores the table to what it was at that specific version[9].
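
The same operation is also available in SQL, which can be handy if you prefer to run the recovery from a SQL cell; adjust the version number to your own history.

# SQL equivalent of restoreToVersion(18)
spark.sql("RESTORE TABLE bronze_weather.current TO VERSION AS OF 18")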

Conclusion

In summary, this post showcases the powerful synergy between Delta tables and PySpark on Azure Databricks for handling data operations. From transactional reliability to data recovery and version control, Delta tables offer essential features to simplify and enhance big data processing. By implementing the insights and practices presented in this post, you’ll be well-equipped to unlock the full potential of these tools and drive data-driven decision-making in your organization.

References