Spark
焦点组件:
Spark core(焦点,提供了整个框架的基本功能,以RDD为数据抽象)
Spark sql(基于spark core,为结构化的数据的处置惩罚提供了方便,允许用户以类似sql的方式查询数据)
Spark streaming(基于spark core,专注于数据的流式盘算)
Spark graphx(用于图盘算,提供了图算法和图操作的api)
Spark mllib(呆板学习库,包含了各种常用的呆板学习算法和工具)
部署方式:
local部署(当地部署)(需要运维人员,在一台服务器上部署,一般是用于测试代码)
clustrt部署(集群部署)(需要运维人员,多态服务器上部署,多台服务器资源需要统一管理,利用资源调度服务(standalone,yarn))
云服务:(省事,可以直接利用)(数据存储在当地,云服务毗连当地数据进行盘算,然后将结果存储再当地,只用利用云服务的盘算资源)
Pip list(查看python环境中已经安装的模块)
Conda可以快速在多个假造环境中切换,也可以自己创建假造环境
Conda删除假造环境: conda remove -n 假造环境名 –all
在进行脚本情势开发时利用vim很不方便,可以借助pycharm的远程开发,执行操作服务器,进行代码编写
Pycharm远程毗连:
Standlone集群环境配置:
利用standlone来完成盘算任务:
Yarn模式:
假造机中快丢失题目办理方法:
重新进行hdfs初始化(hdfs namenode -format)
日记服务配置:(查看spark盘算任务流程)
不同部署模式的spark利用:
交互式
脚本式
脚本式可能报错:
没有找到python环境
办理办法一:利用os模块进行配置
办法二:把环境变量复制到bashrc中(由于远程读取不到/etc/profile)
提升速度:将/spark/jars包中的数据复制到hdfs分布式存储
Spark盘算程序的运行说明:
Pyspark指令参数:
Pyspark --master yarn –name yarn_demo(指定进程名称)
Pyspark –master yarn –driver-memory 2G(指定driver内存)(指的是最多可以利用2G,不是不停都利用2G)
–num-executors 3指定execotr数量(只能在yarn模式利用,默认两个)
–executor-cores 2指定execotr焦点数量
下令行提交文件运行:spark-submit b.py
Cluster模式可能会报错:(只能在yarn模式下利用)
办理办法:spark-submit –master yarn –deploy-mode cluster python文件
集群模式提交后,不会输出信息,由于在node1上提交的代码可能在node3上运行,此时node1无法显示信息
实际开发中也不用print输出,而是将结果保存在hdfs或者其他存储服务器中
spark://node001:7077 的 Spark Master(spark的网站)
driver:管理spark的盘算任务
executor:详细执行盘算任务线程的
rdd
弹性分布式数据集合
Spark中的一种数据类型(另有dataframe和dataset类型)
管理spark类型数据
开发中可以通过类情势自定义数据类型
可以对海量数据根据需求进行分区,每一份数据都会有对应的task线程执行盘算
分布式 利用集群中多台呆板的盘算资源进行盘算
管理spark的内存数据,完成对应的盘算
类似python中的list列表
特性
Rdd只读,不能0被修改,如果要修改只能对原来的rdd进行盘算生成一个新的rdd
支持分区(可以对海量数据根据需求进行分区,每一份数据都会有对应的task线程执行盘算)
Rdd的恒久化
缓存:可以将盘算中的结果缓存起来(存在内存或者当地磁盘),如果后序盘算错误但是从缓存位置重新盘算
容错
Checkpoint(作用跟缓存一样,区别是可以将数据存储在hdfs中)
创建rdd数据:
将需要盘算的数据转化为rdd的数据,就可以利用spark的内存盘算方法进行分布式盘算
Rdd数据的转发方法是由saprkcontext提供的,所以需要老师成sparkcontext,
Sparkcontext称为spark的入口类
Rdd算子:
主要分为两类:
Transformation:
转化算子:对rdd数据进行转化盘算得到新的rdd,定义了一个线程任务
Action:
执行算子:触发盘算任务,让盘算任务进行执行,得到结果(触发线程执行的)
高级算子
多个rdd的方法
分区算子(特别算子)mappartition:
一次性读取rdd1中的所有数据,然后通过迭代器进行盘算
缓存是将数据存储在磁盘或者内存上,盘算竣事,缓存自动清空
缓存和checkpoint:
共享变量
Spark在盘算中会有两个进程:一个dirver负责管理盘算任务的 另有一个excutor负责执行task盘算任务的
Driver进程和excutor进程需要进行数据通报 ,此时就需要通过共享变量
Spark有两种共享变量的实现: 广播变量和累加器
广播变量:(将dirver端数据传到excutor端)
节省通讯成本和服务器资源
累加器:广播变量是excutor端每个task各自盘算修改数据
累加器是在dirver端对数据进行盘算修改
依赖:
窄依赖(一对一):每个父rdd的一个partition(分区)最多被一个子rdd的一个pattition(分区利用)
宽依赖(一对多):父rdd的一个partition(分区)会被多个子rdd的partition(分区)利用(分区聚合)
DAG管理依赖关系:
DAG是一中有向无环图,是一个图盘算速算法
管理rdd依赖关系,保证rdd按照依赖关系进行数据的顺序盘算
会根据rdd的依赖关系将盘算过程分为多个盘算步骤,每个盘算步骤称为一个stage
在盘算的rdd依赖关系中,一旦发生了宽依赖,就会进行步骤拆分
日记查看依赖关系和盘算流程
APP是盘算应用程序
Job是盘算任务
Dag管理依赖关系
Stage盘算步骤的划分
Task线程 完成该步骤下方法的盘算
Spark的运行流程(内核调度)
Spark执行diver后执行sparkcontext,sparkcontext会调用下面那三个类:
(末了)Task Scheduler (知道taskset(每个task)后调用Scheduler Backend获取资源并为每个task分配资源,维护task和excuter之间的对应关系,将分配好的task交给Scheduler Backend分发给excutor执行,在运行期间调用Scheduler Backend获取task执行环境)
(接着)Dag Scheduler (excuter创建后执行代码当执行action算子后会产生一个job,然后提交到dag进行stage划分,当宽依赖发生时拆分新的stage,分析stage里的task,生成新的task描述存进taskset中)
(起首)Scheduler Backend (哀求resourceManager获取资源,创建excuter并保持通讯)
shuffle(了解)
无论时mpreduce还是spark—shuffle的本质就是通报数据
- mapreduce中的shuffle作用:将map盘算后的数据通报给reduce利用
- mapreduce中的shuffle过程:分区,排序,合并(规约)
- spark中也有shuffle:当执行宽依赖的算子就会进行shuffle,将rdd的数据传给下一个rdd,进行数据互换
spark中shuffle的两个部分:
- shuffle write写
- shuffle read读
- 会进行文件的读写,影响spark的盘算速度(所以尽量能不利用宽依赖就不利用)
spark的shuffle方法类:
- 是spark封装好的处置惩罚shuffle的方法
- hashshuffle
- 根据分区数进行hash取余分组
- 不同余数的放在一起
- hash盘算
- spark1.2版本前主要利用,之后引入了sortshuffle
- spark2.0之后,删除了hashshuffle
- 优化的hashshuffle和未优化
- sortshuffle
- 排序
- bypass模式(跳过模式,不再利用排序的方式)版本和平凡模式版本
- bypass模式版本不会进行排序
- 根hashshuffle的区别是不会将分好组的文件整理,而是直接将每个分区的不同余数的小文件放在一个大文件中(根据索引辨认不同的余数的数据,然后根据索引进行大文件读取)
- 平凡模式版本会排序进行shuffle
- 可以通过配置指定按照那种模式执行
两种都是将相同的key值放在一起处置惩罚
缓存一般都是为了处置惩罚速度不一致题目
spark shuffle配置(了解 )需要颠末多次实验
spark并行度(理解)
- 资源并行度
- task在指定任务能利用cpu焦点的数量
- 多任务,多个进程或多个线程执行任务
- 两种方式
- 并行:多个任务同时执行
- 并发:任务交替执行
- 和cpu焦点数有关
- cpu焦点是4核:有两个线程任务,两个线程任务可以 并行执行
- cpu焦点是4核:有八个线程任务,并发执行
- spark中cpu焦点数据设置
- –num-executors=2
- executor-cores=2
- 资源并行度 = num-executors*executor-cores=4
- 可以利用shell执行查看当前服务器的cpu焦点数 cat /proc/cpuinfo
- excutor是运行在多态服务器上的
- 数据并行度
- 是task数量,task由分区数决定
- 为了保证task能充分利用cpu资源,实现并行盘算,需要计划分区数应该和并行度一致
- 在实际公司要根据公司资源并行度进行设置分区数
spark的rdd
rdd的创建:在创建rdd的同时可以指定分区数,分区数和资源并行度保持一致
rdd的操作:算子的利用transformation和action
rdd的高级用法(用于优化代码)共享变量:缓存和checkpoint
rdd的原理概念:rdd依赖,内核调度,shuffle,并行度
spark SQL
hive的盘算流程:创建库,表metastore存储元数据(库名,表名,字段,表的路径),将元数据存进mysql中的hive3库
- 先容:sparksql是apche spark中用于处置惩罚结构化数据(dataframe和datasets的模块)
- 数据结构分类:
- 结构数据:表结构数据,由行列组成,并且表述了数据的属性和类型,表信息
- 半结构化数据:spark中可以通过方法将半结构化数据转为结构化数据
- 好比:xml,json
- 描述数据的存储结构,但是无法描述数据的类型
- 非结构化数据 rdd可以处置惩罚(文本,图片,视频)
- 特点:
- 易整合
- 利用sql配合spark一起利用,封装了不同语言的dsl方法
- 统一数据访问
- 利用read方法可以读取hdfs的数据,mysql的数据,不同类型的文件数据(json,csv,orc)
- 利用write方法可以写入hdfs,mysql不同类型的文件
- 兼容hive
- 尺度的数据毗连
- 利用jdbc和odbc毗连方式毗连sparksql
- 过程
- 先从mysql中查询元数据再去hdfs中读取数据文件
- 是spark独立开发的工具
- 对spark兼容性更好,优化性能得到提升
- sparksql的本质是将sql语句转化为rdd执行,catalyst引擎负责将sql转化为rdd
- sparksql可以毗连利用hive的metastore服务,管理表的元数据
- shark
- 在spark1.0之前,也就是sparksql没有之前,用来利用sql操作spark数据的
- 将hive的sql语句转化为rdd,spark去执行,然后在hdfs中创建库表目次
- 运行模式是hive on spark
- 会将hivesql转化为spark的rdd
- shark是基于hive开发的,维护麻烦,2015年停止维护
- 数据类型
数据模型spark封装一个底子数据模型(数据类型)rdd
然后根据rdd进行再次封装变成新的数据类型dataFrame
然后根据dataframe再次封装得到了dataset类型
- rdd 弹性分布式集合 利用python,java,scala,R
[1,zhangsan,19,2,lisi,20] #列表的情势
- dataframe类型 结构化数据 行列 表信息(数据的属性(字段)和类型) 利用python,java,scala,R
- from pyspark.sql import Row
- # 使用row类型生成一行数据
- r1 = Row(1,'wuhao',20)
- # 从行中可以通过下标取值
- print(r1[1])
- # 使用row类型生成数据的同时指定字段名
- r2 = Row(id=2,name='dashuaige',age=20)
- # 从行中通过字段名取值
- print(r2['name'])
复制代码
- schema类 表信息
- from pyspark.sql.types import *
- # 定义schema信息
- # 使用structType类进行定义
- # add是指定字段信息
- # 第一个参数,字段名
- # 第二个参数,字段信息
- # 第三个参数,是否允许为空值,默认是True
- schema_type = StructType.add('id',IntegerType(),False).add('name',StringType()).add('age',IntegerType())
复制代码 - [
- [1,zhangsan,19],
- [2,lisi,20]
- ]
- schema
- {
- id:int,
- name:string,
- age:int
- }
复制代码
- datasets类型 结构化数据 利用java,scala
- row类 一行数据 一个dataframe
- schema类 表信息
- 从datasets中取出的一行数据可以当作dataframe类型操作
dataframe
详解
- 结构化数据 行列 表信息(数据的属性(字段)和类型) 利用python,java,scala,R
- row类 行数据 rdd中一个列表元素
- from pyspark.sql import Row
- # 使用row类型生成一行数据
- r1 = Row(1,'wuhao',20)
- # 从行中可以通过下标取值
- print(r1[1])
- # 使用row类型生成数据的同时指定字段名
- r2 = Row(id=2,name='dashuaige',age=20)
- # 从行中通过字段名取值
- print(r2['name'])
复制代码 - schema类 表信息
- from pyspark.sql.types import *
- # 定义schema信息
- # 使用structType类进行定义
- # add是指定字段信息
- # 第一个参数,字段名
- # 第二个参数,字段信息
- # 第三个参数,是否允许为空值,默认是True
- schema_type = StructType.add('id',IntegerType(),False).add('name',StringType()).add('age',IntegerType())
复制代码 - [
- [1,zhangsan,19],
- [2,lisi,20]
- ]
- schema
- {
- id:int,
- name:string,
- age:int
- }
复制代码 创建
创建dataframe数据
需要利用一个sparksession的类创建
sparksession类是在sparkcontext的底子上进行了封装,也就是sparksession类中包含了sparkcontext
基本创建
- # 导入SparkSession和Row
- from pyspark.sql import SparkSession,Row
- # 导入定义字段的类型
- from pyspark.sql.types import *
- # dataframe的创建方法是由sparkSession提供的,需要生成sparksession对象
- # 是一个固定写法
- ss = SparkSession.builder.getOrCreate()
- # 定义每行数据
- r1 = Row(id=1,name='张三',age=20)
- r2 = Row(id=2,name='李四',age=19)
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
- # 创建dataframe数据
- # 使用sparksession对象中的createDataFrame
- # 第一个参数,指定行数据,将每行数据放入列表中
- # 第二个参数,指定表信息
- df = ss.createDataFrame([r1,r2],schema_type)
- # 查看df的表数据
- # 使用dataframe下的show方法
- df.show()
- # 查看schema信息
- df.printSchema()
复制代码 rdd的二维数据转化为dataframe
- from pyspark.sql import SparkSession
- from pyspark.sql.types import *
- # 生成sparkSession对象
- ss = SparkSession.builder.getOrCreate()
- # sparkSession中可以通过sparkcontext获取sparkcontext对象
- sc = ss.sparkContext
- # 生成rdd
- rdd = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
- # 将二维的rdd数据转化为dataframe数据
- df = rdd.toDF(schema_type)
- # 查看dataframe数据
- df.show()
复制代码 基本利用
spark提供的DSL方法和sql的关键词一样,利用方式与sql基本类似,在进行数据处置惩罚时,要按照SQL的的执行顺序去思索怎样处置惩罚数据
from join 知道数据在哪 df本身就是要处置惩罚的数据 df.join(df2)
where 过滤需要处置惩罚的数据 df.join(df2).where()
group by 聚合 数据的盘算 df.join(df2).where().groupby().sum()
having 盘算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法之后完成后回得到一个处置惩罚后的新的df
- # 先将rdd转化为二维的形式
- # 再转化为dataframe
- rdd = rdd.map(lambda x:[int(x.split(',')[0]),x.split(',')[1],int(x.split()[2])])
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
- df = rdd.toDF(schema_type)
- # df数据进行操作
- # 会返回一个新的df
- # 指定查询的字段数据
- select_df1 = df.select('name') #select name from df;
- select_df2 = df.select('name','age') #select name,age from df;
- select_df3 = df.select(df['name'],df['age']) #select name,age from df;
- # where的条件过滤 返回一个新的df
- # 一个条件 年龄大于20的数据
- where_df1 = df.where('age>20') #select * from df where age>20;
- # 多个条件的与或非
- where_df2 = df.where('id>10 and name like "王%"') #select * from df where id>10 and name like "王%";
- # group by分组一般结合聚合(sum,avg,min,max)操作
- # df.groupby()和df.groupBy是一样的逻辑
- groupby_df1 = df.groupby('gender').sum('age') #select gender,sum(age) from df group by(gender);
- # 多个字段分组
- groupby_df2 = df.groupby('gender','age') #select * from df group by(gender,age)
- # 分组后的数据过滤 sql中的having
- where_groupby_df1 = df.groupby('gender').sum('age').where('sum(age)>80') #select gender,sum(age) from df group by gender having sum(age)>80;
- # orderby排序
- # 底层调用的是sort,默认是升序
- orderby_df1 = df.orderBy('age') # select * from df order by age;
- orderby_df2 = df.orderBy('age',ascending=False) # select * from df order by age desc;
- orderby_df3 = df.orderBy(['age','id'],ascending=False) #多字段排序 select * from df order by age,id desc;
- # limit指定返回的数据数量(spark中不能指定分页)
- limit_df = df.limit(5) # select * from df limit 5;
- # 将多个方法放在一起使用,主义执行顺序
- all_df1 = df.where('age>20').groupby('gender').sum('age').where('sum(age)>30').orderBy('sum(age)')
复制代码 利用sparksession提供的sql方法,编写sql语句执行
留意这里只能写hivesql,与mysql内里有些语法是不同的
- # 使用sql语句去操作spark数据
- # 1.先将df数据指定为一个表
- df.createOrReplaceTempView('stu')
- # 使用sparksession提供的sql方法
- sqldf_1 = ss.sql('select * form stu')
- # 展示数据
- print('----------------sqldf--------------------')
- print(sqldf_1.show())
复制代码 高级的DSL方法
- from pyspark.sql import SparkSession
- from pyspark.sql.types import *
- # 生成sparkSession对象
- ss = SparkSession.builder.getOrCreate()
- # sparkSession中可以通过sparkcontext获取sparkcontext对象
- sc = ss.sparkContext
- # 生成rdd
- rdd1 = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
- rdd2 = sc.parallrlize([[1,'吴昊',19],[2,'王五',20]])
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
- # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
- df1 = rdd1.toDF(schema_type)
- df2 = rdd2.toDF(schema_type)
- # join的关联
- # 内关联返回一个新的rdd
- # 第一个参数:关联的df
- # 第二个字段关联的字段
- # 第三个参数,关联方式(默认不写是内关联)
- join_df1 = df1.join(df2,'id')
- # 左关联
- join_df1 = df1.join(df2,'id','left')
- # 右关联
- join_df1 = df1.join(df2,'id','right')
- # 查看dataframe数据
- df1.show()
- df2.show()
复制代码
- 缓存和checkpoint
- 相同点和不同点和rdd中的一样
- 如果既进行了缓存又进行了checkpoint遇到盘算错误会先去缓存中寻找
- from pyspark.sql import SparkSession
- from pyspark.sql.types import *
- # 生成sparkSession对象
- ss = SparkSession.builder.getOrCreate()
- # sparkSession中可以通过sparkcontext获取sparkcontext对象
- sc = ss.sparkContext
- # 设置checkpoint存储位置
- sc.setCheckpointDir('hdfs:///spark_checkpoint')
- # 生成rdd
- rdd = sc.parallrlize([[1,'张三',19],[2,'李四',20]])
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).add('name',StringType).add('age',IntegerType)
- # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
- df = rdd.toDF(schema_type)
- # df进行缓存
- df.persist()
- # df进行checkpoint
- df.checkpoint()
- new_df = df.where('id>10') # 再进行df计算时,如果计算错误,就直接读取df的缓存数据或者checkpoint再进行计算
- # 查看dataframe数据
- new_df.show()
复制代码 内置函数
利用spark中的内置函数完成对字段数据的操作,和hive中的内置函数操作基本一样
- # 生成rdd
- rdd = sc.parallrlize([
- [1,'张三',19,'1989-5-19'],
- [2,'李四',20,'2003-12-04']
- ])
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).\
- add('name',StringType).\
- add('age',IntegerType).\
- add('data',StringType).\
- add('unix_t',StringType)
- # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
- df = rdd.toDF(schema_type)
- df.checkpoint()
- # 内置函数使用
- # 字符串操作
- # 字符串拼接
- new_df1 = df.select(f.concat('name','age')) #select concat(name,age) from df;
- # 指定拼接字符
- new_df2 = df.select(f.concat_ws(',','name','age')) #select concat_ws(',',name,age) from df;
- # 字符串截取
- # 第一个参数是要截取的字段,第二个是从第几个字符开始截取,第三个是要截取多少个字符
- new_df3 = df.select(f.substring('name',1,5)) #select substring(name,1,5) from df;
- # 字符串切割
- # 第一个参数是要分割的字段,第二个是按什么分割
- # 分割完成后每条数据会变成一个列表
- new_df4 = df.select(f.split('data','-')) #select split(data,'-') from df;
- # 对分割后的数据可以进行下标取值
- new_df44 = df.select(f.split('data','-')[0])
- # 展示结果
- new_df1.show()
- print('-------------指定拼接分隔符---------------')
- new_df2.show()
- print('-------------字符串截取---------------')
- new_df3.show()
- print('-------------字符串分割---------------')
- new_df4.show()
- print('-------------字符串分割和下标取值---------------')
- new_df44.show()
复制代码
- # 生成rdd
- rdd = sc.parallrlize([
- [1,'张三',19,'1989-5-19'],
- [2,'李四',20,'2003-12-04']
- ])
- # 定义schema信息
- schema_type = StructType().add('id',IntegerType).\
- add('name',StringType).\
- add('age',IntegerType).\
- add('data',StringType).\
- add('unix_t',StringType)
- # 将二维的rdd数据转化为dataframe数据(必须是二维嵌套)
- df = rdd.toDF(schema_type)
- df.checkpoint()
- # 获取当前日期
- new_df5 = df.select(f.current_date())
- # 获取当前日期和时间信息
- new_df55 = df.select(f.current_timestamp())
- # 获取unix时间
- new_df555 = df.select(f.unix_timestamp())
- # 将unix时间转化为标准时间,可以通过format=修改默认标准时间格式
- new_df5555 = df.select(f.from_unixtime('unix_t'))
- # 时间加减,单位为天(默认加一天,将数据改成负数就是减)
- new_df55555 = df.select(f.date_add('data',1))
- print('-------------获取当前日期---------------')
- new_df5.show()
- print('-------------获取当前日期和时间---------------')
- new_df55.show()
- print('-------------获取unix时间---------------')
- new_df555.show()
- print('-------------将unix时间转化为标准时间---------------')
- new_df5555.show()
- print('-------------时间加减---------------')
- new_df55555.show()
复制代码
- 聚合
- 内置函数中有聚合操作方法 .sum() .avg() .max() .min()
- 需要配合df中的agg方法一起利用
- # 聚合(内置函数中的聚合方法先分组再聚合)
- # 配合agg方法(如果不写agg就只能单聚合)
- new_df6 = df.groupby('gender').agg(f.sum('age')) #select gender,sum(age) from df order by gender;
- # 配合agg方法进行多个聚合
- new_df66 = df.groupby('gender').agg(f.sum('age'),f.avg('age'),f.max('age'))
- # 多个聚合保留两位小数
- #select gender,round(sum(age),2) from df order by gender;
- new_df666 = df.groupby('gender').agg(f.round(f.sum('age'),2),f.round(f.avg('age'),2),f.round(f.max('age'),2))
复制代码
- # 对列起别名
- new_df6666 = df.groupby('gender').agg(f.round(f.sum('age'),2).alias('sum_age'),f.round(f.avg('age'),2).alias('avg_age'),f.round(f.max('age'),2).alias('max_age'))
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |