原文地址: https://0x0fff.com/spark-memory-management/
从Apache Spark 1.6开始,内存管理模型就发生了变化了。以前的内存管理模型是通过StaticMemoryManager 实现的,而现在,默认情况下,并不会使用这个内存管理模型。也就是说,即使你在Spark 1.5.x和Spark 1.6.0上运行相同的代码,结果也会是不同的。这一点要特别小心。
当然,你还是可以手动启用旧的内存管理模型的。只需要指定这么一个参数spark.memory.useLegacyMode
在前面的文章中,我已经描述过旧的Spark内存管理模型(译者注:译者同样翻译过这篇文章,链接为Spark架构)。而且,还写了Spark Shuffle implementations这篇文章,来简单描述不同的shuffle策略对内存的使用。
而在这篇文章中,我们会描述Spark 1.6.0中出现的新的内存管理模型。它是通过UnifiedMemoryManager实现的。
简而言之,新的内存管理模型,如下所示:
在上图中,我们会看到,总共有三个区域:
- Reserved Memory。如名字所示,这块内存是留给系统使用的。它的大小是硬编码的,我们无法改变它的大小,除非重新编译源代码,或者使用
spark.testing.reservedMemory
这个配置,而毫无疑问,后者是我们不推荐使用的,毕竟只是一个测使用的配置项。 在Spark 1.6.0中,它的大小是300MB,也就是说,我们分配的堆内存中的300MB,在我们计算程序可用内存时,不应该计算在内。你不可能用全部的堆内存来缓冲数据,因为这块内存的存在。当然这块内存也不是划分出来,啥事都不做。它会保存很多Spark的内部使用到的对象。 在给Spark executor设置堆内存时,如果你分配的堆内存小于1.5 * Reserved Memory=450MB
,那么,会直接甩你一个please use larger heap size然后转身离开。 - User Memory。这块内存,如名字所示,完全取决于用户。你可以用它存储一些你在RDD transformation中用到的数据结构。举个栗子,如果你想写通过使用
mapPartitions
实现一个聚集函数,并且内部维护一个Hash表,那么,这个Hash表就会被存储在User Memory中。 在Spark 1.6.0中,这块内存的大小,可以通过(Java Heap – Reserved Memory) * (1.0 – spark.memory.fraction)
计算得出,也就是说,默认情况下,是(Java Heap – 300MB) * 0.25
。如果我们给executor分配了4GB的堆内存,那么,User Memory将是949MB。 再重申一遍,这块内存,怎么使用,完全取决于用户。Spark不会帮你监管它。所以,如果你不清楚你的User Memory的大小,而存储了一个比它大的对象,那么,很可能会导致OOM。 - Spark Memory。这块内存是Spark管理的内存,它的大小是
(Java Heap – Reserved Memory) * spark.memory.fraction
,默认情况下,是(Java Heap – 300MB) * 0.75
。如果我们给Executor分配4GB的堆内存,那么,这块内存有2847MB。 整个Spark Memory被分成两部分,Storage Memory和Execution Memory。这两者,各占大小,通过spark.memory.storageFraction
这个参数来配置,默认情况,是0.5,也就是五五开。 在新的内存管理模型下,Storage Memory和Execution Memory的大小并不是固定的。
我们下面介绍Storage Memory和Execution Memory是如何被使用的:
- Storage Memory。这块内存会被Spark用来缓存数据,以及临时存放序列化过的数据。全部的broadcast变量,都会作为缓存,存储在这块区域中。如果你感兴趣,可以读一下这份代码unroll。你可能会看到,对于unrolled block,如果内存不够,没关系,只要persistence level允许,它会直接把unrolled partition放到硬盘里。全部的broadcast变量,会以MEMORY_AND_DISK的persistence level,存放在缓存中。
- Execution Memory.这块内存,Spark会用来存储执行Task时产生的对象。例如,它会存储Shuffle时,Map端产生的中间对象(译者疑问:那Reducer端存放在哪儿呢?前几天就是Shuffle时Reducer内存爆掉了),它也会存储Hash Aggregation时,内部需要用到的HashTable。当内存不足时,会自动刷到磁盘上。但是,其它的线程(Task),不能强制收回内存。
好,接下来我们介绍Storage Memory and Execution Memory
之间的内存重新分配问题。
从上面对Execution Memory
的介绍中,我们可以看到,我们不能强制回收Execution Memory
中的内存。因为Execution Memory
中存储的,都是运行Task时需要的对象,如果回收掉,那么Task就不能正常运行了。但是Storage Memory
就不一样了,它只是缓存,即使我们回收了这些内存,我们只需要简单的更新元数据,告诉它这块内存被刷到磁盘上或者已经被移除掉了,然后当我们再次访问这些缓存数据时,Spark会直接从磁盘上读(如果persistence level不允许刷到磁盘上,那么会重新计算。)
所以,我们完全可以回收掉Storage Memory,将它们划给Execution Meomry使用。那到底在什么情况下,Execution Memory可以使用Storage Memory呢?
只要发生下面两种情况中的一个即可:
- Storage Memory中有空闲的内存。比如说,缓存的数据并没有用了Storage Memory中全部的内存。那我们就可以将剩下的给Execution Memory使用。
- Storage Memory超过了刚开始给它分配的大小,并且这些内存全部被使用了。当发生这种情况时,
current storage memroy size - intial storage memory size
这些内存,都会被强制刷到磁盘上。
而与此相反,只有当Execution Memory有空闲的时,Storage Memory才能使用。
Initial Storage Memory size,可以通过下面的公式计算Spark Memory * spark.memory.fraction * spark.memory.storageFraction = (Java Heap – Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction
(译者注,原文这里少了spark.memory.fraction)。在默认情况下,是(Java Heap – 300MB) * 0.75 * 0.5 = (Java Heap – 300MB) * 0.375
。如果Executor的堆内存有4GB,那Initial Storage Memory size是1423.5MB
所以,如果你要放到Storage Memory中的缓存的大小,要跟initial Storage Memory size一样大,甚至比它还大。而Execution Memory使用的内存,比Execution Memory的initial size还大,并且,此时Execution Memory已经压榨了Storage Memory的内存,使它不能放下全部的缓存。那么,Execution Memory并不会说,”抱歉,我腾出来点给你吧”。而是直接就一把掌,”滚犊子”。Storage Memory就只能委屈的用此时仅有的那点内存,只有当Execution Memory主动释放了一部分内存以后,它才能占用。
后记
原文中,有很多有价值的提问,所以建议还是读原文。并附带看一下提问。
其他链接
Spark内存管理详解 [Spark性能调优] Spark Shuffle 中 JVM 内存使用及配置内幕详情 Spark study notes: core concepts visualized