手记

Kafka学习分享(2)

1-消费者和消费组
2-KafkaConsumer的使用
3-KafkaConsumer在flume中的使用

消费者和消费组

消费者是负责订阅Kafka中的Topic,并且从订阅的主题上面拉取消息。
在Kafka中还有一层消费组的概念,每个消费组都有一个对应的消费组。
需要了解的是:每个消费者只能消费所分配到的分区中的消息。每一个分区的消息只能被一个
消费组中的一个消费者所消费。
每个消费组都有一个固定的名称,可以通过消费者客户端的参数group.id指定。

KafkaConsumer的使用

首先添加maven依赖 <kafka.version>0.9.0.1</kafka.version>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
      </dependency>

一般会按照以下方式去进行开发:
配置消费者客户端的参数,创建消费者实例
订阅需要消费的主题
拉取消息进行消费
提交消费的位移
关闭消费者
private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.88.129:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property, if auto commit should happen.
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "999999999999");
        // This is how to control number of messages being read in each poll
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }
private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, partition = %s, key = %s, value = %s\n", record.offset(), record.partition(),
                        record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("\n\r===========>lastOffset read: " + lastOffset);
            process();//处理拉取的数据
            consumer.commitSync();
        }
    }
上面简单实现了一个消费者。我们来进一步了解一下KafkaConsumer。
一般来说创建好消费者后,就需要订阅主题和分区了,Kafka支持直接订阅某些主题的特定分区。
在KafkaConsumer中使用assign()方法来实现这些功能。此方法具体如下:
public void assign(Collection<TopicPartition>partitions)

对于生产者来说,会有序列化器的功能,所以消费者也会提供一个反序列化器让我们获取到我们需要的
value.

在Kafka中消息的消费是基于拉模式的。Kafka中的消息消费是一个不断轮训的过程,消费者重复的
调用poll()方法。
消费者消费到的每条消息类型为ConsumerRecord.
poll()方法的返回值类型是ConsumerRecords,它用来表示一次拉取操作所获得的消息集内部包含
了若干的ConsumerRecord。

接下来就是消费位移的提交了。kafkaConsumer默认会自动提交位移。我们也可以关闭自动提交,
实现手动提交位移。这里先不做具体的讨论,后续会单独进行讨论。

对比KafkaProducer,KafkaConsumer是非线程安全的。但是并不意味着我们在消费消息的时候只能以单线程的方式进行。
我们可以通过多线程的方式进行消费。
即每个线程实例化一个KafkaConsumer对象,一个线程对应一个KafkaConsumer实例。
一个消费线程可以消费一个或者多个分区,所有的消费线程都属于同一个消费组。

当然还有其他的实现方式,后续继续讨论

KafkaConsumer在flume中的使用

在KafkaSource中我们可以看到比较完整的KafkaConsumer的使用
1-doConfigure
这里进行配置消费者客户端的参数
 private void setConsumerProps(Context ctx) {
    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                   KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
    //Defaults overridden based on config
    kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
    //These always take precedence over config
    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    if (groupId != null) {
      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    }
    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                   KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
  }

2-doStart()
这里进行初始化消费者实例,订阅主题,拉取消息。
    //initialize a consumer.
    consumer = new KafkaConsumer<String, byte[]>(kafkaProps);

    // Subscribe for topics by already specified strategy
    subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));

    // Connect to kafka. 1 second is optimal time.
    it = consumer.poll(1000).iterator();
    log.info("Kafka source {} started.", getName());

3-doProcess()
这里对拉取的消息进行消费处理。
        // get next message
        ConsumerRecord<String, byte[]> message = it.next();
        kafkaKey = message.key();
        kafkaMessage = message.value();
        
        //可以直接获取到消息内容
        eventBody = message.value();
        headers.clear();
        headers = new HashMap<String, String>(4);

        //组装消息信息到event,加入到批次里面
        event = EventBuilder.withBody(eventBody, headers);
        eventList.add(event);

        // 对于每个分区,存储将要读取的下一个偏移量
        tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
                new OffsetAndMetadata(message.offset() + 1, batchUUID));

      //开始提交一批次到channel中,提交偏移量到kafka
      if (eventList.size() > 0) {
        counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
        counter.addToEventReceivedCount((long) eventList.size());
        //提交一批次到channel中,需要时间
        //注意:如果新增了一个消费者,这里耗时过久的话,另一个消费者已经启动,可能会造成还没有提交到kafka,导致这些没有被提交的数据会被重复消费

        getChannelProcessor().processEventBatch(eventList);
        counter.addToEventAcceptedCount(eventList.size());
        if (log.isDebugEnabled()) {
          log.debug("Wrote {} events to channel", eventList.size());
        }
        eventList.clear();

        //提交偏移量到kafka,同步提交 CommitFailedException
        if (!tpAndOffsetMetadata.isEmpty()) {
          long commitStartTime = System.nanoTime();
          consumer.commitSync(tpAndOffsetMetadata);
          long commitEndTime = System.nanoTime();
          counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
          tpAndOffsetMetadata.clear();
        }
        return Status.READY;
      }

这里包含了手动消费位移的提交.在完整的flume代码中还涉及到了消费者的再均衡操作。
后续会详细的讲解。这里整体看一下在flume中是怎么使用KafkaConsumer的。

后续

消费位移的提交方式
多线程消费探讨
再均衡
0人推荐
随时随地看视频
慕课网APP