一、为什么 Join 经常最慢

  • 高概率触发 Shuffle:按 Join Key 重分布 → 网络 I/O + 磁盘 I/O
  • 还要排序/建哈希表:SMJ 需要排序;BHJ/SHJ 需构建哈希表并探测。
  • 容易长尾:某些分区数据过大(数据倾斜),拖慢整体作业。

一个容易懂的比喻:

  • BHJ:把小字典复印发到每个教室(Executor),大班同学(大表)就近查字典。
  • SMJ:两队人先各自按学号排序,再按序对齐比对
  • SHJ:两边人按学号分到对应教室,每个教室建哈希表再对齐。

二、三大等值 Join 策略

1) Broadcast Hash Join(BHJ)

原理

  • 小表广播到每个 Executor,本地构建哈希表;大表分片逐分区探测 → 通常无需对大表重分布

触发

  • 自动:spark.sql.autoBroadcastJoinThreshold 以内(字节)倾向广播。
  • 强制:SQL /*+ BROADCAST(dim) */;DataFrame broadcast(dim)df.hint("broadcast")

适用

  • 明显“小”的维表 + “大”的事实表;希望绕开 Shuffle缓解倾斜

优点

  • 常常最快;对轻度倾斜更友好。

易错

  • 广播对象过大 → OOM;Join 条件不是等值或含 UDF → 可能无法走 BHJ;同一管道多次广播 → 内存浪费。

2) Sort‑Merge Join(SMJ)

原理

  • 两侧按 Key 排序,再线性合并

触发

  • 等值 Join;引擎常偏好 SMJ(可配)。
  • 强制:SQL /*+ MERGE(t) */spark.sql.join.preferSortMergeJoin=true

适用

  • 双大表;需要可落盘、稳定的策略;与 bucketing/sort 配合更佳。

优点

  • 稳定、可扩展;对内存敏感度低于 Hash

易错

  • 排序与 Shuffle 成本高;并行度不足时长尾明显。

3) Shuffle Hash Join(SHJ)

原理

  • 两边按 Key 重分区到同一分区,在分区内构建哈希表并探测

触发

  • 等值 Join;可用 /*+ SHUFFLE_HASH(t) */ 偏向该策略。

适用

  • 中等规模、分区内可容纳哈希表;排序成本不划算的场景。

优点

  • 不需要排序;内存合适时性能好。

易错

  • 分区内一侧过大 → Hash 表 OOM/频繁 spill;对倾斜敏感。

三、引擎如何选(统计信息 + 阈值 + 偏好 + Hint)

  • 统计信息:表大小/行数/基数 → 决定是否能广播、是否需要排序等。
  • 阈值spark.sql.autoBroadcastJoinThreshold(单位字节,建议显式设置)。
  • 偏好spark.sql.join.preferSortMergeJoin(是否偏向 SMJ)。
  • HintsBROADCAST / MERGE / SHUFFLE_HASH 明确意图。
快速建议:
能 BHJ 就 BHJ双大表用 SMJ中等规模试 SHJ;始终用 EXPLAIN FORMATTED + Spark UI 验证。

四、AQE(Adaptive Query Execution)三板斧

开关:spark.sql.adaptive.enabled=true

1) 动态广播

  • 运行时发现某侧变小(落入阈值) → 自动切 BHJ

2) 倾斜 Join 拆分

  • 识别异常大分区 → 切分成多个子分区并行处理,缩短长尾。

3) 动态合并 Shuffle 分区

  • 上游输出很小 → 合并下游小分区,降低调度开销。
观察:最终物理计划(AQE Final Plan)、Shuffle 分区数变化、长尾是否消失。

五、Bucketing / Sort:用“数据布局”减 Shuffle 成本

  • 分桶(bucketing):事实表/维表对 Join Key 同桶、同桶数 → Join 时减少/跳过重分区
  • 排序:配合 SMJ 可复用已排序,降低排序成本。
  • 注意:桶数一致、Key 完全一致、保留元数据;实际收益以 EXPLAIN + UI 实测为准。

六、非等值 Join 与过滤下推

  • 非等值(>, <, BETWEEN, !=)常无法走 BHJ/SMJ/SHJ,容易退化 Nested Loop
  • 优化思路:
    • 先过滤后 Join:将高选择性谓词前置。
    • 离散化/分桶映射:把范围映射为离散键,近似等值。
    • 拆解:等值部分先 Join,再对结果过滤。

七、从 0 到 1:可跑实验(PySpark)

建议先关闭 AQE 观察“原始策略”,再打开 AQE 观察“自适应改写”。

1) 准备环境与数据

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("join-lab").getOrCreate()

# 关闭 AQE(第一轮实验)
spark.conf.set("spark.sql.adaptive.enabled", "false")

# 事实表(大)
fact = (spark.range(5_000_000)
    .select((F.col("id") % 100_000).alias("user_id"),
            (F.col("id") % 100).cast("double").alias("amount")))

# 维表(小~中)
dim = (spark.range(100_000)
    .select(F.col("id").cast("int").alias("user_id"),
            (F.col("id") % 5).alias("tier")))

2) 三策略最小闭环

