test_mqtt.py 809 B

1234567891011121314151617181920212223242526272829303132333435
  1. import paho.mqtt.client as mqtt
  2. import time
  3. import json
  4. client = mqtt.Client()
  5. client.connect(host='10.10.61.229', port=41883)
  6. def test_bg_log():
  7. """
  8. mqtt消息注释:
  9. {
  10. "1": "2024-12-12 01:01:21", # 发送时间
  11. "2": 6000, # 方向值
  12. "3": 6000, # 手油门值
  13. "4": 6000, # 脚油门值
  14. "5": 6000 # 刹车值
  15. }
  16. """
  17. data = {
  18. 1: '2024-12-12 01:01:21', # 发送时间
  19. 2: 6000, # 方向值
  20. 3: 6000, # 油门值
  21. 4: 6000, # 刹车值
  22. }
  23. results = client.publish('bg/log', json.dumps(data))
  24. result_code, message_id = results
  25. print(f"#result_code: {result_code}, #message_id: {message_id}")
  26. if __name__ == '__main__':
  27. # --- test ---
  28. while True:
  29. test_bg_log()
  30. time.sleep(10)