Pig用户自定义函数(UDF)


我们以气温统计和词频统计为例,讲解以下三种用户自定义函数。

用户自定义函数

什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择。

Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写。我们接下来以Java为例。

自定义过滤函数

我们仍然以先前的代码为例:

第二个语句的作用就是筛选合法的数据。如果我们采用用户自定义函数,则第二个语句可以写成:

这种写法更容易理解,也更容易在多个地方重用。接下来的问题就是如何定义这个isValid函数。代码如下:

com.oserp.pigudf;

java.io.IOException;

org.apache.pig.FilterFunc;

org.apache.pig.data.Tuple;

 

IsValidTemperature FilterFunc {

        

         Boolean exec(Tuple tuple) IOException {            

                   Object object = .get(0);

                   temperature = (Integer)object;            

                   temperature != 999;

         }

}

接下来,我们需要:

1)  编译代码并打包成jar文件,比如pigudf.jar。

2)  通过register命令将这个jar文件注册到pig环境:

//参数为jar文件的本地路径

此时,我们就可以用以下语句调用这个函数:

看起来这个函数名太长,不便输入。我们可以用定义别名的方式代替:

回到代码,我们可发现:

1)  需要定义一个继承自FilterFunc的类。

2)  重写这个类的exec方法。这个方法的参数只有一个tuple,但是调用时可以传递多个参数,你可以通过索引号获得对应的参数值,比如tuple.get(1)表示取第二个参数。

3)  调用时,需要使用类的全名。(当然你可以自定义别名)

4)  更多的验证需要读者自行在函数中添加,比如判断是否为null等等。

 

备注:用Eclipse编写Pig自定义函数时,你可能需要引用到一些Hadoop的库文件。比较容易的方式是在新建项目时指定项目类型为MapReduce项目,这样Eclipse就会自动设置库引用的相关信息。

 

自定义运算函数(Eval function)

仍然以前面的数据文件为例:

1990 21

1990 18

1991 21

1992 30

1992 999

1990 23

假设我们希望通过温度值获得一个温度的分类信息,比如我们把温度大于划分为以下类型:

温度                            分类

x>=30                          hot

x>=10 and x<30        moderate

x<10                                      cool

则我们可以定义以下函数,代码如下:

com.oserp.pigudf;

java.io.IOException;

org.apache.pig.EvalFunc;

org.apache.pig.data.Tuple;

 

EvalFunc<String> {

        

         String exec(Tuple tuple) IOException {               

                   Object object = tuple.get(0);

                   temperature = (Integer)object;

                  

                   (temperature >= 30){

                            ;

                   }

                   (temperature >=10){

                            ;

                   }

                   {

                            ;

                   }                

         }

}

依次输入以下Pig语句:

输出结果如下:

代码比较简单,该类继承自EvalFunc类,且我们要明确定义返回值类型。

 

有些时候其它类库可能包含有功能相近的Java函数,我们是否可以直接将这些库函数拿过来使用呢?可以。以下语句调用了trim函数,用于去掉name字段前后的空格:

其中的InvokeForString是一个Invoker(不知道该如何翻译啊),其通过反射机制调用,返回值是String类型。其它类似的还有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。

 

自定义加载函数

我们以词频统计为例,讲解如何自定义加载函数。(统计各个单词出现的频率,由高到低排序)

一般情况下,load语句加载数据时,一行会被生成一个tuple。而统计词频时,我们希望每个单词生成一个tuple。我们的测试数据文件只有两行数据,如下:

Thisis a map a reduce program

mapreduce partition combiner

我们希望load后能得到如下形式的数据,每个单词一个tuple:

(This)

(is)

(a)

(map)

(a)

(reduce)

 

先看代码:

com.oserp.pigudf;

java.io.IOException;

java.util.ArrayList;

java.util.List;

org.apache.hadoop.io.Text;

org.apache.hadoop.mapreduce.InputFormat;

org.apache.hadoop.mapreduce.Job;

org.apache.hadoop.mapreduce.RecordReader;

org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

org.apache.pig.LoadFunc;

org.apache.pig.backend.executionengine.ExecException;

org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

org.apache.pig.data.BagFactory;

org.apache.pig.data.DataBag;

org.apache.pig.data.Tuple;

org.apache.pig.data.TupleFactory;

 

 

WordCountLoadFunc LoadFunc {

        

         RecordReader ;

         TupleFactory = TupleFactory.getInstance();

         BagFactory = BagFactory.getInstance();

        

                 

         InputFormatgetInputFormat() IOException {

                   TextInputFormat();

         }      

 

        

         Tuple getNext() IOException {

                    

                   {

                           

                            (!.nextKeyValue()){

                                     ;

                                     }

                            Textvalue = (Text).getCurrentValue();

                            Stringline = value.toString();

                            String[]words =  line.split();

                           

                           

                           

                           

                           

                           

                            List<Tuple>tuples = ArrayList<Tuple>();                    

                            Tupletuple = ;

                            (String word : words) {

                                     tuple= .newTuple();

                                     tuple.append(word);

                                     tuples.add(tuple);

                                     }

                           

                            DataBagbag = .newDefaultBag(tuples);

                            Tupleresult = .newTuple(bag);

                           

                            result;

                   }

                   (InterruptedException e) {

                            ExecException(e);

                   }

                  

         }

 

        

         prepareToRead(RecordReader reader,PigSplit arg1)

                            IOException {

                   . = reader;

         }

 

        

         setLocation(String location, Job job) IOException {

                   FileInputFormat.setInputPaths(job,location);          

         }

 

}

 

依次执行以下命令:

显示结果如下:

 

注意schema的定义格式:

相关内容

    暂无相关文章