test_mqtt.py 1.1 KB

12345678910111213141516171819202122232425262728293031
  1. import paho.mqtt.client as mqtt
  2. import time
  3. import json
  4. client = mqtt.Client()
  5. client.connect(host='192.168.131.23', port=41883)
  6. def test(topic="hs/vehicle/state"):
  7. data = {
  8. "address": "192.168.131.180", # 车辆ip
  9. "state": 2, # 车辆状态 1 离线 2 在线空闲 3 人工驾驶中 4 远程驾驶中 5 自动驾驶中
  10. "direction": 12, # 车头方向(场地坐标偏转角度)
  11. }
  12. return topic, client.publish(topic, json.dumps(data))
  13. # def test(topic="hs/pot/data"):
  14. # data = {
  15. # "address": "192.168.131.180", # 车辆ip
  16. # "state": 2, # 车辆状态 1 离线 2 在线空闲 3 人工驾驶中 4 远程驾驶中 5 自动驾驶中
  17. # "direction": 12, # 车头方向(场地坐标偏转角度)
  18. # }
  19. # return topic, client.publish(topic, json.dumps(data))
  20. if __name__ == '__main__':
  21. # --- test ---
  22. while True:
  23. topic, results = test()
  24. result_code, message_id = results
  25. print(f"#topic: {topic}, #result_code: {result_code}, #message_id: {message_id}")
  26. time.sleep(3)