Browse Source

周五:解决win环境下部署的问题;完成log记录测试;完成定时log清理测试

casper 7 months ago
parent
commit
c4ee377f87

+ 2 - 0
sri-server-bg01/README-usage-win.bash

@@ -4,6 +4,8 @@
 pip download -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements-win.txt -d C:\temp\sri-server-bg01
 # 离线安装
 pip install --no-index --find-links=C:\temp\sri-server-bg01 -r requirements-win.txt
+# 验证
+python main.py
 
 # 在线安装
 pip install -r requirements-win.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

+ 10 - 8
sri-server-bg01/requirements-win.txt

@@ -13,11 +13,13 @@ influxdb==5.3.1
 pymysql==0.9.3
 SQLAlchemy==1.4.30
 # --- for server ---
-aiofiles==23.2.1
-python-multipart==0.0.6
-starlette==0.32.0
-fastapi==0.108.0
-fastapi-login==1.9.2
-uvicorn==0.13.3
-werkzeug==3.0.1
-itsdangerous==1.1.0
+aiofiles==0.6.0 
+python-multipart==0.0.5 
+starlette==0.13.6 
+fastapi==0.64.0
+fastapi-login==1.5.2 
+supervisor==4.2.1 
+uvicorn==0.13.3 
+werkzeug==1.0.1
+itsdangerous==1.1.0
+cryptography==40.0.2

+ 2 - 0
sri-server-bg02/README-usage-win.bash

@@ -4,6 +4,8 @@
 pip download -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements-win.txt -d C:\temp\sri-server-bg02
 # 离线安装
 pip install --no-index --find-links=C:\temp\sri-server-bg02 -r requirements-win.txt
+# 验证
+python main.py
 
 # 在线安装
 pip install -r requirements-win.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

+ 227 - 0
sri-server-bg02/app.py

@@ -0,0 +1,227 @@
+# note: https://responder.kennethreitz.org/en/latest/search.html?q=resp.content
+from hub import methods
+
+import traceback
+import requests
+import importlib
+
+
+def generate_app():
+    """"""
+    # from settings import action_methods
+    from lib.LineManage import LineManage
+
+    # see: https://lzomedia.com/blog/host-fastapi-backend-api-and-react-app-frontend-locally/
+    # --- define middleware --- see: https://www.starlette.io/middleware/
+    # from starlette.applications import Starlette
+    # from starlette.middleware import Middleware
+    # # from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
+    # # from starlette.middleware.trustedhost import TrustedHostMiddlewar
+    # middleware = [
+    #     Middleware(
+    #         TrustedHostMiddleware,
+    #         allow_credentials=True,
+    #         allowed_hosts=['*'],
+    #         allow_methods=["*"],
+    #         allow_headers=["*"],
+    #     ),
+    #     # Middleware(HTTPSRedirectMiddleware)
+    # ]
+
+    # --- define app --- see: https://responder.kennethreitz.org/en/latest/tour.html#cors
+    import responder
+    app = responder.API(
+        # cors=True,
+        # allow_methods=['*'],
+        # allow_credentials=True,
+    )
+
+    async def coroutine_method(method, _params):
+        """协程方法"""
+        # run_at = time.time()
+        result = await method(**_params)
+        # methods.debug_log(f"app.coroutine_method", f"use time {round(time.time() - run_at, 2)}s")
+        return result
+
+    def foreground_method(method, _params):
+        """等待返回"""
+        # run_at = time.time()
+        result = method(**_params)
+        # methods.debug_log(f"app.foreground_method", f"use time {round(time.time() - run_at, 2)}s")
+        return result
+
+    @app.background.task
+    def background_method(method, _params):
+        """不等返回"""
+        # run_at = time.time()
+        result = method(**_params)
+        # methods.debug_log(f"app.background_method", f"use time {round(time.time() - run_at, 2)}s")
+        return result
+
+    async def run_method(code, _params):
+
+        config_dict = {
+            7000: ('v1', 'f7000', 'IsAsync'),
+            70000: ('v1', 'f70000', 'IsBack'),
+        }
+
+        if code not in config_dict:
+            return dict(code=2, detail=f"{code} not found!")
+        else:
+
+            script_name, method_name, method_type = config_dict[code]
+            script = importlib.import_module(f"api.httpapi.{script_name}")
+            method = getattr(script, method_name)
+            del _params['code']
+
+            if method_type == 'IsAsync':
+                return await coroutine_method(method, _params)
+            elif method_type == 'IsBack':
+                return foreground_method(method, _params)
+            elif method_type == 'NoBack':
+                return background_method(method, _params)
+
+    @app.route('/httpapi')
+    async def httpapi(request, response):
+        try:
+            # --- fill ---
+            if request.params:
+                params = dict(request.params.items())
+            else:
+                params = await request.media()
+
+            # --- debug ---
+            methods.debug_log('app.httpapi92', f"#params: {params}")
+
+            # --- check ---
+            if 'code' not in params:
+                response.media = dict(code=-1, reason="error98", detail=f"参数缺失")
+            else:
+                code = params.get('code')
+                result = await run_method(code, params)
+                methods.debug_log('app.httpapi102', f"#result: {result}")
+                if result.__class__.__name__ == 'bytes':
+                    response.content = result
+                elif result.__class__.__name__ == 'str':
+                    response.text = result
+                elif result.__class__.__name__ == 'dict':
+                    response.media = result
+                elif result.__class__.__name__ == 'Future':
+                    response.media = dict(code=0, detail=f"{result} is running.")
+                elif not result:
+                    response.media = dict(code=0, detail=f"return is null.")
+                else:
+                    response.media = dict(code=0, detail=f"return type is {result.__class__.__name__}.")
+
+        except Exception as exception:
+
+            methods.debug_log("app.httpapi118", f"#exception: {exception.__class__.__name__}")
+            methods.debug_log("app.httpapi118", f"#traceback: {traceback.format_exc()}")
+            response.media = dict(code=-1, reason=exception.__class__.__name__, detail=f"{methods.trace_log()}")
+
+    @app.route('/wsapi', websocket=True)
+    async def wsapi(client):
+        """
+        let wsuri = `${location.protocol === 'https' ? 'wss' : 'ws'}://${location.host}:8801/wsapi`;
+        connect_json_text = {
+            line_id: 连接id,暂为token
+        }
+        )
+        send_json_text_1 = {
+            face_uuid: 匹配到的人脸id
+            face_name: 人脸名称
+            input_face_b64: 抓拍图片(b64格式字符串)
+        }
+        """
+        try:
+            await client.accept()
+            while True:
+
+                # --- check ---
+                data = await client.receive_text()  # 消息接受方法 receive_{text/json/bytes}
+                if not data:
+                    methods.debug_log('app.wsapi140', f"#reason: error140")
+                    await client.send_json({'code': -1, 'reason': 'error140'})
+                    break
+
+                # --- debug ---
+                # methods.debug_log('app.wsapi145', f"#data: {data} | {type(data)}")
+
+                # --- check ---
+                if not methods.is_json(data):
+                    methods.debug_log('app.wsapi133', f"#reason: error146")
+                    await client.send_json({'code': -1, 'reason': 'error146'})
+                    break
+
+                # --- check ---
+                data = methods.json_loads(data)
+                if not data.get('code'):
+                    methods.debug_log('app.wsapi144', f"#reason: error157")
+                    await client.send_json({'code': -1, 'reason': 'error157'})
+                    break
+
+                # --- debug ---
+                if data.get('code') not in [2008]:
+                    methods.debug_log('app.wsapi160', f"#data: {data}")
+
+                # --- check 2000 ---
+                code = data.get('code')
+                if code == 2000:
+
+                    # --- check ---
+                    if 'acct' not in data or 'pw' not in data:
+                        methods.debug_log('app.wsapi166', f"#reason: error164")
+                        await client.send_json({'code': -1, 'reason': 'error164'})
+                        break
+
+                    # --- check ---
+                    username = data.get('acct')
+                    password = data.get('pw')
+                    url = 'http://58.34.98.13:8099/api/ar/user/login'
+                    data = {
+                        "acct": username,
+                        "pw": password,
+                        "method": "1"  # 客户端类型 1 安卓 2 PC端
+                    }
+                    methods.debug_log('app.wsapi179', f"#url: {url}")
+                    response = requests.post(url=url, json=data)
+                    data = response.json()
+                    if data.get('msg') and data.get('msg') == 'success':
+                        data['code'] = 4000
+                        await client.send_json(data)
+                    else:
+                        methods.debug_log('app.wsapi186', f"#reason: error186")
+                        await client.send_json({'code': -1, 'reason': 'error186'})
+
+                    # --- save ---
+                    # methods.debug_log('app.wsapi179', f"{dir(client)}")
+                    # methods.debug_log('app.wsapi179', f"#id: {id(client)} | {type(id(client))}")
+                    methods.debug_log('app.wsapi179', f"{type(client)} | {methods.now_ts()} | {id(client)}")
+                    LineManage.line_dict[username] = client, methods.now_ts(), id(client)
+
+                # --- check 2008 ---
+                if code == 2008:
+
+                    # --- 判断该连接是否是已保存的链接 ---
+                    is_ok = False
+                    for key in list(LineManage.line_dict.keys()):
+
+                        ws, ts, _id = LineManage.line_dict.get(key)
+                        if _id == id(client):
+                            LineManage.line_dict[username] = client, methods.now_ts(), id(client)
+                            is_ok = True
+
+                    await client.send_json({'code': 4008, 'result': is_ok})
+
+
+
+        except Exception as exception:
+
+            if exception.__class__.__name__ == 'WebSocketDisconnect':
+                await client.close()
+            else:
+                methods.debug_log("app.wsapi163", f"#exception: {exception.__class__.__name__}")
+                methods.debug_log("app.wsapi163", f"#traceback: {traceback.format_exc()}")
+
+    # --- return ---
+    return app

+ 21 - 14
sri-server-bg02/lib/JobManage.py

@@ -1,5 +1,12 @@
 from hub import Global, methods
 
+import os
+
+# --- for linux
+# save_dir = f'/home/server/logs'
+
+# --- for windows
+save_dir = r'C:\temp'  # sri内网测试环境
 
 class JobManage(object):
     """"""
@@ -19,7 +26,7 @@ class JobManage(object):
         # Global.aps.create_job(func=cls.job20102, trigger='date', run_date='2022-07-28 17:15:30')  # 定时测试
 
         # --- release ---
-        # Global.aps.create_job(func=cls.job101_on_windows, trigger='cron', hour=22)  # 每天晚10点  release
+        Global.aps.create_job(func=cls.job101, trigger='cron', hour=22)  # 每天晚10点  release
         # Global.aps.create_job(func=cls.job20102, trigger='interval', seconds=600)  # 每10分钟  release
         # Global.aps.create_job(func=cls.job301, trigger='interval', seconds=300)  # 每5分钟  release
 
@@ -29,25 +36,25 @@ class JobManage(object):
         Global.aps.pause_all()
 
     @staticmethod
-    def job101_on_windows():
-        """
-        每日22点清理30天之前的日志
-        """
-        pass
-
-    @staticmethod
-    def job101_on_linux():
+    def job101():
         """
         每日22点清理30天之前的日志
         """
         # --- get list ---
-        log_file_dir = f"/home/server/logs"
-        file_path_list = methods.get_file_path_list(log_file_dir)
+        file_path_list = methods.get_file_path_list(save_dir)
         file_name_list = [i.split('/')[-1] for i in file_path_list]
-        # methods.debug_log('JobManage.job101.41', f"#file_name_list: {file_name_list}")
+        # methods.debug_log('JobManage44', f"#file_name_list: {file_name_list}")
+
+        # --- test ---
+        # for file_name in file_name_list:
+        #     file_path = os.path.join(save_dir, file_name)
+        #     if methods.is_file(file_path):
+        #         methods.remove_file(file_path)
 
         # --- cut list ---
-        for file_name in file_name_list[30:]:
-            file_path = f"/home/server/logs/{file_name}"
+        # remove_count = 30  # 按天
+        remove_count = 30 * 24  # 按小时
+        for file_name in file_name_list[remove_count:]:
+            file_path = os.path.join(save_dir, file_name)
             if methods.is_file(file_path):
                 methods.remove_file(file_path)

+ 253 - 0
sri-server-bg02/lib/LineManage.py

