ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark/pyspark/python [打印本页]

作者: 海哥    时间: 2024-11-20 19:22
标题: Spark/pyspark/python




Spark

焦点组件:
Spark core(焦点,提供了整个框架的基本功能,以RDD为数据抽象)
Spark sql(基于spark core,为结构化的数据的处置惩罚提供了方便,允许用户以类似sql的方式查询数据)
Spark streaming(基于spark core,专注于数据的流式盘算)
Spark graphx(用于图盘算,提供了图算法和图操作的api)
Spark mllib(呆板学习库,包含了各种常用的呆板学习算法和工具)
部署方式:
local部署(当地部署)(需要运维人员,在一台服务器上部署,一般是用于测试代码)
clustrt部署(集群部署)(需要运维人员,多态服务器上部署,多台服务器资源需要统一管理,利用资源调度服务(standalone,yarn))
云服务:(省事,可以直接利用)(数据存储在当地,云服务毗连当地数据进行盘算,然后将结果存储再当地,只用利用云服务的盘算资源)
Pip list(查看python环境中已经安装的模块)
Conda可以快速在多个假造环境中切换,也可以自己创建假造环境
Conda删除假造环境: conda remove -n 假造环境名 –all
在进行脚本情势开发时利用vim很不方便,可以借助pycharm的远程开发,执行操作服务器,进行代码编写
Pycharm远程毗连:
Standlone集群环境配置:
利用standlone来完成盘算任务:
Yarn模式:
假造机中快丢失题目办理方法:
重新进行hdfs初始化(hdfs namenode -format)
日记服务配置:(查看spark盘算任务流程)
不同部署模式的spark利用:
交互式
脚本式
脚本式可能报错:
没有找到python环境
办理办法一:利用os模块进行配置
办法二:把环境变量复制到bashrc中(由于远程读取不到/etc/profile)
提升速度:将/spark/jars包中的数据复制到hdfs分布式存储
Spark盘算程序的运行说明:
Pyspark指令参数:
Pyspark --master yarn –name yarn_demo(指定进程名称)
Pyspark –master yarn –driver-memory 2G(指定driver内存)(指的是最多可以利用2G,不是不停都利用2G)
–num-executors 3指定execotr数量(只能在yarn模式利用,默认两个)
–executor-cores 2指定execotr焦点数量
下令行提交文件运行:spark-submit b.py
Cluster模式可能会报错:(只能在yarn模式下利用)
办理办法:spark-submit –master yarn –deploy-mode cluster python文件
集群模式提交后,不会输出信息,由于在node1上提交的代码可能在node3上运行,此时node1无法显示信息
实际开发中也不用print输出,而是将结果保存在hdfs或者其他存储服务器中
spark://node001:7077 的 Spark Master(spark的网站)
driver:管理spark的盘算任务
executor:详细执行盘算任务线程的
rdd

弹性分布式数据集合
Spark中的一种数据类型(另有dataframe和dataset类型)
管理spark类型数据
开发中可以通过类情势自定义数据类型
可以对海量数据根据需求进行分区,每一份数据都会有对应的task线程执行盘算
分布式 利用集群中多台呆板的盘算资源进行盘算
管理spark的内存数据,完成对应的盘算
类似python中的list列表
特性
Rdd只读,不能0被修改,如果要修改只能对原来的rdd进行盘算生成一个新的rdd
支持分区(可以对海量数据根据需求进行分区,每一份数据都会有对应的task线程执行盘算)
Rdd的恒久化
缓存:可以将盘算中的结果缓存起来(存在内存或者当地磁盘),如果后序盘算错误但是从缓存位置重新盘算
容错
Checkpoint(作用跟缓存一样,区别是可以将数据存储在hdfs中)
创建rdd数据:
  将需要盘算的数据转化为rdd的数据,就可以利用spark的内存盘算方法进行分布式盘算
  Rdd数据的转发方法是由saprkcontext提供的,所以需要老师成sparkcontext,
  Sparkcontext称为spark的入口类
Rdd算子:
  主要分为两类:
  Transformation:
  转化算子:对rdd数据进行转化盘算得到新的rdd,定义了一个线程任务
  Action:
  执行算子:触发盘算任务,让盘算任务进行执行,得到结果(触发线程执行的)
高级算子

多个rdd的方法
分区算子(特别算子)mappartition:
一次性读取rdd1中的所有数据,然后通过迭代器进行盘算
缓存是将数据存储在磁盘或者内存上,盘算竣事,缓存自动清空
缓存和checkpoint:
共享变量

Spark在盘算中会有两个进程:一个dirver负责管理盘算任务的 另有一个excutor负责执行task盘算任务的
Driver进程和excutor进程需要进行数据通报 ,此时就需要通过共享变量
Spark有两种共享变量的实现: 广播变量和累加器
广播变量:(将dirver端数据传到excutor端)
节省通讯成本和服务器资源
累加器:广播变量是excutor端每个task各自盘算修改数据
累加器是在dirver端对数据进行盘算修改
依赖:

窄依赖(一对一):每个父rdd的一个partition(分区)最多被一个子rdd的一个pattition(分区利用)
宽依赖(一对多):父rdd的一个partition(分区)会被多个子rdd的partition(分区)利用(分区聚合)
DAG管理依赖关系:
DAG是一中有向无环图,是一个图盘算速算法
管理rdd依赖关系,保证rdd按照依赖关系进行数据的顺序盘算
会根据rdd的依赖关系将盘算过程分为多个盘算步骤,每个盘算步骤称为一个stage
在盘算的rdd依赖关系中,一旦发生了宽依赖,就会进行步骤拆分
日记查看依赖关系和盘算流程
  APP是盘算应用程序
  Job是盘算任务
  Dag管理依赖关系
  Stage盘算步骤的划分
  Task线程 完成该步骤下方法的盘算
Spark的运行流程(内核调度)

Spark执行diver后执行sparkcontext,sparkcontext会调用下面那三个类:
  (末了)Task Scheduler (知道taskset(每个task)后调用Scheduler Backend获取资源并为每个task分配资源,维护task和excuter之间的对应关系,将分配好的task交给Scheduler Backend分发给excutor执行,在运行期间调用Scheduler Backend获取task执行环境)
  (接着)Dag Scheduler (excuter创建后执行代码当执行action算子后会产生一个job,然后提交到dag进行stage划分,当宽依赖发生时拆分新的stage,分析stage里的task,生成新的task描述存进taskset中)
  (起首)Scheduler Backend (哀求resourceManager获取资源,创建excuter并保持通讯)
shuffle(了解)

无论时mpreduce还是spark—shuffle的本质就是通报数据


spark中shuffle的两个部分:

spark的shuffle方法类:

两种都是将相同的key值放在一起处置惩罚
缓存一般都是为了处置惩罚速度不一致题目
spark shuffle配置(了解 )需要颠末多次实验
spark并行度(理解)


   spark的rdd
  rdd的创建:在创建rdd的同时可以指定分区数,分区数和资源并行度保持一致
  rdd的操作:算子的利用transformation和action
  rdd的高级用法(用于优化代码)共享变量:缓存和checkpoint
  rdd的原理概念:rdd依赖,内核调度,shuffle,并行度
  spark SQL

hive的盘算流程:创建库,表metastore存储元数据(库名,表名,字段,表的路径),将元数据存进mysql中的hive3库

   数据模型spark封装一个底子数据模型(数据类型)rdd
  然后根据rdd进行再次封装变成新的数据类型dataFrame
  然后根据dataframe再次封装得到了dataset类型
  
   [1,zhangsan,19,2,lisi,20] #列表的情势
  
  1. from pyspark.sql import Row
  2. # 使用row类型生成一行数据
  3. r1 = Row(1,'wuhao',20)
  4. # 从行中可以通过下标取值
  5. print(r1[1])
  6. # 使用row类型生成数据的同时指定字段名
  7. r2 = Row(id=2,name='dashuaige',age=20)
  8. # 从行中通过字段名取值
  9. print(r2['name'])
