一、为什么 Join 经常最慢
- 高概率触发 Shuffle:按 Join Key 重分布 → 网络 I/O + 磁盘 I/O。
- 还要排序/建哈希表:SMJ 需要排序;BHJ/SHJ 需构建哈希表并探测。
- 容易长尾:某些分区数据过大(数据倾斜),拖慢整体作业。
一个容易懂的比喻:
- BHJ:把小字典复印发到每个教室(Executor),大班同学(大表)就近查字典。
- SMJ:两队人先各自按学号排序,再按序对齐比对。
- SHJ:两边人按学号分到对应教室,每个教室建哈希表再对齐。
二、三大等值 Join 策略
1) Broadcast Hash Join(BHJ)
原理
- 把小表广播到每个 Executor,本地构建哈希表;大表分片逐分区探测 → 通常无需对大表重分布。
触发
- 自动:
spark.sql.autoBroadcastJoinThreshold
以内(字节)倾向广播。 - 强制:SQL
/*+ BROADCAST(dim) */
;DataFramebroadcast(dim)
或df.hint("broadcast")
。
适用
- 明显“小”的维表 + “大”的事实表;希望绕开 Shuffle 与缓解倾斜。
优点
- 常常最快;对轻度倾斜更友好。
易错
- 广播对象过大 → OOM;Join 条件不是等值或含 UDF → 可能无法走 BHJ;同一管道多次广播 → 内存浪费。
2) Sort‑Merge Join(SMJ)
原理
- 两侧按 Key 排序,再线性合并。
触发
- 等值 Join;引擎常偏好 SMJ(可配)。
- 强制:SQL
/*+ MERGE(t) */
;spark.sql.join.preferSortMergeJoin=true
。
适用
- 双大表;需要可落盘、稳定的策略;与 bucketing/sort 配合更佳。
优点
- 稳定、可扩展;对内存敏感度低于 Hash。
易错
- 排序与 Shuffle 成本高;并行度不足时长尾明显。
3) Shuffle Hash Join(SHJ)
原理
- 两边按 Key 重分区到同一分区,在分区内构建哈希表并探测。
触发
- 等值 Join;可用
/*+ SHUFFLE_HASH(t) */
偏向该策略。
适用
- 中等规模、分区内可容纳哈希表;排序成本不划算的场景。
优点
- 不需要排序;内存合适时性能好。
易错
- 分区内一侧过大 → Hash 表 OOM/频繁 spill;对倾斜敏感。
三、引擎如何选(统计信息 + 阈值 + 偏好 + Hint)
- 统计信息:表大小/行数/基数 → 决定是否能广播、是否需要排序等。
- 阈值:
spark.sql.autoBroadcastJoinThreshold
(单位字节,建议显式设置)。 - 偏好:
spark.sql.join.preferSortMergeJoin
(是否偏向 SMJ)。 - Hints:
BROADCAST / MERGE / SHUFFLE_HASH
明确意图。
快速建议:
能 BHJ 就 BHJ;双大表用 SMJ;中等规模试 SHJ;始终用EXPLAIN FORMATTED
+ Spark UI 验证。
四、AQE(Adaptive Query Execution)三板斧
开关:spark.sql.adaptive.enabled=true
1) 动态广播
- 运行时发现某侧变小(落入阈值) → 自动切 BHJ。
2) 倾斜 Join 拆分
- 识别异常大分区 → 切分成多个子分区并行处理,缩短长尾。
3) 动态合并 Shuffle 分区
- 上游输出很小 → 合并下游小分区,降低调度开销。
观察:最终物理计划(AQE Final Plan)、Shuffle 分区数变化、长尾是否消失。
五、Bucketing / Sort:用“数据布局”减 Shuffle 成本
- 分桶(bucketing):事实表/维表对 Join Key 同桶、同桶数 → Join 时减少/跳过重分区。
- 排序:配合 SMJ 可复用已排序,降低排序成本。
- 注意:桶数一致、Key 完全一致、保留元数据;实际收益以
EXPLAIN
+ UI 实测为准。
六、非等值 Join 与过滤下推
- 非等值(
>
,<
,BETWEEN
,!=
)常无法走 BHJ/SMJ/SHJ,容易退化 Nested Loop。 - 优化思路:
- 先过滤后 Join:将高选择性谓词前置。
- 离散化/分桶映射:把范围映射为离散键,近似等值。
- 拆解:等值部分先 Join,再对结果过滤。
七、从 0 到 1:可跑实验(PySpark)
建议先关闭 AQE 观察“原始策略”,再打开 AQE 观察“自适应改写”。
1) 准备环境与数据
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("join-lab").getOrCreate()
# 关闭 AQE(第一轮实验)
spark.conf.set("spark.sql.adaptive.enabled", "false")
# 事实表(大)
fact = (spark.range(5_000_000)
.select((F.col("id") % 100_000).alias("user_id"),
(F.col("id") % 100).cast("double").alias("amount")))
# 维表(小~中)
dim = (spark.range(100_000)
.select(F.col("id").cast("int").alias("user_id"),
(F.col("id") % 5).alias("tier")))
2) 三策略最小闭环
# 2.1 BHJ:强制广播维表
from pyspark.sql.functions import broadcast
res_bhj = fact.join(broadcast(dim), "user_id")
res_bhj.explain("formatted")
# 2.2 SMJ:禁止广播,偏好 Merge
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
res_smj = fact.join(dim.hint("merge"), "user_id")
res_smj.explain("formatted")
# 2.3 SHJ:禁止广播,偏好 Hash
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
res_shj = fact.join(dim.hint("shuffle_hash"), "user_id")
res_shj.explain("formatted")
观察项:EXPLAIN FORMATTED
中的物理算子应分别为BroadcastHashJoinExec
/SortMergeJoinExec
/ShuffledHashJoinExec
。
Spark UI:Shuffle Read/Write、Peak Memory、任务数、最长 Task。
3) 开启 AQE 再跑
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20*1024*1024) # 例:20MB
res_aqe = fact.join(dim, "user_id")
res_aqe.explain("formatted") # 观察是否动态广播与分区合并
4) 制造倾斜并治理(可选)
# 10% 数据聚到 user_id = 0,制造长尾
skew_fact = fact.withColumn("user_id",
F.when((F.rand() < 0.10), F.lit(0)).otherwise(F.col("user_id"))
)
# A) 调分区
spark.conf.set("spark.sql.shuffle.partitions", "400")
# B) AQE 倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
res_skew_aqe = skew_fact.join(dim, "user_id")
# C) 手写 salting(示意)
saltN = 8
dim_salted = dim.withColumn("salt", (F.rand()*saltN).cast("int"))
skew_fact_salted = skew_fact.withColumn("salt", (F.rand()*saltN).cast("int"))
res_salt = skew_fact_salted.join(dim_salted,
on=[skew_fact_salted.user_id==dim_salted.user_id,
skew_fact_salted.salt==dim_salted.salt],
how="inner")
八、倾斜与大小表组合策略
- 优先 BHJ:大表 + 小表 → 广播小表,绕开重分布。
- AQE Skew:自动识别并拆分大分区。
- Salting:对热点 Key 加随机前缀打散,后续再聚合还原。
- 预聚合/预过滤:减少 Join 前输入量。
- 合理并行度:
spark.sql.shuffle.partitions
与集群资源匹配。
九、常见坑清单(面试高频)
- Join Key 类型不一致(int vs string)→ 隐式
cast
、计划变差。 - Join 条件含 UDF/非确定函数 → 等值优化失效。
- NULL 语义:需要用
<=>
做 null‑safe 等值比较。 - 无用列
select *
→ 宽表传输、网络放大。 - 阈值默认随版本不同 → 显式设置并
EXPLAIN
。 - 无统计信息 → 代价估计失灵(记得 ANALYZE)。
- SMJ 并行度不足 → 排序长尾;提升
shuffle.partitions
。 - SHJ 分区内基数过大 → Hash 表 OOM;换 BHJ/SMJ 或预聚合。
- 广播对象太大 → OOM;先投影必需列后广播。
- “只增分区就能治倾斜” → 可能把长尾拆成多个小长尾,要用 AQE/Salting。
十、参数与 Hint 速查(关键词)
项 | 作用 |
---|---|
spark.sql.autoBroadcastJoinThreshold | 广播阈值(字节) |
spark.sql.join.preferSortMergeJoin | 偏好 SMJ |
spark.sql.adaptive.enabled | 开启 AQE |
spark.sql.adaptive.skewJoin.enabled | 倾斜 Join |
spark.sql.shuffle.partitions | 下游并行度 |
/*+ BROADCAST(t) */ | 强制广播 |
/*+ MERGE(t) */ | 偏向 SMJ |
/*+ SHUFFLE_HASH(t) */ | 偏向 SHJ |
表格只放关键词,不写长句,避免干扰排版。
十一、课堂小测(5 分钟)【在最后】
- 选择:5MB 维表 + 100GB 事实表做等值 Join,首选策略?
- 问答:为何开启 AQE 后计划可能从 SMJ 变成 BHJ?
- 动手:
autoBroadcastJoinThreshold=-1
、preferSortMergeJoin=true
时,如何强制 SHJ?
十二、课后作业
- 三策略 A/B 测试:分别跑 BHJ/SMJ/SHJ,记录运行时间、Shuffle 读写、任务数、最长 Task,写出对比结论。
- AQE 提升报告:开启 AQE + Skew Join,复跑倾斜场景,截图“最终物理计划”和 UI 指标,描述提升点。
- Bucketing 尝鲜:对同一 Key 写两张分桶表,Join 对比是否减少 Exchange/Sort,并记录限制条件。
十三、Scala 附:Hints 与 EXPLAIN 快速参考
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val fact = spark.range(5_000_000)
.select(($"id" % 100000).as("user_id"), ($"id" % 100).cast("double").as("amount"))
val dim = spark.range(100000)
.select($"id".cast("int").as("user_id"), ($"id" % 5).as("tier"))
// BHJ
val bhj = fact.join(broadcast(dim), Seq("user_id"))
bhj.explain("formatted")
// SMJ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
val smj = fact.join(dim.hint("merge"), Seq("user_id"))
smj.explain("formatted")
// SHJ
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val shj = fact.join(dim.hint("shuffle_hash"), Seq("user_id"))
shj.explain("formatted")
课堂小测(5 分钟)【答案】
一、选择题
题干:5MB 维表 + 100GB 事实表做等值 Join,首选策略?
标准答案
BHJ(Broadcast Hash Join)
讲解要点
- 绕开大表 Shuffle:广播 5MB 小表到每个 Executor,本地哈希探测大表。
- 阈值命中:5MB 通常低于常用广播阈值(建议显式设置阈值,避免版本差异)。
- 排除条件:非等值、UDF 参与条件、内存紧张(可精简列后再广播,或改 SMJ)。
截图位点(占位)


