Flink 系统架构与核心概念

一、系统架构

1. 作业管理器(JobManager)

  • JobManager 是 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的 JobManager 所控制执行
  • JobManger 包含3个不同的组件:

(1) JobMaster

  • JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job)
  • 多个 Job 可以同时运行在一个 Flink 集群中,每个 Job 对应一个 JobMaster
  • 作业提交时,JobMaster 会接收到要执行的应用,然后把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”(Execution Graph),它包含了所有可以并发执行的任务
  • JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上
  • 在运行过程中,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调

(2) 资源管理器(ResourceManager)

  • ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个
  • 资源主要是指 TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行

(3) 分发器(Dispatcher)

  • Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件
  • Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息
  • Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉

2. 任务管理器(TaskManager)

  • TaskManager 是 Flink 中的工作进程,数据流的具体计算就是它来做的,Flink 集群中至少有一个
  • 每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 是资源调度的最小单位,slot 的数量限制了 TaskManager 能够并行处理的任务数量
  • 启动之后,TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用,JobMaster 就可以分配任务来执行了
  • 在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager 交换数据

二、核心概念

1. 并行度(Parallelism)

  • 当要处理的数据量非常大时,我们可以把一个算子操作“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一个算子(operator)任务就被拆分成了多个并行的子任务(subtasks),再将它们分发到不同节点,就真正实现了并行计算
  • 在 Flink 执行过程中,每一个算子可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行
  • 一个特定算子的子任务的个数被称为并行度(parallelism)。包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务
  • 一个程序中,不同的算子可能具有不同的并行度。一般一个流程序的并行度,可以认为是其所有算子中最大的并行度
  • 可以通过以下方法来设置并行度(优先级由高到低):

(1) 代码中设置

  • 在算子后调用 setParallelism() 方法,来设置当前算子的并行度。该方法只针对当前算子的并行度
1
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
  • 直接调用执行环境的 setParallelism() 方法,全局设定并行度。该方法针当前代码中所有算子的默认并行度
1
env.setParallelism(2);

由于 keyBy 不是算子,所以无法对 keyBy 设置并行度

(2) 提交应用时设置

  • 在使用 flink run 命令提交应用时,可以增加 -p 参数来指定当前应用程序执行的并行度
1
$ bin/flink run –p 2 –c com.lb.flink.Main ./flink-test.jar
  • Web UI 上提交作业时也可以在对应输入框中直接设置并行度

(3) 配置文件中设置

  • 可以直接在集群的配置文件 flink-conf.yaml 中更改默认并行度
  • 该设置对于整个集群上提交的所有作业有效,默认值为 1
1
parallelism.default: 2
  • 在 IDEA 开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数

2. 算子链(Operator Chain)

  • 一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类
  • 一对一(One-to-one,Forwarding):
    • 该模式的数据流会维护分区以及元素的顺序。比如 source 算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序
    • map、filter、flatMap 等算子都是这种对应关系,类似于 Spark 中的窄依赖
  • 重分区(Redistributing):
    • 该模式的数据流的分区会发生改变。比如 map 和 keyBy/window 算子之间,以及 keyBy/window 和 Sink 算子之间
    • 每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程。类似于 Spark 中的 shuffle
  • 合并算子链
    • 在 Flink 中,并行度相同的一对一(one-to-one)算子操作,可以直接链接在一起形成一个“大”的任务(task),每个 task 会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)
    • 算子链可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量
    • Flink 默认会按照算子链的原则进行链接合并,如果想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置
    • 禁用算子链适用于拆分两个复杂的算子,使其在不同的线程中运行。也可用来定位出问题的算子
1
2
3
4
5
6
7
8
// 全局禁用算子链
env.disableOperatorChaining();

// 禁用算子链(影响前后算子)
.map(word -> Tuple2.of(word, 1L)).disableChaining();

// 从当前算子开始新链(影响前算子)
.map(word -> Tuple2.of(word, 1L)).startNewChain();

3. 任务槽(Task Slots)

  • Flink 中每一个 TaskManager 都是一个 JVM 进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)
  • TaskManager 的计算资源是有限的,并行的任务数就需要有一定的限制,因此需要对每个任务运行所占用的资源做出明确的划分
  • 每个任务槽(task slot)其实表示了 TaskManager 拥有的计算资源的一个固定大小的子集。假如一个 TaskManager 有三个 slot,那么它会将管理的内存平均分成三份,每个 slot 独占一份
  • slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用中,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争
  • 可以在 Flink 配置文件中设置每个 TaskManager 的 slot 数量 taskmanager.numberOfTaskSlots,默认 1 个

(1) 任务槽的共享

  • 默认情况下,Flink 是允许子任务共享 slot 的,且 slot 中的子任务是同时执行的
  • 同一个作业下,对于不同算子的并行子任务,可以放到同一个 slot 上执行;对于相同算子的并行子任务,需要分配到不同的 slot 上
  • 好处:
    • 将资源密集型和非密集型的任务同时放到一个 slot 中,它们就可以自行分配对资源占用的比例,从而保证最重的任务平均分配给所有的 TaskManager
    • slot 共享允许我们保存完整的作业管道。这样即使某个 TaskManager 出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行
  • 只有属于同一个 slot 共享组的子任务,才会开启 slot 共享(默认都是 default 组)。不同组之间的任务是完全隔离的,必须分配到不同的 slot 上
  • 如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,可以手动指定其 slot 共享组。此时总共需要的 slot 数量,就是各个 slot 共享组最大并行度的总和
1
2
// 指定当前算子的共享组
.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");

(2) 任务槽和并行度的关系

  • 任务槽是静态的概念,是指 TaskManager 具有的并发执行能力;而并行度是动态概念,也就是 TaskManager 运行程序时实际使用的并发能力
  • 假设一共有 3 个 TaskManager,每一个 TaskManager 中的 slot 数量设置为 3 个,那么一共有 9 个 task slot,表示集群最多能执行 9 个同一算子的并行子任务
  • 整个流处理程序的并行度就是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量
  • 只有 slot 数量能满足程序最大并行度时,程序才能执行

三、作业提交流程

1. Standalone 会话模式作业提交流程

2. 逻辑流图/作业流图/执行流图/物理流图

  • 逻辑流图(StreamGraph)
    • 根据用户通过 DataStream API 编写的代码生成的最初的 DAG 图,用来表示程序的拓扑结构。这一步一般在客户端完成
  • 作业流图(JobGraph)
    • StreamGraph 经过优化后生成的就是作业流图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分
    • 主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph 一般也是在客户端生成的,在作业提交时传递给 JobMaster
  • 执行流图(ExecutionGraph)
    • JobMaster 收到 JobGraph 后,会根据它来生成执行流图(ExecutionGraph)。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构
    • 与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式
  • 物理流图(Physical Graph)
    • JobMaster 生成执行流图后,会将它分发给 TaskManager;各个 TaskManager 会根据执行图部署任务,最终的物理执行过程也会形成一张图,一般就叫作物理流图(Physical Graph)
    • 物理流图只是具体执行层面的图,并不是一个具体的数据结构
    • 物理流图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理流图,TaskManager 就可以对传递来的数据进行处理计算了

3. Yarn 应用模式作业提交流程