SparkSQL优化分析
AQE(Adaptive Query Execution )
3.0.0之后特性,3.2.0默认开启,通过
spark.sql.adaptive.enabled
设置,默认优化合并shuffle write分区小文件、转换sort-merge join为broadcast join,join倾斜优化。
合并shuffle后的分区
通过设置
spark.sql.adaptive.enabled
和spark.sql.adaptive.coalescePartitions.enabled
为true
开启shuffle分区合并,基于map端输出分析合并shuffle后的分区。不需要在通过spark.shuffle.partition
设置特定的分区数,只需要设置spark.sql.adaptive.coalescePartitions.initialPartitionNum
初始化分区配置spark就可以找到他合适的分区数。
Property Name | Default | Meaning | Since Version |
---|---|---|---|
| true | 当和 | 3.0.0 |
| true | 设置为true时会忽略 | 3.2.0 |
| 1MB | 最多为 | 3.2.0 |
| (none) | 合并前的初始shuffle分区数。如果没有设置,它等于' spark.sql.shuffle.partitions '。此配置仅在spark.sql.adaptive | 3.0.0 |
| 64 MB | 在spark.sql.adaptive | 3.0.0 |
sort-merge join转换为broadcast join
当分析出运行时任何join方小于broadcast hash join设置的阈值则会转换为broadcast join。这不是表示broadcast hash join最高效,但是它优于sort-merge join,因为我们可以保证链接俩端排序并且能够本地读shuffle文件通过
spark.sql.adaptive.localShuffleReader.enabled
设置为true
,spark.sql.adaptive.autoBroadcastJoinThreshold
是转换为broadcast join的阈值,如果小于则可以将merge-sort join转换为broadcast join
转换sort-merge join转换为shuffled hash join
当shuffle分区数小于设置的阈值则会将sort-merge join转换为hash join,阈值通过
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
配置
优化倾斜join
数据倾斜会严重降低join查询的性能。该特性通过将倾斜的任务拆分(如果需要,还可以复制)为大小大致相同的任务来动态处理sort-merge join中的倾斜。当
spark.sql.adaptive.enabled
和spark.sql.adaptive.skewJoin.enabled
配置同时启用时生效。spark.sql.adaptive.skewJoin.enabled
当和spark.sql.adaptive.enabled
同时为true时,spark会通过分裂(必要时复制)倾斜分区来动态处理sort-merge join的倾斜分区。spark.sql.adaptive.skewJoin.skewedPartitionFactor
,如果一个分区的大小大于这个因子乘以分区中值大小,并且大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
,则认为该分区是倾斜的。spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
,如果分区的字节大小大于这个阈值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分区大小中值,则认为分区是倾斜的。理想情况下,这个配置应该设置比spark.sql.adaptive.advisoryPartitionSizeInBytes
大。
RBO(Rule-base Optimization)
在将
Resolved Logical
转换为Optimized Resolved Logical
时会基于Rule进行优化。每个优化以 Rule 的形式存在,每条 Rule 都是对 Analyzed Plan 的等价转换
RBO 设计良好,易于扩展,新的规则可以非常方便地嵌入进 Optimizer
RBO 目前已经足够好,但仍然需要更多规则来 cover 更多的场景
优化思路主要是减少参与计算的数据量以及计算本身的代价
PushDownPredicate
算子下推优化,如果俩个表进行join可以先进行filter后再去进行join,这个优化输入LogicalPlan的优化,从逻辑上保证了将Filter下推后由于参与Join的数据量变少而提高性能。
在物理层面,Filter 下推后,对于支持 Filter 下推的 Storage,并不需要将表的全量数据扫描出来再过滤,而是直接只扫描符合 Filter 条件的数据,从而在物理层面极大减少了扫描表的开销,提高了执行速度。
ConstantFolding
如果Project包含对于常量的计算比如
select 100+20 from xx
类似操作,如果记录过多就会进行多次操作,可以通过ConstantFolding
进行常量合并,从而减少不必要的计算,提高执行速度。
ColumnPruning
Filter 与 Join 操作会保留两边所有字段,然后在 Project 操作中筛选出需要的特定列。
ColumnPruning
规则能将 Project 下推,在扫描表时就只筛选出满足后续操作的最小字段集,则能大大减少 Filter 与 Project 操作的中间结果集数据量,从而极大提高执行速度。从物理层面在Project下推后,对于列式存储,扫描表时就只扫描需要的列减少IO消耗。
CBO(Cost-Based Optimizer)
基于代价优化考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan(物理执行计划)。
每个执行节点的代价分为俩个部分:
该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布
该执行节点操作算子的代价
代价主要考虑的问题为以下俩个方面
原始数据集的统计信息
如何根据输入数据集估算特定算子的输出数据集
Statistics收集
通过
ANALYZE
语句可以计算出整个表的记录总数以及大小
设置
spark.sql.statistics.histogram.enabled
为true
统计的信息输出为直方图
算子对数据集影响估计
对于常见的
Column A < value B
Filter,可通过如下方式估算输出中间结果的统计信息若 B < A.min,则无数据被选中,输出结果为空
若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变
若 A.min < B < A.max,则被选中的数据占比为 (B.value - A.min) / (A.max - A.min),A.min 不变,A.max 更新为 B.value,A.ndv = A.ndv * (B.value - A.min) / (A.max - A.min)
如果数据
很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充分利用 histogram 进行更精确的估算。启用 Historgram 后,Filter
Column A < value B
的估算方法为若 B < A.min,则无数据被选中,输出结果为空
若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变
若 A.min < B < A.max,则被选中的数据占比为 height(<B) / height(All),A.min 不变,A.max = B.value,A.ndv = ndv(<B)
算子代价估计
SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。
Spark SQL中CBO通过以下方法估算join的代价
Build侧算子
hash join俩标一般选择小表作为build size构建哈希表,另一边作为 probe side。如果没开启CBO,大表经过
filter过滤后大小小于小表
,这时候还是会选择原始小表为build size,如果开启CBO则会选择最终filter后的表
为build size。
优化Join类型
在 Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。
在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。并且该判断基于参与 Join 的表的原始大小。
如果Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。
而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,
Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值
,因此 Spark SQL 选用 BroadcastJoin。
优化多表join顺序
未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree
left-deep tree,因此所有后续 Join 都依赖于前面的 Join 结果,各 Join 间无法并行进行
前面的两次 Join 输入输出数据量均非常大,属于大 Join,执行时间较长
开启 CBO 后, Spark SQL 将执行计划优化如下
Join 树不再是 left-deep tree,因此 Join 3 与 Join 4 可并行进行,Join 5 与 Join 6 可并行进行
最大的 Join 5 输出数据只有两百万条结果,Join 6 有 1.49 亿条结果,Join 7相当于小 Join
数据倾斜优化方案
不重复的key数据倾斜
数据量大
通过增加并行度或者减少并行度保证每个task处理的数据相对均匀
自定义分区器根据数据的特性写出相对合适的分区器保证数据分布均匀
数据量不大
通过挑战broadcast的阈值将reduce join转换为map join。
重复的key的数据量过大
为skew的key增加随机前/后缀
为数据量特别大的Key增加随机前/后缀,使得原来Key相同的数据变为Key不相同的数据,从而使倾斜的数据集分散到不同的Task中,彻底解决数据倾斜问题。Join另一则的数据中,与倾斜Key对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜Key如何加前缀,都能与之正常Join。
总结
适用场景
两张表都比较大,无法使用Map则Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。
解决方案
将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。
优势
相对于Map侧Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。
劣势
如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。
大表随机添加N种随机前缀,小表扩大N倍(AQE实现原理)
如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。
总结
适用场景
一个数据集存在的倾斜Key比较多,另外一个数据集数据分布比较均匀。
优势
对大部分场景都适用,效果不错。
劣势
需要将一个数据集整体扩大N倍,会增加资源消耗。
最后更新于