问题现象

做数仓的同学肯定都遇到过这种情况:

-- 这个SQL能跑死人,2小时还在执行
SELECT count(distinct user_id) FROM user_behavior_log 
WHERE dt = '2024-01-01';

-- 但这个1分钟就出结果
SELECT count(*) FROM user_behavior_log 
WHERE dt = '2024-01-01';

-- 更离谱的是,数据量一大就直接OOM
SELECT count(distinct user_id) FROM user_behavior_log 
WHERE dt >= '2024-01-01' AND dt <= '2024-01-31';

同样是COUNT,为什么加个DISTINCT性能就崩了?

深度剖析:MapReduce执行机制

1. 普通count(*)的执行流程

数据分布:
HDFS Block1: [row1, row2, row3, row4]
HDFS Block2: [row5, row6, row7, row8]  
HDFS Block3: [row9, row10, row11, row12]

Map阶段:
Mapper1 处理 Block1 → 输出 <"count", 4>
Mapper2 处理 Block2 → 输出 <"count", 4>
Mapper3 处理 Block3 → 输出 <"count", 4>

Shuffle阶段:
所有<"count", value>发送到Reducer

Reduce阶段:
Reducer收到: <"count", [4, 4, 4]>
执行Sum操作: 4 + 4 + 4 = 12

关键特点:

  • Map阶段可以预聚合,输出数据量极小
  • 只需要1个Reducer做最终汇总
  • Shuffle传输的数据量很少

2. count(distinct)的执行流程

原始数据:
Block1: [user1, user2, user1, user3]
Block2: [user2, user4, user1, user5]
Block3: [user3, user5, user6, user1]

Map阶段:
Mapper1 → 输出: <user1, null>, <user2, null>, <user1, null>, <user3, null>
Mapper2 → 输出: <user2, null>, <user4, null>, <user1, null>, <user5, null>
Mapper3 → 输出: <user3, null>, <user5, null>, <user6, null>, <user1, null>

Shuffle阶段:
所有distinct的key必须发送到同一个Reducer:
Reducer收到: <user1, [null,null,null,null]>
             <user2, [null,null]>
             <user3, [null,null]>
             <user4, [null]>
             <user5, [null,null]>
             <user6, [null]>

Reduce阶段:
只有1个Reducer处理所有数据
对每个key去重,最终count = 6

性能瓶颈分析:

  1. Map阶段无法预聚合
    • 必须输出所有原始数据
    • 输出数据量 = 输入数据量
  2. Shuffle阶段数据传输巨大
    • 10亿条数据 → 10亿条记录需要传输
    • 网络带宽成为瓶颈
  3. Reduce阶段单点处理
    • 所有数据汇聚到1个Reducer
    • 内存压力巨大,容易OOM

3. 为什么不能用多个Reducer?

很多人会问:为什么不能用多个Reducer并行处理?

错误假设:

假设用2个Reducer处理用户去重:

数据分片:
Reducer1处理: [user1, user2, user1, user3, user2]
Reducer2处理: [user2, user4, user1, user5, user3]

各自去重:
Reducer1得到: {user1, user2, user3} → count = 3
Reducer2得到: {user2, user4, user1, user5, user3} → count = 5

最终结果: 3 + 5 = 8 ❌

正确答案应该是: {user1, user2, user3, user4, user5} → count = 5

根本问题:

  • 分布式去重违背了去重的全局性要求
  • 每个Reducer只能看到部分数据,无法判断全局唯一性
  • Hash分片会导致同一个user_id分散到不同Reducer

详细优化方案

方案1:两阶段聚合(推荐)

实现原理: 将count(distinct)转换为两个阶段:

  1. 第一阶段:GROUP BY去重
  2. 第二阶段:COUNT行数
-- 原始慢SQL
SELECT count(distinct user_id) 
FROM user_behavior_log 
WHERE dt = '2024-01-01';

-- 优化后(方法1)
SELECT count(*) FROM (
    SELECT user_id 
    FROM user_behavior_log 
    WHERE dt = '2024-01-01'
    GROUP BY user_id
) tmp;

-- 优化后(方法2,更明确)
WITH distinct_users AS (
    SELECT distinct user_id 
    FROM user_behavior_log 
    WHERE dt = '2024-01-01'
)
SELECT count(*) FROM distinct_users;

执行流程详解:

第一阶段 - GROUP BY user_id:
Map阶段:
Mapper1: [user1,user2,user1] → <user1,null>, <user2,null>, <user1,null>
Mapper2: [user2,user3,user1] → <user2,null>, <user3,null>, <user1,null>

Shuffle阶段:
按user_id hash分发到不同Reducer:
Reducer1收到: user1的所有记录
Reducer2收到: user2的所有记录  
Reducer3收到: user3的所有记录

Reduce阶段:
Reducer1输出: <user1, null>(去重后只有1条)
Reducer2输出: <user2, null>
Reducer3输出: <user3, null>

第二阶段 - COUNT(*):
Map阶段:读取第一阶段结果,每行输出 <"count", 1>
Reduce阶段:Sum所有1得到最终结果

性能提升原理:

  • 第一阶段可以并行去重,避免单点瓶颈
  • 第二阶段只需要统计行数,数据量很小
  • 总体时间复杂度从O(n)变为O(n/k),k为并行度

方案2:HyperLogLog近似算法

算法原理: HyperLogLog是一种概率数据结构,用极小的内存空间估算集合基数。

-- Hive内置函数
SELECT approx_count_distinct(user_id) 
FROM user_behavior_log 
WHERE dt = '2024-01-01';

-- 可以指定精度参数
SELECT approx_count_distinct(user_id, 0.01) 
FROM user_behavior_log 
WHERE dt = '2024-01-01';

技术细节:

HyperLogLog工作原理:
1. 对每个user_id计算hash值
2. 观察hash值的二进制前导零个数
3. 用少量registers记录最大前导零个数
4. 基于概率论估算总体基数

内存使用:
- 标准HLL只需要1.5KB内存
- 精度可达2%误差率
- 无论数据量多大,内存占用固定

适用场景:

  • 实时UV统计
  • 大数据量快速估算
  • 对精确度要求不高的场景

注意事项:

-- 错误用法:在有限集合上使用
SELECT approx_count_distinct(gender) FROM users; 
-- gender只有male/female,用HLL浪费

-- 正确用法:大基数估算
SELECT approx_count_distinct(user_id) FROM billion_records;

方案3:分桶预聚合

建表策略:

-- 创建分桶表
CREATE TABLE user_behavior_log_bucket (
    user_id string,
    event_name string,
    event_time bigint,
    dt string
) 
CLUSTERED BY (user_id) INTO 256 BUCKETS
PARTITIONED BY (dt string)
STORED AS ORC;

-- 数据导入时自动分桶
INSERT INTO user_behavior_log_bucket PARTITION(dt='2024-01-01')
SELECT user_id, event_name, event_time, '2024-01-01' as dt
FROM user_behavior_log_raw 
WHERE dt = '2024-01-01';

查询优化:

-- 方案3.1:桶级别并行
SELECT sum(bucket_distinct_count) FROM (
    SELECT count(distinct user_id) as bucket_distinct_count
    FROM user_behavior_log_bucket 
    WHERE dt = '2024-01-01'
    GROUP BY hash(user_id) % 32  -- 32个并行任务
) tmp;

-- 方案3.2:采样估算
SELECT count(distinct user_id) * 100  -- 1%采样,结果*100
FROM user_behavior_log_bucket 
WHERE dt = '2024-01-01' 
AND hash(user_id) % 100 = 0;  -- 1%采样

分桶原理:

  • 相同user_id总是落在同一个bucket
  • 可以并行处理多个bucket
  • 最后合并各bucket的去重结果

方案4:Bitmap精确去重

适用于ID范围有限的场景:

-- 假设user_id是连续整数,范围[1, 10000000]
-- 可以用bitmap精确记录每个ID是否出现

-- 方案4.1:Hive Bitmap函数
SELECT bitmap_count(
    bitmap_or_agg(to_bitmap(cast(user_id as bigint)))
) 
FROM user_behavior_log 
WHERE dt = '2024-01-01';

-- 方案4.2:手动实现
WITH user_bitmap AS (
    SELECT collect_set(cast(user_id as int)) as user_set
    FROM user_behavior_log 
    WHERE dt = '2024-01-01'
)
SELECT size(user_set) FROM user_bitmap;

Bitmap优势:

  • 内存效率高:1000万用户只需1.25MB
  • 支持交集、并集等复杂运算
  • 计算速度快

适用条件:

  • ID必须是整数
  • ID范围不能太大(<1亿)
  • 稀疏度不能太高

方案5:维表关联优化

**场景:**用户维表相对较小,可以广播优化

-- 原始SQL:大表自关联去重
SELECT count(distinct user_id) 
FROM user_behavior_log 
WHERE dt = '2024-01-01';

-- 优化:先关联用户维表过滤
SELECT count(distinct u.user_id) 
FROM user_behavior_log l
JOIN user_dim u ON l.user_id = u.user_id
WHERE l.dt = '2024-01-01'
AND u.status = 'active';  -- 只统计活跃用户

-- 进一步优化:广播小表
SELECT /*+ MAPJOIN(u) */ count(distinct u.user_id) 
FROM user_behavior_log l
JOIN user_dim u ON l.user_id = u.user_id
WHERE l.dt = '2024-01-01';

实际生产案例

案例1:某电商平台UV统计

业务背景:

  • 日均10亿条行为数据
  • 需要统计日活跃用户数
  • 要求准确率99.9%以上

优化前:

SELECT count(distinct user_id) 
FROM user_behavior_log 
WHERE dt = '2024-01-01';

执行结果:
- 执行时间:3小时45分钟
- 资源消耗:512个vcore,2TB内存
- 成功率:60%(经常OOM失败)

优化后:

