test-mqtt-sender.py 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import paho.mqtt.client as mqtt
  2. import time
  3. import json
  4. client = mqtt.Client()
  5. # client.connect(host='10.10.61.229', port=41883)
  6. client.connect(host='10.10.10.116', port=41883)
  7. # client.connect(host='10.10.10.73', port=41883)
  8. # client.connect(host='127.0.0.1', port=41883) # 本地环境
  9. def test_bg_log():
  10. """
  11. mqtt消息注释:
  12. {
  13. "1": "2024-12-12 01:01:21", # 发送时间
  14. "2": 6000, # 方向值
  15. "3": 6000, # 手油门值
  16. "4": 6000, # 脚油门值
  17. "5": 6000 # 刹车值
  18. }
  19. """
  20. data = {
  21. 1: '2024-12-12 01:01:21', # 发送时间
  22. 2: 6000, # 方向值
  23. 3: 6000, # 油门值
  24. 4: 6000, # 刹车值
  25. }
  26. # results = client.publish('bg/log', json.dumps(data))
  27. results = client.publish('qtmqtt', json.dumps(data))
  28. result_code, message_id = results
  29. print(f"#result_code: {result_code}, #message_id: {message_id}")
  30. if __name__ == '__main__':
  31. # --- test ---
  32. while True:
  33. test_bg_log()
  34. time.sleep(3)