hive udaf 输入输出处置处罚参考手册

打印 上一主题 下一主题

主题 800|帖子 800|积分 2400

根本了解hive udaf

本次分享主要是介绍一下我在udaf开发中遇到的难点。可以让大家背面必要开发udaf时做个参考。
我对这篇文档的定位呢,就是如果你必要开发一个udaf函数,然后百度了解一下,查个代码模版。然后参考我的这个文档。能资助解决类型输入输出的处置处罚问题。因为我在开发的时间,网上没搜到类型输入输出的处置处罚的比力详细的内容。一点点摸索,讨教测试才完成的。希望可以帮大家跳过这个过程。
当然,做分享的时间还是简单说下udaf的根本的内容。
hive 自定义udaf函数对于hive 引擎,spark引擎是通用的。
最简单的方式开发hive 自定义udaf函数 可以 extends UDAF ,覆写此中的方法,但是这样写出来的udaf,由于类型转换处置处罚的不好,性能差,查询速度很慢,根本被淘汰了。数据量一大就不可用了。
以是现在自定义udaf根本都是继承AbstractGenericUDAFResolver类大概实现GenericUDAFResolver2接口
本次案例继承AbstractGenericUDAFResolver
然后定义一个内部类继承GenericUDAFEvaluator 并覆写相应方法。


必要我们关注并覆写的方法:



init 初始化函数
  1. public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
  2.   mode = m;
  3.   return null;
  4. }
复制代码


iterate map 迭代函数

  1. public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
复制代码



terminatePartial 中心输出函数

  1. public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;
复制代码



merge 聚合函数
  1. public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
复制代码




terminate 终极输出函数
  1. public abstract Object terminate(AggregationBuffer agg) throws HiveException;
复制代码


这五个方法要结合下面的阶段来看怎么用


udaf的阶段与方法






下面以average udaf为例来介绍运行过程。
起首 看下average 背后的map reduce 过程
假设t表有a字段int类型,内里有1-10 十个数字。
执行select average(a) from t;


average是有map和reduce的任务。
map过程接收数字,然后保存两个变量,一个是数字的个数,一个是数字的总和。
reduce过程接收各个map效果的两个变量,然后数字的个数相加得到总个数。数字的总和相加得到总和。末了相除输出。
上述过程在udaf中的运算过程如下:

起首是part1 也就是map阶段
先进入init函数。这个函数不做业务逻辑处置处罚,是用于获取输入的数据类型的。数据类型背面详细讲。
然后进入iterate函数,依次读入数据。修改变量值。
然后进入terminatePartial shuffer输出到当地文件

然后是part2 也就是combiner阶段
读当地的shuffer文件,先进init函数。
然后进入merge函数,数字个数相加,总和相加。
然后进入terminatePartial shuffer输出到当地文件

然后进入final阶段 也就是reduce
读取map的效果先进入init函数。
然后进入merge函数,数字个数相加,总和相加。
然后进入terminate函数将终极的均匀数输出。



重点难点

上面主线介绍完了。下面介绍关键点。
上面主线中有两个问题必要解决一下。
第一点是在单个阶段中,必要调用多个方法。必要传递变量值。
变量值怎样保存呢?有两种方法。第一种就是定义全局变量。不管几个方法,我在全局变量中天然都能用。解决了传递问题。
第二种方式是udaf提供了一个中心类。也是udaf保举的方式。我不确定不用可不可以,理论可以。
下面是iterate的方法。第一个参数就是这个中心类。这个参数是必须有的。第二个参数是实际读的数据。
  1. public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
复制代码

这是第一个问题。这个问题比力简单,大不了就用这个中心类就行了,然后在这个中心类中定义一个long型属性和一个double属性用于存放个数和总和变量。就不用全局变量了。

第二个问题就是udaf开发的难点:


类型转换

map阶段读数据是从hadoop上读,读取的是hive类型,而且读到的可能是int,可能是long,假如存放的是long型的数据,由于执行引擎的差别,读进内存时的类型也有所差别。可能是java的long,可能是hive的writablelong,也可能是lazylong。这个地方如果本身强转处置处罚不完全就很轻易报错。



combiner阶段和reduce阶段的读数据就轻易多了。因为这个数据是我们写进去的。我们写进去是writablelong,那读就是writablelong。具体操纵稍后介绍。
这里明白每个阶段读数据都必要类型转换就可以了。

map阶段,combiner阶段和reduce阶段的输出也有两个问题必要注意。
第一,输出的时间一定不可以输出java类型,一定要输出hive的类型,也就是实现了WritableComparable接口的类型。在计算是我们是用的java类型,输出前要先转换,再return。
第二,输出的时间不只是在terminatePartial大概terminate函数输出了就完事了。不可以。我们还必要特意的告知udaf,我们在每个阶段输出的是什么类型。而告知的方式也不是直接把类型传给udaf,而是要使用输出类型的对应的Inspector。我叫它类型描述器。
以是我们的数据对于java来说如果是long类型,那么我们除了要用long,也要用到LongWritable和PrimitiveObjectInspector。这就是udaf贫困的地方。

接下来的篇幅将侧重过细介绍类型的具体处置处罚。




输入输出类型处置处罚的具体操纵



map输入的处置处罚

average举例。map读取的数据是int大概long大概byte这种数字类型。
从hive读取。由于引擎的差别,以是实际读到内存中的数据类型不确定性很大。以是这个地方非常不发起本身强转处置处罚。
我们在读取之前不清晰它的类型,但是在实际读取时,udaf是可以或许知道读取的具体类型的。它会上报给init。我们可以在init的方法中拿到。
  1. public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
  2.   inputOI0 = (PrimitiveObjectInspector) parameters[0];
  3. }
复制代码

init方法的第一个参数是阶段的意思。
m==Mode.PARTIAL1||Mode.COMPLETE 体现当前阶段是map
m==Mode.PARTIAL2 体现当前阶段是combiner
m==FINAL 体现当前阶段是reduce
现在处置处罚的map阶段。以是我们取m==Mode.PARTIAL1
然后ObjectInspector[] parameters就是udaf给我们传进来的类型信息,这个类型信息是以类型描述器传进来的。


下面介绍一下类型描述器

主要使用到的是Inspector的各种子类,第一大子类就是ObjectInspector,这个子类也是接口,我们用不到。我们用到的是ObjectInspector 的各种子类。
ObjectInspector有五个子类
PRIMITIVE,
LIST,
MAP,
STRUCT,
UNION;
以我的履历,没用到第五个。

PrimitiveObjectInspector描述器,可以用于描述各种根本类型。


我们average传入的数字,以是,在PARTIAL1阶段,传进来的ObjectInspector一定是PrimitiveObjectInspector。以是我们可以强转成PrimitiveObjectInspector。作为全局变量保存。
然后下一步,在iterate中,拿到这个数字的类型描述器。现在我们必要通过这个描述器将数字变成java的类型。可以使用工具类PrimitiveObjectInspectorUtils。
  1. long num = PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI0);
复制代码



map输出的处置处罚

map阶段的输出其实就是两个变量,一个是数字个数,一个是数字的总和。
起首在init中,一定要定义输出的类型描述器。
我们在init中return的东西,是类型描述器。我们的变量是long类型,那么我们的输出就是long的对应的类型描述器,也就是writableLongObjectInspector。
两个变量都是long类型。但是必要注意的是,我们输出的只能是一个个体,不能是两个。以是这两个变量不能作为两个long存在,可以作为list,可以作为map。也可以作为struct。struct就是结构体的意思,是类型描述器的一种。

如果是以list输出
那么
  1. return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));
复制代码


如果以map输出
那么

  1. return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));
复制代码


如果以struct输出
  1. //先建一个描述器的list,里面放两个long描述器
  2. ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
  3. foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
  4. foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
  5. //再建一个string list,里面放两个名称,一一对应上面描述器的名称
  6. ArrayList<String> fname = new ArrayList<String>();
  7. fname.add("num");
  8. fname.add("sum");
  9. //然后利用工具类返回StructObjectInspector
  10. return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
复制代码
使用这个工具类我们创建struct比力轻易。但是其实天生的东西比力复杂。
struct是以块的方式存放数据。上面的数据被分成两个数据块,第一个是num块,第二个是sum块。num和sum是块的名字,我们可以通过名字取出各个块。
每个数据块又包含两部分数据,一个是数据的具体值,一个是这个数据的类型描述器。


init函数只是告知map阶段输出的类型,
map阶段的实际输出在terminatePartial中进行。
此方法返回object 。
return的数据就是输出的数据。

如果上面init写的类型是list,那么这里就要return list,注意,list内里的格式不是long,而是LongWritable
  1. ArrayList<LongWritable> write_list=new ArrayList<LongWritable>() ;
  2. write_list.add(new LongWritable(num));
  3. write_list.add(new LongWritable(sum));
