基于Spark的情形数据处理与分析

诗林  高级会员 | 2024-7-24 08:24:41 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 227|帖子 227|积分 681

本实行采用Python语言,从网页爬取情形数据,并使用大数据处理框架Spark对情形数据举行处理分析,并对分析结果举行可视化。
一、实行环境

(1)Linux: Ubuntu 20.04
(2)Python: 3.6
(3)Spark: 3.2.0
(4)pycharm
安装完上述环境以后,为了支持Python可视化分析,还须要实行如下命令安装新的组件:
二、实行数据介绍

本次实行所采用的数据,从中央情形台官方网站(网址:http://www.nmc.cn/)爬取,主要是最近24小时各个都会的天气数据,包括时间点(整点)、整点气温、整点降水量、风力、整点气压、相对湿度等。正常情况下,每个都会会对应24条数据(每个整点一条)。数据规模达到2429个都会,58297条数据,有部分都会部分时间点数据存在缺失或非常。限于本次大作业时间有限,没有办法全面分析这些数据,大作业中主要计算分析了各个都会过去24小时的均匀气暖和降水量情况。

图 1 中央情形台官网

图 2 爬取的表格passed_weather_ALL.csv信息
三、数据获取

1.观察数据获取方式

打开中央情形台官方网站(网址:http://www.nmc.cn/),恣意点击左侧栏“热门都会”中的一个都会。打开火狐(Firefox)浏览器或者谷歌(chrome)浏览器的Web控制台。通过切换“省份”和“都会”,我们可以发现,网页中的数据是以json字符串格式异步地从服务器传送。可以发现以下数据和请求URL的关系。

请求URL 传回数据
http://www.nmc.cn/f/rest/province 省份数据

http://www.nmc.cn/f/rest/province/+省份三位编码 某个省份的都会数据

http://www.nmc.cn/f/rest/passed/+都会编号 某个都会最近24小时整点天气数据

由于省份三位编码(如福建省编码为“ABJ”)须要从省份数据获得中获得,都会编号须要从都会数据获得(如福州市编号为“58847”),所以为了获得各个都会最近24小时整点天气数据,依次爬取省份数据、都会数据、最近24小时整点数据。
2.数据爬取

由于可以直接通过访问请求URL,传回的响应的数据部分即是json格式的数据,所以只须要调用python的urllib.request, urllib.error, urllib.parse库中相干函数,对上述URL举行请求即可。不须要像平常爬取HTML网页时还须要对网页源码举行解析,查找相干数据。唯一须要留意的是,有些都会可能不存在或者全部缺失最近24小时整点数据,须要举行过滤,以免出错。
3.数据存储

虽然上一步获取的json数据可以直接存储并可使用SparkSession直接读取,但是为了方便观察数据布局、辨识非常数据、对数据增加部分提示信息,爬取后的数据举行了一些处理之后,保存成了csv格式,包括省份数据(province.csv)、都会数据(city.csv)、各个都会最近24小时整点天气数据(passed_weather_ALL.csv)。由于所有都会过去24小时整点天气数据数目太多,为了制止内存不敷,每爬取50个都会的数据后,就会举行一次保存。
4.数据读取

因为各个都会最近24小时整点天气数据体量较大,每次爬取须要半小时以上,为了提高实行效率,只会举行一次数据爬取。以后会直接读取第一次实行数据。假如须要重新爬取数据,须要手动删除已有数据,即删除input文件夹下province.csv、city.csv、passed_weather_ALL.csv。
5.数据布局

最后保存的各个都会最近24小时整点天气数据(passed_weather_ALL.csv)每条数据各字段含义如下所示,这里仅列出实行中使用部分。
字段 含义
province 都会地点省份(中文)
city_index 都会序号(计数)
city_name 都会名称(中文)
city_code 都会编号
time 时间点(整点)
temperature 气温
rain1h 过去1小时降雨量
6.爬虫过程截图

开始爬虫

运行中的截图:

爬取完毕:

四、数据分析

数据分析主要使用Spark SQL相干知识与技术,对各个都会过去24小时累积降雨量和当日均匀气温举行了计算和排序。
1.计算各个都会过去24小时累积雨量

思路:按照都会对数据举行分组,对每个都会的rain1h字段举行分组求和。
特别阐明:由于获取数据所需时间较长,天气数据的时间跨度可能略有差别等,这里为了简化操作没有筛选出具有雷同时间跨度的数据再举行计算。
相干步调如下:
(1)创建SparkSession对象spark;
(2)使用spark.read.csv(filename)读取passed_weather_ALL.csv数据天生Dateframe df;
(3)对df举行操作:使用Dateframe的select方法选择province,city_name,city_code,rain1h字段,并使用Column对象的cast(dateType)方法将rain1h转成数值型,再使用Dateframe的filter方法筛选出rain1h小于1000的记录(大于1000是非常数据),得到新的Dateframe df_rain;
(4)对df_rain举行操作:使用Dateframe的groupBy操作按照province,city_name,city_code的字段分组,使用agg方法对rain1h字段举行分组求和得到新的字段rain24h(过去24小时累积雨量),使用sort方法按照rain24h降序分列,经过上述操作得到新的Dateframe df_rain_sum
(5)对df_rain_sum调用cache()方法将此前的转换关系举行缓存,提高性能
(6)对df_rain_sum调用coalesce()将数据分区数目减为1,并使用write.csv(filename)方法将得到的数据长期化到本地文件。
(7)对df_rain_sum调用head()方法取前多少条数据(即24小时累积降水量Top-N的列表)供数据可视化使用。
本部分分析对应的详细代码如下:
  1. def passed_rain_analyse(filename):  # 计算各个城市过去24小时累积雨量
  2.     print("begin to analyse passed rain")
  3.     spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
  4.     df = spark.read.csv(filename, header=True)
  5.     df_rain = df.select(df['province'], df['city_name'], df['city_code'], df['rain1h'].cast(DecimalType(scale=1))) \
  6.         .filter(df['rain1h'] < 1000)  # 筛选数据,去除无效数据
  7.     df_rain_sum = df_rain.groupBy("province", "city_name", "city_code") \
  8.         .agg(F.sum("rain1h").alias("rain24h")) \
  9.         .sort(F.desc("rain24h"))  # 分组、求和、排序
  10.     df_rain_sum.cache()
  11.     df_rain_sum.coalesce(1).write.csv("file:///home/hadoop/PythonCode/SparkAnalysis/passed_rain_analyse.csv")
  12.     print("end analysing passed rain")
  13.     return df_rain_sum.head(20)
复制代码
2.计算各个都会当日均匀气温

思路:根据国家尺度(《地面情形服务观测规范》),日均匀气温取四时次数据的均匀值,四时次数据为:02时、08时、14时、20时。据此,应该先筛选出各个时次的气温数据,再按照都会对数据举行分组,对每个都会的tempeature字段举行分组求均匀。
特别阐明:为了能获取到上述一天的四个时次的天气数据,建议在当天的20时30分后再爬取数据。
相干步调如下:
(1)创建SparkSession对象spark;
(2)使用spark.read.csv(filename)读取passed_weather_ALL.csv数据天生Dateframe df;
(3)对df举行操作:使用Dateframe的select方法选择province,city_name,city_code,temperature字段,并使用库pyspark.sql.functions中的date_format(col,pattern)方法和hour(col)将time字段转换成date(日期)字段和hour(小时)字段,(time字段的分秒信息无用),,得到新的Dateframe df_temperature;
(4)对df_temperature举行操作:使用Dateframe的filter操作过滤出hour字段在[2,8,14,20]中的记录,经过上述操作得到新的Dateframe df_4point_temperature
(5)对df_4point_temperature举行操作:使用Dateframe的groupBy操作按照province,city_name,city_code,date字段分组,使用agg方法对temperature字段举行分组计数和求和(求和字段定名为avg_temperature),使用filter方法过滤出分组计数为4的记录(确保有4个时次才能计算日均匀温),使用sort方法按照avg_temperature降序分列,再筛选出须要保存的字段province,city_name,city_code,date,avg_temperature(趁便使用库pyspark.sql.functions中的format_number(col,precision)方法保存一位小数),经过上述操作得到新的Dateframe df_avg_temperature
(6)对df_avg_temperature调用cache()方法将此前的转换关系举行缓存,提高性能
(7)对df_avg_temperature调用coalesce()将数据分区数目减为1,并使用write.json(filename)方法将得到的数据长期化到本地文件。
(8)对df_rain_sum调用collect()方法取将Dateframe转换成list,方便后续举行数据可视化。
本部分分析对应的详细代码如下:
  1. def passed_temperature_analyse(filename):
  2.     print("begin to analyse passed temperature")
  3.     spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
  4.     df = spark.read.csv(filename, header=True)
  5.     df_temperature = df.select(  # 选择需要的列
  6.         df['province'],
  7.         df['city_name'],
  8.         df['city_code'],
  9.         df['temperature'].cast(DecimalType(scale=1)),
  10.         F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
  11.         F.hour(df['time']).alias("hour")  # 得到小时数据
  12.     )
  13.     # 筛选四点时次
  14.     df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 12, 20]))
  15.     # df_4point_temperature.printSchema()
  16.     df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date") \
  17.         .agg(F.count("temperature"), F.avg("temperature").alias("avg_temperature")) \
  18.         .filter("count(temperature) = 4") \
  19.         .sort(F.asc("avg_temperature")) \
  20.         .select("province", "city_name", "city_code", "date",
  21.                 F.format_number('avg_temperature', 1).alias("avg_temperature"))
  22.     df_avg_temperature.cache()
  23.     avg_temperature_list = df_avg_temperature.collect()
  24.     df_avg_temperature.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_temperature_analyse.json")
  25.     print("end analysing passed temperature")
  26.     return avg_temperature_list[0:10]
