Python 基于队列实现 tcp socket 毗连池

打印 上一主题 下一主题

主题 1970|帖子 1970|积分 5910

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
毗连池实现

socket_pool.py
  1. # -*- coding:utf-8 -*-
  2. import socket
  3. import time
  4. import threading
  5. import os
  6. import logging
  7. import traceback
  8. from queue import Queue, Empty
  9. _logger = logging.getLogger('mylogger')
  10. class SocketPool:
  11.     def __init__(self, host, port, min_connections=10, max_connections=10):
  12.         '''
  13.         初始化Socket连接池
  14.         :param host: 目标主机地址
  15.         :param port: 目标端口号
  16.         :param min_connections: 最小连接数
  17.         :param max_connections: 最大连接数
  18.         '''
  19.         self.host = host
  20.         self.port = port
  21.         self.min_connections = min_connections
  22.         self.max_connections = max_connections
  23.         self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
  24.         self._sock_lock = threading.Lock()  # 线程锁保证计数正确
  25.         self._pool = Queue(max_connections)  # 基于线程安全的队列存储连接
  26.         self._lock = threading.Lock()        # 线程锁保证资源安全:
  27.         self._init_pool()                    # 预创建连接
  28.         self._start_health_check()           # 启动连接健康检查线程
  29.     def _init_pool(self):
  30.         '''预创建连接并填充到池中'''
  31.         
  32.         for _ in range(self.min_connections):
  33.             sock = self._create_socket()
  34.             self._pool.put(sock)
  35.     def _create_socket(self):
  36.         '''创建新的Socket连接'''
  37.         
  38.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  39.         try:
  40.             sock.connect((self.host, self.port))
  41.             return sock
  42.         except socket.error as e:
  43.             raise ConnectionError(f'Failed to connect: {e}')  # 连接失败抛出异常
  44.     def _start_health_check(self):
  45.         '''启动后台线程定期检查连接有效性'''
  46.         
  47.         def check():
  48.             while True:
  49.                 with self._lock:
  50.                     for _ in range(self._pool.qsize()):
  51.                         sock = self._pool.get()
  52.                         self.busy_sockets_dict[sock] = 1
  53.                         try:
  54.                             sock.send(b'PING<END>')  # 发送心跳包验证连接状态
  55.                             # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
  56.                             sock.recv(11)
  57.                             self._pool.put(sock)
  58.                             self.busy_sockets_dict.pop(sock)
  59.                         except (socket.error, ConnectionResetError):
  60.                             _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
  61.                             sock.close()  # 关闭失效连接并创建新连接替换
  62.                             self.busy_sockets_dict.pop(sock)
  63.                             new_sock = self._create_socket()
  64.                             self._pool.put(new_sock)
  65.                     
  66.                     # 如果sock数量小于最小数量,则补充
  67.                     for _ in range(0, self.min_connections - self._pool.qsize()):
  68.                         new_sock = self._create_socket()
  69.                         self._pool.put(new_sock)
  70.                 time.sleep(60)  # 每60秒检查一次
  71.         threading.Thread(target=check, daemon=True).start()
  72.     def get_connection(self):
  73.         '''
  74.         从池中获取一个可用连接
  75.         :return: socket对象
  76.         '''
  77.         
  78.         with self._sock_lock:
  79.             if self._pool.empty():
  80.                 if len(self.busy_sockets_dict.keys()) < self.max_connections:
  81.                     new_sock = self._create_socket()
  82.                     self.busy_sockets_dict[new_sock] = 1
  83.                     return new_sock
  84.                 else:
  85.                     raise Empty('No available connections in pool')
  86.             else:
  87.                 try:
  88.                     sock = self._pool.get(block=False)
  89.                     self.busy_sockets_dict[sock] = 1
  90.                     return sock
  91.                 except Exception:
  92.                     _logger.error('获取socket连接出错:%s' % traceback.format_exc())
  93.                     raise
  94.                
  95.     def release_connection(self, sock):
  96.         '''
  97.         将连接归还到池中
  98.         :param sock: 待归还的socket对象
  99.         '''
  100.         if not sock._closed:
  101.             self._pool.put(sock)
  102.         if sock in self.busy_sockets_dict:
  103.             self.busy_sockets_dict.pop(sock)
  104.     def close_all(self):
  105.         '''关闭池中所有连接'''
  106.         
  107.         while not self._pool.empty():
  108.             sock = self._pool.get()
  109.             sock.close()
  110.             self.busy_sockets_dict.pop(sock.id)
  111.         self.busy_sockets_dict = {} # 兜底
  112. host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
  113. port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
  114. min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
  115. max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
  116. socketPool = SocketPool(host, port, min_connections, max_connections)
复制代码
使用毗连池
  1. from socket_pool import socketPool
  2. def send_socket_msg(data):
  3.     global socketPool
  4.    
  5.     try:
  6.         sock = None
  7.         # 获取连接(支持超时控制)
  8.         sock = socketPool.get_connection()
  9.         # 发送数据
  10.         sock.sendall(data.encode('utf-8'))
  11.     except Exception:
  12.         error_msg = '发送消息出错:%s' % traceback.format_exc()
  13.         _logger.error(error_msg)
  14.         
  15.         if sock is not None:
  16.             sock.close()
  17.             socketPool.release_connection(sock)
  18.         return send_socket_msg(data)
  19.    
  20.     response = ''
  21.     try:
  22.         while True:
  23.             chunk = sock.recv(4096)
  24.             chunk = chunk.decode('utf-8')
  25.             response += chunk
  26.             if response.endswith('<END>'):
  27.                 response = response.rstrip('<END>')
  28.                 return {'success':True, 'message':response}
  29.     except Exception:
  30.         error_msg = '获取消息出错:%s' % traceback.format_exc()
  31.         _logger.error(error_msg)
  32.         return {'success':False, 'message': error_msg}
  33.     finally:
  34.         # 必须归还连接!
  35.         socketPool.release_connection(sock)
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表