Ingest data from an API with Databricks and PySpark

June 24, 2023

Introduction

In a simple batch architecture, querying an API is a very common task. Data is made available through an exposed endpoint so that external services can programmatically extract what they need. Since this is such a common use case in data engineering, let me show you how Azure Databricks can handle this scenario gracefully.

Architecture

I have been living in Hamburg, Germany for a few years now, and like any of its residents I quickly learned how important it is to stay informed about the weather, even though the Germans teach you that “…there is no bad weather, just wrong clothing…”.

  • API: for this post we will use weatherapi.com and query two endpoints, current and forecast. While current returns up-to-date weather information about a city in JSON or XML, forecast returns astronomy data, the daily weather forecast and hourly weather information for a given city[1] (an abbreviated sample response follows after this list)
  • Databricks Workspace: this solution consists of several notebooks
      • cities_astro: handles the call to the forecast.json endpoint and saves the astronomical information (sunrise, sunset etc.) about a city
      • classes: holds the WeatherAPI object and the necessary methods to call the current.json and forecast.json endpoints
      • constants: contains the call to the Key Vault to get the API_KEY and a list CITIES
      • forecast: handles the call to the forecast.json endpoint and saves the hourly weather information about a city
      • current: handles the call to the current.json endpoint and saves the current weather information about a city
      • functions: contains helper functions to clean the json response from the api and add some metadata
      • test_current (optional): In the repository you will find this notebook. Take it as a template to run your tests for your data
  • Key Vault: weatherapi.com requires an API_KEY for authentication; we store it here for security
  • Workflows: since our data sources change at different paces, we will schedule our ingestion jobs with different frequencies
    • cities_astro: once a day
    • current: once per hour
    • forecast: every 4 hours
  • Databricks Database (managed): we are going to store our data in three managed tables called bronze_weather.cities_astro, bronze_weather.current and bronze_weather.hourly_forecasts
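
To give an idea of what the ingestion notebooks receive, here is an abbreviated, purely illustrative sketch of a current.json response. The field names are taken from the weatherapi.com documentation and the values are made up; the live API returns many more fields.

# Illustrative only: abbreviated shape of a current.json response
# (field names from the weatherapi.com documentation, values invented).
sample_current_response = {
    "location": {
        "name": "Hamburg",
        "country": "Germany",
        "localtime": "2023-06-24 10:00",
    },
    "current": {
        "temp_c": 18.0,
        "condition": {"text": "Partly cloudy"},
        "wind_kph": 14.4,
        "humidity": 72,
    },
}

This nested structure is why the ingestion notebooks flatten the response with pandas’ json_normalize, which produces dotted column names such as current.condition.text that we later clean up.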

Initial Setup

  • Sign up for a free account on weatherapi.com, copy the API_KEY and create a secret in your Key Vault named weather-api-key
  • Clone the repository https://github.com/AStefanachi/databricks-ingest-api.git in your Databricks Workspace
  • Feel free to change or expand the CITIES list in the constants.py notebook

Helper notebooks

The notebooks constants.py, functions.py and classes.py support the other notebooks, which handle the calls to the API; therefore I refer to them as helper notebooks.

constants.py

# Databricks notebook source
# DBTITLE 1,Constants
SECRET_SCOPE = "kwdwe001"

# List of cities
CITIES = ["hamburg", "berlin", "bremen", "frankfurt"]

# Api key from weatherapi.com
API_KEY = dbutils.secrets.get(SECRET_SCOPE, "weather-api-key")

With future development in mind, having a separate notebook that holds the constants can be very handy.

Don’t forget to change SECRET_SCOPE to the secret scope configured in your Databricks Workspace, otherwise the notebook won’t be able to read the secret from your Key Vault.
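
If you are unsure which secret scopes exist in your workspace, a quick check from any notebook helps. This is just a convenience sketch using the standard dbutils secret utilities; replace the scope name with your own.

# List the secret scopes available in this workspace and the
# secrets contained in the scope used by constants.py.
print(dbutils.secrets.listScopes())
print(dbutils.secrets.list("kwdwe001"))  # replace with your own scope name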

functions.py

# Databricks notebook source
# DBTITLE 1,Imports
from datetime import datetime, date
import uuid
import pytz

# COMMAND ----------

# DBTITLE 1,Functions
def replace_dots_in_columns(sdf):
    """
    Replaces the . contained in the dataframe column names with an _ and returns a pyspark dataframe
    """
    for col in sdf.columns:
        sdf = sdf.withColumnRenamed(col, col.replace('.', '_'))
    return sdf

# COMMAND ----------

def run_metadata():
    """
    Returns a dictionary containing metadata about the latest Run according to the CET time zone
    """
    cet = pytz.timezone('CET')
    _run_date = str(date.today())
    _run_timestamp = str(datetime.now().replace(tzinfo=pytz.utc).astimezone(cet))
    _run_id = str(uuid.uuid1())
    return { "_run_date": _run_date, "_run_timestamp": _run_timestamp, "_run_id": _run_id }

  • replace_dots_in_columns(sdf) expects a Spark dataframe. It loops through the columns and replaces the dots with underscores
  • As written in its docstring, run_metadata() returns a dictionary containing information about the latest run according to the CET time zone
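
To see both helpers in action, here is a minimal usage sketch. It relies on the spark session that Databricks notebooks provide, assumes the functions notebook has been run first (for example via %run ./functions), and uses made-up column names purely for illustration.

# Usage sketch for the helper functions (illustrative column names).
# A tiny dataframe with dotted column names, as pd.json_normalize would produce.
sdf = spark.createDataFrame(
    [(18.0, "Partly cloudy")],
    ["current.temp_c", "current.condition.text"],
)

sdf_clean = replace_dots_in_columns(sdf)
print(sdf_clean.columns)   # ['current_temp_c', 'current_condition_text']

print(run_metadata())      # {'_run_date': '...', '_run_timestamp': '...', '_run_id': '...'}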


classes.py

# Databricks notebook source
# DBTITLE 1,Imports
import requests

# COMMAND ----------

# DBTITLE 1,Classes
class WeatherAPI:
    def __init__(self, api_key, city):
        """
        Initialize 3 variables:
        - api_key: secret API key provided by weatherapi.com
        - city: parameter that needs to be passed in lowercase
        - API_URL: fixed value
        """
        self.api_key = api_key
        self.city = city
        self.API_URL = "http://api.weatherapi.com/v1/"
    
    def get_json_response(self, endpoint):
        """
        Passing the parameters to the endpoint inclusive of the .json extension
        will return a json formatted response
        """
        request = self.API_URL + endpoint
        params = {
            "key": self.api_key,
            "q": self.city
        }
        response = requests.get(request, params=params)
        return response.json()

In this simple implementation I treat the WeatherAPI() object as the representation of the namesake API. In the __init__(self, api_key, city) method the class initializes its core values. The get_json_response(self, endpoint) method, given the desired endpoint as a parameter, returns a JSON-formatted response.
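
As a quick illustration of how the ingestion notebooks use the class (assuming the constants and classes notebooks have been run so that API_KEY is defined):

# Usage sketch: query the current weather and the forecast for a single city.
hamburg_api = WeatherAPI(API_KEY, "hamburg")

current = hamburg_api.get_json_response("current.json")
forecast = hamburg_api.get_json_response("forecast.json")

print(current.keys())  # typically dict_keys(['location', 'current'])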


Ingesting API data

The notebooks cities_astro.py, current.py and forecast.py all share basically the same structure. For simplicity, we will analyze only the code of current.py.

# Databricks notebook source
# DBTITLE 1,Constants notebook
# MAGIC %run ./constants

# COMMAND ----------

# DBTITLE 1,Classes notebook
# MAGIC %run ./classes

# COMMAND ----------

# DBTITLE 1,Functions notebook
# MAGIC %run ./functions

# COMMAND ----------

# DBTITLE 1,Imports
from pyspark.sql import functions as F
import pandas as pd
from functools import reduce

# COMMAND ----------

# DBTITLE 1,Normalize the json response into a list of cleaned dataframes
cities_current_weather = [
        replace_dots_in_columns(
            spark.createDataFrame(
                pd.json_normalize(
                    WeatherAPI(API_KEY, city).get_json_response("current.json")
                    )
                ) 
        )
    for city in CITIES
]

# COMMAND ----------

# DBTITLE 1,Reduce into a single dataframe
cities_df_union = reduce(lambda acc, curr: acc.union(curr), cities_current_weather)

# COMMAND ----------

# DBTITLE 1,Adding the run_metadata() information via a crossJoin to the cities_df_union dataframe
metadata_df = pd.DataFrame([run_metadata()])
metadata_df = spark.createDataFrame(metadata_df)
cities_df_union = cities_df_union.crossJoin(metadata_df)

# COMMAND ----------

# DBTITLE 1,Check if the schema exists, otherwise create
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze_weather")

# COMMAND ----------

# DBTITLE 1,Create table in append mode to build history
cities_df_union.write.mode("append").saveAsTable("bronze_weather.current")

  1. After running the helper notebooks and the imports, we start by normalizing the json response. To do that, in a list comprehension:
    1. we loop through the CITIES list
    2. for each city we initialize a WeatherAPI() object and use the get_json_response("current.json") method to perform the API call
    3. the pandas function json_normalize() converts the json response into a pandas dataframe
    4. the spark method createDataFrame() converts the pandas dataframe into a spark dataframe
    5. the function replace_dots_in_columns() cleans the spark dataframe columns for ingestion
  2. As a result of this operation we obtain a list of spark dataframes called cities_current_weather
  3. Using the reduce function we union every dataframe in the list, resulting in a single dataframe called cities_df_union (see the note after this list)
  4. We wrap the dictionary returned by run_metadata() in a list, parse it into a pandas dataframe and then into a spark dataframe. With a crossJoin we attach the run metadata to every row of the cities_df_union dataframe
  5. If it doesn’t exist yet, we create the schema bronze_weather
  6. Using the spark dataframe write property, we save the dataframe with saveAsTable in append mode to build up history
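
One design note on step 3: union matches columns by position, so it assumes every dataframe in the list has exactly the same column order. If you want to be more defensive against the API adding or reordering fields, you could swap it for unionByName. This is an optional variant, not part of the original notebooks.

# Optional, more defensive variant of the reduce step:
# unionByName matches columns by name instead of position and
# tolerates missing columns (Spark 3.1+).
cities_df_union = reduce(
    lambda acc, curr: acc.unionByName(curr, allowMissingColumns=True),
    cities_current_weather,
)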


Import the notebooks in the main workspace


  1. Click on Repos
  2. Click on your linked GitHub repository
  3. Right click on the weather folder
  4. Click on Export
  5. Click on Source file
  6. Save the file onto your desired location

  1. Click on Workspace
  2. Right click
  3. Click on Import


  1. Click on browse
  2. Select the weather.zip file you extracted in the previous step
  3. Click on open

  1. Click on Import

This will import the weather folder containing all the notebooks from the repository, making it available for scheduling the ingestion.

Databricks Workflows

Azure Databricks Workflows[2] is a platform that lets users orchestrate data processing, machine learning and analytics pipelines on the Azure Databricks Lakehouse platform. Workflows provides fully managed orchestration services integrated with the platform, including Azure Databricks Jobs to run non-interactive code in your workspace and Delta Live Tables to build reliable and maintainable ETL pipelines. Jobs can be created and run through the Jobs UI, the Databricks CLI or the Jobs API, and Workflows also lets users control the execution order of tasks by specifying dependencies between them and run jobs interactively or continuously. Overall, Azure Databricks Workflows provides a comprehensive framework for building and deploying end-to-end data processing and analysis workflows.

For our case we are interested in the Schedule[3] feature, which lets us run our notebooks at a programmed frequency.

Scheduling execution of notebooks

In our architecture we expect to have three separate ingestions via three notebooks: cities_astro, current and forecast. We will now create a job for each notebook with the following schedule:

  • current: job_cluster, one execution per hour
  • cities_astro: job_cluster, one execution per day
  • forecast: job_cluster, one execution every 6 hours

I will show you how to create the ingest_current job and schedule its execution. The other jobs can be created and scheduled according to the same steps.

  1. Click on the Workflow icon
  2. Click on Create Job

  1. Insert a Task name
  2. If not yet selected, select Notebook
  3. If not yet selected, select Workspace
  4. Click to open the context menu and select the notebook at the /weather/current path
  5. Select Job_cluster[4]. In this case Databricks deploys an ad-hoc cluster for our job: it spins it up, executes the job and removes the cluster after execution. With the right settings this saves cost in the long run.
  6. Click on Create

Once the job is created, click on the Workflows icon from the left menu, then click on the job name to access it.

  1. Click on Add schedule
  2. Select Scheduled for Trigger type
  3. Select Every 1 hour, at 1 minute past the hour, and (UTC+02:00)

Now that the ingest_current job is scheduled to run at one minute past every hour in the Central European Time zone, you can proceed to schedule the other notebooks.
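
If you prefer to script this setup instead of clicking through the UI, the same job can also be created with the Databricks Jobs API 2.1. The sketch below is only an illustration using Python and requests: the workspace URL, the personal access token secret, the cluster settings and the cron expression are assumptions you will need to adapt.

# Illustrative sketch: create the ingest_current job via the Jobs API 2.1.
# The host, token secret and cluster settings below are placeholders.
import requests

DATABRICKS_HOST = "https://adb-1234567890123456.7.azuredatabricks.net"   # placeholder
DATABRICKS_TOKEN = dbutils.secrets.get("kwdwe001", "databricks-pat")      # hypothetical secret

job_definition = {
    "name": "ingest_current",
    "tasks": [
        {
            "task_key": "ingest_current",
            "notebook_task": {"notebook_path": "/weather/current"},
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",   # adapt to your workspace
                "node_type_id": "Standard_DS3_v2",     # adapt to your workspace
                "num_workers": 1,
            },
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 1 * * * ?",   # at minute 1 of every hour
        "timezone_id": "Europe/Berlin",
        "pause_status": "UNPAUSED",
    },
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json=job_definition,
)
print(response.json())  # should contain the new job_id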

Cleanup

Because this setup involves the scheduled execution of notebooks, it will generate costs. Terminate your cluster and remove the schedule of every job you created to avoid unpleasant surprises.

References

  1. Weather API, documentation, https://www.weatherapi.com/docs/
  2. Databricks Workflows, https://learn.microsoft.com/en-us/azure/databricks/workflows/
  3. Databricks Workflows, Schedule, https://learn.microsoft.com/en-us/azure/databricks/workflows/jobs/schedule-jobs
  4. Databricks Clusters, https://learn.microsoft.com/en-us/azure/databricks/clusters/
