Spark Standalone 集群搭建

Apache Spark 是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在 2009 年由加州大学伯克利分校的 AMPLab 开发,并于 2010 年成为 Apache 的开源项目之一,与 Hadoop 和 Storm 等其他大数据技术相比,Spark有如下优势:

  • Spark 提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。
  • 官方资料介绍 Spark 可以将 Hadoop 集群中的应用在内存中的运行速度提升 100 倍,甚至能够将应用在磁盘上的运行速度提升 10 倍。

流程图及特点

流程图

  1. 构建 Spark Application 的运行环境,启动 SparkContext;
  2. SparkContext 向资源管理器(可以是 Standalone,Mesos,Yarn)申请运行 Executor 资源,并启动 StandaloneExecutorbackend;
  3. Executor 向 SparkContext 申请 Task;
  4. SparkContext 将应用程序分发给 Executor;
  5. SparkContext 构建城 DAG 图,将 DAG 图分解城 Stage、将 Taskset 发送给 Task Scheduler,最后由 Task Scheduler 将 Task 发送给 Executor 运行;
  6. Task 在 Executor 上运行,运行完释放所有资源。
特点
  1. 每个 Application 获取专属的 Executor 进程,该进程在 Application 期间一直驻留,并以多线程方式运行 Task。这种 Application 隔离机制是有优势的,无论是从调度角度看(每个 Driver 调度他自己的任务),还是从运行角度看(来自不同 Application 的 Task 运行在不同 JVM 中),当然这样意味着 Spark Application 不能跨应用程序共享数据,除非将数据写入外部存储系统。
  2. Spark 与资源管理器无关,只要能够获取 Executor 进程,并能保持相互通信就可以了。
  3. 提交 SparkContext的Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同一个 Rack 里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换。
  4. Task 采用了数据本地性和推测执行的优化机制。

Spark 安装

  • 上传解压(『caroly01』)

  • tar xf spark-2.3.1-bin-hadoop2.6.tgz -C /opt/caroly
    cd /opt/caroly
    mv spark-2.3.1-bin-hadoop2.6 spark-2.3.1
    

  • 修改配置文件(『caroly01』)

  • cd /opt/caroly/spark-2.3.1/conf
    cp slaves.template slaves
    vi slaves
    
    • change 19:        修改 19 行:配置从节点
      
      caroly02
      caroly03
      
  • cp spark-env.sh.template spark-env.sh
    vi spark-env.sh
    
    • add 58:       在 58 行增加:指定 master 节点、提交任务端口、每台 worker 核数、每台 worker 可支配的内存、配置 Master - HA
      
      export SPARK_MASTER_HOST=caroly01
      export SPARK_MASTER_PORT=7077
      export SPARK_WORKER_CORES=2
      export SPARK_WORKER_MEMORY=3g
      
      export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=caroly02:2181,caroly03:2181,caroly04:2181 -Dspark.deploy.zookeeper.dir=/MasterHA1114"
      
  • vi /opt/caroly/spark-2.3.1/sbin/spark-config.sh
    add 34:
    export JAVA_HOME=/usr/java/jdk1.8.0_251-amd64
    
  • 为了方便使用 WebUI,可以进行以下配置

  • cd /opt/caroly/spark-2.3.1/conf
    cp spark-defaults.conf.template spark-defaults.conf
    vi spark-defaults.conf
    
    • add 28:       在 28 行增加:增加配置
      
      spark.eventLog.enabled           true
      spark.eventLog.dir               hdfs://caroly01:8020/spark/data/log
      spark.history.fs.logDirectory    hdfs://caroly01:8020/spark/data/log
      spark.eventLog.compress            true
      
  • hdfs dfs -mkdir -p /spark/data/log
    

  • 分发到其他节点(『caroly01』)

  • cd /opt/caroly/
    scp -r spark-2.3.1/ caroly02:`pwd`
    scp -r spark-2.3.1/ caroly03:`pwd`
    

  • 设置从节点(『caroly02』)

  • cd /opt/caroly/spark-2.3.1/conf
    vi spark-env.sh
    
    • change 58:        修改 58 行
      
      export SPARK_MASTER_HOST=caroly02
      

  • 启动(『caroly01』)

  • cd /opt/caroly/spark-2.3.1/sbin/
    ./start-all.sh
    
  • 启动从节点(『caroly02』)

  • cd /opt/caroly/spark-2.3.1/sbin/
    ./start-master.sh
    

  • 访问

  • http://caroly01:8080/
    

