ToB企服应用市场:ToB评测及商务社交产业平台

标题: Python 客户端类库之paho-mqtt学习总结 [打印本页]

作者: 何小豆儿在此    时间: 2024-9-22 20:59
标题: Python 客户端类库之paho-mqtt学习总结
实践环境

Python 3.9.13
paho-mqtt 2.1.0
简介

Eclipse Paho MQTT Python客户端类库实现了MQTT 协议版本 5.0, 3.1.1, 和3.1。
该类库提供一个客户端类,允许应用连接到MQTT代理并发布消息,订阅主题并检索发布的消息。同时还提供了一个写其它辅助函数,使向MQTT服务器发布一次性消息变得非常简单。
支持 Python 3.7+。
MQTT协议是一种机器对机器(M2M)/“物联网”连接协议。它被计划为一种极其轻量级的发布/订阅消息传输,对于需要小代码占用和/或网络带宽非常昂贵的远程连接非常有用。
安装
  1. pip install paho-mqtt
复制代码
已知限定

以下是已知的未实现的MQTT功能。
当clean_session为False时,会话仅存储在内存中,不会持久化。这意味着当客户端重新启动时(不仅仅是重新连接,通常是因为程序重新启动而重新创建对象),会话就会丢失。这可能会导致消息丢失。
客户端会话的以下部分丢失:
此外,当clean_session为True时,此类库将在网络重新连接时重新发布 QoS > 0消息。这意味着 QoS > 0消息不会丢失。但尺度规定,我们应该抛弃发送发布包的任何消息。设置为True意味着不符合尺度,QoS 2 可能会被吸收两次。
假如只需要一次交付的 QoS 2 保证,则应设置clean_session=False。
用法与API

API详细在线文档:https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html
示例:https://github.com/eclipse/paho.mqtt.python/tree/master/examples
开始

下面是一个非常简单的示例,它订阅代理$SYS主题树并打印出效果消息:
  1. # -*- coding:utf-8 -*-
  2. import paho.mqtt.client as mqtt
  3. def on_connect(client, userdata, flags, reason_code, properties):
  4.     '''客户端从服务器接收到 CONNACK 响应时的回调'''
  5.     print(f"Connected with result code {reason_code}")  # 成功连接时 reason_code 值为 Success
  6.     # 在on_connect()中执行订阅操作,意味着如果应用失去连接并且重新连接后,订阅将被续订。
  7.     if reason_code == 'Success':
  8.         client.subscribe('$SYS/#')
  9. def on_disconnect(client, userdata, flags, reason_code, properties):
  10.     print(f'Disconnected with result code {reason_code}')
  11. def on_message(client, userdata, msg):
  12.     '''从服务器收到 PUBLISH 消息时的回调。'''
  13.     print(msg.topic + ' ' + str(msg.payload)) # 输出值形如 $SYS/broker/version b'mosquitto version 2.0.18'
  14. mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  15. mqttc.on_connect = on_connect
  16. mqttc.on_disconnect = on_disconnect
  17. mqttc.on_message = on_message
  18. # client.username_pw_set('testacc', 'test1234') # 设置访问账号和密码
  19. mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
  20. # 阻塞调用,处理网络流量、分派回调和处理重新连接
  21. # 有其它提供线程接口和手动接口的loop*()函数可用
  22. mqttc.loop_forever()
复制代码
说明:
客户端(Client)

Client类一般使用流程如下:
将调用回调以允许应用程序根据需要处置惩罚事件。这些回调如下所述。
网络循环

这些功能是Client背后的驱动力。假如它们没有被调用,传入的网络数据将不会被处置惩罚,传出的网络数据也不会被发送。管理网络环路有四种选择。这里描述了三个,第四个在下面的“外部事件循环支持”中描述。不要混淆使用不同的loop函数。
loop_start() / loop_stop()
  1. mqttc.loop_start()
  2. while True:
  3.     temperature = sensor.blocking_read()
  4.     mqttc.publish("paho/temperature", temperature)
  5. mqttc.loop_stop()
复制代码
这些函数实现了网络循环的线程接口。在connect*()之前或之后调用loop_start()一次,会在后台运行一个线程来自动调用loop()。这释放了主线程,用于可能壅闭的其他工作。此调用还处置惩罚与代理的重新连接。调用loop_stop() 以停止后台线程。假如调用disconnect(),循环也会停止。
loop_forever()
  1. mqttc.loop_forever(retry_first_connection=False)
复制代码
这是网络循环的壅闭形式,在客户端调用disconnect()之前不会返回(即调用mqttc.disconnect()后会停止壅闭,继续运行其后的代码)。它会自动处置惩罚重新连接。
除了使用connect_async时的第一次连接尝试外,使用retry_first_connection=True 使其重试第一次连接。
告诫:这可能会导致客户端保持连接到不存在的主机而不会出现失败。
loop()
  1. run = True
  2. while run:
  3.     rc = mqttc.loop(timeout=1.0)
  4.     if rc != 0:
  5.         # need to handle error, possible reconnecting or stopping the application
