猿问

如何在 Flink 程序中打印 kafka 主题数据?

我通过这个指令创建了一个主题:


C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv

然后我测试了该主题是否正确使用了该数据。之后想在Flink程序中打印topic,我的程序是这样的:


 try{

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


    Properties properties = new Properties();

    DataStream<String> stream = env

            .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));


           stream.print();

    env.execute();

    } catch (Exception e) {

        e.printStackTrace();

    }

但是我得到了这个信息(因为信息太长我不得不写一些):


[main] INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - 在本地嵌入式 Flink mini 集群上运行作业 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动 Flink Mini Cluster [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动指标注册表 [main] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - 没有配置指标报告器,不会公开/报告指标。[main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动 RPC 服务 [flink-akka.actor.default-dispatcher-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger 启动 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动高可用性服务 [main] INFO org.apache.flink.runtime.blob.BlobServer - 创建 BLOB 服务器存储目录 C:


另外,我也看到了这个链接,但它没有解决我的问题: How to access/read kafka topic data from flink?


你能告诉我这里有什么问题吗?


函数式编程
浏览 238回答 1
1回答

白衣非少年

问题解决了。首先,我用这个命令填满了 Kafka 主题:/home/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list 10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092 --topic flinkTopic < transactions2.csv然后,使用此代码,我可以打印 Kafka 主题:&nbsp;final StreamExecutionEnvironment env =&nbsp;&nbsp;StreamExecutionEnvironment.getExecutionEnvironment();&nbsp;Properties prop = new Properties();&nbsp;prop.setProperty("bootstrap.servers",&nbsp;&nbsp;"10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");&nbsp;prop.setProperty("group.id", "test");&nbsp; &nbsp; FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<>&nbsp;&nbsp; ("flinkTopic", new SimpleStringSchema(),prop);&nbsp; &nbsp; myConsumer.setStartFromEarliest();&nbsp; &nbsp; DataStream<String> stream = env.addSource(myConsumer);&nbsp; &nbsp; stream.print();&nbsp; &nbsp; env.execute("Flink Streaming Java API Skeleton");我希望它对其他人有用。
随时随地看视频慕课网APP

相关分类

Java
我要回答