
Flume: Implementing Your Own Real-Time Logging (4)

I've recently started working with Flume, and this series shares what I've learned through a few small examples. It covers the following topics:
1 - Concepts, 2 - Building from source, 3 - Quick start: https://www.imooc.com/article/278218
4 - Source code walkthrough: https://www.imooc.com/article/278294
5 - Tailing log files with TAILDIR and modifying its source, 6 - TAILDIR to HDFS example:
https://www.imooc.com/article/278481
7 - TAILDIR to Kafka example
8 - TAILDIR to Elasticsearch 6.x example (including a self-written sink for newer ES versions)
Note: all articles in this series are based on Flume 1.7.0.
All of the analyzed and annotated code is at: https://github.com/lizu18xz/flume-release-1.7.0

TAILDIR tailing log files into Kafka

(1) Configuration files
[taildir-avro.conf]
agent.sources = avro_sources01
agent.channels = avro_channel01
agent.sinks = avro_sink01

agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.positionFile = /home/elasticsearch/data/flume/taildir_position.json
agent.sources.avro_sources01.filegroups = f1
agent.sources.avro_sources01.filegroups.f1 = /home/elasticsearch/data/weblog/.*
agent.sources.avro_sources01.serviceName = dataWorks
agent.sources.avro_sources01.channels = avro_channel01

agent.channels.avro_channel01.type = memory
agent.channels.avro_channel01.capacity = 1000000
agent.channels.avro_channel01.transactionCapacity = 2000

agent.sinks.avro_sink01.type = avro
agent.sinks.avro_sink01.hostname = 192.168.88.129
agent.sinks.avro_sink01.port = 4545
agent.sinks.avro_sink01.channel = avro_channel01

[avro-kafka.conf]
agent.sources = kafka_sources01
agent.channels = kafka_channel01
agent.sinks = kafka_sink01

agent.sources.kafka_sources01.type = avro
agent.sources.kafka_sources01.bind = 192.168.88.129
agent.sources.kafka_sources01.port = 4545
agent.sources.kafka_sources01.channels = kafka_channel01

agent.channels.kafka_channel01.type = memory
agent.channels.kafka_channel01.capacity = 1000000
agent.channels.kafka_channel01.transactionCapacity = 6000

agent.sinks.kafka_sink01.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink01.kafka.topic = flume-kafka
agent.sinks.kafka_sink01.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka_sink01.flumeBatchSize = 6000
agent.sinks.kafka_sink01.kafka.producer.acks = 1
agent.sinks.kafka_sink01.kafka.producer.linger.ms = 1
agent.sinks.kafka_sink01.kafka.producer.compression.type = snappy
agent.sinks.kafka_sink01.channel = kafka_channel01

(2) Start Kafka and Flume
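Kafka (and its ZooKeeper) must be running before the agents are started. A minimal sketch with a plain Kafka distribution, assuming the commands are run from the Kafka installation directory (-daemon runs the services in the background):

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties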
Start the agent that receives the log events and produces them to Kafka (avro-kafka.conf):
./bin/flume-ng agent --conf conf -f /home/hadoop/app/data/avro-kafka.conf -n agent -Dflume.root.logger=INFO,console

Then start the agent that tails the log files (taildir-avro.conf):
bin/flume-ng agent --conf conf -f /home/elasticsearch/data/flume/taildir-avro.conf -n agent -Dflume.root.logger=INFO,console

(3) Produce data and start a Kafka consumer
Append a log line to a file under the directory the TAILDIR source is watching (/home/elasticsearch/data/weblog):
echo "2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] 用户登陆入参:userId = 10" >log.txt

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume-kafka --from-beginning

Note: the flume-kafka topic has to be created in Kafka beforehand.
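A sketch of the create command, assuming kafka-topics.sh sits next to the console-consumer script used above and ZooKeeper runs on localhost:2181 as in the rest of the article (the partition and replication-factor values are only placeholders):

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume-kafka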

(4) Kafka sink parameters
The relevant parameters are documented in the official user guide:
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#kafka-sink
[type] – Must be set to org.apache.flume.sink.kafka.KafkaSink
The type has to be this fully qualified class name; the specific reason was covered in the earlier source-code walkthrough.
[kafka.bootstrap.servers, kafka.topic] are the basic parameters.
[flumeBatchSize] – How many messages to process in one batch.
Note that this is not the same thing as Kafka's batch.size: once the sink has taken flumeBatchSize events from the channel it calls producer.flush(), which immediately pushes everything buffered so far out to Kafka and prevents linger.ms from holding the batch back. This batching is controlled by Flume and has nothing to do with the Kafka batch.size parameter.
The relevant part of the KafkaSink code:

for (; processedEvents < batchSize; processedEvents += 1) {
  // take the next event from the channel
  event = channel.take();
  if (event == null) {
    // channel is empty, stop this batch early
    break;
  }
  byte[] eventBody = event.getBody();
  Map<String, String> headers = event.getHeaders();

  // a topic header on the event overrides the configured topic
  eventTopic = headers.get(TOPIC_HEADER);
  if (eventTopic == null) {
    eventTopic = topic;
  }
  eventKey = headers.get(KEY_HEADER);

  Integer partitionId = null;
  try {
    ProducerRecord<String, byte[]> record;
    // (in the full source, partitionId can be set from a static config value or a header)
    if (partitionId != null) {
      record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
          serializeEvent(event, useAvroEventFormat));
    } else {
      record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
          serializeEvent(event, useAvroEventFormat));
    }
    kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
  } catch (NumberFormatException ex) {
    throw new EventDeliveryException("Non integer partition id specified", ex);
  } catch (Exception ex) {
    ...
    throw new EventDeliveryException("Could not send event", ex);
  }
}
// Prevent linger.ms from holding the batch: once the batch has been taken from
// the channel, flush() pushes everything buffered so far out to Kafka
producer.flush();

[Other Kafka Producer Properties] – Any property the Kafka producer supports can be passed through by prefixing it with kafka.producer, for example kafka.producer.linger.ms (see the config sketch at the end of this section).

[kafka.producer.linger.ms]   in milliseconds; how long the producer waits so it can batch up more messages
[kafka.producer.batch.size]  in bytes; the maximum size of one producer batch

Both of these are Kafka client parameters; a batch is sent as soon as either threshold is reached.
The full set of producer parameters is documented on the Kafka site:
http://kafka.apache.org/documentation.html#producerapi
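As a concrete sketch of the pass-through, tuning the producer batch size would just be one more line in avro-kafka.conf; the value 16384 bytes is simply the Kafka default, used here as a placeholder:

agent.sinks.kafka_sink01.kafka.producer.batch.size = 16384

Everything after the kafka.producer. prefix is handed to the Kafka producer configuration unchanged.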

To be continued:

8 - TAILDIR tailing log files into Elasticsearch 6.x (including a self-written sink for newer ES versions)

Source code:

https://github.com/lizu18xz/flume-release-1.7.0