Fast-Track PySpark UDF execution with Apache Arrow

Balachandar Paulraj
4 min read · Nov 19, 2023


Developers often create custom user-defined functions (UDFs) in their Spark code to handle specific transformations. This allows them to write personalized logic for their unique data processing requirements.


PROBLEM STATEMENT

For all the advantages UDFs bring to Spark, Python UDF execution relies on the pickle format (specifically, cloudpickle) for (de)serialization. This becomes a performance bottleneck during UDF data processing, particularly when large volumes of data are passed in and out of the UDF.

PERFORMANCE BOTTLENECK — REASON

(De)serialization for a Python UDF runs as a separate step, which adds time to every execution. To make the bottleneck clearer, here is a diagram from one of my previous posts that explains the under-the-hood steps involved in converting a Spark DataFrame to a pandas DataFrame.

Issues in pickle format explained through a simple case of converting Spark to Pandas Dataframe

The diagram above shows the additional steps required to (de)serialize data using the pickle format. Since all records have to be moved to the driver for serialization, jobs that involve huge data volumes often cannot fit the DataFrame in driver memory and may fail. This can be worked around in several ways, but each requires additional effort.
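
As a quick illustration of that conversion path (a minimal sketch, assuming an existing SparkSession named spark), the same pickle-versus-Arrow trade-off shows up when calling toPandas():

# Spark -> pandas pulls every record to the driver and (de)serializes it there.
# With this conf set to "true", the transfer uses columnar Arrow batches;
# with "false", it falls back to pickled rows.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pdf = spark.range(1, 1000000).toPandas()
print(pdf.shape)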

APPROACH USING APACHE ARROW

Apache Spark 3.5 and Databricks Runtime 14.0 introduce Arrow-optimized Python UDFs, a game-changer for UDF performance.

Built on Apache Arrow, a widely adopted cross-language columnar in-memory data format, this optimization replaces the slower pickle-based (de)serialization with an efficient data interchange between the JVM and Python processes, improving the overall efficiency of data processing. Because the optimized UDFs build on Arrow’s type system, they also offer a more consistent and predictable approach to type coercion.
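
According to the release notes referenced at the end of this post, the optimization can also be requested per UDF through the useArrow argument, independent of any session-level config. A minimal sketch (the UDF name here is just an example):

from pyspark.sql.functions import udf

# Opt this single UDF into Arrow-optimized execution (Spark 3.5+ / DBR 14.0+)
@udf(returnType='int', useArrow=True)
def add_one_arrow(x):
    return x + 1

spark.range(5).select(add_one_arrow("id").alias("id_plus_one")).show()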

Now, let’s look more closely at the two benefits mentioned above: 1) faster (de)serialization and 2) better handling of type coercion.

  1. Faster (De)serialization:

Unlike pickle, which serializes entire rows, Arrow uses a column-oriented storage format. This improves compression and memory locality, making it a well-suited choice for analytical workloads.
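
To build some intuition for the difference, here is a standalone sketch that compares the two formats directly using pyarrow and pickle (an illustration only, not what Spark does internally verbatim):

import pickle
import pyarrow as pa

rows = [{"id": i, "value": i * 2} for i in range(1000)]

table = pa.Table.from_pylist(rows)   # columnar: one contiguous buffer per column
pickled = pickle.dumps(rows)         # row-oriented: each record serialized in turn

print(table.nbytes, len(pickled))    # compare the in-memory/serialized footprints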

To validate the speed-up in Spark itself, I wrote the small UDF below in Python and ran it for different numbers of records.

from pyspark.sql.functions import udf, desc
from pyspark.sql.types import IntegerType

df = spark.range(1, 1000000)  # change this to validate the below UDF for different input ranges

@udf(returnType=IntegerType())
def add_one(x):
    return x + 1

df.select("id", add_one("id").alias("id_plus_one")).orderBy(desc("id_plus_one")).show(20)

I toggled the Spark config `spark.sql.execution.pythonUDF.arrow.enabled` between true and false to switch between Arrow and the default (pickle) serialization.
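
One way to drive that comparison is sketched below (a minimal sketch, not the exact benchmark harness I used; I recreate the UDF after each toggle so the setting is picked up, and timings will vary with cluster and data size):

import time
from pyspark.sql.functions import udf, desc
from pyspark.sql.types import IntegerType

df = spark.range(1, 1000000)

for arrow_enabled in ("true", "false"):
    # Set the conf first, then (re)create the UDF so the setting takes effect
    spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", arrow_enabled)
    add_one = udf(lambda x: x + 1, IntegerType())

    start = time.time()
    df.select("id", add_one("id").alias("id_plus_one")).orderBy(desc("id_plus_one")).show(20)
    print(f"arrow.enabled={arrow_enabled}: {time.time() - start:.2f}s")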

Arrow completed the required under-the-hood operations noticeably faster. The time taken to complete the operation for the different input sizes is as follows:

Note: This whole test was performed with one driver and one executor, both m6i.large instances (8 GB memory, 2 cores).

2. Handling of Type Coercion: Type coercion in a DataFrame runs into trouble when the Python values returned by a UDF deviate from the return type specified by the user. For example, converting a string to an int, or a date to a string, through a UDF can produce ambiguous or erroneous values under pickle serialization. Let’s explore this type coercion with the code below.

# string to int conversion
df = spark.createDataFrame(['1', '2', '3', '4'], schema='string')
df.select(udf(lambda x: x, 'int')('value').alias('int_value')).show()

Executing the above code with Arrow successfully coerces the integers stored as strings back to integers as specified (returning 1, 2, 3, 4 respectively), whereas the pickled format falls back to NULL (returning NULL for all records).

# converting date to string
import datetime

df = spark.createDataFrame([datetime.date(1970, 1, 1)], schema='date')
df.select(udf(lambda x: x, 'string')('value').alias('string_value')).show()

Similar to the previous example, the above code with Arrow converts the date to its proper string value, whereas the pickled format returns an erroneous value.

CONCLUSION:

Leveraging the power of Apache Arrow gives PySpark UDFs a substantial boost in speed and efficiency. Embrace the fast track to enhanced execution, where Arrow becomes the catalyst for a seamless and accelerated PySpark UDF experience.

REFERENCE:

https://www.databricks.com/blog/arrow-optimized-python-udfs-apache-sparktm-35
