当碰到一些复杂特殊的盘算场景时,只通过pyspark的内置函数无法达到我们想要实现的效果,此时,可通过自界说函数然后注册为UDF函数,就能够很好的解决复杂盘算场景标题,且盘算效率非常快速。
- # 配置spark接口
- import os
- import findspark
- from pyspark import SparkConf
- from pyspark.sql import SparkSession
- os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
- findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
- # 设置内存大小
- conf = SparkConf()
- # conf.set('fs.defaultFS','hdfs://dmns1') # 指定文件路径
- conf.set("spark.driver.memory", "4g")
- conf.set("spark.executor.memory", "4g")
- conf.set("spark.port.maxRetries", "64") # 设置可以绑定的最大端口数
- spark = SparkSession.builder.appName("udf_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
- spark.sparkContext.setLogLevel('ERROR')
复制代码- # 导入数据类型
- from pyspark.sql.types import IntegerType,DoubleType,StringType
- # 实例化自定义类
- ov = OfferValue()
- # 自定义函数 计算复杂场景
- def my_function(offer):
- offer_list = offer.split(',')
- offer_value = ov.get_value(offer_list)
- return offer_value
- # 注册为udf函数 命名为my_function
- spark.udf.register('my_function',my_function, DoubleType())
- # sqlAPI调用udf函数
- spark.sql("create table database.table1 stored as parquet as \
- select my_function(offer_list) as avg_value \
- from database.table2")
- # 关闭spark接口
- spark.stop()
- print(f"计算完成")
复制代码 盘算5000多万数据,仅需一分钟不到,效率非常高。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |