马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
毗连池实现
socket_pool.py- # -*- coding:utf-8 -*-
- import socket
- import time
- import threading
- import os
- import logging
- import traceback
- from queue import Queue, Empty
- _logger = logging.getLogger('mylogger')
- class SocketPool:
- def __init__(self, host, port, min_connections=10, max_connections=10):
- '''
- 初始化Socket连接池
- :param host: 目标主机地址
- :param port: 目标端口号
- :param min_connections: 最小连接数
- :param max_connections: 最大连接数
- '''
- self.host = host
- self.port = port
- self.min_connections = min_connections
- self.max_connections = max_connections
- self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
- self._sock_lock = threading.Lock() # 线程锁保证计数正确
- self._pool = Queue(max_connections) # 基于线程安全的队列存储连接
- self._lock = threading.Lock() # 线程锁保证资源安全:
- self._init_pool() # 预创建连接
- self._start_health_check() # 启动连接健康检查线程
- def _init_pool(self):
- '''预创建连接并填充到池中'''
-
- for _ in range(self.min_connections):
- sock = self._create_socket()
- self._pool.put(sock)
- def _create_socket(self):
- '''创建新的Socket连接'''
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- sock.connect((self.host, self.port))
- return sock
- except socket.error as e:
- raise ConnectionError(f'Failed to connect: {e}') # 连接失败抛出异常
- def _start_health_check(self):
- '''启动后台线程定期检查连接有效性'''
-
- def check():
- while True:
- with self._lock:
- for _ in range(self._pool.qsize()):
- sock = self._pool.get()
- self.busy_sockets_dict[sock] = 1
- try:
- sock.send(b'PING<END>') # 发送心跳包验证连接状态
- # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
- sock.recv(11)
- self._pool.put(sock)
- self.busy_sockets_dict.pop(sock)
- except (socket.error, ConnectionResetError):
- _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
- sock.close() # 关闭失效连接并创建新连接替换
- self.busy_sockets_dict.pop(sock)
- new_sock = self._create_socket()
- self._pool.put(new_sock)
-
- # 如果sock数量小于最小数量,则补充
- for _ in range(0, self.min_connections - self._pool.qsize()):
- new_sock = self._create_socket()
- self._pool.put(new_sock)
- time.sleep(60) # 每60秒检查一次
- threading.Thread(target=check, daemon=True).start()
- def get_connection(self):
- '''
- 从池中获取一个可用连接
- :return: socket对象
- '''
-
- with self._sock_lock:
- if self._pool.empty():
- if len(self.busy_sockets_dict.keys()) < self.max_connections:
- new_sock = self._create_socket()
- self.busy_sockets_dict[new_sock] = 1
- return new_sock
- else:
- raise Empty('No available connections in pool')
- else:
- try:
- sock = self._pool.get(block=False)
- self.busy_sockets_dict[sock] = 1
- return sock
- except Exception:
- _logger.error('获取socket连接出错:%s' % traceback.format_exc())
- raise
-
- def release_connection(self, sock):
- '''
- 将连接归还到池中
- :param sock: 待归还的socket对象
- '''
- if not sock._closed:
- self._pool.put(sock)
- if sock in self.busy_sockets_dict:
- self.busy_sockets_dict.pop(sock)
- def close_all(self):
- '''关闭池中所有连接'''
-
- while not self._pool.empty():
- sock = self._pool.get()
- sock.close()
- self.busy_sockets_dict.pop(sock.id)
- self.busy_sockets_dict = {} # 兜底
- host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
- port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
- min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
- max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
- socketPool = SocketPool(host, port, min_connections, max_connections)
复制代码 使用毗连池
- from socket_pool import socketPool
- def send_socket_msg(data):
- global socketPool
-
- try:
- sock = None
- # 获取连接(支持超时控制)
- sock = socketPool.get_connection()
- # 发送数据
- sock.sendall(data.encode('utf-8'))
- except Exception:
- error_msg = '发送消息出错:%s' % traceback.format_exc()
- _logger.error(error_msg)
-
- if sock is not None:
- sock.close()
- socketPool.release_connection(sock)
- return send_socket_msg(data)
-
- response = ''
- try:
- while True:
- chunk = sock.recv(4096)
- chunk = chunk.decode('utf-8')
- response += chunk
- if response.endswith('<END>'):
- response = response.rstrip('<END>')
- return {'success':True, 'message':response}
- except Exception:
- error_msg = '获取消息出错:%s' % traceback.format_exc()
- _logger.error(error_msg)
- return {'success':False, 'message': error_msg}
- finally:
- # 必须归还连接!
- socketPool.release_connection(sock)
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |