Datax 很方便扩展插件,我们通过实现一个简单的kafka writer 来看一下如何新增插件
1-扩展插件之前,推荐大家先阅读Datax的官方文档
https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
我们会看到Datax几个核心的概念:
Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。 Task: Task是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。 TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。
简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
物理执行模型
框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式: Standalone: 单进程运行,没有外部依赖。 Local: 单进程运行,统计信息、错误信息汇报到集中存储。 Distrubuted: 分布式多进程运行,依赖DataX Service服务。 当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误, 插件就能够在单机/分布式之间无缝切换了。 当JobContainer和TaskGroupContainer运行在同一个进程内时,就是单机模式(Standalone和Local); 当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。 备注:开源的DATAX是没有分布式功能的,大家可以按照上述的思想,自己研究开发。
2 -创建kafkawriter模块
<module>kafkawriter</module> 引入kafkawriter所需要的依赖 因为kafka版本比较多,这里选用0.9.0.1这个版本。 <kafka.version>0.9.0.1</kafka.version> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
当然插件还要引入框架核心的一些依赖
<dependencies> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>datax-common</artifactId> <version>${datax-project-version}</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>16.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> </dependencies>
接下来要配置Datax打包的一些配置
首先我们要了解Datax打包的方式:Maven-assembly-plugin 1、作用:要想将写的程序和它本身所依赖的jar包一起build到一个包里,是maven中针对打包任务而提供的标准插件。 2、其他作用: 1)提供一个把工程依赖元素、模块、网站文档等其他文件存放到单个归档文件里。 2)打包成指定格式分发包,支持各种主流的格式如zip、tar.gz、jar和war等,具体打包哪些文件是高度可控的。 3)能够自定义包含/排除指定的目录或文件。 总体来说,实现插件maven-assembly-plugin需要两个步骤: 第1步骤:pom.xml文件里配置maven-assembly-plugin,指定描述文件 第2步骤:描述文件配置具体参数
在kafkawriter的pom文件中需要新增
<plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptors> <!--描述文件路径--> <descriptor>src/main/assembly/package.xml</descriptor> </descriptors> <finalName>datax</finalName> </configuration> <executions> <execution> <id>dwzip</id> <phase>package</phase> <!-- 绑定到package生命周期阶段上 --> <goals> <goal>single</goal> <!-- 只运行一次 --> </goals> </execution> </executions> </plugin>
kafkawriter中的package.xml我们的配置:
<id></id> <formats> <format>dir</format> </formats> <includeBaseDirectory>false</includeBaseDirectory> <fileSets> <fileSet> <directory>src/main/resources</directory> <includes> <include>plugin.json</include> </includes> <outputDirectory>plugin/writer/kafkawriter</outputDirectory> </fileSet> <fileSet> <directory>target/</directory> <includes> <include>kafkawriter-0.0.1-SNAPSHOT.jar</include> </includes> <outputDirectory>plugin/writer/kafkawriter</outputDirectory> </fileSet> </fileSets> <dependencySets> <dependencySet> <useProjectArtifact>false</useProjectArtifact> <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory> <scope>runtime</scope> </dependencySet> </dependencySets>
1-formats标签 maven-assembly-plugin 支持的打包格式有zip、tar、tar.gz (or tgz)、tar.bz2 (or tbz2)、jar、dir、war,可以同时指定多个打包格式 <formats> <format>dir</format> ...可配置多个 </formats> 2-dependencySets标签: 用来定制工程依赖 jar 包的打包方式 3-fileSets 管理一组文件的存放位置
3-模块创建和配置完成后,接下来进入代码的开发
开发之前我们要考虑,如果做一个kafkawriter需要哪些参数?
1-topic 主题,我们要往哪个主题写入消息 2-kafka broker 地址 3-分隔符,每行消息的分隔符
代码
package com.alibaba.datax.plugin.writer.kafkawriter; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Properties; /** * @author dalizu on 2018/11/7. * @version v1.0 * @desc */ public class KafkaWriter extends Writer { public static class Job extends Writer.Job { private static final Logger log = LoggerFactory.getLogger(Job.class); private Configuration conf = null; @Override public void init() { this.conf = super.getPluginJobConf();//获取配置文件信息{parameter 里面的参数} log.info("kafka writer params:{}", conf.toJSON()); //校验 参数配置 this.validateParameter(); } private void validateParameter() { //toipc 必须填 this.conf .getNecessaryValue( Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE); this.conf .getNecessaryValue( Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE); } @Override public void prepare() { } @Override public List<Configuration> split(int mandatoryNumber) { //按照reader 配置文件的格式 来 组织相同个数的writer配置文件 List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber); for (int i = 0; i < mandatoryNumber; i++) { configurations.add(conf); } return configurations; } @Override public void post() { } @Override public void destroy() { } } public static class Task extends Writer.Task { private static final Logger log = LoggerFactory.getLogger(Task.class); private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n"); private Producer<String, String> producer; private String fieldDelimiter; private Configuration conf; @Override public void init() { this.conf = super.getPluginJobConf(); fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null); //初始化kafka Properties props = new Properties(); props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS)); props.put("acks", "all");//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。 props.put("retries", 0); // Controls how much bytes sender would wait to batch up before publishing to Kafka. //控制发送者在发布到kafka之前等待批处理的字节数。 //控制发送者在发布到kafka之前等待批处理的字节数。 满足batch.size和ling.ms之一,producer便开始发送消息 //默认16384 16kb props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer(props); } @Override public void startWrite(RecordReceiver lineReceiver) { log.info("start to writer kafka"); Record record = null; while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完 //获取一行数据,按照指定分隔符 拼成字符串 发送出去 producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC), recordToString(record), recordToString(record))); } } @Override public void destroy() { if (producer != null) { producer.close(); } } private String recordToString(Record record) { int recordLength = record.getColumnNumber(); if (0 == recordLength) { return NEWLINE_FLAG; } Column column; StringBuilder sb = new StringBuilder(); for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); sb.append(column.asString()).append(fieldDelimiter); } sb.setLength(sb.length() - 1); sb.append(NEWLINE_FLAG); return sb.toString(); } } }
4-代码开发完成后需要配置plugin.json
{ "name": "kafkawriter", "class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter", "description": "简单插件,有待测试验证. 原理: TODO", "developer": "lizu" }
让框架可以加载此插件
还需要在Datax目录下的package.xml下面新增我们新开发的插件
<fileSet> <directory>kafkawriter/target/datax/</directory> <includes> <include>**/*.*</include> </includes> <outputDirectory>datax</outputDirectory> </fileSet> 打包准备测试:mvn -U clean package assembly:assembly -Dmaven.test.skip=true
5-测试Mysql-->kafka
配置文件:
{ "job": { "setting": { "speed": { "channel":1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root123", "connection": [ { "querySql": [ "select * from user;" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/datax" ] } ] } }, "writer": { "name": "kafkawriter", "parameter": { "topic": "test-topic", "bootstrapServers": "192.168.88.129:9092", "fieldDelimiter":"," } } } ] } }
可以去kafka 命令行 启动一个消费者 查看是否有消息。
DATAX日志: 任务启动时刻 : 2018-11-08 11:38:37 任务结束时刻 : 2018-11-08 11:38:57 任务总计耗时 : 20s 任务平均流量 : 470.20KB/s 记录写入速度 : 5867rec/s 读出记录总数 : 117358 读写失败总数 : 0
我是用代码写的消费者去消费,并且把offset保存到了本地文件
117358
和日志传输数量完全一样
消费端控制台:
... offset = 58538, key = 224928,99991285,99991285,99991285,991285,null,991285,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 , value = 224928,99991285,99991285,99991285,991285,null,991285,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 offset = 58539, key = 224929,99991311,99991311,99991311,991311,null,991311,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 , value = 224929,99991311,99991311,99991311,991311,null,991311,哈哈哈哈哈哈哈哈哈哈哈哈哈哈,2018-09-20 15:30:44,10.10.10.1 ......
到此一个简单的kafkawriter开发完成
如果需要完整的插件代码或者消费者代码,可以留言。
热门评论
kudo支持吗
kudo支持吗
请问支持hive到kafka吗?