Spark Standalone 集群搭建
Apache Spark 是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在 2009 年由加州大学伯克利分校的 AMPLab 开发,并于 2010 年成为 Apache 的开源项目之一,与 Hadoop 和 Storm 等其他大数据技术相比,Spark有如下优势:
- Spark 提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。
- 官方资料介绍 Spark 可以将 Hadoop 集群中的应用在内存中的运行速度提升 100 倍,甚至能够将应用在磁盘上的运行速度提升 10 倍。
流程图及特点
流程图
- 构建 Spark Application 的运行环境,启动 SparkContext;
- SparkContext 向资源管理器(可以是 Standalone,Mesos,Yarn)申请运行 Executor 资源,并启动 StandaloneExecutorbackend;
- Executor 向 SparkContext 申请 Task;
- SparkContext 将应用程序分发给 Executor;
- SparkContext 构建城 DAG 图,将 DAG 图分解城 Stage、将 Taskset 发送给 Task Scheduler,最后由 Task Scheduler 将 Task 发送给 Executor 运行;
- Task 在 Executor 上运行,运行完释放所有资源。
特点
- 每个 Application 获取专属的 Executor 进程,该进程在 Application 期间一直驻留,并以多线程方式运行 Task。这种 Application 隔离机制是有优势的,无论是从调度角度看(每个 Driver 调度他自己的任务),还是从运行角度看(来自不同 Application 的 Task 运行在不同 JVM 中),当然这样意味着 Spark Application 不能跨应用程序共享数据,除非将数据写入外部存储系统。
- Spark 与资源管理器无关,只要能够获取 Executor 进程,并能保持相互通信就可以了。
- 提交 SparkContext的Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同一个 Rack 里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换。
- Task 采用了数据本地性和推测执行的优化机制。
Spark 安装
上传解压(『caroly01』)
tar xf spark-2.3.1-bin-hadoop2.6.tgz -C /opt/caroly cd /opt/caroly mv spark-2.3.1-bin-hadoop2.6 spark-2.3.1
修改配置文件(『caroly01』)
cd /opt/caroly/spark-2.3.1/conf cp slaves.template slaves vi slaves
change 19: 修改 19 行:配置从节点 caroly02 caroly03
cp spark-env.sh.template spark-env.sh vi spark-env.sh
add 58: 在 58 行增加:指定 master 节点、提交任务端口、每台 worker 核数、每台 worker 可支配的内存、配置 Master - HA export SPARK_MASTER_HOST=caroly01 export SPARK_MASTER_PORT=7077 export SPARK_WORKER_CORES=2 export SPARK_WORKER_MEMORY=3g export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=caroly02:2181,caroly03:2181,caroly04:2181 -Dspark.deploy.zookeeper.dir=/MasterHA1114"
vi /opt/caroly/spark-2.3.1/sbin/spark-config.sh add 34: export JAVA_HOME=/usr/java/jdk1.8.0_251-amd64
为了方便使用 WebUI,可以进行以下配置
cd /opt/caroly/spark-2.3.1/conf cp spark-defaults.conf.template spark-defaults.conf vi spark-defaults.conf
add 28: 在 28 行增加:增加配置 spark.eventLog.enabled true spark.eventLog.dir hdfs://caroly01:8020/spark/data/log spark.history.fs.logDirectory hdfs://caroly01:8020/spark/data/log spark.eventLog.compress true
hdfs dfs -mkdir -p /spark/data/log
分发到其他节点(『caroly01』)
cd /opt/caroly/ scp -r spark-2.3.1/ caroly02:`pwd` scp -r spark-2.3.1/ caroly03:`pwd`
设置从节点(『caroly02』)
cd /opt/caroly/spark-2.3.1/conf vi spark-env.sh
change 58: 修改 58 行 export SPARK_MASTER_HOST=caroly02
启动(『caroly01』)
cd /opt/caroly/spark-2.3.1/sbin/ ./start-all.sh
启动从节点(『caroly02』)
cd /opt/caroly/spark-2.3.1/sbin/ ./start-master.sh
访问
http://caroly01:8080/
基于 Standalone 和 Yarn 任务提交
在每台节点中的
yarn-site.xml
中配置关闭虚拟内存检查<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
任务提交
./spark-submit --master spark://caroly01:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
基于 Standalone client 和 cluster 任务提交
Standalone-client:
命令:
./spark-submit --master spark://caroly01:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
过程:
- 集群启动,Worker 向 Master 提交资源。
- 在客户端提交 Spark Application,Driver 首先会在客户端启动。
- 客户端向 Master 申请资源(当前 class 任务里的资源),Master 会找到满足资源的节点,启动 Executor。
- Executor 启动之后,会反向注册给 Driver。Driver 发送 task,监控 task 执行,回收结果。
特点:Spark 基于 standalone-client 模式提交任务,每个 Spark Application 都有自己的独立的 Driver,如果在客户端提交 100 个 Application,会有 100 个 Driver 进程在客户端启动,Driver 负责发送 task,监控 task 执行,回收结果,很容易造成客户端网卡流量激增问题。这种模式适用于程序测试,不适用于生产环境。在客户端可以看到 task 的执行和结果。
Standalone-cluster
命令:
./spark-submit --master spark://caroly01:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
过程:
- 集群启动,Worker 向 Master 提交资源。
- 在客户端提交 Spark Application,首先客户端会向 Master 申请启动 Driver,Master 收到请求之后,会随机找到一台满足资源的 Worker 节点启动 Driver。
- Driver 启动后向 Master 申请资源,Master 找到满足资源的节点,启动 Executor,
- Executor 启动之后,会反向注册给 Driver。Driver 发送 task,监控 task 执行,回收结果。
特点:Spark 基于 Standalone-cluster 模式提交任务,当在客户端提交多个 Application 时,Driver 是随机在某些 Worker 节点启动,客户端就没有网卡流量激增问题,将这种问题分散到集群中。在客户端看不到 task 执行和结果。这种模式适用于生产环境。
基于 Yarn-client 模式提交任务
命令:
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
过程:
- 集群启动,NodeManager 向 ResourceManager 提交资源。
- 在客户端提交 Spark Application,首先会在客户端启动 Driver。
- 客户端向 RS 申请启动 ApplicationMaster(AM),RS 收到请求之后,随机找到一台 NM 节点启动 AM。
- AM 启动之后,会向 RS 申请资源,用于启动 Executor。
- RS 返回一批 NM,AM 连接这些 NM 启动 Executor。
- Executor 启动之后,会反向注册给 Driver。Driver 发送 task,监控 task 执行,回收结果。
特点:Spark 基于 Yarn-client 模式提交任务,当在 Driver 提交多个 Application 时,会有网卡流量激增问题,这种模式适用于程序测试,不适用于生产环境。在客户端可以看到 task 的执行和结果。
问题记录
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:288)
at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:248)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:120)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:130)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
解决方法:在命令行中执行如下指令:
export HADOOP_CONF_DIR=/opt/caroly/hadoop-2.9.2/etc/hadoop/
基于 Yarn-cluster 模式提交任务
命令:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
过程:
- 集群启动,NodeManager 向 ResourceManager 提交资源。
- 在客户端提交 Spark Application,首先客户端会向 RS 申请启动 AM,RS 收到请求之后,随机在一台 NM 中启动 AM。
- AM 启动之后(Driver),会向 RS 申请资源,用于启动 Executor。
- RS 返回一批 NM,AM 连接这些 NM 启动 Executor
- Executor 启动之后,会反向注册给 AM(Driver),AM 发送 task,监控 task,回收结果。
特点:Spark 基于 Yarn-cluster 模式提交任务,当有多个 Application 提交时,每个 Application 的 Driver(AM)是分散到集群中的 NM 中启动,没有客户端的网卡流量激增问题。将这种问题分散到集群中。在客户端看不到 task 的执行和结果,要去 webui 中查看。这种模式适用于生产环境。
SparkStreaming + Kafka Receiver 模式
该模式处理数据采用了 receiver 接收器的模式,需要一个 task 一直处于占用接收数据,接收来的数据存储级别:MEMORY_AND_DISK_SER_2,这种模式几乎没有用的。
存在丢失数据的问题:
当接收完消息后,更新完 zookeeper offset 后,如果 Driver 挂掉,Driver 下的 Executor 也会被 Killed,在 Executor 内存中的数据多少会有丢失。
如何解决丢失数据问题:
开启 WAL(Write Ahead Log),预写日志机制。当 Executor 备份完数据之后,向 HDFS 中也备份一份数据,备份完成之后,再去更新消费者 offset。如果开启 WAL 机制,可以将接收来的数据存储级别降级,例如:MEMORY_AND_DISK_SER。开启 WAL 机制要设置 checkpoint。
开启 WAL 机制,带来了新的问题:
必须数据备份到 HDFS 完成之后,才会更新 offset,下一步才会汇报数据位置,再发 task 处理数据,会造成数据处理的延迟加大。
Receiver 模式的并行度:【每一批次生成的 DStream 中的 RDD 的分区数】
spark.streaming.blockInterval= 200ms
在 batchInterval 内每隔 200ms,将接收来的数据封装到一个 block 中,batchInterval 时间内生成的这些 block 组成了当前这个 batch。假设 batchInterval = 5s,5s 内生成的一个 batch 中就有 25 个 block。RDD->batch,RDD->partition, batch->block,这里每一个 block 就是对应 RDD 中的一个个的 partition。
提高 RDD 的并行度:
当在 batchInterval 时间一定情况下,减少 spark.streaming.blockInterval 值,建议这个值不要低于 50ms。
缺点:
- 存在丢失数据问题,不常用。
- 就算开启 WAL 机制解决了丢失数据问题,带来了新的问题,数据处理延迟大。
- receiver 模式底层消费 Kafka,采用的是 High Level Consumer API 实现,不关心消费者 offset,无法从每批次中获取消费者 offset 和指定从某个 offset 继续消费数据。
- Receiver 模式采用 zookeeper 来维护消费者 offset。
SparkStreaming + Kafka Direct 模式
不需要一个 task 一直接收数据,当前批次处理数据时,直接读取数据处理。Direct 模式并行度与读取的 topic 中 partition 的个数一对一。Direct 模式使用 Spark 自己来维护消费者 offset,默认 offset 存储在内存中,如果设置了 checkpoint,在 checkpoint 中也有一份。Direct 模式可以做到手动维护消费者 offset。
提高并行度:
- 增大读取的 topic 的 partition 个数
- 读取过来 DStrem 之后,可以重新分区。
Direct 模式相对于 Receiver 模式:
- 简化了并行度。默认的并行度与读取的 Kafka 中 topic 的 partition 个数一对一。
- Receiver 模式采用 zookeeper 来维护消费者 offset,Direct 模式使用 Spark 自己来维护消费者 offset。
- Receiver 模式采用消费 Kafka 的 High Level Consumer API 实现,Direct 模式采用的是读取 Kafka 的 Simple Consumer API 可以做到手动维护 offset。