- 完整数仓的搭建流程
- 一个spark任务2,3h执行,stage id 优化方案
- ods接入的交易系统数据质量差,通过dwd层进行过滤,怎么解决?
- 慢节点
- rss
- clickhouse和doris的区别、使用性能上的差异
- 怎么学习-看书,大数据之路这本书的收获
- mr中shuffle的优化手段
1. 完整数仓的搭建流程
考察知识点
数仓建设全生命周期管理(需求到运维)、分层架构设计逻辑、维度建模方法、工具链选型与协同、数据质量治理嵌入节点
参考回答
数仓搭建是围绕“业务价值落地”的系统性工程,需遵循“业务驱动、规范先行、迭代优化”原则,全流程分6个核心阶段,周期2-6个月(视业务规模调整):
(1)需求调研与规划
- 核心目标:明确“为什么建仓”和“建什么”,避免脱离业务的盲目开发。
- 关键任务:
- 业务需求访谈:联合运营、产品、财务部门,通过场景调研梳理核心诉求(如“实时GMV监控”“用户留存分析”),输出《业务需求清单》并标注优先级(P0/P1/P2)。
- 数据源全景梳理:盘点全公司数据来源,包括结构化数据(MySQL订单表、用户表)、非结构化数据(APP埋点日志)、外部数据(第三方广告数据),输出《数据源清单》,含字段含义、更新频率(实时/小时级/T+1)、数据量、负责人。
- 范围界定:确定首期建设业务域(如交易、用户、商品域),明确“不做什么”(如暂不接入海外业务数据),避免范围蔓延。
- 产出物:《数仓建设需求规格说明书》《数据源清单》《业务域优先级矩阵》。
(2)架构设计与规范制定
- 核心目标:搭建可扩展、易维护架构,统一开发标准,降低迭代成本。
- 关键任务:
- 分层架构设计:按“数据流向”和“粒度”划分层级:
- ODS层(操作数据层):存储原始数据,保留数据源格式,仅做结构化存储(如日志转Hive表),作为“数据入口”。
- DWD层(明细数据层):清洗(去重、过滤异常)、脱敏(手机号加密)、标准化(时间格式统一),输出干净明细数据。
- DWM层(中间数据层):轻度聚合+通用加工(如按“用户+小时”聚合行为数据),生成可复用中间结果,减少上层重复计算。
- DWS层(汇总数据层):按业务主题(如“商品销售”)高度聚合,生成指标化数据(如日销售额、支付用户数)。
- ADS层(应用数据层):输出报表/看板数据,同步至MySQL/ClickHouse供BI工具调用。
- 维度建模:采用Kimball星型模型(事实表+维度表):
- 事实表:记录业务事件(如订单表),含度量值(金额)和维度外键(用户ID);
- 维度表:描述上下文(如用户表),数据稳定,更新频率低。 输出《数仓模型ER图》,明确表间关联关系。
- 规范制定:
- 命名规范:表名“层级_业务域_用途_粒度”(如
dws_trade_sales_day
),字段名“业务含义_类型”(如user_login_time_datetime
); - SQL规范:需加字段注释,禁止
SELECT *
,子查询嵌套不超过3层; - 版本控制:用Git管理脚本,提交信息规范(如“[新增] dwd_trade_order表”)。
- 命名规范:表名“层级_业务域_用途_粒度”(如
- 工具链选型:
- 存储:HDFS(海量数据)、HBase(实时读写);
- 计算:Spark(批/流处理)、Hive(SQL引擎);
- 调度:Airflow(任务依赖管理);
- 质量监控:Griffin(数据校验);
- 元数据:Atlas(表血缘管理)。
- 分层架构设计:按“数据流向”和“粒度”划分层级:
- 产出物:《数仓架构设计文档》《模型ER图》《开发规范手册》《工具链清单》。
(3)ODS层开
- 核心目标:同步多源数据至数仓,确保“不丢、不重、不乱”。
- 关键任务:
- 数据同步脚本开发:
- 结构化数据(MySQL):用Sqoop全量同步历史数据,增量同步基于“时间戳”(如
update_time > 上一次同步时间
); - 日志数据:Flume采集→Kafka缓存→Spark Streaming写入Hive ODS表,延迟<5分钟;
- 外部数据:API调用第三方数据,转结构化后写入ODS表。
- 结构化数据(MySQL):用Sqoop全量同步历史数据,增量同步基于“时间戳”(如
- ODS表创建:用Hive外部表关联HDFS路径,表结构与数据源一致(如日志表含“用户ID、行为类型、时间戳”)。
- 同步校验:每日比对源表与ODS表记录数(误差率≤0.1%),检查核心字段(如
order_id
)空值率≤0.01%。
- 数据同步脚本开发:
- 产出物:ODS层表(10-30张)、同步脚本(Sqoop/Flume配置)、校验脚本。
(4)分层开发(DWD→DWS→ADS)
- 产出物:各层表(50-200张)、ETL脚本、表结构文档。
ADS层开发:输出报表数据,如ads_trade_sales_report
(销售日报含同比):
INSERT OVERWRITE TABLE ads_trade_sales_report PARTITION (dt='${dt}')
SELECT
goods_category,
total_sales,
ROUND((total_sales - last_year_sales)/last_year_sales*100,2) AS yoy_growth
FROM (
SELECT
goods_category,
total_sales,
LAG(total_sales, 365) OVER (PARTITION BY goods_category ORDER BY dt) AS last_year_sales
FROM dws_trade_sales_day
WHERE dt BETWEEN DATE_SUB('${dt}',365) AND '${dt}'
) t WHERE dt='${dt}';
DWS层开发:主题聚合,如dws_trade_sales_day
(按品类+地区聚合日销售):
INSERT OVERWRITE TABLE dws_trade_sales_day PARTITION (dt='${dt}')
SELECT
g.category AS goods_category,
u.region,
COUNT(DISTINCT o.order_id) AS order_cnt,
SUM(o.amount) AS total_sales
FROM dwd_trade_order_detail o
LEFT JOIN dim_goods g ON o.goods_id = g.goods_id
LEFT JOIN dim_user u ON o.user_id = u.user_id
WHERE o.pay_status=1
GROUP BY g.category, u.region;
DWM层开发:轻度聚合,如dwm_trade_order_hourly
(按小时+支付状态聚合):
INSERT OVERWRITE TABLE dwm_trade_order_hourly PARTITION (dt='${dt}')
SELECT
DATE_FORMAT(create_time, 'HH') AS hour,
pay_status,
COUNT(DISTINCT order_id) AS order_cnt,
SUM(amount) AS total_amount
FROM dwd_trade_order_detail
GROUP BY DATE_FORMAT(create_time, 'HH'), pay_status;
DWD层开发:清洗+脱敏+标准化,示例SQL:
INSERT OVERWRITE TABLE dwd_trade_order_detail PARTITION (dt='${dt}')
SELECT
order_id,
user_id,
goods_id,
COALESCE(amount, 0) AS amount, -- 空值补0
FROM_UNIXTIME(pay_time/1000, 'yyyy-MM-dd HH:mm:ss') AS pay_time, -- 时间戳转格式
pay_status,
FROM_UNIXTIME(create_time/1000, 'yyyy-MM-dd HH:mm:ss') AS create_time
FROM ods_trade_order
WHERE dt='${dt}'
AND order_id IS NOT NULL -- 过滤空值
AND amount > 0; -- 过滤异常金额
(5)测试与上线
- 数据测试:验证准确性(DWD与ODS订单数误差≤0.1%)、完整性(核心字段空值率≤0.01%)、一致性(跨表关联字段匹配)。
- 性能测试:批处理任务在时效内完成(如每日6点前产出数据),实时任务延迟<5分钟,CPU/内存使用率≤80%。
- 灰度上线:先上线交易域,选择运营部试用,收集反馈后全量发布,输出《数仓使用手册》。
- 产出物:《测试报告》《上线方案》《使用手册》。
(6)运维与迭代
- 日常运维:监控任务状态(失败自动重试+告警)、数据质量(Griffin校验+日报)、集群资源(磁盘/内存使用率)。
- 问题处理:2小时内解决数据异常(如脚本Bug),优化低效任务(如合并小文件)。
- 迭代优化:按业务新增需求(如“跨境业务域”)扩展模型,每季度复盘架构并优化(如用Flink替代Spark Streaming)。
- 产出物:《运维日志》《问题台账》《迭代方案》。
补充注意要点
- 规范先行:命名、SQL等规范需严格执行,避免后期“表名混乱”“字段含义模糊”;
- 业务对齐:每阶段同步业务方,确保数仓输出匹配需求(如报表指标与财务口径一致);
- 渐进式建设:中小公司可简化层级(ODS→DWD→DWS),避免过度设计;
- 文档化:所有设计、开发、运维过程形成文档,便于团队协作与知识传承。
2. Spark任务2-3小时执行,Stage优化方案
考察知识点
Spark Stage划分原理(宽/窄依赖)、Stage瓶颈(Shuffle、数据倾斜、资源不足)、优化策略(减少Stage、Shuffle调优、资源配置)
参考回答
Spark任务耗时过长,核心因Stage划分不合理、Shuffle开销大、数据倾斜或资源不足,需从四方面优化:
(1)减少Stage数量(避免不必要宽依赖)
Spark按依赖划分Stage:窄依赖(map
/filter
)不拆分Stage,宽依赖(groupByKey
/join
)触发Shuffle并拆分Stage。Stage越多,调度开销越大。
用coalesce
替代repartition
:减少分区数时用coalesce
(无Shuffle),避免repartition
(有Shuffle):
val df = df.coalesce(100) // 将1000个分区合并为100个,无Shuffle
合并连续Shuffle算子:将“groupBy(A)
+groupBy(B)
”改为groupBy(A,B)
,减少一次Shuffle:
// 优化前(2个Stage)
val step1 = df.groupBy("user_id").agg(count("order_id").alias("cnt"))
val step2 = step1.groupBy("date").agg(count("user_id").alias("user_cnt"))
// 优化后(1个Stage)
val result = df.groupBy("date","user_id").agg(count("order_id").alias("cnt"))
.groupBy("date").agg(count("user_id").alias("user_cnt"))
广播Join替代普通Join:小表(<100MB)广播至Executor,避免Shuffle:
import org.apache.spark.sql.functions.broadcast
val largeDF = spark.read.table("dwd_trade_order") // 100GB
val smallDF = spark.read.table("dim_user") // 50MB
val result = largeDF.join(broadcast(smallDF), "user_id") // 无Shuffle
(2)优化Shuffle性能(降低IO与网络开销)
Shuffle涉及“Map排序→分区→磁盘写入→Reduce拉取→合并”,是性能瓶颈核心。
- 优化Shuffle排序参数:
- 加大Map端内存缓冲区:
spark.shuffle.file.buffer=64k
(默认32k),减少溢写次数; - 关闭不必要排序:
spark.shuffle.sort.bypassMergeThreshold=200
(分区数≤200时跳过排序)。
- 加大Map端内存缓冲区:
合并小文件:读取前合并Hive小文件,避免Shuffle“小文件风暴”:
// 读取时合并(Spark SQL)
spark.sql("SET hive.merge.mapfiles=true")
spark.sql("SET hive.merge.size.per.task=256000000") // 合并后文件256MB
val df = spark.read.table("ods_trade_order")
// 写入时控制文件数
df.write.option("maxRecordsPerFile", 1000000).saveAsTable("dwd_trade_order")
压缩Shuffle数据:启用Snappy压缩(平衡压缩率与速度),减少IO与网络传输:
spark.conf.set("spark.shuffle.compress", "true") // Map输出压缩
spark.conf.set("spark.shuffle.spill.compress", "true") // 溢写压缩
spark.conf.set("spark.io.compression.codec", "snappy") // 压缩算法
调整Shuffle并行度:默认spark.sql.shuffle.partitions=200
,数据量大时设为CPU总核心数2-4倍(如80核集群设800),确保单Task处理100-200MB数据:
spark.conf.set("spark.sql.shuffle.partitions", "800") // 代码配置
// 提交命令配置:spark-submit --conf spark.sql.shuffle.partitions=800 ...
(3)解决数据倾斜(消除长尾Stage)
数据倾斜指某Task处理数据量远超其他(如某Task处理10GB,其他100MB),需先定位再处理。
- 处理方案:
启用推测执行:为慢Task启动备份Task,哪个先完成用哪个结果(应急方案):
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", "1.5") // 耗时超平均1.5倍启动备份
空值处理:user_id=null
等无效值直接过滤或加盐:
// 过滤无意义空值
val filteredDF = df.filter(col("user_id").isNotNull)
// 有意义空值加盐
val saltedDF = df.withColumn("user_id",
when(col("user_id").isNull, concat(lit("null_"), floor(rand()*10)))
.otherwise(col("user_id"))
)
Key加盐:对热点Key加随机前缀(如10086
→10086_010086_9
),拆分后聚合:
val saltedDF = df.withColumn("salted_key",
when(col("user_id")==="10086", concat(col("user_id"), lit("_"), floor(rand()*10)))
.otherwise(col("user_id"))
)
val aggDF = saltedDF.groupBy("salted_key").agg(count("order_id").alias("cnt"))
val result = aggDF.withColumn("user_id", split(col("salted_key"), "_")(0))
.groupBy("user_id").agg(sum("cnt").alias("cnt"))
定位倾斜Key:通过Spark UI(Stage→Tasks→Shuffle Read)找到数据量异常Task,再通过SQL查询倾斜Key:
SELECT user_id, COUNT(*) AS cnt FROM dwd_trade_order
GROUP BY user_id ORDER BY cnt DESC LIMIT 10; // 取Top10高频Key
(4)优化资源配置(提升计算能力)
- 优化数据本地化:设置
spark.locality.wait=3000
(等待3秒本地化执行),优先读取本地数据,减少网络IO。
动态资源分配:按需调整Executor数量,提高资源利用率:
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "5")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")
增加Executor资源:配置Executor数量、核心数、内存,提升并行能力:
spark-submit \\\\
--num-executors 30 \\\\ # 30个Executor
--executor-cores 4 \\\\ # 每个4核
--executor-memory 16G \\\\ # 每个16G内存
--driver-memory 8G \\\\ # Driver内存8G
--class com.example.SparkJob job.jar
补充注意要点
- 先定位瓶颈:通过Spark UI分析Stage耗时占比(如Shuffle占60%则优先优化Shuffle),避免盲目调参;
- 小步迭代:每次优化一个点(如先调并行度),验证效果后再调整其他项;
- 结合场景:实时任务(监控大屏)可牺牲精度(用
approx_count_distinct
),离线报表需保证准确性; - 避免过度优化:任务耗时若可接受(如2小时不影响业务),无需投入过多资源。
3. ODS接入的交易系统数据质量差,通过DWD层过滤解决
考察知识点
数据质量治理全流程(事前预防、事中清洗、事后监控)、DWD清洗策略(去重/补全/过滤)、DQC工具应用、业务协同机制
参考回答
ODS层交易数据质量差(重复、空值、格式混乱),需通过“业务定标准+技术做清洗+DQC控质量”闭环解决:
This post is for subscribers on the 网站会员 and 成为小万的高级会员 tiers only
Subscribe NowAlready have an account? Sign In