Spark 性能调优的本质是消除瓶颈,而瓶颈的消除效果是分层次的。一个错误的架构或算法,即使配置了顶级的资源,其性能也远不如使用正确算法配合普通配置的作业。

我们将调优层次划分为三个重要级别:

  1. 架构与算法层 (最高效,影响 80%):
    • 优化:重写 RDD/DataFrame 逻辑,采用更高效的数据结构。
    • 核心:消除数据倾斜(Data Skew),改变 Shuffle 密集型操作的实现方式(如将 Sort-Merge Join 优化为 Broadcast Hash Join)。
    • 工具:充分利用 Adaptive Query Execution (AQE) 的能力。
  2. 代码实现层 (中等高效,影响 15%):
    • 优化:避免使用 Python 或 Scala 的 UDF(用户自定义函数),转而使用 Spark SQL 内置函数或表达式。
    • 核心:使用 Kryo 序列化替换默认的 Java 序列化,减少对象大小和网络传输量。
    • 工具:避免频繁创建临时 RDD 或 DataFrame,进行链式操作。
  3. 配置参数层 (基础保障,影响 5%):
    • 优化:合理配置 Executor 数量、CPU 核心数、内存大小。
    • 核心:调整 JVM GC 参数,确保 Full GC 不成为主要瓶颈。

本部分将重点聚焦于基础保障、内存内核和 Shuffle 算法调优。

第一章:资源分配与调度模型精讲

资源配置是 Spark 应用性能的基石。不合理的资源分配会导致 CPU 饥饿、内存溢出或大量的 GC 暂停。

1.1 Executor 资源分配黄金法则 (5-Core Principle)

为确保每个 Executor 既能并行计算,又有充足的 I/O 线程处理 Shuffle 和 HDFS 读写,我们推荐采用 5核/Executor 的黄金原则。

核心配置项推荐:

配置项

推荐值

原理剖析

spark.executor.cores

5

经验证的最佳平衡点。可以同时运行 5 个 Task,并为每个 Task 预留 I/O 线程,避免线程竞争。

spark.executor.memory

15G ~ 25G

配合 G1GC,确保堆内存大小适中。Executor 内存过大会增加 GC 暂停时间,过小则易触发 OOM。

spark.executor.memoryOverhead

10% * memory + 384MB

至关重要。 这是分配给堆外内存 (Off-Heap)、Netty 缓冲区、线程栈、JVM JIT 等非堆区域的内存,YARN 容器尺寸将计入此部分,防止 YARN 上的 OOM。

spark.executor.instances

依据集群总核数 $C$ / 5 计算

最大化利用集群资源,并预留少量核心给 YARN 的 NodeManager/ResourceManager。

容器总尺寸计算: 在 YARN 或 Kubernetes 上,最终申请的容器内存大小是:

$$\text{Container Size (MB)} = \text{spark.executor.memory} + \text{spark.executor.memoryOverhead}$$

如果设置内存为 20GB,开销约为 2.4GB,容器将申请约 22.4GB。

1.2 动态资源分配 (Dynamic Allocation) 的高效利用

动态分配(spark.dynamicAllocation.enabled=true)允许 Spark 根据工作负载增减 Executor,提高集群利用率。

前提条件:External Shuffle Service (ESS) 使用动态分配时,必须启用 spark.shuffle.service.enabled = true

  • 原理: ESS 将 Shuffle 数据存储和读取的责任从 Executor 转移到一个独立的、长驻的服务进程。即使 Executor 因空闲被回收,其他 Executor 仍然可以从 ESS 读取其写入的 Shuffle 数据,保证容错性和数据的连续性。

调优参数:

配置项

默认值

作用与调优建议

spark.dynamicAllocation.minExecutors

0

保持的最小 Executor 数量,建议设置为一个能处理元数据和少量启动任务的值(如 10)。

spark.dynamicAllocation.maxExecutors

无限制

上限,必须根据集群总资源设置。

spark.dynamicAllocation.executorIdleTimeout

