手记

Datax3.0 Hive Reader插件

1-介绍:

上一篇文章已经介绍了Datax插件开发的完整流程。
针对Hive Reader插件的开发,我们之间从代码开始

2-实现原理

如果想要从hive表中把数据按照一列一列把数据取出来,可以使用官方的hdfsreader.
在某些时候,我们想要使用更灵活的方式,比如使用hive sql查询语句导出sql查询的数据.
实现原理:
根据配置的sql,通过将查询结果保存到一张新的临时hive表中这种方式.
然后获取新表的hdfs文件地址,然后读取文件到缓冲区,最后删除临时的表.
当然还有其他方式,比如使用hive-hcatalog,后面也会分享。

3-参数定义

* hiveSql
 * 描述:需要执行导出的sql,可以是多个
 * 必选:是
 * 默认值:无
* defaultFS 
 * 描述:Hadoop hdfs文件系统namenode节点地址。
 * 必选:是
 * 默认值:无

4-环境准备

* hadoop fs -mkdir /user/datax_tmp 先创建临时目录,否则会报错(临时hive 表使用) TODO 后期在代码中处理
* 执行datax任务的机器要按照hive,并且配置好环境变量

5-代码实现

/**
 * @author dalizu on 2018/11/10.
 * @version v1.0
 * @desc hive reder
 */
public class HiveReader {
    /**
     * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
     * <p/>
     * 整个 Reader 执行流程是:
     * <pre>
     * Job类init-->prepare-->split
     *
     * Task类init-->prepare-->startRead-->post-->destroy
     * Task类init-->prepare-->startRead-->post-->destroy
     *
     * Job类post-->destroy
     * </pre>
     */
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);
        private Configuration readerOriginConfig = null;
        @Override
        public void init() {
            LOG.info("init() begin...");
            this.readerOriginConfig = super.getPluginJobConf();//获取配置文件信息{parameter 里面的参数}
            this.validate();
            LOG.info("init() ok and end...");
        }
        private void validate() {
            this.readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS,
                    HiveReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
            List<String> sqls = this.readerOriginConfig.getList(Key.HIVE_SQL, String.class);
            if (null == sqls || sqls.size() == 0) {
                throw DataXException.asDataXException(
                        HiveReaderErrorCode.SQL_NOT_FIND_ERROR,
                        "您未配置hive sql");
            }
        }
        @Override
        public List<Configuration> split(int adviceNumber) {
            //按照Hive  sql的个数 获取配置文件的个数
            LOG.info("split() begin...");
            List<String> sqls = this.readerOriginConfig.getList(Key.HIVE_SQL, String.class);
            List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
            Configuration splitedConfig = null;
            for (String querySql : sqls) {
                splitedConfig = this.readerOriginConfig.clone();
                splitedConfig.set(Key.HIVE_SQL, querySql);
                readerSplitConfigs.add(splitedConfig);
            }
            return readerSplitConfigs;
        }
        //全局post
        @Override
        public void post() {
            LOG.info("任务执行完毕,hive reader post");
        }
        @Override
        public void destroy() {
        }
    }
    public static class Task extends Reader.Task {
        private static final Logger LOG = LoggerFactory
                .getLogger(Task.class);
        private Configuration taskConfig;
        private String hiveSql;
        private String tmpPath;
        private String tableName;
        private DFSUtil dfsUtil = null;
        private HashSet<String> sourceFiles;
        @Override
        public void init() {
            //获取配置
            this.taskConfig = super.getPluginJobConf();//获取job 分割后的每一个任务单独的配置文件
            this.hiveSql = taskConfig.getString(Key.HIVE_SQL);//获取hive sql
            taskConfig.set(Key.FIELDDELIMITER, '\001');//设置hive 存储文件 hdfs默认的分隔符
            tmpPath = Constant.TMP_PREFIX + KeyUtil.genUniqueKey();//创建临时Hive表 存储地址
            LOG.info("配置分隔符后:" + this.taskConfig.toJSON());
            this.dfsUtil = new DFSUtil(this.taskConfig);//初始化工具类
        }
        @Override
        public void prepare() {
            //创建临时Hive表,指定存储地址
            tableName = hiveTableName();
            String hiveCmd = "create table " + tableName + " LOCATION '" + tmpPath + "' as " + hiveSql;
            LOG.info("hiveCmd ----> :" + hiveCmd);
            //执行脚本,创建临时表
            if (!ShellUtil.exec(new String[]{"hive", "-e", "\"" + hiveCmd + "\""})) {
                throw DataXException.asDataXException(
                        HiveReaderErrorCode.SHELL_ERROR,
                        "创建hive临时表脚本执行失败");
            }
            LOG.info("创建hive 临时表结束 end!!!");
            LOG.info("prepare(), start to getAllFiles...");
            List<String> path = new ArrayList<String>();
            path.add(tmpPath);
            this.sourceFiles = dfsUtil.getAllFiles(path, Constant.TEXT);
            LOG.info(String.format("您即将读取的文件数为: [%s], 列表为: [%s]",
                    this.sourceFiles.size(),
                    StringUtils.join(this.sourceFiles, ",")));
        }
        @Override
        public void startRead(RecordSender recordSender) {
            //读取临时hive表的hdfs文件
            LOG.info("read start");
            for (String sourceFile : this.sourceFiles) {
                LOG.info(String.format("reading file : [%s]", sourceFile));
                //默认读取的是TEXT文件格式
                InputStream inputStream = dfsUtil.getInputStream(sourceFile);
                UnstructuredStorageReaderUtil.readFromStream(inputStream, sourceFile, this.taskConfig,
                        recordSender, this.getTaskPluginCollector());
                if (recordSender != null) {
                    recordSender.flush();
                }
            }
            LOG.info("end read source files...");
        }
        //只是局部post  属于每个task
        @Override
        public void post() {
            LOG.info("one task hive read post...");
            deleteTmpTable();
        }
        private void deleteTmpTable() {
            String hiveCmd = "drop table " + tableName;
            LOG.info("hiveCmd ----> :" + hiveCmd);
            //执行脚本,创建临时表
            if (!ShellUtil.exec(new String[]{"hive", "-e", "\"" + hiveCmd + "\""})) {
                throw DataXException.asDataXException(
                        HiveReaderErrorCode.SHELL_ERROR,
                        "删除hive临时表脚本执行失败");
            }
        }
        @Override
        public void destroy() {
            LOG.info("hive read destroy...");
        }
        //创建hive临时表名称
        private String hiveTableName() {
            StringBuilder str = new StringBuilder();
            FastDateFormat fdf = FastDateFormat.getInstance("yyyyMMdd");
            str.append(Constant.TABLE_NAME_PREFIX).append(fdf.format(new Date()))
                    .append("_").append(KeyUtil.genUniqueKey());
            return str.toString();
        }
    }
}
注意:
代码中注释比较详细
代码中使用的DFSUtil 工具类 是copy  官方hdfsreader里面的。
希望大家先理解整个插件的运行流程,代码最上方有。

