欢迎回家
我们一直在改变

Hive的自定义函数

Hive内置函数

Hive中自带了大量的内置函数,详细可参看如下资源:

官方文档:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inFunctions .

网友整理的中文文档: http://blog.csdn.net/wisgood/article/details/17376393 .

开发过程中应该尽量使用Hive内置函数,毕竟Hive内置函数经过了大量的测试,性能普遍较好,任何一点性能上的问题在大数据量上跑时候都会被放大。

自定义函数

同SQL Server一样,Hive也允许用户自定义函数,这大大扩展了Hive的功能,Hive是用 Java 语言写的,所以自定义函数也需要用Java来写。

编写一个Hive的自定义函数,需要新建一个Java类来继承UDF类并实现evaluate()函数,evaluate()函数中编写自定义函数的实现逻辑,返回值给Hive使用,需要注意的是,evaluate()函数的输入输出都必须是Hadoop的数据类型,以便可以被MapReduce程序来进行序列化反序列化。编写完成后将Java程序打成Jar包,在Hive会话中载入Jar包来使用自定义函数。

在执行Hive语句时,遇到一个自定义函数就会实例化一个类,并执行对应的evaluate()函数,每行输入都会调用一次evaluate()函数,所以在编写自定义函数时,一定要注意大数据量时的资源占用问题。

Hive中的自定义函数依据输入输出数据的个数,分为以下几类:

UDF用户自定义函数(一进一出)

这种是最普通最常见的自定义函数,类似内置函数length()、yaer()等函数,输入为一个值,输出也为一个值。下面是一个获取唯一ID的自定义函数例子:

package com.autohome.ics.bigdata.common.Date;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.util.Date;

/*
 生成一个指定长度的随机字符串(最长为36位)
 */
@org.apache.hadoop.hive.ql.exec.Description(name = "UUID",
        extended = "示例:select UUID(32) from src;",
        value = "_FUNC_(leng)-生成一个指定长度的随机字符串(最长为36位)")
@UDFType(deterministic = false)
public class UUID extends UDF {
    public Text evaluate(IntWritable leng) {
        String uuid = java.util.UUID.randomUUID().toString();
        int le = leng.get();
        le = le > uuid.length() ? uuid.length() : le;
        return new Text(uuid.substring(0, le));
    }

/*
 生成一个随机字符串
*/
public Text evaluate() {
    String uuid = java.util.UUID.randomUUID().toString();
    return new Text(uuid);
}
}
  • 这个实例是获取一个指定长度的随机字符串自定义函数,这个自定义函数创建了一个类UUID,继承于UDF父类。
  • UUID类要实现evaluate函数,获取一个指定长度的随机字符串。
  • evaluate函数是可以有多个重载的。
  • Description是自定义函数的描述信息。
  • 这里有一个参数deterministic,是标识这个自定义函数是否是那种输入确定时输出就确定的函数,默认是true,比如length函数就是如果输入同一个值,那么输出肯定是一致的,但是我们这里的UUID就算输入确定,但是输出也是不确定的,所以要将deterministic设置为false。

UDAF用户自定义聚合函数(多进一出)

UDAF是自定义聚合函数,类似于sum()、avg(),这一类函数输入是多个值,输出是一个值。

UDAF是需要hive sql语句和group by联合使用的。

聚合函数常常需要对大量数组进行操作,所以在编写程序时,一定要注意内存溢出问题。

UDAF分为两种:简单UDAF和通用UDAF。简单UDAF写起来比较简单,但是因为使用了JAVA的反射机制导致性能有所损失,另外有些特性不能使用,如可变参数列表,通用UDAF可以使用所有功能,但是写起来比较复杂。

(1) 简单UDAF实例

