rabbitmq问题,消费者执行时间太长,超过心跳时间,消费者消失,任务堆积
rabbitmq问题,消费者执行时间太长,超过心跳时间,消费者消失,任务堆积1.python多线程使用rabbitmq包地点
flask_rabbitmq
2.解决后的包
import json
import logging
import signal
import sys
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
import pika
class FlaskRabbitMQ:
def __init__(self, app=None, queue=None, heartbeat=60, max_retries=3, max_workers=500):
self.app = app
self.queue = queue
self.config = None
self.heartbeat = heartbeat
self.max_retries = max_retries# 设置最大重试次数
self.rabbitmq_server_host = None
self.rabbitmq_server_username = None
self.rabbitmq_server_password = None
self._channel = None
self._rpc_class_list = []
self.data = {}
self.logger = logging.getLogger('ApiLogger')
self.error_logger = logging.getLogger('ErrorLogger')
self._thread_local = threading.local()# 为每个线程存储独立通道
self.executor = ThreadPoolExecutor(max_workers=max_workers)# 创建线程池
if app:
self.init_app(app)
def init_app(self, app=None):
self.app = app
if self.queue:
self.queue.init_app(app)
self.config = self.app.config
self.valid_config()
def valid_config(self):
self.rabbitmq_server_host = self.config.get('RABBITMQ_HOST')
self.rabbitmq_server_username = self.config.get('RABBITMQ_USERNAME')
self.rabbitmq_server_password = self.config.get('RABBITMQ_PASSWORD')
def _create_new_connection(self, heart_beat=None):
credentials = pika.PlainCredentials(
self.rabbitmq_server_username,
self.rabbitmq_server_password
)
if heart_beat is None:
heart_beat = self.heartbeat
parameters = pika.ConnectionParameters(
self.rabbitmq_server_host,
credentials=credentials,
heartbeat=heart_beat
)
return pika.BlockingConnection(parameters)
def _get_connection(self, heart_beat=None):
# 为每个线程创建独立连接
if not hasattr(self._thread_local, 'connection') or self._thread_local.connection.is_closed:
self._thread_local.connection = self._create_new_connection(heart_beat)
print(f'创建新连接_thread_local.connection:{self._thread_local.connection}')
return self._thread_local.connection
def _get_channel(self, heart_beat=None):
# 每个线程使用独立连接的通道
connection = self._get_connection(heart_beat)
# 查看当前心跳设置
print(f"Heartbeat: {connection._impl.params.heartbeat}")
return connection.channel()
def temporary_queue_declare(self):
return self.queue_declare(exclusive=True, auto_delete=True)
def queue_declare(self, queue_name='', passive=False, durable=False, exclusive=False, auto_delete=False,
arguments=None):
channel = self._get_channel()
try:
result = channel.queue_declare(
queue=queue_name,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
arguments=arguments
)
return result.method.queue
except pika.exceptions.ChannelClosedByBroker as e:
if e.reply_code == 406 and "inequivalent arg 'durable'" in e.reply_text:
self.error_logger.error(f"队列 '{queue_name}' 的持久化参数不匹配,正在删除并重新声明。")
channel.queue_delete(queue=queue_name)
result = channel.queue_declare(
queue=queue_name,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
arguments=arguments
)
return result.method.queue
else:
self.error_logger.error(f"声明队列 '{queue_name}' 时出错: {e}")
raise
finally:
channel.close()
def queue_delete(self, queue_name):
channel = self._get_channel()
try:
self._channel.queue_delete(queue=queue_name)
self.logger.info(f"队列 '{queue_name}' 已成功删除。")
except Exception as e:
self.error_logger.error(f"删除队列 '{queue_name}' 失败: {e}")
raise
finally:
channel.close()
def exchange_bind_to_queue(self, type, exchange_name, routing_key, queue):
channel = self._get_channel()
try:
channel.exchange_declare(exchange=exchange_name, exchange_type=type)
channel.queue_bind(queue=queue, exchange=exchange_name, routing_key=routing_key)
except Exception as e:
self.error_logger.error(f"绑定队列 '{queue}' 到交换机 '{exchange_name}' 时出错: {e}")
raise
finally:
channel.close()
def exchange_declare(self, exchange_name, exchange_type):
channel = self._get_channel()
try:
channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
except Exception as e:
self.error_logger.error(f"交换机 '{exchange_name}' 声明失败: {e}")
raise
finally:
channel.close()
def queue_bind(self, exchange_name, routing_key, queue_name):
channel = self._get_channel()
try:
channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)
except Exception as e:
self.error_logger.error(f"队列 '{queue_name}' 绑定到交换机 '{exchange_name}' 时出错: {e}")
raise
finally:
channel.close()
def basic_consuming(self, queue_name, callback, arguments=None, auto_ack=False):
channel = self._get_channel()
try:
channel.basic_consume(queue=queue_name, on_message_callback=callback, arguments=arguments,
auto_ack=auto_ack)
except Exception as e:
self.error_logger.error(f"basic_consume 中的流失错误: {e}")
finally:
channel.close()
def send_expire(self, body, exchange, key, properties=None, max_retries=3):
channel = None# 在外部初始化为 None
try:
# 创建新通道进行消息发布
channel = self._get_channel()
if properties:
channel.basic_publish(
exchange=exchange,
routing_key=key,
body=body,
properties=properties
)
else:
channel.basic_publish(
exchange=exchange,
routing_key=key,
body=body
)
except Exception as e:
self.error_logger.error(f'推送消息异常:{e}')
finally:
if channel:# 检查 channel 是否已定义
channel.close()# 关闭通道
def send(self, body, exchange, key, corr_id=None):
channel = self._get_channel()
try:
if not corr_id:
channel.basic_publish(
exchange=exchange,
routing_key=key,
body=body
)
else:
channel.basic_publish(
exchange=exchange,
routing_key=key,
body=body,
properties=pika.BasicProperties(
correlation_id=corr_id
)
)
finally:
channel.close()# 关闭通道
def send_json(self, body, exchange, key, corr_id=None):
data = json.dumps(body)
self.send(data, exchange=exchange, key=key, corr_id=corr_id)
def send_sync(self, body, key=None, timeout=5):
if not key:
raise Exception("The routing key is not present.")
corr_id = str(uuid.uuid4())
callback_queue = self.temporary_queue_declare()
self.data = {
'isAccept': False,
'result': None,
'reply_queue_name': callback_queue
}
channel = self._get_channel()
try:
# 设置消费回调
channel.basic_consume(queue=callback_queue, on_message_callback=self.on_response, auto_ack=True)
# 发送消息
channel.basic_publish(
exchange='',
routing_key=key,
body=body,
properties=pika.BasicProperties(
reply_to=callback_queue,
correlation_id=corr_id,
)
)
# 等待响应
end = time.time() + timeout
while time.time() < end:
if self.data['isAccept']:
self.logger.info("已接收到 RPC 服务器的响应 => {}".format(self.data['result']))
return self.data['result']
else:
time.sleep(0.3)
continue
self.error_logger.error("获取响应超时。")
return None
finally:
channel.close()# 关闭通道
def send_json_sync(self, body, key=None):
if not key:
raise Exception("The routing key is not present.")
data = json.dumps(body)
return self.send_sync(data, key=key)
def accept(self, key, result):
self.data['isAccept'] = True
self.data['result'] = str(result)
channel = self._get_channel()
try:
# 删除回复队列
channel.queue_delete(queue=self.data['reply_queue_name'])
finally:
channel.close()# 关闭通道
def on_response(self, ch, method, props, body):
self.logger.info("接收到响应 => {}".format(body))
corr_id = props.correlation_id
self.accept(corr_id, body)
def register_class(self, rpc_class):
if not hasattr(rpc_class, 'declare'):
raise AttributeError("The registered class must contains the declare method")
self._rpc_class_list.append(rpc_class)
def _run(self):
# 注册所有声明的类
for item in self._rpc_class_list:
item().declare()
# 遍历所有在 Queue 中注册的回调函数
for (type, queue_name, exchange_name, routing_key, version, callback, auto_ack,
thread_num, heart_beat) in self.queue._rpc_class_list:
if type == ExchangeType.DEFAULT:
if not queue_name:
# 如果队列名称为空,则声明一个临时队列
queue_name = self.temporary_queue_declare()
elif version == 1:
self.basic_consuming(queue_name, callback, auto_ack=auto_ack)
else:
self._channel.queue_declare(queue=queue_name, auto_delete=True)
self.basic_consuming(queue_name, callback)
elif type in :
if not queue_name:
# 如果队列名称为空,则声明一个临时队列
queue_name = self.temporary_queue_declare()
elif version == 1:
arguments = {
'x-match': type,# 设置 exchange_type
'routing_key': routing_key,# 设置 routing_key
}
self.basic_consuming(queue_name, callback, arguments=arguments, auto_ack=auto_ack)
else:
self._channel.queue_declare(queue=queue_name)
self.exchange_bind_to_queue(type, exchange_name, routing_key, queue_name)
# 消费队列
self.basic_consuming(queue_name, callback)
# 启动指定数量的线程来处理消息
for _ in range(thread_num):
self.executor.submit(self._start_thread_consumer, queue_name, callback, auto_ack, heart_beat)
self.logger.info(" * Flask RabbitMQ 应用正在消费中")
def _start_thread_consumer(self, queue_name, callback, auto_ack, heart_beat=None) -> None:
channel = self._get_channel(heart_beat)
channel.basic_qos(prefetch_count=1)
try:
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
channel.start_consuming()
except Exception as e:
self.error_logger.error(f"Error in consumer thread: {e}")
finally:
channel.close()# 消费完成后关闭通道,但不关闭连接
def shutdown_executor(self):
# 关闭线程池
if self.executor:
self.logger.info("关闭线程池...")
self.executor.shutdown(wait=True)# 等待所有线程完成再关闭
def close_rabbitmq_connection(self):
if hasattr(self._thread_local, 'connection') and self._thread_local.connection.is_open:
self._thread_local.connection.close()
def signal_handler(self, sig, frame):
self.logger.info('RabbitMQ开始停止...')
# 关闭线程池
self.shutdown_executor()
# 关闭 RabbitMQ 连接
self.close_rabbitmq_connection()
sys.exit(0)
def run(self):
# 捕获终止信号以进行优雅关闭
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
self._run()
class ExchangeType:
DEFAULT = 'default'
DIRECT = "direct"
FANOUT = "fanout"
TOPIC = 'topic'
class Queue:
"""
支持多线程的Queue类
"""
def __init__(self) -> None:
self._rpc_class_list = []
self.app = None
def __call__(self, queue=None, type=ExchangeType.DEFAULT, version=0, exchange='', routing_key='', auto_ack=False,
thread_num=1, heart_beat=60):
def _(func):
self._rpc_class_list.append((type, queue, exchange, routing_key, version, func, auto_ack, thread_num, heart_beat))
return _
def init_app(self, app=None):
self.app = app
3.怎样使用
@queue(queue='test', type=ExchangeType.DIRECT, exchange='record_exchange',
routing_key='test', version=1, thread_num=15, heart_beat=300)
def prop_code_signal_callback(ch, method, props, body):
try:
data = json.loads(body)
loger.info(f'prop_code_signal -> data:{data}')
# 打印当前线程名称
current_thread = threading.current_thread().name
loger.info(f'prop_code_signal ->当前线程名称: {current_thread}')
# 业务逻辑
# 处理成功,确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
loger.error(f"test出现异常: {e}")
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]