MessageListener.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from hub import methods, Global
  2. import threading
  3. import time
  4. import json
  5. import os
# --- for linux
# save_dir = f"/home/server/logs"
# --- for windows
# save_dir = r'C:\SRI-DINO.Server-py\logs' # SRI intranet test environment
# Directory that MessageListener.v001 joins with the hourly log file name.
save_dir = r'C:\logs' # SRI intranet test environment
  11. class MessageListener(object):
  12. """
  13. """
  14. log_key = str()
  15. @staticmethod
  16. def v001(client, userdata, message):
  17. """消息处理方法"""
  18. # --- log ---
  19. # print(f'MessageListener24: #message.payload: {message.payload}', flush=True)
  20. # methods.debug_log(f"MessageListener24", f"#message.payload: {message.payload}")
  21. # --- save log ---
  22. file_name = methods.now_string('v001-%Y-%m-%d-%H.log')
  23. log_file_path = os.path.join(save_dir, file_name)
  24. log_dict = json.loads(message.payload)
  25. log_list = [
  26. f"timestamp: {methods.ts_to_string(int(log_dict.get('timestamp'))/1000)}", # 时间
  27. f"userID: {log_dict.get('userID')}", # 用户id
  28. f"VehicleID: {log_dict.get('VehicleID')}", # 车id
  29. f"directSwitch: {log_dict.get('baseControl')[0].get('directSwitch')}", # 前后切换
  30. f"eStop: {log_dict.get('eStop')[0].get('eStop')}", # 急停开关
  31. f"gearCaontrol: {log_dict.get('gearCaontrol')[0].get('gearCaontrol')}", # 车辆档位控制
  32. f"hazardLight: {log_dict.get('hazardLight')[0].get('hazardLight')}", # 双闪灯
  33. ]
  34. # --- check ---
  35. if MessageListener.log_key and MessageListener.log_key == '-'.join(log_list[1:]):
  36. return
  37. # --- check --
  38. if not MessageListener.log_key:
  39. MessageListener.log_key = '-'.join(log_list[1:])
  40. print(f'MessageListener24: #log_list: {log_list}', flush=True)
  41. methods.write_text(log_file_path, ' | '.join(log_list) + '\n', 'a')
  42. @classmethod
  43. def subscribe001(cls):
  44. """"""
  45. # --- check ---
  46. # if not methods.is_dir(save_dir):
  47. # out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
  48. # methods.debug_log('MessageListener46', f"#out: {out}")
  49. Global.emqx.start_subscribe_loop(
  50. decorate_method=MessageListener.v001,
  51. # subscribe_topic='bg/log'
  52. subscribe_topic='Vehicle/ControlVehicle/Veh001'
  53. )
  54. # @classmethod
  55. # def run_background(cls, background_is=True):
  56. # """"""
  57. # p1 = threading.Thread(target=cls.subscribe001)
  58. # p1.start()
  59. @classmethod
  60. def run(cls, background_is=True):
  61. thread_list = [
  62. threading.Thread(target=cls.subscribe001),
  63. ]
  64. for thread in thread_list:
  65. thread.setDaemon(True)
  66. thread.start()
  67. if background_is:
  68. return
  69. for thread in thread_list:
  70. thread.join()
  71. if __name__ == '__main__':
  72. # --- test ---
  73. MessageListener.run()