Spark/pyspark/python

海哥  金牌会员 | 2024-11-20 19:22:28 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 881|帖子 881|积分 2643





Spark

焦点组件:
Spark core(焦点,提供了整个框架的基本功能,以RDD为数据抽象)
Spark sql(基于spark core,为结构化的数据的处置惩罚提供了方便,允许用户以类似sql的方式查询数据)
Spark streaming(基于spark core,专注于数据的流式盘算)
Spark graphx(用于图盘算,提供了图算法和图操作的api)
Spark mllib(呆板学习库,包含了各种常用的呆板学习算法和工具)
部署方式:
local部署(当地部署)(需要运维人员,在一台服务器上部署,一般是用于测试代码)
clustrt部署(集群部署)(需要运维人员,多态服务器上部署,多台服务器资源需要统一管理,利用资源调度服务(standalone,yarn))
云服务:(省事,可以直接利用)(数据存储在当地,云服务毗连当地数据进行盘算,然后将结果存储再当地,只用利用云服务的盘算资源)
Pip list(查看python环境中已经安装的模块)
Conda可以快速在多个假造环境中切换,也可以自己创建假造环境
Conda删除假造环境: conda remove -n 假造环境名 –all
在进行脚本情势开发时利用vim很不方便,可以借助pycharm的远程开发,执行操作服务器,进行代码编写
Pycharm远程毗连:
Standlone集群环境配置:
利用standlone来完成盘算任务:
Yarn模式:
假造机中快丢失题目办理方法:
重新进行hdfs初始化(hdfs namenode -format)
日记服务配置:(查看spark盘算任务流程)
不同部署模式的spark利用:
交互式
脚本式
脚本式可能报错:
没有找到python环境
办理办法一:利用os模块进行配置
办法二:把环境变量复制到bashrc中(由于远程读取不到/etc/profile)
提升速度:将/spark/jars包中的数据复制到hdfs分布式存储
Spark盘算程序的运行说明:
Pyspark指令参数:
Pyspark --master yarn –name yarn_demo(指定进程名称)
Pyspark –master yarn –driver-memory 2G(指定driver内存)(指的是最多可以利用2G,不是不停都利用2G)
–num-executors 3指定execotr数量(只能在yarn模式利用,默认两个)
–executor-cores 2指定execotr焦点数量
下令行提交文件运行:spark-submit b.py
Cluster模式可能会报错:(只能在yarn模式下利用)
办理办法:spark-submit –master yarn –deploy-mode cluster python文件
集群模式提交后,不会输出信息,由于在node1上提交的代码可能在node3上运行,此时node1无法显示信息
实际开发中也不用print输出,而是将结果保存在hdfs或者其他存储服务器中
spark://node001:7077 的 Spark Master(spark的网站)
driver:管理spark的盘算任务
executor:详细执行盘算任务线程的
rdd

弹性分布式数据集合
Spark中的一种数据类型(另有dataframe和dataset类型)
管理spark类型数据
开发中可以通过类情势自定义数据类型
可以对海量数据根据需求进行分区,每一份数据都会有对应的task线程执行盘算
分布式 利用集群中多台呆板的盘算资源进行盘算
管理spark的内存数据,完成对应的盘算
类似python中的list列表
特性
Rdd只读,不能0被修改,如果要修改只能对原来的rdd进行盘算生成一个新的rdd
支持分区(可以对海量数据根据需求进行分区,每一份数据都会有对应的task线程执行盘算)
Rdd的恒久化
缓存:可以将盘算中的结果缓存起来(存在内存或者当地磁盘),如果后序盘算错误但是从缓存位置重新盘算
容错
Checkpoint(作用跟缓存一样,区别是可以将数据存储在hdfs中)
创建rdd数据:
  将需要盘算的数据转化为rdd的数据,就可以利用spark的内存盘算方法进行分布式盘算
  Rdd数据的转发方法是由saprkcontext提供的,所以需要老师成sparkcontext,
  Sparkcontext称为spark的入口类
Rdd算子:
  主要分为两类:
  Transformation:
  转化算子:对rdd数据进行转化盘算得到新的rdd,定义了一个线程任务
  Action:
  执行算子:触发盘算任务,让盘算任务进行执行,得到结果(触发线程执行的)
高级算子

