Flink 入门

一、概述

1. 简介

  • Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。核心目标是数据流上的有状态计算(Stateful Computations over Data Streams)
  • 有状态的流处理:把流处理需要的额外数据保存成一个状态,然后针对这条数据进行处理,并且更新状态
    • 状态在内存中:优点速度快;缺点可靠性差
    • 状态在分布式系统中:优点可靠性高;缺点速度慢
  • 官网:https://flink.apache.org/

  • 在德语中,Flink 一词表示快速、灵巧。项目的 logo 是一只彩色的松鼠
  • Flink 起源于 Stratosphere 项目,它是由 3 所地处柏林的大学和欧洲其他一些大学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授 Volker Markl 领衔开发。2014 年 4 月,Stratosphere 的代码捐赠给了 Apache 软件基金会,Flink 就是在此基础上被重新设计出来的
    • 2014 年 8 月,Flink 第一个版本 0.6 正式发布,与此同时 Fink 的几位核心开发者创办 Data Artisans 公司
    • 2014 年 12 月,Flink 项目完成孵化
    • 2015 年 4 月,Flink 发布了里程碑式的重要版本 0.9.0
    • 2019 年 1 月,长期对 Flink 投入研发的阿里巴巴,以 9000 万欧元的价格收购了 Data Artisans 公司
    • 2019 年 8 月,阿里巴巴将内部版本 Blink 开源,合并入 Flink 1.9.0 版本
  • 高吞吐和低延迟:每秒处理数百万个事件,毫秒级延迟
  • 结果的准确性:Flink 提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果
  • 精确一次(exactly-once)的状态一致性保证
  • 可以连接到最常用的外部系统,如 Kafka、Hive、JDBC、HDFS、Redis 等
  • 高可用:本身高可用的设置,加上与 K8s、YARN 和 Mesos 的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink 能做到以极少的停机时间 7×24 全天候运行
  • 电商和市场营销:实时数据报表、广告投放、实时推荐
  • 物联网(IOT):传感器实时数据采集和显示、实时报警,交通运输业
  • 物流配送和服务业:订单状态实时更新、通知信息推送
  • 银行和金融业:实时结算和通知推送,实时检测异常行为
  • Spark 以批处理为根本
    • 数据模型:Spark 采用 RDD 模型,Spark Streaming 的 DStream 实际上就是一组组小批数据 RDD 的集合
    • 运行时架构:Spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个
  • Flink 以流处理为根本
    • 数据模型:Flink 基本数据模型是数据流,以及事件(Event)序列
    • 运行时架构:Flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
Flink Spark Streaming
计算模型 流计算 微批处理
时间语义 事件时间、处理时间 处理时间
窗口 多、灵活 少、不灵活(窗口必须是批次的整数倍)
状态 没有
流式SQL 没有
  • 底层 API(处理函数):对最原始数据加工处理。底层 API 与 DataStream API 相集成,可以处理复杂的计算
  • 核心 API:DataStream API(流处理)和 DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括 map、flatmap 等),连接(joins),聚合(aggregations),窗口(windows)操作等。Flink 1.12 以后,DataStream API 已经实现真正的流批一体,DataSet API 已经过时
  • Table API:以表为中心的声明式编程,其中表可能会动态变化。Table API 遵循关系模型:表有二维数据结构,类似于关系数据库中的表;同时 API 提供可比较的操作,例如 select、project、join、group-by、aggregate 等。可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用
  • SQL API:这一层在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行

二、快速上手

1. 创建项目

  • 创建一个 Maven 工程
  • 添加项目依赖(IDEA 启动时,配置包含 Provided 的依赖)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<properties>
<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
  • 数据准备
1
2
3
4
$ cat input/word.txt
hello flink
hello world
hello java

2. 批处理 WordCount

  • 批处理对数据的处理转换,是看作数据集来进行操作的
  • Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API。所以从 Flink 1.12 开始,官方推荐直接使用 DataStream API,可以在提交任务时通过将执行模式设为 BATCH 来进行批处理(-Dexecution.runtime-mode=BATCH
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class BatchWordCount {

public static void main(String[] args) throws Exception {
// 1. 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 从文件按行读取数据
DataSource<String> lines = env.readTextFile("input/word.txt");
// 3. 转换数据格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word,1L));
}
}
});
// 4. 按照word进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
// 5. 分组内聚合统计
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
// 6. 打印结果
sum.print();
}
}
// (flink,1)
// (world,1)
// (hello,3)
// (java,1)

3. 流处理 WordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class StreamWordCount {

public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineStream = env.readTextFile("input/word.txt");
// 3. 转换、分组、求和,得到统计结果
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
}).keyBy(data -> data.f0).sum(1);
// 4. 打印
sum.print();
// 5. 执行
env.execute();
}
}
// 3> (java,1)
// 5> (hello,1)
// 5> (hello,2)
// 5> (hello,3)
// 13> (flink,1)
// 9> (world,1)

4. 读取 Socket 文本流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// nc -lk 7777
public class SocketStreamWordCount {

public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文本流:指定主机名和端口号
DataStreamSource<String> lineStream = env.socketTextStream("debian201", 7777);
// 3. 转换、分组、求和,得到统计结果
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream
.flatMap(
(String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
) // 这里只能推断出返回的是Tuple2类型,无法得到Tuple2<String, Long>
.returns(Types.TUPLE(Types.STRING, Types.LONG)) // 显示指定类型
.keyBy(data -> data.f0)
.sum(1);
// 4. 打印
sum.print();
// 5. 执行
env.execute();
}
}
// # hello world
//13> (world,1)
//5> (hello,1)
// # hello flink
//2> (flink,1)
//5> (hello,2)
  • 由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的。这时就需要显式地提供类型信息,才能使应用程序正常工作
  • Flink 具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器

5. 使用本地 Web UI

  • 在开发环境,可以使用 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()) 创建一个带本地 Web UI 的项目,用于本地测试
1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
  • Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation 类是 Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器
  • 对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部对不同的类型进行了划分,这些类型可以在 Types 工具类中找到
    • 基本类型:所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger
    • 数组类型:包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)
    • 复合数据类型
      • Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是 Java API 的一部分。最多 25 个字段,也就是从 Tuple0~Tuple25,不支持空字段
      • Scala 样例类及 Scala 元组:不支持空字段
      • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段
      • POJO:Flink 自定义的类,似于 Java Bean 模式的类
    • 辅助类型:Option、Either、List、Map 等
    • 泛型类型(GENERIC):Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的
  • 在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让代码的可读性大大增加。在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型
  • Flink 对 POJO 类型的要求如下:
    • 类是公有(public)的
    • 有一个无参的构造方法
    • 所有属性都是可访问的
    • 所有属性的类型都是可序列化的
  • 类型提示(Type Hints)
    • Flink 具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的。这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能
    • 为了解决这类问题,Java API 提供了专门的“类型提示”(type hints),它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息
1
2
3
4
5
// 对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

// .returns(new TypeHint<Tuple2<String, Long>>(){}); // 代替写法