1 Introduce yourself and walk through a few projects you know well
2 How do you handle data skew?
3 Spark narrow vs. wide dependencies
4 Data warehouse layering: which layers, and what are the benefits?
5 Have you worked on user profiling? How did you implement it, and did you apply any algorithms?
6 How do you implement data quality monitoring and data governance in a warehouse?
7 Have you done real-time processing? How do you keep real-time data from lagging, and how do you produce real-time metrics stably and reliably?
Detailed Interview Answers (E-commerce Scenario)
1. Self-Introduction and Project Experience
Project background: I was responsible for building the data warehouse of an e-commerce platform, processing 5M+ order records and 1B+ user-behavior events per day.
Core projects:
Project 1: Real-time data warehouse for e-commerce
- Tech stack: Flink + Kafka + ClickHouse + Hive
- Business scenario: real-time monitoring of product sales, user behavior, and inventory changes
- Key result: real-time metric latency reduced from 30 minutes to seconds, supporting major promotions such as Double 11
Project 2: User profiling system
- Tech stack: Spark + HBase + Redis + TensorFlow
- Business value: built a 360-degree user tag system; CTR up 25%, personalized-recommendation accuracy up 30%
Project 3: Data quality governance platform
- Pain point solved: moved from manual issue discovery to automated monitoring; time to detect data quality problems dropped from hours to minutes
2. Handling Data Skew
Data skew in e-commerce scenarios
Typical cases:
- Skew on hot products (a best-seller can have 1000x the orders of an ordinary product)
- Skew on user-behavior data (head users account for 60% of total events)
- Skew on the region dimension (tier-1 cities far outweigh other regions)
Solutions
1. Pre-aggregation (for order statistics)
-- Problem: a single hot product's huge order volume causes skew
-- Fix: pre-aggregate by product + hour first, then roll up to the day
WITH hourly_agg AS (
    SELECT
        product_id,
        DATE_FORMAT(order_time, 'yyyy-MM-dd-HH') AS hour,
        COUNT(*) AS order_cnt,
        SUM(amount) AS total_amount
    FROM orders
    WHERE dt = '2024-01-01'
    GROUP BY product_id, DATE_FORMAT(order_time, 'yyyy-MM-dd-HH')
)
SELECT
    product_id,
    SUM(order_cnt) AS daily_orders,
    SUM(total_amount) AS daily_amount
FROM hourly_agg
GROUP BY product_id
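The pre-aggregation idea is runtime-agnostic; a minimal plain-Python sketch (with hypothetical in-memory order tuples) shows why it helps: the hot key's many raw rows shrink to one partial result per hour before the final rollup.

```python
from collections import defaultdict

# Hypothetical order events: (product_id, hour, amount)
orders = [
    ("hot_sku", 0, 10.0), ("hot_sku", 0, 20.0),
    ("hot_sku", 1, 30.0), ("cold_sku", 0, 5.0),
]

# Stage 1: pre-aggregate per (product, hour) -- shrinks the hot key's record count
hourly = defaultdict(lambda: [0, 0.0])  # key -> [order_cnt, total_amount]
for product, hour, amount in orders:
    acc = hourly[(product, hour)]
    acc[0] += 1
    acc[1] += amount

# Stage 2: roll the hourly partials up to daily totals per product
daily = defaultdict(lambda: [0, 0.0])
for (product, _hour), (cnt, amt) in hourly.items():
    daily[product][0] += cnt
    daily[product][1] += amt

print(daily["hot_sku"])  # [3, 60.0]
```

In a distributed engine, Stage 1 runs before the shuffle, so only the compact partials cross the network.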
2. Salting (for user-behavior analysis)
-- Problem: head users' behavior data is skewed
-- Fix: append a random salt to the user ID so the hot key is spread across
-- buckets, aggregate on the salted key, then strip the salt and merge
SELECT
    user_id,
    action_type,
    SUM(partial_cnt) AS action_cnt
FROM (
    -- Stage 1: aggregate on the salted key
    SELECT
        salted_user_id,
        user_id,
        action_type,
        COUNT(*) AS partial_cnt
    FROM (
        SELECT
            user_id,
            action_type,
            CONCAT(user_id, '_', FLOOR(RAND() * 100)) AS salted_user_id
        FROM user_behavior
        WHERE dt = '2024-01-01'
    ) salted
    GROUP BY salted_user_id, user_id, action_type
) t
-- Stage 2: merge partial counts back to the real user_id
GROUP BY user_id, action_type
3. Two-phase aggregation
-- Phase 1: salt the key and compute partial aggregates
-- Phase 2: strip the salt and merge the partials
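The two-phase salt/merge pattern can be sketched in plain Python (hypothetical in-memory events; the bucket count is illustrative). The hot user's rows are first counted per salted bucket, then the buckets are merged back to the real key:

```python
import random
from collections import defaultdict

SALT_BUCKETS = 4

# Hypothetical skewed input: one hot user dominates
events = [("u_hot", "click")] * 10 + [("u_cold", "click")] * 2

# Phase 1: aggregate on the salted key, spreading the hot user across buckets
partial = defaultdict(int)
for user, action in events:
    salted = f"{user}_{random.randrange(SALT_BUCKETS)}"
    partial[(salted, action)] += 1

# Phase 2: strip the salt and merge partial counts back to the real key
final = defaultdict(int)
for (salted, action), cnt in partial.items():
    user = salted.rsplit("_", 1)[0]
    final[(user, action)] += cnt
```

The final counts are identical to a direct group-by; only the intermediate key distribution changes.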
4. Tuning parameters (Spark adaptive query execution)
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=64MB
3. Spark Narrow vs. Wide Dependencies
Definitions compared

| Dependency | Definition | Data movement | Fault tolerance |
| --- | --- | --- | --- |
| Narrow | each parent RDD partition is used by at most one child partition | no shuffle | good: a lost partition needs only one parent recomputed |
| Wide | a parent RDD partition is used by multiple child partitions | requires a shuffle | worse: a lost partition may require recomputing many parents |
E-commerce examples
Narrow dependency
// Filtering and mapping user-behavior data -- narrow dependencies
val userBehavior = spark.read.table("user_behavior")
val filteredData = userBehavior
  .filter($"action_type" === "click")            // narrow: filter
  .map(row => (row.getAs[String]("user_id"), 1)) // narrow: map (getAs, not getString, for name-based access)
Wide dependency
// Order amount statistics -- wide dependencies
val orderStats = orders
  .groupBy("user_id")        // wide: groupBy requires a shuffle
  .agg(
    sum("amount").as("total_amount"),
    count("*").as("order_count")
  )
  .join(users, "user_id")    // wide: join requires a shuffle
Stage division rules
- Stage boundary: each wide dependency (shuffle) splits the job into a new stage
E-commerce ETL flow:
read data → filter/map (Stage 1) → shuffle → groupBy/join → write results (Stage 2)
4. Data Warehouse Layering
E-commerce warehouse layers
ODS layer (Operational Data Store)
Responsibility: store raw data, kept consistent with the source systems
-- Order table (in Hive, the partition column is declared in PARTITIONED BY, not in the column list)
CREATE TABLE ods_order (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    amount DECIMAL(10,2),
    order_time TIMESTAMP,
    status STRING
) PARTITIONED BY (dt STRING)
DWD layer (Data Warehouse Detail)
Responsibility: data cleansing and standardization; model the business processes
-- Cleansed order detail
CREATE TABLE dwd_order_detail (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    category_id STRING,
    brand_id STRING,
    amount DECIMAL(10,2),
    quantity INT,
    order_time TIMESTAMP,
    pay_time TIMESTAMP,
    cancel_time TIMESTAMP,
    order_status STRING
) PARTITIONED BY (dt STRING)
DWS layer (Data Warehouse Summary, lightly aggregated)
Responsibility: light aggregation by subject area
-- Daily per-user activity counts
CREATE TABLE dws_user_action_daycount (
    user_id STRING,
    login_count BIGINT,
    cart_count BIGINT,
    order_count BIGINT,
    payment_count BIGINT,
    total_amount DECIMAL(10,2)
) PARTITIONED BY (dt STRING)
ADS layer (Application Data Service)
Responsibility: data marts for specific application scenarios
-- Product sales report
CREATE TABLE ads_product_sale_report (
    product_id STRING,
    product_name STRING,
    category_name STRING,
    brand_name STRING,
    sale_count BIGINT,
    sale_amount DECIMAL(10,2),
    profit_amount DECIMAL(10,2)
) PARTITIONED BY (dt STRING)
Benefits of layering
1. Clear responsibilities
- ODS: data ingestion, raw data preserved
- DWD: data cleansing, unified standards
- DWS: shared metrics, highly reusable
- ADS: business applications, served directly
2. Reduced complexity
-- Without layers: a complex one-off query
SELECT
    p.category_name,
    SUM(o.amount) AS total_sales
FROM ods_order o
JOIN ods_product p ON o.product_id = p.product_id
WHERE o.status = 'paid'
  AND o.dt = '2024-01-01'
GROUP BY p.category_name;
-- With layers: a simple query against a prepared table
SELECT category_name, total_sales
FROM ads_category_sale_report
WHERE dt = '2024-01-01';
3. Better performance
- Pre-computation: DWS pre-aggregates common metrics
- Less duplication: avoids repeated cleansing and joins
- Partition pruning: sensible partition design
4. Data governance
- Lineage: clear data flow
- Impact analysis: assess the effect of upstream changes
- Quality monitoring: quality checkpoints at each layer
5. User Profiling System
E-commerce profiling architecture
Tag system design
User tag system
├── Demographic tags
│   ├── gender, age, city tier
│   └── occupation, income level, education
├── Behavioral tags
│   ├── visit frequency, dwell time, browsing depth
│   └── purchase frequency, average order value, repurchase cycle
├── Preference tags
│   ├── category preference, brand preference, price sensitivity
│   └── promotion sensitivity, new-product affinity
└── Risk-control tags
    ├── credit rating, return rate, complaint rate
    └── scalper detection, coupon-abuser detection
Implementation architecture
# Feature engineering example
from pyspark.sql import SparkSession

class UserFeatureEngineer:
    def __init__(self):
        self.spark = SparkSession.builder.appName("UserProfile").getOrCreate()

    def extract_behavior_features(self, user_id):
        """Extract behavioral features over the last 30 days."""
        # Note: validate user_id (or use parameterized queries) before interpolating
        behavior_sql = f"""
            SELECT
                user_id,
                COUNT(CASE WHEN action_type='click' THEN 1 END) AS click_cnt_30d,
                COUNT(CASE WHEN action_type='cart' THEN 1 END) AS cart_cnt_30d,
                COUNT(CASE WHEN action_type='purchase' THEN 1 END) AS purchase_cnt_30d,
                AVG(session_duration) AS avg_session_duration,
                MAX(CASE WHEN action_type='purchase' THEN create_time END) AS last_purchase_time
            FROM user_behavior
            WHERE user_id = '{user_id}'
              AND dt >= date_sub(current_date(), 30)
            GROUP BY user_id
        """
        return self.spark.sql(behavior_sql)

    def extract_preference_features(self, user_id):
        """Extract preference features over the last 90 days."""
        preference_sql = f"""
            SELECT
                user_id,
                -- category preference (precomputed TF-IDF vector)
                category_preference_vector,
                -- price preference
                AVG(amount) AS avg_order_amount,
                STDDEV(amount) AS price_variance,
                -- time-of-day / day-of-week preference
                HOUR(order_time) AS prefer_hour,
                DAYOFWEEK(order_time) AS prefer_day
            FROM dwd_order_detail
            WHERE user_id = '{user_id}'
              AND dt >= date_sub(current_date(), 90)
            GROUP BY user_id, category_preference_vector, HOUR(order_time), DAYOFWEEK(order_time)
        """
        return self.spark.sql(preference_sql)
Algorithms applied
1. Clustering (K-means)
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# User value segmentation on RFM features
def user_value_clustering(user_rfm_features):
    # Assemble the RFM columns into a single feature vector
    assembler = VectorAssembler(
        inputCols=["recency", "frequency", "monetary"],
        outputCol="features"
    )
    features = assembler.transform(user_rfm_features)
    # K-means with 5 clusters
    kmeans = KMeans(k=5, seed=1)
    model = kmeans.fit(features)
    # Segments: high value, mid value, low value, new users, churned users
    return model.transform(features)
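To convey the intuition without a Spark runtime, here is a plain-Python stand-in for RFM-based value segmentation: instead of learned cluster centroids, it scores each RFM dimension 1-3 with illustrative thresholds (all user rows and cutoffs below are hypothetical).

```python
# RFM rows: (user_id, recency_days, frequency, monetary)
users = [("u1", 2, 30, 5000.0), ("u2", 40, 2, 80.0), ("u3", 10, 8, 600.0)]

def rfm_score(recency, frequency, monetary):
    # Score each dimension 1-3; lower recency (more recent) is better
    r = 3 if recency <= 7 else 2 if recency <= 30 else 1
    f = 3 if frequency >= 20 else 2 if frequency >= 5 else 1
    m = 3 if monetary >= 1000 else 2 if monetary >= 300 else 1
    return r + f + m

def segment(score):
    return "high_value" if score >= 8 else "mid_value" if score >= 5 else "low_value"

segments = {uid: segment(rfm_score(r, f, m)) for uid, r, f, m in users}
print(segments)  # {'u1': 'high_value', 'u2': 'low_value', 'u3': 'mid_value'}
```

K-means learns the boundaries from data instead of fixing them by hand, but the output shape (user → segment label) is the same.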
2. Collaborative filtering
from pyspark.ml.recommendation import ALS

# Product recommendation via ALS matrix factorization
def collaborative_filtering(user_item_rating):
    als = ALS(
        maxIter=10,
        regParam=0.1,
        userCol="user_id",
        itemCol="product_id",
        ratingCol="rating"
    )
    model = als.fit(user_item_rating)
    # Top-10 recommendations per user
    return model.recommendForAllUsers(10)
3. Feature engineering
-- Time-window features (column aliases can't be referenced in the same SELECT, so nest the query)
WITH user_window AS (
    SELECT
        user_id,
        -- 7-day window
        SUM(CASE WHEN dt >= date_sub(current_date(), 7) THEN amount ELSE 0 END) AS amount_7d,
        -- 30-day window
        SUM(CASE WHEN dt >= date_sub(current_date(), 30) THEN amount ELSE 0 END) AS amount_30d
    FROM dwd_order_detail
    GROUP BY user_id
)
SELECT
    user_id,
    amount_7d,
    amount_30d,
    -- trend: share of the last 30 days' spend that happened in the last 7
    amount_7d / NULLIF(amount_30d, 0) AS recent_trend
FROM user_window
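The same window/trend computation, sketched in plain Python over hypothetical per-user order rows, makes the semantics of the SQL concrete (including the divide-by-zero guard that NULLIF provides):

```python
from datetime import date, timedelta

today = date(2024, 1, 31)
# Hypothetical (dt, amount) order rows for one user
rows = [(date(2024, 1, 30), 100.0), (date(2024, 1, 20), 50.0), (date(2024, 1, 5), 50.0)]

def window_sum(rows, days):
    # Sum amounts whose partition date falls inside the trailing window
    cutoff = today - timedelta(days=days)
    return sum(amt for dt, amt in rows if dt >= cutoff)

amount_7d = window_sum(rows, 7)    # 100.0
amount_30d = window_sum(rows, 30)  # 200.0
# Guard against division by zero, like NULLIF(amount_30d, 0) in the SQL
recent_trend = amount_7d / amount_30d if amount_30d else 0.0  # 0.5
```

A trend near 1.0 means spending is concentrated in the last week; near 0.0 means the user is going quiet.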
Tag refresh strategy
- Real-time tags: Flink, second-level updates (e.g., product currently being viewed)
- Near-real-time tags: hourly updates (e.g., today's spend)
- Offline tags: T+1 updates (e.g., preferences, lifecycle stage)
6. Data Quality Monitoring and Governance
Quality monitoring for the e-commerce warehouse
Monitoring dimensions
1. Completeness
-- Completeness check on order data
CREATE VIEW data_quality_completeness AS
SELECT
    dt,
    'order' AS table_name,
    COUNT(*) AS total_records,
    COUNT(order_id) AS non_null_order_id,
    COUNT(user_id) AS non_null_user_id,
    COUNT(amount) AS non_null_amount,
    -- completeness rate = non-null count / total
    COUNT(order_id) / COUNT(*) AS order_id_completeness_rate,
    COUNT(user_id) / COUNT(*) AS user_id_completeness_rate
FROM ods_order
WHERE dt = '${bizdate}'
GROUP BY dt;
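The completeness rate mirrors SQL's COUNT(col) / COUNT(*) semantics, where COUNT(col) skips NULLs. A minimal plain-Python sketch over hypothetical rows (None standing in for NULL):

```python
# Hypothetical ODS order rows; None marks a missing value
rows = [
    {"order_id": "o1", "user_id": "u1", "amount": 10.0},
    {"order_id": "o2", "user_id": None, "amount": 20.0},
    {"order_id": None, "user_id": "u2", "amount": None},
    {"order_id": "o4", "user_id": "u3", "amount": 5.0},
]

def completeness(rows, column):
    # Mirrors COUNT(col) / COUNT(*): non-null count over total rows
    non_null = sum(1 for r in rows if r[column] is not None)
    return non_null / len(rows)

rates = {c: completeness(rows, c) for c in ("order_id", "user_id", "amount")}
print(rates)  # each column: 0.75
```

An alerting job would compare each rate against a per-column threshold and page the table owner on breach.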
2. Consistency
-- Cross-table consistency: order count vs. successful payment count
WITH order_count AS (
    SELECT dt, COUNT(*) AS order_cnt
    FROM ods_order
    WHERE status IN ('paid', 'shipped', 'delivered')
      AND dt = '${bizdate}'
    GROUP BY dt
),
payment_count AS (
    SELECT dt, COUNT(*) AS payment_cnt
    FROM ods_payment
    WHERE status = 'success'
      AND dt = '${bizdate}'
    GROUP BY dt
)
SELECT
    o.dt,
    o.order_cnt,
    p.payment_cnt,
    ABS(o.order_cnt - p.payment_cnt) AS diff_cnt,
    ABS(o.order_cnt - p.payment_cnt) / o.order_cnt AS diff_rate
FROM order_count o
JOIN payment_count p ON o.dt = p.dt;
3. Accuracy
-- Business-rule checks
SELECT
    dt,
    COUNT(*) AS total_orders,
    -- anomaly counts
    COUNT(CASE WHEN amount <= 0 THEN 1 END) AS non_positive_amount_cnt,
    COUNT(CASE WHEN quantity <= 0 THEN 1 END) AS non_positive_quantity_cnt,
    COUNT(CASE WHEN order_time > pay_time THEN 1 END) AS time_logic_error_cnt,
    -- anomaly rate
    COUNT(CASE WHEN amount <= 0 THEN 1 END) / COUNT(*) AS non_positive_amount_rate
FROM dwd_order_detail
WHERE dt = '${bizdate}'
GROUP BY dt;
4. Timeliness
# Data-freshness monitoring (get_latest_partition, get_sla_threshold, and send_alert are platform helpers)
from datetime import datetime

def monitor_data_timeliness():
    current_time = datetime.now()
    # Check the latest partition time at each layer
    layers = ['ods', 'dwd', 'dws', 'ads']
    for layer in layers:
        latest_partition = get_latest_partition(f"{layer}_order")
        delay_hours = (current_time - latest_partition).total_seconds() / 3600
        # Alert when the layer breaches its SLA threshold
        if delay_hours > get_sla_threshold(layer):
            send_alert(f"{layer} layer is {delay_hours:.1f} hours behind")
Governance implementation
1. Metadata management
{
  "table_name": "dwd_order_detail",
  "business_owner": "E-commerce business unit",
  "technical_owner": "Data platform team",
  "update_frequency": "daily",
  "data_source": ["ods_order", "ods_product", "ods_user"],
  "quality_rules": [
    {
      "rule_name": "amount_positive",
      "rule_sql": "amount > 0",
      "severity": "high"
    }
  ],
  "lineage": {
    "upstream": ["ods_order", "ods_product"],
    "downstream": ["dws_user_action_daycount", "ads_product_sale_report"]
  }
}
2. Automated quality checks
class DataQualityFramework:
    def __init__(self):
        self.rules = self.load_quality_rules()

    def run_quality_check(self, table_name, partition):
        results = []
        for rule in self.rules.get(table_name, []):
            try:
                # Evaluate the rule against the partition
                result = self.execute_quality_rule(rule, table_name, partition)
            except Exception as e:
                self.log_error(f"Quality check failed to run: {e}")
                continue
            results.append(result)
            # Severe failures block the downstream pipeline
            # (raised outside the try block so the except clause doesn't swallow it)
            if result['severity'] == 'high' and result['pass_rate'] < 0.95:
                raise DataQualityException(f"Quality check failed for table {table_name}")
        return results
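A self-contained sketch of the blocking behavior, with hypothetical names (DataQualityError, run_rule) and an in-memory rule instead of SQL: a high-severity rule whose pass rate falls below the threshold raises, which is what stops the downstream schedule.

```python
class DataQualityError(Exception):
    pass

def run_rule(rows, predicate, severity, threshold=0.95):
    # Pass rate = fraction of rows satisfying the rule predicate
    passed = sum(1 for r in rows if predicate(r))
    pass_rate = passed / len(rows)
    # High-severity rules below threshold block the downstream pipeline
    if severity == "high" and pass_rate < threshold:
        raise DataQualityError(f"pass_rate={pass_rate:.2f} below {threshold}")
    return pass_rate

rows = [{"amount": 10.0}, {"amount": -1.0}, {"amount": 5.0}]
try:
    run_rule(rows, lambda r: r["amount"] > 0, "high")
except DataQualityError as e:
    print("pipeline blocked:", e)
```

Low-severity rules would only log and alert; severity is what decides between "warn" and "block".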
3. Lineage management
-- Lineage table
CREATE TABLE data_lineage (
    upstream_table STRING,
    downstream_table STRING,
    dependency_type STRING, -- direct, indirect
    create_time TIMESTAMP,
    update_time TIMESTAMP
);
-- Impact analysis (recursive CTE; engines without WITH RECURSIVE need an iterative job instead)
WITH RECURSIVE lineage_tree AS (
    -- direct downstream tables
    SELECT downstream_table, 1 AS level
    FROM data_lineage
    WHERE upstream_table = 'ods_order'
    UNION ALL
    -- recurse into indirect downstream tables
    SELECT dl.downstream_table, lt.level + 1
    FROM data_lineage dl
    JOIN lineage_tree lt ON dl.upstream_table = lt.downstream_table
    WHERE lt.level < 5 -- guard against unbounded recursion
)
SELECT * FROM lineage_tree ORDER BY level;
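Since many warehouse engines lack WITH RECURSIVE, the same impact analysis is often done in application code. A plain-Python breadth-first traversal over hypothetical lineage edges, with the same depth cap as the CTE:

```python
from collections import deque

# Hypothetical lineage edges: upstream table -> list of downstream tables
edges = {
    "ods_order": ["dwd_order_detail"],
    "dwd_order_detail": ["dws_user_action_daycount", "ads_product_sale_report"],
    "dws_user_action_daycount": ["ads_product_sale_report"],
}

def downstream_impact(root, max_depth=5):
    # BFS with a depth cap, mirroring the recursive CTE's "level < 5" guard
    seen, queue = {}, deque([(root, 0)])
    while queue:
        table, level = queue.popleft()
        if level >= max_depth:
            continue
        for child in edges.get(table, []):
            if child not in seen:
                seen[child] = level + 1  # record the shortest distance from root
                queue.append((child, level + 1))
    return seen

print(downstream_impact("ods_order"))
```

The result maps each affected table to its distance from the changed table, which is exactly what an impact report needs.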
7. Real-Time Data Processing
Real-time e-commerce warehouse architecture
Overall architecture
source systems → binlog → Canal → Kafka → Flink → ClickHouse (analytics)
                                            ├→ Redis/HBase (serving)
                                            └→ real-time dashboards / alerting
Core technical design
1. Ingestion -- Canal + Kafka
# Canal configuration
canal.instance.mysql.slaveId=1234
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=107
# Kafka topic design
order_binlog_topic:
  partitions: 12            # sized for the target parallelism
  replication-factor: 3
  key: order_id             # keeps events for one order in order
2. Stream processing -- Flink
// Real-time order statistics
object OrderRealtimeAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // source
    val orderStream = env
      .addSource(new FlinkKafkaConsumer[String]("order_binlog", new SimpleStringSchema(), properties))
      .map(parseOrderEvent)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor())

    // real-time sales totals over 5-minute event-time windows
    val salesStats = orderStream
      .filter(_.eventType == "INSERT") // new orders only
      .keyBy(_.categoryId)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .aggregate(new SalesAggregateFunction())

    // sink to ClickHouse
    salesStats.addSink(new ClickHouseSink())
    env.execute("Order Realtime Analysis")
  }
}
// Aggregate function (createAccumulator and merge added; AggregateFunction requires all four methods)
class SalesAggregateFunction extends AggregateFunction[OrderEvent, SalesAccumulator, SalesResult] {
  override def createAccumulator(): SalesAccumulator = SalesAccumulator(0.0, 0L)

  override def add(order: OrderEvent, acc: SalesAccumulator): SalesAccumulator = {
    acc.totalAmount += order.amount
    acc.orderCount += 1
    acc
  }

  override def merge(a: SalesAccumulator, b: SalesAccumulator): SalesAccumulator = {
    a.totalAmount += b.totalAmount
    a.orderCount += b.orderCount
    a
  }

  override def getResult(acc: SalesAccumulator): SalesResult = {
    // guard against division by zero on empty windows
    val avg = if (acc.orderCount > 0) acc.totalAmount / acc.orderCount else 0.0
    SalesResult(acc.totalAmount, acc.orderCount, avg)
  }
}
3. Latency control
Watermark strategy
// Bounded-out-of-orderness watermark: max event time seen so far, minus the lateness bound
class OrderTimestampExtractor extends AssignerWithPeriodicWatermarks[OrderEvent] {
  private val maxOutOfOrderness = 30000L // 30 seconds
  private var currentMaxTimestamp = 0L

  override def extractTimestamp(element: OrderEvent, previousElementTimestamp: Long): Long = {
    val timestamp = element.orderTime
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
    timestamp
  }

  override def getCurrentWatermark: Watermark = {
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }
}
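The watermark logic above is easy to simulate: track the maximum event timestamp seen so far and emit that minus the out-of-orderness bound. A plain-Python sketch with hypothetical millisecond timestamps shows that a late event never moves the watermark backwards:

```python
MAX_OUT_OF_ORDERNESS_MS = 30_000  # same 30-second bound as the Flink assigner

def watermarks(event_timestamps):
    # Periodic watermark = max event time seen so far, minus the lateness bound
    current_max, out = 0, []
    for ts in event_timestamps:
        current_max = max(current_max, ts)
        out.append(current_max - MAX_OUT_OF_ORDERNESS_MS)
    return out

# The late 40_000 event does not move the watermark backwards
print(watermarks([50_000, 40_000, 90_000]))  # [20000, 20000, 60000]
```

A window ending at time T fires once the watermark passes T, so the bound directly trades latency (30 s of waiting) for tolerance of out-of-order events.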
Backpressure control
# Flink configuration tuning
parallelism.default: 12
taskmanager.memory.process.size: 4g
taskmanager.memory.managed.fraction: 0.6
# Checkpoint configuration
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
execution.checkpointing.interval: 60000 # 1 minute
execution.checkpointing.mode: EXACTLY_ONCE
4. Reliability guarantees
Exactly-once semantics
// Kafka sink configuration (KafkaSink API, Flink 1.14+; the original FlinkKafkaProducer had no such builder)
// SalesResultSerializationSchema is a custom value serializer
val kafkaSink = KafkaSink.builder[SalesResult]()
  .setBootstrapServers(brokers)
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder()
      .setTopic("realtime_sales_stats")
      .setValueSerializationSchema(new SalesResultSerializationSchema())
      .build()
  )
  .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  .build()
Fault tolerance
// Custom error handling with keyed state and a side output for bad records
class OrderProcessFunction extends ProcessFunction[OrderEvent, SalesResult] {
  private lazy val salesState = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("sales", classOf[Double])
  )

  override def processElement(order: OrderEvent,
                              ctx: ProcessFunction[OrderEvent, SalesResult]#Context,
                              out: Collector[SalesResult]): Unit = {
    try {
      val currentSales = Option(salesState.value()).getOrElse(0.0)
      val newSales = currentSales + order.amount
      salesState.update(newSales)
      out.collect(SalesResult(order.categoryId, newSales, ctx.timestamp()))
    } catch {
      case ex: Exception =>
        // route bad records to a side output instead of failing the job
        ctx.output(errorOutputTag, OrderError(order, ex.getMessage))
    }
  }
}
Monitoring and alerting
1. Key metrics
# Real-time pipeline metrics (the get_* helpers wrap platform-specific APIs)
class RealtimeMetrics:
    def __init__(self):
        self.prometheus_client = PrometheusClient()

    def monitor_latency(self):
        """Monitor end-to-end latency."""
        # Kafka consumer lag
        kafka_lag = self.get_kafka_consumer_lag()
        # Flink processing latency
        flink_latency = self.get_flink_processing_latency()
        # ClickHouse insert latency
        clickhouse_latency = self.get_clickhouse_insert_latency()
        total_latency = kafka_lag + flink_latency + clickhouse_latency
        # alert when end-to-end latency exceeds 5 seconds
        if total_latency > 5000:
            self.send_alert(f"Real-time pipeline latency too high: {total_latency}ms")

    def monitor_throughput(self):
        """Monitor throughput."""
        current_qps = self.get_current_qps()
        expected_qps = self.get_expected_qps()
        # alert when throughput drops below 50% of the expected rate
        if current_qps < expected_qps * 0.5:
            self.send_alert(f"Real-time throughput degraded: {current_qps} < {expected_qps}")
2. Real-time vs. offline consistency check
-- Compare real-time and offline aggregates (ClickHouse)
WITH realtime_stats AS (
    SELECT
        toDate(create_time) AS dt,
        category_id,
        SUM(total_amount) AS rt_amount,
        SUM(order_count) AS rt_count
    FROM realtime_sales_stats
    WHERE toDate(create_time) = today()
    GROUP BY dt, category_id
),
offline_stats AS (
    SELECT
        dt,
        category_id,
        SUM(amount) AS off_amount,
        COUNT(*) AS off_count
    FROM dws_product_category_daycount
    WHERE dt = today()
    GROUP BY dt, category_id
)
SELECT
    r.dt,
    r.category_id,
    r.rt_amount,
    o.off_amount,
    ABS(r.rt_amount - o.off_amount) AS diff_amount,
    ABS(r.rt_amount - o.off_amount) / o.off_amount AS diff_rate
FROM realtime_stats r
FULL OUTER JOIN offline_stats o
    ON r.dt = o.dt AND r.category_id = o.category_id
WHERE diff_rate > 0.05; -- alert when the difference exceeds 5%
Performance tuning checklist
- Parallelism: size parallelism to the data volume and available resources
- State backend: use RocksDB for large state
- Serialization: Avro/Protobuf for faster (de)serialization
- Windows: choose window type and size deliberately
- Resource isolation: deploy tasks of different priority separately
总结
以上回答涵盖了电商数仓的核心技术点,从数据倾斜处理到实时计算,每个问题都结合了具体的业务场景和技术实现。在实际面试中,建议:
- 结合具体项目经验 - 不要只讲理论,要有具体的数据量、性能指标
- 突出业务价值 - 技术服务于业务,要体现技术方案的业务价值
- 展示问题解决能力 - 遇到的问题、分析思路、解决方案、效果验证
- 体现技术深度 - 不仅要会用,还要理解原理,能够进行优化调优
面试官通常会根据你的回答继续深入追问,所以每个技术点都要做好深挖的准备。