奇怪的MultipleOutputs


问题1

使用命名通道的时间和不使用命名通道的时间,两者相差很大,跑10万数据 一个要5分钟+,另一个只要10s

mos.write("Text",NullWritable.get(), value, bizType+"/part");

5分钟

mos.write(NullWritable.get(), value, bizType+"/part");

10秒

问题2

最初很明显的可以看出是map的时间差异,以为这个差异hdfs的write造成的

于是专门测试RecordWriter的写时间,发现两者差异不大,后来打算继承MultipleOutputs来测试,结果发现MultipleOutputs 大部分都是私有方法,于是重写了MultipleOutputs,绝大部分都是拷贝原来的MultipleOutputs,结果发现时间由原来的5分钟缩短为10秒,这又是咋回事?环境问题?

寻找答案

后来通过反射调用私有方法测试,才发现原来是使用了不同的版本的MultipleOutputs(服务端hadoop -mapred-0.19.1,本地hadoop-mapred-0.20.2-cdh3u0)造成的差异,而且瓶颈在0.19.1

 private TaskAttemptContext getContext(String nameOutput) throws IOException {
    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    TaskAttemptContext taskContext = new TaskAttemptContext(
      job.getConfiguration(), context.getTaskAttemptID());
    return taskContext;
  }

就是

Job job = new Job(context.getConfiguration());

这行代码,在0.19.1的版本中Job的初始化比较复杂和耗时

那为什么0.20.2-cdh3u0版没这个问题? 再看看0.20.2-cdh3u0的该实现

   private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContext(
      job.getConfiguration(), context.getTaskAttemptID());

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

该实现首先缓存了taskContext,也就是不会重复的构造Job事例 另外这个版本的Job已经退化成了Conf,舍弃了JobClient,已经很轻了, 所以重复调用getContext没有开销

这个就解答了第二个问题,再看看第一个问题

  • 不使用namedOutput的write

    public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
        throws IOException, InterruptedException {
      checkBaseOutputPath(baseOutputPath);
      TaskAttemptContext taskContext = new TaskAttemptContext(
        context.getConfiguration(), context.getTaskAttemptID());
      getRecordWriter(taskContext, baseOutputPath).write(key, value);
    }
  • 使用namedOutput的write

    public <K, V> void write(String namedOutput, K key, V value,
        String baseOutputPath) throws IOException, InterruptedException {
      checkNamedOutputName(context, namedOutput, false);
      checkBaseOutputPath(baseOutputPath);
      if (!namedOutputs.contains(namedOutput)) {
        throw new IllegalArgumentException("Undefined named output '" +
          namedOutput + "'");
      }
      TaskAttemptContext taskContext = getContext(namedOutput);
      getRecordWriter(taskContext, baseOutputPath).write(key, value);
    }

差异明显就在不用namedOutput的write使用了getContext方法

最后的结论

  • 老版(0.19.1)的MultipleOutputs的使用namedOutput的write方法重复调用有问题,导致使用namedOutput的write和不用namedOutput的write有很大差异
  • 新版(0.20.2-cdh3u0)已经做了性能上的优化,所以不存在上述问题

附上代码

public class MultiFile {

    public static class MapClass extends
            Mapper<LongWritable, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> mos;

        protected void setup(Context context) throws IOException,
                InterruptedException {
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            //===write your biz start=====
            String[] tokens = line.split("\u0001");
            if (tokens.length < 13) {
                return;
            }
            String bizType = tokens[12];

            if ("\\N".equals(bizType)) {
                bizType = "empty";
            }
            //===write your biz end====

            mos.write(NullWritable.get(), value, bizType + "/part");

        }

        protected void cleanup(Context context) throws IOException,
                InterruptedException {

            mos.close();
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set(MyFileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
        // "false");
        Job job = new Job(conf, "mutifile");
        job.setJarByClass(MultiFile.class);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

该例子将一个或者多个输入文件,按照bizType(一行中的某个字段)进行分类输出, 比如所有的输入中的bizType有3个(1,2,3),那么输出会是这样

${outputpath}/1/part-m-00000
${outputpath}/2/part-m-00000
${outputpath}/3/part-m-00000
${outputpath}/part-m-00000

可以看到有多少个bizType,就会有有多少个目录,目录里面就是输出文件,本例样本数据少,所以只有一个输出文件,另外除了目录之外,还多了一个和目录平级的part-m-00000,大小为0,这就是默认的输出


相关内容