项目结构图
项目结构如下:
项目代码
redis_proxy.py复制代码
- # -*- encoding:utf-8 -*-
- # @time: 2022/7/4 11:32
- # @author: Maxs_hu
- """
- 这里用来做redis中间商. 去控制redis和ip之间的调用关系
- """
- from redis import Redis
- import random
class RedisProxy:
    """Broker between the proxy pool and Redis.

    Proxies are members of the sorted set ``proxy_ip``; the score tracks health:

    * a newly collected proxy is stored with score 10,
    * a proxy that passes verification is set to 100,
    * every failed verification subtracts 1, and a proxy whose score is
      no longer positive is removed on the next failure.

    When handing a proxy to a user, full-score proxies are preferred, then
    proxies scoring 11-99; if neither exists nothing is returned.
    """

    def __init__(self):
        # Connect to the Redis database.
        # NOTE(review): connection settings and password are hard-coded here.
        self.red = Redis(
            host='localhost',
            port=6379,
            db=9,
            password=123456,
            decode_responses=True
        )

    def add_ip(self, ip):
        """Store ``ip`` with the initial score 10 unless it is already known.

        An existing member is left untouched so that a score earned (or lost)
        through verification is not overwritten by a re-collection.
        """
        # Compare against None explicitly: a stored score of 0 is falsy but
        # the member still exists and must not be reset to 10.
        if self.red.zscore('proxy_ip', ip) is None:
            self.red.zadd('proxy_ip', {ip: 10})
            print('proxy_ip存储完毕', ip)
        else:
            print('存在重复', ip)

    def get_all_proxy(self):
        """Return every member of the ``proxy_ip`` sorted set."""
        return self.red.zrange('proxy_ip', 0, -1)

    def set_max_score(self, ip):
        """Set the score of ``ip`` to the maximum, 100."""
        self.red.zadd('proxy_ip', {ip: 100})

    def deduct_score(self, ip):
        """Subtract one point from ``ip``; remove it once its score is not positive."""
        score = self.red.zscore('proxy_ip', ip)
        if score is None:
            # Member is already gone; there is nothing to deduct from.
            return
        if score > 0:
            self.red.zincrby('proxy_ip', -1, ip)
        else:
            # Score has been used up: drop the proxy from the pool.
            self.red.zrem('proxy_ip', ip)

    def effect_ip(self):
        """Return a random usable proxy, or ``None`` when there is none.

        Proxies with score 100 are tried first; otherwise proxies scoring
        11-99 are used. Within this class only ``set_max_score`` lifts a score
        above the initial 10, so that band holds proxies that were set to 100
        and have since lost points.
        """
        ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)
        if not ips:
            # No full-score proxy: fall back to the 11-99 band.
            ips = self.red.zrangebyscore('proxy_ip', 11, 99, 0, -1)
        if ips:
            return random.choice(ips)
        print('无可用ip')
        return None
ip_collection.py复制代码
- # -*- encoding:utf-8 -*-
- # @time: 2022/7/4 11:32
- # @author: Maxs_hu
- """
- 这里用来收集ip
- """
- from redis_proxy import RedisProxy
- import requests
- from lxml import html
- from multiprocessing import Process
- import time
- import random
def get_kuai_ip(red, timeout=10):
    """Scrape the kuaidaili free-proxy page and store every ``ip:port`` found.

    Args:
        red: store object; ``red.add_ip(str)`` is called once per proxy.
        timeout: seconds passed to ``requests.get`` as its timeout.
    """
    url = "https://free.kuaidaili.com/free/intr/"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    # Without a timeout a stalled site would block the whole collection loop.
    resp = requests.get(url, headers=headers, timeout=timeout)
    et = html.etree.HTML(resp.text)
    for tr in et.xpath('//table//tr'):
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        # Skip rows without an IP cell, and rows missing the port cell
        # (indexing an empty result would raise).
        if not ip or not port:
            continue
        red.add_ip(ip[0] + ":" + port[0])
def get_unknown_ip(red, timeout=10):
    """Scrape ip.jiangxianli.com and store every ``ip:port`` found.

    Args:
        red: store object; ``red.add_ip(str)`` is called once per proxy.
        timeout: seconds passed to ``requests.get`` as its timeout.
    """
    url = "https://ip.jiangxianli.com/"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    # Without a timeout a stalled site would block the whole collection loop.
    resp = requests.get(url, headers=headers, timeout=timeout)
    et = html.etree.HTML(resp.text)
    for tr in et.xpath('//table//tr'):
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        # Skip rows without an IP cell, and rows missing the port cell
        # (indexing an empty result would raise).
        if not ip or not port:
            continue
        red.add_ip(ip[0] + ":" + port[0])
