Databricks Auto Loader: Enhance ETL by Simplifying the Data Ingestion Process

Introduction:

Ingesting files from cloud object storage with traditional batch or naive streaming jobs typically runs into the problems below, which Databricks Auto Loader is designed to address.

  1. High latency due to batch processing: Even though data lands every few minutes, in most cases a batch job is scheduled to process the files only once an hour. This increases latency (and lengthens the SLA) and adds a heavy workload, since every batch has to process a large chunk of files.
  2. Too many small files in stream processing: A streaming job that continuously monitors the source and processes small chunks of data ends up writing many small files to the target system, which introduces a new set of issues for downstream consumers.
  3. Missing input files: A file whose creation starts before a batch run begins and finishes after the run has started can be missed by that run. For example, with an hourly scheduled job, consider an input file whose creation starts 5 minutes before the job and completes 2 minutes after the job has started. That file is only picked up by the next batch, adding an hour to the SLA.
  4. Cloud rate limit issues: A naive file-based streaming source identifies new files by recursively listing the cloud buckets/folders to tell new files apart from old ones. Both cost and latency add up quickly as more and more files accumulate in a directory because of this repeated listing. We may also hit the rate limits set by the cloud service provider (for example, S3 throws an exception once the number of requests crosses a certain threshold).

Implementation:

//Define input, output and checkpoint folders
val baseDir = "s3://for-auto-loader-demo"
val inputDir = s"${baseDir}/input"
val checkpointDir = s"${baseDir}/checkpoint"
val outputDir = s"${baseDir}/output"

//Define the read from S3 using Auto Loader. For self-describing formats
//such as Parquet, ORC or Avro, the explicit schema below is not required.
val universityDf = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("university_name string, num_students int, student_staff_ratio long, female_male_ratio long")
  .load(inputDir)

//Define the write to the predefined output directory, with a checkpoint
//location for storing stream progress and metadata
universityDf.writeStream.format("parquet")
  .option("checkpointLocation", checkpointDir)
  .start(outputDir)
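As the read comment notes, the explicit .schema(...) call can be dropped for self-describing formats. A minimal sketch, assuming a hypothetical Parquet landing folder under the same bucket; on newer Databricks Runtime versions a cloudFiles.schemaLocation is also needed so Auto Loader can track the inferred schema.

//Reading a self-describing format: no explicit schema required
val ordersDf = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", s"${baseDir}/schema") //assumption: schema-tracking path needed on newer runtimes
  .load(s"${baseDir}/parquet-input")                         //hypothetical Parquet input folder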
  1. If the job needs to run in batch mode (process the available files once instead of running continuously), add a trigger of Trigger.Once when writing the data, as shown in the sketch after this list.
  2. Delta can be chosen as the format when writing data to the target path.
  3. Auto Loader uses RocksDB to store metadata about ingested files. If you’re curious, please download and check the log files under the path s3://<checkpoint_folder>/sources/0/rocksdb/logs/
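A minimal sketch of points 1 and 2 above, reusing universityDf, checkpointDir and outputDir from the implementation snippet: Trigger.Once makes the stream process whatever is currently available and then stop, and the sink format is switched from Parquet to Delta.

import org.apache.spark.sql.streaming.Trigger

//Run the same stream as a batch-style job and write Delta instead of Parquet
universityDf.writeStream.format("delta")
  .trigger(Trigger.Once())                     //process available files once, then stop
  .option("checkpointLocation", checkpointDir)
  .start(outputDir)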

Benefits:

  • No file state management: Every ETL pipeline normally has to persist the state of each run so that the next run can pick up where the previous one finished; for example, incremental jobs store the input end time or the last file processed. Auto Loader tracks ingested files for you in the checkpoint location.
  • Not dependent on cloud notifications: Most projects that build this kind of ETL framework depend on cloud provider services such as SNS or S3 event notifications (in the case of AWS). Here, Auto Loader takes care of everything and there is nothing to set up (see the sketch after this list).
  • Scalable state management: As mentioned earlier, Auto Loader uses RocksDB to store metadata, which scales even to millions of files in a directory.
  • No manual effort required to meet the SLA: Sometimes, to stay within SLA limits, an ad hoc job is triggered to process a file, which can introduce manual errors. With Auto Loader continuously picking up new files, such ad hoc runs are no longer needed.
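For completeness, Auto Loader can also be switched from the default directory-listing mode to file notification mode, in which case it provisions the notification services (SNS/SQS on AWS) on your behalf. A minimal sketch, assuming the cluster has the cloud permissions needed to create those resources:

//Let Auto Loader set up and consume cloud file notifications itself
val notifiedDf = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.useNotifications", "true") //default is directory listing
  .option("header", "true")
  .schema("university_name string, num_students int, student_staff_ratio long, female_male_ratio long")
  .load(inputDir)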
