如何从 Kafka Producer 返回具有数十亿条记录的 ArrayList?

我准备了一个 kafka 生产者,它将一个 List 放入 kafka 主题中。它适用于 100 万行/记录。我得到的生产文件包含 1.1 亿多条记录。 在我的 KafkaProducer 处理如此庞大的数据的最佳方法是什么?


下面是代码,我曾经处理过 100 万条记录,将其放入 kafka 主题大约需要 4 分钟。


import java.io.BufferedReader;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.RandomAccessFile;

import java.nio.charset.StandardCharsets;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;


import org.apache.kafka.connect.data.Schema;

import org.apache.kafka.connect.data.SchemaBuilder;

import org.apache.kafka.connect.data.Struct;

import org.apache.kafka.connect.source.SourceRecord;

import org.apache.kafka.connect.source.SourceTask;


public class KafkaSourceTask extends SourceTask {


    private String filename;


    private String topic;


    private RandomAccessFile raf;


    private long lastRecordedOffset = 0L;


    private BufferedReader bufferedReader = null;


    Schema schema = SchemaBuilder.struct().field("emp_id", 

            Schema.STRING_SCHEMA).field("name", Schema.STRING_SCHEMA)

            .field("last_name", Schema.STRING_SCHEMA).field("department", 

            Schema.STRING_SCHEMA).build();


public void start(Map<String, String> props) {

    filename = props.get("file");

    topic = props.get("topic");


}


对此的任何帮助或建议将不胜感激。在此先感谢您。


一只萌萌小番薯
浏览 124回答 2
2回答

慕无忌1623718

首先,在将 Kafka 生产者批处理记录发送给代理之前,您应该检查并使用这两个配置linger.ms&nbsp;和batch.record.size.现在您可以使用另一个线程来读取文件(我认为它是每行一条记录)并将它们放入 java 队列中,并使用托管 kafka 生产者的线程连续读取该队列。多个生产者被认为是一种反模式,尤其是在写入 Kafka 主题时,请查看 Single Writer Principle。好吧,无论哪种方式,您都必须稍微调整一下您的 kafka 生产者,但就像@cricket_007 所说,您应该考虑使用带有文件 csv 连接器的 kafka 连接,至少如果您找不到适合您的连接器,您可以开发一个连接器自己。希望这会有所帮助。

慕码人8056858

拥有数十亿条记录的 ArrayList?想想看,如果你有 10 亿条记录,而每条记录的大小只有 1 个字节(一个可笑的低估),你就有 1 个 SI GB 的内存消耗。根据“大数据”的粗略和现成的定义,作为不适合单个主机上内存的数据,你要么处于边缘,要么超过那个点,你需要开始使用大数据技术。首先你可以尝试多线程,然后你可以在多台机器上尝试多线程,这是使用 Kafka 的优势——客户端 API——无论是从消费端还是生产端,都可以轻松实现。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java