手记

基于实时数据追踪的MongoDB缓存失效设计:使用Debezium、Kafka和NestJS

使用Debezium、Kafka和NestJS技术关于如何跟踪MongoDB数据变更的分步教程,包括缓存失效用例。

简介

在现代分布式应用架构中,所有内容都需要保持同步和及时更新;这确实是为了满足业务使用的需要,从技术角度来看,这既具挑战性又复杂。

最近我碰到了一个案例,我们需要追踪MongoDB中的变化。我们的场景涉及处理来自不同应用程序的各种查询,将它们存储在MongoDB中,并确保数据的一致性。我们有一个REST层,提供了来自MongoDB的查询列表,对查询进行分页,并通过使用Redis缓存来优化性能。关键的挑战是让缓存的数据失效,以便用户总是能看到最新的信息。

在这篇文章中,我们将探讨如何利用变更数据捕获工具 Debezium 设计缓存失效策略。不过,我们不会涉及具体的缓存实现细节。

高级设计

我们如何追踪每一个 MongoDB 数据变更?

MongoDB 提供了变更流,用于实时通知,或者我们可以利用第三方的数据变更捕获开源工具 Debezium,它内部引用了 MongoDB 的操作日志。

  1. 变更流: 变更流允许应用程序跟踪数据库中数据的实时更改,提供了一种订阅数据更改的方式,可以订阅单个集合、多个集合、一个数据库或整个部署中的所有数据更改。
    const changeStream = collection.watch();  
    changeStream.on('change', (next) => {  
        console.log('有变化了', next);  
    });

变更流数据

2. 变更数据捕获: 变更数据捕获是另一种用于跟踪数据库中变化的方法。它提供了更详细的变化记录,包括更改前后数据的值。我们正在考虑使用开源的变更数据捕获工具Debezium,其中一个数据源是oplog(操作日志)。Oplog(操作日志)是一个特殊的固定大小的集合,记录了对数据库进行的所有写操作。

    await consumer.subscribe({  
      topic: 'prefix.database.collection_name',  
      fromBeginning: true,  
    });  

    await consumer.run({  
      eachMessage: async (messagePayload: EachMessagePayload) => {  
        try {  
          const { op, after, before } = JSON.parse(messagePayload.value.toString());  
          switch (op) {  
            case "c": // 创建  
              break;  
            case "u": // 更新  
              break;  
            case "d": // 删除  
              break;  
          }  
        } catch (error) {  
          console.error(  
            `处理主题 ${messagePayload.topic} 的消息时遇到错误:`,  
            error.stack  
          );  
        }  
      },  
    });

Debezium 使用 MongoDB 的 oplog 来跟踪所有数据库变更(插入、更新、删除),并将这些变更实时推送到类似 Kafka 的消息队列。您的应用程序可以消费这些变更。

疾控中心

要了解各个组件

为了更好地理解我们用例中的组件,我们需要列出正在使用的技术工具和解决方案。我们使用 Docker Compose 来尝试本地解决方案,并使用 Docker Desktop 更直观地了解发生了什么。我们还需要配置源 Debezium MongoDB 连接器、Kafka、MongoDB、Redis 和 Debezium。

  • 容器化: 使用 Docker Compose 在 docker-compose.yml 文件中定义 MongoDB、Kafka、Zookeeper(Kafka 所需)、Debezium 和 Redis 的服务。此设置确保每个服务都有隔离的环境,便于管理和配置依赖。
  • MongoDB 配置: 需要在容器中运行 MongoDB,并将其配置为副本集,因为 Debezium 需要通过 oplog 监控更改。
  • Debezium 配置: 配置 Debezium 的 MongoDB 连接器,以监视 MongoDB 中特定的数据库或集合。这包括设置连接器配置,指定要捕获哪些更改以及如何将这些更改流式传输到 Kafka 主题。
  • Kafka 集成: 确保 Kafka 和 Zookeeper 运行正常。Debezium 将捕获的更改推送到 Kafka 主题,其他应用程序或服务可以消费这些主题中的更改。
  • NestJS 开发: 需要使用 NestJS 开发后端服务或微服务,这些服务连接到 Kafka 以消费这些更改事件。这包括为特定主题创建消费者,以处理实时更新或根据数据更改执行操作。

