Flink 环境搭建

debian201 debian202 debian203
JobManager、TaskManager TaskManager TaskManager

1. 集群角色

  • Client:客户端获取代码并做转换,之后提交给 JobManager
  • JobManager:对作业进行中央调度管理。它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager
  • TaskManager:真正执行数据处理任务的角色

2. 集群部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/app/
$ cd /opt/app/flink-1.17.0
$ vi conf/flink-conf.yaml
#JobManager
jobmanager.rpc.address: debian201
jobmanager.bind-host: 0.0.0.0
#TaskManager
taskmanager.bind-host: 0.0.0.0
taskmanager.host: debian201 # debian202/debian203
#Web
rest.address: debian201
rest.bind-address: 0.0.0.0
#进程可使用到的全部内存,包括JVM元空间和其他开销
#jobmanager.memory.process.size: 1600M
#taskmanager.memory.process.size: 1728M
#每个TaskManager能够分配的Slot数量
#taskmanager.numberOfTaskSlots: 1
#Flink任务执行的并行度
#parallelism.default: 1
$ vi conf/workers
debian201
debian202
debian203
$ vi conf/masters
debian201:8081

$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host debian201.
Starting taskexecutor daemon on host debian201.
Starting taskexecutor daemon on host debian202.
Starting taskexecutor daemon on host debian203.
$ xcall jps
=============== debian201 ===============
4453 StandaloneSessionClusterEntrypoint # JobManager
4458 TaskManagerRunner # TaskManager
4533 Jps
=============== debian202 ===============
2872 TaskManagerRunner
2941 Jps
=============== debian203 ===============
2948 Jps
2876 TaskManagerRunner

3. 向集群提交作业

  • 添加打包插件的配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
  • 因为集群中已经具备任务运行所需的所有依赖,所以可以直接使用 original 的 jar 包
  • 在 Web 界面的 Submit New Job 菜单中上传 jar 包,点击 jar 包,配置程序入口主类的全类名、任务运行的并行度、任务运行所需的配置参数和保存点路径等,点击提交将任务提交到集群中运行
  • 可在 Running Jobs 菜单中查看任务运行情况
  • 也可通过命令行提交任务:
1
$ bin/flink run -m debian201:8081 -c com.lb.flink.wc.SocketStreamWordCount ./flink-test-1.0-SNAPSHOT.jar
  • Flink 是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成
  • 在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)
  • 它们的区别主要在于:集群的生命周期以及资源的分配方式,以及应用的 main 方法到底在哪里执行——客户端(Client)还是 JobManager

1. 会话模式(Session Mode)

  • 需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业
  • 集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源
  • 适合单个规模小、执行时间短的大量作业

2. 单作业模式(Per-Job Mode)

  • 会话模式因为资源共享会导致很多问题,为了更好地隔离资源,可以为每个提交的作业启动一个集群,这就是单作业模式
  • 作业完成后,集群就会关闭,资源也会被释放
  • 单作业模式在生产环境运行更加稳定,是实际应用的首选模式
  • Flink 本身无法直接这样运行,单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes

3. 应用模式(Application Mode)

  • 前面两种模式,应用代码都是先在客户端上执行,然后由客户端提交给 JobManager。这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗
  • 应用模式直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了
  • 单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的

1. Standalone 运行模式

  • Standalone 模式是独立运行的,不依赖任何外部的资源管理平台。一般只用在开发测试或作业非常少的场景下
  • 缺点:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理

(1) 会话模式

  • 集群部署章节就是 Standalone 集群的会话模式部署

(2) 单作业模式

  • Standalone 集群不支持单作业模式部署

(3) 应用模式

  • 应用模式不会提前创建集群,所以不能调用 start-cluster.sh 脚本
  • 使用 standalone-job.sh 来创建一个 JobManager
1
2
3
4
5
6
7
8
$ cp flink-test-1.0-SNAPSHOT.jar lib/
# 启动JobManager:直接指定入口类,脚本会到lib目录扫描所有的jar包
$ bin/standalone-job.sh start --job-classname com.lb.flink.wc.SocketStreamWordCount
# 启动TaskManager
$ bin/taskmanager.sh start