60s

Executor 空闲多久后被回收。ETL 任务可以设置长一些(120s),交互式任务设置短一些。

spark.dynamicAllocation.schedulerBacklogTimeout

1s

任务积压多久后开始申请新的 Executor。设置低一些可以更快地获取资源。

1.3 Driver 节点:控制中枢的内存保障

Driver 负责 SparkContext 的创建、逻辑计划的生成、Task 的分发和结果的聚合。

Driver 内存不足 (OOM) 的常见原因:

  1. 大规模 collect() 操作: 将 RDD 或 DataFrame 的全部数据拉取到 Driver 内存。这是最常见的反模式。
  2. 超大广播变量: 广播变量在 Driver 侧构建,如果大小超过 Driver 内存限制,会失败。
  3. 结果聚合: 收集大量的 Task 状态、Metrics 或较小的聚合结果。

调优:

  • 常规 ETL:spark.driver.memory = 4g
  • 涉及大量广播或需要收集大量 Metrics 的任务:spark.driver.memory = 16g ~ 32g

第二章:内存模型深度解析与 GC 优化

Spark 性能的基石在于对 JVM 内存的细致管理,特别是 Tungsten 引擎的引入。

2.1 统一内存管理与 Tungsten 引擎

从 Spark 1.6 开始,Executor 内存采用统一管理,将堆内存划分为四个主要区域:

内存区域

占比 (默认)

主要用途

优先级与管理

Reserved Memory

300MB

系统预留,无法被应用使用。

N/A

User Memory

40% of Heap - 300MB

存放 UDF、应用程序自定义的数据结构、以及外部库的对象。

不受 Spark 统一管理,是 OOM 的主要来源之一。

Storage Memory (存储)

30% of Usable Heap

用于缓存 RDDs 和 DataFrame (通过 cache()persist())。

可被执行内存驱逐 (Evict),但驱逐代价高。

Execution Memory (执行)

30% of Usable Heap

用于 Shuffle、Join、聚合等操作的临时数据结构(如 Hash Map、Sort Buffer)。

可被存储内存借用,并在需要时驱逐存储内存中的 Block。

Tungsten 与堆外内存 (Off-Heap) Tungsten 引擎通过内存编码和高效序列化,将数据结构扁平化,并尽可能使用堆外内存 (Off-Heap Memory)。

  • 配置: 通过 spark.memory.offHeap.enabled = true 开启,并设置 spark.memory.offHeap.size
  • 优势: 堆外内存不受 JVM GC 管理,完全消除了执行内存中大型数据结构(如 Shuffle Sort Buffer)带来的 GC 暂停,极大地提升了稳定性。

2.2 序列化优化:Kryo 的极致效率

序列化是内存和网络传输优化的第一步。默认的 Java 序列化效率低、体积大。

推荐使用 Kryo 序列化:

spark.serializer = org.apache.spark.serializer.KryoSerializer
# 启用或定制注册器,这是 Kryo 性能的关键
spark.kryo.registrator = com.yourcompany.MyKryoRegistrator
# 增大 Kryo 缓冲区,防止序列化单个大对象时失败
spark.kryoserializer.buffer.max = 1024m 

注意: 如果不注册自定义类,Kryo 会进入慢速模式,性能会大幅下降。务必为所有被序列化(跨网络传输或缓存)的自定义类进行注册。

2.3 JVM GC 调优:G1GC 配置实战

对于 15GB 以上的大 Executor 堆内存,必须使用 G1GC (Garbage First Garbage Collector) 以控制 GC 暂停时间。

G1GC 推荐配置 (添加到 spark.executor.extraJavaOptions):

-XX:+UseG1GC 
-XX:G1HeapRegionSize=16M 
-XX:MaxGCPauseMillis=200 
-XX:InitiatingHeapOccupancyPercent=35 
-XX:+ParallelRefProcEnabled
  • MaxGCPauseMillis=200:设置最大暂停时间目标为 200 毫秒。G1GC 会尽力满足此目标。
  • InitiatingHeapOccupancyPercent=35:让 G1GC 在老年代占用达到 35% 时就启动并发标记周期。默认值较高,往往导致并发标记启动太晚,最终触发 Full GC。提前启动并发周期是避免 Full GC 的关键。

