问题现象
做数仓的同学肯定都遇到过这种情况:
-- 这个SQL能跑死人,2小时还在执行
SELECT count(distinct user_id) FROM user_behavior_log
WHERE dt = '2024-01-01';
-- 但这个1分钟就出结果
SELECT count(*) FROM user_behavior_log
WHERE dt = '2024-01-01';
-- 更离谱的是,数据量一大就直接OOM
SELECT count(distinct user_id) FROM user_behavior_log
WHERE dt >= '2024-01-01' AND dt <= '2024-01-31';
同样是COUNT,为什么加个DISTINCT性能就崩了?
深度剖析:MapReduce执行机制
1. 普通count(*)的执行流程
数据分布:
HDFS Block1: [row1, row2, row3, row4]
HDFS Block2: [row5, row6, row7, row8]
HDFS Block3: [row9, row10, row11, row12]
Map阶段:
Mapper1 处理 Block1 → 输出 <"count", 4>
Mapper2 处理 Block2 → 输出 <"count", 4>
Mapper3 处理 Block3 → 输出 <"count", 4>
Shuffle阶段:
所有<"count", value>发送到Reducer
Reduce阶段:
Reducer收到: <"count", [4, 4, 4]>
执行Sum操作: 4 + 4 + 4 = 12
关键特点:
- Map阶段可以预聚合,输出数据量极小
- 只需要1个Reducer做最终汇总
- Shuffle传输的数据量很少
2. count(distinct)的执行流程
原始数据:
Block1: [user1, user2, user1, user3]
Block2: [user2, user4, user1, user5]
Block3: [user3, user5, user6, user1]
Map阶段:
Mapper1 → 输出: <user1, null>, <user2, null>, <user1, null>, <user3, null>
Mapper2 → 输出: <user2, null>, <user4, null>, <user1, null>, <user5, null>
Mapper3 → 输出: <user3, null>, <user5, null>, <user6, null>, <user1, null>
Shuffle阶段:
所有distinct的key必须发送到同一个Reducer:
Reducer收到: <user1, [null,null,null,null]>
<user2, [null,null]>
<user3, [null,null]>
<user4, [null]>
<user5, [null,null]>
<user6, [null]>
Reduce阶段:
只有1个Reducer处理所有数据
对每个key去重,最终count = 6
性能瓶颈分析:
- Map阶段无法预聚合
- 必须输出所有原始数据
- 输出数据量 = 输入数据量
- Shuffle阶段数据传输巨大
- 10亿条数据 → 10亿条记录需要传输
- 网络带宽成为瓶颈
- Reduce阶段单点处理
- 所有数据汇聚到1个Reducer
- 内存压力巨大,容易OOM
3. 为什么不能用多个Reducer?
很多人会问:为什么不能用多个Reducer并行处理?
错误假设:
假设用2个Reducer处理用户去重:
数据分片:
Reducer1处理: [user1, user2, user1, user3, user2]
Reducer2处理: [user2, user4, user1, user5, user3]
各自去重:
Reducer1得到: {user1, user2, user3} → count = 3
Reducer2得到: {user2, user4, user1, user5, user3} → count = 5
最终结果: 3 + 5 = 8 ❌
正确答案应该是: {user1, user2, user3, user4, user5} → count = 5
根本问题:
- 分布式去重违背了去重的全局性要求
- 每个Reducer只能看到部分数据,无法判断全局唯一性
- Hash分片会导致同一个user_id分散到不同Reducer
详细优化方案
方案1:两阶段聚合(推荐)
实现原理: 将count(distinct)转换为两个阶段:
- 第一阶段:GROUP BY去重
- 第二阶段:COUNT行数
-- 原始慢SQL
SELECT count(distinct user_id)
FROM user_behavior_log
WHERE dt = '2024-01-01';
-- 优化后(方法1)
SELECT count(*) FROM (
SELECT user_id
FROM user_behavior_log
WHERE dt = '2024-01-01'
GROUP BY user_id
) tmp;
-- 优化后(方法2,更明确)
WITH distinct_users AS (
SELECT distinct user_id
FROM user_behavior_log
WHERE dt = '2024-01-01'
)
SELECT count(*) FROM distinct_users;
执行流程详解:
第一阶段 - GROUP BY user_id:
Map阶段:
Mapper1: [user1,user2,user1] → <user1,null>, <user2,null>, <user1,null>
Mapper2: [user2,user3,user1] → <user2,null>, <user3,null>, <user1,null>
Shuffle阶段:
按user_id hash分发到不同Reducer:
Reducer1收到: user1的所有记录
Reducer2收到: user2的所有记录
Reducer3收到: user3的所有记录
Reduce阶段:
Reducer1输出: <user1, null>(去重后只有1条)
Reducer2输出: <user2, null>
Reducer3输出: <user3, null>
第二阶段 - COUNT(*):
Map阶段:读取第一阶段结果,每行输出 <"count", 1>
Reduce阶段:Sum所有1得到最终结果
性能提升原理:
- 第一阶段可以并行去重,避免单点瓶颈
- 第二阶段只需要统计行数,数据量很小
- 总体时间复杂度从O(n)变为O(n/k),k为并行度
方案2:HyperLogLog近似算法
算法原理: HyperLogLog是一种概率数据结构,用极小的内存空间估算集合基数。
-- Hive内置函数
SELECT approx_count_distinct(user_id)
FROM user_behavior_log
WHERE dt = '2024-01-01';
-- 可以指定精度参数
SELECT approx_count_distinct(user_id, 0.01)
FROM user_behavior_log
WHERE dt = '2024-01-01';
技术细节:
HyperLogLog工作原理:
1. 对每个user_id计算hash值
2. 观察hash值的二进制前导零个数
3. 用少量registers记录最大前导零个数
4. 基于概率论估算总体基数
内存使用:
- 标准HLL只需要1.5KB内存
- 精度可达2%误差率
- 无论数据量多大,内存占用固定
适用场景:
- 实时UV统计
- 大数据量快速估算
- 对精确度要求不高的场景
注意事项:
-- 错误用法:在有限集合上使用
SELECT approx_count_distinct(gender) FROM users;
-- gender只有male/female,用HLL浪费
-- 正确用法:大基数估算
SELECT approx_count_distinct(user_id) FROM billion_records;
方案3:分桶预聚合
建表策略:
-- 创建分桶表
CREATE TABLE user_behavior_log_bucket (
user_id string,
event_name string,
event_time bigint,
dt string
)
CLUSTERED BY (user_id) INTO 256 BUCKETS
PARTITIONED BY (dt string)
STORED AS ORC;
-- 数据导入时自动分桶
INSERT INTO user_behavior_log_bucket PARTITION(dt='2024-01-01')
SELECT user_id, event_name, event_time, '2024-01-01' as dt
FROM user_behavior_log_raw
WHERE dt = '2024-01-01';
查询优化:
-- 方案3.1:桶级别并行
SELECT sum(bucket_distinct_count) FROM (
SELECT count(distinct user_id) as bucket_distinct_count
FROM user_behavior_log_bucket
WHERE dt = '2024-01-01'
GROUP BY hash(user_id) % 32 -- 32个并行任务
) tmp;
-- 方案3.2:采样估算
SELECT count(distinct user_id) * 100 -- 1%采样,结果*100
FROM user_behavior_log_bucket
WHERE dt = '2024-01-01'
AND hash(user_id) % 100 = 0; -- 1%采样
分桶原理:
- 相同user_id总是落在同一个bucket
- 可以并行处理多个bucket
- 最后合并各bucket的去重结果
方案4:Bitmap精确去重
适用于ID范围有限的场景:
-- 假设user_id是连续整数,范围[1, 10000000]
-- 可以用bitmap精确记录每个ID是否出现
-- 方案4.1:Hive Bitmap函数
SELECT bitmap_count(
bitmap_or_agg(to_bitmap(cast(user_id as bigint)))
)
FROM user_behavior_log
WHERE dt = '2024-01-01';
-- 方案4.2:手动实现
WITH user_bitmap AS (
SELECT collect_set(cast(user_id as int)) as user_set
FROM user_behavior_log
WHERE dt = '2024-01-01'
)
SELECT size(user_set) FROM user_bitmap;
Bitmap优势:
- 内存效率高:1000万用户只需1.25MB
- 支持交集、并集等复杂运算
- 计算速度快
适用条件:
- ID必须是整数
- ID范围不能太大(<1亿)
- 稀疏度不能太高
方案5:维表关联优化
**场景:**用户维表相对较小,可以广播优化
-- 原始SQL:大表自关联去重
SELECT count(distinct user_id)
FROM user_behavior_log
WHERE dt = '2024-01-01';
-- 优化:先关联用户维表过滤
SELECT count(distinct u.user_id)
FROM user_behavior_log l
JOIN user_dim u ON l.user_id = u.user_id
WHERE l.dt = '2024-01-01'
AND u.status = 'active'; -- 只统计活跃用户
-- 进一步优化:广播小表
SELECT /*+ MAPJOIN(u) */ count(distinct u.user_id)
FROM user_behavior_log l
JOIN user_dim u ON l.user_id = u.user_id
WHERE l.dt = '2024-01-01';
实际生产案例
案例1:某电商平台UV统计
业务背景:
- 日均10亿条行为数据
- 需要统计日活跃用户数
- 要求准确率99.9%以上
优化前:
SELECT count(distinct user_id)
FROM user_behavior_log
WHERE dt = '2024-01-01';
执行结果:
- 执行时间:3小时45分钟
- 资源消耗:512个vcore,2TB内存
- 成功率:60%(经常OOM失败)
优化后:
-- 两阶段聚合
SELECT count(*) FROM (
SELECT user_id
FROM user_behavior_log
WHERE dt = '2024-01-01'
GROUP BY user_id
) tmp;
执行结果:
- 执行时间:12分钟
- 资源消耗:256个vcore,512GB内存
- 成功率:100%
- 准确率:100%
性能提升:
- 执行时间提升:18.75倍
- 资源消耗减少:75%
- 稳定性大幅提升
案例2:某视频平台实时UV
业务背景:
- 需要5分钟级别的实时UV统计
- 可以接受2-3%的误差
- 峰值QPS达到100万
解决方案:
-- 使用HyperLogLog
SELECT approx_count_distinct(user_id, 0.02)
FROM user_behavior_log
WHERE event_time >= unix_timestamp() - 300; -- 最近5分钟
配置优化:
- 设置mapreduce.job.reduces=1 -- HLL不需要多个reducer
- 设置hive.exec.parallel=true -- 启用并行执行
- 设置tez.runtime.io.sort.mb=512 -- 增加sort缓冲区
效果对比:
指标 | count(distinct) | approx_count_distinct |
---|---|---|
执行时间 | 45分钟 | 3分钟 |
内存消耗 | 8GB | 50MB |
准确率 | 100% | 97.8% |
稳定性 | 差(频繁OOM) | 好 |
调优参数详解
Hive参数优化
-- 启用CBO优化器
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
-- 优化distinct聚合
SET hive.map.aggr=true; -- 启用map端聚合
SET hive.groupby.skewindata=true; -- 处理数据倾斜
SET hive.groupby.mapaggr.checkinterval=100000; -- map聚合检查间隔
-- 内存优化
SET mapreduce.map.memory.mb=4096;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.map.java.opts=-Xmx3276m;
SET mapreduce.reduce.java.opts=-Xmx6553m;
-- 针对distinct的特殊优化
SET hive.optimize.distinct.rewrite=true; -- 自动重写distinct查询
Spark参数优化
# Spark SQL配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# 针对distinct优化
spark.conf.set("spark.sql.optimizer.distinctBeforeIntersect.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
监控和排查
1. 识别count(distinct)性能问题
关键指标监控:
-- 查看正在运行的慢查询
SHOW PROCESSLIST;
-- 分析查询计划
EXPLAIN EXTENDED
SELECT count(distinct user_id) FROM user_behavior_log;
-- 查看资源使用情况
SHOW CLUSTER RESOURCE USAGE;
典型症状:
- Reduce阶段只有1个task在运行
- 其他Reduce tasks都是FINISHED状态
- 内存使用率接近100%
- 网络IO异常高
2. 排查工具
Yarn资源监控:
# 查看应用详情
yarn application -status application_123456_0001
# 查看Container日志
yarn logs -applicationId application_123456_0001
# 实时监控资源使用
yarn top
Hive查询分析:
-- 查看查询统计信息
ANALYZE TABLE user_behavior_log COMPUTE STATISTICS;
-- 查看列统计信息
ANALYZE TABLE user_behavior_log COMPUTE STATISTICS FOR COLUMNS user_id;
-- 查看基数估算
SHOW TABLE EXTENDED LIKE 'user_behavior_log';
面试必考要点
1. 为什么count(distinct)慢?
标准答案模板: "count(distinct)慢的根本原因是MapReduce框架的限制。具体来说:
执行机制问题:
- Map阶段无法预聚合,必须输出所有原始数据
- Shuffle阶段需要传输全量数据到单个Reducer
- Reduce阶段只能用1个Reducer处理,形成性能瓶颈
资源瓶颈:
- 网络带宽:大量数据shuffle传输
- 内存压力:单节点需要存储所有distinct值
- 计算瓶颈:单点处理无法利用集群并行能力
数据倾斜: 如果某些值出现频率很高,会进一步加剧单点压力。"
2. 有哪些优化方案?
回答要点:
- 两阶段聚合(必须掌握)
- 原理:先GROUP BY去重,再COUNT行数
- 优势:充分利用并行计算
- 适用:精确计算场景
- 近似算法
- HyperLogLog:2%误差,固定内存
- 适用:实时计算、大数据量估算
- 分桶预聚合
- 建表时按关键字段分桶
- 查询时桶级别并行处理
- 业务优化
- 采样统计
- 维表过滤
- 时间窗口分割
3. 生产环境最佳实践
关键原则:
- 避免在大表上直接使用count(distinct)
- 根据业务需求选择精确vs近似算法
- 合理设置并行度和资源配置
- 建立监控告警机制
具体建议:
-- 大表(>1亿行)推荐
SELECT count(*) FROM (SELECT user_id FROM big_table GROUP BY user_id) t;
-- 实时场景推荐
SELECT approx_count_distinct(user_id) FROM stream_table;
-- 小表(<1000万行)可以直接用
SELECT count(distinct user_id) FROM small_table;
记住关键点:大数据场景下,任何导致单点处理的操作都是性能杀手!
Comments