摘要:SQL 是数据处理领域的“通用语”,但在流计算(Streaming)的世界里,SQL 的语义发生了微妙而深刻的变化。本系列文章旨在帮助开发者从 DataStream API 的“命令式编程”思维切换到 SQL 的“声明式编程”思维。作为上篇,本文将以约 3500 字的篇幅,深入剖析 Flink SQL 的核心概念——动态表(Dynamic Table)的底层流转机制,详解 Source/Sink 的 DDL 定义与参数调优,并手把手教你掌握 Flink 1.13+ 引入的标准窗口表值函数(Window TVF)。
更多内容可以加入我的discord社群,我们大家一起学习!小万也会去搜集各种高质量的文章和技术论文来给大家分享!https://discord.gg/aBheA6V8DQ
1. 引言:为什么流计算需要 SQL?
在 Flink 的早期版本(1.9 之前),DataStream API 是绝对的王者。虽然它灵活强大,能精确控制每一条数据的处理逻辑(ProcessFunction)和状态定时器(Timer),但它的门槛极高:
- 开发效率低:实现一个简单的“每分钟统计一次 PV”,需要手写 Source、Window、Aggregate、Sink,代码量轻松破百行。
- 运维难度大:状态(State)的管理全靠开发者自觉,KeyedState 的 TTL 需要手动配置,一旦疏忽就会导致内存泄漏。
- 优化门槛高:如何复用状态?如何解决数据倾斜?这些都需要资深专家才能搞定。
Flink SQL 的出现彻底改变了这一局面。它不仅仅是一个简单的 API 包装,更是一个智能的逻辑执行计划优化器。
- 统一批流:同一套 SQL,既可以跑在 Hive 的离线历史数据上(Batch Mode),也可以跑在 Kafka 的实时数据流上(Streaming Mode)。
- 自动优化:底层的 Query Optimizer 会自动帮你选择最优的 Join 策略(Hash Join vs Sort Merge Join),自动处理状态的清理,屏蔽了底层复杂性。
2. 核心思维模型:动态表 (Dynamic Table)
理解 Flink SQL 的关键,在于理解 Stream (流) 与 Table (表) 之间的二元性。这是 Flink SQL 区别于传统数据库 SQL 的根本所在。
2.1 什么是动态表?
在 MySQL 或 Hive 中,表是静态的(Static)。当你执行 SELECT * FROM T 时,数据库通过扫描此刻的文件或 B+ 树,给你返回一个确定的结果集,查询随之结束。
但在 Flink 中,数据流是源源不断的。我们如何用 SQL 查询一个“正在发生变化”的东西? Flink 引入了 动态表 (Dynamic Table) 的概念:
- 流 -> 表:我们将数据流视为一个正在发生变化的表。每一条新流入的数据(Event),都相当于对这个表执行了一次
INSERT操作。 - 表 -> 流:我们在动态表上执行 SQL 查询(Continuous Query),查询的结果也是一个动态表。为了将结果输出到外部系统(如 Kafka),我们需要将这个结果表转换回数据流(Changelog Stream)。
2.2 连续查询 (Continuous Query)
Flink SQL 执行的查询是连续查询。 与传统数据库不同,Flink 的查询任务启动后,永远不会终止。它会一直驻留在集群中,随着上游数据的到来,实时更新查询结果。
2.3 深入底层:三种结果流语义 (The 3 Stream Types)
这是 Flink SQL 最难理解,也是生产环境排查错误(如 Kafka Sink 报错)最关键的部分。Flink 内部通过 Changelog 机制来在算子间传递数据。
2.3.1 追加流 (Append-only Stream)
- 定义:结果表只会有新数据插入,已有的数据永远不会被修改或删除。
- 场景:简单的过滤(Filter)、映射(Map)或非窗口聚合(如 Tumble Window)。
- 物理消息:只有
INSERT (+I)消息。 - Sink 要求:任何 Sink 都可以支持(Kafka, File, HBase)。
SQL 示例:
SELECT id, name FROM users WHERE age > 18;
2.3.2 回撤流 (Retract Stream)
- 定义:结果表的数据可能会被修改。因为流数据是 append 的,无法直接 update,Flink 采用“先撤回旧值,再发送新值”的策略。
- 场景:普通的
GROUP BY聚合。- 当用户 A 第一次点击:输出
+I[A, 1]。 - 当用户 A 第二次点击:
- 先发送
-D[A, 1](告诉下游,撤回刚才那条 1)。 - 再发送
+I[A, 2](告诉下游,现在是 2 了)。
- 先发送
- 当用户 A 第一次点击:输出
- Sink 要求:下游必须能处理 Delete/Update 消息。普通的 Kafka Consumer 只能看到一堆数据,看不出撤回逻辑,需要配合专门的 CDC 消费者。
SQL 示例:
SELECT user, COUNT(*) as cnt FROM clicks GROUP BY user;
2.3.3 更新插入流 (Upsert Stream)
- 定义:类似于数据库的
REPLACE INTO。要求结果表必须有 主键 (Primary Key)。
- 物理消息:
UPDATE_BEFORE (-U): 旧值(有时可省略)。UPDATE_AFTER (+U): 新值。
- 场景:使用 Upsert Kafka Connector 或 JDBC Sink 时。
- Sink 要求:Sink 必须支持幂等更新(如 HBase, Redis, JDBC, Upsert-Kafka)。
3. 基础实战:DDL 定义与 Connector 详解
Flink SQL 使用标准的 CREATE TABLE 语法来定义外部数据源。这不仅仅是定义 Schema,更是定义了数据的物理获取方式。
3.1 引入依赖
在 Maven 项目中,除了 Flink 核心依赖,通过 SQL 开发还需要以下“四大家族”:
<!-- 1. Table API 核心桥接层 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 2. Planner (Blink 优化器) - 运行时必须 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 3. Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 4. Format 解析器 (如 JSON) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
3.2 深度解析:Kafka Source DDL
假设 Kafka 中有一条 json 数据流,Topic 为 user_behavior。
CREATE TABLE source_kafka (
`user_id` STRING,
`item_id` STRING,
`behavior` STRING,
`ts` BIGINT,
-- [深度] 计算列:定义处理时间属性
`proc_time` AS PROCTIME(),
-- [深度] 计算列:定义事件时间属性 (基于 ts 字段)
`event_time` AS TO_TIMESTAMP_LTZ(ts, 3),
-- [深度] Watermark 定义:允许最大 5 秒乱序
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-sql-group-001',
-- [参数详解 1] 启动模式
-- 'earliest-offset': 从头消费
-- 'latest-offset': 从最新消费
-- 'group-offsets': 接上次提交的 Offset 继续消费 (生产环境最常用)
-- 'timestamp': 从指定时间戳开始
'scan.startup.mode' = 'group-offsets',
-- [参数详解 2] 格式
-- 'json': 普通 JSON
-- 'debezium-json': 处理 CDC 数据 (带 op 字段)
-- 'avro': 需配合 Schema Registry
'format' = 'json',
-- [参数详解 3] JSON 解析失败处理
-- 'true': 解析失败跳过当前行 (防止脏数据导致任务挂掉)
-- 'false': 报错停止任务
'json.ignore-parse-errors' = 'true'
);
3.3 深度解析:JDBC Sink (MySQL)
将统计结果写入 MySQL,通常要求实现幂等更新。
CREATE TABLE sink_mysql (
`user_id` STRING,
`item_count` BIGINT,
-- [关键] 必须指定主键,Flink 才能根据 ID 进行 UPDATE
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/flink_test',
'table-name' = 'user_stats',
'username' = 'root',
'password' = 'pwd',
-- [调优] 写入批次大小,积攒 50 条写一次 DB,提高吞吐
'sink.buffer-flush.max-rows' = '50',
-- [调优] 写入最大等待时间,防止数据太少一直不 flush
'sink.buffer-flush.interval' = '1s',
-- [调优] 写入重试次数
'sink.max-retries' = '3'
);
4. 时间属性与 Watermark 详解
在 Flink SQL 中,时间属性(Time Attribute)是窗口操作的基石。
This post is for subscribers on the 网站会员 and 成为小万的高级会员 tiers only
Subscribe NowAlready have an account? Sign In