Read.java
public class Read {
public static void main(String[] args) {
String conn = "db_url";
String username = "*****";
String pwd = "*****";
String sql = "INSERT INTO table (coloumn) values (?)";
Properties props = new Properties();
props.put("bootstrap.servers", "10.247.36.174:3306");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
rops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try (Connection con = DriverManager.getConnection(conn, username, pwd);
PreparedStatement ps = con.prepareStatement(sql);
BufferedReader br = new BufferedReader(new FileReader("All.log"));) {
String line = null;
processMessages(line, br, listparam, ps);
break;
}
} catch (Exception ex) {
System.err.println("Error in \n" + ex);
} finally {
producer = new KafkaProducer<>(props);
String msg = "Done";
producer.send(new ProducerRecord<String, String>("HelloKafka", msg));
System.out.println("Sent: " + msg);
}
producer.close();
}
我是卡夫卡的新手。有人可以告诉我哪一部分做错了吗?我不知道如何在java中使用Kafka。在 ReadLg.java 中,我想从日志文件中读取并将其插入到数据库中,然后在完成后我想向 RetrieveData.java 发送一条消息,以便它可以启动。检索数据将运行,但空闲等待来自 ReadLg.java 的消息。这是一个不好的方法吗?还是旧方法?有什么建议或帮助解决这个问题吗?我不断收到错误“无法连接到 IP 地址”,
MYYA
相关分类