def get_happy_ip(red, timeout=10):
    """Scrape one randomly chosen kxdaili listing page (1-5) and store its proxies.

    Args:
        red: store object; ``red.add_ip(str)`` is called once per proxy.
        timeout: seconds passed to ``requests.get`` as its timeout.
    """
    page = random.randint(1, 5)
    url = f'http://www.kxdaili.com/dailiip/2/{page}.html'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    # Without a timeout a stalled site would block the whole collection loop.
    resp = requests.get(url, headers=headers, timeout=timeout)
    et = html.etree.HTML(resp.text)
    for tr in et.xpath('//table//tr'):
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        # Skip rows without an IP cell, and rows missing the port cell
        # (indexing an empty result would raise).
        if not ip or not port:
            continue
        red.add_ip(ip[0] + ":" + port[0])
def get_nima_ip(red, timeout=10):
    """Scrape the nimadaili front page and store the first cell of each table row.

    Args:
        red: store object; ``red.add_ip(str)`` is called once per row.
        timeout: seconds passed to ``requests.get`` as its timeout.
    """
    url = 'http://www.nimadaili.com/'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    # Without a timeout a stalled site would block the whole collection loop.
    resp = requests.get(url, headers=headers, timeout=timeout)
    et = html.etree.HTML(resp.text)
    for tr in et.xpath('//table//tr'):
        # Some rows yield no text here, so the result cannot be indexed
        # with [0] before checking that it is non-empty.
        ip = tr.xpath('./td[1]/text()')
        if not ip:
            continue
        # NOTE(review): only the first cell is stored, with no port appended —
        # presumably this site's first column already holds "ip:port"; confirm.
        red.add_ip(ip[0])
def get_89_ip(red, timeout=10):
    """Scrape one randomly chosen 89ip listing page (1-26) and store its first column.

    Args:
        red: store object; ``red.add_ip(str)`` is called once per row.
        timeout: seconds passed to ``requests.get`` as its timeout.
    """
    page = random.randint(1, 26)
    url = f'https://www.89ip.cn/index_{page}.html'
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }
    # Without a timeout a stalled site would block the whole collection loop.
    resp = requests.get(url, headers=headers, timeout=timeout)
    et = html.etree.HTML(resp.text)
    for tr in et.xpath('//table//tr'):
        ip = tr.xpath('./td[1]/text()')
        if not ip:
            continue
        # NOTE(review): only the first cell is stored (whitespace-stripped),
        # with no port appended — verify that this column contains the port,
        # otherwise the stored value cannot be used as a proxy address.
        red.add_ip(ip[0].strip())
def main():
    """Collect proxies from every enabled site once a minute, forever.

    Each site is polled inside its own ``try`` so that one failing site does
    not prevent the remaining sites from being polled in the same cycle.
    """
    # One store object shared by all collectors.
    red = RedisProxy()
    print("开始采集数据")
    # (log banner, collector) pairs; add further sites here.
    # Currently disabled: get_unknown_ip, get_nima_ip.
    collectors = (
        ('>>>开始收集快代理ip', get_kuai_ip),
        (">>>开始收集开心代理ip", get_happy_ip),
        (">>>开始收集89代理ip", get_89_ip),
    )
    while True:
        for banner, collect in collectors:
            print(banner)
            try:
                collect(red)
            except Exception as e:
                # Report and move on to the next site.
                print('ip储存出错了', e)
        time.sleep(60)


if __name__ == '__main__':
    main()
    # Alternative: run the loop in a child process instead:
    # p = Process(target=main)
    # p.start()
