红丝带网

五万字,Hive知识体系保姆级总结

因篇幅有限,获取本文完整的PDF文档(带目录),请私信我,发送:hive

一. Hive概览

1.1 hive的简介

Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。

其本质是将SQL转换为MapReduce/Spark的任务进行运算,底层由HDFS来提供数据的存储,说白了hive可以理解为一个将SQL转换为MapReduce/Spark的任务的工具,甚至更进一步可以说hive就是一个MapReduce/Spark Sql的客户端

为什么要使用hive ?

主要的原因有以下几点:

  • 学习MapReduce的成本比较高, 项目周期要求太短, MapReduce如果要实现复杂的查询逻辑开发的难度是比较大的。
  • 而如果使用hive, hive采用操作接口类似SQL语法, 提高快速开发的能力. 避免去书写MapReduce,减少学习成本, 而且提供了功能的扩展

hive的特点:

  1. 可扩展 : Hive可以自由的扩展集群的规模,一般情况下不需要重启服务。
  2. 延展性 : Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。
  3. 容错 : 良好的容错性,节点出现问题SQL仍可完成执行。

1.2 hive的架构

基本组成:

用户接口:包括CLI、JDBC/ODBC、WebGUI。其中,CLI(command line interface)为shell命令行;JDBC/ODBC是Hive的JAVA实现,与传统数据库JDBC类似;WebGUI是通过浏览器访问Hive。

元数据存储:通常是存储在关系数据库如mysql/derby中。Hive 将元数据存储在数据库中。Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。

解释器、编译器、优化器、执行器:完成HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS 中,并在随后有MapReduce 调用执行。

1.3 hive与hadoop的关系

Hive利用HDFS存储数据,利用MapReduce查询分析数据

1.4 hive与传统数据库对比

​ hive主要是用于海量数据的离线数据分析

  1. 查询语言。由于 SQL 被广泛的应用在数据仓库中,因此,专门针对 Hive 的特性设计了类 SQL 的查询语言 HQL。熟悉 SQL 开发的开发者可以很方便的使用 Hive 进行开发。
  2. 数据存储位置。Hive 是建立在 Hadoop 之上的,所有 Hive 的数据都是存储在 HDFS 中的。而数据库则可以将数据保存在块设备或者本地文件系统中。
  3. 数据格式。Hive 中没有定义专门的数据格式,数据格式可以由用户指定,用户定义数据格式需要指定三个属性:列分隔符(通常为空格、”\t”、”\x001″)、行分隔符(”\n”)以及读取文件数据的方法(Hive 中默认有三个文件格式 TextFile,SequenceFile 以及 RCFile)。由于在加载数据的过程中,不需要从用户数据格式到 Hive 定义的数据格式的转换,因此,Hive 在加载的过程中不会对数据本身进行任何修改,而只是将数据内容复制或者移动到相应的 HDFS 目录中。而在数据库中,不同的数据库有不同的存储引擎,定义了自己的数据格式。所有数据都会按照一定的组织存储,因此,数据库加载数据的过程会比较耗时。
  4. 数据更新。由于 Hive 是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive 中不支持对数据的改写和添加,所有的数据都是在加载的时候中确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用 INSERT INTO ... VALUES 添加数据,使用 UPDATE ... SET 修改数据。
  5. 索引。之前已经说过,Hive 在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些 Key 建立索引。Hive 要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于 MapReduce 的引入, Hive 可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive 仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了 Hive 不适合在线数据查询。
  6. 执行。Hive 中大多数查询的执行是通过 Hadoop 提供的 MapReduce 来实现的,而数据库通常有自己的执行引擎。
  7. 执行延迟。之前提到,Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致 Hive 执行延迟高的因素是 MapReduce 框架。由于 MapReduce 本身具有较高的延迟,因此在利用 MapReduce 执行 Hive 查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive 的并行计算显然能体现出优势。
  8. 可扩展性。由于 Hive 是建立在 Hadoop 之上的,因此 Hive 的可扩展性是和 Hadoop 的可扩展性是一致的(世界上最大的 Hadoop 集群在 Yahoo!,2009年的规模在 4000 台节点左右)。而数据库由于 ACID 语义的严格限制,扩展行非常有限。目前最先进的并行数据库 Oracle 在理论上的扩展能力也只有 100 台左右。
  9. 数据规模。由于 Hive 建立在集群上并可以利用 MapReduce 进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小。

总结:hive具有sql数据库的外表,但应用场景完全不同,hive只适合用来做批量数据统计分析。

1.5 hive的数据存储

  1. Hive中所有的数据都存储在 HDFS 中,没有专门的数据存储格式(可支持Text,SequenceFile,ParquetFile,ORC格式RCFILE等)

SequenceFile是hadoop中的一种文件格式: 文件内容是以序列化的kv对象来组织的

  1. 只需要在创建表的时候告诉 Hive 数据中的列分隔符和行分隔符,Hive 就可以解析数据。
  2. Hive 中包含以下数据模型:DB、Table,External Table,Partition,Bucket。
  • db:在hdfs中表现为hive.metastore.warehouse.dir目录下一个文件夹。
  • table:在hdfs中表现所属db目录下一个文件夹。
  • external table:与table类似,不过其数据存放位置可以在任意指定路径。
  • partition:在hdfs中表现为table目录下的子目录。
  • bucket:在hdfs中表现为同一个表目录下根据hash散列之后的多个文件。

二、Hive表类型

2.1 Hive 数据类型

Hive的基本数据类型有:TINYINT,SAMLLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,TIMESTAMP(V0.8.0+)和BINARY(V0.8.0+)

Hive的集合类型有:STRUCT,MAP和ARRAY

Hive主要有四种数据模型(即表):内部表、外部表、分区表和桶表。

表的元数据保存传统的数据库的表中,当前hive只支持Derby和MySQL数据库

2.2 Hive 内部表

Hive中的内部表和传统数据库中的表在概念上是类似的,Hive的每个表都有自己的存储目录,除了外部表外,所有的表数据都存放在配置在hive-site.xml文件的${hive.metastore.warehouse.dir}/table_name目录下。

创建内部表:

CREATE TABLE IF NOT EXISTS students(user_no INT,name STRING,sex STRING,  
         grade STRING COMMOT '班级')COMMONT '学生表'  
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
STORE AS TEXTFILE;      

2.3 Hive 外部表

被external修饰的为外部表(external table),外部表指向已经存在在Hadoop HDFS上的数据,除了在删除外部表时只删除元数据而不会删除表数据外,其他和内部表很像。

创建外部表:

CREATE EXTERNAL TABLE IF NOT EXISTS students(user_no INT,name STRING,sex STRING,  
         class STRING COMMOT '班级')COMMONT '学生表'  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ','  
STORE AS SEQUENCEFILE 
LOCATION '/usr/test/data/students.txt';   

2.4 Hive 分区表

分区表的每一个分区都对应数据库中相应分区列的一个索引,但是其组织方式和传统的关系型数据库不同。在Hive中,分区表的每一个分区都对应表下的一个目录,所有的分区的数据都存储在对应的目录中。

比如说,分区表partitinTable有包含nation(国家)、ds(日期)和city(城市)3个分区,其中nation = china,ds = 20130506,city = Shanghai则对应HDFS上的目录为:

/datawarehouse/partitinTable/nation=china/city=Shanghai/ds=20130506/

分区中定义的变量名不能和表中的列相同

创建分区表:

CREATE TABLE IF NOT EXISTS students(user_no INT,name STRING,sex STRING,
         class STRING COMMOT '班级')COMMONT '学生表'  
PARTITIONED BY (ds STRING,country STRING)  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ','  
STORE AS SEQUENCEFILE;

2.5 Hive 分桶表

桶表就是对指定列进行哈希(hash)计算,然后会根据hash值进行切分数据,将具有不同hash值的数据写到每个桶对应的文件中。

将数据按照指定的字段进行分成多个桶中去,说白了就是将数据按照字段进行划分,可以将数据按照字段划分到多个文件当中去。

创建分桶表:

CREATE TABLE IF NOT EXISTS students(user_no INT,name STRING,sex STRING,  
         class STRING COMMOT '班级',score SMALLINT COMMOT '总分')COMMONT '学生表'  
PARTITIONED BY (ds STRING,country STRING)  
CLUSTERED BY(user_no) SORTED BY(score) INTO 32 BUCKETS  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ','  
STORE AS SEQUENCEFILE;      

2.6 Hive 视图

在 Hive 中,视图是逻辑数据结构,可以通过隐藏复杂数据操作(Joins, 子查询, 过滤,数据扁平化)来于简化查询操作。

与关系数据库不同的是,Hive视图并不存储数据或者实例化。一旦创建 HIve 视图,它的 schema 也会立刻确定下来。对底层表后续的更改(如 增加新列)并不会影响视图的 schema。如果底层表被删除或者改变,之后对视图的查询将会 failed。基于以上 Hive view 的特性,我们在ETL和数据仓库中对于经常变化的表应慎重使用视图

创建视图:

CREATE VIEW employee_skills
 AS
SELECT name, skills_score['DB'] AS DB,
skills_score['Perl'] AS Perl, 
skills_score['Python'] AS Python,
skills_score['Sales'] as Sales, 
skills_score['HR'] as HR 
FROM employee;

创建视图的时候是不会触发 MapReduce 的 Job,因为只存在元数据的改变。

但是,当对视图进行查询的时候依然会触发一个 MapReduce Job 进程:SHOW CREATE TABLE 或者 DESC FORMATTED TABLE 语句来显示通过 CREATE VIEW 语句创建的视图。以下是对Hive 视图的 DDL操作:

更改视图的属性:

ALTER VIEW employee_skills 
SET TBLPROPERTIES ('comment' = 'This is a view');

重新定义视图:

ALTER VIEW employee_skills AS 
SELECT * from employee ;

删除视图:

DROP VIEW employee_skills; 

三、Hive数据抽样

当数据规模不断膨胀时,我们需要找到一个数据的子集来加快数据分析效率。因此我们就需要通过筛选和分析数据集为了进行模式 & 趋势识别。目前来说有三种方式来进行抽样:随机抽样,桶表抽样,和块抽样。

3.1 随机抽样

关键词:rand()函数

使用rand()函数进行随机抽样,limit关键字限制抽样返回的数据,其中rand函数前的distribute和sort关键字可以保证数据在mapper和reducer阶段是随机分布的。

案例如下:

select * from table_name 
where col=xxx 
distribute by rand() sort by rand() 
limit num; 

使用order 关键词:

案例如下:

select * from table_name 
where col=xxx 
order by rand() 
limit num; 

经测试对比,千万级数据中进行随机抽样 order by方式耗时更长,大约多30秒左右。

3.2 块抽样

关键词:tablesample()函数

  1. tablesample(n percent) 根据hive表数据的大小按比例抽取数据,并保存到新的hive表中。如:抽取原hive表中10%的数据

注意:测试过程中发现,select语句不能带where条件且不支持子查询,可通过新建中间表或使用随机抽样解决。

select * from xxx tablesample(10 percent) 数字与percent之间要有空格
  1. tablesample(nM) 指定抽样数据的大小,单位为M。
select * from xxx tablesample(20M) 数字与M之间不要有空格
  1. tablesample(n rows) 指定抽样数据的行数,其中n代表每个map任务均取n行数据,map数量可通过hive表的简单查询语句确认(关键词:number of mappers: x)
select * from xxx tablesample(100 rows) 数字与rows之间要有空格

3.3 桶表抽样

关键词:tablesample (bucket x out of y [on colname])

其中x是要抽样的桶编号,桶编号从1开始,colname表示抽样的列,y表示桶的数量。

hive中分桶其实就是根据某一个字段Hash取模,放入指定数据的桶中,比如将表table_1按照ID分成100个桶,其算法是hash(id) % 100,这样,hash(id) % 100 = 0的数据被放到第一个桶中,hash(id) % 100 = 1的记录被放到第二个桶中。创建分桶表的关键语句为:CLUSTER BY语句。

例如:将表随机分成10组,抽取其中的第一个桶的数据:

select * from table_01 
tablesample(bucket 1 out of 10 on rand())

四、Hive计算引擎

目前Hive支持MapReduce、Tez和Spark 三种计算引擎。

4.1 MR计算引擎

MR运行的完整过程:

Map在读取数据时,先将数据拆分成若干数据,并读取到Map方法中被处理。数据在输出的时候,被分成若干分区并写入内存缓存(buffer)中,内存缓存被数据填充到一定程度会溢出到磁盘并排序,当Map执行完后会将一个机器上输出的临时文件进行归并存入到HDFS中。

当Reduce启动时,会启动一个线程去读取Map输出的数据,并写入到启动Reduce机器的内存中,在数据溢出到磁盘时会对数据进行再次排序。当读取数据完成后会将临时文件进行合并,作为Reduce函数的数据源。

4.2 Tez计算引擎

Apache Tez是进行大规模数据处理且支持DAG作业的计算框架,它直接源于MapReduce框架,除了能够支持MapReduce特性,还支持新的作业形式,并允许不同类型的作业能够在一个集群中运行。

