Hive

Hive

背景

  • 引入原因:
    • 对存在HDFS上的文件或HBase中的表进行查询时,是要手工写一堆MapReduce代码
    • 对于统计任务,只能由动MapReduce的程序员才能搞定
    • 耗时耗力,更多精力没有有效的释放出来
  • Hive基于一个统一的查询分析层,通过SQL语句的方式对HDFS上的数据进行查询、统计和分析

Hive是什么?

  • Hive是一个SQL解析引擎,将SQL语句转译成MR Job,然后再Hadoop平台上运行,达到快速开发的目的。
  • Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是Hadoop的目录/文件,达到了元数据与数据存储分离的目的
  • Hive本身不存储数据,它完全依赖HDFS和MapReduce。
  • Hive的内容是读多写少,不支持对数据的改写和删除(0.14版本之后支持,但默认未开启,需手动配置开启)
  • Hive中没有定义专门的数据格式,由用户指定,需要指定三个属性:
    • 列分隔符 : 空格、\t 、\001
    • 行分隔符 : \n
    • 读取文件数据的方法 : TextFile、SequenceFile(二进制)

      SequenceFile(二进制): 是Hadoop提供的一种二进制文件,<key,Value>形式序列化到文件中,java Writeable接口进行序列化和反序列化
      RcFile: 是Hive专门退出的,一种面向列的数据格式

Hive中的SQL与传统SQL区别

  • UDF/UDAF/UDTF

  • 都是函数

  • UDF: 直接应用于slecet语句,通常查询的时候,需要对字段做一些格式化处理(大小写转换),特点:一进一出,1比1

  • UDAF: 多对一

  • UDTF: 一对多

  • 读时模式: 只有hive读的时候才会检查,解析字段和schema

    • 有点: load data非常迅速,因为在写的过程中是不需要解析数据
  • 写时模式:

    • 缺点:写的慢,需要建立索引、压缩、数据一致性、字段检查等等
    • 有点: 读的时候会得到优化

与传统关系数据库特点比较

  • hive和关系数据库存储文件的系统不同,hive使用的是hadoop的HDFS(hadoop的分布式文件系统),关系数据库则是服务器本地的文件系统;
  • hive使用的计算模型是mapreduce,而关系数据库则是自己设计的计算模型;
  • 关系数据库都是为实时查询的业务进行设计的,而hive则是为海量数据做数据挖掘设计的,实时性很差
  • Hive很容易扩展自己的存储能力和计算能力,这个是继承hadoop的,而关系数据库在这个方面要比数据库差很多。

Hive体系架构

  • 用户接口: client终端
  • 语句转换: sql -> mapreduce
  • Hive本身不会生成mr,而是通过一个执行计划来执行mr(xml文件->mapper、reducer模块),默认本地数据库:derby(单用户模式常用)

Hive数据管理

  • Hive支持4个数据模型:
    • Table 内部表
    • External Table 外部表
    • Partition 分区表
    • Bucket 分区表,分桶表
name HDFS Directory
table mobile_user /lbs/mobile_user
Partition action = insight, day= 20131020 /lbs/mobile_user/action=insight/day=20131020
Bucket clustedby user into 32 buckets /lbs/mobile_user/action=insight/day=20131020/part-00000
  • hive的表本质就是Hadoop的目录/文件

    • hive默认表存放路径一般都是在你工作目录的hive目录里面,按表名做文件夹分开,如果你有分区表的话,分区值是子文件夹,可以直接在其它的M/R job里直接应用这部分数据
  • 特点:

    • 在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的!而表则不一样;
    • 在删除表的时候,Hive将会把属于表的元数据和数据全部删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的!
  • Hive中的Partition

    • 在Hive 中,表中的一个Partition 对应于表下的一个目录,所有的Partition 的数据都存储在对应的目录中
      • 例如:pvs表中包含ds 和city 两个Partition,则
      • 对应于ds = 20090801, ctry= US 的HDFS 子目录为:/wh/pvs/ds=20090801/ctry=US;
      • 对应于ds = 20090801, ctry= CA 的HDFS 子目录为;/wh/pvs/ds=20090801/ctry=CA
    • partition是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
  • Hive中的Bucket

    • hive中table可以拆分成partition,table和partition可以通过‘CLUSTERED BY ’进一步分bucket,bucket中的数据可以通过‘SORT BY’排序。
    • create table bucket_user(id int,namestring)clustered by (id) into 4 buckets;
    • ‘set hive.enforce.bucketing= true’ 可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数
    • Bucket主要作用:
      • 数据sampling
      • 提升某些查询操作效率,例如mapsidejoin
    • 查看sampling数据:
      • hive> select * from student tablesample(bucket 1 out of 2 on id);
      • tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
      • y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。

