解决方法就是通过 aiohttp + asyncio,什么是aiohttp?一个基于 asyncio 的异步 HTTP 网络模块,可用于实现异步爬虫,速度明显快于 requests 的同步爬虫。
# author: 钢铁知识库
- import time
- import requests
- # 同步请求
- def main():
- start = time.time()
- for i in range(5):
- res = requests.get('http://httpbin.org/delay/2')
- print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status_code}')
- print(f'requests同步耗时:{time.time() - start}')
- if __name__ == '__main__':
- main()
- '''
- 当前时间:2022-09-05 15:44:51.991685, status_code = 200
- 当前时间:2022-09-05 15:44:54.528918, status_code = 200
- 当前时间:2022-09-05 15:44:57.057373, status_code = 200
- 当前时间:2022-09-05 15:44:59.643119, status_code = 200
- 当前时间:2022-09-05 15:45:02.167362, status_code = 200
- requests同步耗时:12.785893440246582
- '''
复制代码 可以看到5次请求总共用12.7秒,再来看同样的请求异步多少时间。
- import asyncio
- import time
- import aiohttp
- async def async_http():
- # 声明一个支持异步的上下文管理器
- async with aiohttp.ClientSession() as session:
- res = await session.get('http://httpbin.org/delay/2')
- print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')
- tasks = [async_http() for _ in range(5)]
- start = time.time()
- # Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run()来代替最后的启动操作
- asyncio.run(asyncio.wait(tasks))
- print(f'aiohttp异步耗时:{time.time() - start}')
- '''
- 当前时间:2022-09-05 15:42:32.363966, status_code = 200
- 当前时间:2022-09-05 15:42:32.366957, status_code = 200
- 当前时间:2022-09-05 15:42:32.374973, status_code = 200
- 当前时间:2022-09-05 15:42:32.384909, status_code = 200
- 当前时间:2022-09-05 15:42:32.390318, status_code = 200
- aiohttp异步耗时:2.5826876163482666
- '''
复制代码 两次对比可以看到执行过程,时间一个是顺序执行,一个是同时执行。这就是同步和异步的区别。
接下来我们会详细介绍aiohttp库的用法和爬取实战。aiohttp 是一个支持异步请求的库,它和 asyncio 配合使用,可以使我们非常方便地实现异步请求操作。asyncio模块,其内部实现了对TCP、UDP、SSL协议的异步操作,但是对于HTTP请求,就需要aiohttp实现了。
- import asyncio
- import aiohttp
- async def get_api(session, url):
- # 声明一个支持异步的上下文管理器
- async with session.get(url) as response:
- return await response.text(), response.status
- async def main():
- async with aiohttp.ClientSession() as session:
- html, status = await get_api(session, 'http://httpbin.org/delay/2')
- print(f'html: {html[:50]}')
- print(f'status : {status}')
- if __name__ == '__main__':
- # Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作
- asyncio.get_event_loop().run_until_complete(main())
- '''
- html: {
- "args": {},
- "data": "",
- "files": {},
- status : 200
- Process finished with exit code 0
- '''
复制代码 aiohttp请求的方法和之前有明显区别,主要包括如下几点:
- 除了导入aiohttp库,还必须引入asyncio库,因为要实现异步,需要启动协程。
- 异步的方法定义不同,前面都要统一加async来修饰。
- with as用于声明上下文管理器,帮我们自动分配和释放资源,加上async代码支持异步。
- 对于返回协程对象的操作,前面需要加await来修饰。response.text()返回的是协程对象。
- 最后运行启用循环事件
- import aiohttp
- import asyncio
- async def main():
- params = {'name': '钢铁知识库', 'age': 23}
- async with aiohttp.ClientSession() as session:
- async with session.get('https://www.httpbin.org/get', params=params) as res:
- print(await res.json())
- if __name__ == '__main__':
- asyncio.get_event_loop().run_until_complete(main())
- '''
- {'args': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '', 'url': 'https://www.httpbin.org/get?name=钢铁知识库&age=23'}
- '''
复制代码 可以看到实际请求的URL后面带了后缀,这就是params的内容。
除了get请求,aiohttp还支持其它请求类型,如POST、PUT、DELETE等,和requests使用方式类似。- session.post('http://httpbin.org/post', data=b'data')
- session.put('http://httpbin.org/put', data=b'data')
- session.delete('http://httpbin.org/delete')
- session.head('http://httpbin.org/get')
- session.options('http://httpbin.org/get')
- session.patch('http://httpbin.org/patch', data=b'data')
复制代码 要使用这些方法,只需要把对应的方法和参数替换一下。用法和get类似就不再举例。
对于响应来说,我们可以用如下方法分别获取其中的响应情况。状态码、响应头、响应体、响应体二进制内容、响应体JSON结果,实例如下:- #!/usr/bin/env python
- import aiohttp
- import asyncio
- async def main():
- data = {'name': '钢铁知识库', 'age': 23}
- async with aiohttp.ClientSession() as session:
- async with session.post('https://www.httpbin.org/post', data=data) as response:
- print('status:', response.status) # 状态码
- print('headers:', response.headers) # 响应头
- print('body:', await response.text()) # 响应体
- print('bytes:', await response.read()) # 响应体二进制内容
- print('json:', await response.json()) # 响应体json数据
- if __name__ == '__main__':
- asyncio.get_event_loop().run_until_complete(main())
复制代码- '''
- status: 200
- headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
- body: {
- "args": {},
- "data": "",
- "files": {},
- "form": {
- "age": "23",
- "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
- },
- "headers": {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate",
- "Content-Length": "57",
- "Content-Type": "application/x-www-form-urlencoded",
- "Host": "www.httpbin.org",
- "User-Agent": "Python/3.8 aiohttp/3.8.1",
- "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"
- },
- "json": null,
- "origin": "",
- "url": "https://www.httpbin.org/post"
- }
- bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "23", \n "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "57", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.8 aiohttp/3.8.1", \n "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n }, \n "json": null, \n "origin": "", \n "url": "https://www.httpbin.org/post"\n}\n'
- json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '', 'url': 'https://www.httpbin.org/post'}
- '''
复制代码 可以看到有些字段前面需要加await,因为其返回的是一个协程对象(如async修饰的方法),那么前面就要加await。
我们可以借助ClientTimeout对象设置超时,例如要设置1秒的超时时间,可以这么实现:- #!/usr/bin/env python
- import asyncio
- async def main():
- # 设置 1 秒的超时
- timeout = aiohttp.ClientTimeout(total=1)
- data = {'name': '钢铁知识库', 'age': 23}
- async with aiohttp.ClientSession(timeout=timeout) as session:
- async with session.get('https://www.httpbin.org/delay/2', data=data) as response:
- print('status:', response.status) # 状态码
- if __name__ == '__main__':
- asyncio.get_event_loop().run_until_complete(main())
- '''
- Traceback (most recent call last):
- ####中间省略####
- raise asyncio.TimeoutError from None
- asyncio.exceptions.TimeoutError
- '''
复制代码 这里设置了超时1秒请求延时2秒,发现抛出异常asyncio.TimeoutError,如果正常则响应200。
aiohttp可以支持非常高的并发量,但面对高并发网站可能会承受不住,随时有挂掉的危险,这时需要对并发进行一些控制。现在我们借助asyncio 的Semaphore来控制并发量,实例如下:- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Author : 钢铁知识库
- import asyncio
- from datetime import datetime
- import aiohttp
- # 声明最大并发量
- semaphore = asyncio.Semaphore(2)
- async def get_api():
- async with semaphore:
- print(f'scrapting...{datetime.now()}')
- async with session.get('https://www.baidu.com') as response:
- await asyncio.sleep(2)
- # print(f'当前时间:{datetime.now()}, {response.status}')
- async def main():
- global session
- session = aiohttp.ClientSession()
- tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
- await asyncio.gather(*tasks)
- await session.close()
- if __name__ == '__main__':
- asyncio.get_event_loop().run_until_complete(main())
- '''
- scrapting...2022-09-07 08:11:14.190000
- scrapting...2022-09-07 08:11:14.292000
- scrapting...2022-09-07 08:11:16.482000
- scrapting...2022-09-07 08:11:16.504000
- scrapting...2022-09-07 08:11:18.520000
- scrapting...2022-09-07 08:11:18.521000
- '''
复制代码 在main方法里,我们声明了1000个task,如果没有通过Semaphore进行并发限制,那这1000放到gather方法后会被同时执行,并发量相当大。有了信号量的控制之后,同时运行的task数量就会被控制,这样就能给aiohttp限制速度了。
- import asyncio
- import json,re
- import logging
- import aiohttp
- import requests
- from utils.conn_db import ConnDb
- # 日志格式
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
- # 章节目录api
- b_id = '4308080950'
- url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/ Safari/537.36"
- }
- # 并发声明
- semaphore = asyncio.Semaphore(5)
- async def download(title,b_id, cid):
- data = {
- "book_id": b_id,
- "cid": f'{b_id}|{cid}',
- }
- data = json.dumps(data)
- detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)
- async with semaphore:
- async with aiohttp.ClientSession(headers=headers) as session:
- async with session.get(detail_url) as response:
- res = await response.json()
- content = {
- 'title': title,
- 'content': res['data']['novel']['content']
- }
- # print(title)
- await save_data(content)
- async def save_data(data):
- if data:
- client = ConnDb().conn_motor_mongo()
- db = client.baidu_novel
- collection = db.novel
- logging.info('saving data %s', data)
- await collection.update_one(
- {'title': data.get('title')},
- {'$set': data},
- upsert=True
- )
- async def main():
- res = requests.get(url, headers=headers)
- tasks = []
- for re in res.json()['data']['novel']['items']: # 拿到某小说目录cid
- title = re['title']
- cid = re['cid']
- tasks.append(download(title, b_id, cid)) # 将请求放到列表里,再通过gather执行并发
- await asyncio.gather(*tasks)
- if __name__ == '__main__':
- asyncio.run(main())
复制代码 至此,我们就使用aiohttp完成了对小说章节的爬取。
要实现异步处理,得先要有挂起操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样才能充分利用好资源,要实现异步,需要了解 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。
await 后面的对象必须是如下格式之一:
- A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
- A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine 对象。
- An object with an await method returning an iterator,一个包含 await 方法的对象返回的一个迭代器。
aiohttp 以异步方式爬取网站的耗时远小于 requests 同步方式,以上列举的例子希望对你有帮助。
