手记

Kafka 连接入门:轻松实现数据摄入

学习如何使用Kafka Connect将日志文件中的数据提取并导入到Kafka主题,配置属性参数,并有效发送消息。

Kafka Connect

瑞典中文翻译
瑞典中文翻译
介绍

在这篇文章里,我们将介绍Kafka Connect,这是Kafka生态系统中的核心组件之一,旨在将来自各种源的数据传输到Kafka主题,再传输至目标目的地。Kafka Connect简化了数据摄入,允许您将来自日志文件和数据库的数据移动到Kafka,而无需大量编码。

在这篇指南里,我们将重点讲解如何设置Kafka Connect配置,定义属性文件内容,并配置Kafka以高效地发送消息。

🌟 自己节奏的学习者们
🎯 喜欢按自己的节奏学习吗?掌控自己的成长,立刻开始吧!👉 [Here]

Kafka Connect 概述

Kafka Connect 是 Kafka 内部的数据集成平台,允许您快速且轻松地在外部源和 Kafka 主题之间进行数据流传输。使用 Kafka Connect,您可以从各种来源摄取数据而无需编写自定义代码,从而简化了可扩展数据管道的设置过程。

Kafka Connect 可以做什么呢?你知道吗?

Kafka Connect 促进以下几种数据之间的传输:

  • 标准来源:例如流日志文件、数据库表、syslog日志以及消息队列系统(如JMS或Flume)。
  • 标准目标端:这些可以是文件系统,比如HDFS,其他数据库表或消息队列系统。Kafka Connect甚至可以将数据从一个Kafka主题传输到另一个Kafka主题。

Kafka Connect 提供两种操作模式:

  1. 独立模式: 适用于开发来说和测试环境。
  2. 分布式模式: 专为需要高度可扩展性和容错性的生产环境设计。

在这节教程里,我们将专注于独立模式(Standalone Mode)来展示如何设置。相比之下,最好使用分布式模式以提高可靠性和可扩展性。

定义Kafka Connect来生成消息

在我们开始设置Kafka Connect之前,我们需要确保有一个可用的Kafka主题来进行消息生产。以下步骤详细说明了如何准备Kafka Connect属性文件。并检查接收我们数据源消息的Kafka主题是否已存在。

配置Kafka Connect设置

Kafka Connect 需要两个关键配置文件,以便从数据源向 Kafka 主题生产消息。

  1. 独立属性文件:全局配置Kafka Connect的设置,例如代理信息和偏移存储。
  2. 源属性文件:指定源数据的位置(例如日志文件位置)和目标Kafka主题名。
第一步:确认 Kafka 主题(topic)是否存在

首先,确保Kafka里已经有一个主题。我们将使用Kafka的命令行工具检查是否已经创建了类似itversity_retail这样的主题,并且该主题的配置是否正确。

打开终端,执行这个命令来描述这个话题:

    /opt/kafka/bin/kafka-topics.sh \  
      --zookeeper m01.itversity.com:2181,m02.itversity.com:2181,w01.itversity.com:2181 \  
      --describe \  
      --topic `whoami`_retail

反斜杠用于命令的续行。这个命令用于描述指定的主题(描述主题的详细信息)。whoami是用于生成主题名称的部分,具体指的是whoami_retail。这些是用于连接到ZooKeeper集群的服务器地址。topic参数后跟的是要描述的主题名称。

此命令检查一个以当前登录用户为前缀(whoami)的主题名称,并确认配置细节,比如分区数和复制因子。

提示whoami 命令会将当前用户名动态地赋给主题名称,确保在多用户环境中唯一。

第 2 步:配置 Kafka Connect 工作目录

为了保持条理清晰,可以为Kafka Connect设置创建一个专门文件夹。

请在终端中运行以下命令以创建目录并切换到该目录:
mkdir -p ~/kafka_connect/retail_logs_produce  
cd ~/kafka_connect/retail_logs_produce

👩‍🏫 专业指导
💡 想要获得专家支持和个性化指导吗? 🤝 加入此课程,让专业人士引领你走向成功之路!🎓 👉 点击这里了解详情 [Here]

步骤3:复制默认的配置文件

Kafka Connect 在 Kafka 安装目录中提供了一些默认配置模板。可以将这些模板复制到工作目录中以便自定义。

    cp /opt/kafka/config/connect-standalone.properties ~/kafka_connect/retail_logs_produce/retail_log_standalone.properties  
    cp /opt/kafka/config/connect-file-source.properties ~/kafka_connect/retail_logs_produce/retail_log_file_source.properties
步骤 4:配置独立的属性配置文件

在编辑器中打开并更新 retail_log_standalone.properties 文件中的以下关键属性值。

  • Bootstrap Servers : 指定 Kafka 服务器。对于多节点集群,列出每个 Kafka 服务器以确保冗余。