复制代码
3.计算各个都会当日均匀湿度

详细步调与计算计算各个都会当日均匀气温类似;
代码:
  1. def passed_humidity_analyse(filename):
  2.     print("begin to analyse passed humidity")
  3.     spark = SparkSession.builder.master("local").appName("passed_humidity_analyse").getOrCreate()
  4.     df = spark.read.csv(filename, header=True)
  5.     df_humidity = df.select(  # 选择需要的列
  6.         df['province'],
  7.         df['city_name'],
  8.         df['city_code'],
  9.         df['humidity'].cast(DecimalType(scale=1)),
  10.         F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
  11.         F.hour(df['time']).alias("hour")  # 得到小时数据
  12.     )
  13.     # 筛选四点时次
  14.     df_4point_humidity = df_humidity.filter(df_humidity['hour'].isin([2, 8, 12, 20]))
  15.     # df_4point_humidity.printSchema()
  16.     df_avg_humidity = df_4point_humidity.groupBy("province", "city_name", "city_code", "date") \
  17.         .agg(F.count("humidity"), F.avg("humidity").alias("avg_humidity")) \
  18.         .filter("count(humidity) = 4") \
  19.         .sort(F.asc("avg_humidity")) \
  20.         .select("province", "city_name", "city_code", "date",
  21.                 F.format_number('avg_humidity', 1).alias("avg_humidity"))
  22.     df_avg_humidity.cache()
  23.     avg_humidity_list = df_avg_humidity.collect()
  24.     df_avg_humidity.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_humidity_analyse.json")
  25.     print("end analysing passed analyse")
  26.     return avg_humidity_list[0:10]