Tez将原有的Map和Reduce两个操作简化为一个概念——Vertex,并将原有的计算处理节点拆分成多个组成部分:Vertex Input、Vertex Output、Sorting、Shuffling和Merging。计算节点之间的数据通信被统称为Edge,这些分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业。

通过允许Apache Hive运行复杂的DAG任务,Tez可以用来处理数据,之前需要多个MR jobs,现在一个Tez任务中。

Tez和MapReduce作业的比较

  • Tez绕过了MapReduce很多不必要的中间的数据存储和读取的过程,直接在一个作业中表达了MapReduce需要多个作业共同协作才能完成的事情。
  • Tez和MapReduce一样都运行使用YARN作为资源调度和管理。但与MapReduce on YARN不同,Tez on YARN并不是将作业提交到ResourceManager,而是提交到AMPoolServer的服务上,AMPoolServer存放着若干已经预先启动ApplicationMaster的服务。
  • 当用户提交一个作业上来后,AMPoolServer从中选择一个ApplicationMaster用于管理用户提交上来的作业,这样既可以节省ResourceManager创建ApplicationMaster的时间,而又能够重用每个ApplicationMaster的资源,节省了资源释放和创建时间。

Tez相比于MapReduce有几点重大改进

  • 当查询需要有多个reduce逻辑时,Hive的MapReduce引擎会将计划分解,每个Redcue提交一个MR作业。这个链中的所有MR作业都需要逐个调度,每个作业都必须从HDFS中重新读取上一个作业的输出并重新洗牌。而在Tez中,几个reduce接收器可以直接连接,数据可以流水线传输,而不需要临时HDFS文件,这种模式称为MRR(Map-reduce-reduce*)。
  • Tez还允许一次发送整个查询计划,实现应用程序动态规划,从而使框架能够更智能地分配资源,并通过各个阶段流水线传输数据。对于更复杂的查询来说,这是一个巨大的改进,因为它消除了IO/sync障碍和各个阶段之间的调度开销。
  • 在MapReduce计算引擎中,无论数据大小,在洗牌阶段都以相同的方式执行,将数据序列化到磁盘,再由下游的程序去拉取,并反序列化。Tez可以允许小数据集完全在内存中处理,而MapReduce中没有这样的优化。仓库查询经常需要在处理完大量的数据后对小型数据集进行排序或聚合,Tez的优化也能极大地提升效率。

4.3 Spark计算引擎

Apache Spark是专为大规模数据处理而设计的快速、通用支持DAG(有向无环图)作业的计算引擎,类似于Hadoop MapReduce的通用并行框架,可用来构建大型的、低延迟的数据分析应用程序。

Spark是用于大规模数据处理的统一分析引擎,基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。

Spark运行流程

Spark运行流程

Spark具有以下几个特性。

1.高效性

Spark会将作业构成一个DAG,优化了大型作业一些重复且浪费资源的操作,对查询进行了优化,重新编写了物理执行引擎,如可以实现MRR模式。

2.易用性

Spark不同于MapReducer只提供两种简单的编程接口,它提供了多种编程接口去操作数据,这些操作接口如果使用MapReduce去实现,需要更多的代码。Spark的操作接口可以分为两类:transformation(转换)和action(执行)。Transformation包含map、flatmap、distinct、reduceByKey和join等转换操作;Action包含reduce、collect、count和first等操作。

3.通用性

Spark针对实时计算、批处理、交互式查询,提供了统一的解决方案。但在批处理方面相比于MapReduce处理同样的数据,Spark所要求的硬件设施更高,MapReduce在相同的设备下所能处理的数据量会比Spark多。所以在实际工作中,Spark在批处理方面只能算是MapReduce的一种补充。

4.兼容性

Spark和MapReduce一样有丰富的产品生态做支撑。例如Spark可以使用YARN作为资源管理器,Spark也可以处理Hbase和HDFS上的数据。

五、存储与压缩

5.1 Hive存储格式

Hive支持的存储数的格式主要有:TEXTFILE(行式存储) 、SEQUENCEFILE(行式存储)、ORC(列式存储)、PARQUET(列式存储)。

5.1.1 行式存储和列式存储

上图左边为逻辑表,右边第一个为行式存储,第二个为列式存储。

行存储的特点: 查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。select *

列存储的特点: 因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。 select 某些字段效率更高。

5.1.2 TEXTFILE

默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。

5.1.3 ORC格式

Orc (Optimized Row Columnar)是hive 0.11版里引入的新的存储格式。

可以看到每个Orc文件由1个或多个stripe组成,每个stripe250MB大小,这个Stripe实际相当于RowGroup概念,不过大小由4MB->250MB,这样能提升顺序读的吞吐率。每个Stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer:

  1. Index Data:一个轻量级的index,默认是每隔1W行做一个索引。这里做的索引只是记录某行的各字段在Row Data中的offset。
  2. Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储。
  3. Stripe Footer:存的是各个stripe的元数据信息

每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。

5.1.4 PARQUET格式

Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目。

Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。

通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式如下图所示。

上图展示了一个Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页。

5.2 Hive压缩格式

在实际工作当中,hive当中处理的数据,一般都需要经过压缩,前期我们在学习hadoop的时候,已经配置过hadoop的压缩,我们这里的hive也是一样的可以使用压缩来节省我们的MR处理的网络带宽

mr支持的压缩格式:

压缩格式 工具 算法 文件扩展名 是否可切分 DEFAULT 无 DEFAULT .deflate 否 Gzip gzip DEFAULT .gz 否 bzip2 bzip2 bzip2 .bz2 是 LZO lzop LZO .lzo 否 LZ4 无 LZ4 .lz4 否 Snappy 无 Snappy .snappy 否

hadoop支持的解压缩的类:

压缩格式 对应的编码/解码器 DEFLATE org.apache.hadoop.io.compress.DefaultCodec gzip org.apache.hadoop.io.compress.GzipCodec bzip2 org.apache.hadoop.io.compress.BZip2Codec LZO com.hadoop.compression.lzo.LzopCodec LZ4 org.apache.hadoop.io.compress.Lz4Codec Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较:

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

Snappy生成的压缩文件要大20%到100%。在64位模式下的core i7处理器的单内核上,Snappy以250 MB/秒或更多的速度压缩,并以500 MB/秒或更多的速度解压。

实现压缩hadoop需要配置的压缩参数:

hive配置压缩的方式:

  1. 开启map端的压缩方式:
1.1)开启hive中间传输数据压缩功能
 hive (default)>set hive.exec.compress.intermediate=true;
