Spark性能调优思路

Posted by AlstonWilliams on February 17, 2019

本文是作者在读完《Apache Spark - Best practices and Tuning》以及《High Performance Spark》以后,对如何编写高效的Spark程序,以及如何如何调优的一些总结。

本文总结的总归是不如原文全面,所以请各位有条件的同学,直接阅读原文。

一般来说,我们在进行性能调优的时候,主要是往三点靠,降低CPU消耗,减少IO,优化内存使用。如下图所示:

我们在Spark调优时,也是尽可能朝着这三点靠。

Spark调优主要从两个方面,一个是Spark Core,另一个是Spark SQL。

Spark Core调优

  1. 避免使用groupByKey,而使用reduceByKey 这一点相信各位已经熟知了。原因是reduceByKey会在executor上先进行一次合并,然后才传输到其他节点

  2. 使用TreeReduce/TreeAggregate,而不是Reduce/Aggregate 当partition很多,或者partition内的数据很大时,Driver的内存可能会爆掉。而TreeReduce/TreeAggregate由于会在中间合并一下,所以减少了这种风险。

  3. 如果想减少partition,应该使用coalesce,而不是repartition, coalesce会尽可能地减少数据传输。但是,如果是想增加partition的个数,那么,无论使用coalesce还是repartition,都会产生shuffle.

  4. 当join两个RDD时,如果其中一个很小,另一个很大,那么应当把小的RDD broadcast出去,这样数据传输量就很少。如果一个数据量适中,记为A,另一个很大,记为B,那么可以先提取A的key,broadcast出去,对B先进行过滤一遍,再进行join。

  5. 选择合适的StorageLevel 鱼与熊掌不可得兼。我们不能同时获得速度,使用内存少这两个特点,只能进行折中。MEMORY_ONLY会把数据缓存到内存中,但是相对于MEMORY_ONLY_SER,它的内存使用量更大,而Spark中内存本身就是一个很稀缺的资源。而后者呢,也有缺点,就是在使用的时候需要进行反序列化,这会增加时间。 另外,即使你的RDD会被复用两三次,把它缓存下来,也不一定是合适的选择。因为可能重新计算这个RDD的代价,比缓存并读取的效率要高。 而且,persist()checkpoint()是有区别的。不要混用。

