猿问

Apache Spark 可以使用 TCP 侦听器作为输入吗?

Apache Spark 可以使用 TCP 侦听器作为输入吗?如果是,也许有人有执行该操作的 java 代码示例。

我试图找到关于此的示例,但所有教程都展示了如何通过 TCP 定义到数据服务器的输入连接,而不是使用等待传入数据的 TCP 侦听器。


12345678_0001
浏览 123回答 2
2回答

繁星点点滴滴

是的,可以使用 Spark 监听 TCP 端口并处理任何传入数据。您正在寻找的是Spark Streaming。为了方便:import org.apache.spark.*;import org.apache.spark.api.java.function.*;import org.apache.spark.streaming.*;import org.apache.spark.streaming.api.java.*;import scala.Tuple2;// Create a local StreamingContext with two working thread and batch interval of 1 secondSparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));// Create a DStream that will connect to hostname:port, like localhost:9999JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);// Split each line into wordsJavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());// Count each word in each batchJavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print();jssc.start();              // Start the computationjssc.awaitTermination();   // Wait for the computation to terminate

慕姐8265434

Spark没有内置的TCP服务器来等待生产者和缓冲数据。Spark 通过其 API 库在 TCP、Kafka 等的轮询机制上工作。要使用传入的 TCP 数据,您需要有一个 Spark 可以连接到的外部 TCP 服务器,如 Shaido 在示例中所解释的那样。
随时随地看视频慕课网APP

相关分类

Java
我要回答