1.2)开启mapreduce中map输出压缩功能
 hive (default)>set mapreduce.map.output.compress=true;
1.3)设置mapreduce中map输出数据的压缩方式
 hive (default)>set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
1.4)执行查询语句
 select count(1) from score;
  1. 开启reduce端的压缩方式
1)开启hive最终输出数据压缩功能
 hive (default)>set hive.exec.compress.output=true;
2)开启mapreduce最终输出数据压缩
 hive (default)>set mapreduce.output.fileoutputformat.compress=true;
3)设置mapreduce最终数据输出压缩方式
 hive (default)> set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
4)设置mapreduce最终数据输出压缩为块压缩
 hive (default)>set mapreduce.output.fileoutputformat.compress.type=BLOCK;
5)测试一下输出结果是否是压缩文件
 insert overwrite local directory '/export/servers/snappy' select * from score distribute by s_id sort by s_id desc;

5.3 存储和压缩相结合

ORC存储方式的压缩:

Key Default Notes orc.compress ZLIB 高级压缩(可选: NONE, ZLIB, SNAPPY) orc.compress.size 262,144 每个压缩块中的字节数 orc.stripe.size 67,108,864 每条stripe中的字节数 orc.row.index.stride 10,000 索引条目之间的行数(必须是>= 1000) orc.create.index true 是否创建行索引 orc.bloom.filter.columns "" 逗号分隔的列名列表,应该为其创建bloom过滤器 orc.bloom.filter.fpp 0.05 bloom过滤器的假阳性概率(必须是>0.0和<1.0)

创建一个非压缩的ORC存储方式:

1)建表语句
    create table log_orc_none(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS orc tblproperties ("orc.compress"="NONE");
2)插入数据
 insert into table log_orc_none select * from log_text ;
3)查看插入后数据
 dfs -du -h /user/hive/warehouse/myhive.db/log_orc_none;
 结果显示:
 7.7 M  /user/hive/warehouse/log_orc_none/123456_0

创建一个SNAPPY压缩的ORC存储方式:

1)建表语句
    create table log_orc_snappy(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS orc tblproperties ("orc.compress"="SNAPPY");
2)插入数据
 insert into table log_orc_snappy select * from log_text ;
3)查看插入后数据
 dfs -du -h /user/hive/warehouse/myhive.db/log_orc_snappy ;
 结果显示: 
 3.8 M  /user/hive/warehouse/log_orc_snappy/123456_0
4)上一节中默认创建的ORC存储方式,导入数据后的大小为
 2.8 M  /user/hive/warehouse/log_orc/123456_0
 比Snappy压缩的还小。原因是orc存储文件默认采用ZLIB压缩。比snappy压缩的小。
5)存储方式和压缩总结:
 在实际的项目开发当中,hive表的数据存储格式一般选择:orc或parquet。压缩方式一般选择snappy。

5.4 主流存储文件性能对比

从存储文件的压缩比和查询速度两个角度对比。

压缩比比较

  • TextFile
(1)创建表,存储数据格式为TEXTFILE
    create table log_text (
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE ;

(2)向表中加载数据
 load data local inpath '/export/servers/hivedatas/log.data' into table log_text ;

(3)查看表中数据大小,大小为18.1M
 dfs -du -h /user/hive/warehouse/myhive.db/log_text;
 结果显示: 
 18.1 M  /user/hive/warehouse/log_text/log.data
  • ORC
(1)创建表,存储数据格式为ORC
    create table log_orc(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS orc ;

(2)向表中加载数据
 insert into table log_orc select * from log_text ;

(3)查看表中数据大小
 dfs -du -h /user/hive/warehouse/myhive.db/log_orc;
 结果显示:
 2.8 M  /user/hive/warehouse/log_orc/123456_0
  • Parquet
1)创建表,存储数据格式为parquet
    create table log_parquet(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
    )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS PARQUET ; 

2)向表中加载数据
 insert into table log_parquet select * from log_text ;

3)查看表中数据大小
 dfs -du -h /user/hive/warehouse/myhive.db/log_parquet;
 结果显示:
 13.1 M  /user/hive/warehouse/log_parquet/123456_0

数据压缩比结论:

ORC > Parquet > textFile

存储文件的查询效率测试

  • textFile
hive (default)> select count(*) from log_text;
_c0
100000
Time taken: 21.54 seconds, Fetched: 1 row(s)  
  • ORC
hive (default)> select count(*) from log_orc;
_c0
100000
Time taken: 20.867 seconds, Fetched: 1 row(s) 
  • Parquet
hive (default)> select count(*) from log_parquet; 
_c0
100000
Time taken: 22.922 seconds, Fetched: 1 row(s)

存储文件的查询效率比较:

ORC > TextFile > Parquet

六、Hive Sql 大全

本节基本涵盖了Hive日常使用的所有SQL,因为SQL太多,所以将SQL进行了如下分类: 一、DDL语句(数据定义语句):
对数据库的操作:包含创建、修改数据库
对数据表的操作:分为内部表及外部表,分区表和分桶表
二、DQL语句(数据查询语句):
单表查询、关联查询
hive函数:包含聚合函数,条件函数,日期函数,字符串函数等
行转列及列转行:lateral view 与 explode 以及 reflect
窗口函数与分析函数
其他一些窗口函数

hive的DDL语法

对数据库的操作

  • 创建数据库:
create database if not exists myhive;
说明:hive的表存放位置模式是由hive-site.xml当中的一个属性指定的 :hive.metastore.warehouse.dir

创建数据库并指定hdfs存储位置 :
create database myhive2 location '/myhive2';
  • 修改数据库:
alter  database  myhive2  set  dbproperties('createtime'='20210329');

说明:可以使用alter database 命令来修改数据库的一些属性。但是数据库的元数据信息是不可更改的,包括数据库的名称以及数据库所在的位置

  • 查看数据库详细信息
查看数据库基本信息
hive (myhive)> desc  database  myhive2;

查看数据库更多详细信息
hive (myhive)> desc database extended  myhive2;
  • 删除数据库
删除一个空数据库,如果数据库下面有数据表,那么就会报错
drop  database  myhive2;

强制删除数据库,包含数据库下面的表一起删除
drop  database  myhive  cascade; 

对数据表的操作

对管理表(内部表)的操作:

  • 建内部表:
hive (myhive)> use myhive; -- 使用myhive数据库
hive (myhive)> create table stu(id int,name string);
hive (myhive)> insert into stu values (1,"zhangsan");
hive (myhive)> insert into stu values (1,"zhangsan"),(2,"lisi");  -- 一次插入多条数据
hive (myhive)> select * from stu;
  • hive建表时候的字段类型:

