一、Flink 集群部署
debian201 | debian202 | debian203 |
---|---|---|
JobManager、TaskManager | TaskManager | TaskManager |
1. 集群角色
- Client:客户端获取代码并做转换,之后提交给 JobManager
- JobManager:对作业进行中央调度管理。它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager
- TaskManager:真正执行数据处理任务的角色
2. 集群部署
1 | $ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/app/ |
- 可通过 http://debian201:8081 访问 Web UI
3. 向集群提交作业
- 添加打包插件的配置
1 | <build> |
- 因为集群中已经具备任务运行所需的所有依赖,所以可以直接使用
original
的 jar 包 - 在 Web 界面的
Submit New Job
菜单中上传 jar 包,点击 jar 包,配置程序入口主类的全类名、任务运行的并行度、任务运行所需的配置参数和保存点路径等,点击提交将任务提交到集群中运行 - 可在
Running Jobs
菜单中查看任务运行情况 - 也可通过命令行提交任务:
1 | $ bin/flink run -m debian201:8081 -c com.lb.flink.wc.SocketStreamWordCount ./flink-test-1.0-SNAPSHOT.jar |
二、Flink 部署模式
- Flink 是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成
- 在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)
- 它们的区别主要在于:集群的生命周期以及资源的分配方式,以及应用的 main 方法到底在哪里执行——客户端(Client)还是 JobManager
1. 会话模式(Session Mode)
- 需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业
- 集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源
- 适合单个规模小、执行时间短的大量作业
2. 单作业模式(Per-Job Mode)
- 会话模式因为资源共享会导致很多问题,为了更好地隔离资源,可以为每个提交的作业启动一个集群,这就是单作业模式
- 作业完成后,集群就会关闭,资源也会被释放
- 单作业模式在生产环境运行更加稳定,是实际应用的首选模式
- Flink 本身无法直接这样运行,单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes
3. 应用模式(Application Mode)
- 前面两种模式,应用代码都是先在客户端上执行,然后由客户端提交给 JobManager。这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗
- 应用模式直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了
- 单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的
三、Flink 运行模式
1. Standalone 运行模式
- Standalone 模式是独立运行的,不依赖任何外部的资源管理平台。一般只用在开发测试或作业非常少的场景下
- 缺点:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理
(1) 会话模式
- 集群部署章节就是 Standalone 集群的会话模式部署
(2) 单作业模式
- Standalone 集群不支持单作业模式部署
(3) 应用模式
- 应用模式不会提前创建集群,所以不能调用 start-cluster.sh 脚本
- 使用 standalone-job.sh 来创建一个 JobManager
1 | $ cp flink-test-1.0-SNAPSHOT.jar lib/ |
2. YARN 运行模式
- 客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源
(1) Yarn 环境准备
1 | $ vim /etc/profile.d/my.sh |
(2) 会话模式
- 向 Yarn 集群申请资源,开启一个 Yarn 会话,启动 Flink 集群
- Yarn Session 启动之后会给出一个 Web UI 地址以及一个 Application ID,用户可以通过 Web UI 或者命令行两种方式提交作业
1 | # `-d`:让Yarn Session后台运行 |
- Flink 1.11.0 版本不再使用 -n 参数和 -s 参数指定 TaskManager 数量和 slot 数量,Yarn 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,Yarn 的会话模式也不会把集群资源固定,同样是动态分配的
(3) 单作业模式
- 直接向 Yarn 提交一个单独的作业,从而启动一个 Flink 集群
1 | # 提交作业 |
- 如果启动过程中报错
Trying to access closed classloader
,可在配置 flink-conf.yaml 中设置classloader.check-leaked-classloader: false
(4) 应用模式
- 与单作业模式类似
1 | # 提交作业 |
(5) 上传 HDFS 提交
- 将 Flink 本身的依赖和用户 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了
1 | $ hadoop fs -mkdir /flink-dist |
四、历史服务器
- 运行 Flink Job 的集群一旦停止,只能去 Yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么
- Flink 提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息
- Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,历史服务器进程则在任务停止后可以对任务统计信息进行查询。比如最后一次的 Checkpoint、任务运行时的相关配置等
1 | $ hadoop fs -mkdir -p /flink-his |