复制代码
4.计算各个都会当日均匀风速

详细步调与计算计算各个都会当日均匀气温类似;
代码:
  1. def passed_windSpeed_analyse(filename):
  2.     print("begin to analyse passed windSpeed")
  3.     spark = SparkSession.builder.master("local").appName("passed_windSpeed_analyse").getOrCreate()
  4.     df = spark.read.csv(filename, header=True)
  5.     df_windSpeed = df.select(  # 选择需要的列
  6.         df['province'],
  7.         df['city_name'],
  8.         df['city_code'],
  9.         df['windSpeed'].cast(DecimalType(scale=1)),
  10.         F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
  11.         F.hour(df['time']).alias("hour")  # 得到小时数据
  12.     )
  13.     # 筛选四点时次
  14.     df_4point_windSpeed = df_windSpeed.filter(df_windSpeed['hour'].isin([2, 8, 12, 20]))
  15.     # df_4point_windSpeed.printSchema()
  16.     df_avg_windSpeed = df_4point_windSpeed.groupBy("province", "city_name", "city_code", "date") \
  17.         .agg(F.count("windSpeed"), F.avg("windSpeed").alias("avg_windSpeed")) \
  18.         .filter("count(windSpeed) = 4") \
  19.         .sort(F.asc("avg_windSpeed")) \
  20.         .select("province", "city_name", "city_code", "date",
  21.                 F.format_number('avg_windSpeed', 1).alias("avg_windSpeed"))
  22.     df_avg_windSpeed.cache()
  23.     avg_windSpeed_list = df_avg_windSpeed.collect()
  24.     df_avg_windSpeed.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_windSpeed_analyse.json")
  25.     print("end analysing passed windSpeed")
  26.     return avg_windSpeed_list[0:10]