复制代码

  1. [
  2.     [1,zhangsan,19],
  3.     [2,lisi,20]   
  4. ]
  5. schema
  6. {
  7.     id:int,
  8.     name:string,
  9.     age:int
  10. }
复制代码

  dataframe

详解


   
  1. [
  2.     [1,zhangsan,19],
  3.     [2,lisi,20]   
  4. ]
  5. schema
  6. {
  7.     id:int,
  8.     name:string,
  9.     age:int
  10. }
复制代码
创建

   创建dataframe数据
  需要利用一个sparksession的类创建
  sparksession类是在sparkcontext的底子上进行了封装,也就是sparksession类中包含了sparkcontext
  基本创建
  1. # 导入SparkSession和Row
  2. from pyspark.sql import SparkSession,Row
  3. # 导入定义字段的类型
  4. from pyspark.sql.types import *
  5. # dataframe的创建方法是由sparkSession提供的,需要生成sparksession对象
  6. # 是一个固定写法
  7. ss = SparkSession.builder.getOrCreate()
  8. # 定义每行数据
  9. r1 = Row(id=1,name='张三',age=20)
  10. r2 = Row(id=2,name='李四',age=19)
  11. # 定义schema信息
  12. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  13. # 创建dataframe数据
  14. # 使用sparksession对象中的createDataFrame
  15. # 第一个参数,指定行数据,将每行数据放入列表中
  16. # 第二个参数,指定表信息
  17. df = ss.createDataFrame([r1,r2],schema_type)
  18. # 查看df的表数据
  19. # 使用dataframe下的show方法
  20. df.show()
  21. # 查看schema信息
  22. df.printSchema()
复制代码
rdd的二维数据转化为dataframe

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. # 生成sparkSession对象
  4. ss = SparkSession.builder.getOrCreate()
  5. # sparkSession中可以通过sparkcontext获取sparkcontext对象
  6. sc = ss.sparkContext
  7. # 生成rdd
  8. rdd = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
  9. # 定义schema信息
  10. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  11. # 将二维的rdd数据转化为dataframe数据
  12. df = rdd.toDF(schema_type)
  13. # 查看dataframe数据
  14. df.show()
复制代码
基本利用

   spark提供的DSL方法和sql的关键词一样,利用方式与sql基本类似,在进行数据处置惩罚时,要按照SQL的的执行顺序去思索怎样处置惩罚数据
  from join 知道数据在哪 df本身就是要处置惩罚的数据 df.join(df2)
  where 过滤需要处置惩罚的数据 df.join(df2).where()
  group by 聚合 数据的盘算 df.join(df2).where().groupby().sum()
  having 盘算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
  select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
  order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
  limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
  
  DSL方法之后完成后回得到一个处置惩罚后的新的df
  1. # 先将rdd转化为二维的形式
  2. # 再转化为dataframe
  3. rdd = rdd.map(lambda x:[int(x.split(',')[0]),x.split(',')[1],int(x.split()[2])])
  4. # 定义schema信息
  5. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  6. df = rdd.toDF(schema_type)
  7. # df数据进行操作
  8. # 会返回一个新的df
  9. # 指定查询的字段数据
  10. select_df1 = df.select('name')      #select name from df;
  11. select_df2 = df.select('name','age')  #select name,age from df;
  12. select_df3 = df.select(df['name'],df['age'])    #select name,age from df;
  13. # where的条件过滤  返回一个新的df
  14. # 一个条件  年龄大于20的数据
  15. where_df1 = df.where('age>20')   #select * from df where age>20;
  16. # 多个条件的与或非
  17. where_df2 = df.where('id>10 and name like "王%"')   #select * from df where id>10 and name like "王%";
  18. # group by分组一般结合聚合(sum,avg,min,max)操作
  19. # df.groupby()和df.groupBy是一样的逻辑
  20. groupby_df1 = df.groupby('gender').sum('age')     #select gender,sum(age) from df group by(gender);
  21. # 多个字段分组
  22. groupby_df2 = df.groupby('gender','age') #select * from df group by(gender,age)
  23. # 分组后的数据过滤  sql中的having
  24. where_groupby_df1 = df.groupby('gender').sum('age').where('sum(age)>80')   #select gender,sum(age) from df group by gender  having sum(age)>80;
  25. # orderby排序
  26. # 底层调用的是sort,默认是升序
  27. orderby_df1 = df.orderBy('age') # select * from df order by age;
  28. orderby_df2 = df.orderBy('age',ascending=False) # select * from df order by age desc;
  29. orderby_df3 = df.orderBy(['age','id'],ascending=False) #多字段排序 select * from df order by age,id desc;
  30. # limit指定返回的数据数量(spark中不能指定分页)
  31. limit_df = df.limit(5) # select * from df limit 5;
  32. # 将多个方法放在一起使用,主义执行顺序
  33. all_df1 = df.where('age>20').groupby('gender').sum('age').where('sum(age)>30').orderBy('sum(age)')
