01|导读与学习路径

1.1 为什么要学 Shuffle

  • 性能天花板:一旦出现宽依赖(跨分区数据重分布),就会触发 Shuffle,涉及网络、磁盘、排序/哈希、内存等多重成本。
  • 稳定性关键:Shuffle 常伴随长尾任务FetchFailed、OOM 等问题;解决这些问题的能力直接决定交付质量。
  • 通用性:聚合、排序、去重、重分区、Join 都可能触发 Shuffle,覆盖绝大多数离线/交互/流式场景。

1.2 学完能做什么

  • 看懂 EXPLAIN FORMATTED 中的 ExchangeSortMergeJoinBroadcastHashJoinShuffledHashJoin 等物理算子。
  • 在 Spark UI 里定位瓶颈(Shuffle Read/Write、最长 Task、GC、Peak Memory)。
  • AQE分区/序列化/压缩预聚合/广播 等手段进行系统调优与故障排查。

1.3 学习路径

  1. 先理解概念与触发 → 2) 端到端生命周期 → 3) 实现演化 → 4) 参数与组件 → 5) Join 与 Shuffle → 6) 优化与倾斜 → 7) AQE → 8) UI 阅读 → 9) 实战复现。

📘 Q&A

Q1:Shuffle 一定慢吗?
答:不必然,但网络与磁盘是硬成本;即使硬件很好,不合理的分区/策略也会拖慢。优化目标不是“消灭 Shuffle”,而是“必要且可控的 Shuffle”。

Q2:我只写 DataFrame/SQL,还需要懂 Shuffle 吗?
答:需要。SQL 物理计划中的 Exchange 就是 Shuffle 的信号;优化 SQL 的核心之一就是优化这些 Exchange 的产生与代价。


02|环境与工具准备(本地与集群)

2.1 版本与依赖建议

  • Spark 3.3+(建议 3.4/3.5),JDK 8+/11,Scala 2.12,或 PySpark(Python 3.8+)。
  • 本地至少 8C/16G 更顺手;有集群更好。

2.2 运行与调试

  • 提交:spark-submit(集群)、spark-shell/pyspark(本地 REPL)。
  • 监控:运行时访问 Spark UI(Driver 默认 4040 端口),历史作业看 History Server

2.3 造数与基准

  • spark.range(n) 方便造整形序列;rand() 造随机列;cache() 可复用数据。
  • 对比实验时固定随机种子、记录配置与数据量,确保结论可复现。

📘 Q&A

Q1:为什么本地能跑、集群更慢?
答:集群涉及网络 I/O、远程存储、YARN/K8s 调度、资源隔离等因素;本地与集群瓶颈不同,调优要以目标环境为准。

Q2:UI 打不开怎么办?
答:确认 Driver 端口是否被占用;在日志中查 “Started SparkUI at …”;必要时设置 spark.ui.port


03|先修:Spark 计算模型速览

3.1 RDD / DataFrame / Dataset 定位

  • RDD:底层抽象,元素是对象,灵活但开销大。
  • DataFrame:带 Schema 的列式抽象,走 Catalyst + Tungsten + Codegen,通常默认首选。
  • Dataset(Scala/Java):强类型的 DataFrame;PySpark 不提供 Dataset(是 Dataset[Row] 的包装)。

3.2 Job / Stage / Task 与依赖

  • 窄依赖:每个子分区只依赖父 RDD 的一个分区(mapfilter)。
  • 宽依赖:子分区依赖多个父分区(groupByKeyreduceByKeyjoinrepartitionorderBy),会产生 Shuffle
  • Stage:遇到宽依赖切分 Stage;Stage 内是窄依赖链路,可流水线执行。

3.3 计划与算子

  • EXPLAIN FORMATTED 可查看 Logical/Physical PlanExchange 表示重分区(即 Shuffle)。

3.4 最小示例(PySpark)

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()

df = (spark.range(10_0000)  # 10 万
        .select((F.col("id")%1000).alias("user_id"),
                (F.col("id")%100).alias("amount")))

res = (df.groupBy("user_id").agg(F.sum("amount").alias("amt"))
         .orderBy(F.col("amt").desc()))

res.explain("formatted")  # 观察是否有 Exchange、Sort

📘 Q&A

Q1:为什么 orderBy 几乎必定 Shuffle?
答:全局排序需要聚集所有分区的比较信息,必须重分布数据才能得到全序。

Q2:sortWithinPartitions 会 Shuffle 吗?
答:不会。它只在分区内排序,不改变分区边界。


