xmqtt.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. """
  2. cd /home/server/repositories/repositories/gitee.com/casperz.py-project/module-py/xclient
  3. pip3 install paho-mqtt==1.6.1
  4. python3 xmqtt-a1.py
  5. python3 xmqtt-a2.py
  6. python3 xmqtt.py
  7. pip3 install paho-mqtt==1.6.1 -i hhttps://mirror.baidu.com/pypi/simple
  8. """
  9. import paho.mqtt.client as mqtt
  10. import time
  11. import json
  12. class Client(mqtt.Client):
  13. def __init__(self, host='127.0.0.1', port=41883):
  14. super().__init__()
  15. self.connect(host=host, port=port, keepalive=60)
  16. # if subscribe_topic:
  17. # def on_message(client, userdata, message):
  18. # # print(f"#userdata: {userdata}")
  19. # # print(f"#message.payload: {str(message.payload.decode())}")
  20. # print(f"#message.payload: {json.loads(message.payload)}")
  21. # print(f"#message.topic: {message.topic}")
  22. #
  23. # self.on_message = on_message
  24. # self.subscribe(subscribe_topic)
  25. # self.loop_forever()
  26. def start_subscribe_loop(self, decorate_method, subscribe_topic):
  27. """启动订阅循环"""
  28. self.on_message = decorate_method
  29. self.subscribe(subscribe_topic)
  30. self.loop_forever()
  31. def publish_message(self, publish_topic, message):
  32. """发布消息"""
  33. try:
  34. results = self.publish(publish_topic, message)
  35. result_code, message_id = results
  36. """
  37. result_code: 错误码
  38. result_code.0: 成功
  39. result_code.1: 失败
  40. result_code.4: 无法连接到 MQTT 代理
  41. result_code.7: QoS错误
  42. """
  43. print(f"#result_code: {result_code}, #message_id: {message_id}")
  44. except Exception as e:
  45. print(f'#Exception: {e.__class__.__name__}')
  46. if __name__ == '__main__':
  47. # --- init ---
  48. # c1 = Client(host='127.0.0.1', port=41883)
  49. c1 = Client(host='192.168.131.23', port=41883)
  50. # --- test subscribe ---
  51. # def m1(_, __, p3):
  52. # print(f"#message.payload: {json.loads(p3.payload)}")
  53. # c1.start_subscribe_loop(decorate_method=m1, subscribe_topic='test/topic')
  54. # --- test publish ---
  55. # while True:
  56. # data = {
  57. # 'code': 1001,
  58. # '方向': 6000,
  59. # '刹车': 6000,
  60. # '油门': 8000,
  61. # }
  62. # # c1.publish_message('bg/log', json.dumps(data))
  63. # c1.publish_message('hs/vehicle/state', json.dumps(data))
  64. # time.sleep(3)
  65. # --- test publish ---
  66. while True:
  67. data = {
  68. 'address': "192.168.131.180",
  69. 'state': 2,
  70. 'direction': 22,
  71. }
  72. c1.publish_message('hs/vehicle/state', json.dumps(data))
  73. time.sleep(3)