This experiment uses Python to crawl weather data from the web, processes and analyzes the data with the big-data framework Spark, and visualizes the results.
I. Experiment Environment
(1)Linux: Ubuntu 20.04
(2)Python: 3.6
(3)Spark: 3.2.0
(4) PyCharm
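After installing the environment above, the following command also needs to be run to install the extra components used for visualization in Python:
II. Experimental Data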
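The data used in this experiment was crawled from the official website of China's National Meteorological Center (中央气象台, http://www.nmc.cn/). It mainly consists of each city's weather data for the last 24 hours, including the time (on the hour), hourly temperature, hourly precipitation, wind force, hourly air pressure, relative humidity and so on. Normally each city has 24 records, one per hour. The data covers 2429 cities and 58297 records; for some cities, the data at some times is missing or anomalous. Because the time available for this course project was limited, the data could not be analyzed comprehensively; the project mainly computes each city's average temperature and precipitation over the past 24 hours.
Figure 1: Official website of the National Meteorological Center
Figure 2: The crawled table passed_weather_ALL.csv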
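III. Data Acquisition
1. Finding out how the data is delivered
Open the official NMC website (http://www.nmc.cn/) and click any city under "Popular Cities" (热门城市) in the left sidebar, then open the web console (developer tools) of Firefox or Chrome. Switching between provinces and cities shows that the page fetches its data from the server asynchronously as JSON strings. The request URLs map to the returned data as follows: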
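| Request URL | Returned data |
| --- | --- |
| http://www.nmc.cn/f/rest/province | Province list |
| http://www.nmc.cn/f/rest/province/ + three-letter province code | Cities of one province |
| http://www.nmc.cn/f/rest/passed/ + city code | Hourly weather of one city for the last 24 hours |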
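The three-letter province code (e.g. "ABJ" for Fujian) has to be taken from the province data, and the city code (e.g. "58847" for Fuzhou) from the city data. To obtain each city's hourly data for the last 24 hours, the crawler therefore fetches the province data, then the city data, then the hourly weather data, in that order.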
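The three-step lookup can be sketched as three URL builders. This is a minimal sketch: the function names are hypothetical, and it assumes (as the crawler's `province['url'].split('/')[-1].split('.')[0]` expression suggests) that a province record's `url` field ends in `<code>.html`; the sample path below is made up.

```python
# Sketch: the three NMC REST endpoints, in the order the crawler needs them.
BASE = "http://www.nmc.cn/f/rest"


def province_list_url():
    """URL that returns the list of provinces."""
    return BASE + "/province"


def province_code_from_url(province_url):
    """Extract e.g. 'ABJ' from a path such as '/publish/forecast/ABJ.html' (assumed format)."""
    return province_url.split("/")[-1].split(".")[0]


def cities_url(province_code):
    """URL that returns the cities of one province."""
    return BASE + "/province/" + province_code


def passed_weather_url(city_code):
    """URL that returns one city's hourly data for the last 24 hours."""
    return BASE + "/passed/" + city_code


if __name__ == "__main__":
    code = province_code_from_url("/publish/forecast/ABJ.html")  # hypothetical path
    print(cities_url(code))             # http://www.nmc.cn/f/rest/province/ABJ
    print(passed_weather_url("58847"))  # http://www.nmc.cn/f/rest/passed/58847
```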
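2. Crawling the data
Because requesting these URLs directly returns JSON in the response body, it is enough to request them with functions from Python's urllib.request, urllib.error and urllib.parse modules; unlike ordinary HTML scraping, there is no page source to parse in search of the data. The one thing to watch for is that some cities may not exist, or may have no hourly data at all for the last 24 hours; such cities must be filtered out to avoid errors.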
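A minimal sketch of such a request with urllib, assuming every endpoint returns a JSON array (the crawler iterates over the results as lists). An empty body, invalid JSON or an HTTP error is treated as "no data", so missing cities are simply skipped. `parse_payload`, `fetch_records` and the short User-Agent string are hypothetical, not part of the original code.

```python
import json
import socket
import urllib.error
import urllib.request

# Placeholder User-Agent (an assumption; the crawler sends a full browser string)
HEADERS = {"User-Agent": "Mozilla/5.0"}


def parse_payload(text):
    """Decode a response body; empty, invalid or non-list JSON counts as no data."""
    if not text:
        return []
    try:
        data = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return []
    return data if isinstance(data, list) else []


def fetch_records(url, timeout=5, retries=3):
    """GET url and return its JSON array, or [] for missing cities or failed requests."""
    request = urllib.request.Request(url, headers=HEADERS)
    for _ in range(retries):
        try:
            with urllib.request.urlopen(request, timeout=timeout) as resp:
                return parse_payload(resp.read().decode("utf-8"))
        except socket.timeout:
            continue  # slow response: try again
        except urllib.error.URLError as exc:  # also covers HTTPError, e.g. 404
            if isinstance(exc.reason, socket.timeout):
                continue  # timed out while connecting: try again
            return []
    return []
```

Keeping the JSON decoding in a separate function means the filtering rule can be checked without touching the network.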
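3. Storing the data
Although the JSON obtained in the previous step could be stored as is and read directly with SparkSession, the crawled data was lightly processed and saved as CSV, which makes it easier to inspect the data structure, spot anomalous values and add a few annotation fields. Three files are produced: the province data (province.csv), the city data (city.csv) and the hourly weather data of every city for the last 24 hours (passed_weather_ALL.csv). Because the hourly data for all cities is large, the crawler writes to disk after every 50 cities to avoid running out of memory.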
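The save-every-50-cities strategy can be sketched as follows. The helper names are hypothetical; the key detail is that the header is written only when the file is new, and that this must be checked before `open()` in append mode creates the file.

```python
import csv
import os


def append_rows(path, rows):
    """Append dict rows to a CSV file; write the header only if the file is new or empty."""
    if not rows:
        return
    # Check first: open(..., "a") would create the file and hide that it was new.
    need_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        if need_header:
            writer.writeheader()
        writer.writerows(rows)


def save_in_batches(records_per_city, path, batch_size=50):
    """Buffer each city's records and flush to disk every `batch_size` cities with data."""
    buffer, cities_with_data = [], 0
    for records in records_per_city:
        if not records:
            continue  # city missing or without data
        buffer.extend(records)
        cities_with_data += 1
        if cities_with_data % batch_size == 0:
            append_rows(path, buffer)
            buffer = []
    append_rows(path, buffer)  # the remainder (no-op when empty)
    return cities_with_data
```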
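4. Reading the data
Because the hourly data for all cities is large and a crawl takes more than half an hour, the data is crawled only once to keep the experiment efficient; later runs read the data saved by the first crawl. To crawl again, delete the existing data manually, i.e. province.csv, city.csv and passed_weather_ALL.csv in the input folder.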
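The crawl-once behaviour amounts to checking for the cached CSV files before crawling. A minimal sketch with hypothetical helper names, using the three file paths named above (the original scripts check the files individually rather than all at once):

```python
import os

# The three cached files named in the text (paths relative to the project directory)
CACHE_FILES = ("input/province.csv", "input/city.csv", "input/passed_weather_ALL.csv")


def needs_crawl(paths=CACHE_FILES):
    """True if any cached CSV is missing, i.e. the slow crawl has to run."""
    return not all(os.path.exists(p) for p in paths)


def reset_cache(paths=CACHE_FILES):
    """Delete the cached CSVs so that the next run crawls again."""
    for p in paths:
        if os.path.exists(p):
            os.remove(p)
```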
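5. Data structure
Each record of the final file passed_weather_ALL.csv (hourly weather of every city for the last 24 hours) has the fields below; only the fields used in the experiment are listed.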
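| Field | Meaning |
| --- | --- |
| province | Province of the city (in Chinese) |
| city_index | City sequence number (running count) |
| city_name | City name (in Chinese) |
| city_code | City code |
| time | Time (on the hour) |
| temperature | Temperature |
| humidity | Relative humidity |
| windSpeed | Wind speed |
| rain1h | Rainfall in the past hour |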
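To work with these fields outside Spark (for example, to spot-check a few rows), a row can be converted into a typed record. This is a sketch: the record type and helper are hypothetical, the time string is kept as text because its exact format is not shown here, and rain1h values of 1000 or more are treated as invalid, following the anomaly threshold used in the analysis below.

```python
from typing import NamedTuple, Optional

# Assumption: rain1h >= 1000 is invalid (the analysis keeps only rain1h < 1000)
RAIN_ANOMALY_THRESHOLD = 1000.0


class HourlyRecord(NamedTuple):
    province: str
    city_name: str
    city_code: str
    time: str                     # kept as text; exact format not assumed
    temperature: Optional[float]
    rain1h: Optional[float]


def to_float(text):
    """float(text), or None for empty or non-numeric values."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


def parse_row(row):
    """Convert one csv.DictReader row of passed_weather_ALL.csv into an HourlyRecord."""
    rain = to_float(row.get("rain1h"))
    if rain is not None and rain >= RAIN_ANOMALY_THRESHOLD:
        rain = None  # anomalous value
    return HourlyRecord(row["province"], row["city_name"], row["city_code"],
                        row["time"], to_float(row.get("temperature")), rain)
```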
6. Screenshots of the crawl
Starting the crawler
Screenshot while running:
Crawl finished:
IV. Data Analysis
The analysis mainly uses Spark SQL to compute and rank each city's accumulated rainfall over the past 24 hours and its daily mean temperature.
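1. Computing each city's accumulated rainfall over the past 24 hours
Approach: group the data by city and sum the rain1h field of each group.
Note: because crawling takes a long time, the time spans of different cities' data may differ slightly. For simplicity, the data is not first filtered to a common time span before the computation.
Steps:
(1) Create the SparkSession object spark;
(2) Read passed_weather_ALL.csv with spark.read.csv(filename) to produce the DataFrame df;
(3) On df: use the DataFrame select method to pick the province, city_name, city_code and rain1h fields, converting rain1h to a numeric type with the Column method cast(dataType); then use filter to keep the records whose rain1h is below 1000 (values above 1000 are anomalous), giving the new DataFrame df_rain;
(4) On df_rain: use groupBy to group by province, city_name and city_code, use agg to sum rain1h per group into the new field rain24h (accumulated rainfall over the past 24 hours), and use sort to order by rain24h in descending order, giving the new DataFrame df_rain_sum;
(5) Call cache() on df_rain_sum so that the result of the preceding transformations is cached; this pays off because df_rain_sum is used by two actions (the write and head());
(6) Call coalesce(1) on df_rain_sum to reduce it to a single partition, and persist it to a local file with write.csv(filename);
(7) Call head() on df_rain_sum to take the first N rows (the Top-N list of 24-hour accumulated rainfall) for visualization.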
The code for this part of the analysis is as follows:
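```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType


def passed_rain_analyse(filename):  # accumulated rainfall of each city over the past 24 hours
    print("begin to analyse passed rain")
    spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    # Cast rain1h to a decimal; filtering on the cast column drops anomalous values (>= 1000)
    # and unparsable ones (which become null)
    df_rain = df.select(df['province'], df['city_name'], df['city_code'],
                        df['rain1h'].cast(DecimalType(scale=1)).alias('rain1h')) \
        .filter(F.col('rain1h') < 1000)
    df_rain_sum = df_rain.groupBy("province", "city_name", "city_code") \
        .agg(F.sum("rain1h").alias("rain24h")) \
        .sort(F.desc("rain24h"))  # group, sum, sort descending
    df_rain_sum.cache()  # reused by the write below and by head()
    # mode("overwrite") lets the script be re-run without a "path already exists" error
    df_rain_sum.coalesce(1).write.mode("overwrite") \
        .csv("file:///home/hadoop/PythonCode/SparkAnalysis/passed_rain_analyse.csv")
    print("end analysing passed rain")
    return df_rain_sum.head(20)  # Top-20 list for plotting
```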
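To check the Spark result by hand on a handful of rows, the same filter, group, sum, sort and Top-N steps can be mirrored in plain Python. This is a hypothetical helper for illustration, not part of the project code; like the Spark cast, it skips rain1h values that cannot be parsed as numbers.

```python
from collections import defaultdict
from decimal import Decimal


def rain_top_n(rows, n=20, threshold=Decimal(1000)):
    """Plain-Python mirror: keep rain1h < threshold, group by city, sum, sort descending, top n."""
    totals = defaultdict(Decimal)  # (province, city_name, city_code) -> accumulated rainfall
    for r in rows:
        try:
            rain = Decimal(r["rain1h"])
            keep = rain < threshold  # NaN raises InvalidOperation here and is skipped
        except (KeyError, TypeError, ArithmeticError):  # InvalidOperation is an ArithmeticError
            continue  # unparsable value: skipped, like a null after the Spark cast
        if keep:
            totals[(r["province"], r["city_name"], r["city_code"])] += rain
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return [(*key, total) for key, total in ranked[:n]]
```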
2. Computing each city's daily mean temperature
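Approach: according to the national standard Specifications for Surface Meteorological Observation (《地面气象观测规范》), the daily mean temperature is the average of the readings at the four observation times 02:00, 08:00, 14:00 and 20:00. Accordingly, the temperature records at these four times are selected first; the data is then grouped by city (and date), and the temperature field of each group is averaged.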
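Written as a formula, with a worked example using made-up temperatures (°C) for one city:

```latex
\bar{T}_{\text{day}} = \frac{T_{02} + T_{08} + T_{14} + T_{20}}{4},
\qquad
\frac{(-3.2) + 1.5 + 6.0 + 2.1}{4} = \frac{6.4}{4} = 1.6
```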
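Note: to obtain all four of the day's observation times, crawl the data after 20:30 on that day.
Steps:
(1) Create the SparkSession object spark;
(2) Read passed_weather_ALL.csv with spark.read.csv(filename) to produce the DataFrame df;
(3) On df: use select to pick the province, city_name, city_code and temperature fields, and use date_format(col, pattern) and hour(col) from pyspark.sql.functions to derive a date field and an hour field from the time field (the minutes and seconds of time are not needed), giving the new DataFrame df_temperature;
(4) On df_temperature: use filter to keep the records whose hour is in [2, 8, 14, 20], giving the new DataFrame df_4point_temperature;
(5) On df_4point_temperature: use groupBy to group by province, city_name, city_code and date; use agg to count and average the temperature field of each group (the average is named avg_temperature); use filter to keep the groups whose count is 4 (a daily mean requires all four times); use sort to order by avg_temperature in ascending order (lowest first); finally select the fields to keep, province, city_name, city_code, date and avg_temperature, formatting avg_temperature to one decimal place with format_number(col, d) from pyspark.sql.functions, giving the new DataFrame df_avg_temperature;
(6) Call cache() on df_avg_temperature so that the result of the preceding transformations is cached, since it is used by two actions;
(7) Call coalesce(1) on df_avg_temperature to reduce it to a single partition, and persist it to a local file with write.json(filename);
(8) Call collect() on df_avg_temperature to turn it into a list for the visualization step.
The code for this part of the analysis is as follows: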
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType


def passed_temperature_analyse(filename):  # daily mean temperature of each city
    print("begin to analyse passed temperature")
    spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_temperature = df.select(  # keep only the needed columns
        df['province'],
        df['city_name'],
        df['city_code'],
        df['temperature'].cast(DecimalType(scale=1)).alias('temperature'),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # date part
        F.hour(df['time']).alias("hour")  # hour part
    )
    # Keep the four observation times 02, 08, 14 and 20
    df_4point_temperature = df_temperature.filter(F.col('hour').isin([2, 8, 14, 20]))
    df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("temperature").alias("cnt"), F.avg("temperature").alias("avg_temperature")) \
        .filter(F.col("cnt") == 4) \
        .sort(F.asc("avg_temperature")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_temperature', 1).alias("avg_temperature"))
    df_avg_temperature.cache()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.mode("overwrite") \
        .json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_temperature_analyse.json")
    print("end analysing passed temperature")
    return avg_temperature_list[0:10]  # the 10 cities with the lowest daily mean temperature
```
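The four-time daily mean can likewise be mirrored in plain Python to check a few cities by hand. This sketch rests on two assumptions: the helper name is hypothetical, and the time field is assumed to look like `2021-12-20 14:00` (adjust `time_format` if the crawled data differs). As with the `count = 4` filter above, a city-day counts only if it has exactly four values.

```python
from collections import defaultdict
from datetime import datetime
from decimal import Decimal

SYNOPTIC_HOURS = {2, 8, 14, 20}  # the four daily observation times


def four_time_daily_mean(rows, field="temperature", time_format="%Y-%m-%d %H:%M", n=10):
    """Mean of `field` over 02/08/14/20 per city and day, complete days only, lowest first."""
    groups = defaultdict(list)  # (province, city_name, city_code, date) -> values
    for r in rows:
        try:
            ts = datetime.strptime(r["time"], time_format)
            value = Decimal(r[field])
        except (KeyError, TypeError, ValueError, ArithmeticError):
            continue  # missing or unparsable time/value
        if not value.is_finite():
            continue  # skip NaN / Infinity
        if ts.hour in SYNOPTIC_HOURS:
            key = (r["province"], r["city_name"], r["city_code"], ts.strftime("%Y-%m-%d"))
            groups[key].append(value)
    means = [(*key, sum(vals) / 4) for key, vals in groups.items() if len(vals) == 4]
    means.sort(key=lambda t: t[-1])  # ascending: lowest daily mean first
    # Round to one decimal place and return the value as text, like format_number
    return [(*t[:-1], str(t[-1].quantize(Decimal("0.1")))) for t in means[:n]]
```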
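3. Computing each city's daily mean humidity
The steps are the same as for the daily mean temperature, with the humidity field in place of temperature.
Code: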
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType


def passed_humidity_analyse(filename):  # daily mean humidity of each city
    print("begin to analyse passed humidity")
    spark = SparkSession.builder.master("local").appName("passed_humidity_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_humidity = df.select(  # keep only the needed columns
        df['province'],
        df['city_name'],
        df['city_code'],
        df['humidity'].cast(DecimalType(scale=1)).alias('humidity'),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # date part
        F.hour(df['time']).alias("hour")  # hour part
    )
    # Keep the four observation times 02, 08, 14 and 20
    df_4point_humidity = df_humidity.filter(F.col('hour').isin([2, 8, 14, 20]))
    df_avg_humidity = df_4point_humidity.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("humidity").alias("cnt"), F.avg("humidity").alias("avg_humidity")) \
        .filter(F.col("cnt") == 4) \
        .sort(F.asc("avg_humidity")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_humidity', 1).alias("avg_humidity"))
    df_avg_humidity.cache()
    avg_humidity_list = df_avg_humidity.collect()
    df_avg_humidity.coalesce(1).write.mode("overwrite") \
        .json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_humidity_analyse.json")
    print("end analysing passed humidity")
    return avg_humidity_list[0:10]  # the 10 cities with the lowest daily mean humidity
```
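4. Computing each city's daily mean wind speed
The steps are the same as for the daily mean temperature, with the windSpeed field in place of temperature.
Code: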
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType


def passed_windSpeed_analyse(filename):  # daily mean wind speed of each city
    print("begin to analyse passed windSpeed")
    spark = SparkSession.builder.master("local").appName("passed_windSpeed_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_windSpeed = df.select(  # keep only the needed columns
        df['province'],
        df['city_name'],
        df['city_code'],
        df['windSpeed'].cast(DecimalType(scale=1)).alias('windSpeed'),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # date part
        F.hour(df['time']).alias("hour")  # hour part
    )
    # Keep the four observation times 02, 08, 14 and 20
    df_4point_windSpeed = df_windSpeed.filter(F.col('hour').isin([2, 8, 14, 20]))
    df_avg_windSpeed = df_4point_windSpeed.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("windSpeed").alias("cnt"), F.avg("windSpeed").alias("avg_windSpeed")) \
        .filter(F.col("cnt") == 4) \
        .sort(F.asc("avg_windSpeed")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_windSpeed', 1).alias("avg_windSpeed"))
    df_avg_windSpeed.cache()
    avg_windSpeed_list = df_avg_windSpeed.collect()
    df_avg_windSpeed.coalesce(1).write.mode("overwrite") \
        .json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_windSpeed_analyse.json")
    print("end analysing passed windSpeed")
    return avg_windSpeed_list[0:10]  # the 10 cities with the lowest daily mean wind speed
```
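The humidity and wind-speed functions differ from the temperature one only in the column name. One way to avoid the copy-paste is to generate a single Spark SQL query per column, as sketched below; this is an alternative to the author's DataFrame code, not the original method. It assumes the DataFrame has been registered as a temporary view (the view name `weather` is hypothetical), and it accepts only the three column names used above, so no arbitrary text is spliced into the SQL.

```python
ALLOWED_FIELDS = {"temperature", "humidity", "windSpeed"}


def four_time_mean_sql(field, view="weather"):
    """Spark SQL for the four-time daily mean of `field`, lowest first (hypothetical helper)."""
    if field not in ALLOWED_FIELDS:
        raise ValueError("unsupported field: " + field)
    return f"""
SELECT province, city_name, city_code, obs_date AS `date`,
       format_number(avg(value), 1) AS avg_{field}
FROM (
    SELECT province, city_name, city_code,
           date_format(`time`, 'yyyy-MM-dd') AS obs_date,
           hour(`time`) AS obs_hour,
           CAST({field} AS DECIMAL(10, 1)) AS value
    FROM {view}
) AS t
WHERE obs_hour IN (2, 8, 14, 20)
GROUP BY province, city_name, city_code, obs_date
HAVING count(value) = 4
ORDER BY avg(value) ASC
""".strip()
```

Usage, assuming `spark` and `df` as in the functions above: `df.createOrReplaceTempView("weather")`, then `spark.sql(four_time_mean_sql("humidity"))`. Ordering by `avg(value)` rather than by the formatted string keeps negative and multi-digit values in numeric order.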
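V. Data Visualization
Visualization uses the Python matplotlib library, which can be installed with pip or directly from within PyCharm.
The plotting procedure is roughly as follows:
Step 1: set the font. The SimHei font file simhei.ttf is provided for this; without it, Chinese text such as the axis labels is rendered as garbled characters.
Step 2: set the data (accumulated rainfall or daily mean temperature) and the x-axis labels (city names), and configure the bar chart.
Step 3: set the x-axis tick positions and the y-axis range.
Step 4: set the x- and y-axis labels.
Step 5: configure the value shown above each bar.
Step 6: draw the bar chart with the configuration above.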
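Steps 3 and 5 are where such charts most often go wrong: tick positions that do not line up with the bars, and a y range that cuts off negative values such as sub-zero temperatures. Below is a small sketch of two hypothetical helpers, kept free of matplotlib so they can be checked in isolation: `city_label` builds the two-line tick label used in the drawing code, and `y_limits` returns a range that always includes 0 and leaves headroom for the value labels.

```python
def city_label(province, city_name):
    """Two-line tick label: first two characters of the province name, then the city."""
    return province[:2] + "\n" + city_name


def y_limits(values, pad_ratio=0.1):
    """(bottom, top) for the y axis: always includes 0, padded on the side(s) with bars."""
    values = [float(v) for v in values]  # accept Decimal as well as float
    lo = min(0.0, min(values))
    hi = max(0.0, max(values))
    pad = (hi - lo) * pad_ratio
    if pad == 0:  # every value is 0
        return 0.0, 1.0
    return (lo - pad if lo < 0 else 0.0, hi + pad if hi > 0 else 0.0)
```

With matplotlib, bars drawn at `range(len(values))` would use the same positions for `plt.xticks`, and the range would be applied with `plt.ylim(*y_limits(values))`.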
Screenshot of the plotting step while running:
Chart saved from matplotlib:
VI. Data and Source Code
1. Screenshots of the crawled data:
city.csv
passed_weather_ALL.csv
province.csv
2. Crawler code
```python
# -*- coding: utf-8 -*-
import csv
import json
import os
import socket
import time
import urllib.error
import urllib.request


class Crawler:
    def get_html(self, url, retries=5):
        """Fetch url and return the response body as text, or None on failure."""
        head = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"
        }
        request = urllib.request.Request(url, headers=head)
        for _ in range(retries):  # retry a limited number of times on timeouts
            try:
                with urllib.request.urlopen(request, timeout=5) as response:
                    return response.read().decode("utf-8")
            except socket.timeout:
                print('NET_STATUS is not good, retrying')
            except urllib.error.URLError as e:
                if isinstance(e.reason, socket.timeout):  # timed out while connecting
                    print('NET_STATUS is not good, retrying')
                    continue
                # e.g. HTTP 404 for a city that does not exist: give up on this URL
                print(url, getattr(e, 'code', ''), e.reason)
                return None
        return None

    def parse_json(self, url):
        """Decode the JSON returned by url; an empty list if nothing came back."""
        obj = self.get_html(url)
        return json.loads(obj) if obj else list()

    def write_csv(self, file, data):
        """Append rows to file, writing the header only if the file did not exist yet."""
        if data:
            print("begin to write to " + file)
            # Check before open(): mode 'a+' creates the file, so a later check never writes a header
            is_new = not os.path.exists(file)
            with open(file, 'a+', newline='', encoding='utf-8') as f:
                f_csv = csv.DictWriter(f, list(data[0].keys()))
                if is_new:
                    f_csv.writeheader()
                f_csv.writerows(data)  # one CSV row per dict
            print("end to write to " + file)

    def write_header(self, file, data):
        """Append the header line followed by the rows."""
        if data:
            print("begin to write to " + file)
            with open(file, 'a+', newline='', encoding='utf-8') as f:
                f_csv = csv.DictWriter(f, list(data[0].keys()))
                f_csv.writeheader()
                f_csv.writerows(data)
            print("end to write to " + file)

    def write_row(self, file, data):
        """Append rows only, without a header."""
        if data:
            print("begin to write to " + file)
            with open(file, 'a+', newline='', encoding='utf-8') as f:
                f_csv = csv.DictWriter(f, list(data[0].keys()))
                f_csv.writerows(data)
            print("end to write to " + file)

    def read_csv(self, file):
        print("begin to read " + file)
        with open(file, 'r', newline='', encoding='utf-8') as f:
            data = list(csv.DictReader(f))  # materialise the rows before the file is closed
        print("end to read " + file)
        return data

    def get_provinces(self):
        province_file = 'input/province.csv'
        if not os.path.exists(province_file):  # no province file yet: crawl the province list
            print("begin crawl province")
            provinces = self.parse_json('http://www.nmc.cn/f/rest/province')
            print("end crawl province")
            self.write_csv(province_file, provinces)
        else:
            provinces = self.read_csv(province_file)
        return provinces

    def get_cities(self):  # get the city list
        city_file = 'input/city.csv'
        if not os.path.exists(city_file):  # no city file yet: crawl the cities of every province
            cities = list()
            print("begin crawl city")
            for province in self.get_provinces():  # loop over the 34 provinces
                print(province['name'])
                code = province['url'].split('/')[-1].split('.')[0]  # three-letter province code
                cities.extend(self.parse_json('http://www.nmc.cn/f/rest/province/' + code))
            self.write_csv(city_file, cities)
            print("end crawl city")
        else:
            cities = self.read_csv(city_file)
        return cities

    def get_passed_weather(self, province):
        weather_passed_file = 'input/passed_weather_' + province + '.csv'
        if os.path.exists(weather_passed_file):  # already crawled
            return
        passed_weather = list()
        count = 0
        if province == 'ALL':
            print("begin crawl passed weather")  # crawl the last 24 hours of every city
            for city in self.get_cities():
                print(city['province'] + ' ' + city['city'] + ' ' + city['code'])
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
                if data:  # skip cities that do not exist or have no data
                    count = count + 1
                    for item in data:
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                        item['city_index'] = str(count)
                    passed_weather.extend(data)
                    if count % 50 == 0:  # flush every 50 cities to keep memory use low
                        if count == 50:
                            self.write_header(weather_passed_file, passed_weather)
                        else:
                            self.write_row(weather_passed_file, passed_weather)
                        passed_weather = list()
                time.sleep(0.8)  # pause between requests to go easy on the server
            if passed_weather:  # write the remaining cities
                if count <= 50:
                    self.write_header(weather_passed_file, passed_weather)
                else:
                    self.write_row(weather_passed_file, passed_weather)
            print("end crawl passed weather")
        else:
            print("begin crawl passed weather")
            select_city = [x for x in self.get_cities() if x['province'] == province]
            for city in select_city:
                print(city['province'] + ' ' + city['city'] + ' ' + city['code'])
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_index'] = str(count)
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                    passed_weather.extend(data)
            self.write_csv(weather_passed_file, passed_weather)
            print("end crawl passed weather")

    def run(self, province='ALL'):
        os.makedirs('input', exist_ok=True)  # all CSV files live in ./input
        self.get_passed_weather(province)


def main():
    crawler = Crawler()
    crawler.run("ALL")


if __name__ == '__main__':
    main()
```
3. Spark analysis code:
```python
# coding:utf-8
import math
import os

import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

from Crawler import Crawler  # the crawler module listed above


def passed_rain_analyse(filename):  # accumulated rainfall of each city over the past 24 hours
    print("begin to analyse passed rain")
    spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    # Filtering on the cast column drops anomalous (>= 1000) and unparsable (null) values
    df_rain = df.select(df['province'], df['city_name'], df['city_code'],
                        df['rain1h'].cast(DecimalType(scale=1)).alias('rain1h')) \
        .filter(F.col('rain1h') < 1000)
    df_rain_sum = df_rain.groupBy("province", "city_name", "city_code") \
        .agg(F.sum("rain1h").alias("rain24h")) \
        .sort(F.desc("rain24h"))  # group, sum, sort descending
    df_rain_sum.cache()
    df_rain_sum.coalesce(1).write.mode("overwrite") \
        .csv("file:///home/hadoop/PythonCode/SparkAnalysis/passed_rain_analyse.csv")
    print("end analysing passed rain")
    return df_rain_sum.head(20)


def passed_temperature_analyse(filename):  # daily mean temperature of each city
    print("begin to analyse passed temperature")
    spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_temperature = df.select(  # keep only the needed columns
        df['province'],
        df['city_name'],
        df['city_code'],
        df['temperature'].cast(DecimalType(scale=1)).alias('temperature'),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # date part
        F.hour(df['time']).alias("hour")  # hour part
    )
    # Keep the four observation times 02, 08, 14 and 20
    df_4point_temperature = df_temperature.filter(F.col('hour').isin([2, 8, 14, 20]))
    df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("temperature").alias("cnt"), F.avg("temperature").alias("avg_temperature")) \
        .filter(F.col("cnt") == 4) \
        .sort(F.asc("avg_temperature")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_temperature', 1).alias("avg_temperature"))
    df_avg_temperature.cache()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.mode("overwrite") \
        .json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_temperature_analyse.json")
    print("end analysing passed temperature")
    return avg_temperature_list[0:10]


def passed_humidity_analyse(filename):  # daily mean humidity of each city
    print("begin to analyse passed humidity")
    spark = SparkSession.builder.master("local").appName("passed_humidity_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_humidity = df.select(  # keep only the needed columns
        df['province'],
        df['city_name'],
        df['city_code'],
        df['humidity'].cast(DecimalType(scale=1)).alias('humidity'),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # date part
        F.hour(df['time']).alias("hour")  # hour part
    )
    # Keep the four observation times 02, 08, 14 and 20
    df_4point_humidity = df_humidity.filter(F.col('hour').isin([2, 8, 14, 20]))
    df_avg_humidity = df_4point_humidity.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("humidity").alias("cnt"), F.avg("humidity").alias("avg_humidity")) \
        .filter(F.col("cnt") == 4) \
        .sort(F.asc("avg_humidity")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_humidity', 1).alias("avg_humidity"))
    df_avg_humidity.cache()
    avg_humidity_list = df_avg_humidity.collect()
    df_avg_humidity.coalesce(1).write.mode("overwrite") \
        .json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_humidity_analyse.json")
    print("end analysing passed humidity")
    return avg_humidity_list[0:10]


def passed_windSpeed_analyse(filename):  # daily mean wind speed of each city
    print("begin to analyse passed windSpeed")
    spark = SparkSession.builder.master("local").appName("passed_windSpeed_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_windSpeed = df.select(  # keep only the needed columns
        df['province'],
        df['city_name'],
        df['city_code'],
        df['windSpeed'].cast(DecimalType(scale=1)).alias('windSpeed'),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # date part
        F.hour(df['time']).alias("hour")  # hour part
    )
    # Keep the four observation times 02, 08, 14 and 20
    df_4point_windSpeed = df_windSpeed.filter(F.col('hour').isin([2, 8, 14, 20]))
    df_avg_windSpeed = df_4point_windSpeed.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("windSpeed").alias("cnt"), F.avg("windSpeed").alias("avg_windSpeed")) \
        .filter(F.col("cnt") == 4) \
        .sort(F.asc("avg_windSpeed")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_windSpeed', 1).alias("avg_windSpeed"))
    df_avg_windSpeed.cache()
    avg_windSpeed_list = df_avg_windSpeed.collect()
    df_avg_windSpeed.coalesce(1).write.mode("overwrite") \
        .json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_windSpeed_analyse.json")
    print("end analysing passed windSpeed")
    return avg_windSpeed_list[0:10]


def _draw_bar_chart(name_list, num_list, ylabel, title, label_offset):
    """Shared plotting steps: font, bars, ticks, y range, axis labels, value labels, show."""
    if not num_list:
        print("nothing to draw")
        return
    font = FontProperties(fname='ttf/simhei.ttf')  # SimHei, so Chinese text is not garbled
    index = list(range(len(num_list)))
    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)
    plt.xticks(index, name_list, fontproperties=font, fontsize=20)  # ticks centred under the bars
    plt.yticks(fontsize=20)
    # The y range always includes 0, so negative values (sub-zero temperatures) stay visible
    low = min(0, math.floor(min(num_list)))
    high = max(0, math.ceil(max(num_list)))
    margin = label_offset * 10  # headroom for the value labels
    plt.ylim(bottom=low - margin if low < 0 else 0, top=high + margin)
    plt.xlabel("城市", fontproperties=font, fontsize=20)  # "City"
    plt.ylabel(ylabel, fontproperties=font, fontsize=20)
    plt.title(title, fontproperties=font, fontsize=20)
    for rect in rects:  # write each value just beyond the end of its bar
        height = rect.get_height()
        above = height >= 0
        plt.text(rect.get_x() + rect.get_width() / 2,
                 height + label_offset if above else height - label_offset,
                 str(height), ha="center", va="bottom" if above else "top", fontsize=20)
    plt.show()


def _draw_daily(rows, field, ylabel, title_suffix):
    """Bar chart for one of the daily-mean results (rows come from collect())."""
    if not rows:
        print("no city has all four observation times; nothing to draw")
        return
    name_list = [row.province[0:2] + '\n' + row.city_name for row in rows]
    # format_number() may insert thousands separators, so strip commas before float()
    num_list = [float(row[field].replace(',', '')) for row in rows]
    _draw_bar_chart(name_list, num_list, ylabel, rows[0].date + title_suffix, 0.1)


def draw_rain(rain_list):
    print("begin to draw the picture of passed rain")
    name_list = [item.province[0:2] + '\n' + item.city_name for item in rain_list]
    num_list = [float(item.rain24h) for item in rain_list]
    # Title: "Top 20 cities nationwide by accumulated rainfall over the past 24 hours"
    _draw_bar_chart(name_list, num_list, "雨量", "过去24小时累计降雨量全国前20名", 1)
    print("ending drawing the picture of passed rain")


def draw_temperature(temperature_list):
    print("begin to draw the picture of passed temperature")
    # Title: "<date> 10 cities nationwide with the lowest daily mean temperature"
    _draw_daily(temperature_list, 'avg_temperature', "日平均气温", "全国日平均气温最低前10名")
    print("ending drawing the picture of passed temperature")


def draw_humidity(humidity_list):
    print("begin to draw the picture of passed humidity")
    # Title: "<date> 10 cities nationwide with the lowest daily mean humidity"
    _draw_daily(humidity_list, 'avg_humidity', "日平均湿度", "全国日平均湿度最低前10名")
    print("ending drawing the picture of passed humidity")


def draw_windSpeed(windSpeed_list):
    print("begin to draw the picture of passed windSpeed")
    # Title: "<date> 10 cities nationwide with the lowest daily mean wind speed"
    _draw_daily(windSpeed_list, 'avg_windSpeed', "日平均风速", "全国日平均风速最低前10名")
    print("ending drawing the picture of passed windSpeed")


def main():
    sourcefile = "input/passed_weather_ALL.csv"
    if not os.path.exists(sourcefile):  # crawl only if there is no cached data
        crawler = Crawler()
        crawler.run('ALL')
    source_uri = 'file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile
    # Top 20 cities nationwide by accumulated rainfall over the past 24 hours
    draw_rain(passed_rain_analyse(source_uri))
    # 10 cities nationwide with the lowest daily mean temperature
    draw_temperature(passed_temperature_analyse(source_uri))
    # 10 cities nationwide with the lowest daily mean humidity
    draw_humidity(passed_humidity_analyse(source_uri))
    # 10 cities nationwide with the lowest daily mean wind speed
    draw_windSpeed(passed_windSpeed_analyse(source_uri))


if __name__ == '__main__':
    main()
```
Appendix: CSV files, source code, etc.
Link: https://pan.baidu.com/s/1ofcoqahctrHA04lXP4ihog?pwd=nyrf
Extraction code: nyrf