test-mqtt-sender.py 920 B

12345678910111213141516171819202122232425262728293031323334353637
  1. import paho.mqtt.client as mqtt
  2. import time
  3. import json
  4. client = mqtt.Client()
  5. # client.connect(host='10.10.61.229', port=41883)
  6. client.connect(host='127.0.0.1', port=41883) # 本地环境
  7. # client.connect(host='10.10.10.73', port=41883)
  8. def test_bg_log():
  9. """
  10. mqtt消息注释:
  11. {
  12. "1": "2024-12-12 01:01:21", # 发送时间
  13. "2": 6000, # 方向值
  14. "3": 6000, # 手油门值
  15. "4": 6000, # 脚油门值
  16. "5": 6000 # 刹车值
  17. }
  18. """
  19. data = {
  20. 1: '2024-12-12 01:01:21', # 发送时间
  21. 2: 6000, # 方向值
  22. 3: 6000, # 油门值
  23. 4: 6000, # 刹车值
  24. }
  25. results = client.publish('bg/log', json.dumps(data))
  26. result_code, message_id = results
  27. print(f"#result_code: {result_code}, #message_id: {message_id}")
  28. if __name__ == '__main__':
  29. # --- test ---
  30. while True:
  31. test_bg_log()
  32. time.sleep(3)