04|什么是 Shuffle(定义、直觉与代价)

4.1 定义

为满足“同 key 的记录必须到同一分区”或“改变分区策略”的需要,Spark 触发跨分区重分布,即 Shuffle。

4.2 直觉

像邮局按地址分拣信件;或把多个班的“同学号”聚到同一教室,便于点名和统计。

4.3 成本构成

  • 网络:跨机拉取,连接/传输耗时。
  • 磁盘:溢写、索引、落盘。
  • CPU/内存:排序/哈希、序列化/反序列化、压缩/解压。
  • 长尾:某些分区数据异常大导致任务拖慢。

4.4 何时不会 Shuffle

  • 纯窄依赖:map / filter / mapPartitions
  • coalesce(num, shuffle=false) 尽量不打散。

📘 Q&A

Q1:“没有 Shuffle 就一定快”成立吗?
答:不一定。没有 Shuffle 但对象开销巨大、UDF 低效也可能慢。优化要综合观察算子/对象/内存/IO

Q2:怎么初步判断一条 SQL 会不会 Shuffle?
答:看是否存在分布式聚合/排序/重分区需求;EXPLAIN 中若有 Exchange 基本意味着会 Shuffle。


05|Shuffle 触发场景与 DAG 切分

5.1 典型触发

  • 聚合:groupByKeyreduceByKeyaggregateByKeygroupBy().agg()
  • 连接:join(等值/非等值)、cogroup
  • 去重/重分区:distinctrepartition
  • 排序:orderBysortByKey
  • 分发:distributeBybucketBy(落地时)

5.2 写法影响

  • groupByKey 会把所有值拉到网络上,易放大;reduceByKey/aggregateByKeymap 端预聚合
  • repartition 一定全局打散coalesce 尽量不打散。

5.3 SQL/DF 的 Exchange

  • Exchange (hashpartitioning(key, N)):哈希重分区。
  • Exchange (range):范围重分区,常用于全局排序或均衡。

5.4 案例:Stage 切分对比(思路)

  • 同一任务分别用 groupByKeyreduceByKey 实现,观察 Stage 数、Shuffle Read/Write 差异。

📘 Q&A

Q1:distinct 为什么会 Shuffle?
答:需要把相同值聚在一起去重,属于全局性操作。

Q2:coalesce(1) 会很快吗?
答:它减少分区数,可能导致单任务写超大文件和长尾;慎用。


06|Shuffle 生命周期:Map 写 → Reduce 拉(端到端)

6.1 Map 端(写出)

  • 分区器:决定记录被发送到哪个 reduce 分区(HashPartitioner/RangePartitioner/自定义)。
  • ShuffleWriter:把记录写入中间文件;内存不足触发 spill,最终归并合并
  • 压缩spark.shuffle.compressspark.shuffle.spill.compress

6.2 Reduce 端(拉取)

  • Block 拉取:依据索引定位 map 输出块;通过网络获取。
  • 聚合/排序:哈希表聚合或按序归并;内存不足继续 spill。

6.3 并发与流控

  • spark.reducer.maxReqsInFlight 控制并发拉取请求数。
  • spark.network.timeoutspark.shuffle.io.maxRetriesretryWait 控制超时与重试。

6.4 文件与索引

  • 数据文件:按分区顺序写;索引文件记录各分区的偏移,便于定位。

📘 Q&A

Q1:为什么要压缩?
答:降低网络与磁盘开销,尽管会增加 CPU。对于大规模 Shuffle,通常收益大于开销

Q2:Map 端 spill 是坏事吗?
答:是容量不足的自然反应,不必恐慌;但频繁 spill 暗示内存/并行度/对象过多,需要调优。


07|实现演化:Hash Shuffle → Sort Shuffle → Tungsten

7.1 Hash-based Shuffle(早期)

  • 每个 reduce 分区维护独立输出 → 文件爆炸、FD 耗尽,不可扩展。

7.2 Sort-based Shuffle(主流)

  • Map 端对数据按(分区、Key)排序后顺序写 → 大幅减少文件数量;代价是排序 CPU/内存。

7.3 Tungsten / UnsafeRow / Off-heap

  • 使用 UnsafeRow 二进制布局与堆外内存,减少对象与 GC,提升缓存友好性与吞吐。

7.4 Push-based Shuffle(补充)

  • Map 端将数据主动推送给下游合并点做预合并,降低 reduce 拉取碎片;启用依赖环境支持。

📘 Q&A

