手记

Spark介绍及搭建

Spark简介

什么是Spark?

Apache Spark是用于大规模数据处理的快速(fast)和通用(general)引擎,由加州伯克利分校AMP(Algorithms、Machines and People Lab,在算法、机器和人之间通过大规模集成来展现大数据的应用平台)实验室开发的大数据处理框架。
Spark提供了大数据处理的一站式解决方案,以Spark Core为基础推出了Spark SQL、Spark Streaming、MLlib、GraphX、SparkR等组件。整个Spark生态体系称为BDAS,即:伯克利数据分析栈。

Spark特点

Spark具有运行速度快、易用性好、通用型强和随处运行的特点。

运行速度快(Speed)

如果Spark基于内存读取,速度是Hadoop的100倍;使用磁盘读取,也是Hadoop的十倍。spark之所以能够比Hadoop快,有两点主要原因:内存计算和引入DAG执行引擎。

  • Spark在迭代计算过程中数据默认是保存在内存中的,后续计算直接读取内存中的结果即可。而Hadoop每一步计算都是直接将结果存储在磁盘中,后续的计算从磁盘重新读取上次计算结果。基于内存读取数据的速度比磁盘读取的速度高出两个数量级,这就致使Spark比Hadoop快很多,尤其是迭代计算。

  • Spark在实际执行任务前,将计算步骤根据依赖关系形成DAG图(有向无环图),然后对DAG进行优化计算路径,大大减少了I/O读取操作。


    speed

易用性好(Ease of Use)

Spark支持Scala、Java、Python、R语言编写应用程序,并且提供了Scala、Python、R的交互式操作shell。


easy

通用型强(generality)

Spark提供了一站式的大数据解决方案,生态圈BADS包含了:提供内存计算框架的Spark Core、用于结构化查询的Spark SQL、用于实时计算的Spark Streaming、用于机器学习的MLlib和用于图计算的GraphX。


generality

随处运行(Runs Everywhere)

Spark提供了本地Local运行模式,用来学习和测试(当然还有许多用途,比如我们正在做的一个项目就是基于Local模式的)。对于集群部署模式,Spark能够以YARN、Mesos和自身提供的Standalone作为资源管理调度框架来执行作业。对于数据源,Spark能够读取HDFS、Cassandra、HBase、S3、Alluxio等数据源数据。


everywhere

Spark历史发展

Spark在2009年开始编写,在随后的四年里在伯克利AMP实验室逐渐形成了现有Spark的雏形。在2013年6月成为Apache孵化项目,8个月后成为Apache顶级项目,随后进入了快速发展。在2014年5月发布了1.0.0正式版本,在随后的时间里3到4个月发布一个小版本,在2016年7月推出了Spark2.0版本,到现在Spark稳定版为spark 2.2。


history

Spark生态系统

Spark BDAS以Spark Core分布式计算引擎为核心,在Spark Core之上扩展了用于结构化查询的Spark SQL、用于实时数据处理的Spark Streaming、用于机器学习的MLlib、用于图计算的GraphX和用于统计分析的SparkR等。


ecosystem

Spark Core

Spark Core是整个Spark生态体系的核心,是一个分布式大数据处理框架。Spark Core中定义了弹性分布式数据集RDD、资源管理、任务调度、任务执行、内存计算等核心功能。其中资源管理即可以使用Spark自身提供的Standalone资源管理器,也可以使用第三方资源调度框架YARN、Mesos、Kubernetes等,相比较而言,第三方资源调度框架提供了更细粒度的资源管理。

Spark SQL

Spark SQL的前身是Shark,当时Hive几乎是SQL on Hadoop(将SQL翻译成MapReduce作业)的唯一选择,鉴于Hive的性能以及Spark的兼容性,由此Shark而生。Shark就是Hive on Spark,将Hive的HQL翻译成Spark上的RDD,然后再通过Hive的Metadata获取数据库信息,最后交由Spark运算。
在2014年7月1日SparkSubmit峰会上,Databricks宣布终止Shark的开发,转而开发自己的Spark SQL。因为Shark本身是对Hive的改造,只是替换了Hive的物理执行引擎,使之能够快速运行。但是,Shark继承了大量的Hive代码,因此对于优化和维护增加了成本,并且Hive本身基于MapReduce设计的,而这部分设计成为了整个项目的瓶颈。
Spark SQL能够使SQL查询和Spark编程(RDD、Dataset、DataFrame)无缝混合,并且提供了统一访问外部数据源的方式,包括:Hive、Avro、Parquet、JSON、ORC和JDBC等。对于Hive的集成,Spark SQL本身支持HiveQL语法以及Hive SerDes和UDF,允许你访问已有的Hive仓库。


Hive support


Spark SQL还对商业智能工具(BI)提供了JDBC和ODBC的标准支持。


BI support


Spark SQL在性能上也比Shark有很大提升,Spark SQL主要做了以下几点优化:

  • 内存列存储(In-Memory Columnar Storage):Spark SQL的表数据在内存中存储不是使用原生态的JVM对象存储方式,而是采用内存列存储(就是我们说的缓存).

  • 字节码生成技术(Bytecode Generation):Spark 1.1.0在Catalyst模块的Expressions增加了Codegen模块,使用动态字节码生成技术,对匹配的表达式采用特定的代码动态编译。另外对SQL表达式的也做了GC优化。

  • Scala代码优化:Spark SQL在使用Scala编写代码的时候,尽量避免了一些低效、容易GC的代码。虽然增加了编写代码难度,但对于用户来说接口统一。

Spark Streaming

Spark Streaming可以轻松构建高吞吐、可扩展、高容错的流式应用程序。Spark Streaming可以支持多种数据源进行类似Map、Reduce、Join等复杂操作,然后将结果保存到外部系统、数据库或实时仪表盘上。


Spark Streaming

MLlib

MLlib是Apache Spark提供的可扩展机器学习库,提供了包括分类、聚类、回归、协同过滤等算法,同时也包括了相关测试和数据生成器。

GraphX

GraphX是用来操作图(比如社交网络的朋友圈)的程序库,可以进行并行的图计算。

其他

Spark生态体系还有许多框架比如SparkR、BlinkDB、Alluxio等,这些在之后的学习中可以在做总结分享。

Spark集群搭建

Spark部署可以分为Local、standalone、YARN、Mesos、Kubernetes(实验阶段)这几种部署模式。其中Local为本地部署模式,在单个机器上部署(也支持伪分布式),用来进行实验、测试等。其他几种部署模式都属于集群部署模式,standalone部署模式使用spark自带的资源管理器进行资源分配,YARN、Mesos、Kubernetes等使用的是第三方资源管理框架。
这里讲解是使用spark自带的standalone部署模式部署,我们先来看下spark集群相关的概述。

集群模式概述

Spark应用程序作为独立的进程在集群上运行,由主程序(main)的SparkContext来协调应用程序和集群。SparkContext能够连接多种集群管理器(standalone、YARN、Mesos等),一旦连接上集群管理器,spark就能够获取集群节点中的Executor。Executor是用来计算和存储应用程序的进程,由集群中的Worker节点启动。当得到该应用程序的Executor后,就会发送应用程序代码到Executor上,最后SparkContext向executor发送需要运行的task。


cluster


上面是集群应用程序运行的简单架构,对于这架构需要有几点说明:

  • 每个应用程序都有自己的Executor,每个Executor使用多个线程执行task。这样做有利于在调度端(每个驱动程序调度自己的task)和执行端(不同应用程序的任务运行在不同的JVM中)相互隔离应用程序。这样做也导致不同应用程序之间不能共享数据,但是你可以使用外部存储系统来达到这个目的。

  • spark对于底层的集群管理器是未知的,无论使用什么集群管理器,spark只要获取到Executor就能与其通信。这样集群管理器作为spark的一个插件,可以根据自己的需求使用不同的集群管理器。

  • 应用程序提交虽然可以使用客户端和服务器模式,但是无论什么模式都需要确保执行应用程序的节点能够与其它work节点通信,因为驱动任务调度器需要和executor通信。

Spark目前支持的集群管理器有:StandaloneApache MesosHadoop YARNKubernetes (experimental)

关键术语描述

术语描述
Application构建在spark上的用户程序,由集群上的驱动程序(driver program)和执行器(executor)组成。
Application jar包含spark应用程序的jar包,应用程序如果依赖其它应用,需要将这些依赖也打到这个jar包中。注意,spark和hadoop相关依赖不需要打进来,在运行时spark会自动添加。
Driver program运行应用程序的main()函数和创建SparkContext的进程。
Cluster manager用于获取资源的外部服务,可以是standalone、YARN、Mesos等。
Deploy model驱动程序运行的位置。cluster模式下驱动程序在集群内部的节点执行,client模式下驱动程序在用户提交的集群外部节点上运行。
Work node集群中运行应用程序代码的节点。
Executor为执行应用程序在Work节点上启动的进程,用于运行task并将数据保存到磁盘或内存中。每个应用程序都有自己的executors。
Task一个任务工作单元,发送给executor执行。
Job由Spark执行action RDD产生,由多个task组成用于并行计算。
Stage每个Job由DAGScheduler划分成多个Stage,每个Stage包含一组task。

standalone集群安装

安装standalone模式,只需要在集群中的每个节点安装Spark的编译版本,你可以直接下载预编译版本http://spark.apache.org/downloads.html,也可以下载源码自己构建http://spark.apache.org/docs/latest/building-spark.html

部署环境

节点角色
192.168.0.1Master、Work
192.168.0.2Work
192.168.0.3Work

集群节点

节点角色
192.168.0.1Master、Work
192.168.0.2Work
192.168.0.3Work
软件版本
JDK1.8+
Scala2.11+
Spark2.2.0

软件版本

软件版本
JDK1.8+
Scala2.11+
Spark2.2.0

spark下载

spark预编译版本和源代码都可以从http://spark.apache.org/downloads.html 进行下载,根据自己的需求选择相应版本,如果需要灵活控制对应的hadoop、hive版本可以自己编译源代码。

wget http://mirror.bit.edu.cn/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz

编辑slave列表文件