shell工具类:

/**
 * @author dalizu on 2018/11/10.
 * @version v1.0
 * @desc 执行shell脚本
 */
public class ShellUtil {
    private static final int SUCCESS = 0;
    private static final Logger LOG = LoggerFactory.getLogger(RetryUtil.class);
    public static boolean exec(String [] command){
        try {
            Process process=Runtime.getRuntime().exec(command);
            read(process.getInputStream());
            StringBuilder errMsg= read(process.getErrorStream());
            // 等待程序执行结束并输出状态
            int exitCode = process.waitFor();
            if (exitCode == SUCCESS) {
                LOG.info("脚本执行成功");
                return true;
            } else {
                LOG.info("脚本执行失败[ERROR]:"+errMsg.toString());
                return false;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    private static StringBuilder  read(InputStream inputStream) {
        StringBuilder resultMsg=new StringBuilder();
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String line;
            while ((line = reader.readLine()) != null) {
                resultMsg.append(line);
                resultMsg.append("\r\n");
            }
            return resultMsg;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
}

6-测试准备

创建必须的临时目录

hadoop fs -mkdir /user/datax_tmp

创建目标mysql表

CREATE TABLE `user_hive_mysql` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50),
  `telephone` varchar(30),
  `mail` varchar(50),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

创建hive表

CREATE TABLE `mysql_to_hive`(
  `id` int, 
  `username` string, 
  `telephone` string, 
  `mail` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\t';

  接下来load数据到hive表中。

  配置datax任务的json文件

{
  "job": {
    "setting": {
      "speed": {
        "channel":1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hivereader",
          "parameter": {
            "hiveSql": [
              "select username,telephone,mail from mysql_to_hive;"
            ],
            "defaultFS": "hdfs://hadoop001:8020"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "root",
            "password": "root123",
            "column": [
              "username",
              "telephone",
              "mail"
            ],
            "session": [
              "set session sql_mode='ANSI'"
            ],
            "preSql": [
              ""
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/datax",
                "table": [
                  "user_hive_mysql"
                ]
              }
            ]
          }
         }
        }
    ]
  }
}


执行脚本:


python bin/datax.py ../../data/dataxJob/hive_to_mysql.json

测试结果

任务启动时刻                    : 2018-11-09 23:37:08
任务结束时刻                    : 2018-11-09 23:37:39
任务总计耗时                    :                 31s
任务平均流量                    :                8B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                  21
读写失败总数                    :                   0

hiveSql  也可以写多个sql:

"hiveSql": [
              "select username,telephone,mail from mysql_to_hive;",
              "select name,tel,age from t_orc;"
            ],

测试结果ok:

任务启动时刻                    : 2018-11-09 23:46:13
任务结束时刻                    : 2018-11-09 23:47:34
任务总计耗时                    :                 81s
任务平均流量                    :                3B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                  23
读写失败总数                    :                   0

  可以看出不管hive 源表不管是什么格式,都支持,因为我们的中间表是textfile  

7-思考:

如果想要一个源数据导入到多个目标源在怎么做?  
如果需要完整代码可以留言。

  







9人推荐
随时随地看视频
慕课网APP

热门评论

写sql的时候 哪里参数指定数据库?

你好,我想问下你的HiveReader 能否进行条件查询,还有我看你配置文件中的HiveReader的文件路径并没有填,能否分享下jar包和源码,谢谢

查看全部评论