开发HIVE的UDTF自定义函数,hiveudtf自定义


[Author]: kwu 

UDTF(User-Defined Table-Generating Functions) 用来解决 输入一行输出多行(one-to-many mapping) 的需求,开发HIVE的UDTF自定义函数具体步骤如下:


1、继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。


2、UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。


3、初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。


4、最后调用close()方法,对需要清理的资源进行清理。


5、代码实例,实现的功能比较简单,首先按 "\001" 切分,再处理字符串,其中涉及对JSON的处理

package com.hexun.udtf;

import java.util.ArrayList;

import net.sf.json.JSON;
import net.sf.json.JSONSerializer;

import org.apache.commons.beanutils.PropertyUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UDTFDratio extends GenericUDTF {

	public void close() throws HiveException {

	}

	// 返回UDTF的处理行的信息(个数,类型)。
	public StructObjectInspector initialize(ObjectInspector[] args)
			throws UDFArgumentException {
		if (args.length != 1) {
			throw new UDFArgumentLengthException(
					"ExplodeMap takes only one argument");
		}
		if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
			throw new UDFArgumentException(
					"ExplodeMap takes string as a parameter");
		}
		ArrayList<String> fieldNames = new ArrayList<String>();
		ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

		fieldNames.add("col1");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col2");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col3");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col4");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col5");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col6");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col7");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col8");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

		return ObjectInspectorFactory.getStandardStructObjectInspector(
				fieldNames, fieldOIs);
	}

	// 对传入的参数进行处理,可以通过forword()方法返回结果
	public void process(Object[] args) throws HiveException {
		String input = args[0].toString();
		String[] splited = input.split("\001");

		String[] result = new String[8];

		for (int i = 0; i < splited.length; i++) {
			if (i == 0) {
				String head = splited[i];
				String userId = head.substring(0, head.indexOf("_"));
				String cookieId = head.substring(head.indexOf("_") + 1);
				
				result[0] = userId;
				result[1] = cookieId;
			} else {
				String json = splited[i];
				JSON jo = JSONSerializer.toJSON(json);
				Object o = JSONSerializer.toJava(jo);
				
				try{
					String sex = PropertyUtils.getProperty(o, "sex").toString();
					result[2] = sex;

					String age = PropertyUtils.getProperty(o, "age").toString();
					result[3] = age;
					
					String ppt = PropertyUtils.getProperty(o, "ppt").toString();
					result[4] = ppt;

					String degree = PropertyUtils.getProperty(o, "degree").toString();
					result[5] = degree;
					
					String favor = PropertyUtils.getProperty(o, "favor").toString();
					result[6] = favor;
					
					String commercial = PropertyUtils.getProperty(o, "commercial").toString();
					result[7] = commercial;
					
				}catch(Exception e){
					e.printStackTrace();
				}
			}
		}

		forward(result);
	}

}

示例代码涉及的JAR包



6、hive命令行操作,引入UDTF前,需要先加入JSON的依赖包

add jar /opt/softwares/lib/commons-beanutils-1.7.0.jar;
add jar /opt/softwares/lib/commons-collections-3.2.jar;
add jar /opt/softwares/lib/commons-lang-2.4.jar;
add jar /opt/softwares/lib/commons-logging-1.1.3.jar;
add jar /opt/softwares/lib/ezmorph-1.0.3.jar;
add jar /opt/softwares/lib/json-lib-2.2.3-jdk15.jar;
add jar /opt/softwares/UDF.jar;
create temporary function explode_map3 as 'com.hexun.udtf.UDTFDratio';

insert into table stage.dratio PARTITION (day='${yesterday}') select explode_map3(datadratio) as (col1,col2,col3,col4,col5,col6,col7,col8) from stage.dratio_tmp;


相关内容