Q1:现代 Spark 用的哪种?
答:以 Sort-based Shuffle 为主,并结合 Tungsten 的内存/二进制优化。

Q2:为什么 Hash 方案会文件爆炸?
答:map×reduce 的组合带来非常多的独立输出,分区一多就难以管理。


08|关键组件与内部机制

8.1 组件一览

  • ShuffleManager:生命周期管理。
  • BlockManager:数据块存储与传输(内存/磁盘/网络)。
  • ShuffleBlockResolver:索引、文件定位。
  • SerializerManager:Java/Kryo 序列化。
  • External Shuffle Service:Executor 挂掉仍可保留 map 输出,减少重算。

8.2 与存储层

  • 本地磁盘的读写路径、与 HDFS/对象存储的协作;注意读写热点与并发。

8.3 序列化与压缩

  • Kryo 更紧凑更快;注册类进一步减少开销。
  • 压缩编解码器选择影响 CPU 与 I/O 平衡。

📘 Q&A

Q1:External Shuffle Service 有必要开吗?
答:大集群/长作业建议开启,有助于减少 FetchFailed 触发的整段重算

Q2:Kryo 一定比 Java 序列化快?
答:一般更快更小,但要正确注册类并结合对象模式使用,才能稳定获益。


09|参数与默认值解读(按主题分组)

9.1 并行度与分区

  • spark.sql.shuffle.partitions:SQL/DF 的下游并行度基准;结合集群资源、数据规模调整。
  • spark.default.parallelism:RDD 默认并行度。

9.2 压缩与序列化

  • spark.shuffle.compressspark.shuffle.spill.compress:建议开启。
  • spark.serializer:使用 KryoSerializerspark.kryo.registrator 注册类。

9.3 拉取并发与网络

  • spark.reducer.maxReqsInFlightspark.reducer.maxBlocksInFlightPerAddress
  • spark.network.timeoutspark.shuffle.io.maxRetriesspark.shuffle.io.retryWait

9.4 其他关键

  • spark.shuffle.file.buffer(写缓冲)、spark.locality.wait(局部性等待)、
  • spark.shuffle.service.enabled(External Shuffle Service)。

📘 Q&A

Q1:shuffle.partitions 是越大越好吗?
答:不是。过大带来调度开销与小文件;过小则并行度不足、易长尾。要以 UI 指标与目标文件大小迭代调参

Q2:网络参数怎么定?
答:结合集群带宽/延迟做小规模压测;观察 FetchFailedTimeout、内存峰值后再调整并发与重试。


10|Join 与 Shuffle(策略选择与成本差异)

10.1 等值 Join 三策略

  • BHJ(广播哈希):把小表发到每个 Executor,本地哈希探测,大表无需 Shuffle,通常最快。
  • SMJ(排序合并):两侧排序后线性合并,稳定扩展,适合双大表
  • SHJ(Shuffle 哈希):两侧重分区,在分区内构建哈希,适合中等规模且内存可承载。

10.2 阈值与 Hints

  • spark.sql.autoBroadcastJoinThreshold 控制是否自动广播。
  • Hints:BROADCAST / MERGE / SHUFFLE_HASH 可干预物理计划。

10.3 Bucketing / Sort

  • 两侧一致分桶/排序可减少或跳过 Exchange/Sort;以 EXPLAIN + UI 实证为准。

10.4 实操建议

  • 能广播就广播(小表);双大表选 SMJ;中等规模尝试 SHJ;始终 EXPLAIN 校验。

📘 Q&A

Q1:AQE 为什么会把 SMJ 改成 BHJ?
答:运行时发现一侧实际很小(落入阈值),代价模型选择更优的 BHJ。

Q2:非等值 Join 怎么办?
答:可能落到嵌套循环或代价更高策略。先过滤下推、或将区间离散化近似等值,再 Join。


11|性能优化:从代码到集群

11.1 代码层

  • 过滤/列裁剪前置,减少上游数据量。
  • reduceByKey/aggregateByKey 代替 groupByKey,做 map-side 预聚合
  • 少用 Python UDF;优先内置函数或 Pandas UDF(向量化)。

11.2 分区层

  • repartition(全局打散)与 coalesce(合并、不打散)。
  • RangePartitioner/自定义 partitioner 对热点键做均衡。

11.3 集群层

  • 合理的 shuffle.partitions;开启压缩与 Kryo;治理小文件(目标文件大小、后续 compaction)。

11.4 交付物

  • 配置清单、EXPLAIN 片段、UI 截图、数据规模说明;让优化可复现、可审核。