复制代码
五、数据可视化

数据可视化使用python matplotlib库。可使用pip命令安装。也可以pycharm直接安装。

绘制过程大要如下:
第一步,应当设置字体,这里提供了黑体的字体文件simhei.tff。否则坐标轴等出现中文的地方是乱码。
第二步,设置数据(累积雨量或者日均匀气温)和横轴坐标(都会名称),配置直方图。
第三步,配置横轴坐标位置,设置纵轴坐标范围
第四步,配置横纵坐标标签
第五步,配置每个条形图上方显示的数据
第六步,根据上述配置,画出直方图。
画图部分对应的运行截图如下:




保存的matplotlib作的图:




六、数据以及源代码

1.爬取的数据截图:

city.csv

passed_weather_ALL.csv

province.csv

2.爬虫代码

  1. # -*- coding: utf-8 -*-
  2. import urllib.request, urllib.error, urllib.parse
  3. import json
  4. import csv
  5. import os
  6. import time
  7. import random
  8. import socket
  9. class Crawler:
  10.     def get_html(self, url):
  11.         head = {
  12.             "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"
  13.         }
  14.         request = urllib.request.Request(url, headers=head)
  15.         NET_STATUS = False
  16.         while not NET_STATUS:
  17.             try:
  18.                 # url = 'http://www.nmc.cn/f/rest/province'
  19.                 response = urllib.request.urlopen(request, timeout=5)
  20.                 html = response.read().decode("utf-8")
  21.                 return html
  22.             except socket.timeout:
  23.                 print('NET_STATUS is not good')
  24.                 NET_STATUS = False
  25.     # def get_html(self, url):  # 得到指定一个URL的网页内容
  26.     #     head = {
  27.     #         "User-Agent": "Mozilla/5.0(Windows NT 10.0;Win64;x64) AppleWebKit/537.36(KHTML, likeGecko) Chrome / 89.0.4389.90Safari / 537.36"
  28.     #     }
  29.     #     request = urllib.request.Request(url, headers=head)
  30.     #     try:
  31.     #         response = urllib.request.urlopen(request, timeout=5)
  32.     #         html = response.read().decode("utf-8")
  33.     #         # print(html)
  34.     #     except urllib.error.URLError as e:
  35.     #         if hasattr(e, "code"):
  36.     #             print(e.code)
  37.     #         if hasattr(e, "reason"):
  38.     #             print(e.reason)
  39.     #     return html
  40.     def parse_json(self, url):
  41.         obj = self.get_html(url)
  42.         if obj:
  43.             json_obj = json.loads(obj)
  44.         else:
  45.             json_obj = list()
  46.         # print json_obj
  47.         # for obj in json_obj:
  48.         #     print obj
  49.         # print chardet.detect(obj['name'])
  50.         return json_obj
  51.         # soup = BeautifulSoup(html_doc,"html.parser",from_encoding='utf-8')
  52.         # links = soup.find_all('a')
  53.         # print "all links"
  54.         # for link in links:
  55.         #     print link.name,link['href']
  56.     def write_csv(self, file, data):
  57.         if data:
  58.             print("begin to write to " + file)
  59.             with open(file, 'a+', newline='') as f:
  60.                 # f.write(codecs.BOM_UTF8)
  61.                 f_csv = csv.DictWriter(f, list(data[0].keys()))
  62.                 if not os.path.exists(file):
  63.                     f_csv.writeheader()
  64.                 f_csv.writerows(data)
  65.                 # writerows()将一个二维列表中的每一个列表写为一行。
  66.             print("end to write to " + file)
  67.     def write_header(self, file, data):
  68.         if data:
  69.             print("begin to write to " + file)
  70.             with open(file, 'a+', newline='') as f:
  71.                 # f.write(codecs.BOM_UTF8)
  72.                 f_csv = csv.DictWriter(f, list(data[0].keys()))
  73.                 f_csv.writeheader()
  74.                 f_csv.writerows(data)
  75.             print("end to write to " + file)
  76.     def write_row(self, file, data):
  77.         if data:
  78.             print("begin to write to " + file)
  79.             with open(file, 'a+', newline='') as f:
  80.                 # f.write(codecs.BOM_UTF8)
  81.                 f_csv = csv.DictWriter(f, list(data[0].keys()))
  82.                 # f_csv.writeheader()
  83.                 f_csv.writerows(data)
  84.             print("end to write to " + file)
  85.     def read_csv(self, file):
  86.         print("begin to read " + file)
  87.         with open(file, 'r') as f:
  88.             data = csv.DictReader(f)
  89.             print("end to read " + file)
  90.             return list(data)
  91.     def get_provinces(self):
  92.         province_file = 'input/province.csv'
  93.         if not os.path.exists(province_file):  # 如果没有省份文件,开始爬取城市信息
  94.             print("begin crawl province")
  95.             provinces = self.parse_json('http://www.nmc.cn/f/rest/province')
  96.             print("end crawl province")
  97.             self.write_csv(province_file, provinces)
  98.         else:
  99.             provinces = self.read_csv(province_file)
  100.         return provinces
  101.     def get_cities(self):  # 获取城市
  102.         city_file = 'input/city.csv'
  103.         if not os.path.exists(city_file):  # 如果没有城市文件,开始爬取城市信息
  104.             cities = list()
  105.             print("begin crawl city")
  106.             for province in self.get_provinces():  # 循环34个省份
  107.                 print(province['name'])
  108.                 url = province['url'].split('/')[-1].split('.')[0]
  109.                 cities.extend(self.parse_json('http://www.nmc.cn/f/rest/province/' + url))
  110.             self.write_csv(city_file, cities)
  111.             print("end crawl city")
  112.         else:
  113.             cities = self.read_csv(city_file)
  114.         return cities
  115.     def get_passed_weather(self, province):
  116.         weather_passed_file = 'input/passed_weather_' + province + '.csv'
  117.         if os.path.exists(weather_passed_file):
  118.             return
  119.         passed_weather = list()
  120.         count = 0
  121.         if province == 'ALL':
  122.             print("begin crawl passed weather")  # 开始爬取历史天气
  123.             for city in self.get_cities():
  124.                 print(city['province'] + ' ' + city['city'] + ' ' + city['code'])
  125.                 data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
  126.                 if data:
  127.                     count = count + 1
  128.                     for item in data:
  129.                         item['city_code'] = city['code']
  130.                         item['province'] = city['province']
  131.                         item['city_name'] = city['city']
  132.                         item['city_index'] = str(count)
  133.                     passed_weather.extend(data)
  134.                 # time.sleep(random.random())
  135.                 time.sleep(0.8)
  136.                 if count % 50 == 0:
  137.                     if count == 50:
  138.                         self.write_header(weather_passed_file, passed_weather)
  139.                     else:
  140.                         self.write_row(weather_passed_file, passed_weather)
  141.                     passed_weather = list()
  142.             if passed_weather:
  143.                 if count <= 50:
  144.                     self.write_header(weather_passed_file, passed_weather)
  145.                 else:
  146.                     self.write_row(weather_passed_file, passed_weather)
  147.             print("end crawl passed weather")
  148.         else:
  149.             print("begin crawl passed weather")
  150.             select_city = [x for x in self.get_cities() if x['province'] == province]
  151.             for city in select_city:
  152.                 print(city['province'] + ' ' + city['city'] + ' ' + city['code'])
  153.                 data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
  154.                 if data:
  155.                     count = count + 1
  156.                     for item in data:
  157.                         item['city_index'] = str(count)
  158.                         item['city_code'] = city['code']
  159.                         item['province'] = city['province']
  160.                         item['city_name'] = city['city']
  161.                     passed_weather.extend(data)
  162.                 # time.sleep(1)
  163.             self.write_csv(weather_passed_file, passed_weather)
  164.             print("end crawl passed weather")
  165.     def run(self, range='ALL'):
  166.         self.get_passed_weather(range)
  167. def main():
  168.     crawler = Crawler()
  169.     crawler.run("ALL")
  170. if __name__ == '__main__':
  171.     main()
