How Hadoop Works: the WordCount Example


Each Mapper corresponds to one input split.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

/**
 * author: test
 * date: 2015/1/25.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * Input:
     *   key   - the offset of the line within the file (LongWritable)
     *   value - the content of the line (Text)
     *
     * Output:
     *   key:   Text
     *   value: IntWritable
     */
    // This method is called repeatedly: once for every line read from the
    // file split, with the line's offset as the key and its content as the value.
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
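
As a side note (not part of the original listing), a common Hadoop idiom is to reuse the output writables across map() calls instead of allocating a new Text and IntWritable for every word; the framework serializes the pair as soon as context.write() is called, so reuse is safe. A minimal sketch of that variant:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();                       // reused for every output key
    private static final IntWritable ONE = new IntWritable(1);  // constant count of 1

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : StringUtils.split(value.toString(), ' ')) {
            word.set(w);               // overwrite the reused Text with the current word
            context.write(word, ONE);  // the pair is serialized immediately, so reuse is safe
        }
    }
}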

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author: test
 * date: 2015/1/25.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * This method is called once per group. Within a group all keys are
     * equal, and there may be multiple values.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum)); // key is already a Text, no need to copy it
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * author: test
 * date: 2015/1/25.
 */
public class RunJob {
    public static void main(String[] args) {

        Configuration conf = new Configuration(); // loads all configuration files found on the classpath (e.g. under src)

        try {
            Job job = Job.getInstance(conf); // use the configuration created above
            job.setJarByClass(RunJob.class);
            job.setJobName("WordCount");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileSystem fs = FileSystem.get(conf);
            FileInputFormat.addInputPath(job, new Path("D:/hadoop/input/input"));
            Path output = new Path("D:/hadoop/output/wc");
            if (fs.exists(output)) {
                fs.delete(output, true); // delete recursively
            }
            FileOutputFormat.setOutputPath(job, output);
            if (job.waitForCompletion(true)) {
                System.out.println("Job Done!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
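
Two additions the original driver leaves out are commonly made in WordCount examples (a sketch, not part of the listing above): declaring the final output types explicitly and reusing the reducer as a combiner, so counts are pre-aggregated on the map side before the shuffle. Assuming these lines go right after the setMapOutputValueClass call:

            // optional: pre-aggregate counts on the map side (valid here because the
            // reducer's input and output types are identical and summing is associative)
            job.setCombinerClass(WordCountReducer.class);
            // optional: declare the final (reducer) output key/value types explicitly
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);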



To run the job:
1. Package the classes into a jar, here named wc.jar.

2. hadoop jar wc.jar com.xxx.RunJob  (com.xxx.RunJob is the entry/driver class)
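
A minimal sketch of those two steps, assuming the compiled classes sit under a bin/ directory (that path is an assumption; adjust it to the actual project layout):

jar -cvf wc.jar -C bin .
hadoop jar wc.jar com.xxx.RunJob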



How to kill a MapReduce job

Depending on the version, do:

version <2.3.0

Kill a Hadoop job:

hadoop job -kill $jobId

You can list all jobIds with:

hadoop job -list

version >=2.3.0

Kill a Hadoop job:

yarn application -kill $ApplicationId

You can list all ApplicationIds with:

yarn application -list
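
In practice the two commands are usually combined: list the running applications, find the id of the one whose name matches the job name set in the driver ("WordCount" above), and kill it. A rough sketch:

yarn application -list | grep WordCount    # find the ApplicationId of the running job
yarn application -kill $ApplicationId      # kill it using the id copied from the list output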


Hadoop job-related commands:
1. List job information:
hadoop job -list
2. Kill a job:
hadoop job -kill <job-id>
3. View the aggregated history logs under the given path:
hadoop job -history <output-dir>
4. Show more details about the job:
hadoop job -history all <output-dir>
5. Print the map and reduce completion percentage and all job counters:
hadoop job -status <job-id>
6. Kill a task. Killed tasks do NOT count against failed attempts:
hadoop job -kill-task <task-id>
7. Fail a task. Failed tasks DO count against failed attempts:
hadoop job -fail-task <task-id>
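
On newer Hadoop 2.x installations the hadoop job form prints a deprecation warning; the same sub-commands are exposed through the mapred script, for example:

mapred job -list
mapred job -kill <job-id>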

