Demystify Spark Performance in the Union Operator
The union operator is one of the set operators that merges two input data frames into one. Union is a handy operation in Apache Spark for combining rows that share the same order of columns. One frequent use case is applying different transformations to subsets of the data and then unioning them back together.
How to use the union operation in Spark is discussed extensively. However, a hidden fact that receives much less attention is the performance caveat associated with the union operator. If we don't understand this caveat, we can fall into the trap of doubling the execution time to get the result.
In this story, we will focus on the Apache Spark DataFrame union operator with examples, show you the physical query plan, and share techniques for optimization.
Like SQL in a relational database (RDBMS), union is a direct way to combine rows. One essential thing to note when dealing with the union operator is to make sure the rows follow the same structure:
- The number of columns has to be identical. The union operation won't silently work or fill in NULLs when the number of columns differs between data frames.
- The column data types should match, and union resolves columns by position, not by name. The column names should follow the same order in each data frame, although that is not enforced: the first data frame's column names are used by default, so a mixed-up column order can produce an undesired result. Spark's unionByName is intended to resolve this issue.
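To make the by-position resolution concrete, here is a minimal sketch (assuming an existing SparkSession; the tiny data frames df_a and df_b are made up for illustration and are not part of the original example):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## Two data frames with the same columns but in a different order
df_a = spark.createDataFrame([(1, 100)], ["id", "score"])
df_b = spark.createDataFrame([(200, 2)], ["score", "id"])

## union resolves columns by position, so df_b's score values land in the id column
df_a.union(df_b).show()

## unionByName matches columns by name, so the rows line up as intended
df_a.unionByName(df_b).show()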
In Spark, the operation unionAll is simply an alias for union and does not remove duplicates. We need to add distinct after the union to get a SQL-like union operation without duplication.
We can also combine multiple data frames to produce a final data frame.
df = df1.union(df2).union(df3)
One typical pattern of using the union operator is splitting a single data frame into multiple ones, applying different transformations to each, and eventually combining them into the final one.
Here is an example: we have two big tables (fact tables) that need to be joined, and the best way to join them is a sort-merge join in Spark. Once we get the sort-merged data frame, we split it into four subsets. Each subset goes through different transformations, and eventually, we combine these four data frames into the final one.
The Spark DataFrame API leverages the Catalyst optimizer, which takes the data frame code you wrote, then performs code analysis, logical optimization, physical planning, and code generation. Catalyst tries to create an optimal plan that executes your Spark job efficiently.
In recent years, Spark has done extensive optimization work in Catalyst to improve the performance of join operations. The join operation has more usage scenarios than the union operation, which has led to less effort being put into optimizing union.
Unless users apply union to completely different data sources, the union operator faces a potential performance bottleneck: Catalyst isn't "smart" enough to recognize shared upstream data frames and reuse them.
In this case, Spark treats each data frame as a separate branch and then performs everything from the root multiple times. In our example, the join of the two big tables would be performed four times! That is a huge bottleneck.
It's simple to reproduce a non-optimized physical query plan for the union operator in Spark. We will do the following:
- Create two data frames with one million integers each. Let's call them df1 and df2.
- Perform an inner join on df1 and df2.
- Split the joined result into two data frames: one containing only the odd numbers, the other only the even numbers.
- Add a transformation with a field called magic_value, which is generated by two dummy transformations.
- Union the odd and even number data frames.
from pyspark.sql.types import IntegerType

## Create two data frames with one million integers each. Let's call them df1 and df2
df1 = spark.createDataFrame([i for i in range(1000000)], IntegerType())
df2 = spark.createDataFrame([i for i in range(1000000)], IntegerType())

## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## Split the joined result into two data frames: one only contains the odd numbers, the other the even numbers
df_odd = df.filter(df.value % 2 == 1)
df_even = df.filter(df.value % 2 == 0)

## Add a transformation with a field called magic_value, which is generated by two dummy transformations
df_odd = df_odd.withColumn("magic_value", df.value + 1)
df_even = df_even.withColumn("magic_value", df.value / 2)

## Union the odd and even number data frames
df_odd.union(df_even).count()
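Before looking at the DAG, a quick way to confirm the duplicated work (not shown in the original snippet) is to print the physical plan; in this non-cached version, each branch of the union contains its own join:
## Print the physical plan for the unioned data frame.
## Each branch of the Union contains its own SortMergeJoin,
## meaning the two-table join is planned and executed twice.
df_odd.union(df_even).explain()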
Here is a high-level view of what the DAG looks like. If we read the DAG bottom-up, one thing stands out: the join happened twice, and the upstream stages look almost identical.
We've seen where Spark still needs to optimize the union operator: much time is wasted on unnecessary recomputation when the data source could be reused.
Here is the physical plan, which has 50 stages scheduled with AQE enabled. Looking at ids 13 and 27, Spark performed the join twice, once per branch, and recomputed each branch from scratch.
Now that we can see this potential bottleneck, how can we resolve it? One option is to double the number of executors to run more concurrent tasks. But there is a better way: hint to Catalyst that it can reuse the joined data frame from memory.
To resolve this performance issue with the union operation, we can explicitly call cache() to persist the joined data frame in memory, so Catalyst knows it can take the shortcut and fetch the data from memory instead of going back to the source.
Where should we add the cache()? The recommended place is on the data frame after the join is completed and before the filtering.
Let's see it in action:
# ...........................
## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## add cache here
df.cache()

## Split the joined result into two data frames: one only contains the odd numbers, the other the even numbers
df_odd = df.filter(df.value % 2 == 1)
# ...........................
Here is the query plan: InMemoryTableScan is present, so we can reuse the data frame and save the extra computation.
Now the physical plan is reduced to only 32 stages, and if we check, ids 1 and 15 both leverage the InMemoryTableScan. The savings would be even larger if we split the original data frame into more subsets before unioning them.
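One small follow-up, not covered in the original example: once the final result has been computed, the cached join can be released so the executor memory is freed:
## Release the cached joined data frame after the union result has been computed
df.unpersist()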
I hope this story provides some insight into why the union operation sometimes becomes a bottleneck in your Spark performance. Due to the lack of optimization in Catalyst for the union operator, users need to be aware of this caveat to write Spark code more effectively.
Adding cache can save time in our example, but it won't help if the union is performed on two completely different data sources and there is no shared upstream to cache.
This story was inspired by Kazuaki Ishizaki's talk, Goodbye Hell of Unions in Spark SQL, and by my experience dealing with a similar issue in my own projects.
P.S.: If you are interested in how to handle the data skew aspect of Spark performance, I have another story on TDS about it.