1.有了解过什么计算引擎的源码是怎么实现的吗?
2.flink 的处理框架和spark 的处理框架的区别
3.场景题:
购物车页面推荐:在用户下单时,统计用户一年的消费情况,以便于在购物车页面做出合理的推荐,这个怎么处理呢?你对这个问题是怎么理解的?
4.广播是如何实现的?
5.rdd 具体是如何实现的
6.spark 里边 partition是什么?
7.怎么去识别一个 sparksql 任务是否有倾斜或者需要优化?
8.会用 sparkui 吗10.对 jvm 的理解
9.堆内内存和堆在内存的理解
答案如下
1. 计算引擎源码实现解析(以Spark为例)
考察知识点
Spark核心组件(RDD、DAGScheduler、TaskScheduler)的源码结构、关键类与方法、核心流程(如任务提交、Stage划分)的实现逻辑
参考回答
以Spark 3.x为例,其核心计算逻辑集中在spark-core
模块,核心源码实现可拆解为RDD抽象层、任务调度层、执行层三部分:
(1)RDD抽象层的源码实现
RDD(弹性分布式数据集)是Spark的核心数据结构,源码定义在org.apache.spark.rdd.RDD
类中,核心属性与方法如下:
- 核心方法:
compute(split: Partition, context: TaskContext): Iterator[T]
:抽象方法,由子类实现(如MapPartitionsRDD
实现数据转换);getPartitions: Array[Partition]
:返回RDD的分区列表(如HadoopRDD
返回HadoopPartition
数组);getDependencies: Seq[Dependency[_]]
:返回依赖关系(窄依赖NarrowDependency
或宽依赖ShuffleDependency
);partitioner: Option[Partitioner]
:定义分区策略(影响Shuffle时数据分布)。
核心属性(构造函数参数):
abstract class RDD[T: ClassTag](
@transient private val _sc: SparkContext,
@transient private val deps: Seq[Dependency[_]] // 依赖关系列表
) extends Serializable with Logging {
val id: Int = sc.newRddId() // RDD唯一标识
private var partitioner: Option[Partitioner] = None // 分区器(如HashPartitioner)
private var dependencies_ : Seq[Dependency[_]] = _ // 依赖关系缓存
// 其他属性:首选位置、检查点目录等
}
(2)任务调度层的源码实现
DAGScheduler(org.apache.spark.scheduler.DAGScheduler
)负责Stage划分与任务提交,核心流程:
- 接收任务:通过
runJob
方法接收行动操作(如collect
)提交的任务; - Stage划分:调用
getOrCreateShuffleMapStage
和createResultStage
,基于宽窄依赖回溯划分Stage(见问题8/9的算法实现); - 提交Stage:通过
submitStage
方法递归提交Stage(先提交依赖的父Stage,再提交当前Stage); - 生成Task:为每个Stage生成
ShuffleMapTask
(中间结果)或ResultTask
(最终结果),封装到TaskSet
中。
TaskScheduler(org.apache.spark.scheduler.TaskScheduler
)负责Task分配与执行,核心实现:
- 接口定义:
submitTasks(taskSet: TaskSet)
接收任务集,cancelTasks(stageId: Int)
取消任务; - 具体实现(
TaskSchedulerImpl
):- 用
SchedulerBackend
(如YarnSchedulerBackend
)与集群管理器通信; - 通过
TaskSetManager
管理TaskSet,监控Task运行状态(成功/失败/重试); - 按 locality 级别(PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL)优先调度Task,减少数据传输。
- 用
(3)执行层的源码实现
Executor(org.apache.spark.executor.Executor
)负责执行Task,核心逻辑:
- 接收
LaunchTask
消息,反序列化TaskDescription
; - 处理Shuffle:通过
ShuffleManager
(如SortShuffleManager
)完成Map端写入(ShuffleWriter
)和Reduce端读取(ShuffleReader
)。
创建TaskRunner
线程执行Task:
class TaskRunner(...) extends Runnable {
override def run(): Unit = {
val task = ser.deserialize[Task[Any]](taskDescription.serializedTask) // 反序列化Task
val result = task.run(...) // 执行Task(调用RDD.compute方法)
sendResultToDriver(result) // 将结果发送给Driver
}
}
补充回答注意要点
- 核心看依赖关系处理和Shuffle机制:
ShuffleDependency
的shuffleId
绑定Stage,SortShuffleWriter
通过临时文件写入中间结果; - 源码阅读入口:从
SparkContext.runJob
追踪到DAGScheduler.submitJob
,再到TaskScheduler.submitTasks
,最后到Executor
执行; - 关键优化点:
Tungsten
内存管理(MemoryManager
)、WholeStageCodegen
(将多个RDD操作合并为单个Java方法)。
2. Flink与Spark处理框架的区别
考察知识点
处理模型(批/流)、数据模型、执行机制、时间语义、状态管理、容错机制的核心差异
参考回答
Flink与Spark作为主流大数据计算引擎,核心差异体现在流处理能力和执行模型上,具体对比如下:
维度 | Spark | Flink |
---|---|---|
核心定位 | 批处理优先,流处理基于微批(Micro-Batch) | 流处理优先,批处理作为流的特例(DataSet是有界流) |
数据模型 | RDD(不可变分区集合)、DataFrame(结构化数据) | DataStream(无界流)、DataSet(有界流)、Table(动态表) |
执行机制 | 基于DAG的Stage划分,Task按Stage串行执行 | 基于Operator Chain的连续流执行,Operator间直接传输数据 |
时间语义 | 仅支持处理时间(Processing Time) | 支持处理时间、事件时间(Event Time)、摄入时间(Ingestion Time),内置Watermark处理迟到数据 |
状态管理 | 无原生状态管理,依赖外部存储(如Redis) | 内置状态后端(MemoryStateBackend、RocksDBStateBackend),支持状态快照与增量 checkpoint |
容错机制 | 基于RDD血统(Lineage)重算,Checkpoint保存元数据 | 基于Chandy-Lamport算法的异步Checkpoint,保存全量/增量状态,恢复速度快 |
延迟与吞吐量 | 微批延迟(秒级),吞吐量大 | 低延迟(毫秒级),吞吐量接近Spark(优化后) |
适用场景 | 批处理(ETL、数据分析)、离线机器学习 | 实时数据处理(实时报表、风控)、流机器学习、事件驱动应用 |
核心技术差异解析
- 流处理模型:
- Spark Streaming将流数据切分为微批(默认100ms一批),本质是“批处理模拟流”,延迟无法突破批大小;
- Flink采用“真正的流处理”,数据逐条处理(或按极小缓冲处理),支持毫秒级延迟,通过
Watermark
解决事件时间乱序问题。
- 执行计划:
- Spark将算子按Stage拆分,Stage间通过Shuffle传输数据,Stage内Task并行执行;
- Flink将相邻算子链化(Operator Chaining)为一个Task,数据在内存中直接传递(无序列化),减少开销。
- 状态管理:
- Spark Streaming的状态需手动维护(如
updateStateByKey
),容错依赖Checkpoint+重算,大状态性能差; - Flink的状态由框架管理,支持Keyed State(按Key分区)和Operator State(算子级状态),RocksDB后端支持TB级状态。
- Spark Streaming的状态需手动维护(如
补充回答注意要点
- 避免“非此即彼”:批处理优先选Spark(生态成熟),实时处理选Flink(低延迟+状态管理);
- 版本演进:Spark 3.0+引入Structured Streaming的Continuous Processing(低延迟模式),但功能有限;Flink 1.12+优化批处理性能,缩小与Spark差距;
- 核心指标:流处理延迟(Flink < 100ms,Spark Streaming ~ 1s),批处理TPC-DS性能(Spark略优)。
3. 购物车页面推荐场景解决方案
考察知识点
数据采集与存储、离线+实时计算架构、用户画像构建、推荐算法选型、工程落地考量
参考回答
用户下单时的购物车推荐需结合历史消费数据与实时行为,核心目标是“提升关联购买率”,整体方案分5步:
(1)数据采集与存储
- 采集内容:
- 历史数据:用户近1年的订单(金额、时间、商品ID、品类)、浏览/加购记录、评价标签;
- 实时数据:当前购物车商品、会话行为(如最近点击的商品)。
- 存储方案:
- 历史数据:订单表(Hive/ClickHouse,按用户ID分区)、行为日志(HDFS/对象存储,Parquet格式);
- 实时数据:Kafka(接收购物车变更事件)、Redis(缓存用户实时会话)。
(2)用户消费情况统计(离线计算)
通过Spark/Flink批处理计算用户特征,核心维度:
- 消费能力:年消费总额、平均客单价、最大单笔金额(划分高/中/低消费层级);
- 消费频率:下单次数、复购率(30天内再次购买同一品类的比例)、活跃时段(如周末晚上);
- 品类偏好:购买次数TOP3品类、该品类的平均消费占比(如70%消费集中在母婴品类);
- 价格敏感度:购买商品的价格区间分布(如80%商品在50-200元)。
- 计算周期:T+1更新(每日凌晨计算前一天数据,合并到全年统计中);
- 存储:用户特征表(HBase/MySQL,Key=用户ID,Value=特征JSON)。
(3)实时推荐计算
当用户打开购物车时,触发实时推荐:
- 召回层(快速筛选候选商品):
- 基于历史品类偏好:召回用户TOP3品类的关联商品(如买过奶粉的用户召回 diapers);
- 基于当前购物车:用关联规则(如“购买面包的用户70%会买牛奶”)召回互补品;
- 热度过滤:排除用户近30天已购买的商品,加入近期热销品(TOP50)。
- 排序层(精准打分):
- 特征:用户对候选商品的历史点击率、商品与当前购物车的匹配度、价格是否在用户偏好区间;
- 模型:轻量LR模型(实时性优先)或深度学习模型(如Wide&Deep,离线训练+在线预测);
- 输出:TOP5推荐商品,按“点击转化率”降序排列。
(4)工程落地架构
数据采集 → Kafka → Flink实时处理 → Redis(实时特征)
↓
HDFS/Hive(历史数据) → Spark离线计算 → HBase(用户特征库)
↓
推荐模型训练(离线)
↓
购物车请求 → API服务 → 实时推荐引擎(召回+排序) → 返回推荐结果
(5)优化策略
- 冷启动:新用户用热门商品+地域偏好(如北京用户推荐羽绒服);
- 时效性:大促期间(如双11)每小时更新一次热门商品,非大促每日更新;
- AB测试:同时运行“品类关联”和“协同过滤”两种算法,按点击率选择最优方案。
补充回答注意要点
- 核心是“历史统计+实时触发”结合,避免纯离线推荐的滞后性;
- 工程上需平衡性能:召回层控制候选商品数量(<100),排序模型用轻量级算法(响应时间<100ms);
- 业务指标:关注“推荐商品点击率”“加购转化率”,持续迭代特征与模型。
4. 广播机制的实现(Spark与Flink)
考察知识点
广播变量的原理(序列化、分发、缓存)、Spark Broadcast与Flink BroadcastState的差异、适用场景
参考回答
广播(Broadcast)是将小数据集高效分发到所有计算节点的机制,避免冗余传输,Spark与Flink的实现各有侧重:
(1)Spark Broadcast的实现
- 核心类:
org.apache.spark.broadcast.Broadcast
,具体实现有TorrentBroadcast
(默认)和HttpBroadcast
。 - 实现流程:
- 序列化:Driver将广播数据(如小表)序列化为字节数组,分成多个块(默认4MB/块);
- 分布式存储:Driver保留一个块,其他块通过BitTorrent协议分发到Executor(每个Executor缓存部分块,避免Driver单点压力);
- Executor获取:Executor需要时,从Driver或其他Executor下载缺失的块,合并后反序列化,缓存在内存(可持久化到磁盘);
- 失效机制:任务完成后,广播变量在Executor上的缓存会被清理(可通过
unpersist()
手动清理)。
代码示例:
val sc = spark.sparkContext
val smallTable = sc.parallelize(Seq(("a", 1), ("b", 2))).collectAsMap()
val broadcastVar = sc.broadcast(smallTable) // 广播小表
largeRDD.map { case (key, value) =>
(key, value + broadcastVar.value.getOrElse(key, 0)) // Executor读取广播变量
}
(2)Flink BroadcastState的实现
Flink的广播用于“流与小表关联”,通过BroadcastState
存储广播数据,核心类BroadcastStream
。
- 实现流程:
- 广播流创建:将小表数据包装为
BroadcastStream
(标记为广播流); - 状态绑定:通过
connect
方法关联主流与广播流,将广播数据存入BroadcastState
(每个TaskManager一个副本,内存或RocksDB存储); - 动态更新:支持广播流数据动态更新(如配置表变更),
BroadcastState
会自动同步到所有TaskManager; - 关联计算:主流数据与
BroadcastState
中的数据关联(如用户行为流关联用户画像广播表)。
- 广播流创建:将小表数据包装为
代码示例:
DataStream<User> userStream = ...; // 主流:用户行为
DataStream<Config> configStream = ...; // 广播流:配置数据
// 创建广播描述符
MapStateDescriptor<String, Config> configDesc = new MapStateDescriptor<>(...);
BroadcastStream<Config> broadcastConfig = configStream.broadcast(configDesc);
// 关联主流与广播流
DataStream<Result> resultStream = userStream
.connect(broadcastConfig)
.process(new BroadcastProcessFunction<User, Config, Result>() {
@Override
public void processElement(User user, ...) {
Config config = broadcastState.get(user.getType()); // 读取广播状态
// 业务逻辑
}
});
(3)核心差异
特性 | Spark Broadcast | Flink BroadcastState |
---|---|---|
数据类型 | 静态数据(广播后不可变) | 支持动态更新(广播流可追加/覆盖) |
生命周期 | 与Job绑定(Job结束后销毁) | 与流任务生命周期一致(持续运行) |
适用场景 | 批处理中Join小表、传递配置参数 | 流处理中动态关联配置表、规则引擎 |
存储位置 | Executor内存(可持久化到磁盘) | TaskManager内存/RocksDB(大状态) |
补充回答注意要点
- 广播的核心是“一次分发,多次使用”,适用于数据量小(Spark建议<1GB,Flink建议<100MB)的场景;
- Spark广播避免使用
collect()
后广播(Driver内存压力大),应直接广播RDD; - Flink广播需注意
BroadcastState
的更新原子性(通过applyToKeyedState
保证)。
5. RDD的具体实现
考察知识点
RDD的核心属性、依赖关系实现、Transformation与Action的执行逻辑、容错机制
参考回答
RDD(Resilient Distributed Dataset)是Spark的核心抽象,本质是不可变的分布式元素集合,其实现依赖5个核心属性和依赖关系管理:
This post is for subscribers on the 网站会员 and 成为小万的高级会员 tiers only
Subscribe NowAlready have an account? Sign In