xmqtt.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """
  2. cd /home/server/repositories/repositories/gitee.com/casperz.py-project/module-py/xclient
  3. pip3 install paho-mqtt==1.6.1
  4. python3 xmqtt-a1.py
  5. python3 xmqtt-a2.py
  6. python3 xmqtt.py
  7. pip3 install paho-mqtt==1.6.1 -i https://mirror.baidu.com/pypi/simple
  8. import paho.mqtt.client as mqtt
  9. # MQTT 代理设置
  10. broker_address = "你的代理地址"
  11. broker_port = 1883
  12. username = "admin"
  13. password = "SRIsri123"
  14. # 创建客户端实例
  15. client = mqtt.Client()
  16. # 设置用户名和密码
  17. client.username_pw_set(username, password)
  18. # 连接到 MQTT 代理
  19. client.connect(broker_address, broker_port)
  20. # 可以继续执行 MQTT 操作(发布、订阅等)
  21. """
  22. import paho.mqtt.client as mqtt
  23. import time
  24. import json
  25. class Client(mqtt.Client):
  26. def __init__(self, host='127.0.0.1', port=41883):
  27. super().__init__()
  28. self.connect(host=host, port=port, keepalive=60)
  29. # if subscribe_topic:
  30. # def on_message(client, userdata, message):
  31. # # print(f"#userdata: {userdata}")
  32. # # print(f"#message.payload: {str(message.payload.decode())}")
  33. # print(f"#message.payload: {json.loads(message.payload)}")
  34. # print(f"#message.topic: {message.topic}")
  35. #
  36. # self.on_message = on_message
  37. # self.subscribe(subscribe_topic)
  38. # self.loop_forever()
  39. def start_subscribe_loop(self, decorate_method, subscribe_topic):
  40. """启动订阅循环"""
  41. self.on_message = decorate_method
  42. self.subscribe(subscribe_topic)
  43. self.loop_forever()
  44. def publish_message(self, publish_topic, message):
  45. """发布消息"""
  46. try:
  47. results = self.publish(publish_topic, message)
  48. result_code, message_id = results
  49. """
  50. result_code: 错误码
  51. result_code.0: 成功
  52. result_code.1: 失败
  53. result_code.4: 无法连接到 MQTT 代理
  54. result_code.7: QoS错误
  55. """
  56. print(f"#result_code: {result_code}, #message_id: {message_id}")
  57. except Exception as e:
  58. print(f'#Exception: {e.__class__.__name__}')
  59. if __name__ == '__main__':
  60. # --- init ---
  61. # c1 = Client(host='127.0.0.1', port=41883)
  62. c1 = Client(host='192.168.131.23', port=41883)
  63. # --- test subscribe ---
  64. # def m1(_, __, p3):
  65. # print(f"#message.payload: {json.loads(p3.payload)}")
  66. # c1.start_subscribe_loop(decorate_method=m1, subscribe_topic='test/topic')
  67. # --- test publish ---
  68. # while True:
  69. # data = {
  70. # 'code': 1001,
  71. # '方向': 6000,
  72. # '刹车': 6000,
  73. # '油门': 8000,
  74. # }
  75. # # c1.publish_message('bg/log', json.dumps(data))
  76. # c1.publish_message('hs/vehicle/state', json.dumps(data))
  77. # time.sleep(3)
  78. # --- test publish ---
  79. while True:
  80. data = {
  81. 'address': "192.168.131.180",
  82. 'state': 2,
  83. 'direction': 22,
  84. }
  85. c1.publish_message('hs/vehicle/state', json.dumps(data))
  86. time.sleep(3)