-- 两阶段聚合
SELECT count(*) FROM (
    SELECT user_id 
    FROM user_behavior_log 
    WHERE dt = '2024-01-01'
    GROUP BY user_id
) tmp;

执行结果:
- 执行时间:12分钟
- 资源消耗:256个vcore,512GB内存
- 成功率:100%
- 准确率:100%

性能提升:

  • 执行时间提升:18.75倍
  • 资源消耗减少:75%
  • 稳定性大幅提升

案例2:某视频平台实时UV

业务背景:

  • 需要5分钟级别的实时UV统计
  • 可以接受2-3%的误差
  • 峰值QPS达到100万

解决方案:

-- 使用HyperLogLog
SELECT approx_count_distinct(user_id, 0.02) 
FROM user_behavior_log 
WHERE event_time >= unix_timestamp() - 300;  -- 最近5分钟

配置优化:
- 设置mapreduce.job.reduces=1  -- HLL不需要多个reducer
- 设置hive.exec.parallel=true  -- 启用并行执行
- 设置tez.runtime.io.sort.mb=512  -- 增加sort缓冲区

效果对比:

指标 count(distinct) approx_count_distinct
执行时间 45分钟 3分钟
内存消耗 8GB 50MB
准确率 100% 97.8%
稳定性 差(频繁OOM)

调优参数详解

Hive参数优化

-- 启用CBO优化器
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;

-- 优化distinct聚合
SET hive.map.aggr=true;  -- 启用map端聚合
SET hive.groupby.skewindata=true;  -- 处理数据倾斜
SET hive.groupby.mapaggr.checkinterval=100000;  -- map聚合检查间隔

-- 内存优化
SET mapreduce.map.memory.mb=4096;
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.map.java.opts=-Xmx3276m;
SET mapreduce.reduce.java.opts=-Xmx6553m;

-- 针对distinct的特殊优化
SET hive.optimize.distinct.rewrite=true;  -- 自动重写distinct查询

Spark参数优化

# Spark SQL配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# 针对distinct优化
spark.conf.set("spark.sql.optimizer.distinctBeforeIntersect.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

监控和排查

1. 识别count(distinct)性能问题

关键指标监控:

-- 查看正在运行的慢查询
SHOW PROCESSLIST;

-- 分析查询计划
EXPLAIN EXTENDED 
SELECT count(distinct user_id) FROM user_behavior_log;

-- 查看资源使用情况
SHOW CLUSTER RESOURCE USAGE;

典型症状:

  • Reduce阶段只有1个task在运行
  • 其他Reduce tasks都是FINISHED状态
  • 内存使用率接近100%
  • 网络IO异常高

2. 排查工具

Yarn资源监控:

# 查看应用详情
yarn application -status application_123456_0001

# 查看Container日志
yarn logs -applicationId application_123456_0001

# 实时监控资源使用
yarn top

Hive查询分析:

-- 查看查询统计信息
ANALYZE TABLE user_behavior_log COMPUTE STATISTICS;

-- 查看列统计信息  
ANALYZE TABLE user_behavior_log COMPUTE STATISTICS FOR COLUMNS user_id;

-- 查看基数估算
SHOW TABLE EXTENDED LIKE 'user_behavior_log';

面试必考要点

1. 为什么count(distinct)慢?

标准答案模板: "count(distinct)慢的根本原因是MapReduce框架的限制。具体来说:

执行机制问题:

  • Map阶段无法预聚合,必须输出所有原始数据
  • Shuffle阶段需要传输全量数据到单个Reducer
  • Reduce阶段只能用1个Reducer处理,形成性能瓶颈

资源瓶颈:

  • 网络带宽:大量数据shuffle传输
  • 内存压力:单节点需要存储所有distinct值
  • 计算瓶颈:单点处理无法利用集群并行能力

数据倾斜: 如果某些值出现频率很高,会进一步加剧单点压力。"

2. 有哪些优化方案?

回答要点:

  1. 两阶段聚合(必须掌握)
    • 原理:先GROUP BY去重,再COUNT行数
    • 优势:充分利用并行计算
    • 适用:精确计算场景
  2. 近似算法
    • HyperLogLog:2%误差,固定内存
    • 适用:实时计算、大数据量估算
  3. 分桶预聚合
    • 建表时按关键字段分桶
    • 查询时桶级别并行处理
  4. 业务优化
    • 采样统计
    • 维表过滤
    • 时间窗口分割

3. 生产环境最佳实践

关键原则:

  • 避免在大表上直接使用count(distinct)
  • 根据业务需求选择精确vs近似算法
  • 合理设置并行度和资源配置
  • 建立监控告警机制

具体建议:

-- 大表(>1亿行)推荐
SELECT count(*) FROM (SELECT user_id FROM big_table GROUP BY user_id) t;

-- 实时场景推荐  
SELECT approx_count_distinct(user_id) FROM stream_table;

-- 小表(<1000万行)可以直接用
SELECT count(distinct user_id) FROM small_table;

记住关键点:大数据场景下,任何导致单点处理的操作都是性能杀手!