大数据_排序、分区、合并,数据排序分区合并


一、排序:按照key2进行排序
    1、数字的排序
    2、字符串的排序
    3、对象的排序: 实现WritableComparable接口  (1)序列化   (2)可被排序

        员工数据 Employee.java  ----> 作为key2输出
        复习SQL:order by 后面可以跟  列名、表达式、别名、序号(第四列)  desc
                 desc 只作用于离他最近的一个列
                      order by a desc,b desc

        (1)一个列的排序
        (2)多个列的排序

二、分区:Partition: 根据Map的输出(k2  v2)进行分区
    1、默认情况下,MapReduce只有一个分区(只有一个输出文件)
    2、什么是分区?(重要)
    3、举例:Demo: 按照员工的部门号进行分区,相同部门号的员工输出到一个分区中
        日志:
            17/12/18 21:47:24 INFO mapreduce.Job:  map 100% reduce 0%
            17/12/18 21:47:49 INFO mapreduce.Job:  map 100% reduce 33%
            17/12/18 21:47:55 INFO mapreduce.Job:  map 100% reduce 67%
            17/12/18 21:47:56 INFO mapreduce.Job:  map 100% reduce 100%

三、合并:Combiner
    1、MapReduce的任务中,可以没有Combiner
    2、Combiner是一种特殊的Reducer,是在Mapper端先做一次Reducer,用来减少Map的输出,从而提高的效率。
    3、注意事项:
        (1)有些情况,不能使用Combiner  -----> 求平均值

        (2)引入Combiner,不引人Combiner,一定不能改变原理的逻辑。(MapReduce编程案例:实现倒排索引)

    错误:
    Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class org.apache.hadoop.io.IntWritable

四、什么时候Shuffle

五、MapReduce编程案例

字符串的排序

package demo.mr.sort.text;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                       k1             v1     k2      v2
public class MyTextMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        //数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        //分词
        String[] words = data.split(",");

        //输出job列 作为key2
        context.write(new Text(words[2]), NullWritable.get());
    }

}
package demo.mr.sort.text;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyTextJob {

    public static void main(String[] args) throws Exception {
        // 创建一个job
        Job job = Job.getInstance(new Configuration());

        //指定任务的入口
        job.setJarByClass(MyTextJob.class);

        //指定任务的mapper和输出的数据类型
        job.setMapperClass(MyTextMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //指定自己的比较器
        job.setSortComparatorClass(MyTextComparator.class);

        //指定任务的输入和输出
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行任务
        job.waitForCompletion(true);
    }

}
package demo.mr.sort.text;

import org.apache.hadoop.io.Text;

public class MyTextComparator extends Text.Comparator{

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // TODO Auto-generated method stub
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }

}

对象的排序

package demo.mr.sort.object;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class Employee implements WritableComparable<Employee> {

    //定义员工的属性  7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    private int empno;//员工号
    private String ename;//姓名
    private String job;//职位
    private int mgr;//经理的员工号
    private String hiredate;//入职日期
    private int sal;//月薪
    private int comm;//奖金
    private int deptno;//部门号


    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
    }


//  @Override
//  public int compareTo(Employee o) {
//      // 排序规则:一个列的排序
//      //按照员工的薪水排序
//      if(this.sal >= o.getSal()){
//          return 1;
//      }else{
//          return -1;
//      }
//  }

    @Override
    public int compareTo(Employee o) {
        // 排序规则:两个列的排序
        //第一个列:部门号
        if(this.deptno > o.getDeptno()){
            return 1;
        }else if(this.deptno < o.getDeptno()){
            return -1;
        }

        //第二个列:薪水
        if(this.sal >= o.getSal()){
            return 1;
        }else{
            return -1;
        }
    }


    @Override
    public void readFields(DataInput input) throws IOException {
        // 代表反序列化:输入
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 代表序列化过程,输出
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    public int getEmpno() {
        return empno;
    }
    public void setEmpno(int empno) {
        this.empno = empno;
    }
    public String getEname() {
        return ename;
    }
    public void setEname(String ename) {
        this.ename = ename;
    }
    public String getJob() {
        return job;
    }
    public void setJob(String job) {
        this.job = job;
    }
    public int getMgr() {
        return mgr;
    }
    public void setMgr(int mgr) {
        this.mgr = mgr;
    }
    public String getHiredate() {
        return hiredate;
    }
    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }
    public int getSal() {
        return sal;
    }
    public void setSal(int sal) {
        this.sal = sal;
    }
    public int getComm() {
        return comm;
    }
    public void setComm(int comm) {
        this.comm = comm;
    }
    public int getDeptno() {
        return deptno;
    }
    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