$ bin/taskmanager.sh stop
$ bin/standalone-job.sh stop

2. YARN 运行模式

  • 客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源

(1) Yarn 环境准备

1
2
3
4
5
6
7
8
9
$ vim /etc/profile.d/my.sh
export HADOOP_HOME=/opt/app/hadoop-2.7.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

# 启动集群
$ start-dfs.sh
$ start-yarn.sh

(2) 会话模式

  • 向 Yarn 集群申请资源,开启一个 Yarn 会话,启动 Flink 集群
  • Yarn Session 启动之后会给出一个 Web UI 地址以及一个 Application ID,用户可以通过 Web UI 或者命令行两种方式提交作业
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# `-d`:让Yarn Session后台运行
# `-jm`(--jobManagerMemory):配置JobManager所需内存,单位MB
# `-tm`(--taskManagerMemory):配置每个TaskManager所需内存,单位MB
# `-nm`(--name):配置在Yarn UI界面上显示的任务名
# `-qu`(--queue):指定Yarn队列名
$ bin/yarn-session.sh -nm test -d
Found Web Interface debian201:39612 of application 'application_1702521858432_0001'.
JobManager Web Interface: http://debian201:39612

# 提交作业:客户端可以自行确定JobManager的地址
$ bin/flink run -c com.lb.flink.wc.SocketStreamWordCount ./flink-test-1.0-SNAPSHOT.jar
# 任务提交成功后,可在Yarn的Web UI界面查看运行情况

# 停止Session:可在Yarn的Web UI上停止,也可通过命令方式
$ echo "stop" | ./bin/yarn-session.sh -id application_1702521858432_0002
  • Flink 1.11.0 版本不再使用 -n 参数和 -s 参数指定 TaskManager 数量和 slot 数量,Yarn 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,Yarn 的会话模式也不会把集群资源固定,同样是动态分配的

(3) 单作业模式

  • 直接向 Yarn 提交一个单独的作业,从而启动一个 Flink 集群
1
2
3
4
5
6
# 提交作业
$ bin/flink run -d -t yarn-per-job -c com.lb.flink.wc.SocketStreamWordCount ./flink-test-1.0-SNAPSHOT.jar

# 停止Job:通过WebUI或命令行方式
$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
  • 如果启动过程中报错 Trying to access closed classloader,可在配置 flink-conf.yaml 中设置 classloader.check-leaked-classloader: false

(4) 应用模式

  • 与单作业模式类似
1
2
3
4
5
6
# 提交作业
$ bin/flink run-application -t yarn-application -c com.lb.flink.wc.SocketStreamWordCount ./flink-test-1.0-SNAPSHOT.jar

# 停止Job:通过WebUI或命令行方式
$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

(5) 上传 HDFS 提交

  • 将 Flink 本身的依赖和用户 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了
1
2
3
4
5
6
7
8
$ hadoop fs -mkdir /flink-dist
$ hadoop fs -put lib/ /flink-dist
$ hadoop fs -put plugins/ /flink-dist

$ hadoop fs -mkdir /flink-jars
$ hadoop fs -put flink-test-1.0-SNAPSHOT.jar /flink-jars

$ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://debian201:8020/flink-dist" -c com.lb.flink.wc.SocketStreamWordCount hdfs://debian201:8020/flink-jars/flink-test-1.0-SNAPSHOT.jar

四、历史服务器

  • 运行 Flink Job 的集群一旦停止,只能去 Yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么
  • Flink 提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息
  • Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,历史服务器进程则在任务停止后可以对任务统计信息进行查询。比如最后一次的 Checkpoint、任务运行时的相关配置等
1
2
3
4
5
6
7
8
9
10
$ hadoop fs -mkdir -p /flink-his
$ vi conf/flink-config.yaml
jobmanager.archive.fs.dir: hdfs://debian201:8020/flink-his
historyserver.web.address: debian201
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://debian201:8020/flink-his
historyserver.archive.fs.refresh-interval: 5000

$ bin/historyserver.sh start
$ bin/historyserver.sh stop