Hive数据类型

原生类型

  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • STRING
  • BINARY(Hive 0.8.0以上才可用)
  • TIMESTAMP(Hive 0.8.0以上才可用)

复合类型

  • Arrays:ARRAY
  • Maps:MAP<primitive_type, data_type>
  • Structs:STRUCT<col_name: data_type[COMMENT col_comment],……>
  • Union:UNIONTYPE<data_type, data_type,……>

Hive优化

  • Map的优化:

    • 作业会通过input的目录产生一个或者多个map任务。
    1
    set dfs.block.size(=128) ## 全局设置 ,直接影响reduce个数的参数
    • Map越多越好吗?是不是保证每个map处理接近文件块的大小?
    • 如何合并小文件,减少map数?
    1
    2
    3
    4
    set mapred.max.split.size=100000000;
    set mapred.min.split.size.per.node=100000000;
    set mapred.min.split.size.per.rack=100000000;
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    • 如何适当的增加map数?
    1
    set mapred.map.tasks=10; ## 设置map个数,不一定生效,只用于参考
    • Map端聚合hive.map.aggr=true 。Mr中的Combiners
  • Reduce的优化:

    • hive.exec.reducers.bytes.per.reducer;## reduce任务处理的数据量
    1
    2
    reduce个数计算公式( hive.exec.reducers.bytes.per.reducer )
    reduce个数 = InputFileSize / bytes per reducer
    • 调整reduce的个数:
      • 设置reduce处理的数据量
      • set mapred.reduce.tasks=10 ## 设置reduce个数
    • 一个Reduce:
      • 没有group by
      • order by(可以使用distribute by和sort by)
      • 笛卡尔积
    1
    2
    3
    4
    5
    6
    7
    select pt,count(1)
    from popt_tbaccountcopy_mes
    where pt= '2012-07-04' group by pt; ## group by 增加reduce个数
    写成
    select count(1)
    from popt_tbaccountcopy_mes
    where pt= '2012-07-04';
  • 分区裁剪(partition)

    • Where中的分区条件,会提前生效,不必特意做子查询,直接Join和GroupBy
  • 笛卡尔积

    • join的时候不加on条件或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积
  • Map join

    • /*+ MAPJOIN(tablelist) */,必须是小表,不要超过1G,或者50万条记录
  • Union all

    • 先做union all再做join或group by等操作可以有效减少MR过程,尽管是多个Select,最终只有一个mr
  • Multi-insert & multi-group by

    • 从一份基础表中按照不同的维度,一次组合出不同的数据
    • FROM from_statement
    • INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
    • INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
    1
    2
    3
    4
    5
    6
    7
    8
    - Sql (group by key ) -> 1个MR Job
    - map相同的key -> 同一个reduce
    - 假设数据源存在数据倾斜问题
    - 10个reduce,最后大家可能都等1个reduce完成
    - mr:对key做改写来解决类似问题
    - hive.groupby.skewindata 会帮忙完成一定的负载均和
    - 1个MR job -> 2个MR job

  • Automatic merge

    • 当文件大小比阈值小时,hive会启动一个mr进行合并
    • hive.merge.mapfiles= true是否和并Map 输出文件,默认为True
    • hive.merge.mapredfiles= false是否合并Reduce 输出文件,默认为False
    • hive.merge.size.per.task= 25610001000合并文件的大小
  • Multi-Count Distinct

    • 必须设置参数:set hive.groupby.skewindata=true; ## 如果数据倾斜会默认帮助修复
    • select dt, count(distinct uniq_id), count(distinct ip)
    • from ods_logwhere dt=20170301 group by dt
  • 一个MR job

    1
    2
    3
    4
    SELECT a.val, b.val, c.val
    FROM a
    JOIN b ON (a.key= b.key1)
    JOIN c ON (a.key= c.key1)
  • 生成多个MR job

    1
    2
    3
    4
    SELECT a.val, b.val, c.val
    FROM a
    JOIN b ON (a.key= b.key1)
    JOIN c ON (c.key= b.key1)
  • Mr越少越好 ,区别on左边条件

  • hive会默认把昨天数据放到内存,右边数据做类似流数据

  • 最好每次写join的时候,小标放左边,达标放右边

  • 按照JOIN顺序中的最后一个表应该尽量是大表,因为JOIN前一阶段生成的数据会存在于Reducer的buffer中,通过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据可能是JOIN顺序中,前面表连接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的Key,与大表中的指定Key进行连接,速度会更快,也可能避免内存缓冲区溢出。

    1
    2
    3
    4
    SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.valFROM a
    JOIN b ON (a.key= b.key1)
    JOIN c ON (c.key= b.key1);
    a表被视为大表
    1
    2
    3
    SELECT /*+ MAPJOIN(b) */ a.key, a.valueFROM a
    JOIN b ON a.key= b.key;
    MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多.
  • Hive的Join优化——表连接顺序

  • 左连接时,左表中出现的JOIN字段都保留,右表没有连接上的都为空。

    1
    2
    3
    4
    5
    SELECT a.val, b.val
    FROM a
    LEFT OUTER JOIN b ON (a.key=b.key)
    WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
    ## 两张表连接后过滤
    1
    2
    3
    4
    5
    ASELECT a.val, b.val
    FROM a
    LEFT OUTER JOIN b
    ON (a.key=b.keyAND b.ds='2009-07-07' AND a.ds='2009-07-07')
    # on 先过滤再连接
  • 执行顺序是,首先完成2表JOIN,然后再通过WHERE条件进行过滤,这样在JOIN过程中可能会输出大量结果,再对这些结果进行过滤,比较耗时。可以进行优化,将WHERE条件放在ON后,在JOIN的过程中,就对不满足条件的记录进行了预先过滤。

  • Hive的优化——并行执行

  • 并行实行:

    • 同步执行hive的多个阶段,hive在执行过程,将一个查询转化成一个或者多个阶段。某个特定的job可能包含众多的阶段,而这些阶段可能并非完全相互依赖的,也就是说可以并行执行的,这样可能使得整个job的执行时间缩短。hive执行开启:set hive.exec.parallel=true
  • Hive的优化——数据倾斜

  • 操作

    • Join
    • Group by
    • Count Distinct
  • 原因

    • key分布不均导致的
    • 人为的建表疏忽
    • 业务数据特点
  • 症状

    • 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。
    • 查看未完成的子任务,可以看到本地读写数据量积累非常大,通常超过10GB可以认定为发生数据倾斜。
  • 倾斜度

    • 平均记录数超过50w且最大记录数是超过平均记录数的4倍。
    • 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍。
  • 万能方法

    • hive.groupby.skewindata=true
  • Hive的优化——数据倾斜——大小表关联

  • 原因

    • Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边。
  • 思路

    • 将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率
    • 使用map join让小的维度表先进内存。
  • 方法

    • Small_tablejoin big_table
  • Hive的优化——数据倾斜——大大表关联

  • 原因

    • 日志中有一部分的userid是空或者是0的情况,导致在用user_id进行hash分桶的时候,会将日志中userid为0或者空的数据分到一起,导致了过大的斜率。
  • 思路

    • 把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
  • 方法

    • on case when (x.uid= ‘-‘ or x.uid= ‘0‘ or x.uidis null) then concat(‘dp_hive_search’,rand()) else x.uidend = f.user_id;
  • Hive的优化——数据倾斜——大大表关联(业务削减)

  • 案例

    1
    2
    3
    4
    5
    6
    7
        Select * from dw_logt join dw_usert1 on t.user_id=t1.user_id
    ````

    - 现象:两个表都上千万,跑起来很悬
    - 思路
    - 当天登陆的用户其实很少
    - 方法

