
Spring Boot + Spark as a long-running service

Goal

A typical offline Spark job simply exits once it has finished processing. Here we combine Spark with Spring Boot to build a Spark application that keeps running, which can then serve as the basis for further extensions.

Project structure

Create a Maven project:
src
    main
        java
        resources
        scala

The project contains both Java and Scala code. Because Scala is included, the following plugin has to be added to the pom:
          
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <recompileMode>incremental</recompileMode>
                    <javacArgs>
                        <javacArg>-Xlint:unchecked</javacArg>
                        <javacArg>-Xlint:deprecation</javacArg>
                    </javacArgs>
                    <args>
                        <!-- work-around for https://issues.scala-lang.org/browse/SI-8358 -->
                        <arg>-nobootcp</arg>
                    </args>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
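
The ${scala.version}, ${java.version}, and ${spark.version} placeholders used in this pom are assumed to be defined in a <properties> block; a minimal sketch, with the Scala and Spark versions inferred from the jars and the Spark installation that appear later in this post, and Java 8 assumed for Spring Boot 2.0:

            <properties>
                <java.version>1.8</java.version>
                <scala.version>2.11.8</scala.version>
                <spark.version>2.2.0</spark.version>
            </properties>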

Packaging

1. First, we want all the dependency jars placed in a separate directory, so that after a code change we only need to upload a very small application jar. The following two plugins take care of this.
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.demo.WebApplication</mainClass>
                        </manifest>
                        <manifestEntries>
                            <version>${project.version}</version>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- directory the dependencies are copied to -->
                            <outputDirectory>
                                ${project.build.directory}/lib
                            </outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

2. Exclude the Spark-related dependencies by marking them as <scope>provided</scope>:
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.gson</groupId>
                    <artifactId>gson</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.module</groupId>
                    <artifactId>jackson-module-scala_2.11</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
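
The Spring Boot starters themselves keep the default compile scope so that they end up under lib/. A sketch of the web starter dependency, assuming version 2.0.2.RELEASE as seen in the lib listing further down:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.0.2.RELEASE</version>
        </dependency>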

Code implementation

1. Spring Boot side: the application entry point, a controller, and a service:
@SpringBootApplication
public class WebApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }

}

@RestController
public class DemoController {

    @Autowired
    private DemoService demoService;

    @GetMapping("/write")
    public void demo(@RequestParam("path") String path, @RequestParam("outPut") String outPut) {
        demoService.test(path, outPut);
    }

}
@Slf4j
@Service
public class DemoService {

    public void test(String path, String outPut) {
        log.info("test~~~");
        // run the Spark job on a separate thread so the HTTP request returns immediately
        new Thread(new Runnable() {
            @Override
            public void run() {
                ParquetUtils parquetUtils = new ParquetUtils();
                parquetUtils.write(path, outPut);
            }
        }).start();
    }
}

2. Spark side: obtaining the SparkSession
/**
 * @author dalizu on 2019/11/13.
 * @version v1.0
 * @desc Obtain the SparkSession (lazily initialized, thread-safe singleton)
 */
public class SingleSpark {

    private static volatile SparkSession sparkSession = null;

    private SingleSpark() {}

    public static SparkSession getInstance() {
        if (sparkSession == null) {
            synchronized (SingleSpark.class) {
                if (sparkSession == null) {
                    // master and the other settings come from spark-submit;
                    // uncomment .master for local testing
                    sparkSession = SparkSession.builder()
                            /*.master("local[5]")*/
                            .getOrCreate();
                }
            }
        }
        return sparkSession;
    }

}

3. Finally, the logic that the service actually calls is implemented under the scala package. Here it simply reads a local text file and writes it out as a Parquet file via Spark SQL:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class ParquetUtils extends Serializable {

  def write(path: String, outPut: String): Unit = {

    val sparkSession = SingleSpark.getInstance()
    val sc = sparkSession.sparkContext

    // read the comma-separated text file as an RDD of lines
    val textRdd = sc.textFile(path)

    // schema of the resulting two-column DataFrame
    val struct = StructType(Array(
      StructField("col01", StringType),
      StructField("col02", StringType)
    ))

    val df = sparkSession.createDataFrame(textRdd.map(line => {
      val lines = line.split(",")
      Row(lines(0), lines(1))
    }), struct)

    df.select("*").show()

    // write the DataFrame out as Parquet, then read it back
    df.write.parquet(outPut)

    val outDF = sparkSession.read.parquet(outPut)

    // Parquet files can also be used to create a temporary view and then used in SQL statements
    outDF.createOrReplaceTempView("parquetFile")
    val col01DF = sparkSession.sql("SELECT col01 FROM parquetFile")

    col01DF.show()
  }

}
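
For reference, the file passed in as path is plain text with two comma-separated columns per line; the input that produces the output shown in the test below would look like this:

a,g
b,d
c,f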

Build and test

mvn clean package
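
After the build, target contains the application jar plus the lib directory with every dependency jar (including the Spark ones), matching the plugin configuration above:

target/
    web-spark-1.0-SNAPSHOT.jar
    lib/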
1. First, prepare a Hadoop and Spark environment. In spark-env.sh, set:
HADOOP_CONF_DIR=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0

2. Collect the Spring Boot dependency jars.
After packaging, target/lib contains every dependency, including the Spark ones, which leads to plenty of jar conflicts at runtime. Since the Spark installation on the server already provides those jars, we ship everything except the Spark-related ones. The MANIFEST.MF inside web-spark-1.0-SNAPSHOT.jar (META-INF/MANIFEST.MF) lists the Class-Path the web project needs at startup:
Class-Path: lib/spring-boot-starter-web-2.0.2.RELEASE.jar ....

We copy those jars into the lib directory of the runtime environment, leaving out the Spark-related ones:
cp lib/spring-boot-starter-web-2.0.2.RELEASE.jar lib/spring-boot-starter-json-2.0.2.RELEASE.jar tmp
cp lib/jackson-datatype-jdk8-2.9.5.jar lib/jackson-datatype-jsr310-2.9.5.jar lib/jackson-module-parameter-names-2.9.5.jar lib/spring-boot-starter-tomcat-2.0.2.RELEASE.jar tmp
cp lib/tomcat-embed-core-8.5.31.jar lib/tomcat-embed-el-8.5.31.jar lib/tomcat-embed-websocket-8.5.31.jar lib/spring-web-5.0.6.RELEASE.jar tmp
cp lib/spring-beans-5.0.6.RELEASE.jar lib/spring-webmvc-5.0.6.RELEASE.jar lib/spring-aop-5.0.6.RELEASE.jar lib/spring-context-5.0.6.RELEASE.jar tmp
cp lib/spring-expression-5.0.6.RELEASE.jar lib/spring-boot-starter-2.0.2.RELEASE.jar lib/spring-boot-2.0.2.RELEASE.jar lib/spring-boot-starter-logging-2.0.2.RELEASE.jar tmp
cp lib/log4j-to-slf4j-2.10.0.jar lib/log4j-api-2.10.0.jar lib/javax.annotation-api-1.3.2.jar lib/spring-core-5.0.6.RELEASE.jar lib/spring-jcl-5.0.6.RELEASE.jar lib/snakeyaml-1.19.jar tmp
cp lib/spring-boot-autoconfigure-2.0.2.RELEASE.jar lib/slf4j-nop-1.7.2.jar tmp
cp lib/slf4j-api-1.7.2.jar lib/validation-api-1.1.0.Final.jar lib/hibernate-validator-5.2.4.Final.jar lib/jboss-logging-3.2.1.Final.jar tmp
cp lib/classmate-1.1.0.jar lib/scala-library-2.11.8.jar lib/paranamer-2.3.jar tmp
cp lib/jul-to-slf4j-1.7.16.jar lib/jackson-module-scala_2.11-2.9.5.jar tmp
cp lib/scala-reflect-2.11.11.jar lib/jackson-core-2.9.5.jar lib/jackson-annotations-2.9.5.jar lib/jackson-databind-2.9.5.jar lib/jackson-module-paranamer-2.9.5.jar tmp

mv tmp lib
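
Note that copy-dependencies also copies provided-scope jars by default, which is why the Spark jars show up under lib and have to be filtered out by hand above. As an alternative (not used here), the plugin's excludeScope parameter could keep them out of lib at build time; a sketch for the configuration block of maven-dependency-plugin:

                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <!-- leave out jars the Spark installation already provides -->
                            <excludeScope>provided</excludeScope>
                        </configuration>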

Submitting to YARN

1. Submit to YARN with the following command:
spark-submit --master yarn \
--name web-spark \
--class com.demo.WebApplication \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/jars/web-spark-1.0-SNAPSHOT.jar

This fails with the following error:
Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

This happens because spark-defaults.conf contains the setting
spark.yarn.jars=hdfs://hadoop003:8020/sparkjars/*
so the Spark jars have to be uploaded to that location first:
cd /home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
hdfs dfs -put * hdfs://hadoop003:8020/sparkjars/

Submit again:
spark-submit --master yarn \
--name web-spark \
--jars $(echo /home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars/*.jar | tr ' ' ',') \
--class com.demo.WebApplication \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/jars/web-spark-1.0-SNAPSHOT.jar

Call the endpoint exposed by the controller:
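
For example, with curl (host, port, and file paths are placeholders; 8080 is the Spring Boot default port, and in yarn client mode the embedded web server runs on the host where spark-submit was executed):

# host, port and file paths below are placeholders
curl "http://<driver-host>:8080/write?path=/path/to/input.txt&outPut=/path/to/parquet-out"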

Result:

+-----+-----+
|col01|col02|
+-----+-----+
|    a|    g|
|    b|    d|
|    c|    f|
+-----+-----+


+-----+
|col01|
+-----+
|    a|
|    b|
|    c|
+-----+

The running application can also be checked in the YARN scheduler UI:
http://192.168.76.142:8088/cluster/scheduler

Full implementation

The complete implementation is available at:
https://github.com/lizu18xz/web-spark