复制代码
   利用sparksession提供的sql方法,编写sql语句执行
  留意这里只能写hivesql,与mysql内里有些语法是不同的
  1. # 使用sql语句去操作spark数据
  2. # 1.先将df数据指定为一个表
  3. df.createOrReplaceTempView('stu')
  4. # 使用sparksession提供的sql方法
  5. sqldf_1 = ss.sql('select * form stu')
  6. # 展示数据
  7. print('----------------sqldf--------------------')
  8. print(sqldf_1.show())
复制代码
高级的DSL方法


  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. # 生成sparkSession对象
  4. ss = SparkSession.builder.getOrCreate()
  5. # sparkSession中可以通过sparkcontext获取sparkcontext对象
  6. sc = ss.sparkContext
  7. # 生成rdd
  8. rdd1 = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
  9. rdd2 = sc.parallrlize([[1,'吴昊',19],[2,'王五',20]])
  10. # 定义schema信息
  11. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  12. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  13. df1 = rdd1.toDF(schema_type)
  14. df2 = rdd2.toDF(schema_type)
  15. # join的关联
  16. # 内关联返回一个新的rdd
  17. # 第一个参数:关联的df
  18. # 第二个字段关联的字段
  19. # 第三个参数,关联方式(默认不写是内关联)
  20. join_df1 = df1.join(df2,'id')
  21. # 左关联
  22. join_df1 = df1.join(df2,'id','left')
  23. # 右关联
  24. join_df1 = df1.join(df2,'id','right')
  25. # 查看dataframe数据
  26. df1.show()
  27. df2.show()
复制代码

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.types import *
  3. # 生成sparkSession对象
  4. ss = SparkSession.builder.getOrCreate()
  5. # sparkSession中可以通过sparkcontext获取sparkcontext对象
  6. sc = ss.sparkContext
  7. # 设置checkpoint存储位置
  8. sc.setCheckpointDir('hdfs:///spark_checkpoint')
  9. # 生成rdd
  10. rdd = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
  11. # 定义schema信息
  12. schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
  13. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  14. df = rdd.toDF(schema_type)
  15. # df进行缓存
  16. df.persist()
  17. # df进行checkpoint
  18. df.checkpoint()
  19. new_df = df.where('id>10') # 再进行df计算时,如果计算错误,就直接读取df的缓存数据或者checkpoint再进行计算
  20. # 查看dataframe数据
  21. new_df.show()