分类 类型 描述 字面量示例 原始类型 BOOLEAN true/false TRUE TINYINT 1字节的有符号整数 -128~127 1Y SMALLINT 2个字节的有符号整数,-32768~32767 1S INT 4个字节的带符号整数 1 BIGINT 8字节带符号整数 1L FLOAT 4字节单精度浮点数1.0 DOUBLE 8字节双精度浮点数 1.0 DEICIMAL 任意精度的带符号小数 1.0 STRING 字符串,变长 “a”,’b’ VARCHAR 变长字符串 “a”,’b’ CHAR 固定长度字符串 “a”,’b’ BINARY 字节数组 无法表示 TIMESTAMP 时间戳,毫秒值精度 122327493795 DATE 日期 ‘2016-03-29’ INTERVAL 时间频率间隔 复杂类型 ARRAY 有序的的同类型的集合 array(1,2) MAP key-value,key必须为原始类型,value可以任意类型 map(‘a’,1,’b’,2) STRUCT 字段集合,类型可以不同 struct(‘1’,1,1.0), named_stract(‘col1’,’1’,’col2’,1,’clo3’,1.0) UNION 在有限取值范围内的一个值 create_union(1,’a’,63)

对decimal类型简单解释下
用法:decimal(11,2) 代表最多有11位数字,其中后2位是小数,整数部分是9位;如果整数部分超过9位,则这个字段就会变成null;如果小数部分不足2位,则后面用0补齐两位,如果小数部分超过两位,则超出部分四舍五入
也可直接写 decimal,后面不指定位数,默认是 decimal(10,0) 整数10位,没有小数

七、Hive执行计划

Hive SQL的执行计划描述SQL实际执行的整体轮廓,通过执行计划能了解SQL程序在转换成相应计算引擎的执行逻辑,掌握了执行逻辑也就能更好地把握程序出现的瓶颈点,从而能够实现更有针对性的优化。此外还能帮助开发者识别看似等价的SQL其实是不等价的,看似不等价的SQL其实是等价的SQL。可以说执行计划是打开SQL优化大门的一把钥匙

要想学SQL执行计划,就需要学习查看执行计划的命令:explain,在查询语句的SQL前面加上关键字explain是查看执行计划的基本方法。

学会explain,能够给我们工作中使用hive带来极大的便利!

查看SQL的执行计划

Hive提供的执行计划目前可以查看的信息有以下几种:

  • explain:查看执行计划的基本信息;
  • explain dependency:dependency在explain语句中使用会产生有关计划中输入的额外信息。它显示了输入的各种属性;
  • explain authorization:查看SQL操作相关权限的信息;
  • explain vectorization:查看SQL的向量化描述信息,显示为什么未对Map和Reduce进行矢量化。从 Hive 2.3.0 开始支持;
  • explain analyze:用实际的行数注释计划。从 Hive 2.2.0 开始支持;
  • explain cbo:输出由Calcite优化器生成的计划。CBO 从 Hive 4.0.0 版本开始支持;
  • explain locks:这对于了解系统将获得哪些锁以运行指定的查询很有用。LOCKS 从 Hive 3.2.0 开始支持;
  • explain ast:输出查询的抽象语法树。AST 在 Hive 2.1.0 版本删除了,存在bug,转储AST可能会导致OOM错误,将在4.0.0版本修复;
  • explain extended:加上 extended 可以输出有关计划的额外信息。这通常是物理信息,例如文件名,这些额外信息对我们用处不大;

1. explain 的用法

Hive提供了explain命令来展示一个查询的执行计划,这个执行计划对于我们了解底层原理,Hive 调优,排查数据倾斜等很有帮助。

使用语法如下:

explain query;

在 hive cli 中输入以下命令(hive 2.3.7):

explain select sum(id) from test1;

得到结果:

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int)
              outputColumnNames: id
              Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: sum(id)
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

看完以上内容有什么感受,是不是感觉都看不懂,不要着急,下面将会详细讲解每个参数,相信你学完下面的内容之后再看 explain 的查询结果将游刃有余。

一个HIVE查询被转换为一个由一个或多个stage组成的序列(有向无环图DAG)。这些stage可以是MapReduce stage,也可以是负责元数据存储的stage,也可以是负责文件系统的操作(比如移动和重命名)的stage

我们将上述结果拆分看,先从最外层开始,包含两个大的部分:

  1. stage dependencies: 各个stage之间的依赖性
  2. stage plan: 各个stage的执行计划

先看第一部分 stage dependencies ,包含两个 stage,Stage-1 是根stage,说明这是开始的stage,Stage-0 依赖 Stage-1,Stage-1执行完成后执行Stage-0。

再看第二部分 stage plan,里面有一个 Map Reduce,一个MR的执行计划分为两个部分:

  1. Map Operator Tree: MAP端的执行计划树
  2. Reduce Operator Tree: Reduce端的执行计划树

这两个执行计划树里面包含这条sql语句的 operator:

  1. TableScan:表扫描操作,map端第一个操作肯定是加载表,所以就是表扫描操作,常见的属性: alias: 表名称Statistics: 表统计信息,包含表中数据条数,数据大小等
  2. Select Operator: 选取操作,常见的属性 : expressions:需要的字段名称及字段类型outputColumnNames:输出的列名称Statistics:表统计信息,包含表中数据条数,数据大小等
  3. Group By Operator:分组聚合操作,常见的属性: aggregations:显示聚合函数信息mode:聚合模式,值有 hash:随机聚合,就是hash partition;partial:局部聚合;final:最终聚合keys:分组的字段,如果没有分组,则没有此字段outputColumnNames:聚合之后输出列名Statistics: 表统计信息,包含分组聚合之后的数据条数,数据大小等
  4. Reduce Output Operator:输出到reduce操作,常见属性: sort order:值为空 不排序;值为 + 正序排序,值为 - 倒序排序;值为 +- 排序的列为两列,第一列为正序,第二列为倒序
  5. Filter Operator:过滤操作,常见的属性: predicate:过滤条件,如sql语句中的where id>=1,则此处显示(id >= 1)
  6. Map Join Operator:join 操作,常见的属性: condition map:join方式 ,如Inner Join 0 to 1 Left Outer Join0 to 2keys: join 的条件字段outputColumnNames: join 完成之后输出的字段Statistics: join 完成之后生成的数据条数,大小等
  7. File Output Operator:文件输出操作,常见的属性 compressed:是否压缩table:表的信息,包含输入输出文件格式化方式,序列化方式等
  8. Fetch Operator 客户端获取数据操作,常见的属性: limit,值为 -1 表示不限制条数,其他值为限制的条数