package demo.mr.sort.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                                                k2 就是这个员工对象
public class SortEmployeeMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // 数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        //分词
        String[] words = data.split(",");

        //创建员工的对象
        Employee e = new Employee();

        //设置员工的属性
        //员工号
        e.setEmpno(Integer.parseInt(words[0]));
        //姓名
        e.setEname(words[1]);
        //职位
        e.setJob(words[2]);
        //经理号: 注意:有些员工没有经理
        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            //没有老板号
            e.setMgr(-1);
        }

        //入职日期
        e.setHiredate(words[4]);
        //薪水
        e.setSal(Integer.parseInt(words[5]));
        //奖金:注意:有些员工没有奖金
        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            //没有奖金
            e.setComm(0);
        }
        //部门号
        e.setDeptno(Integer.parseInt(words[7]));

        //输出
        context.write(e, NullWritable.get());
    }

}
package demo.mr.sort.object;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortEmployeeMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SortEmployeeMain.class);

        job.setMapperClass(SortEmployeeMapper.class);
        job.setMapOutputKeyClass(Employee.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

分区

package demo.mr.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Employee implements Writable {

    //定义员工的属性  7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    private int empno;//员工号
    private String ename;//姓名
    private String job;//职位
    private int mgr;//经理的员工号
    private String hiredate;//入职日期
    private int sal;//月薪
    private int comm;//奖金
    private int deptno;//部门号


    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", deptno=" + deptno + "]";
    }


    @Override
    public void readFields(DataInput input) throws IOException {
        // 代表反序列化:输入
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 代表序列化过程,输出
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    public int getEmpno() {
        return empno;
    }
    public void setEmpno(int empno) {
        this.empno = empno;
    }
    public String getEname() {
        return ename;
    }
    public void setEname(String ename) {
        this.ename = ename;
    }
    public String getJob() {
        return job;
    }
    public void setJob(String job) {
        this.job = job;
    }
    public int getMgr() {
        return mgr;
    }
    public void setMgr(int mgr) {
        this.mgr = mgr;
    }
    public String getHiredate() {
        return hiredate;
    }
    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }
    public int getSal() {
        return sal;
    }
    public void setSal(int sal) {
        this.sal = sal;
    }
    public int getComm() {
        return comm;
    }
    public void setComm(int comm) {
        this.comm = comm;
    }
    public int getDeptno() {
        return deptno;
    }
    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
package demo.mr.partition;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                                                  k2部门号          v2员工对象
public class MyPartitionMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // 数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        //分词
        String[] words = data.split(",");

        //创建员工的对象
        Employee e = new Employee();

        //设置员工的属性
        //员工号
        e.setEmpno(Integer.parseInt(words[0]));
        //姓名
        e.setEname(words[1]);
        //职位
        e.setJob(words[2]);
        //经理号: 注意:有些员工没有经理
        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            //没有老板号
            e.setMgr(-1);
        }

        //入职日期
        e.setHiredate(words[4]);
        //薪水
        e.setSal(Integer.parseInt(words[5]));
        //奖金:注意:有些员工没有奖金
        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            //没有奖金
            e.setComm(0);
        }
        //部门号
        e.setDeptno(Integer.parseInt(words[7]));

        //输出  k2是部门号  v2是员工对象
        context.write(new IntWritable(e.getDeptno()), e);
    }

}
package demo.mr.partition;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

//把相同部门的员工输出到HDFS                                                  k4 部门号    v4 员工
public class MyPartitionReducer extends Reducer<IntWritable, Employee, IntWritable, Employee> {

    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
            throws IOException, InterruptedException {
        // 直接输出
        for(Employee e:v3){
            context.write(k3, e);
        }
    }

}
package demo.mr.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

