基于后端和爬虫创建的代理ip池

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

搭建免费的代理ip池

需要解决的问题:

  • 使用什么方式存储ip

    • 文件存储
      缺点: 打开文件修改文件操作较麻烦
    • mysql
      缺点: 查询速度较慢
    • mongodb
      缺点: 查询速度较慢. 没有查重功能
    • redis  --> 使用redis存储最为合适
    所以 -> 数据结构采用redis中的zset有序集合

  • 获取ip的网站

  • 项目架构???
项目架构


  • 获取api
  • 筛选api
  • 验证api的有效性
  • 提供api
项目结构图

项目结构如下:

项目代码

code文件夹
redis_proxy.py
  1. # -*- encoding:utf-8 -*-
  2. # @time: 2022/7/4 11:32
  3. # @author: Maxs_hu
  4. """
  5. 这里用来做redis中间商. 去控制redis和ip之间的调用关系
  6. """
  7. from redis import Redis
  8. import random
  9. class RedisProxy:
  10.     def __init__(self):
  11.         # 连接到redis数据库
  12.         self.red = Redis(
  13.             host='localhost',
  14.             port=6379,
  15.             db=9,
  16.             password=123456,
  17.             decode_responses=True
  18.         )
  19.     # 1. 存储到redis中. 存储之前需要提前判断ip是否存在. 防止将已存在的ip的score抵掉
  20.     # 2. 需要校验所有的ip. 查询ip
  21.     # 3. 验证可用性. 可用分值拉满. 不可用扣分
  22.     # 4. 将可用的ip查出来返回给用户
  23.     #       先给满分的
  24.     #       再给有分的
  25.     #       都没有分. 就不给
  26.     def add_ip(self, ip):  # 外界调用并传入ip
  27.         # 判断ip在redis中是否存在
  28.         if not self.red.zscore('proxy_ip', ip):
  29.             self.red.zadd('proxy_ip', {ip: 10})
  30.             print('proxy_ip存储完毕', ip)
  31.         else:
  32.             print('存在重复', ip)
  33.     def get_all_proxy(self):
  34.         # 查询所有的ip功能
  35.         return self.red.zrange('proxy_ip', 0, -1)
  36.     def set_max_score(self, ip):
  37.         self.red.zadd('proxy_ip', {ip: 100})  # 注意是引号的格式
  38.     def deduct_score(self, ip):
  39.         # 先将分数查询出来
  40.         score = self.red.zscore('proxy_ip', ip)
  41.         # 如果有分值.那就扣一分
  42.         if score > 0:
  43.             self.red.zincrby('proxy_ip', -1, ip)
  44.         else:
  45.             # 如果分值已经扣的小于0了. 那么可以直接删除了
  46.             self.red.zrem('proxy_ip', ip)
  47.     def effect_ip(self):
  48.         # 先将ip通过分数筛选出来
  49.         ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)
  50.         if ips:
  51.             return random.choice(ips)
  52.         else:  # 没有满分的
  53.             # 将九十分以上的筛选出来
  54.             ips = self.red.zrangebyscore('proxy_ip', 11, 99, 0, -1)
  55.             if ips:
  56.                 return random.choice(ips)
  57.             else:
  58.                 print('无可用ip')
  59.                 return None