在${SPARK_HOME}/conf目录创建 slaves文件(目录下已经有了slaves.template,可以直接复制或重命名),并且将需要运行Work进程节点的主机名称(或IP)添加到给文件中。

cp conf/slaves.template conf/slaves
vim conf/slaves#添加运行Work服务的主机192.168.0.1
192.168.0.2
192.168.0.3

配置集群

你可以对集群进一步配置,spark集群配置文件为conf/spark-env.sh,默认conf目录下有一个spark-env.sh.template,你可以直接复制或重命名。

cp conf/spark-env.sh.template conf/spark-env.sh
vim spark-env.sh#添加集群配置属性JAVA_HOME=${JAVA_HOME}          #指定jdk安装路径SPARK_MASTER_HOST=192.168.0.1   #指定master节点主机名称或IPSPARK_MASTER_PORT=7077          #master后台通信端口,默认为7077SPARK_MASTER_WEBUI_PORT=8080    #master web ui端口,默认为8080SPARK_MASTER_OPTS=              #应用于master的配置属性,可配置属性查看下面列表,使用-Dx=y模式配置,默认为noneSPARK_LOCAL_DIR=/opt/spark/work-data #spark临时目录,包含map输出文件和存储在磁盘上的RDD。该目录应该在本地快速的磁盘上,可以用逗号分割指定多个目录。默认为${SPARK_HOME}/workSPARK_WORKER_CORES=2            #允许spark应用程序使用本地机器的cpu数,默认为全部可用cpu数SPARK_WORKER_MEMORY=1G          #允许spark应用程序使用本地机器的内存数,默认全部最少为1G。如果要为应用程序指定内存,使用spark.executor.memory属性。SPARK_WORKER_PORT=7078          #work后台通信端口,默认随机SPARK_WORKER_WEBUI_PORT=8081    #work web ui端口,默认为8081SPARK_WORKER_DIR=/opt/spark/work-data #用于运行应用程序的目录,里面存储日志和临时空间使用。默认为${SPARK_HOME}/workSPARK_WORKER_OPTS=              #应用于work的配置属性,可配置属性查看下面列表,使用-Dx=y模式配置,默认为noneSPARK_DAEMON_MEMORY=1G          #分配给spark master和worker自身运行的内存。默认为1gSPARK_DAEMON_JAVA_OPTS=         #spark master和worker自身使用的JVM参数,使用-Dx=y模式配置

SPARK_MASTER_OPTS可配置属性:

属性默认值含义
spark.deploy.retainedApplications200显示完成应用程序的最大数量,超过该设置会把旧的应用程序会从ui中删除
spark.deploy.retainedDrivers200显示完成驱动程序的最大数量,超过该设置会把旧的驱动程序会从ui中删除
spark.deploy.spreadOuttruestandalone集群管理是否应该跨节点分布应用程序或者说是否将应用程序尽可能分布到少量节点上。对于HDFS中数据局部性来说,扩展通常更好,但是对于计算密集型的负载,整合到一起通常更有效。
spark.deploy.defaultCores无限如果没有设置spark.cores.max,则为应用程序提供最大的内核数为该默认值。如果都未设置,应用程序会尽可能的获取所有内核。对于共享集群来说,通常设置一个较低的值,防止用户默认使用全部cpu
spark.deploy.maxExecutorRetries10限制standalone集群管理器对于失败的executor个数达到配置上限则进行删除,如果不配置,只要有一个executor正在运行,应用程序就不会被删除
spark.worker.timeout60master在规定时间内没有接受work心跳则认为该work丢失,以s为单位

SPARK_WORKER_OPTS可以配置属性:

属性默认值含义
spark.worker.cleanup.enabledfalse启动定时清理woker/application目录,只对standalone部署模式有影响。只有应用程序停止后,才会定期删除
spark.worker.cleanup.interval1800 (30 minutes)定时清除worker下旧的应用程序工作目录,以s为单位
spark.worker.cleanup.appDataTtl604800 (7 days, 7 * 24 * 3600)每个woker下保留应用程序工作目录的秒数(跟上一个区别还不清楚)
spark.worker.ui.compressedLogFileLengthCacheSize100设置压缩日志文件缓存的大小

分发spark安装

将安装好的spark分发到各个slave节点。

scp -r spark-2.2.0-bin-hadoop2.7 yangjianzhang@192.168.0.2:/home/yangjianzhang/server/spark/scp -r spark-2.2.0-bin-hadoop2.7 yangjianzhang@192.168.0.3:/home/yangjianzhang/server/spark/

启动服务

spark启动脚本在${SPARK_HOME}/sbin目录下,包含了master和slave相关的启动、停止脚本。如果我们要启动所有slave(conf/slaves配置的),使用start-slaves.sh脚本。如果启动本机slave,则使用start-slave.sh <master-spark-URL>

sbin/start-master.sh #启动master服务sbin/start-slaves.sh #启动slave服务

访问UI页面

master UI地址为:http://master:8080/
worker UI地址为:http://worker:8081/



作者:零度沸腾_yjz
链接:https://www.jianshu.com/p/cdfa89e6ec70


0人推荐
随时随地看视频
慕课网APP