bootstrap.servers=w01.itversity.com:9092,w02.itversity.com:9092

注意:如果是单节点环境,请使用localhost:9092。

  • 键值转换器:设置转换器为 StringConverter 以处理文本数据。
    key.converter=org.apache.kafka.connect.storage.StringConverter   
    value.converter=org.apache.kafka.connect.storage.StringConverter
  • 文件偏移量跟踪,指定一个文件位置来跟踪数据源中上次读取的位置。
    offset.storage.file.filename=/home/your_username/kafka_connect/retail_logs_produce/retail_logs.offsets
  • REST端口:指定一个唯一的端口号以避免在共享环境中发生冲突。选择一个介于10000到65535之间的唯一端口号。
rest.port=18083

你的独立的属性文件应该像这样写。

    bootstrap.servers=w01.itversity.com:9092,w02.itversity.com:9092  
    key.converter=org.apache.kafka.connect.storage.StringConverter  
    value.converter=org.apache.kafka.connect.storage.StringConverter  
    offset.storage.file.filename=/home/your_username/kafka_connect/retail_logs_produce/retail_logs.offsets  
    offset.flush.interval.ms=10000  
    rest.port=18083

步骤 5:配置源文件的属性:

接下来,打开 retail_log_file_source.properties 文件并设置数据源和 Kafka 主题的配置。

  • 连接器名称:给这个连接器取一个独一无二的名字。
    name=itversity零售文件源
  • 连接器类型:使用 FileStreamSource 读取文件。
    connector.class=FileStreamSource
  • 日志文件路径 : 定义要读取的日志文件路径。
    file=/opt/gen_logs/logs/access.log
  • Kafka 主题名称 : 指定目标主题名称。
    topic=itversity_retail

你的源配置文件应该看起来像这样:

    name=itversity-retail-file-source  
    connector.class=FileStreamSource  
    tasks.max=1  
    file=/opt/gen_logs/logs/access.log  
    topic=itversity_retail

提示:确保Kafka中存在主题名称,避免出现导入错误。

启动Kafka Connect以生成消息

一旦这两个配置文件到位,你就可以开始以独立模式启动Kafka Connect了。

步骤:1 运行 Kafka Connect 单机模式

    connect-standalone.sh retail_log_standalone.properties retail_log_file_source.properties

零售日志独立模式启动脚本

步骤:2 监控 Kafka Connect 的输出:当 Kafka Connect 读取指定的日志文件时,消息应被生产到 itversity_retail 主题中。你应该能看到 Kafka Connect 记录的读取日志消息。

成功设置Kafka Connect的技巧

为了更好地利用Kafka Connect,这里有几个实用的小建议:

  1. 使用独立模式进行测试:独立模式非常适合用于实验和测试Kafka Connect。在生产环境中请使用分布式模式,因为它能提供更高的容错性。
  2. 定期监控偏移量:Kafka Connect依赖于偏移量来跟踪已经摄入的数据。确保定期监控偏移量,特别是在处理日志文件或持续流式数据时。
  3. 管理端口冲突:使用独特的REST端口号以避免在多用户或多实例环境下发生冲突。
  4. 验证主题名称:启动Kafka Connect前,始终确认目标Kafka主题已经存在,以防止数据摄入错误。

🤔 寻求清晰的朋友们
🚦 不知道从哪里开始或如何评估进度吗?没问题,我们来帮您的忙!从这篇详尽的回顾文章开始,找到您的路吧!✨ 👉 [请点击这里]

下一步行动

当你有了基本的Kafka Connect管道运行起来后,你可能想要进一步探索更多高级功能:

  • 尝试不同的数据源:设置各种数据源的连接器,例如数据库、API或消息队列。
  • 切换到分布式模式:尝试将Kafka Connect部署为分布式模式,以实现生产级别的可靠性和可扩展性。
  • 集成Kafka Streams:一旦数据流入Kafka,就可以使用Kafka Streams进行实时数据处理,以丰富或转换数据,然后再送到最终目的地。
结论部分

Kafka Connect 提供了一个强大的解决方案来管理跨平台的数据摄取,消除了编写自定义脚本的需要。通过按照本指南操作,您已在单机模式下配置了 Kafka Connect,设置了相关配置文件,并成功创建了一个从日志文件到 Kafka 主题的数据摄取管道。

保持联络。

💡关注一下Durga Gadiraju 了解更多关于Kafka和数据流的教程。

🔄分享此指南,帮助其他人更快地开始使用Kafka Connect,和其他人一起.

_💬_有疑问?_留下评论一起讨论吧!

0人推荐
随时随地看视频
慕课网APP