Kafka 是一个开源的, 分布式事件流处理系统,旨在处理实时数据流。
最初由 LinkedIn 开发,后来在 Apache 软件基金会开源,Kafka 现在被广泛用于构建高吞吐量、容错能力和可扩展的数据流、实时数据分析和事件驱动架构。
在卡夫卡之前的时代,传统的消息队列系统如RabbitMQ和ActiveMQ被广泛使用,但它们在处理大规模和高吞吐量的实时数据流时却存在限制。
Kafka 是这样设计的,为了通过提供以下这些功能来解决这些问题。
- 大规模数据处理 – Kafka 优化了高容量数据流的摄入、存储和分发。
- 容错性 – Kafka 在多个节点间复制数据,确保即使某个节点失败,数据仍然可用。
- 持久性 – 消息会持久化到磁盘,允许消费者在需要时重新播放消息。
- 支持事件驱动架构 – 它支持微服务之间的异步通信需求,非常适合现代云应用。
当你需要这些功能时,Kafka是个好选择。
- 高吞吐量、实时的数据处理 – 非常适合日志处理、金融交易和物联网数据流的应用场景。
- 微服务解耦 – Kafka作为中间件,可以帮助微服务异步通信,无需直接依赖对方。
- 事件驱动系统 – 如果你的系统架构设计主要是为了应对变化(比如用户事件触发多个后续操作),Kafka是很好的选择。
- 可靠的消息传递和持久性 – 与传统的消息队列可能丢失消息不同,Kafka保留消息一段时间(时间长短可以配置),确保消息的持久性和可重播性。
- 可扩展性和容错性 – Kafka的分布式特性使其能够水平扩展,同时通过复制确保容错性。
卡夫卡包含几个核心组件:
GIF
1. 信息
在 Kafka 中,消息 是数据的最基本单位。
它可以是一个 JSON 对象、一个字符串,或者任何形式的二进制数据。
消息可能有一个关联的键值,这决定了消息将被存放在哪个分区段里。
2. 话题:
一个话题是一个逻辑通道(例如:日志、交易、订单),生产者发送消息,消费者读取消息。话题帮助分类消息。
3. 制作人
一个生产者是指使用Kafka客户端发布消息到主题中的消息。消息可以通过以下三种方式发布,分别为:
- 发送并忘记 – 生产者发送消息而不等待确认,确保了最大速度但可能会丢失数据。
- 同步发送 – 生产者等待 Kafka 的确认后再继续,确保了数据的可靠性但增加了延时。
- 异步发送 – 生产者异步批量发送消息,提供了速度和可靠性的平衡。
点这里看图,这是一张关于...的图片。
Kafka 允许配置确认(ACK)来平衡可靠性和性能:
- ACK 0 – 不需要确认(最快但最危险)。
- ACK 1 – 消息由领导者接收到后进行确认(较快但相对不安全)。
- ACK All – 只有当所有副本接收到消息时才进行确认(较慢但最稳妥)。
生产者优化策略 (Producer Optimizations)
- 消息压缩与批量处理 – Kafka 生产者可以在发送消息到 Broker 之前对其进行批处理和压缩。这可以提高吞吐量并减少磁盘使用,但会增加 CPU 使用率。
- Avro 序列化与反序列化 – 使用 Avro 取代 JSON 需要事先定义模式,但它可以提高性能并降低存储消耗。
4. 磁盘分区
Kafka 主题被划分为 分区,这使得并行处理和可扩展性成为可能。
分区内的消息是按顺序排列且无法更改的。
5 用户
一个客户端从分区读取消息,并通过偏移追踪位置。
消费者可以调整偏移量,以重新处理旧消息。
Kafka 消费者采用的是轮询方式,这意味着它们会不断地向消息服务器请求数据,而是由消息服务器主动推送数据。
消费优化
-
分区分配策略:
- 范围 – 消费者获得连续的分区。
- 轮循 – 分区均匀地分配给各个消费者。
- 粘性 – 尽量减少重新平衡期间的变化。
- 协作式粘性 – 与粘性类似,但允许协作重新平衡。
- 批量大小设置 – 消费者可以定义每个轮询周期应检索的记录数或数据量。
6. 消费者群体
一个消费者小组就是一群一起处理订阅的主题消息的消费者。
Kafka 确保一个消费组中的每个消费者只会消费一个分区,这样就能保证消息的顺序了。
7. 偏置管理
当消费者读取到一条消息时,它会更新其相应的偏移位置——即其最后处理的消息的位置。
- 自动提交 – Kafka 自动定期提交偏移量。
- 手动提交 – 应用程序可以以同步或异步方式提交偏移量。
8. 中介
一个代理服务器是 Kafka 服务器,它保存消息、管理偏移量并处理客户端的请求。
多个经纪人组成一个Kafka集群(Kafka集群),以提高可扩展性和容错性。
9. 动物园管理员
Zookeeper 管理元数据,跟踪代理节点,并处理领导者选举。
不过,更新的 Kafka 版本正试图摆脱对 Zookeeper 的依赖性。
示例:Kafka实战要更好地理解 Kafka,让我们看一个简单的示例:一个生产者发送消息到一个主题,两个不同的消费者分别处理这些消息,一个模拟电子邮件通知功能,另一个则将消息存储在数据库中。
设置 Kafka(使用 docker-compose.yml)
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
点击进入全屏 点击退出全屏
生产者代码 (producer.js)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "family-producer",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
async function sendMessage() {
await producer.connect();
console.log("🟢 生产者已连接");
const message = {
id: Date.now(),
content: `Hi Mom! 当前时间是 ${new Date().getMinutes()}:${new Date().getSeconds()}`,
};
await producer.send({
topic: "family-topic",
messages: [{ value: JSON.stringify(message) }],
});
console.log(`📨 已发送: ${JSON.stringify(message)}`);
await producer.disconnect();
}
sendMessage();
全屏 退出全屏
接收电子邮件通知的脚本(consumer.js)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "family-email-consumer",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "email-group" });
async function consumeMessages() {
await consumer.connect();
await consumer.subscribe({ topic: "family-topic", fromBeginning: true });
console.log("🟢 邮件消费者已连接");
await consumer.run({
eachMessage: async ({ message }) => {
const msg = JSON.parse(message.value.toString());
console.log(`📩 收到通知: "${msg.content}"`);
console.log(`📧 发送了邮件: "${msg.content}" \n`);
},
});
}
consumeMessages();
进入全屏 退出全屏
数据库存储客户端(dbconsumer.js)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "family-db-consumer",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "db-group" });
async function consumeMessages() {
await consumer.connect();
await consumer.subscribe({ topic: "家庭主题", fromBeginning: true });
console.log("🟢 家庭数据库消费者已连接");
await consumer.run({
eachMessage: async ({ message }) => {
const msg = JSON.parse(message.value.toString());
console.log(`💾 正在将消息 "${msg.content}" 存入数据库 \n`);
},
});
}
consumeMessages();
点进来全屏 点出去全屏
Kafka 是一个强大的工具,极大地改变了实时数据流处理。
然而,尽管它提供了惊人的可扩展性和持久力,我们仍然需要确保它是否适合你的架构。
稍后见!我将写一篇下一篇文章比较 Kafka 和 Redis 的应用场景,看看我们应该在什么情况下选择哪一个。🚀
阿瑟拉(Athreya)也叫 Maneshwar技术写作者 | 10万+ 阅读者 | i3 槽配 Mint | 学习、构建、改进,边写边乐 :)
我一直都在使用或推广一个超级方便的工具,叫做LiveAPI,你知道吗?
LiveAPI 让你几分钟内就能搞定这些 API 的文档化
借助LiveAPI,您可以快速创建交互式API文档页面,让用户可以直接在浏览器中测试API。
如果你对手动为你的API接口创建文档感到厌烦,这个工具可能会让你的生活更轻松些。
资料来源:有些图片取自1。