Hive 高可用
Hive 高可用
环境如下:
| - | caroly01 | caroly02 | caroly03 | caroly04 |
| ———– | ——– | ——– | ——– | ——– |
| Zookeeper | | √ | √ | √ |
| Hiveserver2 | | √ | | √ |
| beeline | | | √ | |
在『caroly02』的hist-site.xml
中增加如下节点:
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>caroly02:2181,caroly03:2181,caroly04:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>caroly02</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10001</value>
</property>
在『caroly04』的hist-site.xml
中增加如下节点:
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>caroly02:2181,caroly03:2181,caroly04:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>caroly04</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10001</value>
</property>
访问
beeline
!connect jdbc:hive2://caroly02,caroly03,caroly04/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk root 123
jdbc
public class HiveJdbcClient2 {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static void main(String[] args) throws SQLException {
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
Connection conn = DriverManager.getConnection("jdbc:hive2://caroly02,caroly03,caroly04/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk", "root", "");
Statement stmt = conn.createStatement();
String sql = "select * from psn";
ResultSet res = stmt.executeQuery(sql);
while (res.next()) {
System.out.println(res.getString(1));
}
}
}
压缩
开启『map』输出阶段压缩可以减少『job』中『map』和『Reduce task』间数据传输量。
# 开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
# 开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;
# 设置mapreduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
当『Hive』将输出写入到表中时,输出内容同样可以进行压缩。属性『hive.exec.compress.output』控制着这个功能。用户可能需要保持默认设置文件中的默认值false
,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true
,来开启输出结果压缩功能。
# 开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
# 开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
# 设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
# 设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
# 测试一下输出结果是否是压缩文件
insert overwrite local directory '/root/data' select * from aaaa;
文件存储
- 行存储:查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。例如:TEXTFILE 和SEQUENCEFILE。
- TEXTFILE:默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作。
- 列存储:因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。例如:ORC 和 PARQUET。
- ORC:Orc (Optimized Row Columnar)是 hive 0.11 版里引入的新的存储格式。可以看到每个 Orc 文件由 1 个或多个 stripe 组成,每个 stripe 250MB 大小,这个 Stripe 实际相当于 RowGroup 概念,不过大小由 4MB->250MB,这样应该能提升顺序读的吞吐率。每个 Stripe 里有三部分组成,分别是 Index Data, Row Data, Stripe Footer。
- Index Data:一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引应该只是记录某行的各字段在 Row Data 中的 offset。
- Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。
- Stripe Footer:存的是各个 Stream 的类型,长度等信息。每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column 的数据类型信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后往前读。
- PARQUET:Parquet 是面向分析型业务的列式存储格式。Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。通常情况下,在存储 Parquet 数据的时候会按照 Block 大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度。