二、问答题
题干:为何开启 AQE 后计划可能从 SMJ 变成 BHJ?
标准答案
- 运行时统计变小:上游过滤/裁剪后,一侧真实数据量落入广播阈值,AQE 动态改写为 BHJ。
- 代价更优:BHJ 省去排序与大表重分布,运行时代价模型判定更快。
- 联动优化:AQE 合并小分区、本地读取等,进一步降低数据量与数据移动。
截图位点(占位)


三、动手题
题干:autoBroadcastJoinThreshold=-1
、preferSortMergeJoin=true
时,如何强制 SHJ?
目标:禁自动广播、偏好 SMJ 的前提下,仍让引擎采用 SHJ。
提示:Hint 优先,但需满足等值 Join等前提;为避免计划被改写,建议本题关闭 AQE 进行验证。
A. 配置前置(建议)
# PySpark
)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1"spark.conf.set("spark.sql.join.preferSortMergeJoin", "true"
)spark.conf.set("spark.sql.adaptive.enabled", "false") # 避免 AQE 改写
B. SQL 解(强制 SHJ)
SELECT /*+ SHUFFLE_HASH(f), SHUFFLE_HASH(d) */
f.user_id, f.amount, d.tierFROM
fact fJOIN
dim dON f.user_id =
d.user_id;-- 前置:
-- SET spark.sql.autoBroadcastJoinThreshold = -1;
-- SET spark.sql.join.preferSortMergeJoin = true;
-- 可选: SET spark.sql.adaptive.enabled = false;
C. PySpark 解(DataFrame Hint)
# 假定 fact(user_id, amount), dim(user_id, tier) 已就绪(见长文的造数代码)
)
res_shj = fact.hint("shuffle_hash").join(dim.hint("shuffle_hash"), "user_id"res_shj.explain("formatted") # 期望: ShuffledHashJoinExec
D. 计划校验(简易自动校验,可选)
import
io, sys
buf = io.StringIO()
old = sys.stdout
sys.stdout = bufres_shj.explain("formatted"
)
sys.stdout = old
plan = buf.getvalue()print("Plan snippet:\n", "\n".join(plan.splitlines()[:30
]))assert "ShuffledHashJoin" in plan, "未命中 SHJ,请检查 hint/前置条件"
E. 截图位点(占位)


F. 常见误区(强制 SHJ 失败原因)
- 非等值 Join / 条件含 UDF → 无法用 SHJ。
- 单分区基数过大 → Hash 表 OOM 或引擎保护性退回 SMJ。
- AQE 开着时又触发了动态广播 → 计划被改回 BHJ;本题请先关 AQE 验证。
四、度量记录模板
表格只放关键词/数字,避免长句。
实验环境
项 | 值 |
---|---|
Spark | 3.x |
模式 | Local/Cluster |
核心/内存 | 8C/16G |
存储 | SSD |
三策略对比
指标 | BHJ | SMJ | SHJ |
---|---|---|---|
用时(s) | |||
Shuffle Read(GB) | |||
Shuffle Write(GB) | |||
Max Task(s) | |||
Peak Mem(GB) |
AQE 前后
指标 | 关闭 | 开启 |
---|---|---|
用时(s) | ||
分区数 | ||
是否动态广播 | 否/是 | 否/是 |
五、提交规范(作业区)
- 选择题:1–2 句解释+一张 DAG 截图(BHJ 无大表 Exchange)。
- 问答题:Final Plan 截图(SMJ→BHJ)+ 1–2 句原因。
- 动手题:
EXPLAIN FORMATTED
片段(含ShuffledHashJoinExec
)+ Stages 截图(无 Sort 算子)+ 失败复盘(若未命中)。
六、速查清单(上机前看一眼)
- 小表 + 大表(等值)→ BHJ
- 双大表 → SMJ
- 中等规模、排序不划算 → SHJ
- AQE:动态广播 / 倾斜拆分 / 合并分区
- Hints:
BROADCAST
/MERGE
/SHUFFLE_HASH
- 必查:
EXPLAIN FORMATTED
+ Spark UI(SQL/Stages/Executors)
Comments