手记

Flume学习系列(七)---- 自定义Sink到Mysql

前言:接上一篇,我们总结了一下自定义Sink的流程,这次我们实现一个自己的Sink,将数据Sink到Mysql数据库中。我们还是使用 Flume学习系列(二)----实战Spooling到HDFS中的的源,但是Sink我们不用HDFS,用MysqlSink。

一、创建数据库相关

create database flume;use flume;DROP TABLE IF EXISTS `income`;CREATE TABLE `income` (  `userid` varchar(36) NOT NULL , #用户唯一编号  `county` varchar(3) NOT NULL, #县  `town` varchar(3) NOT NULL,  #镇  `income` int(11) DEFAULT NULL, #收入
  PRIMARY KEY (`userid`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

二、编写自定义MysqlSink

    回顾一下数据的格式:(为了插入数据方便,我把最后一部分之间的####变成了逗号,同时给每个字段都加上了单引号)

[INFO ] 2018-08-20 18:40:20 'e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168'

    [INFO]与[2018-08-20 18:40:20]与['e2a07cc1-f0e4-46e0-8bad-59303b1085fd','AMU','bml','2148168']之间用制表符分割。这个就是我们的body的内容,一会通过split去切。

    自定义Sink代码如下:

package com.zhb.flume;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Statement;import org.apache.flume.Channel;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.Transaction;import org.apache.flume.conf.Configurable;import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.common.base.Preconditions;public class MysqlSinker extends AbstractSink implements Configurable {    private static final Logger logger = LoggerFactory.getLogger(MysqlSinker.class);    private Connection connect;    private Statement stmt;    private String columnName;    private String url;    private String user;    private String password;    private String tableName;    // 在整个sink结束时执行一遍
    @Override
    public synchronized void stop() {        // TODO Auto-generated method stub
        super.stop();
    }    // 在整个sink开始时执行一遍,用来初始化数据库连接
    @Override
    public synchronized void start() {        // TODO Auto-generated method stub
        super.start();        try {
            connect = DriverManager.getConnection(url, user, password);            // 连接URL为 jdbc:mysql//服务器地址/数据库名 ,后面的2个参数分别是登陆用户名和密码
            stmt = connect.createStatement();
        } catch (SQLException e) {            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }    // 不断循环调用,处理消息Event(本例就是插入数据库)
    public Status process() throws EventDeliveryException {        // TODO Auto-generated method stub
        //事务,获取event什么的都是模板。仿照别的sink写就OK
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        Event event = null;
        txn.begin();        while (true) {
            event = ch.take();            if (event != null) {                break;
            }
        }        try {
            String rawbody = new String(event.getBody());            //logger.error("rawbody:"+rawbody);
            String body = rawbody.split("\t")[2];            //logger.error("spiltbody:"+body);
            if (body.split(",").length == columnName.split(",").length) {
                String sql = "insert into " + tableName + "(" + columnName + ") values(" + body + ")";  
                //logger.error("sql:"+sql);
                stmt.executeUpdate(sql);
                txn.commit();                return Status.READY;
            } else {
                txn.rollback();                return null;
            }
        } catch (Throwable th) {
            txn.rollback();            if (th instanceof Error) {                throw (Error) th;
            } else {                throw new EventDeliveryException(th);
            }
        } finally {
            txn.close();
        }

    }    //从配置文件中读取各种属性,并进行一些非空验证
    public void configure(Context context) {
        columnName = context.getString("column_name");
        Preconditions.checkNotNull(columnName, "column_name must be set!!");
        url = context.getString("url");
        Preconditions.checkNotNull(url, "url must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");        //我的mysql没有密码。所以这里不检查密码为空
        password = context.getString("password");        // Preconditions.checkNotNull(password, "password must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
    }

}

    将写好的程序打成jar包放到flume的lib下。

三、编写配置文件

    flume.conf的内容如下:

# my application flume configuration
#agent2 name
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2


#Spooling Directory
#set source2
agent2.sources.source2.type=spooldir
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs

agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = falseagent2.sources.source2.interceptors = i1
agent2.sources.source2.interceptors.i1.type = timestamp

#set sink2
agent2.sinks.sink2.type=com.zhb.flume.MysqlSinker
agent2.sinks.sink2.url =jdbc:mysql://127.0.0.1:3306/flumeagent2.sinks.sink2.tableName= income
agent2.sinks.sink2.user=root
#为空就不写,不要写""agent2.sinks.sink2.password=
agent2.sinks.sink2.column_name=userid,county,town,income
agent2.sinks.sink2.channel=channel2

#set channel2
agent2.channels.channel2.type=memory
agent2.channels.channel2.capacity=10000agent2.channels.channel2.transactionCapacity=1000agent2.channels.channel2.keep-alive=30

四、验证

    进入到flume的bin目录下,执行./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent 2

    成功启动flume后,新开一个终端,将生成的log文件拷贝到spooling 监控的文件夹下:cp /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log* /Users/jsj/eclipse-workspace/logs
看下数据库。

001.jpg


    大功告成,成功插入到数据库。自定义Sink成功了。

五、总结

    本文实现了自定义sink,将数据sink到mysql中。说一下心得吧,中间有问题的时候怎么调试,就在你编写的类里用logger去输出你想看的日志即可。我在上面的代码中注释掉了。至此,flume的绝大部分内容都结束了。



作者:小北觅
链接:https://www.jianshu.com/p/70911083784c


0人推荐
随时随地看视频
慕课网APP