多个rdd的方法
分区算子(特别算子)mappartition:
一次性读取rdd1中的所有数据,然后通过迭代器进行盘算
缓存是将数据存储在磁盘或者内存上,盘算竣事,缓存自动清空
缓存和checkpoint:
共享变量

Spark在盘算中会有两个进程:一个dirver负责管理盘算任务的 另有一个excutor负责执行task盘算任务的
Driver进程和excutor进程需要进行数据通报 ,此时就需要通过共享变量
Spark有两种共享变量的实现: 广播变量和累加器
广播变量:(将dirver端数据传到excutor端)
节省通讯成本和服务器资源
累加器:广播变量是excutor端每个task各自盘算修改数据
累加器是在dirver端对数据进行盘算修改
依赖:

窄依赖(一对一):每个父rdd的一个partition(分区)最多被一个子rdd的一个pattition(分区利用)
宽依赖(一对多):父rdd的一个partition(分区)会被多个子rdd的partition(分区)利用(分区聚合)
DAG管理依赖关系:
DAG是一中有向无环图,是一个图盘算速算法
管理rdd依赖关系,保证rdd按照依赖关系进行数据的顺序盘算
会根据rdd的依赖关系将盘算过程分为多个盘算步骤,每个盘算步骤称为一个stage
在盘算的rdd依赖关系中,一旦发生了宽依赖,就会进行步骤拆分
日记查看依赖关系和盘算流程
  APP是盘算应用程序
  Job是盘算任务
  Dag管理依赖关系
  Stage盘算步骤的划分
  Task线程 完成该步骤下方法的盘算
Spark的运行流程(内核调度)

Spark执行diver后执行sparkcontext,sparkcontext会调用下面那三个类:
  (末了)Task Scheduler (知道taskset(每个task)后调用Scheduler Backend获取资源并为每个task分配资源,维护task和excuter之间的对应关系,将分配好的task交给Scheduler Backend分发给excutor执行,在运行期间调用Scheduler Backend获取task执行环境)
  (接着)Dag Scheduler (excuter创建后执行代码当执行action算子后会产生一个job,然后提交到dag进行stage划分,当宽依赖发生时拆分新的stage,分析stage里的task,生成新的task描述存进taskset中)
  (起首)Scheduler Backend (哀求resourceManager获取资源,创建excuter并保持通讯)
shuffle(了解)

无论时mpreduce还是spark—shuffle的本质就是通报数据


  • mapreduce中的shuffle作用:将map盘算后的数据通报给reduce利用
  • mapreduce中的shuffle过程:分区,排序,合并(规约)


  • spark中也有shuffle:当执行宽依赖的算子就会进行shuffle,将rdd的数据传给下一个rdd,进行数据互换
spark中shuffle的两个部分:


  • shuffle write写
  • shuffle read读
  • 会进行文件的读写,影响spark的盘算速度(所以尽量能不利用宽依赖就不利用)
spark的shuffle方法类:


  • 是spark封装好的处置惩罚shuffle的方法
  • hashshuffle

    • 根据分区数进行hash取余分组
    • 不同余数的放在一起
    • hash盘算
    • spark1.2版本前主要利用,之后引入了sortshuffle
    • spark2.0之后,删除了hashshuffle
    • 优化的hashshuffle和未优化

  • sortshuffle

    • 排序
    • bypass模式(跳过模式,不再利用排序的方式)版本和平凡模式版本
    • bypass模式版本不会进行排序
    • 根hashshuffle的区别是不会将分好组的文件整理,而是直接将每个分区的不同余数的小文件放在一个大文件中(根据索引辨认不同的余数的数据,然后根据索引进行大文件读取)
    • 平凡模式版本会排序进行shuffle
    • 可以通过配置指定按照那种模式执行

