我正在尝试 Flink 的新 Python 流 API 并尝试使用./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py. python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或日志目录中的 *.out 文件,默认情况下输出方法在该文件中发出数据)。
import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema
directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
for jar in glob.glob(os.path.join(directory,'*.jar')):
sys.path.append(jar)
from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09
props = Properties()
config = {"bootstrap_servers": "localhost:9092",
"group_id": "flink_test",
"topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")
def main(factory):
consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)
env = factory.get_execution_environment()
env.add_java_source(consumer) \
.output()
env.execute()
我从 Maven 存储库中抓取了一些 jar 文件,即flink-connector-kafka-0.9_2.11-1.6.1.jar,flink-connector-kafka-base_2.11-1.6.1.jar并将kafka-clients-0.9.0.1.jar它们复制到 Flink 的lib目录中。除非我误解了文档,否则这足以让 Flink 加载 kafka 连接器。确实,如果我删除这些 jar 中的任何一个,导入将失败,但这似乎不足以实际调用该计划。添加一个 for 循环来动态添加这些sys.path也不起作用。
九州编程
心有法竹
相关分类