2. explain 的使用场景

本节介绍 explain 能够为我们在生产实践中带来哪些便利及解决我们哪些迷惑

案例一:join 语句会过滤 null 的值吗?

现在,我们在hive cli 输入以下查询计划语句

select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

问:上面这条 join 语句会过滤 id 为 null 的值吗

执行下面语句:

explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

我们来看结果 (为了适应页面展示,仅截取了部分输出信息):

TableScan
 alias: a
 Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
 Filter Operator
    predicate: id is not null (type: boolean)
    Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int)
        outputColumnNames: _col0
        Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
        HashTable Sink Operator
           keys:
             0 _col0 (type: int)
             1 _col0 (type: int)
 ...

从上述结果可以看到 predicate: id is not null 这样一行,说明 join 时会自动过滤掉关联字段为 null 值的情况,但 left join 或 full join 是不会自动过滤null值的,大家可以自行尝试下。

案例二:group by 分组语句会进行排序吗?

看下面这条sql

select id,max(user_name) from test1 group by id;

问:group by 分组语句会进行排序吗

直接来看 explain 之后结果 (为了适应页面展示,仅截取了部分输出信息)

 TableScan
    alias: test1
    Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int), user_name (type: string)
        outputColumnNames: id, user_name
        Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
        Group By Operator
           aggregations: max(user_name)
           keys: id (type: int)
           mode: hash
           outputColumnNames: _col0, _col1
           Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
           Reduce Output Operator
             key expressions: _col0 (type: int)
             sort order: +
             Map-reduce partition columns: _col0 (type: int)
             Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
             value expressions: _col1 (type: string)
 ...

我们看 Group By Operator,里面有 keys: id (type: int) 说明按照 id 进行分组的,再往下看还有 sort order: + ,说明是按照 id 字段进行正序排序的

案例三:哪条sql执行效率高呢?

观察两条sql语句

SELECT
 a.id,
 b.user_name
FROM
 test1 a
JOIN test2 b ON a.id = b.id
WHERE
 a.id > 2;
SELECT
 a.id,
 b.user_name
FROM
 (SELECT * FROM test1 WHERE id > 2) a
JOIN test2 b ON a.id = b.id;

这两条sql语句输出的结果是一样的,但是哪条sql执行效率高呢

有人说第一条sql执行效率高,因为第二条sql有子查询,子查询会影响性能;

有人说第二条sql执行效率高,因为先过滤之后,在进行join时的条数减少了,所以执行效率就高了。

到底哪条sql效率高呢,我们直接在sql语句前面加上 explain,看下执行计划不就知道了嘛!

在第一条sql语句前加上 explain,得到如下结果

hive (default)> explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id where a.id >2;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:a
          TableScan
            alias: a
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

在第二条sql语句前加上 explain,得到如下结果

hive (default)> explain select a.id,b.user_name from(select * from  test1 where id>2 ) a join test2 b on a.id=b.id;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:test1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:test1
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

大家有什么发现,除了表别名不一样,其他的执行计划完全一样,都是先进行 where 条件过滤,在进行 join 条件关联。说明 hive 底层会自动帮我们进行优化,所以这两条sql语句执行效率是一样的

以上仅列举了3个我们生产中既熟悉又有点迷糊的例子,explain 还有很多其他的用途,如查看stage的依赖情况、排查数据倾斜、hive 调优等,小伙伴们可以自行尝试。

2. explain dependency的用法

explain dependency用于描述一段SQL需要的数据来源,输出是一个json格式的数据,里面包含以下两个部分的内容:

  • input_partitions:描述一段SQL依赖的数据来源表分区,里面存储的是分区名的列表,如果整段SQL包含的所有表都是非分区表,则显示为空。
  • input_tables:描述一段SQL依赖的数据来源表,里面存储的是Hive表名的列表。

使用explain dependency查看SQL查询非分区普通表,在 hive cli 中输入以下命令:

explain dependency select s_age,count(1) num from student_orc;

得到结果:

{"input_partitions":[],"input_tables":[{"tablename":"default@student_tb _orc","tabletype":"MANAGED_TABLE"}]}

使用explain dependency查看SQL查询分区表,在 hive cli 中输入以下命令:

explain dependency select s_age,count(1) num from student_orc_partition;

得到结果:

{"input_partitions":[{"partitionName":"default@student_orc_partition@ part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, 
{"partitionName":"default@student_orc_partition@part=2"}, 
{"partitionName":"default@student_orc_partition@part=3"},
{"partitionName":"default@student_orc_partition@part=4"}, 
{"partitionName":"default@student_orc_partition@part=5"},
{"partitionName":"default@student_orc_partition@part=6"},
{"partitionName":"default@student_orc_partition@part=7"},
{"partitionName":"default@student_orc_partition@part=8"},
{"partitionName":"default@student_orc_partition@part=9"}], 
"input_tables":[{"tablename":"default@student_orc_partition", "tabletype":"MANAGED_TABLE"}]

explain dependency的使用场景有两个:

  • 场景一:快速排除。快速排除因为读取不到相应分区的数据而导致任务数据输出异常。例如,在一个以天分区的任务中,上游任务因为生产过程不可控因素出现异常或者空跑,导致下游任务引发异常。通过这种方式,可以快速查看SQL读取的分区是否出现异常。
  • 场景二:理清表的输入,帮助理解程序的运行,特别是有助于理解有多重子查询,多表连接的依赖输入。

下面通过两个案例来看explain dependency的实际运用:

案例一:识别看似等价的代码

对于刚接触SQL的程序员,很容易将

select * from a inner join b on a.no=b.no and a.f>1 and a.f<3;

等价于

select * from a inner join b on a.no=b.no where a.f>1 and a.f<3;

我们可以通过案例来查看下它们的区别:

代码1

select 
a.s_no 
from student_orc_partition a 
inner join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;

代码2

select 
a.s_no 
from student_orc_partition a 
inner join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part 
where a.part>=1 and a.part<=2;

我们看下上述两段代码explain dependency的输出结果:

代码1的explain dependency结果

{"input_partitions": 
[{"partitionName":"default@student_orc_partition@part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, 
{"partitionName":"default@student_orc_partition@part=2"},
{"partitionName":"default@student_orc_partition_only@part=1"}, 
{"partitionName":"default@student_orc_partition_only@part=2"}], 
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

代码2的explain dependency结果

{"input_partitions": 
[{"partitionName":"default@student_orc_partition@part=1"}, 
{"partitionName" : "default@student_orc_partition@part=2"},
{"partitionName" :"default@student_orc_partition_only@part=1"},
{"partitionName":"default@student_orc_partition_only@part=2"}], 
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

通过上面的输出结果可以看到,其实上述的两个SQL并不等价,代码1在内连接(inner join)中的连接条件(on)中加入非等值的过滤条件后,并没有将内连接的左右两个表按照过滤条件进行过滤,内连接在执行时会多读取part=0的分区数据。而在代码2中,会过滤掉不符合条件的分区。

案例二:识别SQL读取数据范围的差别

代码1

explain dependency
select
a.s_no 
from student_orc_partition a 
left join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part and b.part>=1 and b.part<=2;

代码2

explain dependency 
select 
a.s_no 
from student_orc_partition a 
left join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;

以上两个代码的数据读取范围是一样的吗?答案是不一样,我们通过explain dependency来看下:

代码1的explain dependency结果

{"input_partitions": 
[{"partitionName": "default@student_orc_partition@part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, …中间省略7个分区
{"partitionName":"default@student_orc_partition@part=9"}, 
{"partitionName":"default@student_orc_partition_only@part=1"}, 
{"partitionName":"default@student_orc_partition_only@part=2"}], 
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

代码2的explain dependency结果

{"input_partitions": 
[{"partitionName":"default@student_orc_partition@part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, …中间省略7个分区 
{"partitionName":"default@student_orc_partition@part=9"}, 
{"partitionName":"default@student_orc_partition_only@part=0"}, 
{"partitionName":"default@student_orc_partition_only@part=1"}, …中间省略7个分区 
{"partitionName":"default@student_orc_partition_only@part=9"}],
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

可以看到,对左外连接在连接条件中加入非等值过滤的条件,如果过滤条件是作用于右表(b表)有起到过滤的效果,则右表只要扫描两个分区即可,但是左表(a表)会进行全表扫描。如果过滤条件是针对左表,则完全没有起到过滤的作用,那么两个表将进行全表扫描。这时的情况就如同全外连接一样都需要对两个数据进行全表扫描。

在使用过程中,容易认为代码片段2可以像代码片段1一样进行数据过滤,通过查看explain dependency的输出结果,可以知道不是如此。

3. explain authorization 的用法

通过explain authorization可以知道当前SQL访问的数据来源(INPUTS) 和数据输出(OUTPUTS),以及当前Hive的访问用户 (CURRENT_USER)和操作(OPERATION)。

在 hive cli 中输入以下命令:

explain authorization 
select variance(s_score) from student_tb_orc;

结果如下:

INPUTS: 
  default@student_tb_orc 
OUTPUTS: 
  hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194- 90f1475a3ed5/-mr-10000 
CURRENT_USER: 
  hdfs 
OPERATION: 
  QUERY 
AUTHORIZATION_FAILURES: 
  No privilege 'Select' found for inputs { database:default, table:student_ tb_orc, columnName:s_score}

从上面的信息可知:

上面案例的数据来源是defalut数据库中的 student_tb_orc表;

数据的输出路径是hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-90f1475a3ed5/-mr-10000;

当前的操作用户是hdfs,操作是查询;

观察上面的信息我们还会看到AUTHORIZATION_FAILURES信息,提示对当前的输入没有查询权限,但如果运行上面的SQL的话也能够正常运行。为什么会出现这种情况?Hive在默认不配置权限管理的情况下不进行权限验证,所有的用户在Hive里面都是超级管理员,即使不对特定的用户进行赋权,也能够正常查询

最后

通过上面对explain的介绍,可以发现explain中有很多值得我们去研究的内容,读懂 explain 的执行计划有利于我们优化Hive SQL,同时也能提升我们对SQL的掌控力。

八、Hive SQL底层执行原理

本节结构采用宏观着眼,微观入手,从整体到细节的方式剖析 Hive SQL 底层原理。第一节先介绍 Hive 底层的整体执行流程,然后第二节介绍执行流程中的 SQL 编译成 MapReduce 的过程,第三节剖析 SQL 编译成 MapReduce 的具体实现原理。

Hive 底层执行架构

我们先来看下 Hive 的底层执行架构图, Hive 的主要组件与 Hadoop 交互的过程:

Hive底层执行架构

在 Hive 这一侧,总共有五个组件:

  1. UI:用户界面。可看作我们提交SQL语句的命令行界面。
  2. DRIVER:驱动程序。接收查询的组件。该组件实现了会话句柄的概念。
  3. COMPILER:编译器。负责将 SQL 转化为平台可执行的执行计划。对不同的查询块和查询表达式进行语义分析,并最终借助表和从 metastore 查找的分区元数据来生成执行计划。
  4. METASTORE:元数据库。存储 Hive 中各种表和分区的所有结构信息。
  5. EXECUTION ENGINE:执行引擎。负责提交 COMPILER 阶段编译好的执行计划到不同的平台上。

上图的基本流程是:

步骤1:UI 调用 DRIVER 的接口;

步骤2:DRIVER 为查询创建会话句柄,并将查询发送到 COMPILER(编译器)生成执行计划;

步骤3和4:编译器从元数据存储中获取本次查询所需要的元数据,该元数据用于对查询树中的表达式进行类型检查,以及基于查询谓词修建分区;

步骤5:编译器生成的计划是分阶段的DAG,每个阶段要么是 map/reduce 作业,要么是一个元数据或者HDFS上的操作。将生成的计划发给 DRIVER。

如果是 map/reduce 作业,该计划包括 map operator trees 和一个 reduce operator tree,执行引擎将会把这些作业发送给 MapReduce :

步骤6、6.1、6.2和6.3:执行引擎将这些阶段提交给适当的组件。在每个 task(mapper/reducer) 中,从HDFS文件中读取与表或中间输出相关联的数据,并通过相关算子树传递这些数据。最终这些数据通过序列化器写入到一个临时HDFS文件中(如果不需要 reduce 阶段,则在 map 中操作)。临时文件用于向计划中后面的 map/reduce 阶段提供数据。

步骤7、8和9:最终的临时文件将移动到表的位置,确保不读取脏数据(文件重命名在HDFS中是原子操作)。对于用户的查询,临时文件的内容由执行引擎直接从HDFS读取,然后通过Driver发送到UI。

Hive SQL 编译成 MapReduce 过程

编译 SQL 的任务是在上节中介绍的 COMPILER(编译器组件)中完成的。Hive将SQL转化为MapReduce任务,整个编译过程分为六个阶段:

Hive SQL编译过程

  1. 词法、语法解析: Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象语法树 AST Tree;

Antlr是一种语言识别的工具,可以用来构造领域语言。使用Antlr构造特定的语言只需要编写一个语法文件,定义词法和语法替换规则即可,Antlr完成了词法分析、语法分析、语义分析、中间代码生成的过程。

  1. 语义解析: 遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
  2. 生成逻辑执行计划: 遍历 QueryBlock,翻译为执行操作树 OperatorTree;
  3. 优化逻辑执行计划: 逻辑层优化器进行 OperatorTree 变换,合并 Operator,达到减少 MapReduce Job,减少数据传输及 shuffle 数据量;
  4. 生成物理执行计划: 遍历 OperatorTree,翻译为 MapReduce 任务;
  5. 优化物理执行计划: 物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。

下面对这六个阶段详细解析:

为便于理解,我们拿一个简单的查询语句进行展示,对5月23号的地区维表进行查询:

select * from dim.dim_region where dt = '2021-05-23';

阶段一:词法、语法解析

根据Antlr定义的sql语法规则,将相关sql进行词法、语法解析,转化为抽象语法树AST Tree:

ABSTRACT SYNTAX TREE:
TOK_QUERY
    TOK_FROM 
    TOK_TABREF
           TOK_TABNAME
               dim
                 dim_region
    TOK_INSERT
      TOK_DESTINATION
          TOK_DIR
              TOK_TMP_FILE
        TOK_SELECT
          TOK_SELEXPR
              TOK_ALLCOLREF
        TOK_WHERE
          =
              TOK_TABLE_OR_COL
                  dt
                    '2021-05-23'

阶段二:语义解析

遍历AST Tree,抽象出查询的基本组成单元QueryBlock:

AST Tree生成后由于其复杂度依旧较高,不便于翻译为mapreduce程序,需要进行进一步抽象和结构化,形成QueryBlock。

QueryBlock是一条SQL最基本的组成单元,包括三个部分:输入源,计算过程,输出。简单来讲一个QueryBlock就是一个子查询。

QueryBlock的生成过程为一个递归过程,先序遍历 AST Tree ,遇到不同的 Token 节点(理解为特殊标记),保存到相应的属性中。

阶段三:生成逻辑执行计划

遍历QueryBlock,翻译为执行操作树OperatorTree:

Hive最终生成的MapReduce任务,Map阶段和Reduce阶段均由OperatorTree组成。

基本的操作符包括:

  • TableScanOperator
  • SelectOperator
  • FilterOperator
  • JoinOperator
  • GroupByOperator
  • ReduceSinkOperator`

Operator在Map Reduce阶段之间的数据传递都是一个流式的过程。每一个Operator对一行数据完成操作后之后将数据传递给childOperator计算。

由于Join/GroupBy/OrderBy均需要在Reduce阶段完成,所以在生成相应操作的Operator之前都会先生成一个ReduceSinkOperator,将字段组合并序列化为Reduce Key/value, Partition Key。

阶段四:优化逻辑执行计划

Hive中的逻辑查询优化可以大致分为以下几类:

  • 投影修剪
  • 推导传递谓词
  • 谓词下推
  • 将Select-Select,Filter-Filter合并为单个操作
  • 多路 Join
  • 查询重写以适应某些列值的Join倾斜

阶段五:生成物理执行计划

生成物理执行计划即是将逻辑执行计划生成的OperatorTree转化为MapReduce Job的过程,主要分为下面几个阶段:

  1. 对输出表生成MoveTask
  2. 从OperatorTree的其中一个根节点向下深度优先遍历
  3. ReduceSinkOperator标示Map/Reduce的界限,多个Job间的界限
  4. 遍历其他根节点,遇过碰到JoinOperator合并MapReduceTask
  5. 生成StatTask更新元数据
  6. 剪断Map与Reduce间的Operator的关系

阶段六:优化物理执行计划

Hive中的物理优化可以大致分为以下几类:

  • 分区修剪(Partition Pruning)
  • 基于分区和桶的扫描修剪(Scan pruning)
  • 如果查询基于抽样,则扫描修剪
  • 在某些情况下,在 map 端应用 Group By
  • 在 mapper 上执行 Join
  • 优化 Union,使Union只在 map 端执行
  • 在多路 Join 中,根据用户提示决定最后流哪个表
  • 删除不必要的 ReduceSinkOperators
  • 对于带有Limit子句的查询,减少需要为该表扫描的文件数
  • 对于带有Limit子句的查询,通过限制 ReduceSinkOperator 生成的内容来限制来自 mapper 的输出
  • 减少用户提交的SQL查询所需的Tez作业数量
  • 如果是简单的提取查询,避免使用MapReduce作业
  • 对于带有聚合的简单获取查询,执行不带 MapReduce 任务的聚合
  • 重写 Group By 查询使用索引表代替原来的表
  • 当表扫描之上的谓词是相等谓词且谓词中的列具有索引时,使用索引扫描

经过以上六个阶段,SQL 就被解析映射成了集群上的 MapReduce 任务。

SQL编译成MapReduce具体原理

在阶段五-生成物理执行计划,即遍历 OperatorTree,翻译为 MapReduce 任务,这个过程具体是怎么转化的呢

我们接下来举几个常用 SQL 语句转化为 MapReduce 的具体步骤:

Join的实现原理

以下面这个SQL为例,讲解 join 的实现:

select u.name, o.orderid from order o join user u on o.uid = u.uid;

在map的输出value中为不同表的数据打上tag标记,在reduce阶段根据tag判断数据来源。MapReduce的过程如下:

MapReduce CommonJoin的实现

Group By的实现原理

以下面这个SQL为例,讲解 group by 的实现:

select rank, isonline, count(*) from city group by rank, isonline;

将GroupBy的字段组合为map的输出key值,利用MapReduce的排序,在reduce阶段保存LastKey区分不同的key。MapReduce的过程如下:

MapReduce Group By的实现

Distinct的实现原理

以下面这个SQL为例,讲解 distinct 的实现:

select dealid, count(distinct uid) num from order group by dealid;

当只有一个distinct字段时,如果不考虑Map阶段的Hash GroupBy,只需要将GroupBy字段和Distinct字段组合为map输出key,利用mapreduce的排序,同时将GroupBy字段作为reduce的key,在reduce阶段保存LastKey即可完成去重:

MapReduce Distinct的实现

九、Hive千亿级数据倾斜

十、Hive企业级性能优化

十一、Hive大厂面试真题

附:九个最易出错的SQL讲解

因篇幅有限,获取本文完整的PDF文档(带目录),请私信我,发送:hive

赞 ()