01|导读与学习路径
1.1 为什么要学 Shuffle
- 性能天花板:一旦出现宽依赖(跨分区数据重分布),就会触发 Shuffle,涉及网络、磁盘、排序/哈希、内存等多重成本。
- 稳定性关键:Shuffle 常伴随长尾任务、
FetchFailed
、OOM 等问题;解决这些问题的能力直接决定交付质量。 - 通用性:聚合、排序、去重、重分区、Join 都可能触发 Shuffle,覆盖绝大多数离线/交互/流式场景。
1.2 学完能做什么
- 看懂
EXPLAIN FORMATTED
中的Exchange
、SortMergeJoin
、BroadcastHashJoin
、ShuffledHashJoin
等物理算子。 - 在 Spark UI 里定位瓶颈(
Shuffle Read/Write
、最长 Task、GC、Peak Memory)。 - 用 AQE、分区/序列化/压缩、预聚合/广播 等手段进行系统调优与故障排查。
1.3 学习路径
- 先理解概念与触发 → 2) 端到端生命周期 → 3) 实现演化 → 4) 参数与组件 → 5) Join 与 Shuffle → 6) 优化与倾斜 → 7) AQE → 8) UI 阅读 → 9) 实战复现。
📘 Q&A
Q1:Shuffle 一定慢吗?
答:不必然,但网络与磁盘是硬成本;即使硬件很好,不合理的分区/策略也会拖慢。优化目标不是“消灭 Shuffle”,而是“必要且可控的 Shuffle”。
Q2:我只写 DataFrame/SQL,还需要懂 Shuffle 吗?
答:需要。SQL 物理计划中的 Exchange
就是 Shuffle 的信号;优化 SQL 的核心之一就是优化这些 Exchange
的产生与代价。
02|环境与工具准备(本地与集群)
2.1 版本与依赖建议
- Spark 3.3+(建议 3.4/3.5),JDK 8+/11,Scala 2.12,或 PySpark(Python 3.8+)。
- 本地至少 8C/16G 更顺手;有集群更好。
2.2 运行与调试
- 提交:
spark-submit
(集群)、spark-shell
/pyspark
(本地 REPL)。 - 监控:运行时访问 Spark UI(Driver 默认 4040 端口),历史作业看 History Server。
2.3 造数与基准
spark.range(n)
方便造整形序列;rand()
造随机列;cache()
可复用数据。- 对比实验时固定随机种子、记录配置与数据量,确保结论可复现。
📘 Q&A
Q1:为什么本地能跑、集群更慢?
答:集群涉及网络 I/O、远程存储、YARN/K8s 调度、资源隔离等因素;本地与集群瓶颈不同,调优要以目标环境为准。
Q2:UI 打不开怎么办?
答:确认 Driver 端口是否被占用;在日志中查 “Started SparkUI at …”;必要时设置 spark.ui.port
。
03|先修:Spark 计算模型速览
3.1 RDD / DataFrame / Dataset 定位
- RDD:底层抽象,元素是对象,灵活但开销大。
- DataFrame:带 Schema 的列式抽象,走 Catalyst + Tungsten + Codegen,通常默认首选。
- Dataset(Scala/Java):强类型的 DataFrame;PySpark 不提供 Dataset(是
Dataset[Row]
的包装)。
3.2 Job / Stage / Task 与依赖
- 窄依赖:每个子分区只依赖父 RDD 的一个分区(
map
、filter
)。 - 宽依赖:子分区依赖多个父分区(
groupByKey
、reduceByKey
、join
、repartition
、orderBy
),会产生 Shuffle。 - Stage:遇到宽依赖切分 Stage;Stage 内是窄依赖链路,可流水线执行。
3.3 计划与算子
EXPLAIN FORMATTED
可查看 Logical/Physical Plan;Exchange
表示重分区(即 Shuffle)。
3.4 最小示例(PySpark)
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = (spark.range(10_0000) # 10 万
.select((F.col("id")%1000).alias("user_id"),
(F.col("id")%100).alias("amount")))
res = (df.groupBy("user_id").agg(F.sum("amount").alias("amt"))
.orderBy(F.col("amt").desc()))
res.explain("formatted") # 观察是否有 Exchange、Sort
📘 Q&A
Q1:为什么 orderBy
几乎必定 Shuffle?
答:全局排序需要聚集所有分区的比较信息,必须重分布数据才能得到全序。
Q2:sortWithinPartitions
会 Shuffle 吗?
答:不会。它只在分区内排序,不改变分区边界。
04|什么是 Shuffle(定义、直觉与代价)
4.1 定义
为满足“同 key 的记录必须到同一分区”或“改变分区策略”的需要,Spark 触发跨分区重分布,即 Shuffle。
4.2 直觉
像邮局按地址分拣信件;或把多个班的“同学号”聚到同一教室,便于点名和统计。
4.3 成本构成
- 网络:跨机拉取,连接/传输耗时。
- 磁盘:溢写、索引、落盘。
- CPU/内存:排序/哈希、序列化/反序列化、压缩/解压。
- 长尾:某些分区数据异常大导致任务拖慢。
4.4 何时不会 Shuffle
- 纯窄依赖:
map
/filter
/mapPartitions
。 coalesce(num, shuffle=false)
尽量不打散。
📘 Q&A
Q1:“没有 Shuffle 就一定快”成立吗?
答:不一定。没有 Shuffle 但对象开销巨大、UDF 低效也可能慢。优化要综合观察算子/对象/内存/IO。
Q2:怎么初步判断一条 SQL 会不会 Shuffle?
答:看是否存在分布式聚合/排序/重分区需求;EXPLAIN
中若有 Exchange
基本意味着会 Shuffle。
05|Shuffle 触发场景与 DAG 切分
5.1 典型触发
- 聚合:
groupByKey
、reduceByKey
、aggregateByKey
、groupBy().agg()
- 连接:
join
(等值/非等值)、cogroup
- 去重/重分区:
distinct
、repartition
- 排序:
orderBy
、sortByKey
- 分发:
distributeBy
、bucketBy
(落地时)
5.2 写法影响
groupByKey
会把所有值拉到网络上,易放大;reduceByKey/aggregateByKey
可map 端预聚合。repartition
一定全局打散;coalesce
尽量不打散。
5.3 SQL/DF 的 Exchange
Exchange (hashpartitioning(key, N))
:哈希重分区。Exchange (range)
:范围重分区,常用于全局排序或均衡。
5.4 案例:Stage 切分对比(思路)
- 同一任务分别用
groupByKey
与reduceByKey
实现,观察 Stage 数、Shuffle Read/Write
差异。
📘 Q&A
Q1:distinct
为什么会 Shuffle?
答:需要把相同值聚在一起去重,属于全局性操作。
Q2:coalesce(1)
会很快吗?
答:它减少分区数,可能导致单任务写超大文件和长尾;慎用。
06|Shuffle 生命周期:Map 写 → Reduce 拉(端到端)
6.1 Map 端(写出)
- 分区器:决定记录被发送到哪个 reduce 分区(
HashPartitioner
/RangePartitioner
/自定义)。 - ShuffleWriter:把记录写入中间文件;内存不足触发 spill,最终归并合并。
- 压缩:
spark.shuffle.compress
、spark.shuffle.spill.compress
。
6.2 Reduce 端(拉取)
- Block 拉取:依据索引定位 map 输出块;通过网络获取。
- 聚合/排序:哈希表聚合或按序归并;内存不足继续 spill。
6.3 并发与流控
spark.reducer.maxReqsInFlight
控制并发拉取请求数。spark.network.timeout
、spark.shuffle.io.maxRetries
、retryWait
控制超时与重试。
6.4 文件与索引
- 数据文件:按分区顺序写;索引文件记录各分区的偏移,便于定位。
📘 Q&A
Q1:为什么要压缩?
答:降低网络与磁盘开销,尽管会增加 CPU。对于大规模 Shuffle,通常收益大于开销。
Q2:Map 端 spill 是坏事吗?
答:是容量不足的自然反应,不必恐慌;但频繁 spill 暗示内存/并行度/对象过多,需要调优。
07|实现演化:Hash Shuffle → Sort Shuffle → Tungsten
7.1 Hash-based Shuffle(早期)
- 每个 reduce 分区维护独立输出 → 文件爆炸、FD 耗尽,不可扩展。
7.2 Sort-based Shuffle(主流)
- Map 端对数据按(分区、Key)排序后顺序写 → 大幅减少文件数量;代价是排序 CPU/内存。
7.3 Tungsten / UnsafeRow / Off-heap
- 使用
UnsafeRow
二进制布局与堆外内存,减少对象与 GC,提升缓存友好性与吞吐。
7.4 Push-based Shuffle(补充)
- Map 端将数据主动推送给下游合并点做预合并,降低 reduce 拉取碎片;启用依赖环境支持。
📘 Q&A
Q1:现代 Spark 用的哪种?
答:以 Sort-based Shuffle 为主,并结合 Tungsten 的内存/二进制优化。
Q2:为什么 Hash 方案会文件爆炸?
答:map×reduce 的组合带来非常多的独立输出,分区一多就难以管理。
08|关键组件与内部机制
8.1 组件一览
- ShuffleManager:生命周期管理。
- BlockManager:数据块存储与传输(内存/磁盘/网络)。
- ShuffleBlockResolver:索引、文件定位。
- SerializerManager:Java/Kryo 序列化。
- External Shuffle Service:Executor 挂掉仍可保留 map 输出,减少重算。
8.2 与存储层
- 本地磁盘的读写路径、与 HDFS/对象存储的协作;注意读写热点与并发。
8.3 序列化与压缩
- Kryo 更紧凑更快;注册类进一步减少开销。
- 压缩编解码器选择影响 CPU 与 I/O 平衡。
📘 Q&A
Q1:External Shuffle Service 有必要开吗?
答:大集群/长作业建议开启,有助于减少 FetchFailed
触发的整段重算。
Q2:Kryo 一定比 Java 序列化快?
答:一般更快更小,但要正确注册类并结合对象模式使用,才能稳定获益。
09|参数与默认值解读(按主题分组)
9.1 并行度与分区
spark.sql.shuffle.partitions
:SQL/DF 的下游并行度基准;结合集群资源、数据规模调整。spark.default.parallelism
:RDD 默认并行度。
9.2 压缩与序列化
spark.shuffle.compress
、spark.shuffle.spill.compress
:建议开启。spark.serializer
:使用KryoSerializer
;spark.kryo.registrator
注册类。
9.3 拉取并发与网络
spark.reducer.maxReqsInFlight
、spark.reducer.maxBlocksInFlightPerAddress
。spark.network.timeout
、spark.shuffle.io.maxRetries
、spark.shuffle.io.retryWait
。
9.4 其他关键
spark.shuffle.file.buffer
(写缓冲)、spark.locality.wait
(局部性等待)、spark.shuffle.service.enabled
(External Shuffle Service)。
📘 Q&A
Q1:shuffle.partitions
是越大越好吗?
答:不是。过大带来调度开销与小文件;过小则并行度不足、易长尾。要以 UI 指标与目标文件大小迭代调参。
Q2:网络参数怎么定?
答:结合集群带宽/延迟做小规模压测;观察 FetchFailed
、Timeout
、内存峰值后再调整并发与重试。
10|Join 与 Shuffle(策略选择与成本差异)
10.1 等值 Join 三策略
- BHJ(广播哈希):把小表发到每个 Executor,本地哈希探测,大表无需 Shuffle,通常最快。
- SMJ(排序合并):两侧排序后线性合并,稳定扩展,适合双大表。
- SHJ(Shuffle 哈希):两侧重分区,在分区内构建哈希,适合中等规模且内存可承载。
10.2 阈值与 Hints
spark.sql.autoBroadcastJoinThreshold
控制是否自动广播。- Hints:
BROADCAST
/MERGE
/SHUFFLE_HASH
可干预物理计划。
10.3 Bucketing / Sort
- 两侧一致分桶/排序可减少或跳过
Exchange
/Sort
;以EXPLAIN + UI
实证为准。
10.4 实操建议
- 能广播就广播(小表);双大表选 SMJ;中等规模尝试 SHJ;始终
EXPLAIN
校验。
📘 Q&A
Q1:AQE 为什么会把 SMJ 改成 BHJ?
答:运行时发现一侧实际很小(落入阈值),代价模型选择更优的 BHJ。
Q2:非等值 Join 怎么办?
答:可能落到嵌套循环或代价更高策略。先过滤下推、或将区间离散化近似等值,再 Join。
11|性能优化:从代码到集群
11.1 代码层
- 过滤/列裁剪前置,减少上游数据量。
- 用
reduceByKey/aggregateByKey
代替groupByKey
,做 map-side 预聚合。 - 少用 Python UDF;优先内置函数或 Pandas UDF(向量化)。
11.2 分区层
repartition
(全局打散)与coalesce
(合并、不打散)。RangePartitioner
/自定义 partitioner 对热点键做均衡。
11.3 集群层
- 合理的
shuffle.partitions
;开启压缩与 Kryo;治理小文件(目标文件大小、后续 compaction)。
11.4 交付物
- 配置清单、
EXPLAIN
片段、UI 截图、数据规模说明;让优化可复现、可审核。
📘 Q&A
*Q1:为何“select ”会拖慢?
答:宽表传输放大网络与序列化开销;列裁剪能显著降压。
Q2:Pandas UDF 是否总比普通 UDF 快?
答:通常更快,因为向量化;但要注意批量大小、序列化与内存峰值,仍需实测。
12|数据倾斜:诊断与治理
12.1 诊断
- UI 上某 Task 特别慢,
Shuffle Read
异常大;抽样统计发现热点 Key。
12.2 治理策略
- 优先 BHJ(小表广播绕开大表 Shuffle)。
- AQE Skew(自动切分大分区)。
- Salting(热点 Key 加随机前缀拆散,下游再聚合)。
- 两阶段聚合、业务前置预聚合/预过滤。
12.3 防御性建模
- 分桶/分区策略提前考虑热点分布;冷热数据分离。
📘 Q&A
Q1:增加分区数能治好所有倾斜吗?
答:不能。只会把“一个大长尾”拆成“多个小长尾”。需要 AQE Skew/Salting/广播/预聚合 的组合拳。
Q2:怎么快速定位热点 Key?
答:采样统计频次 TopN、查看按 Key 的记录数直方图;在 UI 里结合分区数据量推断。
13|容错与失败恢复(FetchFailed 等)
13.1 常见错误
FetchFailed
(远端块丢失/超时)、磁盘错误、网络抖动、Executor 异常退出。
13.2 恢复与重试
- 失败时 Spark 会重跑相关 map 端 生成输出,再重启 reduce;外部 Shuffle 服务可减少重算范围。
13.3 降低风险
- 调整网络超时与重试;降低并发拉取;开启 External Shuffle Service;黑名单/推测执行减少坏节点影响。
📘 Q&A
Q1:为什么重试次数多了反而更慢?
答:频繁重试掩盖根因(如网络/磁盘故障),浪费资源。应先定位原因再定策略。
Q2:推测执行能解决长尾吗?
答:对非确定性长尾有帮助(多副本竞争),但对确定性热点帮助有限,应结合倾斜治理。
14|如何读 Spark UI(以 Shuffle 为主线)
14.1 SQL 页面
- 看 Physical/Final Plan 是否含
Exchange
/Sort
/*JoinExec
,确认策略与 AQE 改写。
14.2 Stages 页面
- 关注
Shuffle Read/Write
、Task 数、最大任务时长,定位网络/排序/长尾瓶颈。
14.3 Executors 页面
- 看 Peak Memory(广播/哈希表压力)、GC Time、I/O。
14.4 截图与误读
- 注意取 Final Plan(AQE 生效后的最终计划),避免解读到初始计划。
📘 Q&A
Q1:看到 Exchange
就一定慢吗?
答:不一定。关键看数据规模、并行度与下游算子;必要但可控的 Exchange
是正常的。
Q2:如何判断是 I/O 还是 CPU 瓶颈?
答:结合 Shuffle Read/Write
(I/O)、GC/CPU 利用率(CPU)、任务时间分布综合判断。
15|上手实验:5 组可复现练习(含记录模板)
建议:每组都记录用时、Shuffle 读写、最长 Task、输出文件数、关键配置,并附 UI 截图。
15.1 实验一:groupByKey
vs reduceByKey
- 目标:证明 map‑side 预聚合可显著减少 Shuffle。
- 要点:同数据规模下,
reduceByKey
的Shuffle Read
应明显更低。
15.2 实验二:repartition
vs coalesce
- 目标:理解全局打散与合并;观察
Exchange
差异与输出文件数变化。
15.3 实验三:序列化与压缩
- 目标:开启 Kryo、Shuffle 压缩,观察端到端时间与 GC 变化。
15.4 实验四:分区数与小文件
- 目标:64/256/1024 分区对性能与产物(文件数/大小)的影响对比。
15.5 实验五:AQE 合并与倾斜拆分
- 目标:开启 AQE,观察 Final Plan 分区数减少与 Skew 拆分;记录时间收益。
15.6 实验记录建议
- 描述数据规模、代码片段、配置、UI 截图、结论要点,便于复现与评审。
📘 Q&A
Q1:为什么我本地实验看不出差异?
答:数据量太小或硬件太强,瓶颈不明显。增大数据、关掉 cache、确保触发 Shuffle 再测试。
Q2:实验时需要关 AQE 吗?
答:基准对比时建议先关,得到原始策略结果;再开 AQE 看自适应增益。
Comments