1.有了解过什么计算引擎的源码是怎么实现的吗?
2.flink 的处理框架和spark 的处理框架的区别
3.场景题:
购物车页面推荐:在用户下单时,统计用户一年的消费情况,以便于在购物车页面做出合理的推荐,这个怎么处理呢?你对这个问题是怎么理解的?
4.广播是如何实现的?
5.rdd 具体是如何实现的
6.spark 里边 partition是什么?
7.怎么去识别一个 sparksql 任务是否有倾斜或者需要优化?
8.会用 sparkui 吗10.对 jvm 的理解
9.堆内内存和堆在内存的理解

答案如下

1. 计算引擎源码实现解析(以Spark为例)

考察知识点

Spark核心组件(RDD、DAGScheduler、TaskScheduler)的源码结构、关键类与方法、核心流程(如任务提交、Stage划分)的实现逻辑

参考回答

以Spark 3.x为例,其核心计算逻辑集中在spark-core模块,核心源码实现可拆解为RDD抽象层任务调度层执行层三部分:

(1)RDD抽象层的源码实现

RDD(弹性分布式数据集)是Spark的核心数据结构,源码定义在org.apache.spark.rdd.RDD类中,核心属性与方法如下:

  • 核心方法
    1. compute(split: Partition, context: TaskContext): Iterator[T]:抽象方法,由子类实现(如MapPartitionsRDD实现数据转换);
    2. getPartitions: Array[Partition]:返回RDD的分区列表(如HadoopRDD返回HadoopPartition数组);
    3. getDependencies: Seq[Dependency[_]]:返回依赖关系(窄依赖NarrowDependency或宽依赖ShuffleDependency);
    4. partitioner: Option[Partitioner]:定义分区策略(影响Shuffle时数据分布)。

核心属性(构造函数参数):

abstract class RDD[T: ClassTag](
  @transient private val _sc: SparkContext,
  @transient private val deps: Seq[Dependency[_]]  // 依赖关系列表
) extends Serializable with Logging {
  val id: Int = sc.newRddId()  // RDD唯一标识
  private var partitioner: Option[Partitioner] = None  // 分区器(如HashPartitioner)
  private var dependencies_ : Seq[Dependency[_]] = _  // 依赖关系缓存
  // 其他属性:首选位置、检查点目录等
}

(2)任务调度层的源码实现

DAGSchedulerorg.apache.spark.scheduler.DAGScheduler)负责Stage划分与任务提交,核心流程:

  1. 接收任务:通过runJob方法接收行动操作(如collect)提交的任务;
  2. Stage划分:调用getOrCreateShuffleMapStagecreateResultStage,基于宽窄依赖回溯划分Stage(见问题8/9的算法实现);
  3. 提交Stage:通过submitStage方法递归提交Stage(先提交依赖的父Stage,再提交当前Stage);
  4. 生成Task:为每个Stage生成ShuffleMapTask(中间结果)或ResultTask(最终结果),封装到TaskSet中。

TaskSchedulerorg.apache.spark.scheduler.TaskScheduler)负责Task分配与执行,核心实现:

  • 接口定义:submitTasks(taskSet: TaskSet)接收任务集,cancelTasks(stageId: Int)取消任务;
  • 具体实现(TaskSchedulerImpl):
    • SchedulerBackend(如YarnSchedulerBackend)与集群管理器通信;
    • 通过TaskSetManager管理TaskSet,监控Task运行状态(成功/失败/重试);
    • 按 locality 级别(PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL)优先调度Task,减少数据传输。

(3)执行层的源码实现

Executororg.apache.spark.executor.Executor)负责执行Task,核心逻辑:

  1. 接收LaunchTask消息,反序列化TaskDescription
  2. 处理Shuffle:通过ShuffleManager(如SortShuffleManager)完成Map端写入(ShuffleWriter)和Reduce端读取(ShuffleReader)。

创建TaskRunner线程执行Task:

class TaskRunner(...) extends Runnable {
  override def run(): Unit = {
    val task = ser.deserialize[Task[Any]](taskDescription.serializedTask)  // 反序列化Task
    val result = task.run(...)  // 执行Task(调用RDD.compute方法)
    sendResultToDriver(result)  // 将结果发送给Driver
  }
}

补充回答注意要点

  1. 核心看依赖关系处理Shuffle机制ShuffleDependencyshuffleId绑定Stage,SortShuffleWriter通过临时文件写入中间结果;
  2. 源码阅读入口:从SparkContext.runJob追踪到DAGScheduler.submitJob,再到TaskScheduler.submitTasks,最后到Executor执行;
  3. 关键优化点:Tungsten内存管理(MemoryManager)、WholeStageCodegen(将多个RDD操作合并为单个Java方法)。

2. Flink与Spark处理框架的区别

考察知识点

处理模型(批/流)、数据模型、执行机制、时间语义、状态管理、容错机制的核心差异

参考回答

Flink与Spark作为主流大数据计算引擎,核心差异体现在流处理能力执行模型上,具体对比如下:

维度 Spark Flink
核心定位 批处理优先,流处理基于微批(Micro-Batch) 流处理优先,批处理作为流的特例(DataSet是有界流)
数据模型 RDD(不可变分区集合)、DataFrame(结构化数据) DataStream(无界流)、DataSet(有界流)、Table(动态表)
执行机制 基于DAG的Stage划分,Task按Stage串行执行 基于Operator Chain的连续流执行,Operator间直接传输数据
时间语义 仅支持处理时间(Processing Time) 支持处理时间、事件时间(Event Time)、摄入时间(Ingestion Time),内置Watermark处理迟到数据
状态管理 无原生状态管理,依赖外部存储(如Redis) 内置状态后端(MemoryStateBackend、RocksDBStateBackend),支持状态快照与增量 checkpoint
容错机制 基于RDD血统(Lineage)重算,Checkpoint保存元数据 基于Chandy-Lamport算法的异步Checkpoint,保存全量/增量状态,恢复速度快
延迟与吞吐量 微批延迟(秒级),吞吐量大 低延迟(毫秒级),吞吐量接近Spark(优化后)
适用场景 批处理(ETL、数据分析)、离线机器学习 实时数据处理(实时报表、风控)、流机器学习、事件驱动应用

核心技术差异解析

  1. 流处理模型
    • Spark Streaming将流数据切分为微批(默认100ms一批),本质是“批处理模拟流”,延迟无法突破批大小;
    • Flink采用“真正的流处理”,数据逐条处理(或按极小缓冲处理),支持毫秒级延迟,通过Watermark解决事件时间乱序问题。
  2. 执行计划
    • Spark将算子按Stage拆分,Stage间通过Shuffle传输数据,Stage内Task并行执行;
    • Flink将相邻算子链化(Operator Chaining)为一个Task,数据在内存中直接传递(无序列化),减少开销。
  3. 状态管理
    • Spark Streaming的状态需手动维护(如updateStateByKey),容错依赖Checkpoint+重算,大状态性能差;
    • Flink的状态由框架管理,支持Keyed State(按Key分区)和Operator State(算子级状态),RocksDB后端支持TB级状态。

补充回答注意要点

  1. 避免“非此即彼”:批处理优先选Spark(生态成熟),实时处理选Flink(低延迟+状态管理);
  2. 版本演进:Spark 3.0+引入Structured Streaming的Continuous Processing(低延迟模式),但功能有限;Flink 1.12+优化批处理性能,缩小与Spark差距;
  3. 核心指标:流处理延迟(Flink < 100ms,Spark Streaming ~ 1s),批处理TPC-DS性能(Spark略优)。

3. 购物车页面推荐场景解决方案

考察知识点

数据采集与存储、离线+实时计算架构、用户画像构建、推荐算法选型、工程落地考量

参考回答

用户下单时的购物车推荐需结合历史消费数据实时行为,核心目标是“提升关联购买率”,整体方案分5步:

(1)数据采集与存储

  • 采集内容
    • 历史数据:用户近1年的订单(金额、时间、商品ID、品类)、浏览/加购记录、评价标签;
    • 实时数据:当前购物车商品、会话行为(如最近点击的商品)。
  • 存储方案
    • 历史数据:订单表(Hive/ClickHouse,按用户ID分区)、行为日志(HDFS/对象存储,Parquet格式);
    • 实时数据:Kafka(接收购物车变更事件)、Redis(缓存用户实时会话)。

(2)用户消费情况统计(离线计算)

通过Spark/Flink批处理计算用户特征,核心维度:

  1. 消费能力:年消费总额、平均客单价、最大单笔金额(划分高/中/低消费层级);
  2. 消费频率:下单次数、复购率(30天内再次购买同一品类的比例)、活跃时段(如周末晚上);
  3. 品类偏好:购买次数TOP3品类、该品类的平均消费占比(如70%消费集中在母婴品类);
  4. 价格敏感度:购买商品的价格区间分布(如80%商品在50-200元)。
  • 计算周期:T+1更新(每日凌晨计算前一天数据,合并到全年统计中);
  • 存储:用户特征表(HBase/MySQL,Key=用户ID,Value=特征JSON)。

(3)实时推荐计算

当用户打开购物车时,触发实时推荐:

  1. 召回层(快速筛选候选商品):
    • 基于历史品类偏好:召回用户TOP3品类的关联商品(如买过奶粉的用户召回 diapers);
    • 基于当前购物车:用关联规则(如“购买面包的用户70%会买牛奶”)召回互补品;
    • 热度过滤:排除用户近30天已购买的商品,加入近期热销品(TOP50)。
  2. 排序层(精准打分):
    • 特征:用户对候选商品的历史点击率、商品与当前购物车的匹配度、价格是否在用户偏好区间;
    • 模型:轻量LR模型(实时性优先)或深度学习模型(如Wide&Deep,离线训练+在线预测);
    • 输出:TOP5推荐商品,按“点击转化率”降序排列。