Select/+MAPJOIN(t12)/ *
from dw_logt11
join (
select/+MAPJOIN(t)/ t1.*
from (
select user_idfrom dw_loggroup by user_id
) t
join dw_usert1
on t.user_id=t1.user_id
) t12
on t11.user_id=t12.user_id

1
2
3
4
5
6
7
8
9
10
11



- **Hive的优化——数据倾斜——聚合时存在大量特殊值**
- 原因
- 做count distinct时,该字段存在大量值为NULL或空的记录。
- 思路
- count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。
- 如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union
- 方法

select cast(count(distinct(user_id))+1 as bigint) as user_cnt
from tab_a
where user_idis not null and user_id<> ‘’

1
2
3

- **Hive的优化——数据倾斜——空间换时间**
- 案例

Select day,count(distinct session_id),count(distinct user_id) from log a group by day

1
2
3
- 问题
- 同一个reduce上进行distinct操作时压力很大,distinct 会把所有数据汇聚到一个节点上
- 方法

select day,
count(case when type=’session’ then 1 else null end) as session_cnt,
count(case when type=’user’ then 1 else null end) as user_cnt
from (
select day,session_id,type
from (
select day,session_id,’session’ as type
from log
union all
select day user_id,’user’ as type
from log
)
group by day,session_id,type
) t1
group by day