| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 | 
							- # from hub import methods, Global
 
- import threading
 
- import socket
 
- import struct
 
- import time
 
- # import cv2
 
- # import base64
 
- # import asyncio
 
- import sys
 
- import importlib
 
- sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
 
- protobuf = importlib.import_module(f"protobuf.protocol_pb2")
 
- sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
 
- methods = importlib.import_module(f"xlib")
 
class Connection(object):
    """TCP listener for a small framed protobuf protocol.

    Each frame is a 4-byte header (two little-endian signed 16-bit integers:
    command id, then body length) followed by ``body length`` bytes of
    serialized protobuf. One frame is processed per accepted connection and
    the connection is then closed.

    All state lives on class attributes and every method is a classmethod or
    staticmethod.
    """

    # NOTE: a large block of commented-out legacy code (a websocket push loop,
    # struct-format experiments and a resend test) was removed from this
    # class; it referenced attributes that no longer exist here. Recover it
    # from version control if it is ever needed.

    tcp_server = None  # listening socket, assigned by create_tcp_server()
    tcp_client = None  # not used by any method of this class
    # {<ipv4>: (socket, update_at, type)}
    # i.e. {<connection id>: (socket object, time of last request, client type)}
    # Nothing in this class populates it yet.
    all_connection_dict = {}

    # Frame header layout: command id and body length, both little-endian
    # signed 16-bit integers.
    _HEAD_FORMAT = '<hh'
    _HEAD_SIZE = struct.calcsize(_HEAD_FORMAT)

    @classmethod
    def get_method_by_command_id(cls, command_id):
        """Return the handler registered for ``command_id``, or ``None``."""
        fn_dict = {
            protobuf.CS_Sign: cls.f1,  # establish WebRTC invitation
        }
        return fn_dict.get(command_id)

    @classmethod
    def f1(cls):
        """Placeholder handler; intentionally does nothing."""
        pass

    @classmethod
    def create_tcp_server(cls, host='0.0.0.0', port=20916):
        """Create the listening socket and store it in ``cls.tcp_server``.

        Args:
            host: Interface address to bind to.
            port: TCP port to bind to.

        Raises:
            OSError: If the socket cannot be configured, bound or put into
                listening mode. The half-initialised socket is closed first.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((host, port))
            server.listen(10)  # backlog: maximum number of pending connections
        except OSError:
            # Do not leak the descriptor when bind/listen fails.
            server.close()
            raise
        cls.tcp_server = server
        methods.debug_log('Connection.260', f"Server listening on {host}:{port}")

    @staticmethod
    def _recv_exact(client, size):
        """Read ``size`` bytes from ``client``, looping over short reads.

        A single ``recv`` call may return fewer bytes than requested, so the
        call is repeated until ``size`` bytes have arrived.

        Args:
            client: Object exposing a socket-style ``recv(bufsize)`` method.
            size: Number of bytes wanted; must not be negative.

        Returns:
            ``size`` bytes, or fewer (possibly ``b''``) if the peer closed the
            connection before everything arrived.

        Raises:
            ValueError: If ``size`` is negative, e.g. from a malformed header.
        """
        if size < 0:
            raise ValueError(f"negative receive size: {size}")
        received = []
        remaining = size
        while remaining > 0:
            chunk = client.recv(remaining)
            if not chunk:
                break  # peer closed the connection
            received.append(chunk)
            remaining -= len(chunk)
        return b''.join(received)

    @classmethod
    def _handle_sign(cls, client, body_length):
        """Read a ``CSSign`` request body and reply with a fixed ``SCSign``."""
        # --- receive the request body ---
        body_bytestream = cls._recv_exact(client, body_length)
        if len(body_bytestream) < body_length:
            methods.debug_log(
                'Connection.284',
                f"Truncated body: expected {body_length} bytes, got {len(body_bytestream)}")
            return
        request = protobuf.CSSign()
        request.ParseFromString(body_bytestream)
        # NOTE(review): this prints the client's password in clear text;
        # consider masking it before this code leaves the demo stage.
        print(f"Received Response: account={request.account}, password={request.password}")

        # --- send the response (hard-coded demo values) ---
        response = protobuf.SCSign()
        response.ret = True
        response.uid = 112233
        response.cid = 223344
        response.name = 'aabbcc'
        body_bytestream = response.SerializeToString()
        head_bytestream = struct.pack(cls._HEAD_FORMAT, protobuf.SC_Sign, len(body_bytestream))
        send_bytestream = head_bytestream + body_bytestream
        client.sendall(send_bytestream)
        print(f"Sent data: {send_bytestream}")

    @classmethod
    def _handle_client(cls, client):
        """Process the single frame expected on a freshly accepted connection."""
        # --- parse the frame header ---
        head_bytestream = cls._recv_exact(client, cls._HEAD_SIZE)
        if not head_bytestream:
            return  # peer connected and closed without sending anything
        if len(head_bytestream) < cls._HEAD_SIZE:
            methods.debug_log(
                'Connection.284',
                f"Truncated header: expected {cls._HEAD_SIZE} bytes, got {len(head_bytestream)}")
            return
        command_id, body_length = struct.unpack(cls._HEAD_FORMAT, head_bytestream)
        methods.debug_log('Connection.284', f"Received values: {command_id, body_length}")

        # --- dispatch on the command id ---
        if command_id == protobuf.CS_KeepAlive:
            return  # keep-alive frames need no reply
        if command_id == protobuf.CS_Sign:
            cls._handle_sign(client, body_length)
        # Any other command id is ignored, as before.

    @classmethod
    def start_listener(cls):
        """Create the server socket and serve connections forever.

        Connections are handled one at a time on the calling thread. Errors
        raised while accepting or handling a connection are logged and the
        loop continues; this method does not return.
        """
        cls.create_tcp_server()
        while True:
            # Reset per iteration so a failing accept() never closes (or
            # references) a socket left over from a previous iteration.
            client = None
            try:
                # --- wait for a connection ---
                client, ipv4 = cls.tcp_server.accept()
                methods.debug_log('Connection.260', f"Connected by {ipv4}")
                cls._handle_client(client)
            except Exception as exception:
                methods.debug_log('Connection.287', f"#exception: {exception}")
                methods.debug_log('Connection.287', f"#traceback: {methods.trace_log()}")
            finally:
                if client is not None:
                    client.close()

    @classmethod
    def run(cls, background_is=True):
        """Start the listener on a daemon thread.

        Args:
            background_is: When true, return right after starting the thread.
                When false, block by joining the listener thread.
        """
        thread_list = [
            # TCP listener; daemon so it does not keep the process alive.
            threading.Thread(target=cls.start_listener, daemon=True),
        ]
        for thread in thread_list:
            thread.start()
        if background_is:
            return
        for thread in thread_list:
            thread.join()
 
if __name__ == '__main__':
    # Print one protocol constant first; this fails fast if the protobuf
    # module did not load as expected.
    print(protobuf.CS_Offer)
    # Foreground mode: block on the listener thread instead of returning.
    Connection.run(background_is=False)
 
 
  |