第三章:Shuffle 与数据倾斜 (Data Skew) 的终极处理

Shuffle 是 Spark 应用中最慢、最昂贵的操作,因为它涉及磁盘 I/O、网络传输和序列化/反序列化。

3.1 Shuffle 瓶颈的根源:数据倾斜

数据倾斜是 Shuffle 性能低下的主要原因。当 Join 或 GroupBy 的 Key 存在热点 Key 时,所有包含该热点 Key 的记录会被分配给同一个 Task,导致:

  1. Long Tail Task: 该 Task 执行时间极长,成为整个 Stage 的瓶颈。
  2. OOM: 该 Task 接收的数据量远超 Execution Memory 的容量。

3.2 倾斜处理:三板斧策略

我们必须在逻辑层面打破热点 Key 的聚集。

3.2.1 针对聚合/GroupBy 的随机盐扩容

适用于:热点 Key 数量不多,但单个 Key 记录数巨大的场景。

  1. 第一次 Map/加盐: 对原始 Key 附加一个$1$到$N$之间的随机数(Salt),例如 Key_A 变成 Key_A_1, Key_A_2, ..., Key_A_N
    • 目的:将原来聚集到 1 个 Task 的数据,分散到$N$个 Task 中。
  2. 第一次 Reduce/局部聚合: 执行局部聚合(例如 groupBy(Key_i).agg(...))。
  3. 第二次 Map/去盐: 去除随机数,恢复原始 Key。
  4. 第二次 Reduce/全局聚合: 根据原始 Key 执行最终的全局聚合。

3.2.2 针对 Join 的倾斜 Key 优化

适用于:Join 两张大表时出现倾斜。

  1. 检测与分离: 统计左表(倾斜表)的 Key 频率,找出热点 Key 列表$K_{skew}$。
  2. 非倾斜部分: 对非倾斜 Key 的数据正常 Join。
  3. 倾斜部分处理:
    • 将右表(非倾斜侧)广播到所有 Executor。
    • 将左表(倾斜侧)中包含$K_{skew}$的记录,通过随机盐(如 3.2.1 描述)进行扩容。
    • 将扩容后的倾斜 Key 数据集与广播后的右表进行 Join。

3.2.3 Spark 3.0+ 的自动倾斜处理 (AQE)

如果您使用 Spark 3.0 及以上版本,务必开启 AQE (spark.sql.adaptive.enabled = true)。

  • AQE 优势: AQE 可以动态检测 Shuffle 后的分区大小。如果发现某个分区远大于平均值(即倾斜),它会在运行时自动将这个热点分区拆分为多个子分区(Split),从而实现倾斜 Task 的并行化。

3.3 分区数与 Shuffle 性能

spark.sql.shuffle.partitions (默认 200) 是影响 Shuffle 的关键参数。

  • 太小: 导致每个 Task 接收的数据量过大,可能 OOM,且并行度不足。
  • 太大: 产生大量小文件和 Task,Shuffle 读写开销和元数据管理开销过大。

调优准则:

  • 理想情况下,一个 Task 处理的数据量在 128MB 到 256MB 之间。
  • 分区数$N = \lceil \text{Total Shuffle Write Size} / 200 \text{MB} \rceil$。

repartitioncoalesce 的区别:

  • repartition(N) 强制进行 Full Shuffle。数据会被平均分配到$N$个分区。用于增大分区数。
  • coalesce(N) 尽量避免 Full Shuffle。只在 Executor 内部合并数据,用于减少分区数。强烈推荐在 Shuffle 结束后用于减小分区数,以减少最终输出文件数量。

This post is for subscribers on the 网站会员 and 成为小万的高级会员 tiers only

Subscribe Now

Already have an account?