基于 Standalone 和 Yarn 任务提交

  • 在每台节点中的 yarn-site.xml 中配置关闭虚拟内存检查

  • <property>  
        <name>yarn.nodemanager.vmem-check-enabled</name>  
        <value>false</value>  
    </property>
    

  • 任务提交

  • ./spark-submit --master spark://caroly01:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
    

基于 Standalone client 和 cluster 任务提交

Standalone-client:

命令:

./spark-submit --master spark://caroly01:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

过程

  1. 集群启动,Worker 向 Master 提交资源。
  2. 在客户端提交 Spark Application,Driver 首先会在客户端启动。
  3. 客户端向 Master 申请资源(当前 class 任务里的资源),Master 会找到满足资源的节点,启动 Executor。
  4. Executor 启动之后,会反向注册给 Driver。Driver 发送 task,监控 task 执行,回收结果。

特点:Spark 基于 standalone-client 模式提交任务,每个 Spark Application 都有自己的独立的 Driver,如果在客户端提交 100 个 Application,会有 100 个 Driver 进程在客户端启动,Driver 负责发送 task,监控 task 执行,回收结果,很容易造成客户端网卡流量激增问题。这种模式适用于程序测试,不适用于生产环境。在客户端可以看到 task 的执行和结果。


Standalone-cluster

命令:

./spark-submit --master spark://caroly01:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

过程

  1. 集群启动,Worker 向 Master 提交资源。
  2. 在客户端提交 Spark Application,首先客户端会向 Master 申请启动 Driver,Master 收到请求之后,会随机找到一台满足资源的 Worker 节点启动 Driver。
  3. Driver 启动后向 Master 申请资源,Master 找到满足资源的节点,启动 Executor,
  4. Executor 启动之后,会反向注册给 Driver。Driver 发送 task,监控 task 执行,回收结果。

特点:Spark 基于 Standalone-cluster 模式提交任务,当在客户端提交多个 Application 时,Driver 是随机在某些 Worker 节点启动,客户端就没有网卡流量激增问题,将这种问题分散到集群中。在客户端看不到 task 执行和结果。这种模式适用于生产环境。


基于 Yarn-client 模式提交任务

命令:

./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

过程

  1. 集群启动,NodeManager 向 ResourceManager 提交资源。
  2. 在客户端提交 Spark Application,首先会在客户端启动 Driver。
  3. 客户端向 RS 申请启动 ApplicationMaster(AM),RS 收到请求之后,随机找到一台 NM 节点启动 AM。
  4. AM 启动之后,会向 RS 申请资源,用于启动 Executor。
  5. RS 返回一批 NM,AM 连接这些 NM 启动 Executor。
  6. Executor 启动之后,会反向注册给 Driver。Driver 发送 task,监控 task 执行,回收结果。

特点:Spark 基于 Yarn-client 模式提交任务,当在 Driver 提交多个 Application 时,会有网卡流量激增问题,这种模式适用于程序测试,不适用于生产环境。在客户端可以看到 task 的执行和结果。

问题记录
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
        at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:288)
        at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:248)
        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:120)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:130)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

解决方法:在命令行中执行如下指令:

export HADOOP_CONF_DIR=/opt/caroly/hadoop-2.9.2/etc/hadoop/

基于 Yarn-cluster 模式提交任务

命令:

./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