复制代码
内置函数

   利用spark中的内置函数完成对字段数据的操作,和hive中的内置函数操作基本一样
  
  1. # 生成rdd
  2. rdd = sc.parallrlize([
  3.     [1,'张三',19,'1989-5-19'],
  4.     [2,'李四',20,'2003-12-04']
  5. ])
  6. # 定义schema信息
  7. schema_type = StructType().add('id',IntegerType).\
  8.     add('name',StringType).\
  9.     add('age',IntegerType).\
  10.     add('data',StringType).\
  11.     add('unix_t',StringType)
  12. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  13. df = rdd.toDF(schema_type)
  14. df.checkpoint()
  15. # 内置函数使用
  16. # 字符串操作
  17. # 字符串拼接
  18. new_df1 = df.select(f.concat('name','age'))   #select concat(name,age) from df;
  19. # 指定拼接字符
  20. new_df2 = df.select(f.concat_ws(',','name','age'))   #select concat_ws(',',name,age) from df;
  21. # 字符串截取
  22. # 第一个参数是要截取的字段,第二个是从第几个字符开始截取,第三个是要截取多少个字符
  23. new_df3 = df.select(f.substring('name',1,5))   #select substring(name,1,5) from df;
  24. # 字符串切割
  25. # 第一个参数是要分割的字段,第二个是按什么分割
  26. # 分割完成后每条数据会变成一个列表
  27. new_df4 = df.select(f.split('data','-'))    #select split(data,'-') from df;
  28. # 对分割后的数据可以进行下标取值
  29. new_df44 = df.select(f.split('data','-')[0])
  30. # 展示结果
  31. new_df1.show()
  32. print('-------------指定拼接分隔符---------------')
  33. new_df2.show()
  34. print('-------------字符串截取---------------')
  35. new_df3.show()
  36. print('-------------字符串分割---------------')
  37. new_df4.show()
  38. print('-------------字符串分割和下标取值---------------')
  39. new_df44.show()
复制代码

  1. # 生成rdd
  2. rdd = sc.parallrlize([
  3.     [1,'张三',19,'1989-5-19'],
  4.     [2,'李四',20,'2003-12-04']
  5. ])
  6. # 定义schema信息
  7. schema_type = StructType().add('id',IntegerType).\
  8.     add('name',StringType).\
  9.     add('age',IntegerType).\
  10.     add('data',StringType).\
  11.     add('unix_t',StringType)
  12. # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
  13. df = rdd.toDF(schema_type)
  14. df.checkpoint()
  15. # 获取当前日期
  16. new_df5 = df.select(f.current_date())
  17. # 获取当前日期和时间信息
  18. new_df55 = df.select(f.current_timestamp())
  19. # 获取unix时间
  20. new_df555 = df.select(f.unix_timestamp())
  21. # 将unix时间转化为标准时间,可以通过format=修改默认标准时间格式
  22. new_df5555 = df.select(f.from_unixtime('unix_t'))
  23. # 时间加减,单位为天(默认加一天,将数据改成负数就是减)
  24. new_df55555 = df.select(f.date_add('data',1))
  25. print('-------------获取当前日期---------------')
  26. new_df5.show()
  27. print('-------------获取当前日期和时间---------------')
  28. new_df55.show()
  29. print('-------------获取unix时间---------------')
  30. new_df555.show()
  31. print('-------------将unix时间转化为标准时间---------------')
  32. new_df5555.show()
  33. print('-------------时间加减---------------')
  34. new_df55555.show()
复制代码

  1. # 聚合(内置函数中的聚合方法先分组再聚合)
  2. # 配合agg方法(如果不写agg就只能单聚合)
  3. new_df6 = df.groupby('gender').agg(f.sum('age'))   #select gender,sum(age) from df order by gender;
  4. # 配合agg方法进行多个聚合
  5. new_df66 = df.groupby('gender').agg(f.sum('age'),f.avg('age'),f.max('age'))
  6. # 多个聚合保留两位小数
  7. #select gender,round(sum(age),2) from df order by gender;
  8. new_df666 = df.groupby('gender').agg(f.round(f.sum('age'),2),f.round(f.avg('age'),2),f.round(f.max('age'),2))
复制代码

  1. # 对列起别名
  2. new_df6666 = df.groupby('gender').agg(f.round(f.sum('age'),2).alias('sum_age'),f.round(f.avg('age'),2).alias('avg_age'),f.round(f.max('age'),2).alias('max_age'))
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4