# 2.1 BHJ:强制广播维表
from pyspark.sql.functions import broadcast
res_bhj = fact.join(broadcast(dim), "user_id")
res_bhj.explain("formatted")

# 2.2 SMJ:禁止广播,偏好 Merge
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
res_smj = fact.join(dim.hint("merge"), "user_id")
res_smj.explain("formatted")

# 2.3 SHJ:禁止广播,偏好 Hash
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
res_shj = fact.join(dim.hint("shuffle_hash"), "user_id")
res_shj.explain("formatted")
观察项:EXPLAIN FORMATTED 中的物理算子应分别为
BroadcastHashJoinExec / SortMergeJoinExec / ShuffledHashJoinExec
Spark UI:Shuffle Read/Write、Peak Memory、任务数、最长 Task

3) 开启 AQE 再跑

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20*1024*1024)  # 例:20MB
res_aqe = fact.join(dim, "user_id")
res_aqe.explain("formatted")  # 观察是否动态广播与分区合并

4) 制造倾斜并治理(可选)

# 10% 数据聚到 user_id = 0,制造长尾
skew_fact = fact.withColumn("user_id",
    F.when((F.rand() < 0.10), F.lit(0)).otherwise(F.col("user_id"))
)

# A) 调分区
spark.conf.set("spark.sql.shuffle.partitions", "400")

# B) AQE 倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
res_skew_aqe = skew_fact.join(dim, "user_id")

# C) 手写 salting(示意)
saltN = 8
dim_salted = dim.withColumn("salt", (F.rand()*saltN).cast("int"))
skew_fact_salted = skew_fact.withColumn("salt", (F.rand()*saltN).cast("int"))
res_salt = skew_fact_salted.join(dim_salted,
    on=[skew_fact_salted.user_id==dim_salted.user_id,
        skew_fact_salted.salt==dim_salted.salt],
    how="inner")

八、倾斜与大小表组合策略

  • 优先 BHJ:大表 + 小表 → 广播小表,绕开重分布。
  • AQE Skew:自动识别并拆分大分区。
  • Salting:对热点 Key 加随机前缀打散,后续再聚合还原。
  • 预聚合/预过滤:减少 Join 前输入量。
  • 合理并行度spark.sql.shuffle.partitions 与集群资源匹配。

九、常见坑清单(面试高频)

  1. Join Key 类型不一致(int vs string)→ 隐式 cast、计划变差。
  2. Join 条件含 UDF/非确定函数 → 等值优化失效。
  3. NULL 语义:需要用 <=> 做 null‑safe 等值比较。
  4. 无用列 select * → 宽表传输、网络放大。
  5. 阈值默认随版本不同 → 显式设置并 EXPLAIN
  6. 无统计信息 → 代价估计失灵(记得 ANALYZE)。
  7. SMJ 并行度不足 → 排序长尾;提升 shuffle.partitions
  8. SHJ 分区内基数过大 → Hash 表 OOM;换 BHJ/SMJ 或预聚合。
  9. 广播对象太大 → OOM;先投影必需列后广播。
  10. “只增分区就能治倾斜” → 可能把长尾拆成多个小长尾,要用 AQE/Salting。

十、参数与 Hint 速查(关键词)

作用
spark.sql.autoBroadcastJoinThreshold广播阈值(字节)
spark.sql.join.preferSortMergeJoin偏好 SMJ
spark.sql.adaptive.enabled开启 AQE
spark.sql.adaptive.skewJoin.enabled倾斜 Join
spark.sql.shuffle.partitions下游并行度
/*+ BROADCAST(t) */强制广播
/*+ MERGE(t) */偏向 SMJ
/*+ SHUFFLE_HASH(t) */偏向 SHJ
表格只放关键词,不写长句,避免干扰排版。

十一、课堂小测(5 分钟)【在最后】

  • 选择:5MB 维表 + 100GB 事实表做等值 Join,首选策略?
  • 问答:为何开启 AQE 后计划可能从 SMJ 变成 BHJ?
  • 动手autoBroadcastJoinThreshold=-1preferSortMergeJoin=true 时,如何强制 SHJ?

十二、课后作业

  1. 三策略 A/B 测试:分别跑 BHJ/SMJ/SHJ,记录运行时间、Shuffle 读写、任务数、最长 Task,写出对比结论。
  2. AQE 提升报告:开启 AQE + Skew Join,复跑倾斜场景,截图“最终物理计划”和 UI 指标,描述提升点。
  3. Bucketing 尝鲜:对同一 Key 写两张分桶表,Join 对比是否减少 Exchange/Sort,并记录限制条件。

十三、Scala 附:Hints 与 EXPLAIN 快速参考

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val fact = spark.range(5_000_000)
  .select(($"id" % 100000).as("user_id"), ($"id" % 100).cast("double").as("amount"))
val dim = spark.range(100000)
  .select($"id".cast("int").as("user_id"), ($"id" % 5).as("tier"))

// BHJ
val bhj = fact.join(broadcast(dim), Seq("user_id"))
bhj.explain("formatted")

// SMJ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
val smj = fact.join(dim.hint("merge"), Seq("user_id"))
smj.explain("formatted")

