Hadoop读书笔记(八)MapReduce 打成jar包demo,hadoopmapreduce


Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629

Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927

Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(四)HDFS体系结构 :http://blog.csdn.net/caicongyang/article/details/41322649

Hadoop读书笔记(五)MapReduce统计单词demohttp://blog.csdn.net/caicongyang/article/details/41453579

Hadoop读书笔记(六)MapReduce自定义数据类型demo:http://blog.csdn.net/caicongyang/article/details/41490379

Hadoop读书笔记(七)MapReduce 0.x版本API使用demo:http://blog.csdn.net/caicongyang/article/details/41493325

1.代码改造

KpiApp.java

package cmd;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 
 * <p> 
 * Title: KpiApp.java 
 * Package mapReduce 
 * </p>
 * <p>
 * Description: 统计流量  (打包jar命令行运行) :extends Configured implements Tool
 * <p>
 * @author Tom.Cai
 * @created 2014-11-25 下午10:23:33 
 * @version V1.0 
 *
 */
public class KpiApp extends Configured implements Tool{

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new KpiApp(), args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		String INPUT_PATH = arg0[0];
		String OUT_PATH = arg0[1];
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), new Configuration());
		Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);

		job.setMapperClass(KpiMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWite.class);

		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);

		job.setReducerClass(KpiReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWite.class);

		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);

		job.waitForCompletion(true);
		return 0;
	}
	
	
	static class KpiMapper extends Mapper<LongWritable, Text, Text, KpiWite> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] splited = value.toString().split("\t");
			String num = splited[1];
			KpiWite kpi = new KpiWite(splited[6], splited[7], splited[8], splited[9]);
			context.write(new Text(num), kpi);
		}
	}

	static class KpiReducer extends Reducer<Text, KpiWite, Text, KpiWite> {
		@Override
		protected void reduce(Text key, Iterable<KpiWite> value, Context context) throws IOException, InterruptedException {
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			for (KpiWite kpi : value) {
				upPackNum += kpi.upPackNum;
				downPackNum += kpi.downPackNum;
				upPayLoad += kpi.upPayLoad;
				downPayLoad += kpi.downPayLoad;
			}
			context.write(key, new KpiWite(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)));
		}
	}
}

class KpiWite implements Writable {
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;

	public KpiWite() {
	}

	public KpiWite(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

}

2.打成jar

通过Eclipse的Export将上述的类打成kpi.jar

将jar包上传到Linux下通过 hadoop jar xxx.jar [parameter] [parameter]命令执行即可

例如:hadoop jar kpi.jar hdfs://192.168.80.100:9000/wlan hdfs://192.168.80.100:9000/wlan_out

即将上篇的代码完成改造可以在命令行下运行!


欢迎大家一起讨论学习!

有用的自己收!

记录与分享,让你我共成长!欢迎查看我的其他博客;我的博客地址:http://blog.csdn.net/caicongyang






相关内容