复制代码

如果上面init写的类型是map,return 也是map
  1. HashMap<LongWritable,LongWritable> write_map=new HashMap<>() ;
  2. HashMap.put(new LongWritable(num),new LongWritable(sum));
复制代码

如果上面init写的类型是struct,return也是struct ,这里可以返回一个LongWritable数组,如果struct内里类型不一致的话,就返回object数组
  1. Object[] longresult = new Object[2];
  2. longresult[0] = new LongWritable(num);
  3. longresult[1] = new LongWritable(sum);
  4. return longresult;
复制代码



combiner输入的处置处罚

combiner的输入是map的输出。
我们已经知道明白的类型了,以是可以在init中处置处罚,也可以直接在merge中强转。
我选择在merge中强转。merge函数的第二个参数,就是传进来的数据。
  1. public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
复制代码

如果上面传的是list,先创建一个list描述器,然后使用描述器将list获取出来。
  1. StandardListObjectInspector listinspect=(StandardListObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));
  2. ArrayList<LongWritable> list =(ArrayList<LongWritable>) listinspect.getList(partial);
复制代码
如果上面传的是map,先创建一个list描述器,然后使用描述器将map获取出来。
  1. StandardMapObjectInspector mapinspect= (StandardMapObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));
  2. HashMap<LongWritable,LongWritable> map= (HashMap<LongWritable,LongWritable>)mapinspect.getMap(partial);
复制代码
如果上面传的是struct,先创建一个struct描述器,然后使用描述器将struct获取出来。
这个struct描述器本身创建比力贫困,以是我在init函数中获取
起首定义一个全局变量StructObjectInspector soi;
然后在init中
  1. if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) {
  2.     soi = (StructObjectInspector) parameters[0];
  3. }
复制代码
然后在merge中
  1. //先用名称拿出strct中的想要处理的数据块num_field
  2. StructField num_field = soi.getStructFieldRef("num");
  3. //然后用数据块num_field拿到具体数据值,和这个数据的描述器
  4. Object num_obj = soi.getStructFieldData(partial, num_field);
  5. PrimitiveObjectInspector num_Inspector = (PrimitiveObjectInspector)num_field.getFieldObjectInspector();
  6. //最后用这个数据的描述器和这个具体数据值,转换成相应的java数据类型。
  7. long num = PrimitiveObjectInspectorUtils.getLong(num_obj, num_Inspector);
复制代码



reduce输入的处置处罚

reduce的输入处置处罚和combiner完全一致



combiner输出的处置处罚

combiner的输出与map的输出完全一致。



reduce输出的处置处罚

在init定义输出的类型描述器,writableDouble
  1. if (mode == Mode.FINAL) {
  2.     return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
  3. }
复制代码
然后实际输出在terminate函数中,输出DoubleWritable。
return new DoubleWritable(sum/num);



部分java,hive,描述器类型对应关系参考


java类型
hive输入
hive输出
描述器
int
PrimitiveObjectInspectorUtils.getInt()
IntWritable
PrimitiveObjectInspector
long
PrimitiveObjectInspectorUtils.getLong()
LongWritable
PrimitiveObjectInspector
double
PrimitiveObjectInspectorUtils.getDouble()
DoubleWritable
PrimitiveObjectInspector
string
PrimitiveObjectInspectorUtils.getString()
Text
PrimitiveObjectInspector
list<Long>
StandardListObjectInspector fieldObjectInspector;
fieldObjectInspector.getList();
ObjectInspectorFactory
.getStandardListObjectInspector
StandardListObjectInspector
map<String,Integer>
StandardMapObjectInspector fieldObjectInspector;
fieldObjectInspector.getMap();
ObjectInspectorFactory
.getStandardMapObjectInspector
StandardMapObjectInspector
int,list<Long> (组合)
soi.getStructFieldRef();
soi.getStructFieldData();
ObjectInspectorFactory.getStandardStructObjectInspector
StructObjectInspector



其他问题:


如果class AggregateAgg implements GenericUDAFEvaluator.AggregationBuffer {}
中的属性不只有根本数据类型。比如包含字符串,list,map等。
那么spark大概hive在执行的时间,如果启用map端聚合。在分配内存的时间不知道分配多少。会报错。
解决方式是在使用的时间配置参数set hive.map.aggr = false;
大概在这个类上加注解@GenericUDAFEvaluator.AggregationType(estimable = true)
效果都是不启用map端聚合。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

冬雨财经

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表