冬雨财经 发表于 2024-12-20 09:32:56

hive udaf 输入输出处置处罚参考手册

根本了解hive udaf

本次分享主要是介绍一下我在udaf开发中遇到的难点。可以让大家背面必要开发udaf时做个参考。
我对这篇文档的定位呢,就是如果你必要开发一个udaf函数,然后百度了解一下,查个代码模版。然后参考我的这个文档。能资助解决类型输入输出的处置处罚问题。因为我在开发的时间,网上没搜到类型输入输出的处置处罚的比力详细的内容。一点点摸索,讨教测试才完成的。希望可以帮大家跳过这个过程。
当然,做分享的时间还是简单说下udaf的根本的内容。
hive 自定义udaf函数对于hive 引擎,spark引擎是通用的。
最简单的方式开发hive 自定义udaf函数 可以 extends UDAF ,覆写此中的方法,但是这样写出来的udaf,由于类型转换处置处罚的不好,性能差,查询速度很慢,根本被淘汰了。数据量一大就不可用了。
以是现在自定义udaf根本都是继承AbstractGenericUDAFResolver类大概实现GenericUDAFResolver2接口
本次案例继承AbstractGenericUDAFResolver
然后定义一个内部类继承GenericUDAFEvaluator 并覆写相应方法。


必要我们关注并覆写的方法:



init 初始化函数
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
mode = m;
return null;
}

iterate map 迭代函数

public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;


terminatePartial 中心输出函数

public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;


merge 聚合函数
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;



terminate 终极输出函数
public abstract Object terminate(AggregationBuffer agg) throws HiveException;

这五个方法要结合下面的阶段来看怎么用


udaf的阶段与方法




https://i-blog.csdnimg.cn/img_convert/c846b407764a70b2e91896b2989a4bba.png

下面以average udaf为例来介绍运行过程。
起首 看下average 背后的map reduce 过程
假设t表有a字段int类型,内里有1-10 十个数字。
执行select average(a) from t;
https://i-blog.csdnimg.cn/direct/95b52168d4484414842a78ae09f9d91d.png

average是有map和reduce的任务。
map过程接收数字,然后保存两个变量,一个是数字的个数,一个是数字的总和。
reduce过程接收各个map效果的两个变量,然后数字的个数相加得到总个数。数字的总和相加得到总和。末了相除输出。
上述过程在udaf中的运算过程如下:

起首是part1 也就是map阶段
先进入init函数。这个函数不做业务逻辑处置处罚,是用于获取输入的数据类型的。数据类型背面详细讲。
然后进入iterate函数,依次读入数据。修改变量值。
然后进入terminatePartial shuffer输出到当地文件

然后是part2 也就是combiner阶段
读当地的shuffer文件,先进init函数。
然后进入merge函数,数字个数相加,总和相加。
然后进入terminatePartial shuffer输出到当地文件

然后进入final阶段 也就是reduce
读取map的效果先进入init函数。
然后进入merge函数,数字个数相加,总和相加。
然后进入terminate函数将终极的均匀数输出。



重点难点

上面主线介绍完了。下面介绍关键点。
上面主线中有两个问题必要解决一下。
第一点是在单个阶段中,必要调用多个方法。必要传递变量值。
变量值怎样保存呢?有两种方法。第一种就是定义全局变量。不管几个方法,我在全局变量中天然都能用。解决了传递问题。
第二种方式是udaf提供了一个中心类。也是udaf保举的方式。我不确定不用可不可以,理论可以。
下面是iterate的方法。第一个参数就是这个中心类。这个参数是必须有的。第二个参数是实际读的数据。
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
这是第一个问题。这个问题比力简单,大不了就用这个中心类就行了,然后在这个中心类中定义一个long型属性和一个double属性用于存放个数和总和变量。就不用全局变量了。

第二个问题就是udaf开发的难点:


类型转换

map阶段读数据是从hadoop上读,读取的是hive类型,而且读到的可能是int,可能是long,假如存放的是long型的数据,由于执行引擎的差别,读进内存时的类型也有所差别。可能是java的long,可能是hive的writablelong,也可能是lazylong。这个地方如果本身强转处置处罚不完全就很轻易报错。

https://i-blog.csdnimg.cn/img_convert/7c54598f43936a793fae65bc38d4d069.png

combiner阶段和reduce阶段的读数据就轻易多了。因为这个数据是我们写进去的。我们写进去是writablelong,那读就是writablelong。具体操纵稍后介绍。
这里明白每个阶段读数据都必要类型转换就可以了。