6.repartition, join, cogroup, 以及 *By or *ByKey都会产生shuffle,所以尽可能少地使用它们。尽可能对输入数据进行过滤,减少数据量。

  1. 设置合适的分区数量。对于parallelize/NewHadoopRDD来说,partition的数量,跟文件大小,block size是挂钩的。而如果你的partition太少,就可能导致给这个Spark应用分配的资源不能被合理利用。或者分区内数据太多导致OOM.

  2. 不要使用默认的Java serializer,它的效率很低。应当使用Kryo Serializer.

  3. join操作并不一定都是shuffle dependency,如果相同的key已经位于相同的partition里了,那么它是narrow dependency.

  4. static allocation/dynamic allocation? static allocation会在Spark application启动前,就给它分配我们指定的Resource,然后再开始运行。而dynamic allocation,则会在获取到足够量的资源,就开始运行。在后面会动态地增加或减少资源。 但是,动态分配也有问题。当集群很忙,没有充足的资源的时候,Spark Application就会在申请资源那里卡住。而且,我不清楚的是,当集群很忙时,如果开起了Dynamic allocation, YARN会不会强制回收资源。 而且,这样会不会产生死锁呢?假如说集群更有1TB内存,200cores.如果Application A运行分配了600G内存,150cores, Application B运行分配了300G内存,30 cores。在运行过程中,Application A和Application B同时需要更多的资源来运行。这样会不会导致它们死锁在那儿?

  5. 在join以前,先对重复的key做distinct。这样能减少shuffle的时候数据的传输量。

  6. Spark Core中,默认的Join方式是Shuffle hash join。当两个RDD都有相同的Partitioner的时候,不会进行Shuffle。如果其中一个RDD很小,我们可以使用Broadcast hash join

  7. 当改变一条数据的partition的时候,这条数据会先发送给Driver,而不是直接从Executor到另一个Executor。想不明白这里为什么要这样做。当数据量很大的时候,Driver岂不是很容易OOM?

  1. Shuffle Dependency的partition,如果失败了,它的开销是很大的,因为需要重新计算很多partition(包括它的上有RDD)。所以,对于Shuffle Dependency得到的RDD,我们可以对它进行缓存一下。

  2. 尽量少创建object,并且创建小的object,来减少GC压力。尽量重用也有的对象。如果创建的对象过大,会直接进入老年代,给内存带来压力。带来频繁的Full GC。

  3. 原生数据结构,比如数组等,对内存更加友好。相对于自定义的对象,Tuple等,它的内存占用更少。所以,尽管代码可读性上,会带来一些困惑。但是确实能够减少内存的开销。

  4. 在函数内部,尽量减少创建中间对象。而隐式转换会创建一个中间对象。这是我们需要注意的。

  5. 使用mapPartition,而不是map。因为当一个分区不能被完全装填到内存时,mapPartition会只把那些不能装填进去的spill到磁盘中,而不会把整个partition spill到磁盘,这样减少了IO。而且,由于mapPartition操作的是iterator,避免了创建中间数据结构的开销。

  6. union/subtract的结果,可能存在重复的值。所以,最好使用distinct一下。而intersection的结果不存在重复的值。

  7. 对于某些需要昂贵开销的操作,比如创建数据库连接。这种使用mapPartitionmap更好,因为每个partition只会创建一次。

  8. 如果一个broadcast变量不再被需要的话,可以用unpersist来手动移除。特别是对比较大的broadcast,可以节省内存。

  9. 一般当一个RDD有下面的倾向时,persist/checkpoint才有意义:
    • 这个RDD会被使用多次
    • 我们要在这个RDD上执行多次action。此时如果不对这个RDD进行persist/checkpoint,每次执行一次action,都会从头计算这个RDD。
    • 这个RDD计算的开销特别大
  10. persist只对当前Spark Application有效。如果想在不同Spark Application之间共享RDD,那么需要使用checkpoint。

  11. 使用persist/checkpoint是有代价的。他们会占用memory/disk,并且会有序列化/反序列化的开销。所以,有的时候其实重新计算RDD,开销反而会小一些。

  12. Spark使用LRU的算法,来决定当开始出现OOM的时候,抛弃哪个partition。

  13. 当Spark在shuffle的时候,会往磁盘写一些shuffle file。它们包含每个分区内,排序好的records。所以,如果Spark重用RDD,可能会重用这些shuffle file,这样就减少了重新计算的开销。

  14. HashPartitioner需要接收一个partitions参数,来确定分区的个数。如果不指定的话,Spark会默认使用SparkConf中的spark.default.parallelism来决定分区的个数。如果这个值也没有设置,那么Spark默认会使用RDD在它的lineage中,最大的partition数。

  15. 除非一个Transformation已知是只改变value(对Key/Value pair RDD来说),否则输出的RDD不会保留partitioner信息。

  16. 先确定每个节点上具有的memory/cores的数量,然后确定给每个executor分配的资源。

  17. 增加partitions的数量不一定会提升性能。因为每个partition都会在一个task上面执行,每次能够执行的task的数量为executor number * executor cores。如果partition的数量大于这个值,就得排队,等下次才能运行。

  18. 给Executor分配的内存,并不是全部都能用来计算或者存储数据,有一部分元数据的开销的。这个可以通过spark.memory.storage.fraction来计算比例。

Spark SQL

  1. 采用合适的存储格式,如列式存储。

  2. DataFrame/DataSet往往比RDD更高效,所以,不要获取到DataFrame以后,就把它转换成RDD。能用DataFrame/DataSet就用。

  3. 尽量查询更少的列,不要包含冗余数据。并通过过滤器等过滤到坏数据。

  4. join的时候,如果出现OOM,并不一定总是Executor内存不够。因为Spark SQL会自动选择合适的join策略,并不一定总是Shuffle hash join。当其中一个很小的时候,会选择Broadcast hash join。而这需要先将数据发送到Driver上,才能进行Broadcast hash join。我们曾经就遇到过这个问题。但是,这儿的疑惑是,这个DataFrame的数据量应该很小,按理说不会造成Driver的OOM。应当再研究一下这个问题。看看到底是按照什么标准定义的小。