两种都是将相同的key值放在一起处置惩罚
缓存一般都是为了处置惩罚速度不一致题目
spark shuffle配置(了解 )需要颠末多次实验
spark并行度(理解)



  • 资源并行度

    • task在指定任务能利用cpu焦点的数量
    • 多任务,多个进程或多个线程执行任务

      • 两种方式

        • 并行:多个任务同时执行
        • 并发:任务交替执行
        • 和cpu焦点数有关

          • cpu焦点是4核:有两个线程任务,两个线程任务可以 并行执行
          • cpu焦点是4核:有八个线程任务,并发执行



    • spark中cpu焦点数据设置

      • –num-executors=2
      • executor-cores=2
      • 资源并行度 = num-executors*executor-cores=4
      • 可以利用shell执行查看当前服务器的cpu焦点数 cat /proc/cpuinfo
      • excutor是运行在多态服务器上的


  • 数据并行度

    • 是task数量,task由分区数决定
    • 为了保证task能充分利用cpu资源,实现并行盘算,需要计划分区数应该和并行度一致
    • 在实际公司要根据公司资源并行度进行设置分区数

   spark的rdd
  rdd的创建:在创建rdd的同时可以指定分区数,分区数和资源并行度保持一致
  rdd的操作:算子的利用transformation和action
  rdd的高级用法(用于优化代码)共享变量:缓存和checkpoint
  rdd的原理概念:rdd依赖,内核调度,shuffle,并行度
  spark SQL

hive的盘算流程:创建库,表metastore存储元数据(库名,表名,字段,表的路径),将元数据存进mysql中的hive3库


  • 先容:sparksql是apche spark中用于处置惩罚结构化数据(dataframe和datasets的模块)

    • 在saprk1.0版本是引入的saprksql

  • 数据结构分类:

    • 结构数据:表结构数据,由行列组成,并且表述了数据的属性和类型,表信息
    • 半结构化数据:spark中可以通过方法将半结构化数据转为结构化数据

      • 好比:xml,json
      • 描述数据的存储结构,但是无法描述数据的类型

    • 非结构化数据 rdd可以处置惩罚(文本,图片,视频)

  • 特点:

    • 易整合

      • 利用sql配合spark一起利用,封装了不同语言的dsl方法

    • 统一数据访问

      • 利用read方法可以读取hdfs的数据,mysql的数据,不同类型的文件数据(json,csv,orc)
      • 利用write方法可以写入hdfs,mysql不同类型的文件

    • 兼容hive

      • 利用hivesql方法

    • 尺度的数据毗连

      • 利用jdbc和odbc毗连方式毗连sparksql

    • 过程

      • 先从mysql中查询元数据再去hdfs中读取数据文件


  • 是spark独立开发的工具
  • 对spark兼容性更好,优化性能得到提升
  • sparksql的本质是将sql语句转化为rdd执行,catalyst引擎负责将sql转化为rdd
  • sparksql可以毗连利用hive的metastore服务,管理表的元数据
  • shark

    • 在spark1.0之前,也就是sparksql没有之前,用来利用sql操作spark数据的
    • 将hive的sql语句转化为rdd,spark去执行,然后在hdfs中创建库表目次
    • 运行模式是hive on spark
    • 会将hivesql转化为spark的rdd
    • shark是基于hive开发的,维护麻烦,2015年停止维护

  • 数据类型
   数据模型spark封装一个底子数据模型(数据类型)rdd
  然后根据rdd进行再次封装变成新的数据类型dataFrame
  然后根据dataframe再次封装得到了dataset类型
  

  • rdd 弹性分布式集合 利用python,java,scala,R
   [1,zhangsan,19,2,lisi,20] #列表的情势
  

  • dataframe类型 结构化数据 行列 表信息(数据的属性(字段)和类型) 利用python,java,scala,R

    • row类 行数据 rdd中一个列表元素

  1. from pyspark.sql import Row
  2. # 使用row类型生成一行数据
  3. r1 = Row(1,'wuhao',20)
  4. # 从行中可以通过下标取值
  5. print(r1[1])
  6. # 使用row类型生成数据的同时指定字段名
  7. r2 = Row(id=2,name='dashuaige',age=20)
  8. # 从行中通过字段名取值
  9. print(r2['name'])
复制代码


  • schema类 表信息
    1. from pyspark.sql.types import *
    2. # 定义schema信息
    3. # 使用structType类进行定义
    4. # add是指定字段信息
    5. # 第一个参数,字段名
    6. # 第二个参数,字段信息
    7. # 第三个参数,是否允许为空值,默认是True
    8. schema_type = StructType.add('id',IntegerType(),False).add('name',StringType()).add('age',IntegerType())
    复制代码
  1. [
  2.     [1,zhangsan,19],
  3.     [2,lisi,20]   
  4. ]
  5. schema
  6. {
  7.     id:int,
  8.     name:string,
  9.     age:int
  10. }
复制代码


  • datasets类型 结构化数据 利用java,scala

    • row类 一行数据 一个dataframe
    • schema类 表信息
    • 从datasets中取出的一行数据可以当作dataframe类型操作

  dataframe

