大家好,我是大圣,很高兴又和大家见面。
前面几篇文章说了高并发、多线程、锁和MVCC的一系列等并发知识点,不知道小伙伴们看完有没有疑惑一个问题。
这个问题就是为什么我们在使用 Java 多线程的时候会出现数据竞争的问题,按理说 Flink 的并行计算,底层也是开多个线程去进行处理。
但是我们在使用 Flink 框架的时候却从来不需要考虑这些问题。
这篇文章咱们就来探究一下这个问题。
java 多线程为什么会存在数据竞争
举例说明
想象一个图书馆,有多个学生(线程)都想要借阅同一本书(共享资源/内存位置)。图书馆只有这一本书的一个副本。
单线程环境:这就像图书馆规定只允许一个学生进入并借阅书籍。这样,学生之间就不会因为想要同一本书而发生冲突。
多线程环境(Java或其他语言):现在图书馆允许多个学生同时进入。如果两个或多个学生都想要借同一本书,他们可能会抢夺这本书,因为它只有一个副本。这就是数据竞争。
多核CPU:想象我们的图书馆有多个入口和通道,这样更多的学生可以同时进入。这使得学生们更有可能在同一时间抢夺那本书。
单JVM实例:尽管有多个通道,但图书馆内的所有书籍都存储在同一个大房间里(共享的堆内存)。所以,无论学生从哪个通道进入,他们都想要借阅存储在这个房间里的那本特定的书。
专业解释
其实我觉得最本质的原因是 Java 程序是跑在一个 JVM 实例里面的,JVM的共享堆内存模型为Java线程提供了一个易于共享和修改数据的环境,导致多个线程并行执行并尝试访问或修改同一片内存区域。
有人说了,如果服务器的 CPU 是 4核的,我的程序用线程池开了 3 个线程,可能会 3 个线程并行的跑在 CPU 的 3 个核里面,但是这三个线程是跑在一个 JVM 实例里面的,程序相关的副本全部都只有一份,所以会存在数据竞争的问题,我觉得这是 Java 多线程会存在数据竞争的本质原因。
为什么 Flink 不会存在数据竞争
举例说明
大家先看一下下面这个代码:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("a", "b", "c", "d");
env.setParallelism(4)
stream.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
}).print();
env.execute("MyFlinkJob");
}
}
这个代码就是加载数据源,然后进行用了一个 map 算子,然后进行打印。
解释
在上面代码中我把并行度设置为 4 ,并且我这个程序是在 Yarn 集群上面跑的时候,然后 Slot 给的是 1 的时候。这个时候会启动 4 台 TaskManager,并且在每一台 TaskManager 里面启动一个 JVM 进程实例。
这样每个JVM 都有其自己的程序副本(这包括你的变量、函数、对象等),就相当于是跑在 4 台机器上,在物理上是隔离的。这样是不存在多个线程去操作同一个变量,进而操作同一块内存区域,从而出现数据竞争的情况。
上面这段话我感觉太抽象了,我举一个例子大家就明白了。
举例
例子:想象一个超级市场连锁品牌,它在四个不同的城市有四个独立的分店。每个分店都有自己的仓库和库存系统。
并行度为4
这就像这个连锁品牌在四个城市开设了四个独立的超市分店。每家店都独立运营。
Yarn集群上的执行 & Slot为1
每个城市的分店都有自己的仓库和管理者(这是TaskManager和它的JVM实例)。每个仓库都有自己的库存系统(这是你的程序和它的变量/对象)。
程序副本
即使所有分店都遵循相同的连锁品牌标准和操作方式,但每家店的库存系统都是独立的。如果一家店销售了一件商品,其他三家店的库存系统并不受影响。
物理隔离
由于这四个分店位于不同的城市,他们彼此之间是物理隔离的。一个城市的分店卖出的商品,不会影响其他城市分店的库存。这就像每个TaskManager运行在自己的JVM实例中,彼此间的数据和状态是隔离的。
总结:在这个例子中,每个超市分店都是独立的,就像Flink的每个TaskManager在自己的JVM实例中运行一样。即使它们都执行相同的任务(如销售商品),但由于它们在物理上是隔离的,所以不会存在数据竞争的情况。
争议点
有人说,你并行度为 4,然后 slot 为 2,这个时候只会启动 2 台 TaskManager,同时创建两个 JVM 实例,这样的话,也会出现数据竞争的情况的。
你觉得会吗?我觉得不会。
JVM 内存隔离:在 Flink 1.10 之前,所有的 Slot 都在同一个 JVM 进程中运行,并共享 JVM 的堆内存。这可能导致一个 Slot 中的任务因内存问题而影响到其他 Slot 中的任务。
从 Flink 1.10 开始,Flink 引入了一个新特性,允许对每个 Slot 的 JVM 堆内存进行隔离。这意味着,尽管每个 Slot 仍然在同一个 JVM 中运行,但每个 Slot 都有自己的内存区域,这有助于防止一个任务的内存问题影响其他任务。
所以也不会出现多个线程操作同一块区域。
总结
有人又提出疑问说,当 Flink 通过 keyBy 算子,进行 Shuffle 的时候,这种情况也不会出现数据竞争吗?
答案是不会的。 下篇文章咱们继续说为什么不会,也会对 keyBy 源码以及设计理念进行深层次分析,全是干货。
另外之前的文章说会更新 Flink Sink HDFS 源码分析的,但是这一篇没有说,这是因为说 Flink Sink HDFS 的时候会用到今天说的内容,所以我就想提前写这一篇做个铺垫。