Introduction
Delta tables, combined with the power of PySpark, offer a comprehensive solution for managing big data workloads. In this post, we will explore the fundamentals of Delta tables and understand how we can interact with them using PySpark. We will take some code from my previous post Ingest data from an API with Databricks and PySpark and explore some use cases I found very interesting from the official delta cheat sheet[1] and the documentation[2].
Delta tables
Delta tables are an essential component of Delta Lake, an open-source storage layer that brings ACID transactions, schema evolution, and time travel capabilities to big data workloads. They are built on top of Parquet files, a columnar storage format that provides efficient compression and encoding of data.
Delta tables provide a unified storage solution that combines the scalability of big data processing engines with the reliability and efficiency of transactional systems. They allow you to perform atomic, consistent, isolated, and durable (ACID) operations on your data, ensuring data integrity and enabling efficient data ingestion, processing, and analytics workflows.
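To make the idea concrete, here is a minimal sketch (not taken from the post's repository) of writing and reading a Delta table with PySpark. It assumes a Databricks-style environment where a SparkSession named spark with Delta Lake support is already available; the path and columns are purely illustrative.

```python
# Minimal illustrative sketch: assumes `spark` (a SparkSession with Delta Lake
# configured, as on Databricks) already exists; path and columns are made up.
df = spark.createDataFrame(
    [("Berlin", 21.5), ("Rome", 28.0)],
    ["city", "temp_c"],
)

# Each write is committed as an ACID transaction recorded in the table's _delta_log
df.write.format("delta").mode("overwrite").save("/tmp/demo/weather_delta")

# Reading the table back simply points Spark at the same location
spark.read.format("delta").load("/tmp/demo/weather_delta").show()
```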
Initial setup
- Obtain your WeatherAPI API_KEY and create a secret in your Key Vault named weather-api-key.
- Create a secret named sas-token-sta-d-001 for your storage account SAS token and a container named dls in your storage account. If you haven't done it before, you can learn how to do it here.
- Clone https://github.com/AStefanachi/delta-tables-tutorial.git into your Databricks workspace.
- Schedule the current.py notebook to be executed every 15 minutes. After 1 hour remove the schedule and delete the job: you don't want to spend more than necessary, right?

current.py
The current.py notebook contains the code to perform an API call to the current.json endpoint of the WeatherAPI, in a similar way to what was presented in my previous post, except for a few modifications.
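For context, a hedged sketch of what such a call can look like is shown below; it is not the author's exact code, and the secret scope name and query parameters are assumptions for illustration (dbutils is available in Databricks notebooks).

```python
# Illustrative sketch only: the secret scope name ("keyvault-scope") and the
# queried city are assumptions, not values from the original notebook.
import requests

API_KEY = dbutils.secrets.get(scope="keyvault-scope", key="weather-api-key")

response = requests.get(
    "https://api.weatherapi.com/v1/current.json",
    params={"key": API_KEY, "q": "Berlin"},
)
current_weather = response.json()
```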
```python
# Define the Delta table name and path
TABLE_NAME = "current"
TABLE_PATH = f"abfss://{DLS_CONTAINER}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/" \
             f"weather_api/current/{TABLE_NAME}"

# Setting SAS Configuration
spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", STORAGE_ACCOUNT_SAS_TOKEN)

# Selecting df in order
cities_df_union = cities_df_union.select(cities_df_union.columns)

# Write the DataFrame as a Delta table to the external storage account
cities_df_union.write \
    .option('path', TABLE_PATH) \
    .mode('append') \
    .saveAsTable('bronze_weather.current')
```
We write the cities_df_union data frame to the external storage account in the Delta format and save it as a table in Databricks. In this way, we just created an external table: a table that references an existing storage path in an external location and provides an abstraction layer that hides the storage and credential details for users who are granted access. Note: use .mode('overwrite') for the first run, and switch it back to .mode('append') from the second run onwards.
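To illustrate the difference, here is a hedged sketch contrasting a managed table with an external one; it reuses cities_df_union and TABLE_PATH from current.py, while the demo table names and the path suffix are made up.

```python
# Illustrative sketch: demo table names and the "_demo" path suffix are made up.

# Managed table: Databricks decides where the underlying files are stored
cities_df_union.write \
    .mode('overwrite') \
    .saveAsTable('bronze_weather.current_managed_demo')

# External table: files live at the path we supply; the metastore only keeps a pointer to it
cities_df_union.write \
    .option('path', f"{TABLE_PATH}_demo") \
    .mode('overwrite') \
    .saveAsTable('bronze_weather.current_external_demo')
```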
delta_table_operations.py
Let’s dissect each command and add more knowledge about Delta tables as we go.
The anatomy of a delta table
```python
# According to the cluster configuration, spark creates snappy compressed parquet files.
# In my case, having 4 cores and a limited dataset, I have a number of files multiple of 4
fi_path = [fi.path for fi in dbutils.fs.ls(TABLE_PATH)]
fi_path
```
In this simple list comprehension we loop through the files contained at our TABLE_PATH. Printing the result reveals a multitude of snappy compressed .parquet files, in multiples of the number of cores your cluster has, plus a _delta_log folder, which makes up the first layer of our delta table.
_delta_log folder: transaction logs
```python
# The _delta_log folder contains transaction information and metadata about the table
# every 10 commits a checkpoint file will be generated
# more info here: https://www.databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
fi_path_delta_log = [fi.path for fi in dbutils.fs.ls(TABLE_PATH + "/_delta_log")]
fi_path_delta_log
```
Looping through the objects contained in the _delta_log folder, we reveal the heart of our delta table.
When you create a delta table, it automatically keeps a record of all the changes made in a special folder called _delta_log. Every change, or commit, is saved as a JSON file with a number, starting with 000000.json and increasing numerically.
Imagine you add some records to your table from files called 1.parquet and 2.parquet. These changes are saved in the first commit, 000000.json. Later, you change your mind and remove those files, adding a new file called 3.parquet. This will be recorded in the next commit (000001.json).
```python
# What's inside a transaction file?
spark.read.json(f"abfss://dls@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/weather_api/current/current/_delta_log/00000000000000000018.json").display()
```
In our case, every time we run the current.py notebook we record new weather information about our favorite cities. Since we set the ingestion to append mode, we can see, under the add column, all the information that was committed.
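If you want to look at those add actions in isolation, a small sketch like the following (reusing TABLE_PATH and the same commit file as above) keeps only the rows of the transaction file that describe added data files.

```python
# Filter the transaction file down to its "add" actions, i.e. the data files
# committed by the append; reuses TABLE_PATH from current.py.
commit = spark.read.json(
    TABLE_PATH + "/_delta_log/00000000000000000018.json"
)
commit.select("add").where("add IS NOT NULL").display()
```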
Even if you remove files like 1.parquet and 2.parquet from the delta table, their history is still stored in the transaction log. This allows you to review previous table states and see what your table looked like at any given point in time.
Important things to keep in mind: the transaction log doesn't just grow as an ever-longer list of JSON files; as noted in the code comment above, every 10 commits Delta Lake consolidates its state into a checkpoint file, which we will look at next.
Checkpoint files
```python
# What's inside a checkpoint file?
spark.read.parquet(f"abfss://dls@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/weather_api/current/current/_delta_log/00000000000000000010.checkpoint.parquet").display()
```
After every ten commits to a table, Databricks creates a checkpoint file in the same _delta_log folder. This file is in Parquet format, which is easy and fast for Spark to read. Checkpoint files act as a shortcut, capturing the table’s full state at a specific moment, so Spark doesn’t need to replay the entire JSON transaction log, which would hinder its performance.
To update the table quickly, Spark looks at the latest checkpoint file and only processes the new changes since that file was made.
Imagine you’ve made changes to a table, saved as commits 000000.json through 000007.json. Spark is up-to-date with these changes and has them stored in memory. But other people made more changes, creating commits up to 000012.json.

To catch up with these new changes, Spark first looks at the checkpoint file, which holds the table’s state up to commit number 10. Instead of processing all the JSON files from the beginning, Spark can just focus on the new ones (000011.json and 000012.json). This helps Databricks keep the table’s state updated efficiently at all times.
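As a side note, the _delta_log folder also contains a small _last_checkpoint file that tells Spark which checkpoint is the most recent one; a quick, hedged way to peek at it (reusing TABLE_PATH) is:

```python
# The _last_checkpoint file records the version of the latest checkpoint,
# so Spark knows where to resume reading the log from.
spark.read.json(TABLE_PATH + "/_delta_log/_last_checkpoint").display()
```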
Reading a delta table
There are several ways to read a delta table; here we will explore a couple of options: reading the data from the already existing external Spark table, or using the DeltaTable class.
Read an external table
```python
# Reading a delta table into a spark dataframe
current_forecast = spark.read.table('bronze_weather.current')
current_forecast.display()
```
Read an external table using the DeltaTable class
```python
# Read delta table as DeltaTable object
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'bronze_weather.current')

# Convert the Delta Table into a spark data frame and display the content
deltaTable.toDF().display()
```
These two code snippets are self-explanatory. In the first one we read the external table we created in the current.py notebook into a Spark dataframe; in the second one we do the same by initializing the DeltaTable class and using the .toDF() method so we can easily visualize its content.
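Although not used in the post, both reads can also address the table by its storage path rather than by its name; a brief sketch, reusing TABLE_PATH:

```python
# Reading by path instead of by table name; reuses TABLE_PATH from current.py.
from delta.tables import DeltaTable

by_path_df = spark.read.format("delta").load(TABLE_PATH)
by_path_dt = DeltaTable.forPath(spark, TABLE_PATH)
by_path_dt.toDF().display()
```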
Describe detail
```python
# information about the table such as size, name, location (for external tables)
deltaTable.detail().display()
```
The detail method[7] retrieves attributes such as the table’s size, name, and location (very useful for our case, as we are managing an external table), and display() is used afterward to show this information in an easily readable format.
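If you only care about a handful of those attributes, a short sketch like the one below narrows the output down to the fields that matter most for an external table (the column names come from the detail() output).

```python
# Keep only the attributes we are interested in for an external table.
deltaTable.detail().select("name", "location", "numFiles", "sizeInBytes").display()
```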
Time travel
We now know from the previous sections that delta tables maintain the state of our data over time so that we can recover it. In simple terms, if I make modifications of any sort to my table at a certain point in time, I can always recover my data from previous versions of the table, as long as I have a healthy and complete _delta_log folder.
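Besides version numbers, which we will use below, time travel also accepts timestamps; here is a hedged sketch (the timestamp is illustrative and must fall within the table's retained history):

```python
# Read the table as it was at a given point in time; the timestamp is illustrative.
snapshot = (
    spark.read
        .option("timestampAsOf", "2024-01-01 00:00:00")
        .table("bronze_weather.current")
)
snapshot.display()
```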
Human error
```python
# We simulate a human error, which will delete all records
deltaTable.delete()
```
Be careful with this command: it is used here intentionally, for educational purposes. If you are feeling adventurous and want to experiment with the .delete() method of the DeltaTable class, I recommend reading more in the documentation[6].
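In real workloads you would normally pass a predicate instead of wiping the whole table; a hedged sketch is below, where the column name and value are assumptions for illustration only.

```python
# Targeted delete with a condition; the column "location_name" and the value
# are illustrative assumptions, not necessarily columns of this table.
deltaTable.delete("location_name = 'Rome'")
```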
With all that said, let’s simulate a human error: for some reason all the records of our table are lost.
Describe history
```python
# Display table version history to identify transactions
deltaTable.history().display()
```
history() is a method[8] that retrieves a complete log of all transactions made to this specific table. It returns information such as the version of the table at the time of the transaction, the time the transaction was performed, the user who performed it, and the type of change (like adding data, updating existing data, or deleting data).
Important note: take note of the version number of the last transaction before the delete; we will need it to compare versions and to restore the table.
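To make that easier, a small sketch like the following narrows the history output down to the columns that matter for picking the right version (these columns are part of the standard history() output).

```python
# Show only the columns needed to identify the version to restore.
(
    deltaTable.history()
        .select("version", "timestamp", "operation", "operationParameters")
        .display()
)
```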
Comparing delta table versions
```python
# In my case, version 18 is the most recent one with all the data I lost.
# Version 18: before delete
# Version 19: after delete
version19 = spark.read.option('versionAsOf', 19).table('bronze_weather.current')
version18 = spark.read.option('versionAsOf', 18).table('bronze_weather.current')

print("Records which have been deleted between version 19 and 18 \n\r")
version18.exceptAll(version19).display()
```
Even though this step is completely optional, it holds high educational value, so I recommend not skipping it.
Finally we get to one of my favorite features of Delta tables: time travel, which allows you to access and recover historical versions of the data.
Here’s a breakdown of the code:
- spark.read.option('versionAsOf', 19) reads the bronze_weather.current table as it existed at version 19, after data was deleted, and assigns it to version19.
- The same read with 'versionAsOf', 18 loads the table as it was before the delete and assigns it to version18.
- exceptAll finds all rows in version18 (before delete) that aren’t in version19 (after delete). Basically, it identifies the data that got deleted. display() then shows this data.

So, to put it simply, this code lets you see the data that was accidentally removed between the 18th and the 19th versions of your Delta table.
Restoring our data back to a previous version
```python
# Restore table as of a previous version
deltaTable.restoreToVersion(18)
```
This line literally “turns back time” on your deltaTable. The restoreToVersion method takes the version number as input (18 in this case) and restores the table to what it was at that specific version[9].
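For completeness, the same restore can also be expressed in SQL; a hedged equivalent of the line above:

```python
# SQL equivalent of restoreToVersion(18) for the same table.
spark.sql("RESTORE TABLE bronze_weather.current TO VERSION AS OF 18")
```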
Conclusion
In summary, this post showcases the powerful synergy between Delta tables and PySpark on Azure Databricks for handling data operations. From transactional reliability to data recovery and version control, Delta tables offer essential features to simplify and enhance big data processing. By implementing the insights and practices presented in this post, you’ll be well-equipped to unlock the full potential of these tools and drive data-driven decision-making in your organization.
References