详解



  • 结构化数据 行列 表信息(数据的属性(字段)和类型) 利用python,java,scala,R
   

  • row类 行数据 rdd中一个列表元素
    1. from pyspark.sql import Row
    2. # 使用row类型生成一行数据
    3. r1 = Row(1,'wuhao',20)
    4. # 从行中可以通过下标取值
    5. print(r1[1])
    6. # 使用row类型生成数据的同时指定字段名
    7. r2 = Row(id=2,name='dashuaige',age=20)
    8. # 从行中通过字段名取值
    9. print(r2['name'])
    复制代码
  • schema类 表信息
    1. from pyspark.sql.types import *
    2. # 定义schema信息
    3. # 使用structType类进行定义
    4. # add是指定字段信息
    5. # 第一个参数,字段名
    6. # 第二个参数,字段信息
    7. # 第三个参数,是否允许为空值,默认是True
    8. schema_type = StructType.add('id',IntegerType(),False).add('name',StringType()).add('age',IntegerType())
    复制代码
  1. [
  2.     [1,zhangsan,19],
  3.     [2,lisi,20]   
  4. ]
  5. schema
  6. {
  7.     id:int,
  8.     name:string,
  9.     age:int
  10. }
复制代码
创建

   创建dataframe数据
  需要利用一个sparksession的类创建
  sparksession类是在sparkcontext的底子上进行了封装,也就是sparksession类中包含了sparkcontext
  基本创建
  1. # 导入SparkSession和Row
  2. from pyspark.sql import SparkSession,Row
  3. # 导入定义字段的类型
  4. from pyspark.sql.types import *
  5. # dataframe的创建方法是由sparkSession提供的,需要生成sparksession对象
  6. # 是一个固定写法
  7. ss = SparkSession.builder.getOrCreate()
  8. # 定义每行数据
  9. r1 = Row(id=1,name='张三',age=20)
  10. r2 = Row(id=2,name='李四',age=19)
  11. # 定义schema信息
  12. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  13. # 创建dataframe数据
  14. # 使用sparksession对象中的createDataFrame
  15. # 第一个参数,指定行数据,将每行数据放入列表中
  16. # 第二个参数,指定表信息
  17. df = ss.createDataFrame([r1,r2],schema_type)
  18. # 查看df的表数据
  19. # 使用dataframe下的show方法
  20. df.show()
  21. # 查看schema信息
  22. df.printSchema()
复制代码
rdd的二维数据转化为dataframe


  • rdd.toDF()
  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. # 生成sparkSession对象
  4. ss = SparkSession.builder.getOrCreate()
  5. # sparkSession中可以通过sparkcontext获取sparkcontext对象
  6. sc = ss.sparkContext
  7. # 生成rdd
  8. rdd = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
  9. # 定义schema信息
  10. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  11. # 将二维的rdd数据转化为dataframe数据
  12. df = rdd.toDF(schema_type)
  13. # 查看dataframe数据
  14. df.show()
复制代码
基本利用


  • dsl方法 dataframe方法
   spark提供的DSL方法和sql的关键词一样,利用方式与sql基本类似,在进行数据处置惩罚时,要按照SQL的的执行顺序去思索怎样处置惩罚数据
  from join 知道数据在哪 df本身就是要处置惩罚的数据 df.join(df2)
  where 过滤需要处置惩罚的数据 df.join(df2).where()
  group by 聚合 数据的盘算 df.join(df2).where().groupby().sum()
  having 盘算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
  select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
  order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
  limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
  
  DSL方法之后完成后回得到一个处置惩罚后的新的df
  1. # 先将rdd转化为二维的形式
  2. # 再转化为dataframe
  3. rdd = rdd.map(lambda x:[int(x.split(',')[0]),x.split(',')[1],int(x.split()[2])])
  4. # 定义schema信息
  5. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  6. df = rdd.toDF(schema_type)
  7. # df数据进行操作
  8. # 会返回一个新的df
  9. # 指定查询的字段数据
  10. select_df1 = df.select('name')      #select name from df;
  11. select_df2 = df.select('name','age')  #select name,age from df;
  12. select_df3 = df.select(df['name'],df['age'])    #select name,age from df;
  13. # where的条件过滤  返回一个新的df
  14. # 一个条件  年龄大于20的数据
  15. where_df1 = df.where('age>20')   #select * from df where age>20;
  16. # 多个条件的与或非
  17. where_df2 = df.where('id>10 and name like "王%"')   #select * from df where id>10 and name like "王%";
  18. # group by分组一般结合聚合(sum,avg,min,max)操作
  19. # df.groupby()和df.groupBy是一样的逻辑
  20. groupby_df1 = df.groupby('gender').sum('age')     #select gender,sum(age) from df group by(gender);
  21. # 多个字段分组
  22. groupby_df2 = df.groupby('gender','age') #select * from df group by(gender,age)
  23. # 分组后的数据过滤  sql中的having
  24. where_groupby_df1 = df.groupby('gender').sum('age').where('sum(age)>80')   #select gender,sum(age) from df group by gender  having sum(age)>80;
  25. # orderby排序
  26. # 底层调用的是sort,默认是升序
  27. orderby_df1 = df.orderBy('age') # select * from df order by age;
  28. orderby_df2 = df.orderBy('age',ascending=False) # select * from df order by age desc;
  29. orderby_df3 = df.orderBy(['age','id'],ascending=False) #多字段排序 select * from df order by age,id desc;
  30. # limit指定返回的数据数量(spark中不能指定分页)
  31. limit_df = df.limit(5) # select * from df limit 5;
  32. # 将多个方法放在一起使用,主义执行顺序
  33. all_df1 = df.where('age>20').groupby('gender').sum('age').where('sum(age)>30').orderBy('sum(age)')
复制代码

  • sql语句
   利用sparksession提供的sql方法,编写sql语句执行
  留意这里只能写hivesql,与mysql内里有些语法是不同的
  1. # 使用sql语句去操作spark数据
  2. # 1.先将df数据指定为一个表
  3. df.createOrReplaceTempView('stu')
  4. # 使用sparksession提供的sql方法
  5. sqldf_1 = ss.sql('select * form stu')
  6. # 展示数据
  7. print('----------------sqldf--------------------')
  8. print(sqldf_1.show())
复制代码
高级的DSL方法



  • join关联

    • 左关联
    • 右关联

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. # 生成sparkSession对象
  4. ss = SparkSession.builder.getOrCreate()
  5. # sparkSession中可以通过sparkcontext获取sparkcontext对象
  6. sc = ss.sparkContext
  7. # 生成rdd
  8. rdd1 = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
  9. rdd2 = sc.parallrlize([[1,'吴昊',19],[2,'王五',20]])
  10. # 定义schema信息
  11. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  12. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  13. df1 = rdd1.toDF(schema_type)
  14. df2 = rdd2.toDF(schema_type)
  15. # join的关联
  16. # 内关联返回一个新的rdd
  17. # 第一个参数:关联的df
  18. # 第二个字段关联的字段
  19. # 第三个参数,关联方式(默认不写是内关联)
  20. join_df1 = df1.join(df2,'id')
  21. # 左关联
  22. join_df1 = df1.join(df2,'id','left')
  23. # 右关联
  24. join_df1 = df1.join(df2,'id','right')
  25. # 查看dataframe数据
  26. df1.show()
  27. df2.show()
复制代码


  • 缓存和checkpoint

    • 相同点和不同点和rdd中的一样
    • 如果既进行了缓存又进行了checkpoint遇到盘算错误会先去缓存中寻找

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. # 生成sparkSession对象
  4. ss = SparkSession.builder.getOrCreate()
  5. # sparkSession中可以通过sparkcontext获取sparkcontext对象
  6. sc = ss.sparkContext
  7. # 设置checkpoint存储位置
  8. sc.setCheckpointDir('hdfs:///spark_checkpoint')
  9. # 生成rdd
  10. rdd = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
  11. # 定义schema信息
  12. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  13. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  14. df = rdd.toDF(schema_type)
  15. # df进行缓存
  16. df.persist()
  17. # df进行checkpoint
  18. df.checkpoint()
  19. new_df = df.where('id>10') # 再进行df计算时,如果计算错误,就直接读取df的缓存数据或者checkpoint再进行计算
  20. # 查看dataframe数据
  21. new_df.show()
复制代码
内置函数

   利用spark中的内置函数完成对字段数据的操作,和hive中的内置函数操作基本一样
  

  • 字符串
  1. # 生成rdd
  2. rdd = sc.parallrlize([
  3.     [1,'张三',19,'1989-5-19'],
  4.     [2,'李四',20,'2003-12-04']
  5. ])
  6. # 定义schema信息
  7. schema_type = StructType().add('id',IntegerType).\
  8.     add('name',StringType).\
  9.     add('age',IntegerType).\
  10.     add('data',StringType).\
  11.     add('unix_t',StringType)
  12. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  13. df = rdd.toDF(schema_type)
  14. df.checkpoint()
  15. # 内置函数使用
  16. # 字符串操作
  17. # 字符串拼接
  18. new_df1 = df.select(f.concat('name','age'))   #select concat(name,age) from df;
  19. # 指定拼接字符
  20. new_df2 = df.select(f.concat_ws(',','name','age'))   #select concat_ws(',',name,age) from df;
  21. # 字符串截取
  22. # 第一个参数是要截取的字段,第二个是从第几个字符开始截取,第三个是要截取多少个字符
  23. new_df3 = df.select(f.substring('name',1,5))   #select substring(name,1,5) from df;
  24. # 字符串切割
  25. # 第一个参数是要分割的字段,第二个是按什么分割
  26. # 分割完成后每条数据会变成一个列表
  27. new_df4 = df.select(f.split('data','-'))    #select split(data,'-') from df;
  28. # 对分割后的数据可以进行下标取值
  29. new_df44 = df.select(f.split('data','-')[0])
  30. # 展示结果
  31. new_df1.show()
  32. print('-------------指定拼接分隔符---------------')
  33. new_df2.show()
  34. print('-------------字符串截取---------------')
  35. new_df3.show()
  36. print('-------------字符串分割---------------')
  37. new_df4.show()
  38. print('-------------字符串分割和下标取值---------------')
  39. new_df44.show()
复制代码


  • 时间
  1. # 生成rdd
  2. rdd = sc.parallrlize([
  3.     [1,'张三',19,'1989-5-19'],
  4.     [2,'李四',20,'2003-12-04']
  5. ])
  6. # 定义schema信息
  7. schema_type = StructType().add('id',IntegerType).\
  8.     add('name',StringType).\
  9.     add('age',IntegerType).\
  10.     add('data',StringType).\
  11.     add('unix_t',StringType)
  12. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  13. df = rdd.toDF(schema_type)
  14. df.checkpoint()
  15. # 获取当前日期
  16. new_df5 = df.select(f.current_date())
  17. # 获取当前日期和时间信息
  18. new_df55 = df.select(f.current_timestamp())
  19. # 获取unix时间
  20. new_df555 = df.select(f.unix_timestamp())
  21. # 将unix时间转化为标准时间,可以通过format=修改默认标准时间格式
  22. new_df5555 = df.select(f.from_unixtime('unix_t'))
  23. # 时间加减,单位为天(默认加一天,将数据改成负数就是减)
  24. new_df55555 = df.select(f.date_add('data',1))
  25. print('-------------获取当前日期---------------')
  26. new_df5.show()
  27. print('-------------获取当前日期和时间---------------')
  28. new_df55.show()
  29. print('-------------获取unix时间---------------')
  30. new_df555.show()
  31. print('-------------将unix时间转化为标准时间---------------')
  32. new_df5555.show()
  33. print('-------------时间加减---------------')
  34. new_df55555.show()
复制代码


  • 聚合

    • 内置函数中有聚合操作方法 .sum() .avg() .max() .min()
    • 需要配合df中的agg方法一起利用

  1. # 聚合(内置函数中的聚合方法先分组再聚合)
  2. # 配合agg方法(如果不写agg就只能单聚合)
  3. new_df6 = df.groupby('gender').agg(f.sum('age'))   #select gender,sum(age) from df order by gender;
  4. # 配合agg方法进行多个聚合
  5. new_df66 = df.groupby('gender').agg(f.sum('age'),f.avg('age'),f.max('age'))
  6. # 多个聚合保留两位小数
  7. #select gender,round(sum(age),2) from df order by gender;
  8. new_df666 = df.groupby('gender').agg(f.round(f.sum('age'),2),f.round(f.avg('age'),2),f.round(f.max('age'),2))
复制代码


  • 别名
  1. # 对列起别名
  2. new_df6666 = df.groupby('gender').agg(f.round(f.sum('age'),2).alias('sum_age'),f.round(f.avg('age'),2).alias('avg_age'),f.round(f.max('age'),2).alias('max_age'))
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

海哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表