1 介绍自己,讲几个你熟悉的项目
2 数据倾斜怎么处理
3 spark的宽窄依赖
4 数仓模型分层 分层有啥好处
5 有了解过画像吗,自己怎么实现的,有运用算法吗
6 数仓数据质量监控和数据治理怎么实现
7 有做过实时吗,实时怎么实现数据不延迟,如何稳定可靠地产出实时指标

面试问题详解(以电商场景为例)

1. 自我介绍和项目经验

项目背景: 我负责某电商平台的数据仓库建设,日处理订单数据500万+,用户行为数据10亿+条。

核心项目:

项目一:电商实时数仓建设

  • 技术栈: Flink + Kafka + ClickHouse + Hive
  • 业务场景: 实现商品销售、用户行为、库存变化的实时监控
  • 核心成果: 实时指标延迟从30分钟降低到秒级,支撑双11等大促活动

项目二:用户画像系统

  • 技术栈: Spark + HBase + Redis + TensorFlow
  • 业务价值: 构建360度用户标签体系,CTR提升25%,个性化推荐准确率提升30%

项目三:数据质量治理平台

  • 解决痛点: 从人工发现问题到自动化监控,数据质量问题发现时间从小时级降到分钟级

2. 数据倾斜处理方案

电商场景中的数据倾斜问题

典型场景:

  • 热门商品订单数据倾斜(爆款商品订单量是普通商品的1000倍)
  • 用户行为数据倾斜(头部用户行为数据占总量60%)
  • 地域维度倾斜(一线城市订单量远超其他地区)
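
动手处理之前,通常先定位倾斜key:对候选维度做一次分布统计,看头部key的占比。下面是一个简化示意(表名、分区沿用下文订单示例):

-- 按商品维度统计订单分布,找出占比异常的热点key
SELECT 
  product_id, 
  COUNT(*) as cnt
FROM orders
WHERE dt = '2024-01-01'
GROUP BY product_id
ORDER BY cnt DESC
LIMIT 20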

解决方案

1. 预聚合(适用于订单统计场景)

-- 问题:某爆款商品订单量巨大导致倾斜
-- 解决:先按商品+小时预聚合,再汇总到天
WITH hourly_agg AS (
  SELECT 
    product_id, 
    DATE_FORMAT(order_time, 'yyyy-MM-dd-HH') as hour,
    COUNT(*) as order_cnt,
    SUM(amount) as total_amount
  FROM orders 
  WHERE dt = '2024-01-01'
  GROUP BY product_id, DATE_FORMAT(order_time, 'yyyy-MM-dd-HH')
)
SELECT 
  product_id,
  SUM(order_cnt) as daily_orders,
  SUM(total_amount) as daily_amount
FROM hourly_agg
GROUP BY product_id

2. 加盐法(适用于用户行为分析)

-- 问题:头部用户行为数据倾斜
-- 解决:给用户ID加随机盐值,分散计算
SELECT 
  user_id,
  action_type,
  SUM(partial_cnt) as action_cnt
FROM (
  -- 第一阶段:按"原始key+随机盐"局部聚合,把热点用户打散到多个reducer
  SELECT 
    user_id,
    action_type,
    salt,
    COUNT(*) as partial_cnt
  FROM (
    SELECT 
      user_id,
      action_type,
      FLOOR(RAND() * 100) as salt
    FROM user_behavior 
    WHERE dt = '2024-01-01'
  ) t1
  GROUP BY user_id, action_type, salt
) t2
-- 第二阶段:去掉盐,按原始key汇总局部结果
GROUP BY user_id, action_type

3. 两阶段聚合

-- 第一阶段:加盐分散
-- 第二阶段:去盐汇总
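
两阶段聚合是上面加盐思路的通用模式:第一阶段按"原始key+随机盐"局部聚合,第二阶段去盐后按原始key全局汇总。用 Spark DataFrame API 表达大致如下(简化示意,盐数100为假设值,表与列名沿用上文 user_behavior):

import org.apache.spark.sql.functions._
import spark.implicits._

// 第一阶段:追加随机盐后先做局部聚合,把热点key打散到多个分区
val partialAgg = spark.table("user_behavior")
  .filter($"dt" === "2024-01-01")
  .withColumn("salt", (rand() * 100).cast("int"))
  .groupBy($"user_id", $"action_type", $"salt")
  .agg(count(lit(1)).as("partial_cnt"))

// 第二阶段:去掉盐,按原始key汇总局部结果
val actionCnt = partialAgg
  .groupBy($"user_id", $"action_type")
  .agg(sum($"partial_cnt").as("action_cnt"))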

4. 调优参数

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=64MB

3. Spark宽窄依赖

定义对比

依赖类型 | 定义 | 数据传输 | 容错性
窄依赖 | 父RDD每个分区最多被一个子RDD分区使用 | 无需Shuffle | 容错性好
宽依赖 | 父RDD分区被多个子RDD分区使用 | 需要Shuffle | 容错性差

电商场景实例

窄依赖示例

// 用户行为数据过滤和映射 - 窄依赖
import spark.implicits._  // $语法与map的Encoder依赖隐式转换

val userBehavior = spark.read.table("user_behavior")
val filteredData = userBehavior
  .filter($"action_type" === "click")  // 窄依赖:filter
  .map(row => (row.getAs[String]("user_id"), 1))  // 窄依赖:map

宽依赖示例

// 订单金额统计 - 宽依赖
val orderStats = orders
  .groupBy("user_id")  // 宽依赖:groupBy需要Shuffle
  .agg(
    sum("amount").as("total_amount"),
    count("*").as("order_count")
  )
  .join(users, "user_id")  // 宽依赖:join需要Shuffle

Stage划分规则

  • Stage边界: 宽依赖操作会导致Stage切分

电商ETL流程:

数据读取 → filter/map            (Stage 1)
         ↓ Shuffle
groupBy/join → 结果输出          (Stage 2)
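
可以通过查看物理执行计划确认Shuffle边界(即Stage切分点),下面沿用上文的 orderStats 做一个简单示意:

// 物理计划中出现 Exchange 算子的位置就是Shuffle边界,对应Stage切分
orderStats.explain()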

4. 数仓分层设计

电商数仓分层架构

ODS层(操作数据存储层)

职责: 原始数据存储,保持与业务系统一致

-- 订单表
CREATE TABLE ods_order (
  order_id STRING,
  user_id STRING, 
  product_id STRING,
  amount DECIMAL(10,2),
  order_time TIMESTAMP,
  status STRING
) PARTITIONED BY (dt STRING)  -- 分区字段

DWD层(数据明细层)

职责: 数据清洗、标准化,业务过程建模

-- 清洗后的订单明细
CREATE TABLE dwd_order_detail (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  category_id STRING,
  brand_id STRING,
  amount DECIMAL(10,2),
  quantity INT,
  order_time TIMESTAMP,
  pay_time TIMESTAMP,
  cancel_time TIMESTAMP,
  order_status STRING
) PARTITIONED BY (dt STRING)

DWS层(数据服务层/轻度汇总层)

职责: 按主题域进行轻度汇总

-- 用户日粒度统计
CREATE TABLE dws_user_action_daycount (
  user_id STRING,
  login_count BIGINT,
  cart_count BIGINT, 
  order_count BIGINT,
  payment_count BIGINT,
  total_amount DECIMAL(10,2)
) PARTITIONED BY (dt STRING)

ADS层(数据应用层)

职责: 面向具体应用场景的数据集市

-- 商品销售报表
CREATE TABLE ads_product_sale_report (
  product_id STRING,
  product_name STRING,
  category_name STRING,
  brand_name STRING,
  sale_count BIGINT,
  sale_amount DECIMAL(10,2),
  profit_amount DECIMAL(10,2)
) PARTITIONED BY (dt STRING)

分层好处

1. 职责清晰

  • ODS: 数据接入,保持原始性
  • DWD: 数据清洗,统一标准
  • DWS: 公共指标,复用性强
  • ADS: 业务应用,直接服务

2. 降低复杂度

-- 不分层:复杂的一次性查询
SELECT 
  p.category_name,
  SUM(o.amount) as total_sales
FROM ods_order o
JOIN ods_product p ON o.product_id = p.product_id
WHERE o.status = 'paid' 
  AND o.dt = '2024-01-01'
GROUP BY p.category_name;

-- 分层:简单的层次查询
SELECT category_name, total_sales 
FROM ads_category_sale_report 
WHERE dt = '2024-01-01';

3. 提升性能

  • 预计算: DWS层预聚合常用指标
  • 减少重复: 避免重复的数据清洗和关联
  • 分区裁剪: 合理的分区设计
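
以上面"预计算"为例,DWS层通常就是一段例行的预聚合任务。下面是一个简化示意(沿用前文的 dwd_order_detail 与 dws_user_action_daycount;登录、加购指标实际来自行为明细表,这里仅用0占位):

-- 每日例行任务:从明细层预聚合出用户日粒度指标
INSERT OVERWRITE TABLE dws_user_action_daycount PARTITION (dt = '${bizdate}')
SELECT
  user_id,
  0 as login_count,
  0 as cart_count,
  COUNT(order_id) as order_count,
  COUNT(pay_time) as payment_count,
  SUM(amount) as total_amount
FROM dwd_order_detail
WHERE dt = '${bizdate}'
GROUP BY user_id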

4. 数据治理

  • 血缘关系: 清晰的数据流向
  • 影响分析: 上游变更影响评估
  • 质量监控: 分层设置质量检查点

5. 用户画像系统

电商用户画像架构

标签体系设计

用户画像标签体系
├── 人口属性标签
│   ├── 性别、年龄、城市等级
│   └── 职业、收入水平、教育程度
├── 行为属性标签  
│   ├── 访问频次、停留时长、浏览深度
│   └── 购买频次、客单价、复购周期
├── 消费偏好标签
│   ├── 品类偏好、品牌偏好、价格敏感度
│   └── 促销敏感度、新品偏好度
└── 风险控制标签
    ├── 信用等级、退货率、投诉率
    └── 黄牛识别、羊毛党识别
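
这套标签体系落地时一般会物化成按天分区的用户标签宽表,供下游服务查询。下面是一个简化示意(表名与字段仅为示例,实际以标签体系为准):

-- 用户标签宽表(示意)
CREATE TABLE dws_user_profile_tag (
  user_id STRING,
  gender STRING,                   -- 人口属性
  age_level STRING,
  city_level STRING,
  purchase_cnt_30d BIGINT,         -- 行为属性
  avg_order_amount DECIMAL(10,2),
  prefer_category STRING,          -- 消费偏好
  price_sensitivity STRING,
  risk_level STRING                -- 风险控制
) PARTITIONED BY (dt STRING)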

实现架构

# 特征工程示例
from pyspark.sql import SparkSession

class UserFeatureEngineer:
    def __init__(self):
        self.spark = SparkSession.builder.appName("UserProfile").getOrCreate()
    
    def extract_behavior_features(self, user_id):
        """提取行为特征"""
        # 最近30天行为统计
        behavior_sql = f"""
        SELECT 
            user_id,
            COUNT(CASE WHEN action_type='click' THEN 1 END) as click_cnt_30d,
            COUNT(CASE WHEN action_type='cart' THEN 1 END) as cart_cnt_30d,
            COUNT(CASE WHEN action_type='purchase' THEN 1 END) as purchase_cnt_30d,
            AVG(session_duration) as avg_session_duration,
            MAX(CASE WHEN action_type='purchase' THEN create_time END) as last_purchase_time
        FROM user_behavior 
        WHERE user_id = '{user_id}' 
        AND dt >= date_sub(current_date(), 30)
        GROUP BY user_id
        """
        return self.spark.sql(behavior_sql)
    
    def extract_preference_features(self, user_id):
        """提取偏好特征(品类偏好向量由独立的TF-IDF作业产出,这里不展开)"""
        preference_sql = f"""
        SELECT 
            user_id,
            -- 价格偏好
            AVG(amount) as avg_order_amount,
            STDDEV(amount) as price_stddev,
            -- 时间偏好:常见下单时段/星期(取中位数近似)
            PERCENTILE_APPROX(HOUR(order_time), 0.5) as prefer_hour,
            PERCENTILE_APPROX(DAYOFWEEK(order_time), 0.5) as prefer_day
        FROM dwd_order_detail 
        WHERE user_id = '{user_id}'
        AND dt >= date_sub(current_date(), 90)
        GROUP BY user_id
        """
        return self.spark.sql(preference_sql)

算法应用

1. 聚类算法(K-means)
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# 用户价值分层
def user_value_clustering():
    # 特征向量化
    assembler = VectorAssembler(
        inputCols=["recency", "frequency", "monetary"], 
        outputCol="features"
    )
    
    # K-means聚类
    kmeans = KMeans(k=5, seed=1)
    model = kmeans.fit(user_rfm_features)
    
    # 用户分群:高价值、中价值、低价值、新用户、流失用户
    return model.transform(user_rfm_features)
2. 协同过滤
from pyspark.ml.recommendation import ALS

# 商品推荐
# 注意:ALS要求userCol/itemCol为数值类型,字符串ID需先用StringIndexer编码为索引列
def collaborative_filtering():
    als = ALS(
        maxIter=10, 
        regParam=0.1, 
        userCol="user_id", 
        itemCol="product_id", 
        ratingCol="rating"
    )
    model = als.fit(user_item_rating)
    return model.recommendForAllUsers(10)
3. 特征工程
-- 时间窗口特征
WITH user_metrics AS (
  SELECT 
    user_id,
    -- 7天特征
    SUM(CASE WHEN dt >= date_sub(current_date(), 7) THEN amount ELSE 0 END) as amount_7d,
    -- 30天特征  
    SUM(CASE WHEN dt >= date_sub(current_date(), 30) THEN amount ELSE 0 END) as amount_30d
  FROM dwd_order_detail
  WHERE dt >= date_sub(current_date(), 30)
  GROUP BY user_id
)
SELECT
  user_id,
  amount_7d,
  amount_30d,
  -- 趋势特征:近7天消费占近30天的比例
  amount_7d / NULLIF(amount_30d, 0) as recent_trend
FROM user_metrics

标签更新策略

  • 实时标签: Flink处理,秒级更新(如当前浏览商品)
  • 准实时标签: 每小时更新(如今日消费金额)
  • 离线标签: T+1更新(如消费偏好、生命周期)
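
以实时标签为例,Flink作业计算出标签后通常写入Redis,供线上服务低延迟读取。下面是一个简化的Sink示意(假设使用Jedis客户端;UserTag结构、key设计与过期时间均为示例):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.Jedis

// 实时标签记录(字段为示例)
case class UserTag(userId: String, tagName: String, tagValue: String)

// 实时标签写入Redis的简化Sink
class UserTagRedisSink extends RichSinkFunction[UserTag] {
  @transient private var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("redis-host", 6379)  // 地址为占位
  }

  override def invoke(tag: UserTag, context: SinkFunction.Context): Unit = {
    // 以Hash存储:key=profile:{user_id}, field=标签名, value=标签值
    jedis.hset(s"profile:${tag.userId}", tag.tagName, tag.tagValue)
    jedis.expire(s"profile:${tag.userId}", 7 * 24 * 3600)  // 7天过期,避免冷数据堆积
  }

  override def close(): Unit = if (jedis != null) jedis.close()
}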

6. 数据质量监控和治理

电商数仓质量监控体系

质量监控维度

1. 完整性监控
-- 订单数据完整性检查
CREATE VIEW data_quality_completeness AS
SELECT 
  dt,
  'order' as table_name,
  COUNT(*) as total_records,
  COUNT(order_id) as non_null_order_id,
  COUNT(user_id) as non_null_user_id,
  COUNT(amount) as non_null_amount,
  -- 完整性率
  COUNT(order_id) / COUNT(*) as order_id_completeness_rate,
  COUNT(user_id) / COUNT(*) as user_id_completeness_rate
FROM ods_order
WHERE dt = '${bizdate}'
GROUP BY dt;
2. 一致性监控
-- 跨表一致性检查:订单总数与支付总数
WITH order_count AS (
  SELECT dt, COUNT(*) as order_cnt
  FROM ods_order 
  WHERE status IN ('paid', 'shipped', 'delivered')
  AND dt = '${bizdate}'
  GROUP BY dt
),
payment_count AS (
  SELECT dt, COUNT(*) as payment_cnt  
  FROM ods_payment
  WHERE status = 'success'
  AND dt = '${bizdate}'
  GROUP BY dt
)
SELECT 
  o.dt,
  o.order_cnt,
  p.payment_cnt,
  ABS(o.order_cnt - p.payment_cnt) as diff_cnt,
  ABS(o.order_cnt - p.payment_cnt) / o.order_cnt as diff_rate
FROM order_count o
JOIN payment_count p ON o.dt = p.dt;
3. 准确性监控
-- 业务规则检查
SELECT 
  dt,
  COUNT(*) as total_orders,
  -- 异常数据统计
  COUNT(CASE WHEN amount <= 0 THEN 1 END) as negative_amount_cnt,
  COUNT(CASE WHEN quantity <= 0 THEN 1 END) as negative_quantity_cnt,
  COUNT(CASE WHEN order_time > pay_time THEN 1 END) as time_logic_error_cnt,
  -- 异常率
  COUNT(CASE WHEN amount <= 0 THEN 1 END) / COUNT(*) as negative_amount_rate
FROM dwd_order_detail  
WHERE dt = '${bizdate}'
GROUP BY dt;
4. 时效性监控
# 数据时效性监控(get_latest_partition/get_sla_threshold/send_alert为平台内部封装方法)
from datetime import datetime

def monitor_data_timeliness():
    current_time = datetime.now()
    
    # 检查各层数据更新时间
    layers = ['ods', 'dwd', 'dws', 'ads']
    for layer in layers:
        latest_partition = get_latest_partition(f"{layer}_order")
        delay_hours = (current_time - latest_partition).total_seconds() / 3600
        
        # 告警阈值
        if delay_hours > get_sla_threshold(layer):
            send_alert(f"{layer}层数据延迟{delay_hours}小时")

治理实施方案

1. 元数据管理
{
  "table_name": "dwd_order_detail",
  "business_owner": "电商业务部",
  "technical_owner": "数据平台团队",
  "update_frequency": "daily",
  "data_source": ["ods_order", "ods_product", "ods_user"],
  "quality_rules": [
    {
      "rule_name": "amount_positive",
      "rule_sql": "amount > 0",
      "severity": "high"
    }
  ],
  "lineage": {
    "upstream": ["ods_order", "ods_product"],
    "downstream": ["dws_user_action_daycount", "ads_product_sale_report"]
  }
}
2. 自动化质量检查
class DataQualityFramework:
    def __init__(self):
        self.rules = self.load_quality_rules()
        
    def run_quality_check(self, table_name, partition):
        results = []
        for rule in self.rules.get(table_name, []):
            try:
                # 执行质量规则
                result = self.execute_quality_rule(rule, table_name, partition)
                results.append(result)
                
                # 严重问题阻断后续流程
                if result['severity'] == 'high' and result['pass_rate'] < 0.95:
                    raise DataQualityException(f"表{table_name}质量检查失败")
                    
            except Exception as e:
                self.log_error(f"质量检查异常: {str(e)}")
                
        return results
3. 血缘关系管理
-- 血缘关系表
CREATE TABLE data_lineage (
  upstream_table STRING,
  downstream_table STRING, 
  dependency_type STRING,  -- direct, indirect
  create_time TIMESTAMP,
  update_time TIMESTAMP
);

-- 影响分析查询(需要支持递归CTE的引擎,如Trino/PostgreSQL)
WITH RECURSIVE lineage_tree AS (
  -- 直接下游
  SELECT downstream_table, 1 as level
  FROM data_lineage  
  WHERE upstream_table = 'ods_order'
  
  UNION ALL
  
  -- 递归查询间接下游
  SELECT dl.downstream_table, lt.level + 1
  FROM data_lineage dl
  JOIN lineage_tree lt ON dl.upstream_table = lt.downstream_table
  WHERE lt.level < 5  -- 防止无限递归
)
SELECT * FROM lineage_tree ORDER BY level;

7. 实时数据处理

电商实时数仓架构

整体架构

业务系统 → Binlog → Canal → Kafka → Flink → ClickHouse
                                  ↓
                              Redis/HBase
                                  ↓  
                            实时大屏/告警

核心技术方案

1. 数据采集 - Canal + Kafka
# Canal配置
canal.instance.mysql.slaveId=1234
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=107

# Kafka Topic设计
order_binlog_topic:
  partitions: 12  # 根据并发度设计
  replication-factor: 3
  key: order_id   # 保证同一订单有序
2. 实时计算 - Flink
// 订单实时统计
object OrderRealtimeAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    // 数据源
    val orderStream = env
      .addSource(new FlinkKafkaConsumer[String]("order_binlog", new SimpleStringSchema(), properties))
      .map(parseOrderEvent)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor())
    
    // 实时销售金额统计(5分钟窗口)
    val salesStats = orderStream
      .filter(_.eventType == "INSERT") // 新订单
      .keyBy(_.categoryId)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .aggregate(new SalesAggregateFunction())
    
    // 输出到ClickHouse
    salesStats.addSink(new ClickHouseSink())
    
    env.execute("Order Realtime Analysis")
  }
}

// 聚合函数
class SalesAggregateFunction extends AggregateFunction[OrderEvent, SalesAccumulator, SalesResult] {
  override def createAccumulator(): SalesAccumulator = SalesAccumulator(0.0, 0L)

  override def add(order: OrderEvent, acc: SalesAccumulator): SalesAccumulator = {
    acc.totalAmount += order.amount
    acc.orderCount += 1
    acc
  }

  override def getResult(acc: SalesAccumulator): SalesResult = {
    SalesResult(acc.totalAmount, acc.orderCount, acc.totalAmount / acc.orderCount)
  }

  // 窗口合并等场景需要merge局部累加器
  override def merge(a: SalesAccumulator, b: SalesAccumulator): SalesAccumulator =
    SalesAccumulator(a.totalAmount + b.totalAmount, a.orderCount + b.orderCount)
}
3. 延迟控制策略
Watermark策略
// 周期性Watermark:允许最多30秒乱序
class AdaptiveTimestampExtractor extends AssignerWithPeriodicWatermarks[OrderEvent] {
  private val maxOutOfOrderness = 30000L // 30秒
  private var currentMaxTimestamp = 0L
  
  override def extractTimestamp(element: OrderEvent, previousElementTimestamp: Long): Long = {
    val timestamp = element.orderTime
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
    timestamp
  }
  
  override def getCurrentWatermark: Watermark = {
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }
}
背压控制
# Flink配置优化
parallelism.default: 12
taskmanager.memory.process.size: 4g
taskmanager.memory.managed.fraction: 0.6

# Checkpoint配置
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
execution.checkpointing.interval: 60000  # 1分钟
execution.checkpointing.mode: EXACTLY_ONCE
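
同样的Checkpoint参数也可以在作业代码里设置,与前文 OrderRealtimeAnalysis 中的 env 对应(简化示意):

// 代码方式开启Checkpoint:1分钟一次,EXACTLY_ONCE语义
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000)  // 两次Checkpoint的最小间隔
env.getCheckpointConfig.setCheckpointTimeout(120000)          // 单次Checkpoint超时时间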
4. 可靠性保障
Exactly Once语义
// Kafka Sink配置(KafkaSink连接器,Flink 1.14+;kafkaBrokers为Kafka地址变量)
val kafkaSink = KafkaSink.builder[SalesResult]()
  .setBootstrapServers(kafkaBrokers)
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder[SalesResult]()
      .setTopic("realtime_sales_stats")
      .setValueSerializationSchema(new SalesResultSerializationSchema())  // 自定义序列化
      .build()
  )
  .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  .setTransactionalIdPrefix("realtime-sales")  // EXACTLY_ONCE需要事务ID前缀
  .build()
容错机制
// 自定义容错处理
class OrderProcessFunction extends ProcessFunction[OrderEvent, SalesResult] {
  // 异常数据的侧输出流标签(OrderError为自定义异常记录类型)
  private lazy val errorOutputTag = OutputTag[OrderError]("order-error")

  private lazy val salesState = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("sales", classOf[Double])
  )
  
  override def processElement(order: OrderEvent, 
                            ctx: ProcessFunction[OrderEvent, SalesResult]#Context, 
                            out: Collector[SalesResult]): Unit = {
    try {
      val currentSales = Option(salesState.value()).getOrElse(0.0)
      val newSales = currentSales + order.amount
      salesState.update(newSales)
      
      out.collect(SalesResult(order.categoryId, newSales, ctx.timestamp()))
      
    } catch {
      case ex: Exception =>
        // 记录异常数据到侧输出流
        ctx.output(errorOutputTag, OrderError(order, ex.getMessage))
    }
  }
}

监控告警体系

1. 关键指标监控
# 实时监控指标
class RealtimeMetrics:
    def __init__(self):
        self.prometheus_client = PrometheusClient()
    
    def monitor_latency(self):
        """监控端到端延迟"""
        # 从Kafka消费延迟
        kafka_lag = self.get_kafka_consumer_lag()
        
        # Flink处理延迟  
        flink_latency = self.get_flink_processing_latency()
        
        # ClickHouse写入延迟
        clickhouse_latency = self.get_clickhouse_insert_latency()
        
        total_latency = kafka_lag + flink_latency + clickhouse_latency
        
        # 延迟告警(超过5秒)
        if total_latency > 5000:
            self.send_alert(f"实时链路延迟异常: {total_latency}ms")
    
    def monitor_throughput(self):
        """监控吞吐量"""
        current_qps = self.get_current_qps()
        expected_qps = self.get_expected_qps()
        
        # 吞吐量告警(低于预期50%)
        if current_qps < expected_qps * 0.5:
            self.send_alert(f"实时处理吞吐量异常: {current_qps} < {expected_qps}")
2. 数据一致性校验
-- 实时与离线数据一致性校验
WITH realtime_stats AS (
  SELECT 
    toDate(create_time) as dt,
    category_id,
    SUM(total_amount) as rt_amount,
    SUM(order_count) as rt_count
  FROM realtime_sales_stats
  WHERE dt = today()
  GROUP BY dt, category_id
),
offline_stats AS (
  SELECT 
    dt,
    category_id, 
    SUM(amount) as off_amount,
    COUNT(*) as off_count
  FROM dws_product_category_daycount
  WHERE dt = today()
  GROUP BY dt, category_id
)
SELECT 
  r.dt,
  r.category_id,
  r.rt_amount,
  o.off_amount,
  ABS(r.rt_amount - o.off_amount) as diff_amount,
  ABS(r.rt_amount - o.off_amount) / o.off_amount as diff_rate
FROM realtime_stats r
FULL OUTER JOIN offline_stats o 
ON r.dt = o.dt AND r.category_id = o.category_id
WHERE diff_rate > 0.05;  -- 差异率超过5%告警

性能优化要点

  1. 并行度调优: 根据数据量和资源合理设置并行度
  2. 状态后端: 选择RocksDB存储大状态数据
  3. 序列化: 使用Avro/Protobuf提升序列化性能
  4. 窗口优化: 合理选择窗口类型和大小
  5. 资源隔离: 不同优先级任务分离部署
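
以第2条"状态后端"为例,大状态作业通常采用 RocksDB + 增量Checkpoint 的组合,典型配置如下(flink-conf.yaml 示意,路径为占位):

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints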

总结

以上回答涵盖了电商数仓的核心技术点,从数据倾斜处理到实时计算,每个问题都结合了具体的业务场景和技术实现。在实际面试中,建议:

  1. 结合具体项目经验 - 不要只讲理论,要有具体的数据量、性能指标
  2. 突出业务价值 - 技术服务于业务,要体现技术方案的业务价值
  3. 展示问题解决能力 - 遇到的问题、分析思路、解决方案、效果验证
  4. 体现技术深度 - 不仅要会用,还要理解原理,能够进行优化调优

面试官通常会根据你的回答继续深入追问,所以每个技术点都要做好深挖的准备。