过程

  1. 集群启动,NodeManager 向 ResourceManager 提交资源。
  2. 在客户端提交 Spark Application,首先客户端会向 RS 申请启动 AM,RS 收到请求之后,随机在一台 NM 中启动 AM。
  3. AM 启动之后(Driver),会向 RS 申请资源,用于启动 Executor。
  4. RS 返回一批 NM,AM 连接这些 NM 启动 Executor
  5. Executor 启动之后,会反向注册给 AM(Driver),AM 发送 task,监控 task,回收结果。

特点:Spark 基于 Yarn-cluster 模式提交任务,当有多个 Application 提交时,每个 Application 的 Driver(AM)是分散到集群中的 NM 中启动,没有客户端的网卡流量激增问题。将这种问题分散到集群中。在客户端看不到 task 的执行和结果,要去 webui 中查看。这种模式适用于生产环境。


SparkStreaming + Kafka Receiver 模式

该模式处理数据采用了 receiver 接收器的模式,需要一个 task 一直处于占用接收数据,接收来的数据存储级别:MEMORY_AND_DISK_SER_2,这种模式几乎没有用的

存在丢失数据的问题

当接收完消息后,更新完 zookeeper offset 后,如果 Driver 挂掉,Driver 下的 Executor 也会被 Killed,在 Executor 内存中的数据多少会有丢失。

如何解决丢失数据问题

开启 WAL(Write Ahead Log),预写日志机制。当 Executor 备份完数据之后,向 HDFS 中也备份一份数据,备份完成之后,再去更新消费者 offset。如果开启 WAL 机制,可以将接收来的数据存储级别降级,例如:MEMORY_AND_DISK_SER。开启 WAL 机制要设置 checkpoint。

开启 WAL 机制,带来了新的问题

必须数据备份到 HDFS 完成之后,才会更新 offset,下一步才会汇报数据位置,再发 task 处理数据,会造成数据处理的延迟加大。

Receiver 模式的并行度:【每一批次生成的 DStream 中的 RDD 的分区数】

spark.streaming.blockInterval= 200ms

在 batchInterval 内每隔 200ms,将接收来的数据封装到一个 block 中,batchInterval 时间内生成的这些 block 组成了当前这个 batch。假设 batchInterval = 5s,5s 内生成的一个 batch 中就有 25 个 block。RDD->batch,RDD->partition, batch->block,这里每一个 block 就是对应 RDD 中的一个个的 partition。

提高 RDD 的并行度

当在 batchInterval 时间一定情况下,减少 spark.streaming.blockInterval 值,建议这个值不要低于 50ms。

缺点

  1. 存在丢失数据问题,不常用。
  2. 就算开启 WAL 机制解决了丢失数据问题,带来了新的问题,数据处理延迟大。
  3. receiver 模式底层消费 Kafka,采用的是 High Level Consumer API 实现,不关心消费者 offset,无法从每批次中获取消费者 offset 和指定从某个 offset 继续消费数据。
  4. Receiver 模式采用 zookeeper 来维护消费者 offset。

SparkStreaming + Kafka Direct 模式

不需要一个 task 一直接收数据,当前批次处理数据时,直接读取数据处理。Direct 模式并行度与读取的 topic 中 partition 的个数一对一。Direct 模式使用 Spark 自己来维护消费者 offset,默认 offset 存储在内存中,如果设置了 checkpoint,在 checkpoint 中也有一份。Direct 模式可以做到手动维护消费者 offset。

提高并行度

  1. 增大读取的 topic 的 partition 个数
  2. 读取过来 DStrem 之后,可以重新分区。

Direct 模式相对于 Receiver 模式

  1. 简化了并行度。默认的并行度与读取的 Kafka 中 topic 的 partition 个数一对一。
  2. Receiver 模式采用 zookeeper 来维护消费者 offset,Direct 模式使用 Spark 自己来维护消费者 offset。
  3. Receiver 模式采用消费 Kafka 的 High Level Consumer API 实现,Direct 模式采用的是读取 Kafka 的 Simple Consumer API 可以做到手动维护 offset。