复制代码
ip_collection.py
  1. # -*- encoding:utf-8 -*-
  2. # @time: 2022/7/4 11:32
  3. # @author: Maxs_hu
  4. """
  5. 这里用来收集ip
  6. """
  7. from redis_proxy import RedisProxy
  8. import requests
  9. from lxml import html
  10. from multiprocessing import Process
  11. import time
  12. import random
  13. def get_kuai_ip(red):
  14.     url = "https://free.kuaidaili.com/free/intr/"
  15.     headers = {
  16.         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
  17.     }
  18.     resp = requests.get(url, headers=headers)
  19.     etree = html.etree
  20.     et = etree.HTML(resp.text)
  21.     trs = et.xpath('//table//tr')
  22.     for tr in trs:
  23.         ip = tr.xpath('./td[1]/text()')
  24.         port = tr.xpath('./td[2]/text()')
  25.         if not ip:  # 将不含有ip值的筛除
  26.             continue
  27.         proxy_ip = ip[0] + ":" + port[0]
  28.         red.add_ip(proxy_ip)
  29. def get_unknown_ip(red):
  30.     url = "https://ip.jiangxianli.com/"
  31.     headers = {
  32.         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
  33.     }
  34.     resp = requests.get(url, headers=headers)
  35.     etree = html.etree
  36.     et = etree.HTML(resp.text)
  37.     trs = et.xpath('//table//tr')
  38.     for tr in trs:
  39.         ip = tr.xpath('./td[1]/text()')
  40.         port = tr.xpath('./td[2]/text()')
  41.         if not ip:  # 将不含有ip值的筛除
  42.             continue
  43.         proxy_ip = ip[0] + ":" + port[0]
  44.         red.add_ip(proxy_ip)
  45. def get_happy_ip(red):
  46.     page = random.randint(1, 5)
  47.     url = f'http://www.kxdaili.com/dailiip/2/{page}.html'
  48.     headers = {
  49.         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
  50.     }
  51.     resp = requests.get(url, headers=headers)
  52.     etree = html.etree
  53.     et = etree.HTML(resp.text)
  54.     trs = et.xpath('//table//tr')
  55.     for tr in trs:
  56.         ip = tr.xpath('./td[1]/text()')
  57.         port = tr.xpath('./td[2]/text()')
  58.         if not ip:  # 将不含有ip值的筛除
  59.             continue
  60.         proxy_ip = ip[0] + ":" + port[0]
  61.         red.add_ip(proxy_ip)
  62. def get_nima_ip(red):
  63.     url = 'http://www.nimadaili.com/'
  64.     headers = {
  65.         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
  66.     }
  67.     resp = requests.get(url, headers=headers)
  68.     etree = html.etree
  69.     et = etree.HTML(resp.text)
  70.     trs = et.xpath('//table//tr')
  71.     for tr in trs:
  72.         ip = tr.xpath('./td[1]/text()')  # 这里存在空值. 所以不能在后面加[0]
  73.         if not ip:
  74.             continue
  75.         red.add_ip(ip[0])
  76. def get_89_ip(red):
  77.     page = random.randint(1, 26)
  78.     url = f'https://www.89ip.cn/index_{page}.html'
  79.     headers = {
  80.         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
  81.     }
  82.     resp = requests.get(url, headers=headers)
  83.     etree = html.etree
  84.     et = etree.HTML(resp.text)
  85.     trs = et.xpath('//table//tr')
  86.     for tr in trs:
  87.         ip = tr.xpath('./td[1]/text()')
  88.         if not ip:
  89.             continue
  90.         red.add_ip(ip[0].strip())
  91. def main():
  92.     # 创建一个redis实例化对象
  93.     red = RedisProxy()
  94.     print("开始采集数据")
  95.     while 1:
  96.         try:
  97.             # 这里可以添加各种采集的网站
  98.             print('>>>开始收集快代理ip')
  99.             get_kuai_ip(red)  # 收集快代理
  100.             # get_unknown_ip(red)  # 收集ip
  101.             print(">>>开始收集开心代理ip")
  102.             get_happy_ip(red)  # 收集开心代理
  103.             print(">>>开始收集泥马代理ip")
  104.             # get_nima_ip(red)  # 收集泥马代理
  105.             print(">>>开始收集89代理ip")
  106.             get_89_ip(red)
  107.             time.sleep(60)
  108.         except Exception as e:
  109.             print('ip储存出错了', e)
  110.             time.sleep(60)
  111. if __name__ == '__main__':
  112.     main()
  113.     # 创建一个子进程
  114.     # p = Process(target=main)
  115.     # p.start()
复制代码
ip_verify.py
  1. # -*- encoding:utf-8 -*-
  2. # @time: 2022/7/4 11:34
  3. # @author: Maxs_hu
  4. """
  5. 这里用来验证ip的可用性: 使用携程发送请求增加效率
  6. """
  7. from redis_proxy import RedisProxy
  8. from multiprocessing import Process
  9. import asyncio
  10. import aiohttp
  11. import time
  12. async def verify_ip(ip, red, sem):
  13.     timeout = aiohttp.ClientTimeout(total=10)  # 设置网页等待时间不超过十秒
  14.     try:
  15.         async with sem:
  16.             async with aiohttp.ClientSession() as session:
  17.                 async with session.get(url='http://www.baidu.com/',
  18.                                        proxy='http://'+ip,
  19.                                        timeout=timeout) as resp:
  20.                     page_source = await resp.text()
  21.                     if resp.status in [200, 302]:
  22.                         # 如果可用. 加分
  23.                         red.set_max_score(ip)
  24.                         print('验证没有问题. 分值拉满~', ip)
  25.                     else:
  26.                         # 如果不可用. 扣分
  27.                         red.deduct_score(ip)
  28.                         print('问题ip. 扣一分', ip)
  29.     except Exception as e:
  30.         print('出错了', e)
  31.         red.deduct_score(ip)
  32.         print('问题ip. 扣一分', ip)
  33. async def task(red):
  34.     ips = red.get_all_proxy()
  35.     sem = asyncio.Semaphore(30)  # 设置每次三十的信号量
  36.     tasks = []
  37.     for ip in ips:
  38.         tasks.append(asyncio.create_task(verify_ip(ip, red, sem)))
  39.     if tasks:
  40.         await asyncio.wait(tasks)
  41. def main():
  42.     red = RedisProxy()
  43.     time.sleep(5)  # 初始的等待时间. 等待采集到数据
  44.     print("开始验证可用性")
  45.     while 1:
  46.         try:
  47.             asyncio.run(task(red))
  48.             time.sleep(100)
  49.         except Exception as e:
  50.             print("ip_verify出错了", e)
  51.             time.sleep(100)
  52. if __name__ == '__main__':
  53.     main()
  54.     # 创建一个子进程
  55.     # p = Process(target=main())
  56.     # p.start()
