手记

为什么选择Kafka?开发者友好的事件驱动架构指南

弗兰兹·卡夫卡(Franz Kafka)是什么呢?

Kafka 是一个开源的, 分布式事件流处理系统,旨在处理实时数据流。

最初由 LinkedIn 开发,后来在 Apache 软件基金会开源,Kafka 现在被广泛用于构建高吞吐量、容错能力和可扩展的数据流、实时数据分析和事件驱动架构。

Kafka 解决了哪些问题?

在卡夫卡之前的时代,传统的消息队列系统如RabbitMQActiveMQ被广泛使用,但它们在处理大规模和高吞吐量的实时数据流时却存在限制。

Kafka 是这样设计的,为了通过提供以下这些功能来解决这些问题。

  • 大规模数据处理 – Kafka 优化了高容量数据流的摄入、存储和分发。
  • 容错性 – Kafka 在多个节点间复制数据,确保即使某个节点失败,数据仍然可用。
  • 持久性 – 消息会持久化到磁盘,允许消费者在需要时重新播放消息。
  • 支持事件驱动架构 – 它支持微服务之间的异步通信需求,非常适合现代云应用。
何时使用Kafka?

当你需要这些功能时,Kafka是个好选择。

  • 高吞吐量、实时的数据处理 – 非常适合日志处理、金融交易和物联网数据流的应用场景。
  • 微服务解耦 – Kafka作为中间件,可以帮助微服务异步通信,无需直接依赖对方。
  • 事件驱动系统 – 如果你的系统架构设计主要是为了应对变化(比如用户事件触发多个后续操作),Kafka是很好的选择。
  • 可靠的消息传递和持久性 – 与传统的消息队列可能丢失消息不同,Kafka保留消息一段时间(时间长短可以配置),确保消息的持久性和可重播性。
  • 可扩展性和容错性 – Kafka的分布式特性使其能够水平扩展,同时通过复制确保容错性。
Kafka 是怎么运作的

卡夫卡包含几个核心组件:

GIF

1. 信息

在 Kafka 中,消息 是数据的最基本单位。

它可以是一个 JSON 对象、一个字符串,或者任何形式的二进制数据。

消息可能有一个关联的键值,这决定了消息将被存放在哪个分区段里。

2. 话题:

一个话题是一个逻辑通道(例如:日志、交易、订单),生产者发送消息,消费者读取消息。话题帮助分类消息。

3. 制作人

一个生产者是指使用Kafka客户端发布消息到主题中的消息。消息可以通过以下三种方式发布,分别为:

  • 发送并忘记 – 生产者发送消息而不等待确认,确保了最大速度但可能会丢失数据。
  • 同步发送 – 生产者等待 Kafka 的确认后再继续,确保了数据的可靠性但增加了延时。
  • 异步发送 – 生产者异步批量发送消息,提供了速度和可靠性的平衡。

点这里看图,这是一张关于...的图片。

Kafka 允许配置确认(ACK)来平衡可靠性和性能:

  • ACK 0 – 不需要确认(最快但最危险)。
  • ACK 1 – 消息由领导者接收到后进行确认(较快但相对不安全)。
  • ACK All – 只有当所有副本接收到消息时才进行确认(较慢但最稳妥)。

生产者优化策略 (Producer Optimizations)

  • 消息压缩与批量处理 – Kafka 生产者可以在发送消息到 Broker 之前对其进行批处理和压缩。这可以提高吞吐量并减少磁盘使用,但会增加 CPU 使用率。
  • Avro 序列化与反序列化 – 使用 Avro 取代 JSON 需要事先定义模式,但它可以提高性能并降低存储消耗。

4. 磁盘分区

Kafka 主题被划分为 分区,这使得并行处理和可扩展性成为可能。

分区内的消息是按顺序排列且无法更改的。

5 用户

一个客户端从分区读取消息,并通过偏移追踪位置。

消费者可以调整偏移量,以重新处理旧消息。

Kafka 消费者采用的是轮询方式,这意味着它们会不断地向消息服务器请求数据,而是由消息服务器主动推送数据。

消费优化

  • 分区分配策略:

  • 范围 – 消费者获得连续的分区。
  • 轮循 – 分区均匀地分配给各个消费者。
  • 粘性 – 尽量减少重新平衡期间的变化。
  • 协作式粘性 – 与粘性类似,但允许协作重新平衡。
  • 批量大小设置 – 消费者可以定义每个轮询周期应检索的记录数或数据量。

6. 消费者群体

一个消费者小组就是一群一起处理订阅的主题消息的消费者。

Kafka 确保一个消费组中的每个消费者只会消费一个分区,这样就能保证消息的顺序了。

7. 偏置管理

当消费者读取到一条消息时,它会更新其相应的偏移位置——即其最后处理的消息的位置。

  • 自动提交 – Kafka 自动定期提交偏移量。
  • 手动提交 – 应用程序可以以同步或异步方式提交偏移量。

8. 中介

一个代理服务器是 Kafka 服务器,它保存消息、管理偏移量并处理客户端的请求。

多个经纪人组成一个Kafka集群(Kafka集群),以提高可扩展性和容错性。

9. 动物园管理员

Zookeeper 管理元数据,跟踪代理节点,并处理领导者选举。

不过,更新的 Kafka 版本正试图摆脱对 Zookeeper 的依赖性。

示例:Kafka实战

要更好地理解 Kafka,让我们看一个简单的示例:一个生产者发送消息到一个主题,两个不同的消费者分别处理这些消息,一个模拟电子邮件通知功能,另一个则将消息存储在数据库中。

设置 Kafka(使用 docker-compose.yml)

    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        container_name: zookeeper
        restart: always
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181

      kafka:
        image: confluentinc/cp-kafka:latest
        container_name: kafka
        restart: always
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
          - "29092:29092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

点击进入全屏 点击退出全屏

生产者代码 (producer.js)

    const { Kafka } = require("kafkajs");

    const kafka = new Kafka({
      clientId: "family-producer",
      brokers: ["localhost:9092"],
    });
    const producer = kafka.producer();

    async function sendMessage() {
      await producer.connect();
      console.log("🟢 生产者已连接");

      const message = {
        id: Date.now(),
        content: `Hi Mom! 当前时间是 ${new Date().getMinutes()}:${new Date().getSeconds()}`,
      };
      await producer.send({
        topic: "family-topic",
        messages: [{ value: JSON.stringify(message) }],
      });

      console.log(`📨 已发送: ${JSON.stringify(message)}`);
      await producer.disconnect();
    }

    sendMessage();

全屏 退出全屏

接收电子邮件通知的脚本(consumer.js)

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "family-email-consumer",
  brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "email-group" });

async function consumeMessages() {
  await consumer.connect();
  await consumer.subscribe({ topic: "family-topic", fromBeginning: true });
  console.log("🟢 邮件消费者已连接");

  await consumer.run({
    eachMessage: async ({ message }) => {
      const msg = JSON.parse(message.value.toString());
      console.log(`📩 收到通知: "${msg.content}"`);
      console.log(`📧 发送了邮件: "${msg.content}" \n`);
    },
  });
}

consumeMessages();

进入全屏 退出全屏

数据库存储客户端(dbconsumer.js)

    const { Kafka } = require("kafkajs");

    const kafka = new Kafka({
      clientId: "family-db-consumer",
      brokers: ["localhost:9092"],
    });
    const consumer = kafka.consumer({ groupId: "db-group" });

    async function consumeMessages() {
      await consumer.connect();
      await consumer.subscribe({ topic: "家庭主题", fromBeginning: true });
      console.log("🟢 家庭数据库消费者已连接");

      await consumer.run({
        eachMessage: async ({ message }) => {
          const msg = JSON.parse(message.value.toString());
          console.log(`💾 正在将消息 "${msg.content}" 存入数据库 \n`);
        },
      });
    }

    consumeMessages();

点进来全屏 点出去全屏

最后的想法

Kafka 是一个强大的工具,极大地改变了实时数据流处理。

然而,尽管它提供了惊人的可扩展性和持久力,我们仍然需要确保它是否适合你的架构。

稍后见!我将写一篇下一篇文章比较 Kafka 和 Redis 的应用场景,看看我们应该在什么情况下选择哪一个。🚀

阿瑟拉(Athreya)也叫 Maneshwar

技术写作者 | 10万+ 阅读者 | i3 槽配 Mint | 学习、构建、改进,边写边乐 :)

我一直都在使用或推广一个超级方便的工具,叫做LiveAPI,你知道吗?

LiveAPI 让你几分钟内就能搞定这些 API 的文档化

借助LiveAPI,您可以快速创建交互式API文档页面,让用户可以直接在浏览器中测试API。

。点击可以查看详细内容。

如果你对手动为你的API接口创建文档感到厌烦,这个工具可能会让你的生活更轻松些。

资料来源:有些图片取自1

0人推荐
随时随地看视频
慕课网APP