//分区规则:根据Map的输出建立分区                                                                     k2                v2
public class MyPartitioner extends Partitioner<IntWritable, Employee>{

    /**
     * numPartition: 分区的个数
     */
    @Override
    public int getPartition(IntWritable k2, Employee e, int numPartition) {
        // 分区规则
        if(e.getDeptno() == 10){
            //放入一号分区
            return 1%numPartition;
        }else if(e.getDeptno() == 20){
            //放入二号分区
            return 2%numPartition;
        }else{
            //30号部门放入0号分区
            return 3%numPartition;
        }
    }

}
package demo.mr.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyPartitionMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MyPartitionMain.class);

        job.setMapperClass(MyPartitionMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Employee.class);

        //指定分区的规则
        job.setPartitionerClass(MyPartitioner.class);
        //指定分区的个数
        job.setNumReduceTasks(3); //3代表三个分区

        job.setReducerClass(MyPartitionReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);        

    }
}

合并

package demo.mr.combiner.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//public class WordCountReducer extends Reducer<k3, v3, k4, v4> {
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
        /*
         * context 代表reduce的上下文
         * 上文:Mapper
         * 下文:HDFS
         */

        //对v3进行求和
        int total = 0;
        for(IntWritable v:v3){
            total += v.get();
        }

        //输出:k4 单词     v4 频率
        context.write(k3, new IntWritable(total));
    }

}
package demo.mr.combiner.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//public class WordCountMapper extends Mapper<k1, v1, k2, v2> {
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        /*
         * context 代表Mapper的上下文
         * 上文:HDFS
         * 下文:Reducer
         */

        //取出数据:  I love Beijing
        String data = value1.toString();

        //分词
        String[] words = data.split(" ");

        //输出
        for(String word:words){
            //            k2 就是 单词                          v2: 记一次数
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
package demo.mr.combiner.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        // 创建一个job:job = map + reduce
        Job job = Job.getInstance(new Configuration());

        //指定任务的入口
        job.setJarByClass(WordCountMain.class);

        //指定任务的Mapper和输出的数据类型: k2  v2
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);    //指定k2
        job.setMapOutputValueClass(IntWritable.class);  //指定v2

        //增加一个Combiner的class
        job.setCombinerClass(WordCountReducer.class);

        //指定任务的Reducer和输出的数据类型: k4 v4
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);   //指定k4
        job.setOutputValueClass(IntWritable.class);   //指定v4

        //指定输入的路径(map)、输出的路径(reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行任务
        job.waitForCompletion(true);
    }
}

求平均值是不能是使用合并的

package demo.mr.combiner.avgsalary;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AvgSalaryMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // 数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        //分词
        String[] words = data.split(",");

        //输出:  k2:常量   v2:薪水
        context.write(new Text("salary"), new IntWritable(Integer.parseInt(words[5])));
    }

}
package demo.mr.combiner.avgsalary;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//                                                                      v4:平均工资
public class AvgSalaryReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
        int total = 0;
        int count = 0;

        for(IntWritable sal:v3){
            total += sal.get();

            count ++;
        }

        //输出
        context.write(new Text("Avg Salary is: "), new DoubleWritable(total/count));
    }

}
package demo.mr.combiner.avgsalary;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgSalaryMain {

    public static void main(String[] args) throws Exception {
        // 创建一个job:job = map + reduce
        Job job = Job.getInstance(new Configuration());

        //指定任务的入口
        job.setJarByClass(AvgSalaryMain.class);

        //指定任务的Mapper和输出的数据类型: k2  v2
        job.setMapperClass(AvgSalaryMapper.class);
        job.setMapOutputKeyClass(Text.class);    //指定k2
        job.setMapOutputValueClass(IntWritable.class);  //指定v2

        //增加一个Combiner的class
        job.setCombinerClass(AvgSalaryReducer.class);

        //指定任务的Reducer和输出的数据类型: k4 v4
        job.setReducerClass(AvgSalaryReducer.class);
        job.setOutputKeyClass(Text.class);   //指定k4
        job.setOutputValueClass(DoubleWritable.class);   //指定v4

        //指定输入的路径(map)、输出的路径(reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行任务
        job.waitForCompletion(true);

    }

}

查看评论

相关内容

    暂无相关文章