Hive SQL优化
发布日期:2022-02-14 23:02:50 浏览次数:44 分类:技术文章

本文共 6142 字,大约阅读时间需要 20 分钟。

参考文档

基本思想

  • HQL优化,本质是对MR/RDD的优化,可以通过explain hql来查看执行计划

常见思路

  • 表优化
    • 分区(分区多,文件多)
      • 静态分区
      • 动态分区
        set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;
    • 分桶
      • 规则:对分桶字段值进行哈希,哈希值除以桶的个数求余,余数决定了该条记录在哪个桶中,也就是余数相同的在一个桶中
      • 好处
        • 方便抽样
        • 提高join的效率:若join的两张表有相同的列,且该列均进行了分桶,则join时将相同值的桶进行join操作即可,大大减少join的数据量
      set hive.enforce.bucketing=true;set hive.enforce.sorting=true;  # 开启强制排序,插数据到表中会进行强制排序,默认false
  • query优化
    • IO
      • 只查询需要的列
      • 尽可能用分区表,避免全表扫描
    • 数据倾斜
      • join
        set hive.optimize.skewjoin=true;  # 如果是join过程中出现倾斜 应该设置为trueset hive.skewjoin.key=100000;   # 这个是join的键对应的记录条数,超过这个值则会进行优化
      • group by
        set hive.group.skewindata=true;   # 如果是group by过程出现倾斜,应该设置为trueset hive.groupby.mapaggr.checkinterval=100000;  # 这个是group的键对应的记录条数超过这个值则会进行优化
      • count distinct:执行的MR是以GroupBy分组,再对distinct列排序,然后输出交给Reduce,所以,在reduce之前,本地的map已经完成预计算,并且提前做了一次聚合运算,如果此时的distinct造成了数据不平衡,则reduce时就会出现计算的数据量有大有小,即数据倾斜
        • 解决方法:考虑用group by的子查询来替换count(distinct)
          # 替换前select count(distinct id) from tablename;select a, sum(b), count(distinct c), count(distinct d) from test group by a;# 替换后select count(1) from (select id from tablename group by id) tmp;select a, sum(b) as b, count(c) as c, count(d) as dfrom(select a,0 as b, c, null as d from test group by a,cunion allselect a,0  as b, null as c, d from test group by a, dunion allselect a,b,null as c, null as d from test)tmp1 group by a;          ```
      • 常见值(null, 0, 1, -1, -99等默认值):如果常见值的占比比较大时,较容易出现数据倾斜
    • 表关联
      • 大表放后:MR从后往前构建数据
      • 同列关联:如可能,用同一列关联 同列关联,无论关联多少表都是一个Map搞定,如果不是同列,就会新开一个MR
      • mapjoin
        • 关联操作中有一张小表
        • 不等值的连接操作
        set hive.auto.convert.join=true;  #  hive.mapjoin.smalltable.filesize默认值是25mb,小表小于25mb自动启动mapjoin
      • bucket join
        • 两个表以相同方式划分桶
        • 两个表的桶个数是倍数关系
        # 优化前select m.cid, u.idfrom order m  join customer u on m.cid = u.idwhere m.dt='2018-06-08'# 优化后: where条件放在map端,而不是reduce端select m.cid, u.idfrom  (select cid from order where dt = '2018-06-08') m join customer u on m.cid = u.id;
    • Hive job优化
      • 并行化执行:关联性不大的阶段,可以并行化执行
        set hive.exec.parallel=true;set hive.exec.parallel.thread.number=8;
      • 本地化执行
        set hive.exec.mode.local.auto=true;
        • join满足如下条件才可用本地模式
          • job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
          • job的map必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
          • job的reduce数必须为0或1
      • 小文件合并:合并文件数由mapred.max.split.size限制的大小决定
        • 合并输入
          set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
        • 合并输出
          set hive.merge.smallfiles.avgsize=256000000;  # 当输出文件平均大小小于该值,启动新job合并文件set hive.merge.size.per.task=64000000;  # 合并之后的文件大小
    • JVM重利用:Job长时间保留slot,直到作业结束。若有较多任务和较多小文件的任务时,该参数很有意义:减少执行时间。但此值不宜过大,有些作业会有reduce任务,一旦没有完成,则map占用slot不会释放,其它作业需要等待
      set mapred.job.reuse.jvm.num.tasks=20;
    • 压缩数据
      • 中间压缩:处理hive查询的多个job之间的数据,进行压缩
        set hive.exec.compress.intermediate=true;set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;set hive.intermediate.compression.type=BLOCK;
      • 输出压缩
        set hive.exec.compress.output=true;set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;set mapred.output.compression.type=BLOCK;
    • Hive Map优化
      • map参数
        • 原则
          • 如果想增加map个数,则设置mapred.map.tasks为一个较大的值
          • 如果想减少map个数,则设置mapred.min.split.size为一个较大的值
          • 输入文件size巨大,但不是小文件:增大mapred.min.split.size的值
          • 输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize:增大mapred.min.split.size不可行,需要使用CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量
        • 参数
          # 默认map个数default_num = total_size / block_size;# 期望大小goal_num = mapred.map.tasks;# 设置处理的文件大小split_size=max(mapred.min.split.size, block_size)split_num = total_size / split_size;# 计算map个数compute_map_num = min(split_num, max(default_num, goal_num))          ```
      • map端聚合
        set hive.map.aggr=true;
      • 推测执行
        mapred.map.tasks.speculative.execution  # 默认为true
    • shuffle优化
      • map端参数
        # 默认100M。map节点运行没完成时,若内存数据过多,该设置就是内存缓冲的大小,在shuffle之前该项定了map输出结果在内存占用buffer的大小,当buffer达到阈值,则启动后台线程对buffer内存sort,然后spill到硬盘io.sort.mb  # 上面参数buffer的阈值,默认0.8(80%)io.sort.spill.percent  # 默认值3。当spill数量不低于该值时,则combiner函数会在merge产生结果文件之间运行min.num.spill.for.combine  # 默认10。当一个map task执行完成后,本地磁盘上有若干spill文件,map task最后一件事就是执行merge sort,执行时每次同时打开多个spill文件,同时打开的文件数量由该值决定。说明:打开的文件越多,不一定merge sort就越快,也要根据数据情况适当的调整io.sort.factor  # 默认值0.05。io.sort.mb中用来保存map output记录边界的百分比,其他缓存用来保存数据io.sort.record.percent
      • reduce端参数
        # 默认5,reduce copy数据的线程数mapred.reduce.parallel.copies  # 默认 300(s)。reduce下载线程最大等待时间mapred.reduce.copy.backoff  # 默认0.7(70%)。Reduce用来存放从Map节点取过来的数据所用的内存占堆内存的比例mapred.job.shuffle.input.buffer.percent  # 默认值0。sort完成后reduce计算阶段用来缓存数据的百分比mapred.job.reduce.input.buffer.percent
    • reduce优化
      • reduce操作
        • 聚合函数:sum, count, distinct …
        • 高级查询:group by; join; distribute by; cluster by; order by…
      • 推测执行:默认为true
        mapred.reduce.tasks.speculative.execution  # hadoophive.mapred.reduce.tasks.speculative.execution  # hive,二者效果一样,二选一即可
      • reduce优化
        set mapred.reduce.tasks=10;  # 直接设置hive.exec.reducers.max   # 默认999hive.exec.reducers.bytes.per.reducer   # 默认1G
      • 计算公式
        numTasks = min(maxReducers, input.size / perReducer)maxReducers = hive.exec.reducers.maxperReducer=hive.exec.reducers.bytes.per.reducer
    • 相关参数
      • hive
        # Hive 自动转换联接无条件(mapjoin)大小,默认20Mhive.auto.convert.join.noconditionaltask.size#  小文件平均大小合并阈值。默认16Mhive.merge.smallfiles.avgsize# Spark 执行程序最大 Java 堆栈大小。默认256Mspark.executor.memory# Spark 驱动程序最大 Java 堆栈大小。默认256Mspark.driver.memory# Spark 驱动程序内存开销。默认26Mspark.yarn.driver.memoryOverhead# Spark 驱动程序内存开销。默认26Mspark.yarn.executor.memoryOverhead
      • yarn
        # 每个作业的 Reduce 任务的默认数量,默认1mapreduce.job.reduces# 容器内存。默认8Gyarn.nodemanager.resource.memory-mb# 最大容器内存。默认64Gyarn.scheduler.maximum-allocation-mb#  最大容器虚拟 CPU 内核数量。默认32yarn.scheduler.maximum-allocation-vcores

Explain执行计划

  • 不走MR
    • SQL
      explainselect date, week, week_num, quarter, rank, pay_amt, pct_rank from home_page_pay_rankwhere year='2020'
    • explain
      1	  STAGE DEPENDENCIES:2	  Stage-0 is a root stage3	4	STAGE PLANS:5	  Stage: Stage-06	    Fetch Operator7	      limit: -18	      Processor Tree:9	        TableScan10	          alias: home_page_pay_rank11	          filterExpr: (year = '2020') (type: boolean)12	          Select Operator13	            expressions: date (type: string), week (type: string), week_num (type: string), quarter (type: string), rank (type: int), pay_amt (type: string), pct_rank (type: string)14	            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col615	            ListSink16

转载地址:https://blog.csdn.net/fish2009122/article/details/106355012 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Python读取配置文件之python2与python3的区别
下一篇:Python代码审查

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月13日 09时53分10秒