Spark: Adding Custom Optimization Rules
One of the main benefits of Spark SQL, as mentioned in its SIGMOD paper, is how easily user-defined
ad hoc rules can be written and plugged in for better optimization.
Spark SQL provides an API for adding a set of ad hoc rules that are plugged into the query planner at
runtime. Especially now that the data source API has been made independent of Spark core, this is very
useful for database and data source vendors who want to add rules that best suit their data source
design. In SPARK-5193, this API for adding ad hoc rules at runtime was moved from SQLContext to a
separate class called ExperimentalMethods.
However, this API only supported adding rules for optimizing Spark's PhysicalPlan and offered no
support for adding rules for the OptimizedPlan. SPARK-9843 added this support, and it has been
officially available to the public since Spark 2.0. If you are not familiar with the difference
between Spark's PhysicalPlan and OptimizedPlan, please refer to my previous post here.
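As a rough illustration of how this API is used from user code, here is a minimal sketch. It assumes Spark 2.0 or later with spark-sql on the classpath. The rule RemoveMultiplyByOne and the object CustomRuleDemo are hypothetical names invented for this example and are not part of Spark; only spark.experimental.extraOptimizations is the hook discussed above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical rule for illustration only: rewrites `expr * 1` to `expr`.
object RemoveMultiplyByOne extends Rule[LogicalPlan] {
  private def isOne(e: Expression): Boolean = e match {
    case Literal(v: Double, _) => v == 1.0
    case Literal(v: Long, _)   => v == 1L
    case Literal(v: Int, _)    => v == 1
    case _                     => false
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // Match on the type instead of the constructor, because the constructor
    // arguments of Multiply differ between Spark versions.
    // The dataType guard keeps the rewrite type-preserving.
    case m: Multiply if isOne(m.right) && m.left.dataType == m.dataType => m.left
  }
}

object CustomRuleDemo {
  // Registers the rule on the given session and returns the optimized plan as text.
  def optimizedPlanFor(spark: SparkSession): String = {
    // The hook described above: rules placed here run in the
    // "User Provided Optimizers" batch, after Spark's own batches.
    spark.experimental.extraOptimizations = Seq(RemoveMultiplyByOne)
    val df = spark.range(5).select((col("id") * lit(1.0)).as("x"))
    df.queryExecution.optimizedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("custom-rule-demo")
      .getOrCreate()
    try println(optimizedPlanFor(spark))
    finally spark.stop()
  }
}
```

With the rule registered, the multiplication should no longer appear in the printed optimized plan; without it, the plan should still contain the multiply expression.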
However, the API provided by ExperimentalMethods only allows user-provided rules to be applied
after all of Spark's native rules have been applied. This can limit the ability to do some advanced
optimizations, and it also means that the output of a user-provided rule cannot be optimized further
by Spark's native optimization rules. One such example can be found in my previous post on
SparkOptimizer.
Experimental Methods
Let's take a look at the ExperimentalMethods class and how user-provided optimization rules are
plugged into the SparkOptimizer class.
ExperimentalMethods class:
class ExperimentalMethods private[sql]() {
  @volatile var extraStrategies: Seq[Strategy] = Nil
  @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil

  override def clone(): ExperimentalMethods = {
    val result = new ExperimentalMethods
    result.extraStrategies = extraStrategies
    result.extraOptimizations = extraOptimizations
    result
  }
}
Snippet from SparkOptimizer:
override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
    Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
  postHocOptimizationBatches :+
  Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
And here are the changes that make it possible to apply user-provided optimization rules before
Spark's native rules for the Analyzer and Optimizer are applied:
class ExperimentalMethods private[sql]() {
  @volatile var extraStrategies: Seq[Strategy] = Nil
  @volatile var extraPreOptimizations: Seq[Rule[LogicalPlan]] = Nil
  @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil

  override def clone(): ExperimentalMethods = {
    val result = new ExperimentalMethods
    result.extraStrategies = extraStrategies
    result.extraPreOptimizations = extraPreOptimizations
    result.extraOptimizations = extraOptimizations
    result
  }
}
Modified snippet from SparkOptimizer:
val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
  "User Provided Pre Optimizers", fixedPoint, experimentalMethods.extraPreOptimizations: _*))

val experimentalPostOptimizations: Batch = Batch(
  "User Provided Post Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)

override def batches: Seq[Batch] = experimentalPreOptimizations ++
  (preOptimizationBatches ++ super.batches :+
    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
    Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
  postHocOptimizationBatches :+ experimentalPostOptimizations
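To make the effect of batch ordering concrete, here is a small toy model in plain Scala. It does not use Spark's RuleExecutor or any Spark class: Plan, Rule, Batch, execute, and the two rules are all simplified stand-ins invented for this sketch. A "plan" is just a list of operator names, the "native" rule collapses adjacent Filter operators, and the "user" rule rewrites a made-up CustomOp into a Filter.

```scala
object BatchOrderSketch {
  // Toy stand-ins, not Spark types: a plan is a list of operator names.
  type Plan = List[String]
  type Rule = Plan => Plan

  final case class Batch(name: String, rules: Rule*)

  // Run the batches in order; each batch is repeated until the plan stops changing.
  def execute(batches: Seq[Batch], plan: Plan): Plan =
    batches.foldLeft(plan) { (input, batch) =>
      var current = input
      var changed = true
      while (changed) {
        val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
        changed = next != current
        current = next
      }
      current
    }

  // "User" rule: rewrite the hypothetical CustomOp into a Filter.
  val expandCustomOp: Rule =
    _.map(op => if (op == "CustomOp") "Filter" else op)

  // "Native" rule: collapse two adjacent Filters into one.
  val collapseFilters: Rule = plan =>
    plan.foldRight(List.empty[String]) {
      case ("Filter", "Filter" :: rest) => "Filter" :: rest
      case (op, acc)                    => op :: acc
    }

  val plan: Plan = List("Scan", "CustomOp", "Filter")

  // User rules after the native ones: the native rule never sees the new Filter.
  val userRulesLast: Plan = execute(
    Seq(Batch("Native", collapseFilters), Batch("User Post", expandCustomOp)), plan)

  // User rules before the native ones: the native rule can collapse the result.
  val userRulesFirst: Plan = execute(
    Seq(Batch("User Pre", expandCustomOp), Batch("Native", collapseFilters)), plan)

  def main(args: Array[String]): Unit = {
    println(userRulesLast)  // List(Scan, Filter, Filter)
    println(userRulesFirst) // List(Scan, Filter)
  }
}
```

In this toy setting, the user rule only benefits from the native rule when it runs in an earlier batch, which is the motivation for a pre-optimization hook.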
I hope these changes to the codebase will be helpful in many cases that call for advanced optimization. They also take full advantage of how simple Spark makes it to define new rules and plug them in on the fly at runtime. At the very least, having these changes on a local branch on my laptop has made it much easier to experiment with the SparkOptimizer without having to recompile the code every time.