控制Hive MAP个数详解


Hive的MAP数或者说MAPREDUCE的MAP数是由谁来决定的呢?inputsplit size,那么对于每一个inputsplit size是如何计算出来的,这是做MAP数调整的关键.
Hadoop给出了Inputformat接口用于描述输入数据的格式,其中一个关键的方法就是getSplits,对输入的数据进行分片.
Hive对InputFormat进行了封装:

而具体采用的实现是由参数hive.input.format来决定的,主要使用2中类型HiveInputFormat和CombineHiveInputFormat.
对于HiveInputFormat来说:


 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    //扫描每一个分区
    for (Path dir : dirs) {
      PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
    //获取分区的输入格式
      Class inputFormatClass = part.getInputFileFormatClass();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    //按照相应格式的分片算法获取分片
    //注意:这里的Inputformat只是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,因此不能采用新的API,否则在查询时会报异常:Input format must implement InputFormat.区别就是新的API的计算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一样;
      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
      for (InputSplit is : iss) {
    //封装结果,返回
        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
      }
    }
    return result.toArray(new HiveInputSplit[result.size()]);
}

 

对于CombineHiveInputFormat来说的计算就比较复杂了:


 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    //加载CombineFileInputFormatShim,这个类继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat
    CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
        .getCombineFileInputFormat();
if (combine == null) {
//若为空则采用HiveInputFormat的方式,下同
      return super.getSplits(job, numSplits);
    }
    Path[] paths = combine.getInputPathsShim(job);
for (Path path : paths) {
//若是外部表,则按照HiveInputFormat方式分片
      if ((tableDesc != null) && tableDesc.isNonNative()) {
        return super.getSplits(job, numSplits);
      }
      Class inputFormatClass = part.getInputFileFormatClass();
      String inputFormatClassName = inputFormatClass.getName();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
      if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
        if (inputFormat instanceof TextInputFormat) {
        if ((new CompressionCodecFactory(job)).getCodec(path) != null)
//在未开启hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)参数情况下,对于TextInputFormat并且为压缩则采用HiveInputFormat分片算法
                    return super.getSplits(job, numSplits);
        }
      }
    //对于连接式同上
      if (inputFormat instanceof SymlinkTextInputFormat) {
        return super.getSplits(job, numSplits);
      }
      CombineFilter f = null;
      boolean done = false;
Path filterPath = path;
//由参数hive.mapper.cannot.span.multiple.partitions控制,默认false;如果没true,则对每一个partition创建一个pool,以下省略为true的处理;对于同一个表的同一个文件格式的split创建一个pool为combine做准备;
      if (!mrwork.isMapperCannotSpanPartns()) {
        opList = HiveFileFormatUtils.doGetWorksFromPath(
                  pathToAliases, aliasToWork, filterPath);
        f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
      }
      if (!done) {
        if (f == null) {
          f = new CombineFilter(filterPath);
          combine.createPool(job, f);
        } else {
          f.addPath(filterPath);
        }
      }
    }
if (!mrwork.isMapperCannotSpanPartns()) {
//到这里才调用combine的分片算法,继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat
      iss = Arrays.asList(combine.getSplits(job, 1));
}
//对于sample查询特殊处理
    if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
      iss = sampleSplits(iss);
}
//封装结果返回
    for (InputSplitShim is : iss) {
      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
      result.add(csplit);
    }
    return result.toArray(new CombineHiveInputSplit[result.size()]);
  }

更多详情见请继续阅读下一页的精彩内容

Hive 的详细介绍:请点这里
Hive 的下载地址:请点这里

基于Hadoop集群的Hive安装

Hive内表和外表的区别

Hadoop + Hive + Map +reduce 集群安装部署

Hive本地独立模式安装

Hive学习之WordCount单词统计

  • 1
  • 2
  • 下一页

相关内容