手记

Spark统计采集的JSON日志数据并提交至YARN

在许多运维监控平台或电商平台里面,由于客户方面不允许我们直接连接客户机器的数据库只给我们提供一个日志文件,那这时候如何根据这个日志文件快速筛选出数据分析师想要的数据呢?
在这里我推荐是使用Spark的DS/DF功能,查阅百度百科以后我们知道,他的定义如下:

A DataFrame is a Dataset organized into named columns
    以列(列名、列类型、列值)的形式构成的分布式数据集
          id   int    10
          name string pk
DF和DS本质上的区别:
DF是弱类型,DS是强类型,因此如果出现读取字段错误的问题,前者要在运行时才报错,后者还在写代码编译阶段就会报错

因此对于一个有规则的数据集,我们可以采用这种方式将数据虚拟成一张表,然后最终使用SQL落地给分析师用户
话不多说,直接上代码:

  1. 首先我们需要在IDEA搭建一个maven工程,导入Spark相关的依赖
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.11</scala.tools.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.6.0-cdh5.15.1</hadoop.version>
    </properties>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--Spark SQL依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
		<!-- Hadoop相关依赖-->
		<dependency>
		    <groupId>org.apache.hadoop</groupId>
		    <artifactId>hadoop-client</artifactId>
		    <version>${hadoop.version}</version>
		</dependency>
  1. 新建一个scala文件
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameAPIApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").appName("DataFrameAPIApp").getOrCreate()
    import spark.implicits._```
    val zips: DataFrame = spark.read.json("您本地的json文件目录")
    zips.printSchema()  // 查看schema信息
    zips.createOrReplaceTempView("zips")
    // 以下是执行sql
    spark.sql("select _id,city,pop,state from zips where state='CA' order by pop desc limit 10").show()


    spark.stop()
  }
}
  1. 提交到YARN上或本地执行
结尾的hdfs地址可以是本地的文件路径地址
./spark-submit \
--class  com.imooc.bigdata.chapter02.SparkWordCountAppV2 \
--master yarn \
--name SparkWordCountAppV2 \
/home/hadoop/lib/sparksql-train-1.0.jar \
hdfs://hadoop000:8020/pk/wc.data hdfs://hadoop000:8020/pk/out
1人推荐
随时随地看视频
慕课网APP