奇怪的MultipleOutputs
奇怪的MultipleOutputs
问题1
使用命名通道的时间和不使用命名通道的时间,两者相差很大,跑10万数据 一个要5分钟+,另一个只要10s
mos.write("Text",NullWritable.get(), value, bizType+"/part");
5分钟
mos.write(NullWritable.get(), value, bizType+"/part");
10秒
问题2
最初很明显的可以看出是map的时间差异,以为这个差异hdfs的write造成的
于是专门测试RecordWriter的写时间,发现两者差异不大,后来打算继承MultipleOutputs来测试,结果发现MultipleOutputs 大部分都是私有方法,于是重写了MultipleOutputs,绝大部分都是拷贝原来的MultipleOutputs,结果发现时间由原来的5分钟缩短为10秒,这又是咋回事?环境问题?
寻找答案
后来通过反射调用私有方法测试,才发现原来是使用了不同的版本的MultipleOutputs(服务端hadoop -mapred-0.19.1,本地hadoop-mapred-0.20.2-cdh3u0)造成的差异,而且瓶颈在0.19.1
private TaskAttemptContext getContext(String nameOutput) throws IOException {
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
Job job = new Job(context.getConfiguration());
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
TaskAttemptContext taskContext = new TaskAttemptContext(
job.getConfiguration(), context.getTaskAttemptID());
return taskContext;
}
就是
Job job = new Job(context.getConfiguration());
这行代码,在0.19.1的版本中Job的初始化比较复杂和耗时
那为什么0.20.2-cdh3u0版没这个问题? 再看看0.20.2-cdh3u0的该实现
private TaskAttemptContext getContext(String nameOutput) throws IOException {
TaskAttemptContext taskContext = taskContexts.get(nameOutput);
if (taskContext != null) {
return taskContext;
}
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
Job job = new Job(context.getConfiguration());
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
taskContext = new TaskAttemptContext(
job.getConfiguration(), context.getTaskAttemptID());
taskContexts.put(nameOutput, taskContext);
return taskContext;
}
该实现首先缓存了taskContext
,也就是不会重复的构造Job
事例
另外这个版本的Job已经退化成了Conf,舍弃了JobClient,已经很轻了, 所以重复调用getContext
没有开销
这个就解答了第二个问题,再看看第一个问题
-
不使用namedOutput的write
public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) throws IOException, InterruptedException { checkBaseOutputPath(baseOutputPath); TaskAttemptContext taskContext = new TaskAttemptContext( context.getConfiguration(), context.getTaskAttemptID()); getRecordWriter(taskContext, baseOutputPath).write(key, value); }
-
使用namedOutput的write
public <K, V> void write(String namedOutput, K key, V value, String baseOutputPath) throws IOException, InterruptedException { checkNamedOutputName(context, namedOutput, false); checkBaseOutputPath(baseOutputPath); if (!namedOutputs.contains(namedOutput)) { throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'"); } TaskAttemptContext taskContext = getContext(namedOutput); getRecordWriter(taskContext, baseOutputPath).write(key, value); }
差异明显就在不用namedOutput的write使用了getContext
方法
最后的结论
- 老版(0.19.1)的MultipleOutputs的使用namedOutput的write方法重复调用有问题,导致使用namedOutput的write和不用namedOutput的write有很大差异
- 新版(0.20.2-cdh3u0)已经做了性能上的优化,所以不存在上述问题
附上代码
public class MultiFile {
public static class MapClass extends
Mapper<LongWritable, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> mos;
protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs<NullWritable, Text>(context);
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
//===write your biz start=====
String[] tokens = line.split("\u0001");
if (tokens.length < 13) {
return;
}
String bizType = tokens[12];
if ("\\N".equals(bizType)) {
bizType = "empty";
}
//===write your biz end====
mos.write(NullWritable.get(), value, bizType + "/part");
}
protected void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// conf.set(MyFileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
// "false");
Job job = new Job(conf, "mutifile");
job.setJarByClass(MultiFile.class);
job.setMapperClass(MapClass.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
该例子将一个或者多个输入文件,按照bizType(一行中的某个字段)进行分类输出, 比如所有的输入中的bizType有3个(1,2,3),那么输出会是这样
${outputpath}/1/part-m-00000
${outputpath}/2/part-m-00000
${outputpath}/3/part-m-00000
${outputpath}/part-m-00000
可以看到有多少个bizType,就会有有多少个目录,目录里面就是输出文件,本例样本数据少,所以只有一个输出文件,另外除了目录之外,还多了一个和目录平级的part-m-00000,大小为0,这就是默认的输出
评论暂时关闭