Introduction
In a simple batch architecture, querying an API is a very common task: data is made available through an exposed endpoint so that external services can programmatically extract what they need. Since this is such a frequent use case in data engineering, let me show you how Azure Databricks can handle this scenario gracefully.
Prerequisites
Table of contents
Architecture
I have been living in Hamburg, Germany for a few years now and, like any of its residents, I quickly learned how important it is to stay informed about the weather, even though the Germans teach you that “…there is no bad weather, just wrong clothing…”.
The data source is weatherapi.com, which exposes two endpoints:

- current: returns the up-to-date weather information in json and xml format for a city
- forecast: returns astronomy data, the daily weather forecast and hourly-interval weather information for a given city [1]

The solution is built from the following components:

- cities_astro.py: calls the forecast.json endpoint and saves the astronomical information (sunrise, sunset, etc.) about a city
- classes.py: defines the WeatherAPI object and the necessary methods to call the current.json and forecast.json endpoints
- constants.py: holds the API_KEY and a list of CITIES
- forecast.py: calls the forecast.json endpoint and saves the hourly weather information about a city
- current.py: calls the current.json endpoint and saves the current weather information about a city
- Azure Key Vault: the API_KEY is used for authentication, so we store it here for security
- Bronze tables: the ingested data lands in bronze_weather.cities_astro, bronze_weather.current and bronze_weather.hourly_forecasts
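To make the two endpoints concrete before diving into the notebooks, here is a minimal, hypothetical call with requests; the key is a placeholder, and the URL and parameter names mirror what the WeatherAPI class shown later uses:

import requests

# Hypothetical direct call to the "current" endpoint of weatherapi.com,
# mirroring what the WeatherAPI class below wraps.
API_URL = "http://api.weatherapi.com/v1/"
params = {"key": "<your-weatherapi-key>", "q": "hamburg"}  # q = city, in lowercase

response = requests.get(API_URL + "current.json", params=params)
print(response.json())  # json payload with the current weather for Hamburg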
Initial Setup
- Obtain your API_KEY from weatherapi.com and create a secret in your Key Vault named weather-api-key
- Clone the repository https://github.com/AStefanachi/databricks-ingest-api.git in your Databricks Workspace

Helper notebooks
The notebooks constants.py, functions.py and classes.py serve the other notebooks, which handle the calls to the API; this is why I refer to them as helper notebooks.
constants.py
# Databricks notebook source
# DBTITLE 1,Constants
SECRET_SCOPE = "kwdwe001"

# List of cities
CITIES = ["hamburg", "berlin", "bremen", "frankfurt"]

# Api key from weatherapi.com
API_KEY = dbutils.secrets.get(SECRET_SCOPE, "weather-api-key")
With future developments in mind, keeping the constants in a separate notebook can be very handy.
Don’t forget to change SECRET_SCOPE to the secret scope configured in your Databricks Workspace, otherwise the notebook won’t be able to read the secret from your Key Vault.
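A quick way to verify the secret wiring is a sanity check from a notebook cell; dbutils.secrets is available on any Databricks cluster, and the scope name below is the one used in constants.py (replace it with yours):

# List the scopes and the secrets visible to the workspace,
# then read the weather-api-key secret (its value is redacted in notebook output).
print(dbutils.secrets.listScopes())
print(dbutils.secrets.list("kwdwe001"))
api_key = dbutils.secrets.get(scope="kwdwe001", key="weather-api-key")
print(len(api_key))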
functions.py
# Databricks notebook source
# DBTITLE 1,Imports
from datetime import datetime, date
import uuid
import pytz

# COMMAND ----------

# DBTITLE 1,Functions
def replace_dots_in_columns(sdf):
    """
    Replaces the . contained in the dataframe column names
    with an _ and returns a pyspark dataframe
    """
    for col in sdf.columns:
        sdf = sdf.withColumnRenamed(col, col.replace('.', '_'))
    return sdf

# COMMAND ----------

def run_metadata():
    """
    Returns a dictionary containing metadata about the latest Run
    according to the CET time zone
    """
    cet = pytz.timezone('CET')
    _run_date = str(date.today())
    _run_timestamp = str(datetime.now().replace(tzinfo=pytz.utc).astimezone(cet))
    _run_id = str(uuid.uuid1())
    return {
        "_run_date": _run_date,
        "_run_timestamp": _run_timestamp,
        "_run_id": _run_id
    }
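To make the two helpers concrete, here is a minimal, hypothetical usage sketch; the json payload and column names are invented, and spark refers to the SparkSession available in any Databricks notebook:

import pandas as pd

# json_normalize flattens nested json into dotted column names such as
# "location.name"; dots clash with Spark's nested-field notation,
# hence the renaming helper.
pdf = pd.json_normalize({"location": {"name": "hamburg"}, "temp_c": 7.0})
sdf = spark.createDataFrame(pdf)

sdf_clean = replace_dots_in_columns(sdf)
print(sdf_clean.columns)  # "location.name" has become "location_name"

print(run_metadata())
# {'_run_date': '...', '_run_timestamp': '...', '_run_id': '...'}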
- replace_dots_in_columns(sdf): expects a Spark dataframe; it loops through the columns and replaces the dots with underscores
- run_metadata(): returns a dictionary containing the information about the latest run according to the CET timezone

References:
classes.py
# Databricks notebook source
# DBTITLE 1,Imports
import requests

# COMMAND ----------

# DBTITLE 1,Classes
class WeatherAPI:
    def __init__(self, api_key, city):
        """
        Initialize 3 variables:
        - api_key: secret API key provided by weatherapi.com
        - city: parameter that need to be passed in lowercase
        - API_URL: fixed value
        """
        self.api_key = api_key
        self.city = city
        self.API_URL = "http://api.weatherapi.com/v1/"

    def get_json_response(self, endpoint):
        """
        Passing the parameters to the endpoint inclusive of the .json extension
        will return a json formatted response
        """
        request = self.API_URL + endpoint
        params = {
            "key": self.api_key,
            "q": self.city
        }
        response = requests.get(request, params=params)
        return response.json()
In this simple implementation, the WeatherAPI() object represents the namesake API. The __init__(self, api_key, city) method initializes its core values, while get_json_response(self, endpoint) takes the desired endpoint as a parameter and returns a json formatted response.
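As a quick illustration, a hypothetical one-off call for a single city could look like this (it assumes the constants notebook has been run, so that API_KEY is defined):

# Instantiate the class for one city and query both endpoints.
api = WeatherAPI(API_KEY, "hamburg")

current = api.get_json_response("current.json")    # current weather payload
forecast = api.get_json_response("forecast.json")  # forecast and astronomy payload

print(current.keys())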
Code references:
Ingesting API data
The notebooks cities_astro.py, current.py and forecast.py all share basically the same structure, so for simplicity we will only analyze the code of current.py.
# Databricks notebook source
# DBTITLE 1,Constants notebook
# MAGIC %run ./constants

# COMMAND ----------

# DBTITLE 1,Classes notebook
# MAGIC %run ./classes

# COMMAND ----------

# DBTITLE 1,Functions notebook
# MAGIC %run ./functions

# COMMAND ----------

# DBTITLE 1,Imports
from pyspark.sql import functions as F
import pandas as pd
from functools import reduce

# COMMAND ----------

# DBTITLE 1,Normalize the json response into a list of cleaned dataframes
cities_current_weather = [
    replace_dots_in_columns(
        spark.createDataFrame(
            pd.json_normalize(
                WeatherAPI(API_KEY, city).get_json_response("current.json")
            )
        )
    )
    for city in CITIES
]

# COMMAND ----------

# DBTITLE 1,Reduce into a single dataframe
cities_df_union = reduce(lambda acc, curr: acc.union(curr), cities_current_weather)

# COMMAND ----------

# DBTITLE 1,Adding the run_metadata() information via a crossJoin to the cities_df_union dataframe
metadata_df = pd.DataFrame([run_metadata()])
metadata_df = spark.createDataFrame(metadata_df)
cities_df_union = cities_df_union.crossJoin(metadata_df)

# COMMAND ----------

# DBTITLE 1,Check if the schema exists, otherwise create
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze_weather")

# COMMAND ----------

# DBTITLE 1,Create table in append mode to build history
cities_df_union.write.mode("append").saveAsTable("bronze_weather.current")
- For each city in the CITIES constant we initialize a WeatherAPI() object and use the get_json_response("current.json") method to perform the API call
- json_normalize() converts the json response into a pandas dataframe
- createDataFrame converts the pandas dataframe into a spark dataframe
- replace_dots_in_columns() cleans the spark dataframe columns for ingestion
- The cleaned dataframes are collected in the list cities_current_weather; with the reduce function we union each dataframe in the list, resulting in a single dataframe called cities_df_union
- We wrap the run_metadata() information in a list and parse it into a pandas dataframe and later into a spark dataframe; with a crossJoin we apply the run metadata information to all the rows of the cities_df_union dataframe
- saveAsTable writes the dataframe in append mode to build up the history (see the verification sketch below)

Code references:
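To sanity-check the ingestion after a couple of runs, you can inspect the appended history; the sketch below relies only on the metadata columns added by run_metadata():

from pyspark.sql import functions as F

current_tbl = spark.table("bronze_weather.current")

# Each run appends one row per city, so every _run_id should count len(CITIES) rows.
(current_tbl
    .groupBy("_run_id", "_run_date", "_run_timestamp")
    .count()
    .orderBy(F.desc("_run_timestamp"))
    .show(10, truncate=False))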
Import the notebooks in the main workspace
This will import the weather folder containing all the notebooks from the repository, making them available for scheduling the ingestion.
Databricks Workflows
Azure Databricks Workflows[2] is a platform that enables users to orchestrate data processing, machine learning, and analytics pipelines on the Azure Databricks Lakehouse platform. Workflows provides fully managed orchestration services integrated with the Azure Databricks platform, including Azure Databricks Jobs to run non-interactive code in your Azure Databricks workspace and Delta Live Tables to build reliable and maintainable ETL pipelines. With Workflows, users can create and run jobs using the Jobs UI, the Databricks CLI, or by invoking the Jobs API. Workflows also allows users to control the execution order of tasks by specifying dependencies between the tasks and to run jobs interactively or continuously. Overall, Azure Databricks Workflows provides a comprehensive framework for building and deploying end-to-end data processing and analysis workflows.
For our case we are interested in the Schedule [3] feature, which lets us control the runs of our notebooks at a programmed frequency.
Scheduling execution of notebooks
In our architecture we expect three separate ingestions via three notebooks: cities_astro, current and forecast. We will now create a job for each notebook with the following schedule:

- current: job_cluster, one execution per hour
- cities_astro: job_cluster, one execution per day
- forecast: job_cluster, one execution every 6 hours

I will show you how to create the ingest_current job and schedule its execution. The other jobs can be created and scheduled following the same steps.
The notebook to schedule is located under the /weather/current path. Once the job is created, click on the Workflows icon in the left menu, then click on the job name to access it.
Now that we have scheduled the ingest_current job to run at one minute past every hour in the Central European Time zone, you can proceed to schedule the other notebooks.
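If you prefer to create the jobs programmatically rather than through the UI, a rough sketch against the Jobs API 2.1 could look like the following; the host, token and cluster specification are placeholders, so double-check the payload against the current API documentation:

import requests

HOST = "https://<your-workspace>.azuredatabricks.net"   # placeholder
TOKEN = "<personal-access-token>"                        # placeholder

job_payload = {
    "name": "ingest_current",
    "tasks": [
        {
            "task_key": "ingest_current",
            "notebook_task": {"notebook_path": "/weather/current"},
            "new_cluster": {                      # placeholder job cluster spec
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "Standard_DS3_v2",
                "num_workers": 1,
            },
        }
    ],
    "schedule": {
        # Quartz cron: second 0, minute 1, every hour
        "quartz_cron_expression": "0 1 * * * ?",
        "timezone_id": "Europe/Berlin",
        "pause_status": "UNPAUSED",
    },
}

response = requests.post(
    f"{HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=job_payload,
)
print(response.json())   # returns the job_id on success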
Cleanup
As this setup involves the scheduled execution of notebooks, it will generate costs. Terminate your cluster and remove the schedule of every job you created in order to avoid bad surprises.
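A hedged sketch of removing a schedule programmatically, again via the Jobs API 2.1 (job_id, host and token are placeholders); pausing the schedule stops the runs without deleting the job:

import requests

HOST = "https://<your-workspace>.azuredatabricks.net"   # placeholder
TOKEN = "<personal-access-token>"                        # placeholder

requests.post(
    f"{HOST}/api/2.1/jobs/update",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "job_id": 123,                                   # placeholder job id
        "new_settings": {
            "schedule": {
                "quartz_cron_expression": "0 1 * * * ?",
                "timezone_id": "Europe/Berlin",
                "pause_status": "PAUSED",
            }
        },
    },
)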
References