Databricks Auto Loader: Enhance ETL by Simplifying the Data Ingestion Process

Balachandar Paulraj
4 min read · Dec 20, 2021


Before we start deep diving into Auto Loader, let us focus on the existing data engineering issues in the ingestion process, each of which fits into one of the categories below:

  1. High latency due to batch processing: Even though data lands at regular intervals of a few minutes, in most cases a batch job is scheduled to process files only once an hour. This increases latency (and lengthens the SLA) and adds a huge workload, since every batch has to process a large chunk of files.
  2. Too many small files from stream processing: A streaming job that continuously monitors source files and processes small chunks of data ends up writing many small files to the target system, which introduces a new set of issues for downstream consumers.
  3. Missing input files: A file whose write begins before a batch run starts and finishes after it starts might be missed during processing. For example, consider an hourly scheduled job and an input file whose creation starts 5 minutes before the job begins and ends 2 minutes after. This file will be picked up only in the next batch, increasing the SLA by one hour.
  4. Cloud rate limit issues: The naive file-based streaming source identifies new files by recursively listing the cloud buckets/folders to differentiate new files from old ones. Both cost and latency can add up quickly as more and more files are added to a directory, due to this repeated listing. We might also hit the rate limit set by the cloud service provider (for example, S3 throws an exception whenever the number of requests made to it crosses a particular limit).

The issues above make it hard to incrementally process new data and groom it so it is available for downstream jobs, a common workflow in ETL. For the listed reasons, we need something between batch and streaming to do the job for us.

Keeping these issues in mind, let's focus on Auto Loader. It acts as an optimized and enhanced file source for Apache Spark that loads data continuously and efficiently from cloud storage as new data arrives. It provides a seamless way for data engineering teams to simplify the data ingestion process at a lower cost. Let's jump to the implementation from here.


The code snippet below has been written by considering AWS S3 as cloud filesystem.

// Initialize input, output and checkpoint folders
val baseDir = "s3://for-auto-loader-demo"
val inputDir = s"${baseDir}/input"
val checkpointDir = s"${baseDir}/checkpoint"
val outputDir = s"${baseDir}/output"

// Define the reading part for files present in S3. For formats with an
// embedded schema (parquet, orc, avro, etc.) there is no need to define
// the schema explicitly as done below.
val universityDf = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("university_name string, num_students int, student_staff_ratio long, female_male_ratio long")
  .load(inputDir)

// Define the writing part into the predefined output directory, along with
// a checkpoint location for storing the job's logs and metadata.
universityDf.writeStream
  .option("checkpointLocation", checkpointDir)
  .start(outputDir)

Hopefully, the code written here is straightforward if you already have experience leveraging the Apache Spark framework for reading and writing data. Other than a few syntax updates, the two things that might be new here are the "cloudFiles" format and the checkpointLocation option, which stores the logs and metadata of the job. To keep it simple: using cloudFiles as the source automatically processes new files as they arrive, with the option of also processing the files already present in that directory.
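The "also processing existing files" behavior mentioned above is controlled by the `cloudFiles.includeExistingFiles` option. A minimal sketch, reusing `inputDir` from the snippet above (the option's documented default is "true"):

```scala
// Process only files that arrive after the stream starts, skipping the
// backlog already sitting in the input directory.
val newFilesOnlyDf = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.includeExistingFiles", "false") // default: "true"
  .schema("university_name string, num_students int")
  .load(inputDir)
```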


A few things worth noting:

  1. If there is a need to execute the job in batch mode (run once instead of continuously streaming), add a trigger (Trigger.Once()) while writing the data.
  2. Delta format can be chosen while writing data to the target path.
  3. Auto Loader uses RocksDB to store metadata. If you're curious, download and check the log files under the path s3://<checkpoint_folder>/sources/0/rocksdb/logs/
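Points 1 and 2 above can be combined on the write side. A sketch, assuming the `universityDf`, `checkpointDir` and `outputDir` values from the earlier snippet:

```scala
import org.apache.spark.sql.streaming.Trigger

// Run the stream as a one-shot batch: process everything new since the
// last checkpoint, then stop. Output is written in Delta format.
universityDf.writeStream
  .format("delta")
  .trigger(Trigger.Once())
  .option("checkpointLocation", checkpointDir)
  .start(outputDir)
```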


Benefits of Auto Loader:

  • No file state management: Every ETL project must store the state of a job so that the subsequent run can pick up from where the last one completed; for example, incremental jobs store the input end time or the last file processed. Auto Loader manages this state for you.
  • Not dependent on cloud notifications: Most current projects that implement an ETL framework like this depend on cloud services such as SNS or S3 event notifications (in the case of AWS). Here, Auto Loader takes care of everything, and there is no need to set anything up.
  • Scalable state management: As mentioned earlier, Auto Loader leverages RocksDB to store metadata, which scales even to millions of files in a directory.
  • No manual effort required to meet the SLA: Sometimes, to stay within SLA limits, we trigger an ad hoc job to process a file, which can introduce manual errors. Auto Loader's continuous processing removes that need.
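On the notifications point: directory listing is Auto Loader's default discovery mode, but if you do want file-notification-based discovery, Auto Loader can create and manage the notification services (SNS topic and SQS queue on AWS) itself via a single option. A sketch, reusing `inputDir` from the earlier snippet and assuming the cluster has the required IAM permissions:

```scala
// Switch Auto Loader from directory listing to file-notification mode.
// Auto Loader provisions the SNS topic and SQS queue automatically.
val notifiedDf = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.useNotifications", "true")
  .schema("university_name string, num_students int")
  .load(inputDir)
```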