ip_verify.py复制代码
- # -*- encoding:utf-8 -*-
- # @time: 2022/7/4 11:34
- # @author: Maxs_hu
- """
- 这里用来验证ip的可用性: 使用协程发送请求增加效率
- """
- from redis_proxy import RedisProxy
- from multiprocessing import Process
- import asyncio
- import aiohttp
- import time
async def verify_ip(ip, red, sem):
    """Check one proxy by fetching http://www.baidu.com/ through it, then score it.

    Args:
        ip: proxy address; it is used as ``'http://' + ip``.
        red: store object providing ``set_max_score(ip)`` and ``deduct_score(ip)``.
        sem: semaphore entered around the request to bound concurrency.

    A response with status 200 or 302 sets the proxy to the maximum score;
    any other status, or any exception raised by the request, costs one point.
    """
    timeout = aiohttp.ClientTimeout(total=10)  # whole request limited to 10 s
    try:
        async with sem:
            async with aiohttp.ClientSession() as session:
                async with session.get(url='http://www.baidu.com/',
                                       proxy='http://'+ip,
                                       timeout=timeout) as resp:
                    # Read the body as well, so a proxy that fails while
                    # transferring the page still counts as a failure.
                    await resp.text()
                    usable = resp.status in [200, 302]
    except Exception as e:
        print('出错了', e)
        usable = False

    # Scoring happens outside the try block: an error raised by the store
    # must not be mistaken for a proxy failure and turned into a deduction.
    if usable:
        red.set_max_score(ip)
        print('验证没有问题. 分值拉满~', ip)
    else:
        red.deduct_score(ip)
        print('问题ip. 扣一分', ip)
async def task(red):
    """Verify every stored proxy concurrently, with a semaphore of 30 shared by all checks."""
    limiter = asyncio.Semaphore(30)
    pending = [
        asyncio.create_task(verify_ip(address, red, limiter))
        for address in red.get_all_proxy()
    ]
    # Only wait when at least one check was scheduled.
    if pending:
        await asyncio.wait(pending)
def main():
    """Run a verification pass over the pool, then sleep 100 seconds; repeat forever."""
    red = RedisProxy()
    # Initial pause so the collector has time to store some proxies.
    time.sleep(5)
    print("开始验证可用性")
    while True:
        try:
            asyncio.run(task(red))
        except Exception as e:
            print("ip_verify出错了", e)
        # Same pause after a successful pass and after a failed one.
        time.sleep(100)


if __name__ == '__main__':
    main()
    # Alternative: run the loop in a child process via
    # Process(target=main).start() (pass the function itself, do not call it).
ip_api.py复制代码
- # -*- encoding:utf-8 -*-
- # @time: 2022/7/4 11:35
- # @author: Maxs_hu
- """
- 这里用来提供给用户ip接口. 通过写后台服务器. 用户访问我们的服务器就可以得到可用的代理ip:
- 1. flask
- 2. sanic --> 今天使用这个要稍微简单一点
- """
- from redis_proxy import RedisProxy
- from sanic import Sanic, json
- from sanic_cors import CORS
- from multiprocessing import Process
# Application instance; the name given to Sanic is arbitrary.
app = Sanic('ip')
# Enable CORS to deal with cross-origin requests.
CORS(app)
red = RedisProxy()


@app.route('maxs_hu_ip')  # register the route
def api(req):
    """Return one usable proxy as JSON: ``{"ip": <address>}``.

    ``req`` is the request object, always passed as the first argument; it is
    not used. The value is ``None`` when ``red.effect_ip()`` finds no proxy.
    """
    chosen = red.effect_ip()
    payload = {"ip": chosen}
    return json(payload)
def main(host='127.0.0.1', port=1234):
    """Start the Sanic application.

    Args:
        host: interface to bind; defaults to the previously hard-coded
            ``'127.0.0.1'``.
        port: TCP port to listen on; defaults to the previously hard-coded 1234.
    """
    app.run(host=host, port=port)


if __name__ == '__main__':
    main()
    # Alternative: run the server in a child process via
    # Process(target=main).start() (pass the function itself, do not call it).
runner.py测试ip是否可用.py复制代码
- # -*- encoding:utf-8 -*-
- # @time: 2022/7/5 17:36
- # @author: Maxs_hu
- from ip_api import main as api_run
- from ip_collection import main as coll_run
- from ip_verify import main as veri_run
- from multiprocessing import Process
def main():
    """Launch the API server, the collector and the verifier as three independent processes."""
    # Pass the function objects themselves as targets; they are called in the children.
    workers = [Process(target=entry) for entry in (api_run, coll_run, veri_run)]
    for worker in workers:
        worker.start()


if __name__ == '__main__':
    main()
项目运行截图:
redis储存截图:
总结
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) | Powered by Discuz! X3.4 |