package com.autohome.ics.bigdata.common.number;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.util.regex.Pattern;
/**
 * Created by 鸣宇淳 on 2016/8/30.
*对传入的字符串列表,按照数字大小进行排序后,找出最大的值
*/
public class MaxIntWithString extends UDAF {
    public static  class MaxIntWithStringUDAFEvaluator implements  UDAFEvaluator{
        //最终结果最大的值
        private IntWritable MaxResult;
        @Override
        public void init() {
            MaxResult=null;
        }
        //每次对一个新值进行聚集计算都会调用iterate方法
        //这里将String转换为int后,进行比较大小
        public boolean iterate(Text value)
        {
            if(value==null)
                return false;
            if(!isInteger(value.toString()))
            {
                return false;
            }
            int number=Integer.parseInt(value.toString());
            if(MaxResult==null)
                MaxResult=new IntWritable(number);
            else
                MaxResult.set(Math.max(MaxResult.get(), number));
            return true;
        }
        //Hive需要部分聚集结果的时候会调用该方法
        //会返回一个封装了聚集计算当前状态的对象
        public IntWritable terminatePartial()
        {
            return MaxResult;
        }
        //合并两个部分聚集值会调用这个方法
        public boolean merge(Text other)
        {
            return iterate(other);
        }
        //Hive需要最终聚集结果时候会调用该方法
        public IntWritable terminate()
        {
            return MaxResult;
        }
        private static boolean isInteger(String str) {
            Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
            return pattern.matcher(str).matches();
        }
    }
}
  • UDAF要继承于UDAF父类org.apache.hadoop.hive.ql.exec.UDAF。
  • 内部类要实现org.apache.hadoop.hive.ql.exec.UDAFEvaluator接口。
  • MaxIntWithStringUDAFEvaluator类里需要实现init、iterate、terminatePartial、merge、terminate这几个函数,是必不可少的.
  • init()方法用来进行全局初始化的。
  • iterate()中实现比较逻辑。
  • terminatePartial是Hive部分聚集时调用的,类似于MapReduce里的Combiner,这里能保证能得到各个部分的最大值。
  • merge是多个部分合并时调用的,得到了参与合并的最大值。
  • terminate是最终Reduce合并时调用的,得到最大值。

(2) 通用UDAF实例

开发通用UDAF有两个步骤,第一个是编写resolver类,第二个是编写evaluator类。resolver负责类型检查,操作符重载。evaluator真正实现UDAF的逻辑。通常来说,顶层UDAF类继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,里面编写嵌套类evaluator 实现UDAF的逻辑。 通用UDAF使用场景较少,详情可以参看内置函数的源码,或者官方文档。

UDTF自定义表生成函数(一进多出)

UDTF是将一个输入值转变为一个数组。

下面这个例子是从 nginx 日志中的agent信息中提取浏览器名称和版本号的自定义函数,输入参数类似于:

”Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/31.0.1650.63 Safari/537.36”,输出为:Chrome 31.0.1650.63。

package com.autohome.ics.bigdata.common.String;
import cz.mallat.uasparser.OnlineUpdater;
import cz.mallat.uasparser.UASparser;
import cz.mallat.uasparser.UserAgentInfo;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * 解析浏览器信息UDTF
 *
 * 将日志中的http_user_agent,得到 browser_name,browser_version两个字段
 *  Created by ad on 2016/7/29.
 */
public class ParseUserAgentUDTF  extends GenericUDTF{
    private static UASparser uaSparser;
    static{
        try {
            uaSparser = new UASparser(OnlineUpdater.getVendoredInputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private final ListObjectInspector listIO = null;
    private final Object[] forwardObj = new Object[2];
    /**
     * 声明解析出来的字段名称和类型
     * @param argOIs
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        if(argOIs.getAllStructFieldRefs().size() != 1){
            throw new UDFArgumentException("args error!");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("browser_name");
        fieldNames.add("browser_version");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
    }
    @Override
    public void process(Object[] args) throws HiveException {
        // 真正解析的地方
      /*
      输入字符串:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36"
      解析为:
       */
        String userAgent = args[0].toString();
        try {
            UserAgentInfo userAgentInfo = uaSparser.parse(userAgent);
            List<String> bws = new ArrayList<>();
            bws.add(userAgentInfo.getUaFamily());
            bws.add(userAgentInfo.getBrowserVersionInfo());
            super.forward(bws.toArray(new String[0]));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void close() throws HiveException {
    }
}

自定义函数使用方式:

  1. 打包为jar包(例如包名为:common.jar),因为这个程序引用了依赖uasparser,所以打包时注意应该将依赖uasparser打进去。
  2. hive中添加jar包 add jar lib/common.jar;
  3. 声明函数 create temporary function ParseUserAgent as ‘com.autohome.ics.bigdata.common.String. ParseUserAgentUDTF’;
  4. 查询 select ParseUserAgent(agent) from user_action_log_like;
  5. 结果 Chrome 31.0.1650.63 Chrome 31.0.1650.63 Chrome 31.0.1650.63 Chrome 31.0.1650.63 Chrome 31.0.1650.63

原文链接:https://www.codercto.com/a/5110.html

赞(0)
未经允许不得转载:91coding » Hive的自定义函数
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

立即登录   注册

91CODING 小白轻松上手,大牛稳健进步

关于我们免责声明