Hive 优化及高可用
核心思想:把 Hive SQL 当作 MapReduce 程序去优化。
以下 SQL 不会转为 MapReduce 来执行:
- select 仅查询本表字段。
- where仅对本表字段做条件过滤。
Hive 优化
# 对简单的 不需要聚合的 类似 select <col> from <table> LIMIT n 语句,不需要起 MR Job,直接通过 Fetch task 获取数据
set hive.fetch.task.conversion=more;
# 开启本地模式
set hive.exec.mode.local.auto=true;
# 表示加载文件的最大值,默认为 128M,若大于该配置仍会以集群方式来运行
hive.exec.mode.local.auto.inputbytes.max
# 开启并行模式
set hive.exec.parallel=true;
# 一次 SQL 计算中允许并行执行的 job 个数的最大值,默认值为 8
hive.exec.parallel.thread.number
# 开启严格模式,默认为 nonstrict 非严格模式
set hive.mapred.mode=strict;
Hive 排序
- Order By:对于查询结果做全排序,只允许有一个 reduce 处理。(当数据量较大时,应慎用。严格模式下,必须结合 limit 来使用)
- Sort By:对于单个 reduce 的数据进行排序。
- Distribute By:分区排序,经常和 Sort By 结合使用。
- Cluster By:相当于 Sort By + Distribute By。(Cluster By 不能通过 asc、desc的方式指定排序规则;可通过 distribute by column sort by column asc | desc 的方式)
Hive Join
启用自动的 Map join:
# 该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用 Map join
set hive.auto.convert.join = true;
# 大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行,默认为 25M
hive.mapjoin.smalltable.filesize;
# 默认值:true;是否忽略 mapjoin hint 即 mapjoin 标记
hive.ignore.mapjoin.hint;
# 默认值:true;将普通的 join 转化为普通的 mapjoin 时,是否将多个 mapjoin 转化为一个 mapjoin
hive.auto.convert.join.noconditionaltask;
# 将多个 mapjoin 转化为一个 mapjoin 时,其表的最大值,默认为 10M
hive.auto.convert.join.noconditionaltask.size;
- Hive Join:今可能使用相同的连接键(会转化为一个 MapReduce 作业)
- 大表 join 大表:
- 空 key 过滤:有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reduce 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。
- 空 key 转换:有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以将表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分布到不同的 reduce 上。
Map-Side 聚合
开启在 Map 端的聚合:
# 将顶层的聚合操作放在 Map 阶段执行,从而减轻清洗阶段数据传输和 Reduce 阶段的执行时间,提升总体性能。该设置会消耗更多的内存。
set hive.map.aggr=true;
# map端group by执行聚合时处理的多少行数据(默认:100000)
hive.groupby.mapaggr.checkinterval;
# 进行聚合的最小比例(预先对100000条数据做聚合,若聚合之后的数据量/100000的值大于该配置0.5,则不会聚合)
hive.map.aggr.hash.min.reduction;
# map 端聚合使用的内存的最大值
hive.map.aggr.hash.percentmemory;
# map 端做聚合操作是 hash 表的最大可用内容,大于该值则会触发 flush
hive.map.aggr.hash.force.flush.memory.threshold;
# 是否对 Group By 产生的数据倾斜做优化,默认为 false
hive.groupby.skewindata;
合并小文件
- 文件数目小,容易在文件存储端造成压力,给 hdfs 造成压力,影响效率。
- 设置合并属性:
- 是否合并 map 输出文件:hive.merge.mapfiles=true;
- 是否合并 reduce 输出文件:hive.merge.mapredfiles=true;
- 合并文件的大小:hive.merge.size.per.task=25610001000;
- 去重统计:数据量小的时候无所谓,数据量大的情况下,由于 count distinct 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 count distinct 使用先 group by 再 count 的方式替换。
控制 Hive 中 Map 以及 Reduce 的数量
- Map 数量相关的参数:
- mapred.max.split.size:一个split的最大值,即每个map处理文件的最大值,默认为 256M
- mapred.min.split.size.per.node:一个节点上split的最小值,默认为 1
- mapred.min.split.size.per.rack:一个机架上split的最小值,默认为 1
- Reduce 数量相关的参数:
- mapred.reduce.tasks:强制指定 reduce 任务的数量。默认为 1
- hive.exec.reducers.bytes.per.reducer:每个 reduce 任务处理的数据量。默认为 256M
- hive.exec.reducers.max:每个任务最大的 reduce 数。默认为 1009
Hive JVM 重用
- 适用场景:
- 小文件个数过多
- task 个数过多
- 通过 set mapred.job.reuse.jvm.num.tasks=n; 来设置
- n 为 task 插槽个数
- 缺点:设置开启后,task 插槽会一直占用资源,不论是否有 task 运行,直到所有的 task 即整个 job 全部执行完成时,才会释放所有的 task 插槽资源。
Hive 高可用
环境如下:
| - | caroly01 | caroly02 | caroly03 | caroly04 |
| ———– | ——– | ——– | ——– | ——– |
| Zookeeper | | √ | √ | √ |
| Hiveserver2 | | √ | | √ |
| beeline | | | √ | |
在『caroly02』的hist-site.xml
中增加如下节点:
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>caroly02:2181,caroly03:2181,caroly04:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>caroly02</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10001</value>
</property>
在『caroly04』的hist-site.xml
中增加如下节点:
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>caroly02:2181,caroly03:2181,caroly04:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>caroly04</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10001</value>
</property>
访问
beeline
!connect jdbc:hive2://caroly02,caroly03,caroly04/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk root 123
jdbc
public class HiveJdbcClient2 {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static void main(String[] args) throws SQLException {
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
Connection conn = DriverManager.getConnection("jdbc:hive2://caroly02,caroly03,caroly04/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk", "root", "");
Statement stmt = conn.createStatement();
String sql = "select * from psn";
ResultSet res = stmt.executeQuery(sql);
while (res.next()) {
System.out.println(res.getString(1));
}
}
}
压缩
开启『map』输出阶段压缩可以减少『job』中『map』和『Reduce task』间数据传输量。
# 开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
# 开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;
# 设置mapreduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
当『Hive』将输出写入到表中时,输出内容同样可以进行压缩。属性『hive.exec.compress.output』控制着这个功能。用户可能需要保持默认设置文件中的默认值false
,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true
,来开启输出结果压缩功能。
# 开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
# 开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
# 设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
# 设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
# 测试一下输出结果是否是压缩文件
insert overwrite local directory '/root/data' select * from aaaa;
文件存储
- 行存储:查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。例如:TEXTFILE 和SEQUENCEFILE。
- TEXTFILE:默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作。
- 列存储:因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。例如:ORC 和 PARQUET。
- ORC:Orc (Optimized Row Columnar)是 hive 0.11 版里引入的新的存储格式。可以看到每个 Orc 文件由 1 个或多个 stripe 组成,每个 stripe 250MB 大小,这个 Stripe 实际相当于 RowGroup 概念,不过大小由 4MB->250MB,这样应该能提升顺序读的吞吐率。每个 Stripe 里有三部分组成,分别是 Index Data, Row Data, Stripe Footer。
- Index Data:一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引应该只是记录某行的各字段在 Row Data 中的 offset。
- Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。
- Stripe Footer:存的是各个 Stream 的类型,长度等信息。每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column 的数据类型信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后往前读。
- PARQUET:Parquet 是面向分析型业务的列式存储格式。Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。通常情况下,在存储 Parquet 数据的时候会按照 Block 大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度。