我通过这个指令创建了一个主题:
C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv
然后我测试了该主题是否正确使用了该数据。之后想在Flink程序中打印topic,我的程序是这样的:
try{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));
stream.print();
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
但是我得到了这个信息(因为信息太长我不得不写一些):
[main] INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - 在本地嵌入式 Flink mini 集群上运行作业 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动 Flink Mini Cluster [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动指标注册表 [main] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - 没有配置指标报告器,不会公开/报告指标。[main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动 RPC 服务 [flink-akka.actor.default-dispatcher-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger 启动 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 启动高可用性服务 [main] INFO org.apache.flink.runtime.blob.BlobServer - 创建 BLOB 服务器存储目录 C:
另外,我也看到了这个链接,但它没有解决我的问题: How to access/read kafka topic data from flink?
你能告诉我这里有什么问题吗?
白衣非少年
相关分类