123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- from hub import methods, Global
- import threading
- import time
- import json
- import os
- # --- for linux
- # save_dir = f"/home/server/logs"
- # --- for windows
- # save_dir = r'C:\SRI-DINO.Server-py\logs' # sri内网测试环境
- save_dir = r'C:\temp' # sri内网测试环境
- class MessageListener(object):
- """
- """
- @staticmethod
- def v001(client, userdata, message):
- """消息处理方法"""
- # --- log ---
- # print(f'MessageListener24: #message.payload: {message.payload}', flush=True)
- # methods.debug_log(f"MessageListener24", f"#message.payload: {message.payload}")
- # --- save log ---
- # file_name = methods.now_string('%Y-%m-%d-%H.log')
- # log_file_path = os.path.join(save_dir, file_name)
- # methods.write_text(f'{message.payload}\n', 'a')
- # --- save log ---
- # file_name = methods.now_string('%Y-%m-%d-%H.log')
- # log_file_path = os.path.join(save_dir, file_name)
- log_dict = json.loads(message.payload)
- # log_list = list()
- log_time = int(log_dict.get('timestamp'))/1000
- log_time = methods.ts_to_string(log_time)
- methods.debug_log(f"MessageListener24", f"#log_time: {log_time}")
- # print(f'MessageListener24: #log_time: {log_time}', flush=True)
- # # item_count = 8 # 小车项目
- # item_count = 29 # 湛江项目
- # for i in range(1, item_count + 1):
- # v = str(log_dict.get(str(i)))
- # log_list.append(v)
- # methods.write_text(log_file_path, '|'.join(log_list) + '\n', 'a')
- @classmethod
- def start_check_loop(cls):
- """"""
- # --- check ---
- # if not methods.is_dir(save_dir):
- # out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
- # methods.debug_log('MessageListener46', f"#out: {out}")
- Global.emqx.start_subscribe_loop(
- decorate_method=MessageListener.v001,
- # subscribe_topic='bg/log'
- subscribe_topic='Vehicle/ControlVehicle/Veh001'
- )
- # @classmethod
- # def run_background(cls, background_is=True):
- # """"""
-
- # p1 = threading.Thread(target=cls.start_check_loop)
- # p1.start()
- @classmethod
- def run(cls, background_is=True):
- thread_list = [
- threading.Thread(target=cls.start_check_loop),
- ]
- for thread in thread_list:
- thread.setDaemon(True)
- thread.start()
- if background_is:
- return
- for thread in thread_list:
- thread.join()
- if __name__ == '__main__':
- # --- test ---
- MessageListener.run()
|