test-mqtt-sender.py 855 B

123456789101112131415161718192021222324252627282930313233343536
  1. import paho.mqtt.client as mqtt
  2. import time
  3. import json
  4. client = mqtt.Client()
  5. # client.connect(host='10.10.61.229', port=41883)
  6. client.connect(host='127.0.0.1', port=41883)
  7. def test_bg_log():
  8. """
  9. mqtt消息注释:
  10. {
  11. "1": "2024-12-12 01:01:21", # 发送时间
  12. "2": 6000, # 方向值
  13. "3": 6000, # 手油门值
  14. "4": 6000, # 脚油门值
  15. "5": 6000 # 刹车值
  16. }
  17. """
  18. data = {
  19. 1: '2024-12-12 01:01:21', # 发送时间
  20. 2: 6000, # 方向值
  21. 3: 6000, # 油门值
  22. 4: 6000, # 刹车值
  23. }
  24. results = client.publish('bg/log', json.dumps(data))
  25. result_code, message_id = results
  26. print(f"#result_code: {result_code}, #message_id: {message_id}")
  27. if __name__ == '__main__':
  28. # --- test ---
  29. while True:
  30. test_bg_log()
  31. time.sleep(3)