map阶段,combiner阶段和reduce阶段的输出也有两个问题必要注意。
第一,输出的时间一定不可以输出java类型,一定要输出hive的类型,也就是实现了WritableComparable接口的类型。在计算是我们是用的java类型,输出前要先转换,再return。
第二,输出的时间不只是在terminatePartial大概terminate函数输出了就完事了。不可以。我们还必要特意的告知udaf,我们在每个阶段输出的是什么类型。而告知的方式也不是直接把类型传给udaf,而是要使用输出类型的对应的Inspector。我叫它类型描述器。
以是我们的数据对于java来说如果是long类型,那么我们除了要用long,也要用到LongWritable和PrimitiveObjectInspector。这就是udaf贫困的地方。

接下来的篇幅将侧重过细介绍类型的具体处置处罚。




输入输出类型处置处罚的具体操纵



map输入的处置处罚

average举例。map读取的数据是int大概long大概byte这种数字类型。
从hive读取。由于引擎的差别,以是实际读到内存中的数据类型不确定性很大。以是这个地方非常不发起本身强转处置处罚。
我们在读取之前不清晰它的类型,但是在实际读取时,udaf是可以或许知道读取的具体类型的。它会上报给init。我们可以在init的方法中拿到。
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
inputOI0 = (PrimitiveObjectInspector) parameters;
}
init方法的第一个参数是阶段的意思。
m==Mode.PARTIAL1||Mode.COMPLETE 体现当前阶段是map
m==Mode.PARTIAL2 体现当前阶段是combiner
m==FINAL 体现当前阶段是reduce
现在处置处罚的map阶段。以是我们取m==Mode.PARTIAL1
然后ObjectInspector[] parameters就是udaf给我们传进来的类型信息,这个类型信息是以类型描述器传进来的。


下面介绍一下类型描述器

主要使用到的是Inspector的各种子类,第一大子类就是ObjectInspector,这个子类也是接口,我们用不到。我们用到的是ObjectInspector 的各种子类。
ObjectInspector有五个子类
PRIMITIVE,
LIST,
MAP,
STRUCT,
UNION;
以我的履历,没用到第五个。

PrimitiveObjectInspector描述器,可以用于描述各种根本类型。

https://i-blog.csdnimg.cn/img_convert/5e9eaa94a10dc3ef7569f96f8dd4c720.png
我们average传入的数字,以是,在PARTIAL1阶段,传进来的ObjectInspector一定是PrimitiveObjectInspector。以是我们可以强转成PrimitiveObjectInspector。作为全局变量保存。
然后下一步,在iterate中,拿到这个数字的类型描述器。现在我们必要通过这个描述器将数字变成java的类型。可以使用工具类PrimitiveObjectInspectorUtils。
long num = PrimitiveObjectInspectorUtils.getLong(parameters, inputOI0);


map输出的处置处罚

map阶段的输出其实就是两个变量,一个是数字个数,一个是数字的总和。
起首在init中,一定要定义输出的类型描述器。
我们在init中return的东西,是类型描述器。我们的变量是long类型,那么我们的输出就是long的对应的类型描述器,也就是writableLongObjectInspector。
两个变量都是long类型。但是必要注意的是,我们输出的只能是一个个体,不能是两个。以是这两个变量不能作为两个long存在,可以作为list,可以作为map。也可以作为struct。struct就是结构体的意思,是类型描述器的一种。

如果是以list输出
那么
return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));

如果以map输出
那么

return ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));

如果以struct输出
//先建一个描述器的list,里面放两个long描述器
ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
//再建一个string list,里面放两个名称,一一对应上面描述器的名称
ArrayList<String> fname = new ArrayList<String>();
fname.add("num");
fname.add("sum");
//然后利用工具类返回StructObjectInspector
return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi); 使用这个工具类我们创建struct比力轻易。但是其实天生的东西比力复杂。
struct是以块的方式存放数据。上面的数据被分成两个数据块,第一个是num块,第二个是sum块。num和sum是块的名字,我们可以通过名字取出各个块。
每个数据块又包含两部分数据,一个是数据的具体值,一个是这个数据的类型描述器。


init函数只是告知map阶段输出的类型,
map阶段的实际输出在terminatePartial中进行。
此方法返回object 。
return的数据就是输出的数据。

