Hadoop Java program: testing the -files option


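I had been using Hadoop Streaming, where the -file option is very handy: it ships files such as configuration files along with the job when it is submitted. Later I looked for the equivalent of -file for Java MapReduce programs, spent a lot of effort on it, and for a long time could not get it to work.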

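Eventually I found that GenericOptionsParser recognizes a set of generic command-line options and acts on them. For example, when it sees -files, it copies the listed files to the nodes that run the map tasks. Testing showed that -files must come right after hadoop jar <jar file> (and after the main class name, if the jar's manifest does not specify one), before the program's own arguments. The Streaming usage text documents this convention: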

Usage: $HADOOP_HOME/bin/hadoop jar \
          $HADOOP_HOME/hadoop-streaming.jar [options]
Options:
  -input    <path>    DFS input file(s) for the Map step
  -output  <path>    DFS output directory for the Reduce step
  -mapper  <cmd|JavaClassName>      The streaming command to run
  -combiner <cmd|JavaClassName> The streaming command to run
  -reducer  <cmd|JavaClassName>      The streaming command to run
  -file    <file>    File/dir to be shipped in the Job jar file.
 Deprecated. Use generic option "-files" instead
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
  -partitioner JavaClassName  Optional.
  -numReduceTasks <num>  Optional.
  -inputreader <spec>  Optional.
  -cmdenv  <n>=<v>    Optional. Pass env.var to streaming commands
  -mapdebug <path>  Optional. To run this script when a map task fails
  -reducedebug <path>  Optional. To run this script when a reduce task fails
  -io <identifier>  Optional.
  -lazyOutput Optional. Lazily create Output
  -verbose


Generic options supported are
-conf <configuration file>    specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

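Running a Java program with hadoop jar follows the same convention, especially for generic options such as -D, -libjars and -files: they must precede the program's own arguments.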
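Why the order matters can be shown with a toy model. As far as I understand, GenericOptionsParser (built on Apache Commons CLI) stops parsing at the first argument that is not an option and hands everything after it back through getRemainingArgs(). The class below is a hypothetical, simplified sketch of that behavior, not Hadoop's code: it only handles a fixed list of options that each take one separate value (for example -D key=value, not the attached -Dkey=value form).

```java
import java.util.Arrays;
import java.util.List;

/**
 * Toy model (NOT Hadoop's implementation) of how leading generic options
 * are separated from application arguments: parsing stops at the first
 * argument that is not a recognized option.
 */
public class GenericArgsDemo {
    // Hypothetical subset of generic options, each assumed to take one separate value
    private static final List<String> GENERIC_WITH_VALUE =
            Arrays.asList("-conf", "-D", "-fs", "-jt", "-files", "-libjars", "-archives");

    /** Returns the arguments left over for the application itself. */
    static String[] remainingArgs(String[] args) {
        int i = 0;
        while (i < args.length && GENERIC_WITH_VALUE.contains(args[i])) {
            i += 2; // skip the option and its value
        }
        // Math.min guards against a trailing option with no value
        return Arrays.copyOfRange(args, Math.min(i, args.length), args.length);
    }

    public static void main(String[] args) {
        String[] good = {"-files", "test_upload_file", "in", "out"};
        String[] bad = {"in", "out", "-files", "test_upload_file"};
        System.out.println(Arrays.toString(remainingArgs(good))); // [in, out]
        System.out.println(Arrays.toString(remainingArgs(bad)));  // [in, out, -files, test_upload_file]
    }
}
```

With the wrong order, -files is never treated as an option: it ends up among the program's arguments, so a check like otherArgs.length != 2 fails and the file is never shipped.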

Test code:

package wordcount.com.cn; 
 
import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
 
@SuppressWarnings("deprecation") 
public class WordCount { 
     
     
     
    static class SimpleMapper extends Mapper<LongWritable, Text, Text, Text>
    {
        List<String> lines = new ArrayList<String>(); // simple test, no business logic

        @Override
        protected void setup(Context context) throws IOException
        {
            // The name must match the file passed with -files; it is opened from the task's working directory
            BufferedReader reader = new BufferedReader(new FileReader("test_upload_file"));
            try {
                String line = null;
                while ((line = reader.readLine()) != null)
                    lines.add(line);
            } finally {
                reader.close(); // always release the file handle
            }
            System.out.println(lines);
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            // For every input record, emit each line of the shipped file (test only)
            for (String line : lines)
                context.write(new Text("key"), new Text(line));
        }
    }
     
    static class SimpleReducer extends Reducer<Text, Text, Text, Text>
    {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            for (Text value : values)
            {
                context.write(key, value);
            }
        }
    }
     
 
    /**
     * Usage: hadoop jar WordCount.jar -files test_upload_file <input> <output>
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // GenericOptionsParser consumes generic options (-files, -libjars, -D, ...) and returns the rest
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        for (String s : otherArgs)
            System.out.println(s);
        if (otherArgs.length != 2) {
            System.err.println("Usage: hadoop jar WordCount.jar -files test_upload_file <input> <output>");
            System.exit(2);
        }

        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Map-only job: with 0 reduce tasks, SimpleReducer is never run
        job.setNumReduceTasks(0);
        job.setMapperClass(SimpleMapper.class);
        job.setReducerClass(SimpleReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
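The mapper opens the shipped file by its bare name. To my understanding, -files makes Hadoop localize the file on each node and link it into the task's working directory under the same name, which is why a relative path works. The hypothetical helper below (SideFileLoader and loadLines are illustrative names, not part of Hadoop) factors that file loading into plain Java so it can be tried locally, and fails with the absolute path it looked at, which makes a missing or misplaced -files easier to spot in the task log. It assumes the file is UTF-8, whereas the original FileReader uses the platform default charset.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: the file-loading part of SimpleMapper.setup() in plain Java. */
public class SideFileLoader {

    /** Reads all lines of a file looked up by its bare name (assumed UTF-8). */
    static List<String> loadLines(String localName) throws IOException {
        // A relative path is resolved against the current (task) working directory
        Path p = Paths.get(localName);
        // isRegularFile follows symbolic links by default, so a linked file also counts
        if (!Files.isRegularFile(p)) {
            throw new IOException("Side file not found: " + p.toAbsolutePath());
        }
        return Files.readAllLines(p, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Local stand-in for the shipped file
        Path tmp = Files.createTempFile("test_upload_file", ".txt");
        Files.write(tmp, Arrays.asList("a=1", "b=2"), StandardCharsets.UTF_8);
        System.out.println(loadLines(tmp.toString())); // [a=1, b=2]
        Files.delete(tmp);
    }
}
```

Inside setup(), lines.addAll(SideFileLoader.loadLines("test_upload_file")) could then replace the manual reader loop.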
 

Running the test:

hadoop jar WordCount.jar -files test_upload_file  /user/lmc/tmp/input /user/lmc/tmp/output

The test passed. Success!
