手记

IDEA开发Spark Streaming

本人使用spark2.1.0, scala 2.11.0 hadoop 2.7.3。打开IDEA新建一个Maven工程,
去Spark官网查找maven对spark的配置,比如寻找kafka的配置,maven中的使用

Source  ArtifactKafka   spark-streaming-kafka-0-8_2.11Flume   spark-streaming-flume_2.11Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

最后我们在pom.xml中的配置如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kason.spark</groupId>
    <artifactId>spark_platform_learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <!-- maven官方 http://repo1.maven.org/maven2/  或 http://repo2.maven.org/maven2/ (延迟低一些) -->
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Repository Switchboard</name>
            <layout>default</layout>
            <url>http://repo2.maven.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <plugins>
            <plugin>
                <!-- MAVEN 编译使用的JDK版本 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build></project>

SparkStreaming Demo code:

package com.scala.action.streamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/**
  * Created by kason_zhang on 4/7/2017.
  */object SparkStreaming {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[3]").setAppName("BasicStreamingExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("10.64.24.78" , 9999)
    val words = lines.flatMap(_.split(" "))
    val wc = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wc.print
    wc.saveAsTextFiles("D:\\work\\cloud\\log\\word.txt")
    println("pandas: sscstart")
    ssc.start()
    println("pandas: awaittermination")
    ssc.awaitTermination()
    println("pandas: done!")
  }

}

Centos 安装nc(netstat)

yum install nc

启动监听端口Server
nc -lk 9999
防火墙开启9999端口

[kason@kason Desktop]$ suPassword: [root@kason Desktop]# /sbin/iptables -I INPUT -p tcp --dport 9999 -j ACCEPT[root@kason Desktop]# /etc/init.d/iptables saveiptables: Saving firewall rules to /etc/sysconfig/iptables:[  OK  ]
[root@kason Desktop]# service iptables restartiptables: Setting chains to policy ACCEPT: filter          [  OK  ]iptables: Flushing firewall rules:                         [  OK  ]iptables: Unloading modules:                               [  OK  ]iptables: Applying firewall rules:                         [  OK  ]
[root@kason Desktop]# vi /etc/init.d/iptables [root@kason Desktop]# /etc/init.d/iptables status

之后Linux下执行nc -lk 9999然后就可以在下面输入要发送的数据

[kason@kason Desktop]$ nc -lk 9999hell owoo
goo ndate
better best
goo ndate
vi /e   
hello world
hello world out
hell o
kkl portal
hell o kkl

IDEA的输出结果如下:

-------------------------------------------
Time: 1491875390000 ms
-------------------------------------------
(o,1)
(kkl,1)
(hell,1)

因此使用IDEA开发一个简单的demo就完成了。



作者:kason_zhang
链接:https://www.jianshu.com/p/fd28dcdf3a1b


0人推荐
随时随地看视频
慕课网APP