如果上面init写的类型是list,那么这里就要return list,注意,list内里的格式不是long,而是LongWritable
ArrayList<LongWritable> write_list=new ArrayList<LongWritable>() ;
write_list.add(new LongWritable(num));
write_list.add(new LongWritable(sum));
如果上面init写的类型是map,return 也是map
HashMap<LongWritable,LongWritable> write_map=new HashMap<>() ;
HashMap.put(new LongWritable(num),new LongWritable(sum));
如果上面init写的类型是struct,return也是struct ,这里可以返回一个LongWritable数组,如果struct内里类型不一致的话,就返回object数组
Object[] longresult = new Object;
longresult = new LongWritable(num);
longresult = new LongWritable(sum);
return longresult;


combiner输入的处置处罚

combiner的输入是map的输出。
我们已经知道明白的类型了,以是可以在init中处置处罚,也可以直接在merge中强转。
我选择在merge中强转。merge函数的第二个参数,就是传进来的数据。
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
如果上面传的是list,先创建一个list描述器,然后使用描述器将list获取出来。
StandardListObjectInspector listinspect=(StandardListObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector));
ArrayList<LongWritable> list =(ArrayList<LongWritable>) listinspect.getList(partial); 如果上面传的是map,先创建一个list描述器,然后使用描述器将map获取出来。
StandardMapObjectInspector mapinspect= (StandardMapObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(ObjectInspectorFactory.getStandardMapObjectInspector(PrimitiveObjectInspectorFactory.writableLongObjectInspector,PrimitiveObjectInspectorFactory.writableLongObjectInspector));
HashMap<LongWritable,LongWritable> map= (HashMap<LongWritable,LongWritable>)mapinspect.getMap(partial); 如果上面传的是struct,先创建一个struct描述器,然后使用描述器将struct获取出来。
这个struct描述器本身创建比力贫困,以是我在init函数中获取
起首定义一个全局变量StructObjectInspector soi;
然后在init中
if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) {
    soi = (StructObjectInspector) parameters;
} 然后在merge中
//先用名称拿出strct中的想要处理的数据块num_field
StructField num_field = soi.getStructFieldRef("num");
//然后用数据块num_field拿到具体数据值,和这个数据的描述器
Object num_obj = soi.getStructFieldData(partial, num_field);
PrimitiveObjectInspector num_Inspector = (PrimitiveObjectInspector)num_field.getFieldObjectInspector();
//最后用这个数据的描述器和这个具体数据值,转换成相应的java数据类型。
long num = PrimitiveObjectInspectorUtils.getLong(num_obj, num_Inspector);


reduce输入的处置处罚

reduce的输入处置处罚和combiner完全一致



combiner输出的处置处罚

combiner的输出与map的输出完全一致。



reduce输出的处置处罚

在init定义输出的类型描述器,writableDouble
if (mode == Mode.FINAL) {
    return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
} 然后实际输出在terminate函数中,输出DoubleWritable。
return new DoubleWritable(sum/num);



部分java,hive,描述器类型对应关系参考


java类型
hive输入
hive输出
描述器
int
PrimitiveObjectInspectorUtils.getInt()
IntWritable
PrimitiveObjectInspector
long
PrimitiveObjectInspectorUtils.getLong()
LongWritable
PrimitiveObjectInspector
double
PrimitiveObjectInspectorUtils.getDouble()
DoubleWritable
PrimitiveObjectInspector
string
PrimitiveObjectInspectorUtils.getString()
Text
PrimitiveObjectInspector
list<Long>
StandardListObjectInspector fieldObjectInspector;
fieldObjectInspector.getList();
ObjectInspectorFactory
.getStandardListObjectInspector
StandardListObjectInspector
map<String,Integer>
StandardMapObjectInspector fieldObjectInspector;
fieldObjectInspector.getMap();
ObjectInspectorFactory
.getStandardMapObjectInspector
StandardMapObjectInspector
int,list<Long> (组合)
soi.getStructFieldRef();
soi.getStructFieldData();
ObjectInspectorFactory.getStandardStructObjectInspector
StructObjectInspector



其他问题:


如果class AggregateAgg implements GenericUDAFEvaluator.AggregationBuffer {}
中的属性不只有根本数据类型。比如包含字符串,list,map等。
那么spark大概hive在执行的时间,如果启用map端聚合。在分配内存的时间不知道分配多少。会报错。
解决方式是在使用的时间配置参数set hive.map.aggr = false;
大概在这个类上加注解@GenericUDAFEvaluator.AggregationType(estimable = true)
效果都是不启用map端聚合。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: hive udaf 输入输出处置处罚参考手册