复制代码
3.Spark分析代码:

  1. # coding:utf-8from pyspark.sql import SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql.types import DecimalType, TimestampTypeimport matplotlib as mplimport matplotlib.pyplot as pltfrom matplotlib.font_manager import FontPropertiesimport osimport mathfrom Crawler import *def passed_rain_analyse(filename):  # 计算各个城市过去24小时累积雨量
  2.     print("begin to analyse passed rain")
  3.     spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
  4.     df = spark.read.csv(filename, header=True)
  5.     df_rain = df.select(df['province'], df['city_name'], df['city_code'], df['rain1h'].cast(DecimalType(scale=1))) \
  6.         .filter(df['rain1h'] < 1000)  # 筛选数据,去除无效数据
  7.     df_rain_sum = df_rain.groupBy("province", "city_name", "city_code") \
  8.         .agg(F.sum("rain1h").alias("rain24h")) \
  9.         .sort(F.desc("rain24h"))  # 分组、求和、排序
  10.     df_rain_sum.cache()
  11.     df_rain_sum.coalesce(1).write.csv("file:///home/hadoop/PythonCode/SparkAnalysis/passed_rain_analyse.csv")
  12.     print("end analysing passed rain")
  13.     return df_rain_sum.head(20)
  14. def passed_temperature_analyse(filename):
  15.     print("begin to analyse passed temperature")
  16.     spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
  17.     df = spark.read.csv(filename, header=True)
  18.     df_temperature = df.select(  # 选择需要的列
  19.         df['province'],
  20.         df['city_name'],
  21.         df['city_code'],
  22.         df['temperature'].cast(DecimalType(scale=1)),
  23.         F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
  24.         F.hour(df['time']).alias("hour")  # 得到小时数据
  25.     )
  26.     # 筛选四点时次
  27.     df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 12, 20]))
  28.     # df_4point_temperature.printSchema()
  29.     df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date") \
  30.         .agg(F.count("temperature"), F.avg("temperature").alias("avg_temperature")) \
  31.         .filter("count(temperature) = 4") \
  32.         .sort(F.asc("avg_temperature")) \
  33.         .select("province", "city_name", "city_code", "date",
  34.                 F.format_number('avg_temperature', 1).alias("avg_temperature"))
  35.     df_avg_temperature.cache()
  36.     avg_temperature_list = df_avg_temperature.collect()
  37.     df_avg_temperature.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_temperature_analyse.json")
  38.     print("end analysing passed temperature")
  39.     return avg_temperature_list[0:10]
  40. def passed_humidity_analyse(filename):
  41.     print("begin to analyse passed humidity")
  42.     spark = SparkSession.builder.master("local").appName("passed_humidity_analyse").getOrCreate()
  43.     df = spark.read.csv(filename, header=True)
  44.     df_humidity = df.select(  # 选择需要的列
  45.         df['province'],
  46.         df['city_name'],
  47.         df['city_code'],
  48.         df['humidity'].cast(DecimalType(scale=1)),
  49.         F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
  50.         F.hour(df['time']).alias("hour")  # 得到小时数据
  51.     )
  52.     # 筛选四点时次
  53.     df_4point_humidity = df_humidity.filter(df_humidity['hour'].isin([2, 8, 12, 20]))
  54.     # df_4point_humidity.printSchema()
  55.     df_avg_humidity = df_4point_humidity.groupBy("province", "city_name", "city_code", "date") \
  56.         .agg(F.count("humidity"), F.avg("humidity").alias("avg_humidity")) \
  57.         .filter("count(humidity) = 4") \
  58.         .sort(F.asc("avg_humidity")) \
  59.         .select("province", "city_name", "city_code", "date",
  60.                 F.format_number('avg_humidity', 1).alias("avg_humidity"))
  61.     df_avg_humidity.cache()
  62.     avg_humidity_list = df_avg_humidity.collect()
  63.     df_avg_humidity.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_humidity_analyse.json")
  64.     print("end analysing passed analyse")
  65.     return avg_humidity_list[0:10]
  66. def passed_windSpeed_analyse(filename):
  67.     print("begin to analyse passed windSpeed")
  68.     spark = SparkSession.builder.master("local").appName("passed_windSpeed_analyse").getOrCreate()
  69.     df = spark.read.csv(filename, header=True)
  70.     df_windSpeed = df.select(  # 选择需要的列
  71.         df['province'],
  72.         df['city_name'],
  73.         df['city_code'],
  74.         df['windSpeed'].cast(DecimalType(scale=1)),
  75.         F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
  76.         F.hour(df['time']).alias("hour")  # 得到小时数据
  77.     )
  78.     # 筛选四点时次
  79.     df_4point_windSpeed = df_windSpeed.filter(df_windSpeed['hour'].isin([2, 8, 12, 20]))
  80.     # df_4point_windSpeed.printSchema()
  81.     df_avg_windSpeed = df_4point_windSpeed.groupBy("province", "city_name", "city_code", "date") \
  82.         .agg(F.count("windSpeed"), F.avg("windSpeed").alias("avg_windSpeed")) \
  83.         .filter("count(windSpeed) = 4") \
  84.         .sort(F.asc("avg_windSpeed")) \
  85.         .select("province", "city_name", "city_code", "date",
  86.                 F.format_number('avg_windSpeed', 1).alias("avg_windSpeed"))
  87.     df_avg_windSpeed.cache()
  88.     avg_windSpeed_list = df_avg_windSpeed.collect()
  89.     df_avg_windSpeed.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_windSpeed_analyse.json")
  90.     print("end analysing passed windSpeed")
  91.     return avg_windSpeed_list[0:10]
  92. def draw_rain(rain_list):    print("begin to draw the picture of passed rain")    font = FontProperties(fname='ttf/simhei.ttf')  # 设置字体    name_list = []    num_list = []    for item in rain_list:        name_list.append(item.province[0:2] + '\n' + item.city_name)        num_list.append(item.rain24h)    index = [i + 0.25 for i in range(0, len(num_list))]    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)    plt.yticks(fontsize=20)    plt.ylim(ymax=(int(max(num_list)+10) / 100) * 100, ymin=0)    plt.xlabel("都会", fontproperties=font,fontsize=20)    plt.ylabel("雨量", fontproperties=font,fontsize=20)    plt.title("过去24小时累计降雨量天下前20名", fontproperties=font,fontsize=20)    for rect in rects:        height = rect.get_height()        plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom",fontsize=20)    plt.show()    print("ending drawing the picture of passed rain")def draw_temperature(temperature_list):    print("begin to draw the picture of passed temperature")    font = FontProperties(fname='ttf/simhei.ttf')    name_list = []    num_list = []    date = temperature_list[0].date    for item in temperature_list:        name_list.append(item.province[0:2] + '\n' + item.city_name)        # item.avg_temperature = item.avg_temperature.replace(',', '')        num_list.append(float(item.avg_temperature))        # num_list.append([float(x) for x in item.avg_temperature])    index = [i + 0.25 for i in range(0, len(num_list))]    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)    plt.yticks(fontsize=20)    plt.ylim(ymax=math.ceil(float(max(num_list)))-10, ymin=0)    plt.xlabel("都会", fontproperties=font,fontsize=20)    plt.ylabel("日均匀气温", fontproperties=font,fontsize=20)    plt.title(date + "天下日均匀气温最低前10名", fontproperties=font,fontsize=20)    for rect in rects:        height = rect.get_height()        plt.text(rect.get_x() + rect.get_width() / 2, height + 0.1, str(height), ha="center", va="bottom",fontsize=20)    plt.show()    print("ending drawing the picture of passed temperature")def draw_humidity(humidity_list):    print("begin to draw the picture of passed humidity")    font = FontProperties(fname='ttf/simhei.ttf')    name_list = []    num_list = []    date = humidity_list[0].date    for item in humidity_list:        name_list.append(item.province[0:2] + '\n' + item.city_name)        num_list.append(float(item.avg_humidity))    index = [i + 0.25 for i in range(0, len(num_list))]    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)    plt.yticks(fontsize=20)    plt.ylim(ymax=math.ceil(float(max(num_list))), ymin=0)    plt.xlabel("都会", fontproperties=font,fontsize=20)    plt.ylabel("日均匀湿度", fontproperties=font,fontsize=20)    plt.title(date + "天下日均匀湿度最低前10名", fontproperties=font,fontsize=20)    for rect in rects:        height = rect.get_height()        plt.text(rect.get_x() + rect.get_width() / 2, height + 0.1, str(height), ha="center", va="bottom", fontsize=20)    plt.show()    print("ending drawing the picture of passed humidity")def draw_windSpeed(windSpeed_list):    print("begin to draw the picture of passed windSpeed")    font = FontProperties(fname='ttf/simhei.ttf')    name_list = []    num_list = []    date = windSpeed_list[0].date    for item in windSpeed_list:        name_list.append(item.province[0:2] + '\n' + item.city_name)        num_list.append(float(item.avg_windSpeed))    index = [i + 0.25 for i in range(0, len(num_list))]    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)    plt.yticks(fontsize=20)    plt.ylim(ymax=math.ceil(float(max(num_list))), ymin=0)    plt.xlabel("都会", fontproperties=font,fontsize=20)    plt.ylabel("日均匀风速", fontproperties=font,fontsize=20)    plt.title(date + "天下日均匀风速最低前10名", fontproperties=font, fontsize=20)    for rect in rects:        height = rect.get_height()        plt.text(rect.get_x() + rect.get_width() / 2, height + 0.1, str(height), ha="center", va="bottom", fontsize=20)    plt.show()    print("ending drawing the picture of passed windSpeed")def main():    sourcefile = "input/passed_weather_ALL.csv"    if not os.path.exists(sourcefile):        crawler = Crawler()        crawler.run('ALL')    # 过去24小时累计降雨量天下前20名    rain_list = passed_rain_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)    draw_rain(rain_list)    # 天下日均匀气温最低前10名    temperature_list = passed_temperature_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)    draw_temperature(temperature_list)    # 天下日均匀湿度最低前10名    humidity_list = passed_humidity_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)    draw_humidity(humidity_list)    # 天下日均匀风速最低前10名    windSpeed_list = passed_windSpeed_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)    draw_windSpeed(windSpeed_list)if __name__ == '__main__':    main()
复制代码
附录:csv文件、源代码等

链接:https://pan.baidu.com/s/1ofcoqahctrHA04lXP4ihog?pwd=nyrf
提取码:nyrf

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

诗林

高级会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表