使用Debezium、Kafka和NestJS技术关于如何跟踪MongoDB数据变更的分步教程,包括缓存失效用例。
简介在现代分布式应用架构中,所有内容都需要保持同步和及时更新;这确实是为了满足业务使用的需要,从技术角度来看,这既具挑战性又复杂。
最近我碰到了一个案例,我们需要追踪MongoDB中的变化。我们的场景涉及处理来自不同应用程序的各种查询,将它们存储在MongoDB中,并确保数据的一致性。我们有一个REST层,提供了来自MongoDB的查询列表,对查询进行分页,并通过使用Redis缓存来优化性能。关键的挑战是让缓存的数据失效,以便用户总是能看到最新的信息。
在这篇文章中,我们将探讨如何利用变更数据捕获工具 Debezium 设计缓存失效策略。不过,我们不会涉及具体的缓存实现细节。
高级设计
我们如何追踪每一个 MongoDB 数据变更?
MongoDB 提供了变更流,用于实时通知,或者我们可以利用第三方的数据变更捕获开源工具 Debezium,它内部引用了 MongoDB 的操作日志。
- 变更流: 变更流允许应用程序跟踪数据库中数据的实时更改,提供了一种订阅数据更改的方式,可以订阅单个集合、多个集合、一个数据库或整个部署中的所有数据更改。
const changeStream = collection.watch();
changeStream.on('change', (next) => {
console.log('有变化了', next);
});
变更流数据
2. 变更数据捕获: 变更数据捕获是另一种用于跟踪数据库中变化的方法。它提供了更详细的变化记录,包括更改前后数据的值。我们正在考虑使用开源的变更数据捕获工具Debezium,其中一个数据源是oplog(操作日志)。Oplog(操作日志)是一个特殊的固定大小的集合,记录了对数据库进行的所有写操作。
await consumer.subscribe({
topic: 'prefix.database.collection_name',
fromBeginning: true,
});
await consumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
try {
const { op, after, before } = JSON.parse(messagePayload.value.toString());
switch (op) {
case "c": // 创建
break;
case "u": // 更新
break;
case "d": // 删除
break;
}
} catch (error) {
console.error(
`处理主题 ${messagePayload.topic} 的消息时遇到错误:`,
error.stack
);
}
},
});
Debezium 使用 MongoDB 的 oplog 来跟踪所有数据库变更(插入、更新、删除),并将这些变更实时推送到类似 Kafka 的消息队列。您的应用程序可以消费这些变更。
疾控中心
要了解各个组件为了更好地理解我们用例中的组件,我们需要列出正在使用的技术工具和解决方案。我们使用 Docker Compose 来尝试本地解决方案,并使用 Docker Desktop 更直观地了解发生了什么。我们还需要配置源 Debezium MongoDB 连接器、Kafka、MongoDB、Redis 和 Debezium。
- 容器化: 使用 Docker Compose 在
docker-compose.yml
文件中定义 MongoDB、Kafka、Zookeeper(Kafka 所需)、Debezium 和 Redis 的服务。此设置确保每个服务都有隔离的环境,便于管理和配置依赖。 - MongoDB 配置: 需要在容器中运行 MongoDB,并将其配置为副本集,因为 Debezium 需要通过 oplog 监控更改。
- Debezium 配置: 配置 Debezium 的 MongoDB 连接器,以监视 MongoDB 中特定的数据库或集合。这包括设置连接器配置,指定要捕获哪些更改以及如何将这些更改流式传输到 Kafka 主题。
- Kafka 集成: 确保 Kafka 和 Zookeeper 运行正常。Debezium 将捕获的更改推送到 Kafka 主题,其他应用程序或服务可以消费这些主题中的更改。
- NestJS 开发: 需要使用 NestJS 开发后端服务或微服务,这些服务连接到 Kafka 以消费这些更改事件。这包括为特定主题创建消费者,以处理实时更新或根据数据更改执行操作。
Debezium是什么?Debezium的工作原理是什么?
Debezium 是一个开源工具,旨在帮助将现有数据源中的数据流式传输到 Kafka,使传统数据源能够融入到基于流的架构中。它提供了高质量且开源的变更捕获连接器,支持多种数据库系统,因此它是从关系型数据库流式传输数据到 Kafka 的推荐工具。
[Debezium 架构]通常,你用 Apache Kafka Connect 来部署 Debezium。Kafka Connect 是一个框架和运行时……debezium.io](https://debezium.io/documentation/reference/stable/architecture.html?source=post_page-----ae620b20c98d--------------------------------)Debezium 可以捕获数据库中的更改并将这些更改发布为消息到消息代理,比如 Apache Kafka。它支持许多关系型和 NoSQL 数据库,并使用 Kafka Connect 作为静态数据和实时数据之间的桥梁。可以配置 Debezium 以捕获数据库中的更改并将其发布到 Kafka 主题,如下图所示。
Debezium是什么?Debezium是怎么运作的。
实施(更常用于日常对话中)我们可以在多种情况下应用变化数据捕获,比如以下用例:
- 事件驱动的通知:当特定数据库更改发生时,触发实时警报或通知。
- 实时分析:构建仪表盘以实时反映 MongoDB 数据库的更新。
- 数据复制:实时流传输 MongoDB 更改到其他数据库,如 PostgreSQL 或 Elasticsearch。
- 缓存管理:根据数据库更改实时更新或无效缓存数据。
- 审计跟踪:维护所有更改的全面日志,以满足合规性和调试需求。
当我寻找缓存失效的用例时,我发现了一篇 Gunnar Morling(见 https://github.com/gunnarmorling)在 Debezium 博客上的文章;链接如下。
使用变更数据捕获自动失效缓存:Debezium是一个开源的分布式变更数据捕获平台:启动它,指向你的数据库系统,然后……了解更多详情,请访问让我们试着在本地机器上设置并实施变更数据捕获功能。
第一步:使用 Docker Compose 来配置 MongoDB
如我之前所说,为了支持 MongoDB 的变更数据捕获功能,我们需要配置一个副本集;我们不能在单机版的 MongoDB 上使用它。单机版的 MongoDB 不保存操作日志(oplog);这个功能只在 MongoDB 作为副本集或分片集群时才提供。
为了追踪文档在更改前后的状态,需要在集合上启用更改前后的状态追踪。
- 副本集配置:要在本地机器上将MongoDB配置为副本集,我们必须启用身份验证,以确保节点间的安全连接,并为用于内部身份验证的SSL密钥文件提供适当的权限。
- 变更流配置:配置MongoDB以启用为希望监控的集合启用变更前后的数据捕获。这涉及设置合适的命令或脚本来激活这些功能,确保记录文档的原始状态(变更前)和修改状态(变更后)。
- 创建一个初始的Shell脚本文件
#!/bin/bash
# 此脚本会初始化 MongoDB 的副本集。它会等待 MongoDB 服务准备好接受连接,然后使用 'mongosh' 命令连接到 MongoDB 并初始化副本集配置。
# 脚本设计为在一个依赖于 MongoDB 容器的单独容器中运行,以确保在尝试初始化副本集之前 MongoDB 服务已经完全启动并运行。
echo ====================================================
echo ============= 初始化副本集 =============
echo ====================================================
# 循环直到 MongoDB 服务准备好接受连接
until mongosh --host mongo:27017 --eval 'quit(0)' &>/dev/null; do
echo "等待 mongod 启动..."
sleep 5
done
echo "MongoDB 启动成功,现在开始初始化副本集..."
# 连接到 MongoDB 服务并初始化副本集
mongosh --host mongo:27017 -u root -p rootpassword --authenticationDatabase admin <<EOF
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017" }
]
})
while (!rs.isMaster().ismaster) {
sleep(1000); // 每秒检查一次主节点状态
}
print("副本集初始化完毕。");
use database;
// 检查集合是否已存在
if (db.getCollectionNames().includes("queries")) {
print("集合 'queries' 已存在。正在修改其设置...");
db.runCommand({
collMod: "queries",
changeStreamPreAndPostImages: { enabled: true }
});
} else {
print("集合 'queries' 不存在。正在创建...");
db.createCollection("queries", {
changeStreamPreAndPostImages: { enabled: true }
});
}
EOF
echo "副本集初始化完成,预图象已启用。"
2. 创建SSL密钥对
openssl rand -base64 756 > ./init/key # 生成一个 base64 编码的随机密钥并保存到 key 文件
chmod 400 ./init/key # 设置 key 文件权限为只读
chmod +x ./init/init-replica.sh # 使 init-replica.sh 脚本可执行
3. 创建一个 Docker Compose 配置文件
services:
mongo:
image: mongo:latest
restart: unless-stopped
command: ["--replSet", "rs0", "--keyFile", "/etc/mongo-keyfile"]
environment:
- MONGO_INITDB_ROOT_USERNAME=root
- MONGO_INITDB_ROOT_PASSWORD=rootpassword
- MONGO_INITDB_DATABASE=database
- MONGO_REPLICA_SET_NAME=rs0
ports:
- '27017:27017'
volumes:
- mongodb_data:/data/db
- ./init/key:/etc/mongo-keyfile:ro
networks:
- cdc网络
健康检查:
test: mongosh --host localhost:27017 --eval 'db.adminCommand("ping")' || exit 1
interval: 5s
timeout: 30s
start_period: 0s
start_delay: 1s
retries: 30
mongo-init-replica:
image: mongo:latest
依赖于:
- mongo
volumes:
- ./init/init-replica.sh:/docker-entrypoint-initdb.d/init-replica.sh:ro
entrypoint: ["/docker-entrypoint-initdb.d/init-replica.sh"]
networks:
- cdc网络
卷:
mongodb_data:
driver: local
网络:
cdc网络:
driver: bridge
4.. 测试
我们如何确保设置无误,或者如何查看数据库和集合等资料?对于这些问题,我们可以使用MongoDB Compass,并需要一个连接字符串来连接数据库。
MONGODB_URI=mongodb://root:rootpassword@localhost:27017/database?authSource=admin&replicaSet=rs0
Compass:MongoDB的GUI
第2步:用Docker Compose搭建Kafka
以下是用于设置Apache Kafka的Docker Compose配置,我们将使用它,并且还将使用Kafka UI来浏览Kafka界面。
- 创建一个 Docker Compose 文件
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.2
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181,
ZOOKEEPER_TICK_TIME: 2000
networks:
- cdc-network,
kafka:
image: confluentinc/cp-kafka:7.5.2
depends_on:
- zookeeper
ports:
- '29092:29092'
environment:
KAFKA_BROKER_ID: 1,
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181,
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT,
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- cdc-network,
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- '9090:8080'
environment:
KAFKA_CLUSTERS_0_NAME: local,
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092,
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
networks:
- cdc-network,
- 试试
我们用端口 9090 来运行 Kafka UI;请看下面的运行时的截图。
测试版 Kafka
第三步:设置 Debezium 与 Docker Compose
我们将使用Debezium 2.0,因为它支持多个数据库。Debezium服务依赖于Kafka和MongoDB,所以我们要先确保启动它们。
# Debezium MongoDB 连接器配置
debezium-mongo:
image: debezium/connect:2.0
依赖:
- kafka
- mongo
环境变量:
BOOTSTRAP_SERVERS: kafka:9092 # 启动服务器地址: kafka:9092
GROUP_ID: 1 # 组ID
CONFIG_STORAGE_TOPIC: debezium_config # 配置存储主题
OFFSET_STORAGE_TOPIC: debezium_offset # 偏移量存储主题
STATUS_STORAGE_TOPIC: debezium_status # 状态存储主题
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter # 键转换器
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter # 值转换器
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false # 是否启用值转换器模式
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false # 是否启用键转换器模式
端口映射:
- '8083:8083'
网络:
- cdc-network: CDC网络
我们将使用本机的8083端口,该端口用来运行Kafka Connect REST API接口,以管理连接器、查看状态等操作。
在我们的情况下,我们需要使用两个端点(endpoint):一个是已配置的连接器,另一个是连接器的状态。
RESTful API接口
我们也可以使用Debezium UI来创建连接器并查看状态,这是一个简单的方法,但在使用之前,我们首先需要做一些设置。
GitHub - debezium/debezium-ui: 一个用于 Debezium 的 Web 界面;有问题请在这里报告:https://issues.redhat.com/browse/DBZ。 - debezium/debezium-ui
删除步骤4:设置(MongoDB Debezium连接器):
这里我们需要确保传递所有有效信息,因为这是真正的窍门,可以利用Kafka同步MongoDB的数据变更。
配置 MongoDB Debezium 连接器包括监控 MongoDB 的副本集,定义源配置的属性,并将其与您的 Kafka Connect 设置集成。
- 创建或设置连接器
在我们的案例中,我们采用的配置如下;请根据您的配置更新相应的值。
发送HTTP POST请求的目标URL。
访问该地址: http://localhost:8083/connectors
配置
{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
// MongoDB 连接详情
"mongodb.hosts": "rs0/mongo:27017", // 请替换为您的 MongoDB 主机,
"mongodb.name": "dbserver1", // MongoDB 服务器的逻辑名称,
// 认证
"mongodb.user": "root",
"mongodb.password": "rootpassword",
"mongodb.authsource": "admin",
// 捕获模式 - 此设置确保您捕获更新的前后状态
"capture.mode": "change_streams_update_full_with_pre_image",
// 监控的集合
"database.include.list": "database", // 根据需要调整
"collection.include.list": "database.queries", // 根据需要调整
// 任务数量
"tasks.max": "3", // 根据资源和需求调整
// Kafka 主题命名
"topic.prefix": "dbserver1", // Kafka 主题的前缀
// 架构和事件处理
"output.schema": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
// 性能调优
"snapshot.mode": "initial", // 启动时执行初始快照
"snapshot.max.threads": "4", // 快照线程的数量
// 事务元数据
"provide.transaction.metadata": "true",
// 错误处理
"errors.retry.timeout": "0", // 失败时不重试,生产环境中您可能需要调整此设置
"errors.tolerance": "none", // 对错误的容忍度
// 监控心跳
"heartbeat.interval.ms": "30000", // 每 30 秒一次
// 其他
"max.queue.size": "8192", // 可以在内存中缓冲的最大记录数
"max.batch.size": "2048",
"poll.interval.ms": "1000" // 每 1000 毫秒调用一次
}
}
- 回答
{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "rs0/mongo:27017",
"mongodb.name": "dbserver1",
"mongodb.user": "root",
"mongodb.password": "rootpassword",
"mongodb.authsource": "admin",
"capture.mode": "change_streams_update_full_with_pre_image",
"database.include.list": "database",
"collection.include.list": "database.queries",
"tasks.max": "3",
"topic.prefix": "dbserver1",
"output.schema": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"snapshot.mode": "initial",
"snapshot.max.threads": "4",
"provide.transaction.metadata": "true",
"errors.retry.timeout": "0",
"errors.tolerance": "none",
"heartbeat.interval.ms": "30000",
"max.queue.size": "8192",
"max.batch.size": "2048",
"poll.interval.ms": "1000",
"name": "mongodb-connector1"
},
"tasks": [],
"type": "source"
}
2. 查看状态
- HTTP GET 请求
http://localhost:8083/connectors/mongodb-connector/status
- 回答
{
"name": "mongodb-connector",
"connector": {
"状态": "RUNNING",
"worker_id": "172.18.0.7:8083"
},
"tasks": [
{
"id": 0,
"状态": "RUNNING",
"worker_id": "172.18.0.7:8083"
}
],
"type": "source"
}
如需了解更多详情,请查看下方的 MongoDB Debezium 连接器文档,该文档包含所需的所有细节。
Debezium MongoDB 连接器插件Debezium 的 MongoDB 连接器插件用于实时监控 MongoDB 副本集或分片集群中的文档变更完成此设置后,我们可以验证Kafka仪表板是否运行正常,并确保所有相关的主题已经在Kafka中创建。我们甚至可以检查与主题prefix.database_name.collection_name
下的集合相对应的消息。(例如:dbserver1.database.queries
)
卡夫卡作品的主题
步骤 5:设置 NestJS 以接收 Kafka 事件
如果你是NestJS的新手,请阅读下面为初学者准备的文章。如果你要找一个基本的Node.js模板(使用Express.js),请参考this repository。
const kafka = new Kafka({
clientId: this.appConfigService.kafkaClientId,
brokers: this.appConfigService.kafkaBrokers,
});
// 消费者组,允许其接收该组内的 Kafka 主题的消息
this.databaseConsumer = kafka.consumer({ groupId: 'change-tracker-group' }); // 这定义了一个名为 'change-tracker-group' 的消费者组
async onModuleInit() {
try {
await this.initializeConsumer(
this.databaseConsumer,
this.handleDatabaseChangeMessage,
);
} catch (error) {
console.error('控制台打印("消费者初始化出错:", 错误)');
}
}
这里可以看到,我们有个组ID,这个ID和之前的配置没关系。它创建了一个Kafka消费者,订阅了特定的消费者组(如change-tracker-group
),从而可以从该组内的Kafka主题接收消息。
在订阅主题时,我们可以使用 prefix.database_name.collection_name
这种格式(例如 dbserver1.database.queries
),并参照我们之前提供的配置信息。
private async initializeConsumer(
consumer: Consumer,
messageHandler: (messagePayload: EachMessagePayload) => Promise<void>,
) {
try {
await consumer.connect();
await consumer.subscribe({
topic: 'dbserver1.database.queries',
fromBeginning: true,
});
await consumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
try {
await messageHandler.call(this, messagePayload);
} catch (error) {
console.error(
`处理来自主题 ${messagePayload.topic} 的消息时出错:`,
error.stack,
);
}
},
});
} catch (error) {
console.error('初始化消费者时出错:', error.stack);
}
}
private async handleDatabaseChangeMessage({ message }: EachMessagePayload) {
if (!message.value) {
console.warn('收到没有值的消息,跳过处理...');
return;
}
try {
const { op, after, before } = JSON.parse(message.value.toString());
console.info(`消息内容: ${message.value}`);
switch (op) {
case 'c':
case 'u':
case 'd':
await this.handleCacheInvalidation(op, after, before);
break;
default:
console.warn(`未知的操作: ${op}`);
break;
}
} catch (error) {
console.error('处理数据库变更消息时出错:', error.stack);
}
}
源码
第六步:测试一下安装情况
我们可以运行 Docker Compose 命令来启动创建的 Docker Compose 文件,并且只需运行 npm start
就可以启动应用。
Docker Compose工具
- 检查 Docker 桌面应用,确认所有容器是否已创建?
Docker桌面
- 运行 NestJS 应用
npm start
- 尝试在 MongoDB 中创建或更新一条记录。
MongoDB操作
在上面的截图里,左边我们创建或更新了一条记录,而在右边我们可以看到这是我们订阅的主题中的消息。
请见下文我们在该主题上收到的消息。
{
"before": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"user1@example.com\",\"moduleName\": \"prediction\",\"queryText\": \"今天气温会是多少?\",\"metadata\": {\"priority\": \"high\"}}",
"after": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"user2@example.com\",\"moduleName\": \"prediction\",\"queryText\": \"今天气温会是多少?\",\"metadata\": {\"priority\": \"high\"}}",
"patch": null,
"filter": null,
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"userEmail\": \"user2@example.com\"}",
"truncatedArrays": null
},
"source": {
"version": "2.0.1.Final",
"connector": "mongodb",
"name": "dbserver1",
"ts_ms": 1735662855000,
"snapshot": "快照",
"db": "database",
"sequence": null,
"rs": "rs0",
"collection": "queries",
"ord": 2,
"lsid": null,
"txnNumber": "事务编号": null
},
"op": "u",
"ts_ms": 1735662855656,
"transaction": null
}
源码
请查看下面的源代码仓库,其中包含了从头到尾的解决方案,但这还不能直接用于生产。希望您能在这里或GitHub的问题部分提出您的意见和建议。
[GitHub - santoshshinde2012/cache-invalidation通过在 GitHub 上创建账户来帮助改进 santoshshinde2012/cache-invalidation [https://github.com/santoshshinde2012/cache-invalidation?source=post_page-----ae620b20c98d--------------------------------]]感谢阅读。请留下您的评论,如果这篇博客对你有帮助,请给我点个赞。请关注我以获取更多更新,期待你的留言。