1.HDFS:
1.1常用下令:
1.操作下令:
1.1创建文件夹
- hdfs dfs -mkdir [-p] <文件路径>
复制代码 例如,新建文件夹 /202012721、/202012721/dir1。下令如下:
- hdfs dfs -mkdir /202012721
- hdfs dfs -mkdir -p /202012721/dir1
复制代码
1.2 列出指定的文件和目次
- hdfs dfs -ls [-d][-h][-R] <文件路径>
复制代码 [-d]:返回 path。
[-h]:h 指 human-readble,表示按照人性化的单元显示文件巨细,比如文件显示为 10 MB,而不会显示 10240 KB。
[-R]:级联显示 paths 下的文件。
例如,列出根目次下的文件或目次。下令如下:
1.3新建文件
例如,在主目次 /202012721/input 下创建巨细为 0 00 的空文件 file。下令如下:
- hdfs dfs -touchz /202012721/input.txthdfs dfs -ls /
- 202012721
复制代码 1.4上传文件
- hdfs dfs -put [-f] [-p] <本地路径> <HDFS上的路径>
复制代码 例如,在本地创建一个文件 data.txt 并传到 HDFS 的 /202012721 目次下下令如下:
- hdfs dfs -put data.txt /202012721/data.txt
复制代码 1.5将本地文件移动到 HDFS
- hdfs dfs -moveFromLocal <本地文件路径> <HDFS路径>
复制代码 例如,我们把本地文件 data2.txt 移动到 HDFS 的 /202012721 下令如下:
- hdfs dfs -moveFromLocal data2.txt /202012721
复制代码 1.6下载文件
- hdfs dfs -get [-p] <HDFS路径> <本地文件路径>
复制代码 例如,将 HDFS 中的 /202012721/data.txt 文件下载并生存为本地的 ~/local_data.txt:
- hdfs dfs -get /202012721/data.txt ~/local_data.txt
复制代码 1.7查看文件
- hdfs dfs -cat [-ignoreCrc] <文件路径>
复制代码 [-ignoreCrc]:忽略循环检验失败的文件。
- hdfs dfs -text [-ignoreCrc] <文件路径>
复制代码 text 不但可以查看文本文件,还可以查看压缩文件和 Avro 序列化的文件。
- hdfs dfs -tail [-f] <文件路径>
复制代码 [-f]:动态更新显示数据。
tail 查看的是最后 1 KB 的文件(Linux 上的 tail 默认查看最后 10 行记载)。
例如,查看 /202012721/data.txt。下令如下:
- hdfs dfs -cat /202012721/data.txt
复制代码 1.8追写文件
- hdfs dfs -appendToFile <本地路径> <目标路径>
复制代码 该下令将 localsrc 指向的本地文件内容写入目的文件 dst。
例如,在本地根目次下新建 data3.txt 文件,内容为 hadoop,追加到文件 /202012721/data.txt 中。下令如下:
- hdfs dfs -appendToFile data3.txt /202012721/data.txt
- hdfs dfs -cat /202012721/data.txt
复制代码 1.9删除目次或者文件
- hdfs dfs -rm [-f] [-r] <文件路径>
复制代码 [-f]:如果要删除的文件不存在,不显示错误信息。
[-r/R]:级联删除目次下全部的文件和子目次文件。
例如,删除 HDFS 中的 /202012721/data2.txt 文件。下令如下:
- hdfs dfs -rm /202012721/data2.txt
复制代码 1.10显示占用的磁盘空间巨细
- hdfs dfs -du [-s] [-h] <文件路径>
复制代码 [-s]:显示指定目次下文件总的巨细。
[-h]:h 指 human-readble,表示按照人性化的单元显示文件巨细,比如文件显示为 10 MB,而不会显示 10240 KB。
例如,显示 HDFS 根目次中 202012721 文件夹下全部文件的巨细。下令如下:
1.11HDFS 中的文件复制
- hdfs dfs -cp [-f] [-p | -p[topax]] <原路径> <目标路径>
复制代码 [-f]:如果目的文件存在,将强行覆盖。
[-p]:将生存文件的属性。
例如,将 HDFS 中的 /202012721/data.txt 复制为 /202012721/data_copy.txt。下令如下:
- hdfs dfs -cp /202012721/data.txt /202012721/data_copy.txt
复制代码 1.12 HDFS 中的文件移动
- hdfs dfs -mv <原路径> <目标路径>
复制代码 例如,将 HDFS 中的 /202012721/data_copy.txt 移动(也可理解为改名)为 /202012721/data2.txt。下令如下:
- hdfs dfs -mv /202012721/data_copy.txt /202012721/data2.txt
复制代码 1.13管理-陈诉文件体系的根本信息和统计信息
1.14查看拓扑
- hdfs dfsadmin -printTopology
复制代码
1.2例题:
任务描述
本关任务:使用 Hadoop 下令来操作分布式文件体系。
编程要求
在右侧下令行中启动 Hadoop ,进行如下操作。
在 HDFS 中创建 /usr/output/ 文件夹;
在本地创建 hello.txt 文件并添加内容:“ HDFS 的块比磁盘的块大,其目的是为了最小化寻址开销。”;
将 hello.txt 上传至 HDFS 的 /usr/output/ 目次下;
删除 HDFS 的 /user/hadoop 目次;
将 Hadoop 上的文件 hello.txt 从 HDFS 复制到本地 /usr/local 目次。
预期输出:
HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。
HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。
代码:
- start-dfs.sh
- hadoop fs -mkdir /usr
- hadoop fs -mkdir /usr/output
- vim hello.txt
- hadoop fs -put hello.txt /usr/output
- hadoop fs -rm -r /user/hadoop
- hadoop fs -copyToLocal /usr/output/hello.txt /usr/local
复制代码 HDFS体系初体验:
- start-dfs.sh
- hadoop fs -mkdir /task
- hadoop fs -ls /
- touch task.txt
复制代码- vim task.txt
-
- hello educoder
复制代码 hello educoder输入完后,退出vim文本编辑器后再输入下面的内容
- hadoop fs -put task.txt /task
- hadoop fs -cat /task/task.txt
复制代码 2.HBase:
2.1常用下令:
1. 启动HBase
start-hbase.sh
2. 关闭hbase
stop-hbase.sh
3. 进入客户端
./hbase shell
4. 退出客户端下令
quit
5. 查看namespace
list_namespace
6. 创建namespace
create_namespace "namespace"
7. 删除namespace
drop_namespace "namespace"
8.表操作
1. 查看全部表
hbase(main):024:0> list
2. 查看某个namespace下全部的表
hbase(main):027:0> list_namespace_tables "testns"
3. 创建表
hbase(main):027:0> create "namespace:表名","列族1","列族2"
hbase(main):027:0> create "testns:t_person","info","edu"
4. 查看表布局
hbase(main):027:0> desc "testns:t_person"
5. 删除表和禁用表
hbase(main):027:0> disable "namespace:表"
hbase(main):027:0> drop "namespace:表"
9.数据增编削查
1. 添加数据
put "namespace:表","rowkey","列族1:列名1","值"
hbase(main):007:0> put 'testns:t_person','1001','info:name','zhangsan'
2. 根据rowkey查找数据
get "namespace:表名","rowkey"
hbase(main):015:0> get 'testns:t_person','1001'
3. scan查询表中全部数据
hbase(main):015:0> scan 'testns:t_person'
4. scan 查询表中前2条数据
hbase(main):015:0> scan "restns:t_person",{LIMIT=>2}
5. 使用start row和end row范围查找
hbase(main):015:0> scan "testns:t_person",{STARTROW=>'1001',STOPROW=>'1003'}
6. 使用start row和limit查找
hbase(main):015:0> scan "testns:t_person",{STARTROW=>'1001',LIMIT=>2}
7. 修改数据
put "namespace:表名","rowkey","列族:列名","值"
8. 删除数据
delete "namespace:表","rowkey","列族:列名"
9. 删除某个rowkey对应的数据
deleteall "namespace:表","rowkey"
10. 统计表中全部数据
count "namespace:表"
11. 清空表中的全部数据
truncate "namespace:表"
12. 创建表
hbase(main):013:0> create "testns:user","info"
13. 修改版本数
hbase(main):013:0> alter "testns:user",{NAME=>'INFO',VERSIONS=>2}
14. 查看多版本
hbase(main):013:0> get "testns:user","10001",{COLUMN=>'info:name',VERSIONS=>3}
例题:
创建表:
创建test表,然后继续在HBase中创建两张表,表名分别为:dept,emp,列都为:data
输出:
describe 'test' Table test is ENABLED test describe 'dept' Table dept is ENABLED dept describe 'emp' Table emp is ENABLED emp
启动:
建表:
- # 创建表 test
- create 'test', 'data'
-
- # 创建表 dept
- create 'dept', 'data'
-
- # 创建表 emp
- create 'emp', 'data'
复制代码 添加数据、删除数据、删除表:
建表插数据:
- create 'mytable', 'data'
- put 'mytable', 'row1', 'data:1', 'zhangsan'
- put 'mytable', 'row2', 'data:2', 'zhangsanfeng'
- put 'mytable', 'row3', 'data:3', 'zhangwuji'
复制代码 3.MapReduce
Map/Reduce程序总共分为两块即:Map,Reduce,Map负责处理输入文件的内容。 mapper方法,它以空格为分隔符将一行切分为多少tokens,之后,输出< <word>, 1> 情势的键值对。
Reducer中的reduce方法 仅是将每个key出现的次数求和。 hadoop-streaming只来自各个mapper的键值对按照键排序,不会集并,因此我们需要本身将排序后的键值对合并成<键, 值列表>的情势后再发给reduce程序处理,recuder.py中的main()就是为了实现这个功能。
例题:
1.结果统计
使用MapReduce盘算班级每个学生的最好结果,输入文件路径为/user/test/input,请将盘算后的效果输出到/user/test/output/目次下。
- #! /usr/bin/python3
- import sys
- def main():
- for line in sys.stdin:
- line = line.strip()
- mapper(line)
- # 使用name,age分别表示姓名和年龄
- def mapper(line):
- ########## begin ############
- group = line.split('\\n')
- for people in group:
- if len(people.strip()) == 0:
- continue
- name, age = people.split(' ')
- ########### End #################
- print("%s\t%s" % (name, age))
- if __name__ == '__main__':
- main()
复制代码- #! /usr/bin/python3
- import sys
- from operator import itemgetter
- # 找出values的最大值,并按name\tmax_age的形式输出。
- def reducer(k, values):
- ##### Begin #########
- print("%s\t%s" % (k,max(values)))
- ##### End #########
- #############################################
- # mapper的输出经过分区处理后,数据行按照键排序;
- # 具有相同键的行排在一起;
- # hadoop-streaming不会合并相同键的各个值。
- # 下面的代码将相同键的各个值放到同一个列表中,
- # 并调用reducer函数实现找出每个人的最大年龄并输出;
- # 输出格式为:姓名\年龄
- #############################################
- def main():
- current_name = None
- ages = []
- name, age = '', 0
- for line in sys.stdin:
- line = line.strip()
- name, age = line.split('\t', 1)
- age = int(age)
- if current_name == name:
- ages.append(age)
- else:
- if current_name:
- reducer(current_name, ages)
- ages = []
-
- ages.append(age)
- current_name = name
- # 不要忘记最后一个人
- if current_name == name:
- reducer(current_name, ages)
- if __name__ == '__main__':
- main()
复制代码 2.文件内容合并去重
对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。 为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的差别文件合并到一个没有重复的整合文件,规则如下: 第一列按学号分列; 学号相同,按x,y,z分列; 输入文件路径为:/user/tmp/input/; 输出路径为:/user/tmp/output/。 留意:输入文件后台已经帮你创建好了,不需要你再重复创建。
- #! /usr/bin/python3
- import sys
- def main():
- for line in sys.stdin:
- line = line.strip()
- mapper(line)
- def mapper(line):
- ########## Begin ###############
- items = line.split('\\n')
-
- for item in items:
- key,value = item.split()
- print("%s\t%s" % (key,value))
- ########### End #############
- if __name__ == '__main__':
- main()
复制代码- #! /usr/bin/python3
- import sys
- def reducer(k, values):
- ############ Begin ################
- value = sorted(list(set(values)))
- for v in value:
- print("%s\t%s" % (k,v))
- ############ End ################
- def main():
- current_key = None
- values = []
- akey, avalue = None, None
- for line in sys.stdin:
- line = line.strip()
- akey, avalue = line.split('\t')
-
- if current_key == akey:
- values.append(avalue)
- else:
- if current_key:
- reducer(current_key, values)
- values = []
-
- values.append(avalue)
- current_key = akey
-
- if current_key == akey:
- reducer(current_key, values)
- if __name__ == '__main__':
- main()
-
复制代码 3.挖掘父子关系
你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下: 孙子在前,祖父在后; 输入文件路径:/user/reduce/input; 输出文件路径:/user/reduce/output。
- #! /usr/bin/python3
-
- import sys
- def mapper(line):
- ############### Begin ############
- items = line.split("\\n")
- for item in items:
- child,parent = item.split()
- print("%s\t%s" % (child,"p-"+parent))
- print("%s\t%s" % (parent,"c-"+child))
- ############### End #############
- def main():
- for line in sys.stdin:
- line = line.strip()
- if line.startswith('child'):
- pass
- else:
- mapper(line)
- if __name__ == '__main__':
- main()
复制代码- #! /usr/bin/python3
- import sys
- def reducer(k, values):
- ############## Begin ################
- grandc = []
- grandp = []
- for value in values:
- if value[:2] == 'c-':
- grandc.append(value[2:])
- elif value[:2] == 'p-':
- grandp.append(value[2:])
-
- for c in grandc:
- for p in grandp:
- print("%s\t%s" % (c,p))
- ############## End #################
- def main():
- current_key = None
- values = []
- akey, avalue = None, None
- print("grand_child\tgrand_parent")
- for line in sys.stdin:
- line = line.strip()
- try:
- akey, avalue = line.split('\t')
- except:
- continue
- if current_key == akey:
- values.append(avalue)
- else:
- if current_key:
- reducer(current_key, values)
- values = []
- values.append(avalue)
- current_key = akey
- if current_key == akey:
- reducer(current_key, values)
- if __name__ == '__main__':
- main()
复制代码 4.数据洗濯
1.去掉字段“上映天数”中带有“零点场”、“点映”、“展映”和“重映”的电影数据;
2.以字段“上映天数”和“当前日期”为依据,在尾列添加一个“上映日期”(releaseDate)的字段,该字段值为“当前日期”减去“上映天数”+1(格式为:2020-10-13。例如:若字段“上映天数”的值为“上映2天”,字段“当前日期”为“2020-10-10”,则字段“上映日期”的值为“2020-10-09”)。如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。留意:若字段“上映天数”的值为“上映首日”,则字段“上映日期”的值应设为“当前日期”的值;
3.对字段“当日综合票房”和字段“当前总票房”的数据值进行处理,以“万”为单元,通过数字情势展示,若原数据中有“万”等表示量级的汉字,需去掉量级汉字(如:“1.5亿”需转换为“15000”,“162.4万”转换为“162.4”)。转换时需留意精度缺失标题(如:使用 double、float 等范例做算术运算时,1.14 亿转换为万时,效果会出现 11399.999999999998 万,可以使用 BigDecimal 类来处理这类精度缺失标题),字段“当日综合票房”和字段“当前总票房”最后保留两位小数;
4.洗濯完的数据集设置为 1 个分区后,存储到 /root/files 目次下,分隔方式为 \t。
** 留意:不要改变字段原本的顺序。 **
数据集说明
本数据集是电影票房数据,包含十个字段的信息,数据集的字段含义说明如下:
盛世秧歌 7.06万 <0.1% 107 <0.1% 22 47.3% 1757.1万 展映 2019-10-16
决胜时刻 70.83万 0.6% 561 0.1% 34 21.6% 1.14亿 上映25天 2019-10-14
天池水怪 5.43万 <0.1% 55 <0.1% 30 -- 75.1万 2020-10-06
洗濯后数据展示:
决胜时刻 70.83 0.6% 561 0.1% 34 21.6% 11400.0 上映25天 2019-10-14 2019-09-20
天池水怪 5.43 <0.1% 55 <0.1% 30 -- 75.1 2020-10-06 往期电影
留意事项
1.洗濯数据后共多少行;
2.字段“上映日期”添加乐成;
3.字段“当日综合票房”和字段“当前总票房”的单元处理乐成。
启动HDFS:
- # dbhelper.py
-
- import pymysql
- import sys
- import codecs
-
- class DBHelper:
-
- def get_connection():
- # 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
- ######## Begin ############
- conn = pymysql.connect(host='localhost',port=3306,\
- user='root',passwd='123123',\
- charset='utf8mb4',db='mydb')
- ######## End ############
- return conn
-
- @classmethod
- def get_region(cls):
- conn = cls.get_connection()
- regions = dict()
- with conn.cursor() as cur:
- #从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
- ############ Begin ###################
- cur.execute("select CodeNum,Address from allregion")
-
- for s in cur.fetchall():
- regions[s[0]] = s[1]
- ############ End #################
- conn.close()
- return regions
-
- @classmethod
- def get_userphones(cls):
- conn = cls.get_connection()
- userphones = dict()
- with conn.cursor() as cur:
- #从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
- ############ Begin ###################
- cur.execute("select phone,trueName from userphone")
-
- for t in cur.fetchall():
- userphones[t[0]] = t[1]
- ############ End #################
- conn.close()
- return userphones
-
- def main():
- sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
- region = DBHelper.get_region()
- users = DBHelper.get_userphones()
-
- if __name__ == '__main__':
- main()
复制代码- #! /usr/bin/python3
- #
- # mapper.py
- import sys
- from dbhelper import DBHelper
- import codecs
- import time
-
- # 获取“省市代码:省市名称”项并保存在字典regions中;
- # 获取“电话号码:姓名”项并保存在字典userphones中。
- regions = DBHelper.get_region()
- userphones = DBHelper.get_userphones()
-
- def main():
- # 正确输出utf-8编码的汉字
- sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
- for line in sys.stdin:
- line = line.strip()
- mapper(line)
-
- def mapper(line):
- # 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
- # 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键\t值”的形式。
- ########## begin ##############
- items = line.split(',')
-
- caller = userphones.get(items[0])
- reciever = userphones.get(items[1])
- begin_time = int(items[2])
- end_time = int(items[3])
- caller_address = regions.get(items[4])
- reciever_address = regions.get(items[5])
-
- print(caller,reciever,sep=',',end=',')
- print(','.join(items[:2]),end=',')
- print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(begin_time)),end=',')
- print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(end_time)),end=',')
- print(str(end_time - begin_time),end=',')
- print(caller_address,reciever_address,sep=',')
- ########### End #################
-
- if __name__ == '__main__':
- main()
复制代码
总结:
map函数:
- def mapper(line):
- words = line.split(' ')
- for word in words:
- if len(word.strip()) == 0:
- continue
- print("%s\t%s" % (word, 1))
- if __name__ == '__main__':
- main()
复制代码 reducer函数:
- def reducer(k, values):
- print("%s\t%s" % (k, sum(values)))
复制代码 4.Spark
1.Spark任务提交
jar包所在位置: /root/project.jar
主类:Student
提交模式:local
- #!/bin/bash
-
- cp -r Spark/SparkRDD/target/project.jar /root
- cd /opt/spark/dist/bin
- #********** Begin **********#
- ./spark-submit \
- --master local \
- --class Student \
- /root/project.jar
- #********** End **********#
复制代码 盘算圆周率:
- cd /opt/spark/dist/bin
- ./spark-submit
- --master local
- --class org.apache.spark.examples.SparkPi
- /opt/spark/dist/examples/jars/spark-examples_2.11-2.2.0.jar
复制代码 ./spark-submit
--master 本地模式
--class 程序运行的主类名
xxx.jar
2.sparkRDD
RDD五大特性:
一组分片(Partition),即数据集的根本组成单元。对于RDD来说,每个分片都会被一个盘算任务处理,并决定并行盘算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采取默认值。默认值就是程序所分配到的CPU Core的数量。
一个盘算每个分区的函数。Spark中RDD的盘算是以分片为单元的,每个 RDD都会实现 compute 函数以到达这个目的。compute函数会对迭代器进行复合,不需要生存每次盘算的效果。
RDD之间的依靠关系。RDD的每次转换都会天生一个新的RDD,所以RDD之间就会形成雷同于流水线一样的前后依靠关系。在部分分区数据丢失时,Spark 可以通过这个依靠关系重新盘算丢失的分区数据,而不是对RDD的全部分区进行重新盘算。
一个Partitioner,即RDD的分片函数。当前Spark中实现了两种范例的分片函数,一个是基于哈希的HashPartitioner,别的一个是基于范围的 RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表生存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动盘算”的理念,Spark 在进行任务调理的时间,会尽可能地将盘算任务分配到其所要处理数据块的存储位置。
1.集合并行化创建RDD:
SparkContext创建;
- sc = SparkContext("local", "Simple App")
复制代码 说明:"local" 是指让Spark程序本地运行,"Simple App" 是指Spark程序的名称,这个名称可以恣意(为了直观明了的查看,最好设置有意义的名称)。
集合并行化创建RDD:
- data = [1,2,3,4]
- rdd = sc.parallelize(data)
复制代码 collect算子:在驱动程序中将数据集的全部元素作为数组返回(留意数据集不能过大):
停止SparkContext
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local", "Simple RDD App")
-
- # 2.创建一个1到8的列表List
- data = list(range(1, 9))
-
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
-
- # 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
- rdd_content = rdd.collect()
-
- # 5.打印 rdd 的内容
- print(rdd_content)
-
- # 6.停止 SparkContext
- sc.stop()
-
-
- #********** End **********#
复制代码 2.读取外部数据集创建RDD
文本文件RDD可以使用创建SparkContex的textFile方法。此方法需要一个 URI的文件(本地路径的呆板上,或一个hdfs://,s3a:// 等 URI),并读取其作为行的集合。这是一个示例调用:
- distFile = sc.textFile("data.txt")
复制代码- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == '__main__':
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local", "Simple App")
- # 文本文件 RDD 可以使用创建 SparkContext 的textFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
- # 2.读取本地文件,URI为:/root/wordcount.txt
- raw = sc.textFile("/root/wordcount.txt")
- rdd = raw.map(lambda x:x)
- # 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
- rdd.collect()
- # 4.打印 rdd 的内容
- print(rdd.collect())
- # 5.停止 SparkContext
- sc.stop()
- #********** End **********#
复制代码 3.Transformation - map
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local","Simple App")
- # 2.创建一个1到5的列表List
- List = [1,2,3,4,5]
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(List)
- # 4.使用rdd.collect() 收集 rdd 的元素。
- print(rdd.collect())
-
- """
- 使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 偶数转换成该数的平方
- 奇数转换成该数的立方
- """
-
- # 5.使用 map 算子完成以上需求
- rdd_map = rdd.map(lambda x:(x*x if (x%2==0) else x*x*x))
- # 6.使用rdd.collect() 收集完成 map 转换的元素
- print(rdd_map.collect())
- # 7.停止 SparkContext
- sc.stop()
-
- #********** End **********#
复制代码 4.Transformation - mapPartitions
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- #********** Begin **********#
- def f(iterator):
- list = []
- for x in iterator:
- list.append((x,len(x)))
- return list
- #********** End **********#
-
- if __name__ == "__main__":
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local","Simple App")
- # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
- data = ["dog", "salmon", "salmon", "rat", "elephant"]
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
- # 4.使用rdd.collect() 收集 rdd 的元素。
- print(rdd.collect())
-
- """
- 使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
- 需求:
- 将字符串与该字符串的长度组合成一个元组,例如:
- dog --> (dog,3)
- salmon --> (salmon,6)
- """
-
- # 5.使用 mapPartitions 算子完成以上需求
- partitions = rdd.mapPartitions(f)
- # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
- print(partitions.collect())
- # 7.停止 SparkContext
- sc.stop()
-
- #********** End **********#
复制代码 5.Transformation - filter
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local","Simple App")
- # 2.创建一个1到8的列表List
- data = [1,2,3,4,5,6,7,8]
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
- # 4.使用rdd.collect() 收集 rdd 的元素。
- print(rdd.collect())
-
- """
- 使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 过滤掉rdd中的奇数
- """
- # 5.使用 filter 算子完成以上需求
- rdd_filter = rdd.filter(lambda x:x%2==0)
- # 6.使用rdd.collect() 收集完成 filter 转换的元素
- print(rdd_filter.collect())
- # 7.停止 SparkContext
- sc.stop()
-
- #********** End **********#
复制代码 6.Transformation - flatMap
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local","Simple App")
- # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
- data = [[1,2,3],[4,5,6],[7,8,9]]
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
- # 4.使用rdd.collect() 收集 rdd 的元素。
- print(rdd.collect())
- """
- 使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 合并RDD的元素,例如:
- ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
- ([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
- """
- # 5.使用 filter 算子完成以上需求
- flat_map = rdd.flatMap(lambda x:x)
- # 6.使用rdd.collect() 收集完成 filter 转换的元素
- print(flat_map.collect())
- # 7.停止 SparkContext
- sc.stop()
- #********** End **********#
复制代码 7.Transformation - distinct
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- #********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local","Simple App")
-
- # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
- data = [1,2,3,4,5,6,5,4,3,2,1]
-
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
-
- # 4.使用rdd.collect() 收集 rdd 的元素
- print(rdd.collect())
-
- """
- 使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 元素去重,例如:
- 1,2,3,3,2,1 --> 1,2,3
- 1,1,1,1, --> 1
- """
- # 5.使用 distinct 算子完成以上需求
- a = rdd.distinct()
-
- # 6.使用rdd.collect() 收集完成 distinct 转换的元素
- print(a.collect())
-
- # 7.停止 SparkContext
- sc.stop()
-
- #********** End **********#
复制代码 8.Transformation - sortBy
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- # ********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local", "Simple App")
-
- # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
- data = [1,3,5,7,9,8,6,4,2]
-
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
-
- # 4.使用rdd.collect() 收集 rdd 的元素
- print(rdd.collect())
-
-
- """
- 使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 元素排序,例如:
- 5,4,3,1,2 --> 1,2,3,4,5
- """
- # 5.使用 sortBy 算子完成以上需求
- a = rdd.sortBy(lambda x:x)
-
- # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
- print(a.collect())
-
- # 7.停止 SparkContext
- sc.stop()
-
- #********** End **********#
复制代码 9.Transformation - sortByKey
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- # ********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local", "Simple App")
-
- # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
- data = [("B",1),("A",2),("C",3)]
-
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
-
- # 4.使用rdd.collect() 收集 rdd 的元素
- print(rdd.collect())
-
- """
- 使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 元素排序,例如:
- [(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]
- """
- # 5.使用 sortByKey 算子完成以上需求
- a = rdd.sortByKey()
-
- # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
- print(a.collect())
-
- # 7.停止 SparkContext
- sc.stop()
-
- # ********** End **********#
复制代码 10.Transformation - mapValues
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- # ********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local", "Simple App")
-
- # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
- data = [("1",1),("2",2),("3",3),("4",4),("5",5)]
-
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
-
- # 4.使用rdd.collect() 收集 rdd 的元素
- print(rdd.collect())
-
- """
- 使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
- 需求:
- 元素(key,value)的value进行以下操作:
- 偶数转换成该数的平方
- 奇数转换成该数的立方
- """
- # 5.使用 mapValues 算子完成以上需求
- a = rdd.mapValues(lambda x:x*x if x%2==0 else x*x*x)
-
- # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
- print(a.collect())
-
- # 7.停止 SparkContext
- sc.stop()
-
- # ********** End **********#
复制代码 11.Transformations - reduceByKey
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- # ********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local", "Simple App")
-
- # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
- data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
-
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
-
- # 4.使用rdd.collect() 收集 rdd 的元素
- print(rdd.collect())
-
- """
- 使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
- 需求:
- 元素(key-value)的value累加操作,例如:
- (1,1),(1,1),(1,2) --> (1,4)
- (1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)
- """
- # 5.使用 reduceByKey 算子完成以上需求
- a = rdd.reduceByKey(lambda x,y:x+y)
-
- # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
- print(a.collect())
-
- # 7.停止 SparkContext
- sc.stop()
-
- # ********** End **********#
复制代码 12.WordCount - 词频统计
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
-
- """
- 需求:对本地文件系统URI为:/root/wordcount.txt 的内容进行词频统计
- """
- # ********** Begin **********#
-
- sc = SparkContext("local","pySpark")
- rdd = sc.textFile("/root/wordcount.txt")
- values = rdd.flatMap(lambda x:str(x).split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:tuple(x)[1],False)
- print(values.collect())
-
- # ********** End **********#
复制代码 13.Actions - 常用算子
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- if __name__ == "__main__":
- # ********** Begin **********#
-
- # 1.初始化 SparkContext,该对象是 Spark 程序的入口
- sc = SparkContext("local","Simple App")
- # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
- data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
- # 3.通过 SparkContext 并行化创建 rdd
- rdd = sc.parallelize(data)
- # 4.收集rdd的所有元素并print输出
- print(rdd.collect())
- # 5.统计rdd的元素个数并print输出
- print(rdd.count())
- # 6.获取rdd的第一个元素并print输出
- print(rdd.first())
- # 7.获取rdd的前3个元素并print输出
- print(rdd.take(3))
- # 8.聚合rdd的所有元素并print输出
- print(rdd.reduce(lambda x,y:x+y))
- # 9.停止 SparkContext
- sc.stop()
-
- # ********** End **********#
复制代码 14.Friend Recommendation - 好友保举
- # -*- coding: UTF-8 -*-
- from pyspark import SparkContext
-
- def word_couple(word1, word2):
- if hash(word1) > hash(word2):
- return word1 + '_' + word2
- return word2 + '_' + word1
-
- def relations(items):
- result = []
- for i in range(1, len(items)):
- result.append((word_couple(items[0], items[i]), 0))
- for j in range(i+1, len(items)):
- result.append((word_couple(items[i], items[j]), 1))
- return result
-
- def fun2(x):
- values = tuple(x[1])
- return ((x[0], 0) if min(values)==0 else (x[0], sum(values)))
-
- if __name__ == "__main__":
- """
- 需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
- """
- # ********** Begin **********#
- sc = SparkContext("local", "friend recommendation")
- src = sc.textFile("/root/friend.txt").map(lambda x:x.strip().encode('utf-8').split(" "))
- rdd = src.flatMap(relations).reduceByKey(lambda x,y:0 if x==0 or y==0 else x+y).filter(lambda x:x[1]>0)
- print(rdd.collect())
-
- # ********** End **********#
复制代码 3.Spark SQL
1.创建 SparkSession;
- from pyspark.sql import SparkSession
- spark = SparkSession \
- .builder \
- .appName("Python Spark SQL basic example") \
- .config("spark.sql.crossJoin.enabled", "true") \
- .master("local") \
- .getOrCreate()
复制代码 2.读取所给 Json 数据创建 DataFrame;
- df =spark.read.json("/jun.json")
复制代码 3.创建视图;
- df.createOrReplaceTempView("table1")
复制代码 4.编写 sql 语句盘算指标;
- sqlDF = spark.sql("sql语句")
复制代码 使用SparkSQL统计战斗机飞行性能
sql 语句参考处理思路:
最大飞行速率字段特性:带有分隔符的数字和中文组成,如 1,438.4 千米每小时
第一步:使用 regexp_extract 通过正则表达式提取中文前的全部数字。
regexp_extract函数:
regexp_extract(str, regexp, [idx])
参数说明:
str是被剖析的字符串或字段名;
regexp 是正则表达式;
idx是返回效果,取表达式的哪一部分默认值为1;
0表示把整个正则表达式对应的效果全部返回;
1表示返回正则表达式中第一个()对应的效果,以此类推 。
示例:
select regexp_extract('hitdecisiondlist','(i)(.*?)(e)',0) ;
得到的效果为: itde
select regexp_extract('hitdecisiondlist','(i)(.*?)(e)',1) ;
得到的效果为: i
本例可使用如下表达式提取:
(regexp_extract(`最大飞行速率`,'[\d,\.]+',0)
需要留意的是当把该表达式加入spark.sql("sql语句")时,\d和\.需要转义(\\)。
第二步:使用 regexp_replace 函数去除千位分隔符,。
regexp_replace 函数:
regexp_replace(str, pattern, replacement)[source]
参数说明:
str:待搜刮的字符串表达式。
pattern:匹配内容的正则表达式。
replacement:更换用的字符串表达式。
本例可使用如下表达式将,更换为'':
regexp_replace(regexp_extract(`最大飞行速率`,'[\\\d,\\\.]+',0),'\\\,','')
第三步:使用 CAST 函数将字符串转化为 double 用于排序。
本例可使用如下表达式:
cast(replace(regexp_extract(`最大飞行速率`,'[\\\d,\\\.]+',0),',','') as float)
第四步:使用降序输出前三。
第五步:将处理效果生存到本地目次;
sqlDF.write.format("csv").save("生存路径")
第六步:停止 SparkSession。
spark.stop()
统计出全球飞行速率排名前三的战斗机。 本实训提供一份全球战斗机相干指标参数的 Json 数据(数据在 /root/jun.json)。
统计出指标后将效果以 csv 格式生存到 /root/airspark 目次。
- # coding=utf-8
- from pyspark.sql import SparkSession
- #**********Begin**********#
- #创建SparkSession
- spark = SparkSession \
- .builder \
- .appName("Python Spark SQL ") \
- .master("local")\
- .getOrCreate()
- #读取/root/jun.json中数据
- df = spark.read.json("/root/jun.json").coalesce(1)
- #创建视图
- df.createOrReplaceTempView("table1")
- #统计出全球飞行速度排名前三的战斗机
- out=spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),'\\\,','') as float) as speed,`名称` from table1 order by cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),'\\\,','') as float) DESC limit 3")
- #保存结果
- out.write.mode("overwrite").format("csv").save("/root/airspark")
- #**********End**********#
- spark.stop()
复制代码 使用SparkSQL统计各个研发单元研制战斗机占比
- # coding=utf-8
- from pyspark.sql import SparkSession
- #**********Begin**********#
- #创建SparkSession
- spark = SparkSession \
- .builder \
- .appName("Python Spark SQL ") \
- .master("local")\
- .getOrCreate()
- #读取/root/jun.json中数据
- df = spark.read.json("/root/jun.json").coalesce(1)
- #创建视图
- df.createOrReplaceTempView("table1")
- #统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
- out=spark.sql("select concat(round(count(`研发单位`)*100/(select count(`研发单位`) as num from table1 where `研发单位` is not null and `名称`is not null ),2),'%') as ratio, `研发单位` from table1 where `研发单位` is not null and `名称`is not null group by `研发单位`")
- #保存结果
- out.write.mode("overwrite").format("csv").save("/root/airspark")
- #**********End**********#
- spark.stop()
复制代码 SparkSQL 读取 CSV
- spark = SparkSession.builder.appName("demo").master("local").getOrCreate()
- spark.read.option("header", True).option("delimiter", "CSV分隔符").csv("csv路径")
复制代码 option参数说明:
header为true:将 CSV 第一行数据作为头部信息,换一句来说,就是将 CSV 的第一行数据作为 SparkSQL 表的字段
delimiter:分隔符,例如,CSV 文件默认以英文逗号进行字段分隔,那么 delimiter 为英文逗号,如果文件以分号进行字段分隔,那么 delimiter 为分号
SparkSQL 内置字符串处理函数
正则表达式
正则表达式的限定符
将出租车轨迹数据规整化,洗濯掉多余的字符串,并使用 DataFrame.show() 打印输出。
CSV 文件是以 \t 进行字段分隔,文件路径为 /root/data.csv
- # -*- coding: UTF-8 -*-
- from pyspark.sql import SparkSession
- if __name__ =='__main__':
- spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
- #**********begin**********#
- df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv")
- df.createTempView("data")
- spark.sql("""
- select regexp_replace(TRIP_ID,'\\\W+','') as TRIP_ID ,
- regexp_replace(CALL_TYPE,'\\\W+','') as CALL_TYPE ,
- regexp_replace(ORIGIN_CALL,'\\\W+','') as ORIGIN_CALL ,
- regexp_replace(TAXI_ID,'\\\W+','') as TAXI_ID ,
- regexp_replace(ORIGIN_STAND,'\\\W+','') as ORIGIN_STAND ,
- regexp_replace(TIMESTAMP,'\\\W+','') as TIMESTAMP ,
- regexp_replace(POLYLINE,'\\\W+','') as POLYLINE
- from data
- """).show()
- #**********end**********#
- spark.stop()
复制代码 SparkSql UDF
UDF对表中的单行进行转换,以便为每行天生单个对应的输出值。例如,大多数 SQL环境提供UPPER函数返回作为输入提供的字符串的大写版本。
用户自定义函数可以在Spark SQL中定义和注册为UDF,而且可以关联别名,这个别名可以在后面的SQL查询中使用。作为一个简朴的示例,我们将定义一个 UDF来将以下JSON数据中的温度从摄氏度(degrees Celsius)转换为华氏度(degrees Fahrenheit):
python自定义udf函数:
- df = sqlContext.read.json("temperatures.json")
-
- df.registerTempTable("citytemps")
- # Register the UDF with our SQLContext
-
- sqlContext.registerFunction("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))
- sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
复制代码 留意,Spark SQL定义了UDF1到UDF22共22个类,UDF最多支持22 个输入参数。上面的例子中使用UDF1来处理我们单个温度值作为输入。如果我们不想修改Apache Spark的源代码,对于需要超过22个输出参数的应用程序我们可以使用数组或布局作为参数来解决这个标题,如果你发现本身用了UDF6 或者更高UDF类你可以思量这样操作。
- # -*- coding: UTF-8 -*-
- from pyspark.sql import SparkSession
- import json
- if __name__ == '__main__' :
- spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
- #**********begin**********#
- df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data2.csv")
- df.createTempView("data")
- spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL, TAXI_ID, ORIGIN_STAND, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE from data").show()
- spark.udf.register("timeLen", lambda x: {
- (len(json.loads(x)) - 1) * 15 if len(json.loads(x)) > 0 else 8
- })
- spark.udf.register("startLocation", lambda x: {
- str(json.loads(x)[0]) if len(json.loads(x)) > 0 else ""
- })
- spark.udf.register( "endLocation", lambda x: {
- str(json.loads(x)[len(json.loads(x)) - 1]) if len(json.loads(x)) > 0 else ""
- })
- df.createTempView("data2")
- res=spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME, POLYLINE, timeLen(POLYLINE) as TIMELEN, startLocation(POLYLINE) as STARTLOCATION, endLocation(POLYLINE) as ENDLOCATION from data2")
- res.createTempView("data3")
- res.show()
- spark.sql("select CALL_TYPE,TIME,count(1) as NUM from data3 group by TIME,CALL_TYPE order by CALL_TYPE,TIME").show()
- #**********end**********#
复制代码 5.样题:
下令题.
将/root/data/file2.txt的内容追加到HDFS上/user/2024/input/data.txt的文件尾部,并显示data.txt的内容。
1.将 /root/data/file2.txt 的内容追加到 HDFS 上 /user/2024/input/data.txt 的文件尾部 在下令行中执行以下下令:
- hadoop fs -appendToFile /root/data/file2.txt hdfs://your-hdfs-namenode:port/user/2024/input/data.txt
复制代码 这里的 your-hdfs-namenode:port 需要更换为你现实的 HDFS 名称节点地址和端口号。
2.显示 data.txt 的内容
- hadoop fs -cat hdfs://your-hdfs-namenode:port/user/2024/input/data.txt
复制代码 同样要将 your-hdfs-namenode:port 更换为现实的 HDFS 名称节点相干信息。
程序题:
银行客户数据包括流水号,客户编号,姓名,信用积分,地区,性别,年事,年限,存贷款,产品数,有本行信用卡,活跃用户,收入,已流失,各数据项之间用半角逗号,分隔。数据示例如下:1,15634602,Hargrave,619,France,Female,42,2,0,1,1,1,101348.88,12,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,017,15737452,Romeo,653,Germany,Male,58,1,132602.88,1,1,0,5097.67,1
请统计各性别客户的数量,效果按照客户数量降序分列。预期输特别式如下:[('Female',60), ('Male',31)]
MapReduce:
- from mrjob.job import MRJob
- class GenderCountMRJob(MRJob):
- def mapper(self, _, line):
- fields = line.split(',')
- gender = fields[4]
- yield gender, 1
- def reducer(self, gender, counts):
- yield None, (gender, sum(counts))
- def reducer_sort(self, _, gender_count_pairs):
- sorted_pairs = sorted(gender_count_pairs, key=lambda x: x[1], reverse=True)
- for pair in sorted_pairs:
- yield pair
- if __name__ == '__main__':
- GenderCountMRJob.run()
复制代码 Spark RDD:
- from pyspark import SparkContext
- sc = SparkContext("local", "GenderCountSparkRDD")
- data_rdd = sc.textFile("your_data_file.txt")
- gender_count_rdd = data_rdd.map(lambda line: line.split(',')).map(lambda fields: (fields[4], 1)) \
- .reduceByKey(lambda a, b: a + b)
- sorted_gender_count_rdd = gender_count_rdd.map(lambda x: (x[1], x[0])).sortByKey(ascending=False) \
- .map(lambda x: (x[1], x[0]))
- result = sorted_gender_count_rdd.collect()
- for row in result:
- print(row)
- sc.stop()
复制代码 Spark SQL:
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import count
- spark = SparkSession.builder.appName("GenderCountSparkSQL").getOrCreate()
- data_df = spark.read.csv("your_data_file.txt", header=False, inferSchema=True)
- data_df = data_df.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "customer_id") \
- .withColumnRenamed("_c2", "name").withColumnRenamed("_c3", "credit_score") \
- .withColumnRenamed("_c4", "region").withColumnRenamed("_c5", "gender") \
- .withColumnRenamed("_c6", "age").withColumnRenamed("_c7", "years") \
- .withColumnRenamed("_c8", "deposit_loan").withColumnRenamed("_c9", "product_count") \
- .withColumnRenamed("_c10", "has_credit_card").withColumnRenamed("_c11", "active_user") \
- .withColumnRenamed("_c12", "income").withColumnRenamed("_c13", "has_churned")
- gender_count_df = data_df.groupBy("gender").agg(count("gender").alias("count"))
- sorted_gender_count_df = gender_count_df.orderBy("count", ascending=False)
- result = sorted_gender_count_df.collect()
- for row in result:
- print((row["gender"], row["count"]))
- spark.stop()
复制代码 在上述代码中,your_data_file.txt 需要更换为现实存储银行客户数据的文件名。
分析题:
供电企业的SCADA体系每隔15分钟自动收罗一次用户用电数据,为了减小数据库的存储压力,每个月都要导出上月的数据。营销部门想根据这些数据分析用户的用电风俗。从数据库导出的数据文件随着时间不停增加,需要用符合的方式存储下来。第一种方案是存储到部署在单台服务器上的大型数据库Oracle里,第二种方案是存储到Hadoop集群的分布式文件体系HDFS里。你倾向于选择哪种方案,说说你的来由。
框架分析:
倾向于选择将数据存储到 Hadoop 集群的分布式文件体系 HDFS 里,来由如下:
数据量与可扩展性方面
HDFS 优势:供电企业的用电数据会随着时间不停增加,数据量庞大。HDFS 是为大规模数据存储而设计的分布式文件体系,可以或许轻松应对海量数据的存储需求。它可以通过在集群中添加更多的节点来线性扩展存储容量,方便适应数据量的连续增长。
Oracle 局限:虽然 Oracle 是大型数据库,但其部署在单台服务器上时,存储容量受限于该服务器的硬件配置。当数据量增长到肯定程度,单台服务器可能面临存储空间不敷的标题,扩展存储相对复杂且成本较高。
数据处理与分析方面
HDFS 优势:结合 Hadoop 生态体系,在 HDFS 存储数据后,可以方便地利用如 MapReduce、Spark 等分布式盘算框架对数据进行高效处理和分析。这些框架能充实发挥集群的盘算能力,并行处理大量用电数据以分析用户用电风俗,适合处理大规模数据的场景。 Oracle 局限:在单台服务器上的 Oracle 数据库进行大规模数据处理时,可能会受到服务器性能瓶颈的限定,如 CPU、内存等资源的不敷。处理海量用电数据以分析用户用电风俗可能会耗费大量时间,效率相对较低。
成本效益方面
HDFS 优势:Hadoop 集群可以基于相对廉价的商用硬件搭建,通过增加节点来提升存储和盘算能力,在大规模数据存储和处理场景下,具有较好的成本效益。
Oracle 局限:Oracle 数据库通常需要购买贸易许可证,而且对于单台服务器配置要求较高以满足不停增长的数据存储和处理需求,这可能导致较高的硬件采购和软件许可成本。
综合思量,对于供电企业不停增长的大量用电数据以及后续要进行的用户用电风俗分析需求,存储到 Hadoop 集群的分布式文件体系 HDFS 里是更符合的方案。
盘算步调分析:
存储到 Oracle 数据库(单台服务器)的盘算步调分析
数据导入步调:
1.首先需要创建与 Oracle 数据库的连接,配置好相应的数据库连接参数,如主机地址、端口号、用户名、密码、数据库实例名等。
2.将从 SCADA 体系导出的每月用电数据文件,按照 Oracle 数据库规定的格式进行整理(可能涉及到数据范例转换、字段映射等操作),确保数据能正确导入数据库表中。
3.使用数据库提供的导入工具(如 SQL*Loader 等)或通过编写 SQL 插入语句,将整理好的数据逐行插入到预先创建好的用于存储用电数据的表中。
数据分析步调:
1.当营销部门要分析用户用电风俗时,需要在 Oracle 数据库中编写复杂的 SQL 查询语句。例如,要分析差别时间段用户的用电量厘革,可能需要通过分组查询(GROUP BY)、条件筛选(WHERE)、聚合函数(如 SUM、AVG 等)等来提取相干数据。
2.若要进行更深入的跨时间段、跨用户群体等复杂分析,可能还需要编写存储过程、函数等来实现,这涉及到对数据库编程知识的熟练运用。
3.由于数据存储在单台服务器上的 Oracle 数据库中,在执行这些查询和分析操作时,受限于服务器的硬件资源(如 CPU、内存等)。如果数据量庞大,查询可能会耗费较长时间,甚至可能因为资源不敷导致查询失败或性能严重下降。
存储到 HDFS(Hadoop 集群)的盘算步调分析
数据导入步调:
1.首先要确保 Hadoop 集群正常运行,而且与 SCADA 体系所在的环境可以或许进行数据传输。 2.将从 SCADA 体系导出的每月用电数据文件,直接上传到 HDFS 的指定目次下。可以使用 Hadoop 提供的下令行工具(如 hadoop fs -put 等)或在相干应用程序中调用 HDFS 的 API 来完成数据上传操作。
数据分析步调:
1.在 HDFS 存储数据后,可以利用 Hadoop 生态体系中的分布式盘算框架进行数据分析。比如使用 MapReduce 或 Spark 等。
以 MapReduce 为例:
1.首先要编写 MapReduce 程序,在 Mapper 阶段,将从 HDFS 读取的用电数据文件中的每一行数据进行剖析,提取出与分析用户用电风俗相干的字段(如用户 ID、用电量、用电时间等),并以键值对的情势输出。例如,以用户 ID 为键,用电量和用电时间等信息为值。
2.在 Reducer 阶段,对 Mapper 输出的具有相同键(即同一用户)的键值对进行汇总处理,比如盘算该用户在差别时间段的总用电量、平均用电量等,从而分析出用户的用电风俗。
以 Spark 为例:
1.可以先创建 SparkContext 或 SparkSession 来初始化 Spark 环境,然后通过 Spark 的 API 将存储在 HDFS 中的用电数据文件读取为 RDD(弹性分布式数据集)或 DataFrame(数据框)情势。
2.接着利用 Spark 提供的丰富的函数和操作符,如分组(groupBy)、聚合(agg)、窗口函数(window functions)等,对数据进行处理和分析,以得出用户的用电风俗相干的结论。 由于 Hadoop 集群是由多个节点组成的分布式体系,这些分布式盘算框架可以充实利用集群的盘算资源,并行处理大量的用电数据,大大进步了数据分析的效率,纵然面对海量数据,也能相对快速地完成分析任务。
从盘算步调分析来看,存储到 HDFS 并结合分布式盘算框架进行数据分析,在处理供电企业大量用电数据以分析用户用电风俗时,具有更好的机动性、可扩展性和效率,相比之下更适合这种大规模数据处理的场景。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |