Simplify ETL Pipelines using Delta Live Tables
Consider a common data engineering scenario in which raw data needs to be cleansed, transformed, or aggregated before it is written to a target system. For this case, we usually create three or four tables to store the raw, cleansed, transformed, and aggregated data respectively. Implementing this requires 1) a lot of source code for the cleansing and transformation logic, and 2) a job orchestration and scheduling platform such as Airflow or Kubeflow. We might also need one environment for development and another for scheduling jobs. Now imagine that all of this can be implemented in a few lines of code, within a single environment. Say what? That’s the voice in your head, right? It was in mine too. Databricks recently introduced Delta Live Tables (built on Delta Lake), which can do exactly this. Without further ado, let us jump straight to the characteristics of Live Tables and then a demo.
Characteristics of Live Tables:
- Live Tables, introduced by Databricks, make it easier to create and manage data pipelines that deliver curated, high-quality data on Delta Lake.
- They simplify pipeline creation through a declarative pipeline model, automatic data testing, less verbose code, and better insight into job monitoring and recovery.
- Just by specifying the data source, transformation logic, and target system, robust end-to-end data pipelines can be built easily. This reduces the manual effort and time needed to design and assemble data processing pipelines.
- They harden data pipelines against corrupted files and bad data through data quality (DQ) validation and integrity checks.
- Batch and streaming can be enabled using the same version of the code.
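To make the "declarative pipeline model" idea concrete, here is a minimal plain-Python sketch. It is not the Delta Live Tables API: the table names, the in-memory rows, and the `run_pipeline` helper are all hypothetical. It only illustrates the principle that you declare each table and what it depends on, and the engine works out the execution order.

```python
from collections import Counter
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical in-memory source; in the article the source is an S3 location.
RAW_ROWS = [
    {"artist_name": "Artist A", "year": 2005},
    {"artist_name": None, "year": 2001},
    {"artist_name": "Artist B", "year": 2012},
]


def build_raw(deps):
    # Raw table: a straight copy of the source.
    return list(RAW_ROWS)


def build_curated(deps):
    # Curated table: keep only rows with a non-null artist name.
    return [r for r in deps["raw"] if r["artist_name"] is not None]


def build_songs_per_year(deps):
    # Aggregated table: number of curated rows per year.
    return dict(Counter(r["year"] for r in deps["curated"]))


# Declarations: table name -> (upstream tables, build function).
# They are deliberately listed out of order; no run order is stated anywhere.
TABLES = {
    "songs_per_year": (("curated",), build_songs_per_year),
    "curated": (("raw",), build_curated),
    "raw": ((), build_raw),
}


def run_pipeline(tables):
    """Derive the run order from the declared dependencies, then build each table."""
    graph = {name: set(deps) for name, (deps, _) in tables.items()}
    order = list(TopologicalSorter(graph).static_order())
    results = {}
    for name in order:
        deps, build = tables[name]
        results[name] = build({d: results[d] for d in deps})
    return order, results


order, results = run_pipeline(TABLES)
print(order)  # upstream tables come first: raw, curated, songs_per_year
```

The point of the sketch is the shape of the code: adding a new table means adding one declaration, not editing a scheduler.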
If you’re following the steps below and do not see Delta Live Tables, it might not be enabled for your account. Please check with the Databricks support team to have it enabled.
Scenario: Consider an AWS S3 bucket that acts as the target location for streaming data flowing through Kinesis Data Firehose. The streaming data holds details of songs and their respective artists. This demo focuses on building ETL pipelines on top of this S3 bucket for business intelligence reporting use cases.
Although close to 100 columns are available in S3, only 10 of them are loaded into Databricks as raw live tables, chosen by how frequently they are used across multiple ETL jobs and analytical queries. For this demo, all queries are executed in a Databricks notebook environment running on top of an AWS CloudFormation stack. The end objective is to prepare an aggregated form of the data that can be used directly for visualization and business intelligence reporting. The first part of the demo focuses on creating the required live tables using Databricks notebooks, and the last part covers monitoring and job scheduling.
1. Raw Live Table: In most data engineering use cases, raw data is available in a database or filesystem. Although most production workloads depend on curated, cleansed, or aggregated data, it is always good to keep the raw data available in some form. A live table can be created from the data available in S3 as below:
CREATE LIVE TABLE live.songs_list_raw_tbl
COMMENT "Raw Table created to experiment Delta Live Tables. Holds Details on Songs and it's corresponding artist"
AS SELECT * FROM parquet.`s3://delta-live-tables-testing/songs_list`
Once the data is loaded into the live table, it can be queried for validation purposes. A sneak peek at the data available in the raw live table is as follows:
2. Curated Live Table: Consuming raw data directly in ETL jobs or queries requires a lot of cleansing before anything else can happen. The problem is that this cleansing has to be performed in every job before data is published to downstream consumers. So it’s better to have a separate table that holds curated, projected (only the required columns), valid rows, with duplicates and null values removed. This way, we avoid repeating the cleansing, and the row-level and column-level constraints, in every job. Live tables support DQ checks through the CONSTRAINT keyword. The code below creates the curated live table with the following conditions: a) rows must have a non-null value for artist_name, b) only songs released after the year 1999 are required, c) ignoring location details that comprise of
CREATE LIVE TABLE live.clnsd_prjcd_songs(
CONSTRAINT not_null_artist EXPECT (artist_name IS NOT NULL),
CONSTRAINT latest_decade EXPECT (year > 1999) ON VIOLATION FAIL UPDATE)
COMMENT "Projected, Cleansed version of Raw Live table"
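The two constraints above behave differently, and a small simulation may help. As I understand the documented behavior, a plain EXPECT keeps violating rows and only records them as a data quality metric, ON VIOLATION DROP ROW removes them, and ON VIOLATION FAIL UPDATE stops the update. The plain-Python sketch below mimics those three modes; `apply_expectations`, `UpdateFailed`, the action names, and the sample rows are hypothetical and are not part of any Databricks API.

```python
class UpdateFailed(Exception):
    """Raised when a rule with the 'fail' action is violated."""


def apply_expectations(rows, rules):
    """Apply (name, predicate, action) rules to rows.

    action is one of:
      'warn' - count the violation but keep the row (like a plain EXPECT)
      'drop' - count the violation and drop the row (like ON VIOLATION DROP ROW)
      'fail' - abort the whole update (like ON VIOLATION FAIL UPDATE)
    Returns the kept rows and a per-rule violation count.
    """
    violations = {name: 0 for name, _, _ in rules}
    kept = []
    for row in rows:
        keep = True
        for name, predicate, action in rules:
            if predicate(row):
                continue
            violations[name] += 1
            if action == "fail":
                raise UpdateFailed(f"{name} violated by {row}")
            if action == "drop":
                keep = False
        if keep:
            kept.append(row)
    return kept, violations


# Hypothetical sample rows; column names follow the article.
ROWS = [
    {"artist_name": "Artist A", "year": 2005},
    {"artist_name": None, "year": 2003},
]

# Mirrors the two constraints in the statement above.
RULES = [
    ("not_null_artist", lambda r: r["artist_name"] is not None, "warn"),
    ("latest_decade", lambda r: r["year"] > 1999, "fail"),
]

kept, violations = apply_expectations(ROWS, RULES)
print(len(kept), violations)  # the null-artist row is counted but still kept
```

If the goal is to physically exclude rows with a null artist_name from the curated table, this suggests the first constraint would need a drop-style action rather than a plain EXPECT.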
3. Aggregated Live Table: This is our final output table, which can be consumed directly for data visualization and business intelligence queries. Although the aggregation could be performed in downstream jobs, for the same reasons given for the curated live table it’s better to create a new table for the aggregated, refined form of the data. This aggregated table covers the requirement of albums released by an artist before the year 2010, keeping only artists with a minimum count of 25. The aggregated table can be formed as follows:
CREATE LIVE TABLE live.aggregated_play_list
COMMENT "Aggregated data from cleansed_songs"
AS SELECT artist_id,
count(song_id) as num_albums
FROM live.clnsd_prjcd_songs
WHERE year < 2010
GROUP BY artist_id
HAVING count(song_id) >= 25
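The WHERE, GROUP BY, and HAVING logic can be checked locally before running it in a pipeline. The sketch below uses Python's built-in sqlite3 as a stand-in for Databricks, and it assumes the aggregation reads from the curated table (named clnsd_prjcd_songs here, without the live. prefix). The artists and row counts are made-up test data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clnsd_prjcd_songs (song_id TEXT, artist_id TEXT, year INTEGER)"
)

# Made-up data covering the three interesting cases.
rows = []
# artist_a: 30 songs before 2010 -> should appear in the output
rows += [(f"a{i}", "artist_a", 2005) for i in range(30)]
# artist_b: 24 songs before 2010 plus 10 later ones -> only 24 count, so excluded
rows += [(f"b{i}", "artist_b", 2008) for i in range(24)]
rows += [(f"b_late{i}", "artist_b", 2015) for i in range(10)]
# artist_c: 40 songs, all from 2012 -> removed by the WHERE clause
rows += [(f"c{i}", "artist_c", 2012) for i in range(40)]
conn.executemany("INSERT INTO clnsd_prjcd_songs VALUES (?, ?, ?)", rows)

# Same filter, grouping, and threshold as the statement above.
result = conn.execute(
    """
    SELECT artist_id, count(song_id) AS num_albums
    FROM clnsd_prjcd_songs
    WHERE year < 2010
    GROUP BY artist_id
    HAVING count(song_id) >= 25
    ORDER BY artist_id
    """
).fetchall()
print(result)  # only artist_a meets both conditions
conn.close()
```

Note that WHERE is applied before grouping, so songs from 2010 onward never contribute to the count. Also, count(song_id) counts songs per artist, so num_albums is a song count unless each song_id happens to correspond to one album.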
4. Pipeline Creation: Once all the tables are defined, it’s easy to create a pipeline in the Databricks UI with the steps below: 1) Click Jobs, then create a pipeline from the Pipelines tab. 2) Leave the optional parameters at their defaults the first time, point the pipeline to the notebook that creates the live tables, and trigger it. 3) A successfully created pipeline launches a cluster, executes the code in the notebook, and then terminates the cluster.
5. Monitoring: The Databricks Pipelines UI provides insight into the status of the pipeline and the current flow of each job. Snapshots of the status of each job in the workflow and of the overall pipeline status are attached below. Beyond these, there are other ways to track job status, execution, and its logistics.
This is just a small pipeline. But even a small pipeline like this takes a lot of effort and time to build with traditional ETL tools. As mentioned earlier, separate ETL tools, programming frameworks such as Spark and Hadoop, a development environment, and a database all need to be created and managed to build a pipeline. With Delta Live Tables, all of this can be handled in a few lines of code, without worrying about the execution environment, job observability, or monitoring. It also offers additional capabilities, such as reusing the same code for batch and streaming jobs and visualizing job metrics using Databricks SQL.
I hope this post helps you understand the basics of Delta Live Tables. On a separate note, watch this space for more detailed analysis and more complex pipelines built with Delta Live Tables.