Debezium是什么?Debezium的工作原理是什么?

Debezium 是一个开源工具,旨在帮助将现有数据源中的数据流式传输到 Kafka,使传统数据源能够融入到基于流的架构中。它提供了高质量且开源的变更捕获连接器,支持多种数据库系统,因此它是从关系型数据库流式传输数据到 Kafka 的推荐工具。

[Debezium 架构]通常,你用 Apache Kafka Connect 来部署 Debezium。Kafka Connect 是一个框架和运行时……debezium.io](https://debezium.io/documentation/reference/stable/architecture.html?source=post_page-----ae620b20c98d--------------------------------)

Debezium 可以捕获数据库中的更改并将这些更改发布为消息到消息代理,比如 Apache Kafka。它支持许多关系型和 NoSQL 数据库,并使用 Kafka Connect 作为静态数据和实时数据之间的桥梁。可以配置 Debezium 以捕获数据库中的更改并将其发布到 Kafka 主题,如下图所示。

Debezium是什么?Debezium是怎么运作的。

实施(更常用于日常对话中)

我们可以在多种情况下应用变化数据捕获,比如以下用例:

  1. 事件驱动的通知:当特定数据库更改发生时,触发实时警报或通知。
  2. 实时分析:构建仪表盘以实时反映 MongoDB 数据库的更新。
  3. 数据复制:实时流传输 MongoDB 更改到其他数据库,如 PostgreSQL 或 Elasticsearch。
  4. 缓存管理:根据数据库更改实时更新或无效缓存数据。
  5. 审计跟踪:维护所有更改的全面日志,以满足合规性和调试需求。

当我寻找缓存失效的用例时,我发现了一篇 Gunnar Morling(见 https://github.com/gunnarmorling)在 Debezium 博客上的文章;链接如下。

使用变更数据捕获自动失效缓存:Debezium是一个开源的分布式变更数据捕获平台:启动它,指向你的数据库系统,然后……了解更多详情,请访问

让我们试着在本地机器上设置并实施变更数据捕获功能。

第一步:使用 Docker Compose 来配置 MongoDB

如我之前所说,为了支持 MongoDB 的变更数据捕获功能,我们需要配置一个副本集;我们不能在单机版的 MongoDB 上使用它。单机版的 MongoDB 不保存操作日志(oplog);这个功能只在 MongoDB 作为副本集或分片集群时才提供。

为了追踪文档在更改前后的状态,需要在集合上启用更改前后的状态追踪。

  • 副本集配置:要在本地机器上将MongoDB配置为副本集,我们必须启用身份验证,以确保节点间的安全连接,并为用于内部身份验证的SSL密钥文件提供适当的权限。
  • 变更流配置:配置MongoDB以启用为希望监控的集合启用变更前后的数据捕获。这涉及设置合适的命令或脚本来激活这些功能,确保记录文档的原始状态(变更前)和修改状态(变更后)。
  1. 创建一个初始的Shell脚本文件
    #!/bin/bash  

    # 此脚本会初始化 MongoDB 的副本集。它会等待 MongoDB 服务准备好接受连接,然后使用 'mongosh' 命令连接到 MongoDB 并初始化副本集配置。  

    # 脚本设计为在一个依赖于 MongoDB 容器的单独容器中运行,以确保在尝试初始化副本集之前 MongoDB 服务已经完全启动并运行。  

    echo ====================================================  
    echo ============= 初始化副本集 =============  
    echo ====================================================  

    # 循环直到 MongoDB 服务准备好接受连接  
    until mongosh --host mongo:27017 --eval 'quit(0)' &>/dev/null; do  
      echo "等待 mongod 启动..."  
      sleep 5  
    done  

    echo "MongoDB 启动成功,现在开始初始化副本集..."  

    # 连接到 MongoDB 服务并初始化副本集  
    mongosh --host mongo:27017 -u root -p rootpassword --authenticationDatabase admin <<EOF  
    rs.initiate({  
      _id: "rs0",  
      members: [  
        { _id: 0, host: "localhost:27017" }  
      ]  
    })  

    while (!rs.isMaster().ismaster) {  
      sleep(1000); // 每秒检查一次主节点状态  
    }  

    print("副本集初始化完毕。");  

    use database;  

    // 检查集合是否已存在  

    if (db.getCollectionNames().includes("queries")) {  
      print("集合 'queries' 已存在。正在修改其设置...");  
      db.runCommand({  
        collMod: "queries",  
        changeStreamPreAndPostImages: { enabled: true }  
      });  
    } else {  
      print("集合 'queries' 不存在。正在创建...");  
      db.createCollection("queries", {  
        changeStreamPreAndPostImages: { enabled: true }  
      });  
    }  

    EOF  

    echo "副本集初始化完成,预图象已启用。"

2. 创建SSL密钥对

    openssl rand -base64 756 > ./init/key  # 生成一个 base64 编码的随机密钥并保存到 key 文件
    chmod 400 ./init/key  # 设置 key 文件权限为只读
    chmod +x ./init/init-replica.sh  # 使 init-replica.sh 脚本可执行

3. 创建一个 Docker Compose 配置文件

    services:    
      mongo:  
        image: mongo:latest  
        restart: unless-stopped  
        command: ["--replSet", "rs0", "--keyFile", "/etc/mongo-keyfile"]  
        environment:  
          - MONGO_INITDB_ROOT_USERNAME=root  
          - MONGO_INITDB_ROOT_PASSWORD=rootpassword  
          - MONGO_INITDB_DATABASE=database  
          - MONGO_REPLICA_SET_NAME=rs0  
        ports:  
          - '27017:27017'  
        volumes:  
          - mongodb_data:/data/db  
          - ./init/key:/etc/mongo-keyfile:ro  
        networks:  
          - cdc网络  
        健康检查:  
          test: mongosh --host localhost:27017 --eval 'db.adminCommand("ping")' || exit 1  
          interval: 5s  
          timeout: 30s  
          start_period: 0s  
          start_delay: 1s  
          retries: 30  

      mongo-init-replica:  
        image: mongo:latest  
        依赖于:  
          - mongo  
        volumes:  
          - ./init/init-replica.sh:/docker-entrypoint-initdb.d/init-replica.sh:ro  
        entrypoint: ["/docker-entrypoint-initdb.d/init-replica.sh"]  
        networks:  
          - cdc网络  

    卷:  
      mongodb_data:  
        driver: local  

    网络:  
      cdc网络:  
        driver: bridge

4.. 测试

我们如何确保设置无误,或者如何查看数据库和集合等资料?对于这些问题,我们可以使用MongoDB Compass,并需要一个连接字符串来连接数据库。

MONGODB_URI=mongodb://root:rootpassword@localhost:27017/database?authSource=admin&replicaSet=rs0

Compass:MongoDB的GUI

第2步:用Docker Compose搭建Kafka

以下是用于设置Apache Kafka的Docker Compose配置,我们将使用它,并且还将使用Kafka UI来浏览Kafka界面。

  1. 创建一个 Docker Compose 文件
    services:  
      zookeeper:  
        image: confluentinc/cp-zookeeper:7.5.2  
        ports:  
          - '2181:2181'  
        environment:  
          ZOOKEEPER_CLIENT_PORT: 2181,  
          ZOOKEEPER_TICK_TIME: 2000  

        networks:  
          - cdc-network,  

      kafka:  
        image: confluentinc/cp-kafka:7.5.2  
        depends_on:  
          - zookeeper  
        ports:  
          - '29092:29092'  
        environment:  
          KAFKA_BROKER_ID: 1,  
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181,  
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,  
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,  
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT,  
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  

        networks:  
          - cdc-network,  

      kafka-ui:  
        image: provectuslabs/kafka-ui:latest  
        depends_on:  
          - kafka  
        ports:  
          - '9090:8080'  
        environment:  
          KAFKA_CLUSTERS_0_NAME: local,  
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092,  
          KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181  

        networks:  
          - cdc-network,
  1. 试试

我们用端口 9090 来运行 Kafka UI;请看下面的运行时的截图。

测试版 Kafka

第三步:设置 Debezium 与 Docker Compose

我们将使用Debezium 2.0,因为它支持多个数据库。Debezium服务依赖于Kafka和MongoDB,所以我们要先确保启动它们。

# Debezium MongoDB 连接器配置
debezium-mongo:  
    image: debezium/connect:2.0  
    依赖:  
      - kafka  
      - mongo  
    环境变量:  
      BOOTSTRAP_SERVERS: kafka:9092  # 启动服务器地址: kafka:9092
      GROUP_ID: 1  # 组ID
      CONFIG_STORAGE_TOPIC: debezium_config  # 配置存储主题
      OFFSET_STORAGE_TOPIC: debezium_offset  # 偏移量存储主题
      STATUS_STORAGE_TOPIC: debezium_status  # 状态存储主题
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter  # 键转换器
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter  # 值转换器
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false  # 是否启用值转换器模式
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false  # 是否启用键转换器模式
    端口映射:  
      - '8083:8083'    
    网络:  
      - cdc-network: CDC网络

我们将使用本机的8083端口,该端口用来运行Kafka Connect REST API接口,以管理连接器、查看状态等操作。

在我们的情况下,我们需要使用两个端点(endpoint):一个是已配置的连接器,另一个是连接器的状态。

RESTful API接口

我们也可以使用Debezium UI来创建连接器并查看状态,这是一个简单的方法,但在使用之前,我们首先需要做一些设置。

GitHub - debezium/debezium-ui: 一个用于 Debezium 的 Web 界面;有问题请在这里报告:https://issues.redhat.com/browse/DBZ。 - debezium/debezium-ui

删除步骤4:设置(MongoDB Debezium连接器):

这里我们需要确保传递所有有效信息,因为这是真正的窍门,可以利用Kafka同步MongoDB的数据变更。

配置 MongoDB Debezium 连接器包括监控 MongoDB 的副本集,定义源配置的属性,并将其与您的 Kafka Connect 设置集成。

  1. 创建或设置连接器

在我们的案例中,我们采用的配置如下;请根据您的配置更新相应的值。

发送HTTP POST请求的目标URL。

访问该地址: http://localhost:8083/connectors

配置

{
  "name": "mongodb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",

    // MongoDB 连接详情
    "mongodb.hosts": "rs0/mongo:27017", // 请替换为您的 MongoDB 主机,
    "mongodb.name": "dbserver1", // MongoDB 服务器的逻辑名称,

    // 认证
    "mongodb.user": "root",
    "mongodb.password": "rootpassword",
    "mongodb.authsource": "admin",

    // 捕获模式 - 此设置确保您捕获更新的前后状态
    "capture.mode": "change_streams_update_full_with_pre_image",

    // 监控的集合
    "database.include.list": "database", // 根据需要调整
    "collection.include.list": "database.queries", // 根据需要调整

    // 任务数量
    "tasks.max": "3", // 根据资源和需求调整

    // Kafka 主题命名
    "topic.prefix": "dbserver1", // Kafka 主题的前缀

    // 架构和事件处理
    "output.schema": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    // 性能调优
    "snapshot.mode": "initial", // 启动时执行初始快照
    "snapshot.max.threads": "4", // 快照线程的数量

    // 事务元数据
    "provide.transaction.metadata": "true",

    // 错误处理
    "errors.retry.timeout": "0", // 失败时不重试,生产环境中您可能需要调整此设置
    "errors.tolerance": "none", // 对错误的容忍度

    // 监控心跳
    "heartbeat.interval.ms": "30000", // 每 30 秒一次

    // 其他
    "max.queue.size": "8192", // 可以在内存中缓冲的最大记录数
    "max.batch.size": "2048",
    "poll.interval.ms": "1000" // 每 1000 毫秒调用一次
  }
}
  • 回答
{  
    "name": "mongodb-connector",  
    "config": {  
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",  
        "mongodb.hosts": "rs0/mongo:27017",  
        "mongodb.name": "dbserver1",  
        "mongodb.user": "root",  
        "mongodb.password": "rootpassword",  
        "mongodb.authsource": "admin",  
        "capture.mode": "change_streams_update_full_with_pre_image",  
        "database.include.list": "database",  
        "collection.include.list": "database.queries",  
        "tasks.max": "3",  
        "topic.prefix": "dbserver1",  
        "output.schema": "true",  
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",  
        "key.converter.schemas.enable": "false",  
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
        "value.converter.schemas.enable": "false",  
        "snapshot.mode": "initial",  
        "snapshot.max.threads": "4",  
        "provide.transaction.metadata": "true",  
        "errors.retry.timeout": "0",  
        "errors.tolerance": "none",  
        "heartbeat.interval.ms": "30000",  
        "max.queue.size": "8192",  
        "max.batch.size": "2048",  
        "poll.interval.ms": "1000",  
        "name": "mongodb-connector1"  
    },  
    "tasks": [],  
    "type": "source"  
}

2. 查看状态

  • HTTP GET 请求
http://localhost:8083/connectors/mongodb-connector/status
  • 回答
{
    "name": "mongodb-connector",
    "connector": {
        "状态": "RUNNING",
        "worker_id": "172.18.0.7:8083"
    },
    "tasks": [
        {
            "id": 0,
            "状态": "RUNNING",
            "worker_id": "172.18.0.7:8083"
        }
    ],
    "type": "source"
}

如需了解更多详情,请查看下方的 MongoDB Debezium 连接器文档,该文档包含所需的所有细节。

Debezium MongoDB 连接器插件Debezium 的 MongoDB 连接器插件用于实时监控 MongoDB 副本集或分片集群中的文档变更

完成此设置后,我们可以验证Kafka仪表板是否运行正常,并确保所有相关的主题已经在Kafka中创建。我们甚至可以检查与主题prefix.database_name.collection_name下的集合相对应的消息。(例如:dbserver1.database.queries

卡夫卡作品的主题

步骤 5:设置 NestJS 以接收 Kafka 事件

如果你是NestJS的新手,请阅读下面为初学者准备的文章。如果你要找一个基本的Node.js模板(使用Express.js),请参考this repository

    const kafka = new Kafka({  
      clientId: this.appConfigService.kafkaClientId,  
      brokers: this.appConfigService.kafkaBrokers,  
    });  
    // 消费者组,允许其接收该组内的 Kafka 主题的消息
    this.databaseConsumer = kafka.consumer({ groupId: 'change-tracker-group' }); // 这定义了一个名为 'change-tracker-group' 的消费者组

    async onModuleInit() {  
        try {  
          await this.initializeConsumer(  
            this.databaseConsumer,  
            this.handleDatabaseChangeMessage,  
          );  
        } catch (error) {  
          console.error('控制台打印("消费者初始化出错:", 错误)');  
        }  
    }

这里可以看到,我们有个组ID,这个ID和之前的配置没关系。它创建了一个Kafka消费者,订阅了特定的消费者组(如change-tracker-group),从而可以从该组内的Kafka主题接收消息。

在订阅主题时,我们可以使用 prefix.database_name.collection_name 这种格式(例如 dbserver1.database.queries),并参照我们之前提供的配置信息。

     private async initializeConsumer(  
        consumer: Consumer,  
        messageHandler: (messagePayload: EachMessagePayload) => Promise<void>,  
      ) {  
        try {  
          await consumer.connect();  

          await consumer.subscribe({  
            topic: 'dbserver1.database.queries',  
            fromBeginning: true,  
          });  

          await consumer.run({  
            eachMessage: async (messagePayload: EachMessagePayload) => {  
              try {  
                await messageHandler.call(this, messagePayload);  
              } catch (error) {  
                console.error(  
                  `处理来自主题 ${messagePayload.topic} 的消息时出错:`,  
                  error.stack,  
                );  
              }  
            },  
          });  
        } catch (error) {  
          console.error('初始化消费者时出错:', error.stack);  
        }  
      }  

      private async handleDatabaseChangeMessage({ message }: EachMessagePayload) {  
        if (!message.value) {  
          console.warn('收到没有值的消息,跳过处理...');  
          return;  
        }  

        try {  
          const { op, after, before } = JSON.parse(message.value.toString());  
          console.info(`消息内容: ${message.value}`);  
          switch (op) {  
            case 'c':  
            case 'u':  
            case 'd':  
              await this.handleCacheInvalidation(op, after, before);  
              break;  
            default:  
              console.warn(`未知的操作: ${op}`);  
              break;  
          }  
        } catch (error) {  
          console.error('处理数据库变更消息时出错:', error.stack);  
        }  
      }

源码

第六步:测试一下安装情况

我们可以运行 Docker Compose 命令来启动创建的 Docker Compose 文件,并且只需运行 npm start 就可以启动应用。

Docker Compose工具

  • 检查 Docker 桌面应用,确认所有容器是否已创建?

Docker桌面

  • 运行 NestJS 应用

npm start

  • 尝试在 MongoDB 中创建或更新一条记录。

MongoDB操作

在上面的截图里,左边我们创建或更新了一条记录,而在右边我们可以看到这是我们订阅的主题中的消息。

请见下文我们在该主题上收到的消息。

    {  
        "before": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"user1@example.com\",\"moduleName\": \"prediction\",\"queryText\": \"今天气温会是多少?\",\"metadata\": {\"priority\": \"high\"}}",  
        "after": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"user2@example.com\",\"moduleName\": \"prediction\",\"queryText\": \"今天气温会是多少?\",\"metadata\": {\"priority\": \"high\"}}",  
        "patch": null,  
        "filter": null,  
        "updateDescription": {  
            "removedFields": null,  
            "updatedFields": "{\"userEmail\": \"user2@example.com\"}",  
            "truncatedArrays": null  
        },  
        "source": {  
            "version": "2.0.1.Final",  
            "connector": "mongodb",  
            "name": "dbserver1",  
            "ts_ms": 1735662855000,  
            "snapshot": "快照",  
            "db": "database",  
            "sequence": null,  
            "rs": "rs0",  
            "collection": "queries",  
            "ord": 2,  
            "lsid": null,  
            "txnNumber": "事务编号": null  
        },  
        "op": "u",  
        "ts_ms": 1735662855656,  
        "transaction": null  
    }
源码

请查看下面的源代码仓库,其中包含了从头到尾的解决方案,但这还不能直接用于生产。希望您能在这里或GitHub的问题部分提出您的意见和建议。

[GitHub - santoshshinde2012/cache-invalidation通过在 GitHub 上创建账户来帮助改进 santoshshinde2012/cache-invalidation [https://github.com/santoshshinde2012/cache-invalidation?source=post_page-----ae620b20c98d--------------------------------]]

感谢阅读。请留下您的评论,如果这篇博客对你有帮助,请给我点个赞。请关注我以获取更多更新,期待你的留言。

0人推荐
随时随地看视频
慕课网APP