MessageListener.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from hub import methods, Global
  2. import threading
  3. import time
  4. import json
  5. import os
  6. # --- for linux
  7. # save_dir = f"/home/server/logs"
  8. # --- for windows
  9. # save_dir = r'C:\SRI-DINO.Server-py\logs' # SRI intranet test environment
  # Directory intended for saved log files. In the visible part of this file it
  # is only referenced by commented-out code inside MessageListener.v001.
  10. save_dir = r'C:\temp' # SRI intranet test environment
  11. class MessageListener(object):
  12. """
  13. """
  14. @staticmethod
  15. def v001(client, userdata, message):
  16. """消息处理方法"""
  17. # --- log ---
  18. # print(f'MessageListener24: #message.payload: {message.payload}', flush=True)
  19. # methods.debug_log(f"MessageListener24", f"#message.payload: {message.payload}")
  20. # --- save log ---
  21. # file_name = methods.now_string('%Y-%m-%d-%H.log')
  22. # log_file_path = os.path.join(save_dir, file_name)
  23. # methods.write_text(f'{message.payload}\n', 'a')
  24. # --- save log ---
  25. # file_name = methods.now_string('%Y-%m-%d-%H.log')
  26. # log_file_path = os.path.join(save_dir, file_name)
  27. log_dict = json.loads(message.payload)
  28. # log_list = list()
  29. log_time = int(log_dict.get('timestamp'))/1000
  30. log_time = methods.ts_to_string(log_time)
  31. methods.debug_log(f"MessageListener24", f"#log_time: {log_time}")
  32. # print(f'MessageListener24: #log_time: {log_time}', flush=True)
  33. # # item_count = 8 # 小车项目
  34. # item_count = 29 # 湛江项目
  35. # for i in range(1, item_count + 1):
  36. # v = str(log_dict.get(str(i)))
  37. # log_list.append(v)
  38. # methods.write_text(log_file_path, '|'.join(log_list) + '\n', 'a')
  39. @classmethod
  40. def start_check_loop(cls):
  41. """"""
  42. # --- check ---
  43. # if not methods.is_dir(save_dir):
  44. # out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
  45. # methods.debug_log('MessageListener46', f"#out: {out}")
  46. Global.emqx.start_subscribe_loop(
  47. decorate_method=MessageListener.v001,
  48. # subscribe_topic='bg/log'
  49. subscribe_topic='Vehicle/ControlVehicle/Veh001'
  50. )
  51. # @classmethod
  52. # def run_background(cls, background_is=True):
  53. # """"""
  54. # p1 = threading.Thread(target=cls.start_check_loop)
  55. # p1.start()
  56. @classmethod
  57. def run(cls, background_is=True):
  58. thread_list = [
  59. threading.Thread(target=cls.start_check_loop),
  60. ]
  61. for thread in thread_list:
  62. thread.setDaemon(True)
  63. thread.start()
  64. if background_is:
  65. return
  66. for thread in thread_list:
  67. thread.join()
  68. if __name__ == '__main__':
  69. # --- test ---
  70. MessageListener.run()