1. 完整数仓的搭建流程
  2. 一个spark任务2,3h执行,stage id 优化方案
  3. ods接入的交易系统数据质量差,通过dwd层进行过滤,怎么解决?
  4. 慢节点
  5. rss
  6. clickhouse和doris的区别、使用性能上的差异
  7. 怎么学习-看书,大数据之路这本书的收获
  8. mr中shuffle的优化手段

1. 完整数仓的搭建流程

考察知识点

数仓建设全生命周期管理(需求到运维)、分层架构设计逻辑、维度建模方法、工具链选型与协同、数据质量治理嵌入节点

参考回答

数仓搭建是围绕“业务价值落地”的系统性工程,需遵循“业务驱动、规范先行、迭代优化”原则,全流程分6个核心阶段,周期2-6个月(视业务规模调整):

(1)需求调研与规划

  • 核心目标:明确“为什么建仓”和“建什么”,避免脱离业务的盲目开发。
  • 关键任务:
    1. 业务需求访谈:联合运营、产品、财务部门,通过场景调研梳理核心诉求(如“实时GMV监控”“用户留存分析”),输出《业务需求清单》并标注优先级(P0/P1/P2)。
    2. 数据源全景梳理:盘点全公司数据来源,包括结构化数据(MySQL订单表、用户表)、非结构化数据(APP埋点日志)、外部数据(第三方广告数据),输出《数据源清单》,含字段含义、更新频率(实时/小时级/T+1)、数据量、负责人。
    3. 范围界定:确定首期建设业务域(如交易、用户、商品域),明确“不做什么”(如暂不接入海外业务数据),避免范围蔓延。
  • 产出物:《数仓建设需求规格说明书》《数据源清单》《业务域优先级矩阵》。

(2)架构设计与规范制定

  • 核心目标:搭建可扩展、易维护架构,统一开发标准,降低迭代成本。
  • 关键任务:
    1. 分层架构设计:按“数据流向”和“粒度”划分层级:
      • ODS层(操作数据层):存储原始数据,保留数据源格式,仅做结构化存储(如日志转Hive表),作为“数据入口”。
      • DWD层(明细数据层):清洗(去重、过滤异常)、脱敏(手机号加密)、标准化(时间格式统一),输出干净明细数据。
      • DWM层(中间数据层):轻度聚合+通用加工(如按“用户+小时”聚合行为数据),生成可复用中间结果,减少上层重复计算。
      • DWS层(汇总数据层):按业务主题(如“商品销售”)高度聚合,生成指标化数据(如日销售额、支付用户数)。
      • ADS层(应用数据层):输出报表/看板数据,同步至MySQL/ClickHouse供BI工具调用。
    2. 维度建模:采用Kimball星型模型(事实表+维度表):
      • 事实表:记录业务事件(如订单表),含度量值(金额)和维度外键(用户ID);
      • 维度表:描述上下文(如用户表),数据稳定,更新频率低。 输出《数仓模型ER图》,明确表间关联关系。
    3. 规范制定
      • 命名规范:表名“层级_业务域_用途_粒度”(如dws_trade_sales_day),字段名“业务含义_类型”(如user_login_time_datetime);
      • SQL规范:需加字段注释,禁止SELECT *,子查询嵌套不超过3层;
      • 版本控制:用Git管理脚本,提交信息规范(如“[新增] dwd_trade_order表”)。
    4. 工具链选型
      • 存储:HDFS(海量数据)、HBase(实时读写);
      • 计算:Spark(批/流处理)、Hive(SQL引擎);
      • 调度:Airflow(任务依赖管理);
      • 质量监控:Griffin(数据校验);
      • 元数据:Atlas(表血缘管理)。
  • 产出物:《数仓架构设计文档》《模型ER图》《开发规范手册》《工具链清单》。

(3)ODS层开

  • 核心目标:同步多源数据至数仓,确保“不丢、不重、不乱”。
  • 关键任务:
    1. 数据同步脚本开发
      • 结构化数据(MySQL):用Sqoop全量同步历史数据,增量同步基于“时间戳”(如update_time > 上一次同步时间);
      • 日志数据:Flume采集→Kafka缓存→Spark Streaming写入Hive ODS表,延迟<5分钟;
      • 外部数据:API调用第三方数据,转结构化后写入ODS表。
    2. ODS表创建:用Hive外部表关联HDFS路径,表结构与数据源一致(如日志表含“用户ID、行为类型、时间戳”)。
    3. 同步校验:每日比对源表与ODS表记录数(误差率≤0.1%),检查核心字段(如order_id)空值率≤0.01%。
  • 产出物:ODS层表(10-30张)、同步脚本(Sqoop/Flume配置)、校验脚本。

(4)分层开发(DWD→DWS→ADS)

  • 产出物:各层表(50-200张)、ETL脚本、表结构文档。

ADS层开发:输出报表数据,如ads_trade_sales_report(销售日报含同比):

INSERT OVERWRITE TABLE ads_trade_sales_report PARTITION (dt='${dt}')
SELECT
  goods_category,
  total_sales,
  ROUND((total_sales - last_year_sales)/last_year_sales*100,2) AS yoy_growth
FROM (
  SELECT
    goods_category,
    total_sales,
    LAG(total_sales, 365) OVER (PARTITION BY goods_category ORDER BY dt) AS last_year_sales
  FROM dws_trade_sales_day
  WHERE dt BETWEEN DATE_SUB('${dt}',365) AND '${dt}'
) t WHERE dt='${dt}';

DWS层开发:主题聚合,如dws_trade_sales_day(按品类+地区聚合日销售):

INSERT OVERWRITE TABLE dws_trade_sales_day PARTITION (dt='${dt}')
SELECT
  g.category AS goods_category,
  u.region,
  COUNT(DISTINCT o.order_id) AS order_cnt,
  SUM(o.amount) AS total_sales
FROM dwd_trade_order_detail o
LEFT JOIN dim_goods g ON o.goods_id = g.goods_id
LEFT JOIN dim_user u ON o.user_id = u.user_id
WHERE o.pay_status=1
GROUP BY g.category, u.region;

DWM层开发:轻度聚合,如dwm_trade_order_hourly(按小时+支付状态聚合):

INSERT OVERWRITE TABLE dwm_trade_order_hourly PARTITION (dt='${dt}')
SELECT
  DATE_FORMAT(create_time, 'HH') AS hour,
  pay_status,
  COUNT(DISTINCT order_id) AS order_cnt,
  SUM(amount) AS total_amount
FROM dwd_trade_order_detail
GROUP BY DATE_FORMAT(create_time, 'HH'), pay_status;

DWD层开发:清洗+脱敏+标准化,示例SQL:

INSERT OVERWRITE TABLE dwd_trade_order_detail PARTITION (dt='${dt}')
SELECT
  order_id,
  user_id,
  goods_id,
  COALESCE(amount, 0) AS amount, -- 空值补0
  FROM_UNIXTIME(pay_time/1000, 'yyyy-MM-dd HH:mm:ss') AS pay_time, -- 时间戳转格式
  pay_status,
  FROM_UNIXTIME(create_time/1000, 'yyyy-MM-dd HH:mm:ss') AS create_time
FROM ods_trade_order
WHERE dt='${dt}'
  AND order_id IS NOT NULL -- 过滤空值
  AND amount > 0; -- 过滤异常金额

(5)测试与上线

  • 数据测试:验证准确性(DWD与ODS订单数误差≤0.1%)、完整性(核心字段空值率≤0.01%)、一致性(跨表关联字段匹配)。
  • 性能测试:批处理任务在时效内完成(如每日6点前产出数据),实时任务延迟<5分钟,CPU/内存使用率≤80%。
  • 灰度上线:先上线交易域,选择运营部试用,收集反馈后全量发布,输出《数仓使用手册》。
  • 产出物:《测试报告》《上线方案》《使用手册》。

(6)运维与迭代

  • 日常运维:监控任务状态(失败自动重试+告警)、数据质量(Griffin校验+日报)、集群资源(磁盘/内存使用率)。
  • 问题处理:2小时内解决数据异常(如脚本Bug),优化低效任务(如合并小文件)。
  • 迭代优化:按业务新增需求(如“跨境业务域”)扩展模型,每季度复盘架构并优化(如用Flink替代Spark Streaming)。
  • 产出物:《运维日志》《问题台账》《迭代方案》。

补充注意要点

  • 规范先行:命名、SQL等规范需严格执行,避免后期“表名混乱”“字段含义模糊”;
  • 业务对齐:每阶段同步业务方,确保数仓输出匹配需求(如报表指标与财务口径一致);
  • 渐进式建设:中小公司可简化层级(ODS→DWD→DWS),避免过度设计;
  • 文档化:所有设计、开发、运维过程形成文档,便于团队协作与知识传承。

2. Spark任务2-3小时执行,Stage优化方案

考察知识点

Spark Stage划分原理(宽/窄依赖)、Stage瓶颈(Shuffle、数据倾斜、资源不足)、优化策略(减少Stage、Shuffle调优、资源配置)

参考回答

Spark任务耗时过长,核心因Stage划分不合理、Shuffle开销大、数据倾斜或资源不足,需从四方面优化:

(1)减少Stage数量(避免不必要宽依赖)

Spark按依赖划分Stage:窄依赖(map/filter)不拆分Stage,宽依赖(groupByKey/join)触发Shuffle并拆分Stage。Stage越多,调度开销越大。

coalesce替代repartition:减少分区数时用coalesce(无Shuffle),避免repartition(有Shuffle):

val df = df.coalesce(100) // 将1000个分区合并为100个,无Shuffle

合并连续Shuffle算子:将“groupBy(A)+groupBy(B)”改为groupBy(A,B),减少一次Shuffle:

// 优化前(2个Stage)
val step1 = df.groupBy("user_id").agg(count("order_id").alias("cnt"))
val step2 = step1.groupBy("date").agg(count("user_id").alias("user_cnt"))
// 优化后(1个Stage)
val result = df.groupBy("date","user_id").agg(count("order_id").alias("cnt"))
               .groupBy("date").agg(count("user_id").alias("user_cnt"))

广播Join替代普通Join:小表(<100MB)广播至Executor,避免Shuffle:

import org.apache.spark.sql.functions.broadcast
val largeDF = spark.read.table("dwd_trade_order") // 100GB
val smallDF = spark.read.table("dim_user") // 50MB
val result = largeDF.join(broadcast(smallDF), "user_id") // 无Shuffle

(2)优化Shuffle性能(降低IO与网络开销)

Shuffle涉及“Map排序→分区→磁盘写入→Reduce拉取→合并”,是性能瓶颈核心。

  • 优化Shuffle排序参数
    • 加大Map端内存缓冲区:spark.shuffle.file.buffer=64k(默认32k),减少溢写次数;
    • 关闭不必要排序:spark.shuffle.sort.bypassMergeThreshold=200(分区数≤200时跳过排序)。

合并小文件:读取前合并Hive小文件,避免Shuffle“小文件风暴”:

// 读取时合并(Spark SQL)
spark.sql("SET hive.merge.mapfiles=true")
spark.sql("SET hive.merge.size.per.task=256000000") // 合并后文件256MB
val df = spark.read.table("ods_trade_order")
// 写入时控制文件数
df.write.option("maxRecordsPerFile", 1000000).saveAsTable("dwd_trade_order")

压缩Shuffle数据:启用Snappy压缩(平衡压缩率与速度),减少IO与网络传输:

spark.conf.set("spark.shuffle.compress", "true") // Map输出压缩
spark.conf.set("spark.shuffle.spill.compress", "true") // 溢写压缩
spark.conf.set("spark.io.compression.codec", "snappy") // 压缩算法

调整Shuffle并行度:默认spark.sql.shuffle.partitions=200,数据量大时设为CPU总核心数2-4倍(如80核集群设800),确保单Task处理100-200MB数据:

spark.conf.set("spark.sql.shuffle.partitions", "800") // 代码配置
// 提交命令配置:spark-submit --conf spark.sql.shuffle.partitions=800 ...

(3)解决数据倾斜(消除长尾Stage)

数据倾斜指某Task处理数据量远超其他(如某Task处理10GB,其他100MB),需先定位再处理。

  • 处理方案

启用推测执行:为慢Task启动备份Task,哪个先完成用哪个结果(应急方案):

spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", "1.5") // 耗时超平均1.5倍启动备份

空值处理user_id=null等无效值直接过滤或加盐:

// 过滤无意义空值
val filteredDF = df.filter(col("user_id").isNotNull)
// 有意义空值加盐
val saltedDF = df.withColumn("user_id",
  when(col("user_id").isNull, concat(lit("null_"), floor(rand()*10)))
  .otherwise(col("user_id"))
)

Key加盐:对热点Key加随机前缀(如1008610086_010086_9),拆分后聚合:

val saltedDF = df.withColumn("salted_key",
  when(col("user_id")==="10086", concat(col("user_id"), lit("_"), floor(rand()*10)))
  .otherwise(col("user_id"))
)
val aggDF = saltedDF.groupBy("salted_key").agg(count("order_id").alias("cnt"))
val result = aggDF.withColumn("user_id", split(col("salted_key"), "_")(0))
                  .groupBy("user_id").agg(sum("cnt").alias("cnt"))

定位倾斜Key:通过Spark UI(Stage→Tasks→Shuffle Read)找到数据量异常Task,再通过SQL查询倾斜Key:

SELECT user_id, COUNT(*) AS cnt FROM dwd_trade_order
GROUP BY user_id ORDER BY cnt DESC LIMIT 10; // 取Top10高频Key

(4)优化资源配置(提升计算能力)

  • 优化数据本地化:设置spark.locality.wait=3000(等待3秒本地化执行),优先读取本地数据,减少网络IO。

动态资源分配:按需调整Executor数量,提高资源利用率:

spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "5")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")

增加Executor资源:配置Executor数量、核心数、内存,提升并行能力:

spark-submit \\\\
  --num-executors 30 \\\\ # 30个Executor
  --executor-cores 4 \\\\ # 每个4核
  --executor-memory 16G \\\\ # 每个16G内存
  --driver-memory 8G \\\\ # Driver内存8G
  --class com.example.SparkJob job.jar

补充注意要点

  • 先定位瓶颈:通过Spark UI分析Stage耗时占比(如Shuffle占60%则优先优化Shuffle),避免盲目调参;
  • 小步迭代:每次优化一个点(如先调并行度),验证效果后再调整其他项;
  • 结合场景:实时任务(监控大屏)可牺牲精度(用approx_count_distinct),离线报表需保证准确性;
  • 避免过度优化:任务耗时若可接受(如2小时不影响业务),无需投入过多资源。

3. ODS接入的交易系统数据质量差,通过DWD层过滤解决

考察知识点

数据质量治理全流程(事前预防、事中清洗、事后监控)、DWD清洗策略(去重/补全/过滤)、DQC工具应用、业务协同机制

参考回答

ODS层交易数据质量差(重复、空值、格式混乱),需通过“业务定标准+技术做清洗+DQC控质量”闭环解决:

This post is for subscribers on the 网站会员 and 成为小万的高级会员 tiers only

Subscribe Now

Already have an account?