复制代码
ip_api.py
  1. # -*- encoding:utf-8 -*-
  2. # @time: 2022/7/4 11:35
  3. # @author: Maxs_hu
  4. """
  5. 这里用来提供给用户ip接口. 通过写后台服务器. 用户访问我们的服务器就可以得到可用的代理ip:
  6.    1. flask
  7.    2. sanic --> 今天使用这个要稍微简单一点
  8. """
  9. from redis_proxy import RedisProxy
  10. from sanic import Sanic, json
  11. from sanic_cors import CORS
  12. from multiprocessing import Process
  13. # 创建一个app
  14. app = Sanic('ip')  # 随便给个名字
  15. # 解决跨域问题
  16. CORS(app)
  17. red = RedisProxy()
  18. @app.route('maxs_hu_ip')  # 添加路由
  19. def api(req):  # 第一个请求参数固定. 请求对象
  20.    ip = red.effect_ip()
  21.    return json({"ip": ip})
  22. def main():
  23.    # 让sanic跑起来
  24.    app.run(host='127.0.0.1', port=1234)
  25. if __name__ == '__main__':
  26.    main()
  27.    # p = Process(target=main())
  28.    # p.start()
复制代码
runner.py
  1. # -*- encoding:utf-8 -*-
  2. # @time: 2022/7/5 17:36
  3. # @author: Maxs_hu
  4. from ip_api import main as api_run
  5. from ip_collection import main as coll_run
  6. from ip_verify import main as veri_run
  7. from multiprocessing import Process
  8. def main():
  9.     # 设置互不干扰的三个进程
  10.     p1 = Process(target=api_run)  # 只需要将目标函数的内存地址传过去即可
  11.     p2 = Process(target=coll_run)
  12.     p3 = Process(target=veri_run)
  13.     p1.start()
  14.     p2.start()
  15.     p3.start()
  16. if __name__ == '__main__':
  17.     main()
复制代码
测试ip是否可用.py
  1. # -*- encoding:utf-8 -*-
  2. # @time: 2022/7/5 18:15
  3. # @author: Maxs_hu
  4. import requests
  5. def get_proxy():
  6.     url = "http://127.0.0.1:1234/maxs_hu_ip"
  7.     resp = requests.get(url)
  8.     return resp.json()
  9. def main():
  10.     url = 'http://mip.chinaz.com/?query=' + get_proxy()["ip"]
  11.     proxies = {
  12.         "http": 'http://' + get_proxy()["ip"],
  13.         "https": 'http://' + get_proxy()["ip"]  # 目前代理只支持http请求
  14.     }
  15.     headers = {
  16.         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",
  17.     }
  18.     resp = requests.get(url, proxies=proxies, headers=headers)
  19.     resp.encoding = 'utf-8'
  20.     print(resp.text)  # 物理位置
  21. if __name__ == '__main__':
  22.     main()
复制代码
运行效果

项目运行截图:

redis储存截图:

总结


  • 免费代理ip只支持http的网页操作. 并不好用. 如果有需求可以进行购买然后加入ip代理池
  • 网页部署到自己的服务器上. 别人访问自己的服务器. 以后学了全栈可以加上登录. 和付费功能. 实现功能的进一步拓展
  • 项目架构是生产者消费者模型. 三个模块同时运行. 每个模块为一个进程. 互不影响
  • 代理设计细节有待处理. 但总体运行效果还可以. 遇到问题再修改

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

商道如狼道

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表