我准备了一个 kafka 生产者,它将一个 List 放入 kafka 主题中。它适用于 100 万行/记录。我得到的生产文件包含 1.1 亿多条记录。 在我的 KafkaProducer 处理如此庞大的数据的最佳方法是什么?
下面是代码,我曾经处理过 100 万条记录,将其放入 kafka 主题大约需要 4 分钟。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
public class KafkaSourceTask extends SourceTask {
private String filename;
private String topic;
private RandomAccessFile raf;
private long lastRecordedOffset = 0L;
private BufferedReader bufferedReader = null;
Schema schema = SchemaBuilder.struct().field("emp_id",
Schema.STRING_SCHEMA).field("name", Schema.STRING_SCHEMA)
.field("last_name", Schema.STRING_SCHEMA).field("department",
Schema.STRING_SCHEMA).build();
public void start(Map<String, String> props) {
filename = props.get("file");
topic = props.get("topic");
}
对此的任何帮助或建议将不胜感激。在此先感谢您。
慕无忌1623718
慕码人8056858
相关分类