@@ -0,0 +1,253 @@
+from hub import methods, Global
+
+import asyncio
+import threading
+import traceback
+
+
+class LineManage(object):
+    """"""
+
+    line_dict = {}  # {<line_id>: <ws>, <now_ts>}
+
+    @classmethod
+    def run_forever(cls):
+        tasks = [cls.check_loop()]
+        _loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(_loop)
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(asyncio.wait(tasks))
+
+    @classmethod
+    def run_background(cls, is_back_run=True):
+        t1 = threading.Thread(target=cls.run_forever)
+        t1.start()
+
+    @classmethod
+    async def check_loop(cls):
+
+        # --- define ---
+        # last_send_id = str()
+
+        while True:
+
+            try:
+
+                # --- debug ---
+                # methods.debug_log(f"LineManage.check_loop44", f"#run at {methods.now_string()} "
+                #                                                f"| {len(cls.line_dict.values())}")
+                # await asyncio.sleep(3)
+                # await asyncio.sleep(0.5)
+
+                # --- get send_data ---
+                # """
+                # send_data = {
+                #     send_id: 数据id
+                #     send_list: 数据列表
+                # }
+                # """
+                # send_data = Global.rdb.get_one(key='send_data')
+                # send_data = db0.get_one(key='send_data')
+
+                # --- check ---
+                # if not send_data:
+                #     continue
+
+                # --- check ---
+                # send_id = send_data.get('send_id')
+                # if not send_id:
+                #     continue
+
+                # --- check ---
+                # if send_id == last_send_id:
+                #     continue
+
+                # --- check ---
+                # send_list = send_data.get('send_list')
+                # if send_list is None or len(send_list) == 0:
+                #     continue
+
+                # --- debug ---
+                # await asyncio.sleep(3)
+                # await asyncio.sleep(0.5)
+                # methods.debug_log(f"LineManage", f"m-74: run at {methods.now_string()} "
+                #                                  f"| send count is {len(send_list)} "
+                #                                  f"| online count is {len(cls.line_dict.values())}")
+
+                # --- define ---
+                now_ts = methods.now_ts()
+
+                # --- check ---
+                for username in list(cls.line_dict.keys()):
+
+                    try:
+
+                        # --- debug ---
+                        _ws, last_live_at, _id = cls.line_dict.get(username)
+                        methods.debug_log(f"LineManage.check_loop87", f"#username: {username}"
+                                                                      f" | #last_live_at: {last_live_at}"
+                                                                      f" | #now_ts: {now_ts}")
+
+                        # --- check 180s ---
+                        if now_ts - last_live_at >= 3 * 60:
+                            cls.line_dict.pop(username)
+
+                        # --- check ---
+                        # if not cls.check_line_is_live(line_id):
+                        #     methods.debug_log(f"LineManage", f"m-56: websocket link broken.")
+                        #     cls.line_dict.pop(line_id)
+                        #     continue
+
+                        # --- send ---
+                        # """
+                        # send_list = [
+                        #     {
+                        #         base_face_uuid: 底库人脸id
+                        #         snap_face_image: 抓拍人脸
+                        #         base_face_image_path: 底库人脸路径
+                        #         face_similarity: 相似度
+                        #     }
+                        # ]
+                        # """
+                        # for data in send_list:
+                        #
+                        #     # --- check ---
+                        #     if data.get('snap_face_image') is None:
+                        #         continue
+                        #
+                        #     # --- define ---
+                        #     """
+                        #     send_dict = {
+                        #         input_face_b64: 抓拍人脸图像
+                        #         face_uuid: 人脸id
+                        #         face_name: 人脸名称
+                        #         known_face_b64: 底库人脸图像
+                        #         face_similarity: 相似度
+                        #         face_type_name_list: 人员类型
+                        #     }
+                        #     """
+                        #     send_dict = dict(
+                        #         input_face_b64=cls.image_to_b64(data.get('snap_face_image')),
+                        #         # input_face_b64=str(),
+                        #         known_face_b64=str(),
+                        #         face_uuid=str(),
+                        #         face_name=str(),
+                        #         face_similarity=data.get('face_similarity'),
+                        #         face_type_name_list=list(),
+                        #     )
+                        #
+                        #     # --- fill input_face_b64 ---
+                        #     # snap_face_image_path = data.get('snap_face_image_path')
+                        #     # if snap_face_image_path and methods.is_file(snap_face_image_path):
+                        #     #     frame = cv2.imread(snap_face_image_path)
+                        #     #     if frame is not None:
+                        #     #         _, image = cv2.imencode('.jpg', frame)
+                        #     #         base64_data = base64.b64encode(image)  # byte to b64 byte
+                        #     #         s = base64_data.decode()  # byte to str
+                        #     #         send_dict['input_face_b64'] = f'data:image/jpeg;base64,{s}'
+                        #
+                        #     # --- fill known_face_b64 ---
+                        #     base_face_image_path = data.get('base_face_image_path')
+                        #     if base_face_image_path and methods.is_file(base_face_image_path):
+                        #         frame = cv2.imread(base_face_image_path)
+                        #         if frame is not None:
+                        #             _, image = cv2.imencode('.jpg', frame)
+                        #             base64_data = base64.b64encode(image)  # byte to b64 byte
+                        #             s = base64_data.decode()  # byte to str
+                        #             send_dict['known_face_b64'] = f'data:image/jpeg;base64,{s}'
+                        #
+                        #     # --- fill face_uuid and face_name ---
+                        #     """
+                        #     Face: 陌生人脸表
+                        #     Face.face_name: 人脸名称
+                        #     """
+                        #     face_uuid = data.get('base_face_uuid')
+                        #     if face_uuid:
+                        #         send_dict['face_uuid'] = face_uuid
+                        #         face = Global.mdb.get_one_by_id('Face', face_uuid)
+                        #         if face and face.get('face_name'):
+                        #             send_dict['face_name'] = face.get('face_name')
+                        #
+                        #         # --- fill face_type_name_list ---
+                        #         face_type_uuid_list = face.get('face_type_uuid_list')
+                        #         if face_type_uuid_list:
+                        #             send_dict['face_type_name_list'] = [face_type_name_dict.get(i)
+                        #                                                 for i in face_type_uuid_list
+                        #                                                 if face_type_name_dict.get(i)]
+                        #
+                        #     # --- send ---
+                        #     # methods.debug_log(f"LineManage", f"m-153: send_dict is {send_dict}")
+                        #     line = cls.line_dict.get(line_id)
+                        #     send_json = methods.json_dumps(send_dict)
+                        #     await line.send_text(send_json)
+                        #     # methods.debug_log(f"LineManage",
+                        #     #                   f"m-161: end at {datetime.datetime.now().strftime('%H:%M:%S.%f')}")
+                        #     # await asyncio.sleep(0.1)
+
+                    except Exception as exception:
+
+                        # --- check ---
+                        if not cls.check_line_is_live(username):
+                            cls.line_dict.pop(username)
+
+                        if exception.__class__.__name__ == 'RuntimeError':
+                            methods.debug_log(f"LineManage.check_loop194", f"d2: {cls.get_line_state()}")
+                        else:
+                            methods.debug_log("LineManage.check_loop194", f"#e: {exception.__class__.__name__}")
+                            methods.debug_log("LineManage.check_loop194", f"#t: {traceback.format_exc()}")
+
+                    # --- debug ---
+                    methods.debug_log(f"LineManage.check_loop198", f"wait 1 minutes check again")
+                    await asyncio.sleep(60)
+                    # await asyncio.sleep(60)
+                    # await asyncio.sleep(60)
+
+
+            except Exception as exception:
+
+                methods.debug_log(f"LineManage.check_loop208", f"#e: {exception.__class__.__name__}")
+                methods.debug_log(f"LineManage.check_loop208", f"#t: {traceback.format_exc()}")
+                methods.debug_log(f"LineManage.check_loop208", f"wait 1 minutes try again!")
+                await asyncio.sleep(60)
+
+    @classmethod
+    def get_line_total(cls):
+        count = 0
+        for k, v in cls.line_dict.items():
+            count += 1
+        return count
+
+    @classmethod
+    def check_line_is_live(cls, line_id):
+        d1 = {
+            0: 'CONNECTING',
+            1: 'CONNECTED',
+            2: 'DISCONNECTED',
+        }
+        _ws, _, _ = cls.line_dict.get(line_id)
+        if _ws and d1.get(_ws.client_state.value) != 'DISCONNECTED':
+            return True
+        else:
+            return False
+
+    @classmethod
+    def get_line_state(cls):
+        d1 = {
+            0: 'CONNECTING',
+            1: 'CONNECTED',
+            2: 'DISCONNECTED',
+        }
+        d2 = dict()  # {<line_id>: <state>}
+        for line_id, line in cls.line_dict.items():
+            state = d1.get(line.client_state.value)
+            _id = line_id[-6:]
+            d2[_id] = state
+        return d2
+
+    # @staticmethod
+    # def image_to_b64(image):
+    #     frame = numpy_method.to_array(image)  # list to numpy array
+    #     _, image = cv2.imencode('.jpg', frame)
+    #     base64_data = base64.b64encode(image)
+    #     s = base64_data.decode()
+    #     return f'data:image/jpeg;base64,{s}'

