Spark调优
参数配置
spark-defaults.conf
动态载入属性
代码设置
调优
序列化调优
Java序列化
默认情况下,Java采用Java的ObjectOutputStream序列化一个对象。该方式适用于所有实现的java.io.Serializable的类。通过继承
java.io.Externalizable
,能进一步控制序列化的性能。Java序列化非常灵活,但是速度较慢,序列化结果也较大会产生一些spark不需要的类的相关信息。
Kryo序列化
Spark也支持Kryo序列化对象,Kryo速度快,产出结果也更为紧凑。
Kryo并不支持所有类型,为了获得更好的性能,开发者需要提前注册程序中使用的类
。
设置Kryo序列化
Kryo序列化器会作用于worker节点数据的shuffle以及RDD序列化到磁盘,适合用
网络密集型
应用。
Kryo注册序列化类
序列化对象过大时可以使用
spark.kryoserializer.buffer.mb
的值,该属性的默认值时32,但是该属性需要足够大,以便能偶容纳需要序列化的最大对象。
内存优化
对象占用的内存
访问对象的消耗
垃圾回收占用的内存开销
确定内存消耗
计算数据集所需内存大小是创建一个RDD,将其放入缓存,然后观察Spark history webUI的Storage页面,该页面会列出RDD各项信息。
估算一个特殊对象的内存消耗,可以使用SizeEstimator类的estimate方法,可以降低不同数据布局的内存消耗,还可以决定广播变量在每个executor heap上所占内存大小。
优化调整数据结构
使用对象数组以及原始类型的数组代替Java或Scala集合类。fastuitl库为原始类型提供了方便的集合类,同时这些集合兼容Java标准类库。
避免使用含有指针和小对象的嵌套结构
考虑采用数字ID或枚举代替String类型的key
当内存少于32GB时,可设置JVM参数为
-XX:+UseCompressedOps
,以将8字节指针修改为4字节。设置JVM参数-XX:+UseCompressedStrings
,以8bit来编码每一个ASCII字符。
序列化RDD存储
采用RDD持久化API的序列化StorageLevel,如Memory_ONLY_SER。将RDD序列化为byte数组存储,使用时间换空间策略,此时可以使用Kryo序列化器来降低对象大小和加快访问速度。
优化GC
内存回收信息,开启gc参数
缓存大小优化
内存回收优化
JVM相关优化
调整非堆内存
数据本地化
数据本地化堆Spark job有着重要的影响,如果数据和计算逻辑放在一起那么运算速度会更快减少了数据的网络传输。
Spark的本地化层级
PROCESS_LOCAL:数据和运行代码位于同一个JVM实例中,性能最好的本地化。
NODE_LOCAL:数据和运行代码位于同一个节点,比如在同一个节点上的HDFS中或者同一个节点的其他executor中。
NO_PREF:数据可以从任何地方以同等速度访问,并且不倾向于本地化。
RACK_LOCAL:数据位于同一机架。数据位于同一机架上的不同server上,因此需要通过网络传输。
ANY数据位于不同机架。
Spark的本地化选择
Spark优先层级高的位置,当空闲的executor上不存在未被处理的数据时,Spark专向更低的存储层级。通常选择为俩种:
等待忙碌的CPU闲下来,然后启动同一个server上的数据关联的任务。
在远离数据的地方立即启动任务,这需要移动数据。
Spark一般使用的策略是等待忙碌的CPU闲置,一旦等待时间超时,它会将数据移动到远端闲置CPU。不同层级的回滚等待时间可以单独配置参考
spark.locality
部分。
其他优化
并行度设置
通常集群中为每个CPU核分配2~3个任务比较合适。
Reduce Task的内存使用
任务遇到大结果集导致的OOM,并非RDD不能加载导致的,而是因为Spark shuffle操作导致的需要为每个任务创建哈希表导致的OOM,可以通过增大并行度修复。
任务执行速度倾斜
数据倾斜
一般为partition key取的存在问题,替换其他的并行处理方式,中间可以加入一步aggregation。
worker倾斜
设置
spark.speculation=true
,持续慢的node去掉。
shuffle磁盘IO时间长
设置组磁盘。
spark.local.dir=/dir1,/dir2,/dir3
Shuffle过程优化
spark.shuffle.memoryFraction
:用于设置Shuffle读阶段Task从上一个阶段拉取到内存的AppendOnlyMap中做聚合计算可使用的内存大小,当内存中的AppendOnlyMap装满时会将数据溢写到磁盘中。默认是Executor内存的0.2倍,如果Task拉取的数据量过大可以增大改值。
最后更新于