手记

探究 Flink 框架底层(一)

大家好,我是大圣,很高兴又和大家见面。

前面几篇文章说了高并发、多线程、锁和MVCC的一系列等并发知识点,不知道小伙伴们看完有没有疑惑一个问题。

这个问题就是为什么我们在使用 Java 多线程的时候会出现数据竞争的问题,按理说 Flink 的并行计算,底层也是开多个线程去进行处理。

但是我们在使用 Flink 框架的时候却从来不需要考虑这些问题。

这篇文章咱们就来探究一下这个问题。

java 多线程为什么会存在数据竞争

举例说明

想象一个图书馆,有多个学生(线程)都想要借阅同一本书(共享资源/内存位置)。图书馆只有这一本书的一个副本。

单线程环境:这就像图书馆规定只允许一个学生进入并借阅书籍。这样,学生之间就不会因为想要同一本书而发生冲突。

多线程环境(Java或其他语言):现在图书馆允许多个学生同时进入。如果两个或多个学生都想要借同一本书,他们可能会抢夺这本书,因为它只有一个副本。这就是数据竞争。

多核CPU:想象我们的图书馆有多个入口和通道,这样更多的学生可以同时进入。这使得学生们更有可能在同一时间抢夺那本书。

单JVM实例:尽管有多个通道,但图书馆内的所有书籍都存储在同一个大房间里(共享的堆内存)。所以,无论学生从哪个通道进入,他们都想要借阅存储在这个房间里的那本特定的书。

专业解释

其实我觉得最本质的原因是 Java 程序是跑在一个 JVM 实例里面的,JVM的共享堆内存模型为Java线程提供了一个易于共享和修改数据的环境,导致多个线程并行执行并尝试访问或修改同一片内存区域。

有人说了,如果服务器的 CPU 是 4核的,我的程序用线程池开了 3 个线程,可能会 3 个线程并行的跑在 CPU 的 3 个核里面,但是这三个线程是跑在一个 JVM 实例里面的,程序相关的副本全部都只有一份,所以会存在数据竞争的问题,我觉得这是 Java 多线程会存在数据竞争的本质原因。

为什么 Flink 不会存在数据竞争

举例说明

大家先看一下下面这个代码:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c", "d");
        env.setParallelism(4)
        
        stream.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).print();

        env.execute("MyFlinkJob");
    }
}

这个代码就是加载数据源,然后进行用了一个 map 算子,然后进行打印。

解释

在上面代码中我把并行度设置为 4 ,并且我这个程序是在 Yarn 集群上面跑的时候,然后 Slot 给的是 1 的时候。这个时候会启动 4 台 TaskManager,并且在每一台 TaskManager 里面启动一个 JVM 进程实例。

这样每个JVM 都有其自己的程序副本(这包括你的变量、函数、对象等),就相当于是跑在 4 台机器上,在物理上是隔离的。这样是不存在多个线程去操作同一个变量,进而操作同一块内存区域,从而出现数据竞争的情况。

上面这段话我感觉太抽象了,我举一个例子大家就明白了。

举例

例子:想象一个超级市场连锁品牌,它在四个不同的城市有四个独立的分店。每个分店都有自己的仓库和库存系统。

并行度为4

这就像这个连锁品牌在四个城市开设了四个独立的超市分店。每家店都独立运营。

Yarn集群上的执行 & Slot为1

每个城市的分店都有自己的仓库和管理者(这是TaskManager和它的JVM实例)。每个仓库都有自己的库存系统(这是你的程序和它的变量/对象)。

程序副本

即使所有分店都遵循相同的连锁品牌标准和操作方式,但每家店的库存系统都是独立的。如果一家店销售了一件商品,其他三家店的库存系统并不受影响。

物理隔离

由于这四个分店位于不同的城市,他们彼此之间是物理隔离的。一个城市的分店卖出的商品,不会影响其他城市分店的库存。这就像每个TaskManager运行在自己的JVM实例中,彼此间的数据和状态是隔离的。

总结:在这个例子中,每个超市分店都是独立的,就像Flink的每个TaskManager在自己的JVM实例中运行一样。即使它们都执行相同的任务(如销售商品),但由于它们在物理上是隔离的,所以不会存在数据竞争的情况。

争议点

有人说,你并行度为 4,然后 slot 为 2,这个时候只会启动 2 台 TaskManager,同时创建两个 JVM 实例,这样的话,也会出现数据竞争的情况的。

你觉得会吗?我觉得不会。

JVM 内存隔离:在 Flink 1.10 之前,所有的 Slot 都在同一个 JVM 进程中运行,并共享 JVM 的堆内存。这可能导致一个 Slot 中的任务因内存问题而影响到其他 Slot 中的任务。

从 Flink 1.10 开始,Flink 引入了一个新特性,允许对每个 Slot 的 JVM 堆内存进行隔离。这意味着,尽管每个 Slot 仍然在同一个 JVM 中运行,但每个 Slot 都有自己的内存区域,这有助于防止一个任务的内存问题影响其他任务。

所以也不会出现多个线程操作同一块区域。

总结

有人又提出疑问说,当 Flink 通过 keyBy 算子,进行 Shuffle 的时候,这种情况也不会出现数据竞争吗?

答案是不会的。 下篇文章咱们继续说为什么不会,也会对 keyBy 源码以及设计理念进行深层次分析,全是干货。

另外之前的文章说会更新 Flink Sink HDFS 源码分析的,但是这一篇没有说,这是因为说 Flink Sink HDFS 的时候会用到今天说的内容,所以我就想提前写这一篇做个铺垫。

0人推荐
随时随地看视频
慕课网APP