rabbitmq问题,消费者执行时间太长,超过心跳时间,消费者消失,任务堆积 ...

一给  金牌会员 | 2024-12-15 10:52:49 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 988|帖子 988|积分 2964

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
rabbitmq问题,消费者执行时间太长,超过心跳时间,消费者消失,任务堆积

1.python多线程使用rabbitmq包地点

flask_rabbitmq
2.解决后的包

  1. import json
  2. import logging
  3. import signal
  4. import sys
  5. import threading
  6. import time
  7. import uuid
  8. from concurrent.futures import ThreadPoolExecutor
  9. import pika
  10. class FlaskRabbitMQ:
  11.     def __init__(self, app=None, queue=None, heartbeat=60, max_retries=3, max_workers=500):
  12.         self.app = app
  13.         self.queue = queue
  14.         self.config = None
  15.         self.heartbeat = heartbeat
  16.         self.max_retries = max_retries  # 设置最大重试次数
  17.         self.rabbitmq_server_host = None
  18.         self.rabbitmq_server_username = None
  19.         self.rabbitmq_server_password = None
  20.         self._channel = None
  21.         self._rpc_class_list = []
  22.         self.data = {}
  23.         self.logger = logging.getLogger('ApiLogger')
  24.         self.error_logger = logging.getLogger('ErrorLogger')
  25.         self._thread_local = threading.local()  # 为每个线程存储独立通道
  26.         self.executor = ThreadPoolExecutor(max_workers=max_workers)  # 创建线程池
  27.         if app:
  28.             self.init_app(app)
  29.     def init_app(self, app=None):
  30.         self.app = app
  31.         if self.queue:
  32.             self.queue.init_app(app)
  33.         self.config = self.app.config
  34.         self.valid_config()
  35.     def valid_config(self):
  36.         self.rabbitmq_server_host = self.config.get('RABBITMQ_HOST')
  37.         self.rabbitmq_server_username = self.config.get('RABBITMQ_USERNAME')
  38.         self.rabbitmq_server_password = self.config.get('RABBITMQ_PASSWORD')
  39.     def _create_new_connection(self, heart_beat=None):
  40.         credentials = pika.PlainCredentials(
  41.             self.rabbitmq_server_username,
  42.             self.rabbitmq_server_password
  43.         )
  44.         if heart_beat is None:
  45.             heart_beat = self.heartbeat
  46.         parameters = pika.ConnectionParameters(
  47.             self.rabbitmq_server_host,
  48.             credentials=credentials,
  49.             heartbeat=heart_beat
  50.         )
  51.         return pika.BlockingConnection(parameters)
  52.     def _get_connection(self, heart_beat=None):
  53.         # 为每个线程创建独立连接
  54.         if not hasattr(self._thread_local, 'connection') or self._thread_local.connection.is_closed:
  55.             self._thread_local.connection = self._create_new_connection(heart_beat)
  56.             print(f'创建新连接_thread_local.connection:{self._thread_local.connection}')
  57.         return self._thread_local.connection
  58.     def _get_channel(self, heart_beat=None):
  59.         # 每个线程使用独立连接的通道
  60.         connection = self._get_connection(heart_beat)
  61.         # 查看当前心跳设置
  62.         print(f"Heartbeat: {connection._impl.params.heartbeat}")
  63.         return connection.channel()
  64.     def temporary_queue_declare(self):
  65.         return self.queue_declare(exclusive=True, auto_delete=True)
  66.     def queue_declare(self, queue_name='', passive=False, durable=False, exclusive=False, auto_delete=False,
  67.                       arguments=None):
  68.         channel = self._get_channel()
  69.         try:
  70.             result = channel.queue_declare(
  71.                 queue=queue_name,
  72.                 passive=passive,
  73.                 durable=durable,
  74.                 exclusive=exclusive,
  75.                 auto_delete=auto_delete,
  76.                 arguments=arguments
  77.             )
  78.             return result.method.queue
  79.         except pika.exceptions.ChannelClosedByBroker as e:
  80.             if e.reply_code == 406 and "inequivalent arg 'durable'" in e.reply_text:
  81.                 self.error_logger.error(f"队列 '{queue_name}' 的持久化参数不匹配,正在删除并重新声明。")
  82.                 channel.queue_delete(queue=queue_name)
  83.                 result = channel.queue_declare(
  84.                     queue=queue_name,
  85.                     passive=passive,
  86.                     durable=durable,
  87.                     exclusive=exclusive,
  88.                     auto_delete=auto_delete,
  89.                     arguments=arguments
  90.                 )
  91.                 return result.method.queue
  92.             else:
  93.                 self.error_logger.error(f"声明队列 '{queue_name}' 时出错: {e}")
  94.                 raise
  95.         finally:
  96.             channel.close()
  97.     def queue_delete(self, queue_name):
  98.         channel = self._get_channel()
  99.         try:
  100.             self._channel.queue_delete(queue=queue_name)
  101.             self.logger.info(f"队列 '{queue_name}' 已成功删除。")
  102.         except Exception as e:
  103.             self.error_logger.error(f"删除队列 '{queue_name}' 失败: {e}")
  104.             raise
  105.         finally:
  106.             channel.close()
  107.     def exchange_bind_to_queue(self, type, exchange_name, routing_key, queue):
  108.         channel = self._get_channel()
  109.         try:
  110.             channel.exchange_declare(exchange=exchange_name, exchange_type=type)
  111.             channel.queue_bind(queue=queue, exchange=exchange_name, routing_key=routing_key)
  112.         except Exception as e:
  113.             self.error_logger.error(f"绑定队列 '{queue}' 到交换机 '{exchange_name}' 时出错: {e}")
  114.             raise
  115.         finally:
  116.             channel.close()
  117.     def exchange_declare(self, exchange_name, exchange_type):
  118.         channel = self._get_channel()
  119.         try:
  120.             channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
  121.         except Exception as e:
  122.             self.error_logger.error(f"交换机 '{exchange_name}' 声明失败: {e}")
  123.             raise
  124.         finally:
  125.             channel.close()
  126.     def queue_bind(self, exchange_name, routing_key, queue_name):
  127.         channel = self._get_channel()
  128.         try:
  129.             channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)
  130.         except Exception as e:
  131.             self.error_logger.error(f"队列 '{queue_name}' 绑定到交换机 '{exchange_name}' 时出错: {e}")
  132.             raise
  133.         finally:
  134.             channel.close()
  135.     def basic_consuming(self, queue_name, callback, arguments=None, auto_ack=False):
  136.         channel = self._get_channel()
  137.         try:
  138.             channel.basic_consume(queue=queue_name, on_message_callback=callback, arguments=arguments,
  139.                                   auto_ack=auto_ack)
  140.         except Exception as e:
  141.             self.error_logger.error(f"basic_consume 中的流失错误: {e}")
  142.         finally:
  143.             channel.close()
  144.     def send_expire(self, body, exchange, key, properties=None, max_retries=3):
  145.         channel = None  # 在外部初始化为 None
  146.         try:
  147.             # 创建新通道进行消息发布
  148.             channel = self._get_channel()
  149.             if properties:
  150.                 channel.basic_publish(
  151.                     exchange=exchange,
  152.                     routing_key=key,
  153.                     body=body,
  154.                     properties=properties
  155.                 )
  156.             else:
  157.                 channel.basic_publish(
  158.                     exchange=exchange,
  159.                     routing_key=key,
  160.                     body=body
  161.                 )
  162.         except Exception as e:
  163.             self.error_logger.error(f'推送消息异常:{e}')
  164.         finally:
  165.             if channel:  # 检查 channel 是否已定义
  166.                 channel.close()  # 关闭通道
  167.     def send(self, body, exchange, key, corr_id=None):
  168.         channel = self._get_channel()
  169.         try:
  170.             if not corr_id:
  171.                 channel.basic_publish(
  172.                     exchange=exchange,
  173.                     routing_key=key,
  174.                     body=body
  175.                 )
  176.             else:
  177.                 channel.basic_publish(
  178.                     exchange=exchange,
  179.                     routing_key=key,
  180.                     body=body,
  181.                     properties=pika.BasicProperties(
  182.                         correlation_id=corr_id
  183.                     )
  184.                 )
  185.         finally:
  186.             channel.close()  # 关闭通道
  187.     def send_json(self, body, exchange, key, corr_id=None):
  188.         data = json.dumps(body)
  189.         self.send(data, exchange=exchange, key=key, corr_id=corr_id)
  190.     def send_sync(self, body, key=None, timeout=5):
  191.         if not key:
  192.             raise Exception("The routing key is not present.")
  193.         corr_id = str(uuid.uuid4())
  194.         callback_queue = self.temporary_queue_declare()
  195.         self.data[corr_id] = {
  196.             'isAccept': False,
  197.             'result': None,
  198.             'reply_queue_name': callback_queue
  199.         }
  200.         channel = self._get_channel()
  201.         try:
  202.             # 设置消费回调
  203.             channel.basic_consume(queue=callback_queue, on_message_callback=self.on_response, auto_ack=True)
  204.             # 发送消息
  205.             channel.basic_publish(
  206.                 exchange='',
  207.                 routing_key=key,
  208.                 body=body,
  209.                 properties=pika.BasicProperties(
  210.                     reply_to=callback_queue,
  211.                     correlation_id=corr_id,
  212.                 )
  213.             )
  214.             # 等待响应
  215.             end = time.time() + timeout
  216.             while time.time() < end:
  217.                 if self.data[corr_id]['isAccept']:
  218.                     self.logger.info("已接收到 RPC 服务器的响应 => {}".format(self.data[corr_id]['result']))
  219.                     return self.data[corr_id]['result']
  220.                 else:
  221.                     time.sleep(0.3)
  222.                     continue
  223.             self.error_logger.error("获取响应超时。")
  224.             return None
  225.         finally:
  226.             channel.close()  # 关闭通道
  227.     def send_json_sync(self, body, key=None):
  228.         if not key:
  229.             raise Exception("The routing key is not present.")
  230.         data = json.dumps(body)
  231.         return self.send_sync(data, key=key)
  232.     def accept(self, key, result):
  233.         self.data[key]['isAccept'] = True
  234.         self.data[key]['result'] = str(result)
  235.         channel = self._get_channel()
  236.         try:
  237.             # 删除回复队列
  238.             channel.queue_delete(queue=self.data[key]['reply_queue_name'])
  239.         finally:
  240.             channel.close()  # 关闭通道
  241.     def on_response(self, ch, method, props, body):
  242.         self.logger.info("接收到响应 => {}".format(body))
  243.         corr_id = props.correlation_id
  244.         self.accept(corr_id, body)
  245.     def register_class(self, rpc_class):
  246.         if not hasattr(rpc_class, 'declare'):
  247.             raise AttributeError("The registered class must contains the declare method")
  248.         self._rpc_class_list.append(rpc_class)
  249.     def _run(self):
  250.         # 注册所有声明的类
  251.         for item in self._rpc_class_list:
  252.             item().declare()
  253.         # 遍历所有在 Queue 中注册的回调函数
  254.         for (type, queue_name, exchange_name, routing_key, version, callback, auto_ack,
  255.              thread_num, heart_beat) in self.queue._rpc_class_list:
  256.             if type == ExchangeType.DEFAULT:
  257.                 if not queue_name:
  258.                     # 如果队列名称为空,则声明一个临时队列
  259.                     queue_name = self.temporary_queue_declare()
  260.                 elif version == 1:
  261.                     self.basic_consuming(queue_name, callback, auto_ack=auto_ack)
  262.                 else:
  263.                     self._channel.queue_declare(queue=queue_name, auto_delete=True)
  264.                     self.basic_consuming(queue_name, callback)
  265.             elif type in [ExchangeType.FANOUT, ExchangeType.DIRECT, ExchangeType.TOPIC]:
  266.                 if not queue_name:
  267.                     # 如果队列名称为空,则声明一个临时队列
  268.                     queue_name = self.temporary_queue_declare()
  269.                 elif version == 1:
  270.                     arguments = {
  271.                         'x-match': type,  # 设置 exchange_type
  272.                         'routing_key': routing_key,  # 设置 routing_key
  273.                     }
  274.                     self.basic_consuming(queue_name, callback, arguments=arguments, auto_ack=auto_ack)
  275.                 else:
  276.                     self._channel.queue_declare(queue=queue_name)
  277.                     self.exchange_bind_to_queue(type, exchange_name, routing_key, queue_name)
  278.                     # 消费队列
  279.                     self.basic_consuming(queue_name, callback)
  280.             # 启动指定数量的线程来处理消息
  281.             for _ in range(thread_num):
  282.                 self.executor.submit(self._start_thread_consumer, queue_name, callback, auto_ack, heart_beat)
  283.         self.logger.info(" * Flask RabbitMQ 应用正在消费中")
  284.     def _start_thread_consumer(self, queue_name, callback, auto_ack, heart_beat=None) -> None:
  285.         channel = self._get_channel(heart_beat)
  286.         channel.basic_qos(prefetch_count=1)
  287.         try:
  288.             channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
  289.             channel.start_consuming()
  290.         except Exception as e:
  291.             self.error_logger.error(f"Error in consumer thread: {e}")
  292.         finally:
  293.             channel.close()  # 消费完成后关闭通道,但不关闭连接
  294.     def shutdown_executor(self):
  295.         # 关闭线程池
  296.         if self.executor:
  297.             self.logger.info("关闭线程池...")
  298.             self.executor.shutdown(wait=True)  # 等待所有线程完成再关闭
  299.     def close_rabbitmq_connection(self):
  300.         if hasattr(self._thread_local, 'connection') and self._thread_local.connection.is_open:
  301.             self._thread_local.connection.close()
  302.     def signal_handler(self, sig, frame):
  303.         self.logger.info('RabbitMQ开始停止...')
  304.         # 关闭线程池
  305.         self.shutdown_executor()
  306.         # 关闭 RabbitMQ 连接
  307.         self.close_rabbitmq_connection()
  308.         sys.exit(0)
  309.     def run(self):
  310.         # 捕获终止信号以进行优雅关闭
  311.         signal.signal(signal.SIGINT, self.signal_handler)
  312.         signal.signal(signal.SIGTERM, self.signal_handler)
  313.         self._run()
  314. class ExchangeType:
  315.     DEFAULT = 'default'
  316.     DIRECT = "direct"
  317.     FANOUT = "fanout"
  318.     TOPIC = 'topic'
  319. class Queue:
  320.     """
  321.     支持多线程的Queue类
  322.     """
  323.     def __init__(self) -> None:
  324.         self._rpc_class_list = []
  325.         self.app = None
  326.     def __call__(self, queue=None, type=ExchangeType.DEFAULT, version=0, exchange='', routing_key='', auto_ack=False,
  327.                  thread_num=1, heart_beat=60):
  328.         def _(func):
  329.             self._rpc_class_list.append((type, queue, exchange, routing_key, version, func, auto_ack, thread_num, heart_beat))
  330.         return _
  331.     def init_app(self, app=None):
  332.         self.app = app
复制代码
3.怎样使用

  1. @queue(queue='test', type=ExchangeType.DIRECT, exchange='record_exchange',
  2.        routing_key='test', version=1, thread_num=15, heart_beat=300)
  3. def prop_code_signal_callback(ch, method, props, body):
  4.     try:
  5.         data = json.loads(body)
  6.         loger.info(f'prop_code_signal -> data:{data}')
  7.         # 打印当前线程名称
  8.         current_thread = threading.current_thread().name
  9.         loger.info(f'prop_code_signal ->当前线程名称: {current_thread}')
  10.         # 业务逻辑
  11.         # 处理成功,确认消息
  12.         ch.basic_ack(delivery_tag=method.delivery_tag)
  13.         
  14.     except Exception as e:
  15.        loger.error(f"test出现异常: {e}")
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

一给

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表