+ 37 - 17
sri-server-bg02/lib/MessageListener.py

@@ -3,12 +3,14 @@ from hub import methods, Global
 import threading
 import time
 import json
+import os
 
-# for linux
-save_dir = f"/home/server/logs"
+# --- for linux
+# save_dir = f"/home/server/logs"
 
-# for windows
-save_dir = r'C:\SRI-DINO.Server-py\logs'
+# --- for windows
+# save_dir = r'C:\SRI-DINO.Server-py\logs'  # sri内网测试环境
+save_dir = r'C:\temp'  # sri内网测试环境
 
 
 class MessageListener(object):
@@ -18,17 +20,22 @@ class MessageListener(object):
     @staticmethod
     def decorate_method(client, userdata, message):
         """消息处理方法"""
-        file_name = methods.now_string('%Y-%m-%d.log')
-        log_file_path = f"{save_dir}/{file_name}"
-        log_dict = json.loads(message.payload)
-        # methods.debug_log(f"MessageListener.20", f"#log_dict: {log_dict}")
+        # --- log ---
+        # print(f'MessageListener24: #message.payload: {message.payload}', flush=True)
 
+        # --- save log ---
+        file_name = methods.now_string('%Y-%m-%d-%H.log')
+        # log_file_path = f"{save_dir}/{file_name}"
+        log_file_path = os.path.join(save_dir, file_name)
+        log_dict = json.loads(message.payload)
         log_list = list()
-        for i in range(1, 5):
+        # item_count = 8  # 小车项目
+        item_count = 29  # 湛江项目
+        for i in range(1, item_count + 1):
             v = str(log_dict.get(str(i)))
             log_list.append(v)
         methods.write_text(log_file_path, '|'.join(log_list) + '\n', 'a')
-        methods.debug_log(f"MessageListener25", f"#message.payload: {json.loads(message.payload)}")
+        # methods.debug_log(f"MessageListener38", f"#message.payload: {json.loads(message.payload)}")
 
     @classmethod
     def start_check_loop(cls):
@@ -36,20 +43,33 @@ class MessageListener(object):
         # --- check ---
         # if not methods.is_dir(save_dir):
         #     out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
-        #     methods.debug_log('MessageListener33', f"#out: {out}")
+        #     methods.debug_log('MessageListener46', f"#out: {out}")
 
         Global.emqx.start_subscribe_loop(
             decorate_method=MessageListener.decorate_method,
             subscribe_topic='bg/log'
         )
 
-    @classmethod
-    def run_background(cls, background_is=True):
-        """"""
-        p1 = threading.Thread(target=cls.start_check_loop)
-        p1.start()
+    # @classmethod
+    # def run_background(cls, background_is=True):
+    #     """"""
+        
+    #     p1 = threading.Thread(target=cls.start_check_loop)
+    #     p1.start()
 
+    @classmethod
+    def run(cls, background_is=True):
+        thread_list = [
+            threading.Thread(target=cls.start_check_loop),
+        ]
+        for thread in thread_list:
+            thread.setDaemon(True)
+            thread.start()
+        if background_is:
+            return
+        for thread in thread_list:
+            thread.join()
 
 if __name__ == '__main__':
     # --- test ---
-    MessageListener.run_background()
+    MessageListener.run()

+ 0 - 252
sri-server-bg02/lib/line_manage.py

@@ -1,252 +0,0 @@
-"""
-websocket发数据
-"""
-from hub import methods, Global, numpy_method
-
-import cv2
-import base64
-import asyncio
-import threading
-
-
-class LineManage(object):
-    """"""
-
-    line_dict = {}  # {<line_id>: <ws>} | line_id: websocket连接id | ws: websocket链接对象
-
-    @classmethod
-    def run_forever(cls):
-        """
-        调用协程方法
-        """
-        tasks = [cls.check_send()]
-        _loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(_loop)
-        loop = asyncio.get_event_loop()
-        loop.run_until_complete(asyncio.wait(tasks))
-
-    @classmethod
-    def run_background(cls, is_back_run=True):
-        """
-        后台运行
-        """
-        t1 = threading.Thread(target=cls.run_forever)
-        t1.start()
-
-    @classmethod
-    async def check_send(cls):
-
-        # --- define ---
-        last_send_id = str()
-
-        while True:
-
-            try:
-                # --- fill face_type_name_dict ---
-                """
-                face_type_name_dict = {<type_uuid>: <name>}
-                """
-                face_type_name_dict = dict()
-                for item in Global.mdb.get_all('FaceType'):
-                    uuid = str(item.get('_id'))
-                    face_type_name_dict[uuid] = item.get('name')
-
-                # --- debug ---
-                # methods.debug_log(f"LineManage", f"m-21: run at {methods.now_string()} "
-                #                                  f"| {len(cls.line_dict.values())}")
-                # await asyncio.sleep(3)
-                # await asyncio.sleep(0.5)
-
-                # --- get send_data ---
-                """
-                send_data = {
-                    send_id: 数据id
-                    send_list: 数据列表
-                }
-                """
-                send_data = Global.rdb.get_one(key='send_data')
-                # send_data = db0.get_one(key='send_data')
-
-                # --- check ---
-                if not send_data:
-                    continue
-
-                # --- check ---
-                send_id = send_data.get('send_id')
-                if not send_id:
-                    continue
-
-                # --- check ---
-                if send_id == last_send_id:
-                    continue
-
-                # --- check ---
-                send_list = send_data.get('send_list')
-                if send_list is None or len(send_list) == 0:
-                    continue
-
-                # --- debug ---
-                # await asyncio.sleep(3)
-                # await asyncio.sleep(0.5)
-                methods.debug_log(f"LineManage", f"m-74: run at {methods.now_string()} "
-                                                 f"| send count is {len(send_list)} "
-                                                 f"| online count is {len(cls.line_dict.values())}")
-
-                # --- update ---
-                last_send_id = send_id
-
-                # --- send ---
-                for line_id in list(cls.line_dict.keys()):
-
-                    try:
-
-                        # --- check ---
-                        if not cls.check_line_is_live(line_id):
-                            methods.debug_log(f"LineManage", f"m-56: websocket link broken.")
-                            cls.line_dict.pop(line_id)
-                            continue
-
-                        # --- send ---
-                        """
-                        send_list = [
-                            {
-                                base_face_uuid: 底库人脸id
-                                snap_face_image: 抓拍人脸
-                                base_face_image_path: 底库人脸路径
-                                face_similarity: 相似度
-                            }
-                        ]
-                        """
-                        for data in send_list:
-
-                            # --- check ---
-                            if data.get('snap_face_image') is None:
-                                continue
-
-                            # --- define ---
-                            """
-                            send_dict = {
-                                input_face_b64: 抓拍人脸图像
-                                face_uuid: 人脸id
-                                face_name: 人脸名称
-                                known_face_b64: 底库人脸图像
-                                face_similarity: 相似度
-                                face_type_name_list: 人员类型
-                            }
-                            """
-                            send_dict = dict(
-                                input_face_b64=cls.image_to_b64(data.get('snap_face_image')),
-                                # input_face_b64=str(),
-                                known_face_b64=str(),
-                                face_uuid=str(),
-                                face_name=str(),
-                                face_similarity=data.get('face_similarity'),
-                                face_type_name_list=list(),
-                            )
-
-                            # --- fill input_face_b64 ---
-                            # snap_face_image_path = data.get('snap_face_image_path')
-                            # if snap_face_image_path and methods.is_file(snap_face_image_path):
-                            #     frame = cv2.imread(snap_face_image_path)
-                            #     if frame is not None:
-                            #         _, image = cv2.imencode('.jpg', frame)
-                            #         base64_data = base64.b64encode(image)  # byte to b64 byte
-                            #         s = base64_data.decode()  # byte to str
-                            #         send_dict['input_face_b64'] = f'data:image/jpeg;base64,{s}'
-
-                            # --- fill known_face_b64 ---
-                            base_face_image_path = data.get('base_face_image_path')
-                            if base_face_image_path and methods.is_file(base_face_image_path):
-                                frame = cv2.imread(base_face_image_path)
-                                if frame is not None:
-                                    _, image = cv2.imencode('.jpg', frame)
-                                    base64_data = base64.b64encode(image)  # byte to b64 byte
-                                    s = base64_data.decode()  # byte to str
-                                    send_dict['known_face_b64'] = f'data:image/jpeg;base64,{s}'
-
-                            # --- fill face_uuid and face_name ---
-                            """
-                            Face: 陌生人脸表
-                            Face.face_name: 人脸名称
-                            """
-                            face_uuid = data.get('base_face_uuid')
-                            if face_uuid:
-                                send_dict['face_uuid'] = face_uuid
-                                face = Global.mdb.get_one_by_id('Face', face_uuid)
-                                if face and face.get('face_name'):
-                                    send_dict['face_name'] = face.get('face_name')
-
-                                # --- fill face_type_name_list ---
-                                face_type_uuid_list = face.get('face_type_uuid_list')
-                                if face_type_uuid_list:
-                                    send_dict['face_type_name_list'] = [face_type_name_dict.get(i)
-                                                                        for i in face_type_uuid_list
-                                                                        if face_type_name_dict.get(i)]
-
-                            # --- send ---
-                            # methods.debug_log(f"LineManage", f"m-153: send_dict is {send_dict}")
-                            line = cls.line_dict.get(line_id)
-                            send_json = methods.json_dumps(send_dict)
-                            await line.send_text(send_json)
-                            # await asyncio.sleep(0.1)
-
-                    except Exception as exception:
-
-                        # --- check ---
-                        if not cls.check_line_is_live(line_id):
-                            cls.line_dict.pop(line_id)
-
-                        if exception.__class__.__name__ == 'RuntimeError':
-                            methods.debug_log(f"LineManage", f"m-170: {cls.get_line_state()}")
-                        else:
-                            methods.debug_log('LineManage', f"m-172: exception | {exception}")
-                            methods.debug_log('LineManage', f"m-172: traceback | {methods.trace_log()}")
-
-            except Exception as exception:
-
-                methods.debug_log('LineManage', f"m-179: exception | {exception}")
-                methods.debug_log('LineManage', f"m-179: traceback | {methods.trace_log()}")
-                methods.debug_log('LineManage', f"m-179: wait 1 minutes try again!")
-                await asyncio.sleep(60)
-
-    @classmethod
-    def get_line_total(cls):
-        count = 0
-        for k, v in cls.line_dict.items():
-            count += 1
-        return count
-
-    @classmethod
-    def check_line_is_live(cls, line_id):
-        d1 = {
-            0: 'CONNECTING',
-            1: 'CONNECTED',
-            2: 'DISCONNECTED',
-        }
-        line = cls.line_dict.get(line_id)
-        if line and d1.get(line.client_state.value) != 'DISCONNECTED':
-            return True
-        else:
-            return False
-
-    @classmethod
-    def get_line_state(cls):
-        d1 = {
-            0: 'CONNECTING',
-            1: 'CONNECTED',
-            2: 'DISCONNECTED',
-        }
-        d2 = dict()  # {<line_id>: <state>}
-        for line_id, line in cls.line_dict.items():
-            state = d1.get(line.client_state.value)
-            _id = line_id[-6:]
-            d2[_id] = state
-        return d2
-
-    @staticmethod
-    def image_to_b64(image):
-        frame = numpy_method.to_array(image)  # list to numpy array
-        _, image = cv2.imencode('.jpg', frame)
-        base64_data = base64.b64encode(image)
-        s = base64_data.decode()
-        return f'data:image/jpeg;base64,{s}'

+ 9 - 4
sri-server-bg02/main.py

@@ -1,7 +1,11 @@
 # from app import generate_app
 # app = generate_app()
 import traceback
+from app import generate_app
 
+# import uvicorn
+
+app = generate_app()
 
 def main():
     try:
@@ -11,17 +15,18 @@ def main():
 
         # 监听mqtt消息服务
         from lib.MessageListener import MessageListener
-        MessageListener.run_background(background_is=False)
+        MessageListener.run(background_is=True)
 
         # 给websocket发数据的
         # from factories.line_manage import LineManage
         # LineManage.run_background()
 
         # websocket服务 与 api服务
-        # app.run(address='0.0.0.0', port=5042, debug=True)
+        app.run(address='0.0.0.0', port=5042, debug=True)
+
     except Exception as exception:
-        print(f"main23: {exception.__class__.__name__}")
-        print(f"main23: {traceback.format_exc()}")
+        print(f'main23: {exception.__class__.__name__}', flush=True)
+        print(f'main23: {traceback.format_exc()}', flush=True)
 
 
 if __name__ == '__main__':

+ 9 - 1
sri-server-bg02/requirements-win.txt

@@ -12,4 +12,12 @@ pymongo==3.11.2
 influxdb==5.3.1
 pymysql==0.9.3
 SQLAlchemy==1.4.30
