# from hub import methods, Global import threading import socket import struct import time # import cv2 # import base64 # import asyncio import sys import importlib sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03') protobuf = importlib.import_module(f"protobuf.protocol_pb2") sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty') methods = importlib.import_module(f"xlib") class Connection(object): """""" tcp_server = None tcp_client = None all_connection_dict = {} # {: (socket, update_at, type)} | {<连接id>: (socket对象, 最后一次请求时间, 客户端类型)} @classmethod def get_method_by_command_id(cls, command_id): """获取方法名""" fn_dict = { protobuf.CS_Sign: cls.f1, # 建立webrtc邀请 } return fn_dict.get(command_id) @classmethod def f1(cls): pass # @classmethod # def run_forever(cls): # """ # 调用协程方法 # """ # tasks = [cls.check_send()] # _loop = asyncio.new_event_loop() # asyncio.set_event_loop(_loop) # loop = asyncio.get_event_loop() # loop.run_until_complete(asyncio.wait(tasks)) # @classmethod # async def check_send(cls): # # # --- define --- # last_send_id = str() # # while True: # # try: # # --- fill face_type_name_dict --- # """ # face_type_name_dict = {: } # """ # face_type_name_dict = dict() # for item in Global.mdb.get_all('FaceType'): # uuid = str(item.get('_id')) # face_type_name_dict[uuid] = item.get('name') # # # --- debug --- # # methods.debug_log(f"LineManage", f"m-21: run at {methods.now_string()} " # # f"| {len(cls.line_dict.values())}") # # await asyncio.sleep(3) # # await asyncio.sleep(0.5) # # # --- get send_data --- # """ # send_data = { # send_id: 数据id # send_list: 数据列表 # } # """ # send_data = Global.rdb.get_one(key='send_data') # # send_data = db0.get_one(key='send_data') # # # --- check --- # if not send_data: # continue # # # --- check --- # send_id = send_data.get('send_id') # if not send_id: # continue # # # --- check --- # if send_id == last_send_id: # continue # # # --- check --- # send_list = send_data.get('send_list') # if send_list is None or len(send_list) == 0: # continue # # # --- debug --- # # await asyncio.sleep(3) # # await asyncio.sleep(0.5) # methods.debug_log(f"LineManage", f"m-74: run at {methods.now_string()} " # f"| send count is {len(send_list)} " # f"| online count is {len(cls.line_dict.values())}") # # # --- update --- # last_send_id = send_id # # # --- send --- # for line_id in list(cls.line_dict.keys()): # # try: # # # --- check --- # if not cls.check_line_is_live(line_id): # methods.debug_log(f"LineManage", f"m-56: websocket link broken.") # cls.line_dict.pop(line_id) # continue # # # --- send --- # """ # send_list = [ # { # base_face_uuid: 底库人脸id # snap_face_image: 抓拍人脸 # base_face_image_path: 底库人脸路径 # face_similarity: 相似度 # } # ] # """ # for data in send_list: # # # --- check --- # if data.get('snap_face_image') is None: # continue # # # --- define --- # """ # send_dict = { # input_face_b64: 抓拍人脸图像 # face_uuid: 人脸id # face_name: 人脸名称 # known_face_b64: 底库人脸图像 # face_similarity: 相似度 # face_type_name_list: 人员类型 # } # """ # send_dict = dict( # input_face_b64=cls.image_to_b64(data.get('snap_face_image')), # # input_face_b64=str(), # known_face_b64=str(), # face_uuid=str(), # face_name=str(), # face_similarity=data.get('face_similarity'), # face_type_name_list=list(), # ) # # # --- fill known_face_b64 --- # base_face_image_path = data.get('base_face_image_path') # if base_face_image_path and methods.is_file(base_face_image_path): # frame = cv2.imread(base_face_image_path) # if frame is not None: # _, image = cv2.imencode('.jpg', frame) # base64_data = base64.b64encode(image) # byte to b64 byte # s = base64_data.decode() # byte to str # send_dict['known_face_b64'] = f'data:image/jpeg;base64,{s}' # # # --- fill face_uuid and face_name --- # """ # Face: 陌生人脸表 # Face.face_name: 人脸名称 # """ # face_uuid = data.get('base_face_uuid') # if face_uuid: # send_dict['face_uuid'] = face_uuid # face = Global.mdb.get_one_by_id('Face', face_uuid) # if face and face.get('face_name'): # send_dict['face_name'] = face.get('face_name') # # # --- fill face_type_name_list --- # face_type_uuid_list = face.get('face_type_uuid_list') # if face_type_uuid_list: # send_dict['face_type_name_list'] = [face_type_name_dict.get(i) # for i in face_type_uuid_list # if face_type_name_dict.get(i)] # # # --- send --- # # methods.debug_log(f"LineManage", f"m-153: send_dict is {send_dict}") # line = cls.line_dict.get(line_id) # send_json = methods.json_dumps(send_dict) # await line.send_text(send_json) # # await asyncio.sleep(0.1) # # except Exception as exception: # # # --- check --- # if not cls.check_line_is_live(line_id): # cls.line_dict.pop(line_id) # # if exception.__class__.__name__ == 'RuntimeError': # methods.debug_log(f"LineManage", f"m-170: {cls.get_line_state()}") # else: # methods.debug_log('LineManage', f"m-172: exception | {exception}") # methods.debug_log('LineManage', f"m-172: traceback | {methods.trace_log()}") # # except Exception as exception: # # methods.debug_log('LineManage', f"m-179: exception | {exception}") # methods.debug_log('LineManage', f"m-179: traceback | {methods.trace_log()}") # methods.debug_log('LineManage', f"m-179: wait 1 minutes try again!") # await asyncio.sleep(60) # @classmethod # def get_line_total(cls): # count = 0 # for k, v in cls.line_dict.items(): # count += 1 # return count # @classmethod # def check_line_is_live(cls, line_id): # d1 = { # 0: 'CONNECTING', # 1: 'CONNECTED', # 2: 'DISCONNECTED', # } # line = cls.line_dict.get(line_id) # if line and d1.get(line.client_state.value) != 'DISCONNECTED': # return True # else: # return False # @classmethod # def get_line_state(cls): # d1 = { # 0: 'CONNECTING', # 1: 'CONNECTED', # 2: 'DISCONNECTED', # } # d2 = dict() # {: } # for line_id, line in cls.line_dict.items(): # state = d1.get(line.client_state.value) # _id = line_id[-6:] # d2[_id] = state # return d2 # @staticmethod # def image_to_b64(image): # frame = numpy_method.to_array(image) # list to numpy array # _, image = cv2.imencode('.jpg', frame) # base64_data = base64.b64encode(image) # s = base64_data.decode() # return f'data:image/jpeg;base64,{s}' @classmethod def create_tcp_server(cls, host='0.0.0.0', port=20916): cls.tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) cls.tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) cls.tcp_server.bind((host, port)) cls.tcp_server.listen(10) # 设置可处理的未处理连接请求的最大数量 methods.debug_log('Connection.260', f"Server listening on {host}:{port}") @classmethod def start_listener(cls): cls.create_tcp_server() head_sequence = 'hh', head_bytestream) # methods.debug_log('Connection.5', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('>HH', head_bytestream) # methods.debug_log('Connection.6', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('@hh', head_bytestream) # methods.debug_log('Connection.9', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('@HH', head_bytestream) # methods.debug_log('Connection.10', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('=hh', head_bytestream) # methods.debug_log('Connection.13', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('=HH', head_bytestream) # methods.debug_log('Connection.14', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('ii', head_bytestream) # methods.debug_log('Connection.7', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('>II', head_bytestream) # methods.debug_log('Connection.8', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('@ii', head_bytestream) # methods.debug_log('Connection.11', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('@II', head_bytestream) # methods.debug_log('Connection.12', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('=ii', head_bytestream) # methods.debug_log('Connection.15', f"Received values: {command_id, body_length}") # command_id, body_length = struct.unpack('=II', head_bytestream) # methods.debug_log('Connection.16', f"Received values: {command_id, body_length}") # --- 解析数据体 if command_id == protobuf.CS_KeepAlive: continue elif command_id == protobuf.CS_Sign: # --- 接收请求数据 body_bytestream = client.recv(body_length) object = protobuf.CSSign() object.ParseFromString(body_bytestream) print(f"Received Response: account={object.account}, password={object.password}") # --- 发送返回数据 object = protobuf.SCSign() object.ret = True object.uid = 112233 object.cid = 223344 object.name = 'aabbcc' command_id = protobuf.SC_Sign body_length = object.ByteSize() head_bytestream = struct.pack(head_sequence, command_id, body_length) body_bytestream = object.SerializeToString() send_bytestream = head_bytestream + body_bytestream client.sendall(send_bytestream) print(f"Sent data: {send_bytestream}") # --- test --- # for _ in range(10): # for i in range(3)[::-1]: # methods.debug_log('SRIConnection.58', f"sleep {i}s! | [{ipv4}]") # time.sleep(1) # methods.debug_log('SRIConnection.58', f"send re_send_data | [{ipv4}]") # client.sendall(send_bytestream) except Exception as exception: methods.debug_log('Connection.287', f"#exception: {exception}") methods.debug_log('Connection.287', f"#traceback: {methods.trace_log()}") finally: client.close() @classmethod def run(cls, background_is=True): thread_list = [ threading.Thread(target=cls.start_listener), # 人脸检测 deepstream facenet ] for thread in thread_list: thread.setDaemon(True) thread.start() if background_is: return for thread in thread_list: thread.join() if __name__ == '__main__': print(protobuf.CS_Offer) Connection.run(background_is=False)