// SHJ
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val shj = fact.join(dim.hint("shuffle_hash"), Seq("user_id"))
shj.explain("formatted")

课堂小测(5 分钟)【答案】

一、选择题

题干:5MB 维表 + 100GB 事实表做等值 Join,首选策略?

标准答案

BHJ(Broadcast Hash Join)

讲解要点

  • 绕开大表 Shuffle:广播 5MB 小表到每个 Executor,本地哈希探测大表。
  • 阈值命中:5MB 通常低于常用广播阈值(建议显式设置阈值,避免版本差异)。
  • 排除条件:非等值UDF 参与条件内存紧张(可精简列后再广播,或改 SMJ)。

截图位点(占位)

  • ![SQL DAG|BHJ 无大表 Exchange](/images/spark/quiz/bhj-dag.png)
  • ![Stages|Shuffle Read/Write 低](/images/spark/quiz/bhj-stages.png)

二、问答题

题干:为何开启 AQE 后计划可能从 SMJ 变成 BHJ

标准答案

  • 运行时统计变小:上游过滤/裁剪后,一侧真实数据量落入广播阈值,AQE 动态改写为 BHJ
  • 代价更优:BHJ 省去排序与大表重分布,运行时代价模型判定更快。
  • 联动优化:AQE 合并小分区本地读取等,进一步降低数据量与数据移动。

截图位点(占位)

  • ![SQL Final Plan|SMJ→BHJ](/images/spark/quiz/aqe-final-plan-bhj.png)
  • ![Stages|分区数合并前后对比](/images/spark/quiz/aqe-coalesce.png)

三、动手题

题干autoBroadcastJoinThreshold=-1preferSortMergeJoin=true 时,如何强制 SHJ

目标:禁自动广播、偏好 SMJ 的前提下,仍让引擎采用 SHJ
提示:Hint 优先,但需满足等值 Join等前提;为避免计划被改写,建议本题关闭 AQE 进行验证。

A. 配置前置(建议)

# PySpark
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1"
)
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.adaptive.enabled", "false") # 避免 AQE 改写

B. SQL 解(强制 SHJ)

SELECT /*+ SHUFFLE_HASH(f), SHUFFLE_HASH(d) */
f.user_id, f.amount, d.tier
FROM fact f
JOIN dim d
ON f.user_id = d.user_id;
-- 前置:
-- SET spark.sql.autoBroadcastJoinThreshold = -1;
-- SET spark.sql.join.preferSortMergeJoin = true;
-- 可选: SET spark.sql.adaptive.enabled = false;

C. PySpark 解(DataFrame Hint)

# 假定 fact(user_id, amount), dim(user_id, tier) 已就绪(见长文的造数代码)
res_shj = fact.hint("shuffle_hash").join(dim.hint("shuffle_hash"), "user_id"
)
res_shj.explain("formatted") # 期望: ShuffledHashJoinExec

D. 计划校验(简易自动校验,可选)

import io, sys
buf = io.StringIO()
old = sys.stdout
sys.stdout = buf
res_shj.explain("formatted")
sys.stdout = old
plan = buf.getvalue()
print("Plan snippet:\n", "\n".join(plan.splitlines()[:30]))
assert "ShuffledHashJoin" in plan, "未命中 SHJ,请检查 hint/前置条件"

E. 截图位点(占位)

  • ![SQL Plan|ShuffledHashJoinExec 关键行](/images/spark/quiz/shj-plan.png)
  • ![Stages|存在 Exchange 但无 Sort 算子](/images/spark/quiz/shj-stages.png)

F. 常见误区(强制 SHJ 失败原因)

  • 非等值 Join / 条件含 UDF → 无法用 SHJ。
  • 单分区基数过大 → Hash 表 OOM 或引擎保护性退回 SMJ。
  • AQE 开着时又触发了动态广播 → 计划被改回 BHJ;本题请先关 AQE 验证。

四、度量记录模板

表格只放关键词/数字,避免长句。

实验环境

Spark3.x
模式Local/Cluster
核心/内存8C/16G
存储SSD

三策略对比

指标BHJSMJSHJ
用时(s)
Shuffle Read(GB)
Shuffle Write(GB)
Max Task(s)
Peak Mem(GB)

AQE 前后

指标关闭开启
用时(s)
分区数
是否动态广播否/是否/是

五、提交规范(作业区)

  • 选择题:1–2 句解释+一张 DAG 截图(BHJ 无大表 Exchange)。
  • 问答题:Final Plan 截图(SMJ→BHJ)+ 1–2 句原因。
  • 动手题EXPLAIN FORMATTED 片段(含 ShuffledHashJoinExec)+ Stages 截图(无 Sort 算子)+ 失败复盘(若未命中)。

六、速查清单(上机前看一眼)

  • 小表 + 大表(等值)→ BHJ
  • 双大表 → SMJ
  • 中等规模、排序不划算 → SHJ
  • AQE:动态广播 / 倾斜拆分 / 合并分区
  • HintsBROADCAST / MERGE / SHUFFLE_HASH
  • 必查EXPLAIN FORMATTED + Spark UI(SQL/Stages/Executors)