-paho-mqtt==1.6.1
+paho-mqtt==1.6.1
+# --- for server ---
+typesystem==0.2.5
+aiohttp==3.7.3
+responder==2.0.7
+supervisor==4.2.1
+websocket-client==0.58.0
+# --- for script ---
+werkzeug==1.0.1

+ 45 - 0
sri-server-bg02/test/SRI202409-小车.md

@@ -0,0 +1,45 @@
+~~~
+mqtt服务地址:10.10.10.73(实验室mqtt服务器)
+mqtt服务地址:10.10.60.237(sri内网服务器)
+mqtt服务端口:41883
+mqtt话题:bg/log
+mqtt消息(json字符串):{"1": "2024-12-12 01:01:21", "2": 6000, "3": 6000, "4": 6000}
+mqtt消息注释:
+{
+  "1": "2024-12-12 01:01:21",  # 发送时间
+  "2": 6000,  # 方向值
+  "3": 6000,  # 油门值
+  "4": 6000   # 刹车值
+}
+'{\n   "Scout_id" : 321,\n   "left_b_wheel_motor_speed" : 1500,\n   "left_f_wheel_motor_speed" : 1400,\n   "right_b_wheel_motor_speed" : 1400,\n   "right_f_wheel_motor_speed" : 1500,\n   "rotation_speed" : 20,\n   "speed" : 0,\n   "time" : "2024-09-13 13:16:29"\n}\n'
+~~~
+
+~~~
+#车辆状态更新
+mqtt服务地址:192.168.131.23
+mqtt服务端口:41883
+mqtt话题:hs/vehicle/state
+mqtt消息说明:
+{
+    "address": "192.168.131.180",  # 车辆ip
+    "state": 1,  # 车辆状态 1 离线 2 在线空闲 3 人工驾驶中 4 远程驾驶中 5 自动驾驶中
+    "direction": 15,  # 车头方向(场地坐标偏转角度)
+    "coordinate_x": 15,  # 当前车辆坐标
+    "coordinate_y": 15,  # 当前车辆坐标
+    "weight": 15,  # 负载重量
+}
+~~~
+
+~~~
+#渣包位置更新
+mqtt服务地址:192.168.131.23
+mqtt服务端口:41883
+mqtt话题:hs/pot/data
+mqtt消息说明:
+{
+  "pot_name": "M.24",  # 渣罐编号
+  "pot_x": 40,  # 坐标值
+  "pot_y": 50,  # 坐标值
+  "mark_pot_pose": 0.3  # 渣罐姿态
+}
+~~~

+ 1 - 0
sri-server-bg02/test/SRI202409-湛江项目mqtt接口说明.md

@@ -1,5 +1,6 @@
 ~~~
 mqtt服务地址:10.10.10.73(实验室mqtt服务器)
+mqtt服务地址:10.10.60.237(sri内网服务器)
 mqtt服务端口:41883
 mqtt话题:bg/log
 mqtt消息(json字符串):{"1": "2024-12-12 01:01:21", "2": 6000, "3": 6000, "4": 6000}

+ 2 - 2
sri-server-bg02/test/test-mqtt-sender.py

@@ -4,8 +4,8 @@ import json
 
 client = mqtt.Client()
 # client.connect(host='10.10.61.229', port=41883)
-# client.connect(host='127.0.0.1', port=41883)
-client.connect(host='10.10.10.73', port=41883)
+client.connect(host='127.0.0.1', port=41883)  # 本地环境
+# client.connect(host='10.10.10.73', port=41883)
 
 
 def test_bg_log():