Hive
Hive
AlexHive
背景
- 引入原因:
- 对存在HDFS上的文件或HBase中的表进行查询时,是要手工写一堆MapReduce代码
- 对于统计任务,只能由动MapReduce的程序员才能搞定
- 耗时耗力,更多精力没有有效的释放出来
- Hive基于一个统一的查询分析层,通过SQL语句的方式对HDFS上的数据进行查询、统计和分析
Hive是什么?
- Hive是一个SQL解析引擎,将SQL语句转译成MR Job,然后再Hadoop平台上运行,达到快速开发的目的。
- Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是Hadoop的目录/文件,达到了元数据与数据存储分离的目的
- Hive本身不存储数据,它完全依赖HDFS和MapReduce。
- Hive的内容是读多写少,不支持对数据的改写和删除(0.14版本之后支持,但默认未开启,需手动配置开启)
- Hive中没有定义专门的数据格式,由用户指定,需要指定三个属性:
- 列分隔符 : 空格、\t 、\001
- 行分隔符 : \n
- 读取文件数据的方法 : TextFile、SequenceFile(二进制)
SequenceFile(二进制): 是Hadoop提供的一种二进制文件,<key,Value>形式序列化到文件中,java Writeable接口进行序列化和反序列化
RcFile: 是Hive专门退出的,一种面向列的数据格式
Hive中的SQL与传统SQL区别
UDF/UDAF/UDTF
都是函数
UDF: 直接应用于slecet语句,通常查询的时候,需要对字段做一些格式化处理(大小写转换),特点:一进一出,1比1
UDAF: 多对一
UDTF: 一对多
读时模式: 只有hive读的时候才会检查,解析字段和
schema
- 有点: load data非常迅速,因为在写的过程中是不需要解析数据
写时模式:
- 缺点:写的慢,需要建立索引、压缩、数据一致性、字段检查等等
- 有点: 读的时候会得到优化
与传统关系数据库特点比较
- hive和关系数据库存储文件的系统不同,hive使用的是hadoop的HDFS(hadoop的分布式文件系统),关系数据库则是服务器本地的文件系统;
- hive使用的计算模型是mapreduce,而关系数据库则是自己设计的计算模型;
- 关系数据库都是为实时查询的业务进行设计的,而hive则是为海量数据做数据挖掘设计的,实时性很差
- Hive很容易扩展自己的存储能力和计算能力,这个是继承hadoop的,而关系数据库在这个方面要比数据库差很多。
Hive体系架构
- 用户接口: client终端
- 语句转换: sql -> mapreduce
- Hive本身不会生成mr,而是通过一个执行计划来执行mr(xml文件->mapper、reducer模块),默认本地数据库:derby(单用户模式常用)
Hive数据管理
- Hive支持4个数据模型:
- Table 内部表
- External Table 外部表
- Partition 分区表
- Bucket 分区表,分桶表
name | HDFS Directory | |
---|---|---|
table | mobile_user | /lbs/mobile_user |
Partition | action = insight, day= 20131020 | /lbs/mobile_user/action=insight/day=20131020 |
Bucket | clustedby user into 32 buckets | /lbs/mobile_user/action=insight/day=20131020/part-00000 |
hive的表本质就是Hadoop的目录/文件
- hive默认表存放路径一般都是在你工作目录的hive目录里面,按表名做文件夹分开,如果你有分区表的话,分区值是子文件夹,可以直接在其它的M/R job里直接应用这部分数据
特点:
- 在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的!而表则不一样;
- 在删除表的时候,Hive将会把属于表的元数据和数据全部删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的!
Hive中的Partition
- 在Hive 中,表中的一个Partition 对应于表下的一个目录,所有的Partition 的数据都存储在对应的目录中
- 例如:pvs表中包含ds 和city 两个Partition,则
- 对应于ds = 20090801, ctry= US 的HDFS 子目录为:/wh/pvs/ds=20090801/ctry=US;
- 对应于ds = 20090801, ctry= CA 的HDFS 子目录为;/wh/pvs/ds=20090801/ctry=CA
- partition是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
- 在Hive 中,表中的一个Partition 对应于表下的一个目录,所有的Partition 的数据都存储在对应的目录中
Hive中的Bucket
- hive中table可以拆分成partition,table和partition可以通过‘CLUSTERED BY ’进一步分bucket,bucket中的数据可以通过‘SORT BY’排序。
- create table bucket_user(id int,namestring)clustered by (id) into 4 buckets;
- ‘set hive.enforce.bucketing= true’ 可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数
- Bucket主要作用:
- 数据sampling
- 提升某些查询操作效率,例如mapsidejoin
- 查看sampling数据:
- hive> select * from student tablesample(bucket 1 out of 2 on id);
- tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
- y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。
Hive数据类型
原生类型
- TINYINT
- SMALLINT
- INT
- BIGINT
- BOOLEAN
- FLOAT
- DOUBLE
- STRING
- BINARY(Hive 0.8.0以上才可用)
- TIMESTAMP(Hive 0.8.0以上才可用)
复合类型
- Arrays:ARRAY
- Maps:MAP<primitive_type, data_type>
- Structs:STRUCT<col_name: data_type[COMMENT col_comment],……>
- Union:UNIONTYPE<data_type, data_type,……>
Hive优化
Map的优化:
- 作业会通过input的目录产生一个或者多个map任务。
1
set dfs.block.size(=128) ## 全局设置 ,直接影响reduce个数的参数
- Map越多越好吗?是不是保证每个map处理接近文件块的大小?
- 如何合并小文件,减少map数?
1
2
3
4set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;- 如何适当的增加map数?
1
set mapred.map.tasks=10; ## 设置map个数,不一定生效,只用于参考
- Map端聚合hive.map.aggr=true 。Mr中的Combiners
Reduce的优化:
- hive.exec.reducers.bytes.per.reducer;## reduce任务处理的数据量
1
2reduce个数计算公式( hive.exec.reducers.bytes.per.reducer )
reduce个数 = InputFileSize / bytes per reducer- 调整reduce的个数:
- 设置reduce处理的数据量
- set mapred.reduce.tasks=10 ## 设置reduce个数
- 一个Reduce:
- 没有group by
- order by(可以使用distribute by和sort by)
- 笛卡尔积
1
2
3
4
5
6
7select pt,count(1)
from popt_tbaccountcopy_mes
where pt= '2012-07-04' group by pt; ## group by 增加reduce个数
写成
select count(1)
from popt_tbaccountcopy_mes
where pt= '2012-07-04';分区裁剪(partition)
- Where中的分区条件,会提前生效,不必特意做子查询,直接Join和GroupBy
笛卡尔积
- join的时候不加on条件或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积
Map join
- /*+ MAPJOIN(tablelist) */,必须是小表,不要超过1G,或者50万条记录
Union all
- 先做union all再做join或group by等操作可以有效减少MR过程,尽管是多个Select,最终只有一个mr
Multi-insert & multi-group by
- 从一份基础表中按照不同的维度,一次组合出不同的数据
- FROM from_statement
- INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
- INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
1
2
3
4
5
6
7
8- Sql (group by key ) -> 1个MR Job
- map相同的key -> 同一个reduce
- 假设数据源存在数据倾斜问题
- 10个reduce,最后大家可能都等1个reduce完成
- mr:对key做改写来解决类似问题
- hive.groupby.skewindata 会帮忙完成一定的负载均和
- 1个MR job -> 2个MR jobAutomatic merge
- 当文件大小比阈值小时,hive会启动一个mr进行合并
- hive.merge.mapfiles= true是否和并Map 输出文件,默认为True
- hive.merge.mapredfiles= false是否合并Reduce 输出文件,默认为False
- hive.merge.size.per.task= 25610001000合并文件的大小
Multi-Count Distinct
- 必须设置参数:set hive.groupby.skewindata=true; ## 如果数据倾斜会默认帮助修复
- select dt, count(distinct uniq_id), count(distinct ip)
- from ods_logwhere dt=20170301 group by dt
一个MR job
1
2
3
4SELECT a.val, b.val, c.val
FROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (a.key= c.key1)生成多个MR job
1
2
3
4SELECT a.val, b.val, c.val
FROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (c.key= b.key1)Mr越少越好 ,区别on左边条件
hive会默认把昨天数据放到内存,右边数据做类似流数据
最好每次写join的时候,小标放左边,达标放右边
按照JOIN顺序中的最后一个表应该尽量是大表,因为JOIN前一阶段生成的数据会存在于Reducer的buffer中,通过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据可能是JOIN顺序中,前面表连接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的Key,与大表中的指定Key进行连接,速度会更快,也可能避免内存缓冲区溢出。
1
2
3
4SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.valFROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (c.key= b.key1);
a表被视为大表1
2
3SELECT /*+ MAPJOIN(b) */ a.key, a.valueFROM a
JOIN b ON a.key= b.key;
MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多.Hive的Join优化——表连接顺序
左连接时,左表中出现的JOIN字段都保留,右表没有连接上的都为空。
1
2
3
4
5SELECT a.val, b.val
FROM a
LEFT OUTER JOIN b ON (a.key=b.key)
WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
## 两张表连接后过滤1
2
3
4
5ASELECT a.val, b.val
FROM a
LEFT OUTER JOIN b
ON (a.key=b.keyAND b.ds='2009-07-07' AND a.ds='2009-07-07')
# on 先过滤再连接执行顺序是,首先完成2表JOIN,然后再通过WHERE条件进行过滤,这样在JOIN过程中可能会输出大量结果,再对这些结果进行过滤,比较耗时。可以进行优化,将WHERE条件放在ON后,在JOIN的过程中,就对不满足条件的记录进行了预先过滤。
Hive的优化——并行执行
并行实行:
- 同步执行hive的多个阶段,hive在执行过程,将一个查询转化成一个或者多个阶段。某个特定的job可能包含众多的阶段,而这些阶段可能并非完全相互依赖的,也就是说可以并行执行的,这样可能使得整个job的执行时间缩短。hive执行开启:set hive.exec.parallel=true
Hive的优化——数据倾斜
操作
- Join
- Group by
- Count Distinct
原因
- key分布不均导致的
- 人为的建表疏忽
- 业务数据特点
症状
- 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。
- 查看未完成的子任务,可以看到本地读写数据量积累非常大,通常超过10GB可以认定为发生数据倾斜。
倾斜度
- 平均记录数超过50w且最大记录数是超过平均记录数的4倍。
- 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍。
万能方法
- hive.groupby.skewindata=true
Hive的优化——数据倾斜——大小表关联
原因
- Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边。
思路
- 将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率
- 使用map join让小的维度表先进内存。
方法
- Small_tablejoin big_table
Hive的优化——数据倾斜——大大表关联
原因
- 日志中有一部分的userid是空或者是0的情况,导致在用user_id进行hash分桶的时候,会将日志中userid为0或者空的数据分到一起,导致了过大的斜率。
思路
- 把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
方法
- on case when (x.uid= ‘-‘ or x.uid= ‘0‘ or x.uidis null) then concat(‘dp_hive_search’,rand()) else x.uidend = f.user_id;
Hive的优化——数据倾斜——大大表关联(业务削减)
案例
1
2
3
4
5
6
7Select * from dw_logt join dw_usert1 on t.user_id=t1.user_id
````
- 现象:两个表都上千万,跑起来很悬
- 思路
- 当天登陆的用户其实很少
- 方法
Select/+MAPJOIN(t12)/ *
from dw_logt11
join (
select/+MAPJOIN(t)/ t1.*
from (
select user_idfrom dw_loggroup by user_id
) t
join dw_usert1
on t.user_id=t1.user_id
) t12
on t11.user_id=t12.user_id
1 |
|
select cast(count(distinct(user_id))+1 as bigint) as user_cnt
from tab_a
where user_idis not null and user_id<> ‘’
1 |
|
Select day,count(distinct session_id),count(distinct user_id) from log a group by day
1 | - 问题 |
select day,
count(case when type=’session’ then 1 else null end) as session_cnt,
count(case when type=’user’ then 1 else null end) as user_cnt
from (
select day,session_id,type
from (
select day,session_id,’session’ as type
from log
union all
select day user_id,’user’ as type
from log
)
group by day,session_id,type
) t1
group by day