📘 Q&A

*Q1:为何“select ”会拖慢?
答:宽表传输放大网络与序列化开销;列裁剪能显著降压。

Q2:Pandas UDF 是否总比普通 UDF 快?
答:通常更快,因为向量化;但要注意批量大小、序列化与内存峰值,仍需实测。


12|数据倾斜:诊断与治理

12.1 诊断

  • UI 上某 Task 特别慢,Shuffle Read 异常大;抽样统计发现热点 Key

12.2 治理策略

  • 优先 BHJ(小表广播绕开大表 Shuffle)。
  • AQE Skew(自动切分大分区)。
  • Salting(热点 Key 加随机前缀拆散,下游再聚合)。
  • 两阶段聚合业务前置预聚合/预过滤

12.3 防御性建模

  • 分桶/分区策略提前考虑热点分布;冷热数据分离。

📘 Q&A

Q1:增加分区数能治好所有倾斜吗?
答:不能。只会把“一个大长尾”拆成“多个小长尾”。需要 AQE Skew/Salting/广播/预聚合 的组合拳。

Q2:怎么快速定位热点 Key?
答:采样统计频次 TopN、查看按 Key 的记录数直方图;在 UI 里结合分区数据量推断。


13|容错与失败恢复(FetchFailed 等)

13.1 常见错误

  • FetchFailed(远端块丢失/超时)、磁盘错误、网络抖动、Executor 异常退出。

13.2 恢复与重试

  • 失败时 Spark 会重跑相关 map 端 生成输出,再重启 reduce;外部 Shuffle 服务可减少重算范围。

13.3 降低风险

  • 调整网络超时与重试;降低并发拉取;开启 External Shuffle Service;黑名单/推测执行减少坏节点影响。

📘 Q&A

Q1:为什么重试次数多了反而更慢?
答:频繁重试掩盖根因(如网络/磁盘故障),浪费资源。应先定位原因再定策略。

Q2:推测执行能解决长尾吗?
答:对非确定性长尾有帮助(多副本竞争),但对确定性热点帮助有限,应结合倾斜治理。


14|如何读 Spark UI(以 Shuffle 为主线)

14.1 SQL 页面

  • Physical/Final Plan 是否含 Exchange/Sort/*JoinExec,确认策略与 AQE 改写。

14.2 Stages 页面

  • 关注 Shuffle Read/Write、Task 数、最大任务时长,定位网络/排序/长尾瓶颈。

14.3 Executors 页面

  • Peak Memory(广播/哈希表压力)、GC Time、I/O。

14.4 截图与误读

  • 注意取 Final Plan(AQE 生效后的最终计划),避免解读到初始计划。

📘 Q&A

Q1:看到 Exchange 就一定慢吗?
答:不一定。关键看数据规模、并行度与下游算子;必要但可控的 Exchange 是正常的。

Q2:如何判断是 I/O 还是 CPU 瓶颈?
答:结合 Shuffle Read/Write(I/O)、GC/CPU 利用率(CPU)、任务时间分布综合判断。


15|上手实验:5 组可复现练习(含记录模板)

建议:每组都记录用时、Shuffle 读写、最长 Task、输出文件数、关键配置,并附 UI 截图。

15.1 实验一:groupByKey vs reduceByKey

  • 目标:证明 map‑side 预聚合可显著减少 Shuffle。
  • 要点:同数据规模下,reduceByKeyShuffle Read 应明显更低。

15.2 实验二:repartition vs coalesce

  • 目标:理解全局打散与合并;观察 Exchange 差异与输出文件数变化。

15.3 实验三:序列化与压缩

  • 目标:开启 Kryo、Shuffle 压缩,观察端到端时间与 GC 变化。

15.4 实验四:分区数与小文件

  • 目标:64/256/1024 分区对性能与产物(文件数/大小)的影响对比。

15.5 实验五:AQE 合并与倾斜拆分

  • 目标:开启 AQE,观察 Final Plan 分区数减少与 Skew 拆分;记录时间收益。

15.6 实验记录建议

  • 描述数据规模、代码片段、配置、UI 截图、结论要点,便于复现与评审。

📘 Q&A

Q1:为什么我本地实验看不出差异?
答:数据量太小或硬件太强,瓶颈不明显。增大数据、关掉 cache、确保触发 Shuffle 再测试。

Q2:实验时需要关 AQE 吗?
答:基准对比时建议先关,得到原始策略结果;再 AQE 看自适应增益。