
Kafka Source Code Analysis: The Producer

**Kafka** is an excellent messaging system; for an overview of its design, see my earlier post [Good Backend Books: Reading & Recommendations][1]. Today we dig into its implementation details (I forked a copy of the [code][2]), starting with the **Producer** side.


To use Kafka you first instantiate a `KafkaProducer`. It takes **required Properties** such as the **broker addresses and serializers**, plus **optional Properties** such as **acks (0, 1, all), compression, retries, and batch.size**. This small interface controls most of the **Producer**'s behavior; once instantiated, you call `send` to publish messages.
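
As a minimal sketch (the broker address, topic name, and class name here are placeholders for illustration), configuration and sending look like this:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // required: broker address(es)
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // required
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // required
            props.put("acks", "all"); // optional: wait for the full ISR to acknowledge
            props.put("retries", 3);  // optional

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            }
        }
    }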


The core implementation is this method:


    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
        return doSend(interceptedRecord, callback);//②
    }

Depending on how it is called, this yields three messaging patterns: **fire-and-forget** (ignore the returned result), **synchronous send** (block on the returned future, with the callback set to null), and **asynchronous send** (supply a callback).
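
A sketch of the three patterns, reusing the `producer` from the snippet above (`record` is an arbitrary `ProducerRecord`):

    // 1. fire-and-forget: ignore the returned Future
    producer.send(record);

    // 2. synchronous: block on the Future
    // (get() throws InterruptedException/ExecutionException)
    RecordMetadata meta = producer.send(record).get();

    // 3. asynchronous: pass a Callback, invoked with either metadata or an exception
    producer.send(record, (metadata, exception) -> {
        if (exception != null)
            exception.printStackTrace();
    });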



<!--more-->



Let's look at the fields of the message class `ProducerRecord`:


    private final String topic;      // topic
    private final Integer partition; // partition
    private final Headers headers;   // headers
    private final K key;             // key
    private final V value;           // value
    private final Long timestamp;    // timestamp


It has multiple constructors to accommodate different kinds of messages: with or without a partition, with or without a key, and so on. A few of them in use are shown below.
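
The topic, key, and value strings here are placeholders:

    ProducerRecord<String, String> r1 = new ProducerRecord<>("t", "v");         // topic + value
    ProducerRecord<String, String> r2 = new ProducerRecord<>("t", "k", "v");    // topic + key + value
    ProducerRecord<String, String> r3 = new ProducerRecord<>("t", 0, "k", "v"); // topic + partition + key + value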


At ①, the `ProducerInterceptors` (zero or more of them, forming an interceptor chain) intercept and possibly transform the `ProducerRecord` (for example, stamping it with a timestamp, or performing auditing and statistics):


    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate the exception; move on to the next interceptor
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

If the user has configured interceptors, each gets to process the record and the resulting `ProducerRecord` is returned; otherwise the record is returned as-is.
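
For reference, a hedged sketch of a custom interceptor (the class name and the header it adds are invented); it would be registered via the `interceptor.classes` property:

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AuditInterceptor implements ProducerInterceptor<String, String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // transform the record, e.g. tag it with an extra audit header
            record.headers().add("audit", "seen".getBytes());
            return record;
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) { /* stats hook */ }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }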

Then at ②, `doSend` actually sends the message, asynchronously (the source is long, so only the key parts are kept):


    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // serialize the key and value
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                // error wrapping elided
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                // error wrapping elided
            }
            // compute the partition and build the TopicPartition
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            // callback and transaction handling omitted
            Header[] headers = record.headers().toArray();
            // append the message to the RecordAccumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            // if this batch is full or a new batch was created, wake up the IO thread
            // to send it, i.e. the sender's wakeup method
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (Exception e) {
            // notify the interceptors of the error, then rethrow
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }



Here is the method that computes the partition:


    private int partition(ProducerRecord<K, V> record,
                          byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        // if the record specifies a partition, use it; otherwise ask the partitioner
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey,
                        record.value(), serializedValue, cluster);
    }


The default partitioner, `DefaultPartitioner`, works like this: if a partition is specified it is used directly; otherwise a partition is computed from the key; if there is no key either, partitions are assigned using a round-robin algorithm.


    /**
     * The default partitioning strategy:
     * <ul>
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
     * <li>If no partition or key is present choose a partition in a round-robin fashion
     */
    public class DefaultPartitioner implements Partitioner {

        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {// no key
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);// available partitions
                if (availablePartitions.size() > 0) {// some partitions available: mod over them
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {// none available: fall back to modding over all partitions
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {// key present: hash it (murmur2) and mod to get the partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }

        private int nextValue(String topic) {
            AtomicInteger counter = topicCounterMap.get(topic);
            if (null == counter) {
                counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
                AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
                if (currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();// get then increment; with the modulo above this is round robin
        }
    }
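
If the default strategy doesn't fit, the `Partitioner` interface can be implemented directly and wired in with the `partitioner.class` property. A minimal sketch (the class name and the toy routing rule are invented):

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class FirstPartitionPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // toy rule: route everything to partition 0
            return 0;
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }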



That covers the logical side of sending a message; now let's look at the physical side.


The `Sender` is a `Runnable` wrapped in an IO thread (`ioThread`) that continuously reads messages from the `RecordAccumulator`'s queues and sends them to the **Broker** through a `Selector`. Its `wakeup` method is actually the `wakeup` of the `KafkaClient` interface, implemented by the `NetworkClient` class on top of NIO, which ultimately comes down to `java.nio.channels.Selector.wakeup()`.
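
To see what that final `wakeup` buys us, here is a tiny standalone NIO sketch (not Kafka code): a `select()` blocked on one thread returns as soon as another thread calls `wakeup()`:

    import java.io.IOException;
    import java.nio.channels.Selector;

    public class WakeupDemo {
        public static void main(String[] args) throws IOException, InterruptedException {
            Selector selector = Selector.open();
            Thread io = new Thread(() -> {
                try {
                    int ready = selector.select(); // blocks until a key is ready or wakeup() is called
                    System.out.println("select() returned " + ready);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            io.start();
            Thread.sleep(100);  // let the IO thread block in select()
            selector.wakeup();  // unblocks the pending select() immediately
            io.join();
        }
    }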


The main loop in `Sender`'s `run` simply alternates between preparing messages and waiting on the network:


    long pollTimeout = sendProducerData(now);//③
    client.poll(pollTimeout, now);//④


③ stages the message on the channel and then registers interest in the write key; this is implemented by `KafkaChannel`:


    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    // the corresponding method in one implementation of transportLayer
    public void addInterestOps(int ops) {
        key.interestOps(key.interestOps() | ops);
    }


④ is essentially the `Selector`'s `poll`, whose `select` is unblocked by `wakeup`:


    public void poll(long timeout) throws IOException {
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int numReadyKeys = select(timeout);// wakeup makes this stop blocking
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }

            // Poll from channels where the underlying socket has more data
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();

            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    }


Within it, `pollSelectionKeys` invokes the following methods to complete the send:


    public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        return send.completed();
    }


`Send` represents one outgoing payload, usually implemented by `ByteBufferSend` or `MultiRecordsSend`. Its `writeTo` calls the `transportLayer`'s `write`, usually implemented by `PlaintextTransportLayer` or `SslTransportLayer` depending on whether **SSL** is in use:

    

    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }

    public int write(ByteBuffer src) throws IOException {
        return socketChannel.write(src);
    }



That wraps up the two main flows on the **Producer** side: the **message-level business logic** and the **underlying networking**. The remaining features are provided through configuration.


Ordering, for example, comes from `max.in.flight.requests.per.connection`: before sending, `NetworkClient`'s `canSendRequest` consults `InFlightRequests.canSendMore` (shown below), so setting the parameter to 1 guarantees that the next packet cannot be sent while the current one is still unacknowledged, which yields ordering:


    public boolean canSendMore(String node) {
        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
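
In application code that is a single property (a hedged snippet, reusing the `props` from the first example):

    // ordering: at most one unacknowledged request per connection
    props.put("max.in.flight.requests.per.connection", 1);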

    

Another example is reliability, via the **acks** setting: in `Sender`'s `sendProduceRequest`, the `clientRequest` is given a callback:


    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());// calls completeBatch
        }
    };

    /**
     * Complete or retry delivery; if acks was invalid, the batch will be retried
     *
     * @param batch The record batch
     * @param response The produce response
     * @param correlationId The correlation id for the request
     * @param now The current POSIX timestamp in milliseconds
     */
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now, long throttleUntilTimeMs) {
        // body elided
    }

    public class ProduceResponse extends AbstractResponse {
        /**
         * Possible error code:
         * INVALID_REQUIRED_ACKS (21)
         */
    }
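
From the application side, this reliability machinery is driven by configuration plus the send callback; a hedged sketch (again reusing `props`, `producer`, and `record` from earlier):

    props.put("acks", "all"); // wait for the full ISR to acknowledge
    props.put("retries", 3);  // let the producer retry transient failures
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // delivery still failed after the producer's internal retries
            exception.printStackTrace();
        }
    });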

        

The Kafka source is wrapped layer upon layer and quite intricate; if you spot any mistakes, please don't hesitate to point them out.


  [1]: https://segmentfault.com/a/1190000014724277#articleHeader3

  [2]: https://github.com/MageekChiu/kafka

