
Flume: Implementing Your Own Real-Time Logging (3)

I've recently started working with Flume, and this series shares what I've learned through a few small examples. It covers:
1 - Concepts, 2 - Building from source, 3 - Quick start: https://www.imooc.com/article/278218
4 - Source code walkthrough: https://www.imooc.com/article/278294
5 - Tailing log files with TAILDIR, modifying the source to prepend a system name
6 - Example: TAILDIR to HDFS
7 - Example: TAILDIR to Kafka
8 - Example: TAILDIR to Elasticsearch 6.x (including a custom sink for recent ES versions)
Note: every article in this series is based on Flume 1.7.0.
All analyzed and annotated code is at: https://github.com/lizu18xz/flume-release-1.7.0

Tailing Log Files with TAILDIR

For the details of how to use the TAILDIR source, see the official user guide:
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html
Suppose we want every collected log line to be prefixed with the name of the application or system it came from. That requires a small modification to the TAILDIR source code.
For example:
Before: 2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10
After: serviceWeb 2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10

(1) Modify the source: org.apache.flume.source.taildir.TaildirSource
Start with the configure method, which is where the parameters from the agent configuration file are read; this is the natural place to hook in:
//add: read the system name from the config, defaulting to webLog
serviceName = context.getString("serviceName", "webLog");
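Putting it together, a minimal sketch of the change in TaildirSource (the serviceName field is the only new member; the rest of configure() stays as shipped in 1.7.0):

    //add: system name read from the agent configuration
    private String serviceName;

    @Override
    public synchronized void configure(Context context) {
      // ... existing parameter parsing unchanged ...
      serviceName = context.getString("serviceName", "webLog");
    }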
Next, find the process() method, which drives the actual tailing of files, and hand the name down to each TailFile before it is processed:
        //add: pass the configured system name to the TailFile
        tf.setServiceName(serviceName);
        if (tf.needTail()) {
          tailFileProcess(tf, true);
        }
The data is ultimately read in org.apache.flume.source.taildir.TailFile, in
Event readEvent(boolean backoffWithoutNL, boolean addByteOffset)
Modify this method so that it reads the line, prepends the system name, rebuilds the event, and returns it:
    String row = new String(line.line, Charset.defaultCharset());
    // Prepend the system name only to lines that contain a log level.
    if (row.contains("INFO") || row.contains("WARN") ||
        row.contains("ERROR") || row.contains("DEBUG")) {
      row = serviceName + " " + row;
    }
    Event event = EventBuilder.withBody(row, Charset.defaultCharset());
    //Event event = EventBuilder.withBody(line.line);  // the original line being replaced
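
For tf.setServiceName(serviceName) to compile, TailFile also needs a matching field and setter; a minimal sketch, with the names taken from the calls above:

    // in org.apache.flume.source.taildir.TailFile

    //add: system name to prepend to matching log lines in readEvent
    private String serviceName;

    public void setServiceName(String serviceName) {
      this.serviceName = serviceName;
    }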

(2) Configuration change
With the code changes above in place, a serviceName property can be added to the source's configuration:
agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.serviceName = dataWorks
(3) Test
Rebuild the modified flume-taildir-source module (e.g. with mvn clean package), replace the corresponding jar under Flume's lib directory, and restart the agent to verify that the prefix shows up.

Tailing Log Files to HDFS with TAILDIR

The remaining examples all chain two agents together with an avro source and an avro sink; in simple terms, the pair behaves much like an RPC link. The "Setting multi-agent flow" section of the user guide explains this pattern:
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html
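
The flow built below looks like this (both agents run on the same host in this walkthrough, joined over port 4545):

    [TAILDIR source] -> [memory channel] -> [avro sink]
                                                 |  (avro over port 4545)
                                                 v
                                           [avro source] -> [memory channel] -> [HDFS sink]

The first agent is configured in taildir-avro.conf, the second in avro-hdfs.conf.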

(1) Configuration files
[taildir-avro.conf]
agent.sources = avro_sources01
agent.channels = avro_channel01
agent.sinks = avro_sink01

agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.positionFile = /home/elasticsearch/data/flume/taildir_position.json
agent.sources.avro_sources01.filegroups = f1
agent.sources.avro_sources01.filegroups.f1 = /home/elasticsearch/data/weblog/.*
agent.sources.avro_sources01.serviceName = dataWorks
agent.sources.avro_sources01.channels = avro_channel01

agent.channels.avro_channel01.type = memory
agent.channels.avro_channel01.capacity = 1000000
agent.channels.avro_channel01.transactionCapacity = 2000

agent.sinks.avro_sink01.type = avro
agent.sinks.avro_sink01.hostname = 192.168.88.129
agent.sinks.avro_sink01.port = 4545
agent.sinks.avro_sink01.channel = avro_channel01

[avro-hdfs.conf]
agent.sources = hdfs_sources01
agent.channels = hdfs_channel01
agent.sinks = hdfs_sink01

agent.sources.hdfs_sources01.type = avro
agent.sources.hdfs_sources01.bind = 192.168.88.129
agent.sources.hdfs_sources01.port = 4545
agent.sources.hdfs_sources01.channels = hdfs_channel01

agent.channels.hdfs_channel01.type = memory
agent.channels.hdfs_channel01.capacity = 1000000
agent.channels.hdfs_channel01.transactionCapacity = 6000

agent.sinks.hdfs_sink01.type = hdfs
agent.sinks.hdfs_sink01.hdfs.path = hdfs://192.168.88.129:8020/flumeSink/
agent.sinks.hdfs_sink01.hdfs.filePrefix = log-
agent.sinks.hdfs_sink01.hdfs.rollInterval = 0
agent.sinks.hdfs_sink01.hdfs.rollSize = 1048576
agent.sinks.hdfs_sink01.hdfs.rollCount = 0
agent.sinks.hdfs_sink01.hdfs.batchSize = 10
agent.sinks.hdfs_sink01.hdfs.writeFormat = text
agent.sinks.hdfs_sink01.hdfs.fileType = DataStream
agent.sinks.hdfs_sink01.channel = hdfs_channel01
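
Note that the avro sink of the first agent and the avro source of the second share the same host/port pair (192.168.88.129:4545); that link is what joins the two configurations into a single flow.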

(2) Start HDFS
(3) Start the two Flume agents
Start the log-consuming agent first, so the avro sink has something to connect to:
./bin/flume-ng agent --conf conf -f /home/hadoop/app/data/avro-hdfs.conf -n agent -Dflume.root.logger=INFO,console

Then start the log-producing agent:
./bin/flume-ng agent --conf conf -f /home/elasticsearch/data/flume/taildir-avro.conf -n agent -Dflume.root.logger=INFO,console

(4) Simulate some log output by appending a line to a file under the watched directory:
echo "2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10" >> /home/elasticsearch/data/weblog/log.txt

(5) Inspect the resulting file on HDFS:
hadoop fs -text /flumeSink/log-.1549943212865.tmp
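
Since the line contains INFO and serviceName is set to dataWorks, the stored record should now carry the prefix:

dataWorks 2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10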

(6) Meaning of the relevant parameters, from the official documentation:
HDFS Sink
Parameter           Default   Description
hdfs.rollInterval   30        Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize       1024      File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount      10        Number of events written to file before it rolled (0 = never roll based on number of events)
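
In the configuration above, rollInterval and rollCount are both 0 and rollSize is 1048576, so files are rolled only once they reach 1 MiB.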

To be continued:

7 - Example: TAILDIR to Kafka
8 - Example: TAILDIR to Elasticsearch 6.x (including a custom sink for recent ES versions)

Source code

https://github.com/lizu18xz/flume-release-1.7.0