pyspark自界说UDF函数

打印 上一主题 下一主题

主题 670|帖子 670|积分 2012

当碰到一些复杂特殊的盘算场景时,只通过pyspark的内置函数无法达到我们想要实现的效果,此时,可通过自界说函数然后注册为UDF函数,就能够很好的解决复杂盘算场景标题,且盘算效率非常快速。
  1. # 配置spark接口
  2. import os
  3. import findspark
  4. from pyspark import SparkConf
  5. from pyspark.sql import SparkSession
  6. os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
  7. findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
  8. # 设置内存大小
  9. conf = SparkConf()
  10. # conf.set('fs.defaultFS','hdfs://dmns1') # 指定文件路径
  11. conf.set("spark.driver.memory", "4g")
  12. conf.set("spark.executor.memory", "4g")
  13. conf.set("spark.port.maxRetries", "64") # 设置可以绑定的最大端口数
  14. spark = SparkSession.builder.appName("udf_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
  15. spark.sparkContext.setLogLevel('ERROR')
复制代码
  1. # 导入数据类型
  2. from pyspark.sql.types import IntegerType,DoubleType,StringType
  3. # 实例化自定义类
  4. ov = OfferValue()
  5. # 自定义函数 计算复杂场景
  6. def my_function(offer):
  7.     offer_list = offer.split(',')
  8.     offer_value = ov.get_value(offer_list)
  9.     return offer_value
  10. # 注册为udf函数 命名为my_function
  11. spark.udf.register('my_function',my_function, DoubleType())
  12. # sqlAPI调用udf函数
  13. spark.sql("create table database.table1 stored as parquet as \
  14. select my_function(offer_list) as avg_value \
  15. from database.table2")
  16. # 关闭spark接口
  17. spark.stop()
  18. print(f"计算完成")
复制代码
盘算5000多万数据,仅需一分钟不到,效率非常高。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表