复制代码
定期调用以处置惩罚网络事件。此调用触发select()等候,直到网络套接字可用于读取或写入,假如套接字可用,则处置惩罚流入/流出的数据。此函数最多壅闭timeout秒。timeout不能超过客户端的keepalive值,否则代剖析定期断开客户端的连接。
使用这种循环,需要本身处置惩罚重新连接计谋。
回调

与paho-mqtt交互的接口包括各种回调,当发生某些事件时,类库会调用这些回调。
回调是在代码中定义的函数,用于实现对这些事件要求的操作。这可能只是打印收到的消息,也可能是更复杂的行为。
回调API是有版本的,所选版本是我们提供给客户端构造函数的CallbackAPIVersion。目前支持两个版本:
存在以下回调:
参阅在线文档查看有关每个回调的特征。
订阅示例
  1. # -*- coding:utf-8 -*-
  2. import paho.mqtt.client as mqtt
  3. def on_subscribe(client, userdata, mid, reason_code_list, properties):
  4.     # 由于我们只订阅了一个信道,reason_code_list只包含一个条目
  5.     # print(reason_code_list) #输出: [ReasonCode(Suback, 'Granted QoS 0')]
  6.     if reason_code_list[0].is_failure:
  7.         print(f"Broker rejected you subscription: {reason_code_list[0]}")
  8.     else:
  9.         print(f"Broker granted the following QoS: {reason_code_list[0].value}")
  10. def on_unsubscribe(client, userdata, mid, reason_code_list, properties):
  11.     #注意,reason_code_list仅存在于MQTTv5中,在MQTTv3中,它将始终为空
  12.     if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
  13.         print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")
  14.     else:
  15.         print(f"Broker replied with failure: {reason_code_list[0]}")
  16.     client.disconnect()
  17. def on_message(client, userdata, message):
  18.     # userdata是我们选择提供的数据结构,这里为一个列表(通过下方的 mqttc.user_data_set([])设置,该函数参数即为userdata参数值
  19.     userdata.append(message.payload)
  20.     # 假设只想处理10条消息
  21.     if len(userdata) >= 10:
  22.         client.unsubscribe("$SYS/#")
  23. def on_connect(client, userdata, flags, reason_code, properties):
  24.     if reason_code.is_failure:
  25.         print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  26.     else:
  27.         # 应该始终在 on_connect 回调中订阅以确保在重新连接时订阅依旧存在。
  28.         client.subscribe("$SYS/#")
  29. mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  30. mqttc.on_connect = on_connect
  31. mqttc.on_message = on_message
  32. mqttc.on_subscribe = on_subscribe
  33. mqttc.on_unsubscribe = on_unsubscribe
  34. mqttc.user_data_set([]) # 设置 userdata
  35. mqttc.connect("mqtt.eclipseprojects.io")
  36. mqttc.loop_forever() # 当调用client.disconnect()后继续执行以下代码
  37. print(f"Received the following message: {mqttc.user_data_get()}")
复制代码
发布示例
  1. # -*- coding:utf-8 -*-
  2. import time
  3. import paho.mqtt.client as mqtt
  4. def on_publish(client, userdata, mid, reason_code, properties):
  5.     '''reason_code和properties将仅出现在MQTTv5中。在MQTTv3中始终未设置
  6.     使用不存在`uncaked_publish`中的`mid`调用`on_publish()`。这是由于不可避免的竞争情形:
  7.     * publish() 返回已发送消息的mid。
  8.     * 主线程将publish()返回的mid添加到uncaked_publish中
  9.     * loop_start线程调用on_publish()
  10.     虽然不太可能(因为on_publish()将在网络往返后调用),但是这是一种可能发生的竞争情形
  11.     避免竞争情形的最佳解决方案是使用publish()中的msg_info。还可以尝试使用已确认的mid列表,而不是从待处理列表中删除
  12.     但是请记住,mid可以重复使用!
  13.     reason_code和properties将仅出现在MQTTv5中。在MQTTv3中始终未设置
  14.     '''
  15.     try:
  16.         userdata.remove(mid)
  17.     except KeyError:
  18.         print("on_publish() is called with a mid not present in unacked_publish")
  19.         print("This is due to an unavoidable race-condition:")
  20.         print("* publish() return the mid of the message sent.")
  21.         print("* mid from publish() is added to unacked_publish by the main thread")
  22.         print("* on_publish() is called by the loop_start thread")
  23.         print("While unlikely (because on_publish() will be called after a network round-trip),")
  24.         print(" this is a race-condition that COULD happen")
  25.         print("")
  26.         print("The best solution to avoid race-condition is using the msg_info from publish()")
  27.         print("We could also try using a list of acknowledged mid rather than removing from pending list,")
  28.         print("but remember that mid could be re-used !")
  29. unacked_publish = set()
  30. mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  31. mqttc.on_publish = on_publish
  32. mqttc.user_data_set(unacked_publish)
  33. mqttc.connect("mqtt.eclipseprojects.io")
  34. mqttc.loop_start()
  35. # 应用生产一些消息
  36. msg_info = mqttc.publish("paho/test/topic", "my message", qos=1)
  37. unacked_publish.add(msg_info.mid)
  38. msg_info2 = mqttc.publish("paho/test/topic", "my message2", qos=1)
  39. unacked_publish.add(msg_info2.mid)
  40. # 等待所有消息被发布
  41. while len(unacked_publish):
  42.     time.sleep(0.1)
  43. # 由于上述描述的竞争状态, 以下等待所有消息发布完成的方式更安全
  44. msg_info.wait_for_publish()
  45. msg_info2.wait_for_publish()
  46. mqttc.disconnect()
  47. mqttc.loop_stop()
复制代码
说明:
Logger

客户端会发出一些日志消息,这些消息在故障清除过程中可能很有用。启用日志最简单的方法是调用enable_logger()。可以提供自定义记录器或使用默认记录器
示例:
  1. import logging
  2. import paho.mqtt.client as mqtt
  3. logging.basicConfig(level=logging.DEBUG)
  4. mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  5. mqttc.enable_logger()
  6. mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
  7. mqttc.loop_start()
  8. # Do additional action needed, publish, subscribe, ...
  9. [...]
复制代码
还可以定义一个on_log回调,它将吸收所有日志消息的副本。例子:
  1. import paho.mqtt.client as mqtt
  2. def on_log(client, userdata, paho_log_level, messages):
  3.     if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR:
  4.         print(message)
  5. mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  6. mqttc.on_log = on_log
  7. mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
  8. mqttc.loop_start()
  9. # Do additional action needed, publish, subscribe, ...
  10. [...]
复制代码
Paho日志级别和尺度日志级别的对应关系如下:
PahologgingMQTT_LOG_ERRlogging.ERRORMQTT_LOG_WARNINGlogging.WARNINGMQTT_LOG_NOTICElogging.INFO (no direct equivalent)MQTT_LOG_INFOlogging.INFOMQTT_LOG_DEBUGlogging.DEBUG外部事件循环支持

为了支持其他网络循环,如asyncio(拜见示例),类库公开了一些方法和回调来支持这些用例。
存在以下循环方法:
用伪代码表现如下:
  1. while run:
  2.     if need_read:
  3.         mqttc.loop_read()
  4.     if need_write:
  5.         mqttc.loop_write()
  6.     mqttc.loop_misc()
  7.     if not need_read and not need_write:
  8.         # But don't wait more than few seconds, loop_misc() need to be called regularly
  9.         wait_for_change_in_need_read_or_write()
  10.     updated_need_read_and_write()
复制代码
棘手的部分是实现updated_need_read_and_write并等候条件变更。为了支持这一点,存在以下方法:
回调总是按以下次序调用:
全局辅助函数

客户端模块还提供了一些全局辅助函数。
topic_matches_sub(sub, topic)可用于检查主题(topic)是否与订阅(subscription)匹配。
比方:
主题foo/bar 将与订阅foo/#或+/bar匹配
主题non/matching 将不匹配订阅non/+/+
发布

此模块提供了两个辅助函数single()和multiple(),允许以一次性方式直接发布消息。换句话说,它们对于有一个/多个消息要发布到代理,然后断开连接而不需要其他任何东西的情况非常有用。
提供的两个函数是single()和multiple()。
这两个函数都支持MQTT v5.0,但目前不允许在连接或发送消息时设置任何属性。
Single

发布一条消息到代理,然后彻底断开连接。
例子:
  1. import paho.mqtt.publish as publish
  2. publish.single("paho/test/topic", "payload", hostname="mqtt.eclipseprojects.io")
复制代码
Multiple

发布多条消息到代理,然后彻底断开连接。
例子:
  1. from paho.mqtt.enums import MQTTProtocolVersion
  2. import paho.mqtt.publish as publish
  3. msgs = [{'topic':"paho/test/topic", 'payload':"multiple 1"},
  4.     ("paho/test/topic", "multiple 2", 0, False)]
  5. publish.multiple(msgs, hostname="mqtt.eclipseprojects.io", protocol=MQTTProtocolVersion.MQTTv5)
复制代码
订阅

此模块提供了两个辅助函数simple()和callback(),以允许直接订阅和处置惩罚消息。
这两个函数都支持MQTT v5.0,但目前不允许在连接或发送消息时设置任何属性。
Simple

订阅一组主题并返回收到的消息。这是一个壅闭函数。
例子:
  1. import paho.mqtt.subscribe as subscribe
  2. msg = subscribe.simple("paho/test/topic", hostname="mqtt.eclipseprojects.io")
  3. print("%s %s" % (msg.topic, msg.payload))
复制代码
使用回调(Callback)

订阅一组主题,并使用用户提供的回调处置惩罚收到的消息。
例子:
  1. import paho.mqtt.subscribe as subscribe
  2. def on_message_print(client, userdata, message):
  3.     print("%s %s" % (message.topic, message.payload))
  4.     userdata["message_count"] += 1
  5.     if userdata["message_count"] >= 5:
  6.         # it's possible to stop the program by disconnecting
  7.         client.disconnect()
  8. subscribe.callback(on_message_print, "paho/test/topic", hostname="mqtt.eclipseprojects.io", userdata={"message_count": 0})
复制代码
参考连接

https://github.com/eclipse/paho.mqtt.python
https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4