手记

AutoMQ生态集成 - 将数据从 AutoMQ Kafka 导入 RisingWave 数据库

从 AutoMQ Kafka 导出数据到 RisingWave

RisingWave 是一个分布式的流数据库。它提供标准的SQL接口,兼容PostgreSQL语言标准,使其与PostgreSQL生态系统能够无缝集成,并且用户无需改动代码。RisingWave将流视作表,并允许用户以优雅的方式在流数据和历史数据之上编写复杂的查询语句。有了RisingWave,用户就可以纯粹专注于他们的查询分析逻辑,而无需学习Java或特定系统的底层API。

本文将介绍如何通过 RisingWave Cloud 将数据从 AutoMQ Kafka 导入 RisingWave 数据库。

info

本文中提及的 AutoMQ Kafka 术语,均特指安托盟丘(杭州)科技有限公司通过 GitHub AutoMQ 组织下开源的 automq-for-kafka 项目。

准备 AutoMQ Kafka 环境和测试数据

参考 AutoMQ 部署 AutoMQ 到 AWS▸ 部署 AutoMQ Kafka 集群。请确保 RisingWave 能够与您的 AutoMQ Kafka 服务器直接连接。你可以参考文档 Create a VPC connection 来创建 RisingWave Cloud和你的VPC之间的安全连接。

在AutoMQ Kafka中快速创建一个名为 example_topic 的主题并向其中写入一条测试 JSON 数据,可以通过以下步骤实现。

创建Topic

使用 Kafka 的命令行工具来创建主题。你需要有 Kafka 环境的访问权限,并且确保 Kafka 服务正在运行。以下是创建主题的命令:

./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092  --partitions 1 --replication-factor 1

注意:将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

创建 topic 后可以用以下命令检查 topic 创建的结果。

./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

生成测试数据

生成一条简单的JSON格式的测试数据。

{  "id": 1,  "name": "test_user",  "timestamp": "2023-11-10T12:00:00",  "status": "active"}

写入测试数据

使用 Kafka 的命令行工具或者编程方式将测试数据写入到 example_topic。以下是使用命令行工具的一个示例:

echo '{"id": 1, "name": "test_user", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

注意:将 topic 和 bootstarp-server 替换为实际使用的 Kafka 服务器地址。

使用如下命令可以查看刚写入的topic数据:

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

在 RisingWave Cloud 上创建 AutoMQ Kafka 源

  1. 到 RisingWave Cloud Clusters 创建集群。

  2. 到 RisingWave Cloud Source创建源。

  3. 指定集群和数据库,并且登入数据库。

  4. AutoMQ Kafka 100% 兼容 Apache Kafka, 因此只需要点击 Create source 并且选择 Kafka。

  5. 根据 RisingWave Cloud的引导界面配置连接器设置,源信息和schema信息。

  6. 确认生成的SQL语句,点击 Confirm 完成源的创建。

Note

默认安装的 AutoMQ Kafka 端口是 9092 并且 SSL 没有启用。 如果需要启用 SSL,请参考文档 Apache Kafka Documentation.在本例中,你可以使用JSON格式,并且设置启动模式为 earliest 来从头访问topic中的所有数据。

查询数据

  1. 到 RisingWave Cloud Console 登入集群。
  2. 运行如下的 SQL 语句来访问已经导入的数据。
SELECT * from your_source_name limit 1;

Note

使用你创建源时定义的名字替换 your_source_name。

END

0人推荐
随时随地看视频
慕课网APP