(4)工程落地架构

数据采集 → Kafka → Flink实时处理 → Redis(实时特征)
                          ↓
HDFS/Hive(历史数据) → Spark离线计算 → HBase(用户特征库)
                          ↓
                      推荐模型训练(离线)
                          ↓
购物车请求 → API服务 → 实时推荐引擎(召回+排序) → 返回推荐结果

(5)优化策略

  • 冷启动:新用户用热门商品+地域偏好(如北京用户推荐羽绒服);
  • 时效性:大促期间(如双11)每小时更新一次热门商品,非大促每日更新;
  • AB测试:同时运行“品类关联”和“协同过滤”两种算法,按点击率选择最优方案。

补充回答注意要点

  1. 核心是“历史统计+实时触发”结合,避免纯离线推荐的滞后性;
  2. 工程上需平衡性能:召回层控制候选商品数量(<100),排序模型用轻量级算法(响应时间<100ms);
  3. 业务指标:关注“推荐商品点击率”“加购转化率”,持续迭代特征与模型。

4. 广播机制的实现(Spark与Flink)

考察知识点

广播变量的原理(序列化、分发、缓存)、Spark Broadcast与Flink BroadcastState的差异、适用场景

参考回答

广播(Broadcast)是将小数据集高效分发到所有计算节点的机制,避免冗余传输,Spark与Flink的实现各有侧重:

(1)Spark Broadcast的实现

  • 核心类org.apache.spark.broadcast.Broadcast,具体实现有TorrentBroadcast(默认)和HttpBroadcast
  • 实现流程
    1. 序列化:Driver将广播数据(如小表)序列化为字节数组,分成多个块(默认4MB/块);
    2. 分布式存储:Driver保留一个块,其他块通过BitTorrent协议分发到Executor(每个Executor缓存部分块,避免Driver单点压力);
    3. Executor获取:Executor需要时,从Driver或其他Executor下载缺失的块,合并后反序列化,缓存在内存(可持久化到磁盘);
    4. 失效机制:任务完成后,广播变量在Executor上的缓存会被清理(可通过unpersist()手动清理)。

代码示例

val sc = spark.sparkContext
val smallTable = sc.parallelize(Seq(("a", 1), ("b", 2))).collectAsMap()
val broadcastVar = sc.broadcast(smallTable)  // 广播小表
largeRDD.map { case (key, value) =>
  (key, value + broadcastVar.value.getOrElse(key, 0))  // Executor读取广播变量
}

Flink的广播用于“流与小表关联”,通过BroadcastState存储广播数据,核心类BroadcastStream

  • 实现流程
    1. 广播流创建:将小表数据包装为BroadcastStream(标记为广播流);
    2. 状态绑定:通过connect方法关联主流与广播流,将广播数据存入BroadcastState(每个TaskManager一个副本,内存或RocksDB存储);
    3. 动态更新:支持广播流数据动态更新(如配置表变更),BroadcastState会自动同步到所有TaskManager;
    4. 关联计算:主流数据与BroadcastState中的数据关联(如用户行为流关联用户画像广播表)。

代码示例

DataStream<User> userStream = ...;  // 主流:用户行为
DataStream<Config> configStream = ...;  // 广播流:配置数据

// 创建广播描述符
MapStateDescriptor<String, Config> configDesc = new MapStateDescriptor<>(...);
BroadcastStream<Config> broadcastConfig = configStream.broadcast(configDesc);

// 关联主流与广播流
DataStream<Result> resultStream = userStream
  .connect(broadcastConfig)
  .process(new BroadcastProcessFunction<User, Config, Result>() {
    @Override
    public void processElement(User user, ...) {
      Config config = broadcastState.get(user.getType());  // 读取广播状态
      // 业务逻辑
    }
  });

(3)核心差异

特性 Spark Broadcast Flink BroadcastState
数据类型 静态数据(广播后不可变) 支持动态更新(广播流可追加/覆盖)
生命周期 与Job绑定(Job结束后销毁) 与流任务生命周期一致(持续运行)
适用场景 批处理中Join小表、传递配置参数 流处理中动态关联配置表、规则引擎
存储位置 Executor内存(可持久化到磁盘) TaskManager内存/RocksDB(大状态)

补充回答注意要点

  1. 广播的核心是“一次分发,多次使用”,适用于数据量小(Spark建议<1GB,Flink建议<100MB)的场景;
  2. Spark广播避免使用collect()后广播(Driver内存压力大),应直接广播RDD;
  3. Flink广播需注意BroadcastState的更新原子性(通过applyToKeyedState保证)。

5. RDD的具体实现

考察知识点

RDD的核心属性、依赖关系实现、Transformation与Action的执行逻辑、容错机制

参考回答

RDD(Resilient Distributed Dataset)是Spark的核心抽象,本质是不可变的分布式元素集合,其实现依赖5个核心属性和依赖关系管理:

This post is for subscribers on the 网站会员 and 成为小万的高级会员 tiers only

Subscribe Now

Already have an account?