A New Spark Optimization Rule: ReplaceExceptWithNotFilter
In SPARK-12660, the Spark community decided to rewrite the Except logical operator using a left anti join. This lets Except take advantage of all the benefits of the join operators, such as managed memory, code generation and broadcast joins.
Except is one of the most commonly used logical operators; it returns the difference between two datasets. More often than not, we run except on two datasets that are derived/transformed from a single parent dataset. If the two datasets share the same parent relation (say, an HDFS file) and one or both of them (the ones we run except on) are derived purely through filter operations, then instead of rewriting the Except operator using an expensive join, we can rewrite it using a cheaper filter operation. I will discuss the details of this rule in this blog post. If you are not already familiar with the internals of the Spark Catalyst optimizer, its TreeNode structure and query optimization, refer to my previous post here before continuing with this post.
Quick Example
Case 1: Both datasets are pure filter transformations of the same parent dataset.
Case 2: The first dataset is the parent relation itself, and the second is a filter transformation over it.
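The two cases can be sketched in DataFrame code (a hypothetical example, assuming a SparkSession named spark and a CSV dataset with a "month" column, mirroring the experiment later in this post):

```scala
// Hypothetical sketch; assumes a SparkSession named `spark` is in scope
// and the dataset has a "month" column.
import spark.implicits._

val parent = spark.read.option("header", "true").csv("path/to/the/dataset")

// Case 1: both sides of the except are filter transformations of the same parent
val d2 = parent.where($"month" === 12)
val d3 = parent.where($"month" < 3)
val case1 = d3.except(d2)

// Case 2: the left side is the parent relation itself
val case2 = parent.except(d2)
```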
Logical Operators
Here is a brief introduction to the three logical operators that are relevant to this post.
Except operator
The Except operation takes two datasets d1 and d2 and returns the rows of d1 that don't appear in d2. In order to compare tuples across the two datasets, both datasets should contain the same number of columns/fields, or at least have to be projected that way. There are two variants of the except operation, based on whether duplicates are removed:
- Except All: does not remove duplicates
- Except Distinct: removes duplicates
In Spark there is no such distinction between Except All and Except Distinct; there is only one operator, Except, which is actually Except Distinct.
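The difference between the two variants can be sketched on plain Scala collections (a sketch of the semantics only, not Spark code):

```scala
// Illustrating the two except variants on plain Scala collections.
val d1 = Seq(1, 1, 2, 3)
val d2 = Seq(2)

// Except All keeps the duplicates from d1
val exceptAll = d1.filterNot(d2.contains)       // Seq(1, 1, 3)

// Except Distinct (what Spark's Except does) removes them
val exceptDistinct = exceptAll.distinct         // Seq(1, 3)
```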
Anti Join operator
Anti join is a better way of writing queries that are traditionally expressed using the NOT IN or NOT EXISTS operators. The performance benefit of using an anti join over NOT IN or NOT EXISTS is similar to the benefit of a Hash join over a Nested loop join. You can read more about it at this link.

Eg:

select * from table1 t1 where not exists (select 1 from table2 t2 where t1.id = t2.id)

Anti join does not remove duplicates, so the results of a query using an anti join will be the same as the results of the Except All operator.
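The anti join semantics can be sketched on plain Scala collections: keep every left row that has no matching right row, duplicates included (Rec and the sample data below are hypothetical):

```scala
// A sketch of left anti join semantics on plain Scala collections.
case class Rec(id: Int, value: String)

val table1 = Seq(Rec(1, "a"), Rec(1, "b"), Rec(2, "c"))
val table2Ids = Seq(2, 4)

// Equivalent to: select * from table1 t1
//                where not exists (select 1 from table2 t2 where t1.id = t2.id)
val antiJoined = table1.filterNot(r => table2Ids.contains(r.id))
// keeps both rows with id = 1, like Except All would
```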
Distinct operator
The Distinct operation removes duplicate tuples in a dataset. To do this, Spark hash-partitions the dataset using all of its columns as the partitioning key and uses the hash codes of the tuples to remove duplicates. A Distinct operation is actually more expensive than an anti join operation.
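The hashing idea can be sketched outside Spark: the entire row acts as the key, and a hash set tracks which rows have already been seen (a minimal sketch, not Spark's actual implementation):

```scala
// A sketch of distinct-via-hashing: the whole row is the key,
// and a hash set remembers rows already emitted.
def distinctByHash[A](rows: Seq[A]): Seq[A] = {
  val seen = scala.collection.mutable.HashSet.empty[A]
  rows.filter(seen.add) // HashSet.add returns false for an element already present
}
```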
Spark Optimizer
Enough of the theory; let's take a look at some code from the Spark codebase.
Below is a snippet from the Spark logical query optimizer, org.apache.spark.sql.catalyst.optimizer.Optimizer:
::
Batch("Replace Operators", fixedPoint,
  ReplaceIntersectWithSemiJoin,
  ReplaceExceptWithAntiJoin,
  ReplaceDistinctWithAggregate)
::
And here is the actual ReplaceExceptWithAntiJoin rule implementation:
object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Except(left, right) =>
      assert(left.output.size == right.output.size)
      val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
      Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And)))
  }
}
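The interesting bit is how the join condition is built: the two outputs are zipped column by column into null-safe equalities and folded into a single conjunction with reduceLeftOption. That zip-and-fold pattern can be sketched on plain strings (the column names below are hypothetical):

```scala
// Sketch of the zip + reduceLeftOption pattern that builds the join condition.
val leftCols = Seq("l_id", "l_month")
val rightCols = Seq("r_id", "r_month")

// one null-safe equality ("<=>") per column pair...
val equalities = leftCols.zip(rightCols).map { case (l, r) => s"$l <=> $r" }

// ...folded into a single conjunction; None when there are no columns at all
val joinCond = equalities.reduceLeftOption((a, b) => s"($a) AND ($b)")
```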
Seeing this rule tempted me to write a new optimization rule called ReplaceExceptWithNotFilter, schedule it just before the ReplaceExceptWithAntiJoin rule and see what happens. Here is the code; it isn't necessarily perfect, as I'm still experimenting with it.
::
Batch("Replace Operators", fixedPoint,
  ReplaceIntersectWithSemiJoin,
  ReplaceExceptWithNotFilter,
  ReplaceExceptWithAntiJoin,
  ReplaceDistinctWithAggregate)
::
object ReplaceExceptWithNotFilter extends Rule[LogicalPlan] {

  implicit def nodeToFilter(node: LogicalPlan): Filter = node.asInstanceOf[Filter]

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Except(left, right) if isEligible(left, right) =>
      // Rewrite Except as: keep the left rows that do NOT satisfy the
      // (combined) filter condition of the right side.
      Distinct(
        Filter(Not(replaceAttributesIn(combineFilters(right).condition, left)), left)
      )
  }

  // Eligible when both sides are filters over the same parent relation,
  // or when the left side itself is the right side's parent relation.
  def isEligible(left: LogicalPlan, right: LogicalPlan): Boolean = (left, right) match {
    case (left: Filter, right: Filter) => parent(left).sameResult(parent(right))
    case (left, right: Filter) => left.sameResult(parent(right))
    case _ => false
  }

  // Walk below any stacked filters to reach the underlying relation.
  def parent(plan: LogicalPlan): LogicalPlan = plan match {
    case Filter(_, child) => parent(child)
    case other => other
  }

  // Reapply CombineFilters until the plan stops changing, so that stacked
  // Filter nodes collapse into a single Filter with one condition.
  def combineFilters(plan: LogicalPlan): LogicalPlan = CombineFilters(plan) match {
    case result if !result.fastEquals(plan) => combineFilters(result)
    case result => result
  }

  // Rebind the right side's attribute references to the left side's output.
  def replaceAttributesIn(condition: Expression, leftChild: LogicalPlan): Expression = {
    condition transform {
      case AttributeReference(name, _, _, _) =>
        leftChild.output.find(_.name == name).get
    }
  }
}
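Note how combineFilters reapplies CombineFilters until the plan stops changing: a small fixpoint loop. The pattern can be sketched generically (a hypothetical helper, not part of the rule above):

```scala
// A generic fixpoint loop, analogous to combineFilters above:
// keep applying f until the result stops changing.
@annotation.tailrec
def fixpoint[A](f: A => A)(a: A): A = {
  val next = f(a)
  if (next == a) a else fixpoint(f)(next)
}

// e.g. repeatedly halving until the value no longer changes
val settled = fixpoint[Int](n => if (n > 1) n / 2 else n)(40)
```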
I tested the code on a CSV file of size 500 MB using a couple of quick queries, something like the following:
val ds1 = spark.read.option("header", "true").csv("path/to/the/dataset")
val ds2 = ds1.where($"month" === 12)
val ds3 = ds1.where($"month" < 3)
val ds4 = ds3.except(ds2)
And when I verified the optimized plan via ds4.queryExecution.optimizedPlan.numberedTreeString, the plan changed from:
00 Aggregate [...]
01 +- Join LeftAnti, (...)
02 +- Relation[...]
to:
00 Aggregate [...]
01 +- Filter ((isnotnull(month) && (cast(month as int) < 3)) && NOT (cast(month as int) = 12))
02 +- Relation[...]
Apparently the new rule ReplaceExceptWithNotFilter gave a good performance gain. The query that took around 60 seconds was reduced to 26 seconds with the addition of the new rule. My immediate thought was: if there is a more than 2x latency gain on a standalone instance where no actual shuffling is involved, there should be a much better performance gain on a big cluster. So I wanted to add this new rule to the Spark optimizer via ExperimentalMethods, which is available since Spark 2.0 (thanks to SPARK-9843), and verify its performance on a production cluster.
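Registering a custom rule via ExperimentalMethods is a one-liner (a sketch, assuming a SparkSession named spark is in scope and the rule object is on the classpath):

```scala
// Append the custom rule to the optimizer's extra optimizations.
spark.experimental.extraOptimizations = Seq(ReplaceExceptWithNotFilter)
```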
When I did so, the new rule was never applied by the Spark optimizer. After some debugging, I found that the extra optimizations added via ExperimentalMethods are applied only after all the batches of predefined rules have been applied. By the time the new ReplaceExceptWithNotFilter rule is scheduled to apply, the Except operator has already been replaced with an anti join by the predefined rules. It would have been helpful if the ExperimentalMethods class provided an option to add pre-optimization rules here. I will discuss how to make this option available in the next post.
So in order to test the performance of this new rule, I wrote another rule, ReplaceAntiJoinWithNotFilter, that rewrites the anti join operation with a filter operator. Here is that custom rule:
object ReplaceAntiJoinWithNotFilter extends Rule[LogicalPlan] {

  implicit def nodeToFilter(node: LogicalPlan): Filter = node.asInstanceOf[Filter]

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Join(left, right, joinType, _) if joinType.sql == "LEFT ANTI" && isEligible(left, right) =>
      // Rewrite the anti join as a negated filter over the left side.
      Filter(Not(replaceAttributesIn(right.condition, left)), left)
  }

  // Eligible when both sides are filters over the same relation,
  // or when the left side itself is the relation the right side filters.
  def isEligible(left: LogicalPlan, right: LogicalPlan): Boolean = (left, right) match {
    case (Filter(_, lChild: LogicalRelation), Filter(_, rChild: LogicalRelation)) =>
      equals(lChild, rChild)
    case (leftNode: LogicalRelation, Filter(_, rChild: LogicalRelation)) =>
      equals(leftNode, rChild)
    case _ => false
  }

  def equals(leftNode: LogicalRelation, rightNode: LogicalRelation): Boolean = {
    leftNode.relation == rightNode.relation
  }

  // Rebind the right side's attribute references to the left side's output.
  def replaceAttributesIn(condition: Expression, leftChild: LogicalPlan): Expression = {
    condition transform {
      case AttributeReference(name, _, _, _) =>
        leftChild.output.find(_.name == name).get
    }
  }
}
Finally, I managed to test the ReplaceAntiJoinWithNotFilter rule on a production Hadoop cluster with the resources --executor-memory 110G --total-executor-cores 250 on a CSV file of 10 GB. A similar query to the one we saw above went from 3 minutes down to 20 seconds, which is close to an order of magnitude difference.
One of the limitations of applying the custom rule at the end is that its output won't be subjected to any further optimization. For example, when I scheduled the ReplaceExceptWithNotFilter rule just before the ReplaceExceptWithAntiJoin rule in the "Replace Operators" batch, the filter operation was further optimized by some of Spark's predefined optimization rules from:
01 +- Filter NOT (isnotnull(month) && (cast(month as int) = 12))
02 +- Filter (isnotnull(month) && (cast(month as int) < 3))
to:
01 +- Filter ((isnotnull(month) && (cast(month as int) < 3)) && NOT (cast(month as int) = 12))
But when the ReplaceAntiJoinWithNotFilter rule is added via ExperimentalMethods, the same optimization does not take effect. So when applying any custom rules at the end via ExperimentalMethods, we should make sure their output is already fully optimized, unless the ExperimentalMethods class provides an option to apply our custom rules before Spark's predefined rules are applied. Let's see how to open up this option in ExperimentalMethods in less than 10 lines of code in my next post.