"""
TODO:
    1. Messages may end up being processed asynchronously more than once.
"""
# from hub import methods, Global

import struct
import asyncio
import time
import socket

import sys
import importlib

# --- for linux
# NOTE(review): absolute, machine-specific paths are appended to the module
# search path so the dynamic imports below can be resolved.
sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01')
sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-pysdk')

# Generated protobuf module and the project helper library, loaded by name.
protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
methods = importlib.import_module(f"xlib")
# Registry of live connections: {connection_id: SRIConnection instance}.
clients = {}
# Maps a vehicle serial (as sent in CSAdd.serial) to its rid.
serial_rid_dict = {
    '65F7171A-5585-46C7-A9D6-967ABA9EB223': 1000000,
    '7AF3F619-5067-4EE0-A710-89A6CB097EFE': 1000001,
    'ECB93A87-560B-4022-8C5F-CBF9FE1E596A': 1000002,
    # NOTE(review): same rid as the entry above — confirm this is intended.
    'C0D14B6F-0FF0-4B68-877D-D2CB886FCD0E': 1000002,
    'E537DDFB-6E3E-4E1A-AD18-AC21393BE300': 1000004,  # currently in use
}
# Maps an account name to a uid.
# NOTE(review): not referenced in the visible part of this file.
account_uid_dict = {
    'ego': 3
}


class SRIConnection(asyncio.Protocol):
    """
    asyncio protocol handling one TCP client connection.

    Incoming bytes are split into frames made of a fixed-size header
    (command id, body length) followed by the body; each frame is dispatched
    to the method named ``message<command_id>``.
    """

    # struct format of the frame header: little-endian, two signed 16-bit
    # integers (command id, body length).
    head_sequence = '<hh'
    # Size of the packed header in bytes.
    head_size = struct.calcsize(head_sequence)
    # Initial (empty) receive buffer; accumulated bytes are stored on the
    # instance once data arrives.
    message_data = b''

    def connection_made(self, client):
        """
        Register a newly accepted client connection.

        Derives a connection id from the peer address, initialises the
        per-connection state and stores this instance in the module-level
        ``clients`` registry under that id.

        :param client: the transport object for this connection.
        """
        address = client.get_extra_info('peername')
        # Host with the dots stripped, then the port, e.g. "127001-30000".
        # NOTE(review): stripping dots makes some distinct hosts map to the
        # same id (e.g. 1.11.1.1 and 11.1.1.1) — confirm this is acceptable.
        dotless_host = address[0].replace('.', '')
        self.connection_id = f"{dotless_host}-{address[1]}"

        # Not yet identified: set by message2000 (cockpit) / message2009 (vehicle).
        self.client_type = None
        self.client_info = None

        self.client = client
        self.update_at = methods.now_ts()
        clients[self.connection_id] = self

    def connection_lost(self, exc):
        """
        Handle a closed connection.

        Removes this connection from the module-level ``clients`` registry
        and, when the connection had registered as a vehicle, broadcasts an
        ``SC_NotifyDel`` frame to every connected cockpit.

        :param exc: exception that caused the loss, or ``None`` on a regular
            close; not used here.
        """
        methods.debug_log(f"{self.connection_id}|SRIConnection085", f"连接已关闭")

        # --- clean
        # Unregister first so that a failure while building or sending the
        # notification below can never leave a stale entry behind. Only drop
        # the entry if it still refers to this very instance.
        if clients.get(self.connection_id) is self:
            del clients[self.connection_id]

        # --- vehicle went offline: notify every cockpit
        if self.client_type != 'vehicle' or not self.client_info:
            return

        rid = self.client_info.get('rid')
        if rid is None:
            # Without an rid there is nothing meaningful to announce, and
            # assigning None to the protobuf field would raise.
            methods.debug_log(f"{self.connection_id}|SRIConnection085",
                              "skip SC_NotifyDel: vehicle has no rid")
            return

        # SCDelRobot fields: peer (int32, vehicle rid), egotype (int32).
        notice = protobuf.SCDelRobot()
        notice.peer = rid
        notice.egotype = self.client_info.get('egotype')
        re_command_id = protobuf.SC_NotifyDel  # 4017
        re_head_data = struct.pack(self.head_sequence, re_command_id, notice.ByteSize())
        re_send_data = re_head_data + notice.SerializeToString()
        methods.debug_log(f"{self.connection_id}|SRIConnection085", f"re_command_id: {re_command_id}")

        # Iterate over a snapshot so registry changes cannot break the loop.
        for item in list(clients.values()):
            if item.client_type == 'cockpit':
                item.client.write(re_send_data)

    def data_received(self, data):
        """
        Feed a chunk of received bytes into the frame parser.

        The chunk is processed synchronously, in arrival order. Handing each
        chunk to a thread pool (as was done before) let several chunks of
        the same connection run ``process_data`` concurrently, which races
        on ``self.message_data`` (frames reassembled out of order or handled
        more than once) and calls ``transport.write`` from threads other
        than the event-loop thread. The handlers only parse, log and write
        to transports, so running them inline is sufficient.

        :param data: bytes received from the peer.
        """
        self.process_data(data)

    async def handle_data(self, data):
        """
        消息处理
        """
        # methods.debug_log(f'{self.connection_id}|SRIConnection102', f"data length: {len(data)}")
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.process_data, data)

    def process_data(self, data):
        """
        Append received bytes to the buffer and dispatch every complete frame.

        A frame is a header packed with ``head_sequence`` (command id, body
        length) followed by ``body length`` bytes of body. Each complete
        frame is passed to the method named ``message<command_id>`` when one
        exists. Incomplete trailing data stays in ``self.message_data`` until
        more bytes arrive.

        The buffer is discarded when the header is invalid: command id
        outside ``[2000, 5000)`` or a negative body length.

        :param data: bytes received from the peer.
        """
        try:
            methods.debug_log(f"{self.connection_id}|SRIConnection114", f"receive: {len(data)}, message: {repr(data)}")
            self.message_data += data

            methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 1")
            methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 2")
            methods.debug_log(f"{self.connection_id}|SRIConnection114", f"--- 3")

            count = 0
            while True:

                # --- print
                count += 1
                methods.debug_log(f"{self.connection_id}|SRIConnection123",
                                  f"---------------------------------- while count: {count}")

                # Need at least a full header before anything can be decoded.
                if len(self.message_data) < self.head_size:
                    break

                # Decode the header.
                head_data = self.message_data[:self.head_size]
                command_id, body_length = struct.unpack(self.head_sequence, head_data)
                methods.debug_log(f'{self.connection_id}|SRIConnection130',
                                  f"command_id: {command_id}, body_length: {body_length}")

                # Reject invalid headers. The length field is a signed 16-bit
                # value; a negative length would make total_length <= the
                # header size, so the buffer would never shrink and this loop
                # would never terminate.
                if not (2000 <= command_id < 5000) or body_length < 0:
                    self.message_data = b''  # drop the unusable data
                    break

                # Wait for more data until the whole frame has arrived.
                total_length = self.head_size + body_length
                if len(self.message_data) < total_length:
                    break

                # Consume the frame BEFORE dispatching it: if the handler
                # raises, the frame must not stay in the buffer, otherwise it
                # would be dispatched again on every later chunk.
                body_data = self.message_data[self.head_size:total_length]
                self.message_data = self.message_data[total_length:]

                method = getattr(self, f'message{command_id}', None)
                if not method:
                    methods.debug_log(f"{self.connection_id}|SRIConnectionError",
                                      f"No handler for command_id: {command_id}")
                    continue

                # A failing handler only affects its own frame; the frames
                # still waiting in the buffer are processed normally.
                try:
                    method(body_data)
                except Exception as exception:
                    methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
                    methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")

        except Exception as exception:
            methods.debug_log(f"{self.connection_id}|SRIConnection132", f"#exception: {exception}")
            methods.debug_log(f"{self.connection_id}|SRIConnection123", f"#traceback: {methods.trace_log()}")

    def message2008(self, body_data):
        """
        Accept command 2008 and ignore it.

        The body is not parsed and nothing is sent back.
        NOTE(review): presumably a keep-alive message — confirm.

        :param body_data: frame body (unused).
        """
        return None

    def message2009(self, body_data):
        """
        Handle command 2009 (CSAdd): register this connection as a vehicle.

        Parses ``CSAdd``, resolves the vehicle's ``rid`` from
        ``serial_rid_dict``, stores the vehicle state on this connection and
        broadcasts an ``SC_NotifyAdd`` (``SCAddRobot``) frame to every
        connected cockpit.

        A serial that is not present in ``serial_rid_dict`` is rejected
        before any state is changed. Previously such a connection was marked
        as a vehicle with ``rid=None`` and the handler then failed while
        assigning that ``None`` to the protobuf field.

        CSAdd fields (per the original notes):
            serial: string
            type:   int32 (EgoType: None / User / Car)
            name:   string

        :param body_data: serialized ``CSAdd`` message body.
        """
        request = protobuf.CSAdd()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#serial: {request.serial}")
        methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#name: {request.name}")
        methods.debug_log(f"{self.connection_id}|SRIConnection90", f"#type: {request.type}")

        rid = serial_rid_dict.get(request.serial)
        if rid is None:
            methods.debug_log(f"{self.connection_id}|SRIConnection90",
                              f"registration ignored, unknown serial: {request.serial}")
            return

        # --- update ---
        self.client_type = 'vehicle'
        self.client_info = {
            'rid': rid,
            'name': request.name,
            'serial': request.serial,
            'egotype': 2,  # vehicle terminal type
        }

        # --- build 4016
        # Robot fields: rid, name, type, state (Offline / Online / Busy).
        robot = protobuf.Robot()
        robot.rid = rid
        robot.name = request.name
        robot.type = 2  # 0 EgoType::None 1 EgoType::User 2 EgoType::Car
        robot.state = protobuf.Robot.Online
        notice = protobuf.SCAddRobot()
        notice.robot.CopyFrom(robot)
        re_command_id = protobuf.SC_NotifyAdd  # 4016
        re_head_data = struct.pack(self.head_sequence, re_command_id, notice.ByteSize())
        re_send_data = re_head_data + notice.SerializeToString()
        methods.debug_log(f"{self.connection_id}|SRIConnection136", f"sendmessage: {re_command_id}")

        # --- send 4016 to every cockpit (snapshot guards against registry changes)
        for item in list(clients.values()):
            if item.client_type == 'cockpit':
                item.client.write(re_send_data)

        # No SC_Add (4007) reply is sent to the vehicle itself; according to
        # the original TODO note it is not used by the clients.

    def message2000(self, body_data):
        """
        Handle command 2000 (CSSign): sign this connection in as a cockpit.

        Parses ``CSSign``, marks the connection as a cockpit and answers
        with an ``SC_Sign`` (``SCSign``) frame carrying ``ret=True`` and the
        connection's uid and name.

        NOTE(review): the submitted account/password are not verified; every
        sign-in succeeds with the hard-coded identity below.

        :param body_data: serialized ``CSSign`` message body.
        """
        # --- parse the body
        request = protobuf.CSSign()
        request.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection162", f"#account: {request.account}")
        # Never write the plaintext password to the log; only record whether
        # one was supplied.
        methods.debug_log(f"{self.connection_id}|SRIConnection162",
                          f"#password: {'***' if request.password else ''}")

        # --- update ---
        self.client_type = 'cockpit'
        self.client_info = {
            'uid': 3,  # id of the "ego" record in the database
            'name': 'ego',  # name of that record
            'egotype': 1,  # cockpit terminal type
        }

        # --- send 4000
        reply = protobuf.SCSign()
        reply.ret = True
        reply.uid = self.client_info.get('uid')
        reply.name = self.client_info.get('name')
        re_command_id = protobuf.SC_Sign  # 4000
        re_head_data = struct.pack(self.head_sequence, re_command_id, reply.ByteSize())
        re_send_data = re_head_data + reply.SerializeToString()
        methods.debug_log(f"{self.connection_id}|SRIConnection175", f"re_command_id: {re_command_id}")
        self.client.write(re_send_data)

    def message2010(self, body_data):
        """
        Handle command 2010: send the list of connected vehicles to the caller.

        Builds an ``SCRobot`` message with one ``Robot`` entry per connection
        registered as a vehicle and writes it back to this connection as an
        ``SC_Robot`` frame. Vehicles without an ``rid`` are skipped so that a
        single incomplete registration cannot abort the whole listing.

        Robot fields: rid, name, type (EgoType), state (Offline / Online / Busy).

        :param body_data: frame body (not parsed).
        """
        listing = protobuf.SCRobot()
        # Snapshot of the registry so concurrent changes cannot break the loop.
        for item in list(clients.values()):
            if item.client_type != 'vehicle' or not item.client_info:
                continue
            rid = item.client_info.get('rid')
            if rid is None:
                continue
            robot = protobuf.Robot()
            robot.rid = rid
            robot.name = item.client_info.get('name')
            robot.type = 2  # EgoType::Car
            robot.state = protobuf.Robot.RobotState.Value('Online')
            listing.robot.add().CopyFrom(robot)

        re_command_id = protobuf.SC_Robot  # 4008
        re_head_data = struct.pack(self.head_sequence, re_command_id, listing.ByteSize())
        re_send_data = re_head_data + listing.SerializeToString()
        methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
        self.client.write(re_send_data)

    def message2001(self, body_data):
        """
        Handle command 2001 (CSReq): relay a request to the addressed vehicle.

        Parses ``CSReq`` and sends an ``SC_NotifyReq`` frame to every
        connection whose ``rid`` equals ``CSReq.peer``. In the relayed
        message ``peer`` carries the sender's ``uid`` instead.

        Connections that have not identified themselves yet (``client_info``
        is ``None``) are skipped, and a sender without a ``uid`` is ignored,
        instead of raising.

        CSReq fields (per the original notes):
            peer:    int32, rid (vehicle identifier)
            index:   int32, camera position (RenderPosition)
            egotype: int32, terminal type (cockpit / vehicle)

        :param body_data: serialized ``CSReq`` message body.
        """
        # --- parse the body
        request = protobuf.CSReq()
        request.ParseFromString(body_data)
        log_tag = f"{self.connection_id}|SRIConnection299.message2001"
        sender_info = self.client_info or {}  # None until the sender signs in
        methods.debug_log(log_tag, f"#peer: {request.peer}")
        methods.debug_log(log_tag, f"#index: {request.index}")
        methods.debug_log(log_tag, f"#egotype: {request.egotype}")
        methods.debug_log(log_tag, f"#uid: {sender_info.get('uid')}")
        methods.debug_log(log_tag, f"#rid: {sender_info.get('rid')}")

        sender_uid = sender_info.get('uid')
        if sender_uid is None:
            methods.debug_log(log_tag, "ignored: sender has no uid (not signed in as a cockpit)")
            return

        # --- build 4009 once; it is identical for every recipient
        relay = protobuf.CSReq()
        relay.peer = sender_uid
        relay.index = request.index
        relay.egotype = request.egotype
        re_command_id = protobuf.SC_NotifyReq  # 4009
        re_head_data = struct.pack(self.head_sequence, re_command_id, relay.ByteSize())
        re_send_data = re_head_data + relay.SerializeToString()

        # --- send 4009 to the addressed vehicle
        for item in list(clients.values()):
            target_rid = (item.client_info or {}).get('rid')
            if target_rid and target_rid == request.peer:
                methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
                item.client.write(re_send_data)

    def message2002(self, body_data):
        """
        Handle command 2002 (CSRep): relay a reply to the addressed cockpit.

        Parses ``CSRep`` and sends an ``SC_NotifyRep`` frame to every
        connection whose ``uid`` equals ``CSRep.peer``. In the relayed
        message ``peer`` carries the sender's ``rid`` instead.

        Connections that have not identified themselves yet (``client_info``
        is ``None``) are skipped, and a sender without an ``rid`` is ignored,
        instead of raising.

        CSRep fields (per the original notes):
            desc:    VideoDesc
            peer:    int32
            index:   int32
            egotype: int32

        :param body_data: serialized ``CSRep`` message body.
        """
        # --- parse the body
        reply = protobuf.CSRep()
        reply.ParseFromString(body_data)
        sender_info = self.client_info or {}  # None until the sender registers
        methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#desc: {reply.desc}")
        methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#peer: {reply.peer}")
        methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#index: {reply.index}")
        methods.debug_log(f"{self.connection_id}|SRIConnection319", f"#egotype: {reply.egotype}")
        methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#uid: {sender_info.get('uid')}")
        methods.debug_log(f"{self.connection_id}|SRIConnection235", f"#rid: {sender_info.get('rid')}")

        sender_rid = sender_info.get('rid')
        if sender_rid is None:
            methods.debug_log(f"{self.connection_id}|SRIConnection319",
                              "ignored: sender has no rid (not registered as a vehicle)")
            return

        # --- build 4010 once; it is identical for every recipient
        relay = protobuf.CSRep()
        relay.desc = reply.desc
        relay.peer = sender_rid
        relay.index = reply.index
        relay.egotype = reply.egotype
        re_command_id = protobuf.SC_NotifyRep  # 4010
        re_head_data = struct.pack(self.head_sequence, re_command_id, relay.ByteSize())
        re_send_data = re_head_data + relay.SerializeToString()

        # --- send 4010 to the addressed cockpit
        for item in list(clients.values()):
            target_uid = (item.client_info or {}).get('uid')
            if target_uid and target_uid == reply.peer:
                methods.debug_log(f"{self.connection_id}|SRIConnection217", f"re_command_id: {re_command_id}")
                item.client.write(re_send_data)

    def message2004(self, body_data):
        """
        Handle command 2004 (Offer): relay an offer to the addressed vehicle.

        Parses ``Offer`` and sends an ``SC_NotifyOffer`` frame to every
        connection whose ``rid`` equals ``Offer.peer``. In the relayed
        message ``peer`` carries the sender's ``uid`` instead.

        Connections that have not identified themselves yet (``client_info``
        is ``None``) are skipped, and a sender without a ``uid`` is ignored,
        instead of raising.

        Offer fields (per the original notes):
            index: int32
            peer:  int32 (vehicle rid)
            type:  string
            sdp:   string

        :param body_data: serialized ``Offer`` message body.
        """
        # --- parse the body
        offer = protobuf.Offer()
        offer.ParseFromString(body_data)
        sender_info = self.client_info or {}  # None until the sender signs in
        methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#peer: {offer.peer}")
        methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#uid: {sender_info.get('uid')}")
        methods.debug_log(f"{self.connection_id}|SRIConnection348", f"#rid: {sender_info.get('rid')}")

        sender_uid = sender_info.get('uid')
        if sender_uid is None:
            methods.debug_log(f"{self.connection_id}|SRIConnection348",
                              "ignored: sender has no uid (not signed in as a cockpit)")
            return

        # --- build 4012 once; it is identical for every recipient
        relay = protobuf.Offer()
        relay.index = offer.index
        relay.peer = sender_uid  # the vehicle sees the cockpit's uid as peer
        relay.type = offer.type
        relay.sdp = offer.sdp
        re_command_id = protobuf.SC_NotifyOffer  # 4012
        re_head_data = struct.pack(self.head_sequence, re_command_id, relay.ByteSize())
        re_send_data = re_head_data + relay.SerializeToString()

        # --- send 4012 to the addressed vehicle
        for item in list(clients.values()):
            target_rid = (item.client_info or {}).get('rid')
            if target_rid and target_rid == offer.peer:
                methods.debug_log(f"{self.connection_id}|SRIConnection341", f"re_command_id: {re_command_id}")
                item.client.write(re_send_data)

    def message2005(self, body_data):
        """
        Handle command 2005 (Answer): relay an answer to the addressed cockpit.

        Parses ``Answer`` and sends an ``SC_NotifyAnswer`` frame to every
        connection whose ``uid`` equals ``Answer.peer``. In the relayed
        message ``peer`` carries the sender's ``rid`` instead.

        Connections that have not identified themselves yet (``client_info``
        is ``None``) are skipped, and a sender without an ``rid`` is ignored,
        instead of raising.

        Answer fields (per the original notes):
            index: int32
            peer:  int32 (cockpit uid)
            type:  string
            sdp:   string

        :param body_data: serialized ``Answer`` message body.
        """
        # --- parse the body
        answer = protobuf.Answer()
        answer.ParseFromString(body_data)
        sender_info = self.client_info or {}  # None until the sender registers
        methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#peer: {answer.peer}")
        methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#uid: {sender_info.get('uid')}")
        methods.debug_log(f"{self.connection_id}|SRIConnection411", f"#rid: {sender_info.get('rid')}")

        sender_rid = sender_info.get('rid')
        if sender_rid is None:
            methods.debug_log(f"{self.connection_id}|SRIConnection411",
                              "ignored: sender has no rid (not registered as a vehicle)")
            return

        # --- build 4011 once; it is identical for every recipient
        relay = protobuf.Answer()
        relay.index = answer.index
        relay.peer = sender_rid  # the cockpit sees the vehicle's rid as peer
        relay.type = answer.type
        relay.sdp = answer.sdp
        re_command_id = protobuf.SC_NotifyAnswer  # 4011
        re_head_data = struct.pack(self.head_sequence, re_command_id, relay.ByteSize())
        re_send_data = re_head_data + relay.SerializeToString()

        # --- send 4011 to the addressed cockpit
        for count, item in enumerate(list(clients.values()), start=1):
            methods.debug_log(f"{self.connection_id}|SRIConnection428",
                              f"[{count}]item.client_info: {item.client_info}")

            target_uid = (item.client_info or {}).get('uid')
            if target_uid and target_uid == answer.peer:
                methods.debug_log(f"{self.connection_id}|SRIConnection428", f"re_command_id: {re_command_id}")
                methods.debug_log(f"{self.connection_id}|SRIConnection428",
                                  f"=====================================================-----index: {relay.index}")
                item.client.write(re_send_data)

    def message2006(self, body_data):
        """
        Handle command 2006 (Candidate): relay a candidate to its peer.

        Parses ``Candidate`` and sends an ``SC_NotifyCandidate`` frame in
        either direction:

        * to every connection whose ``rid`` equals ``Candidate.peer``, with
          ``peer`` replaced by the sender's ``uid`` (cockpit -> vehicle);
        * to every connection whose ``uid`` equals ``Candidate.peer``, with
          ``peer`` replaced by the sender's ``rid`` (vehicle -> cockpit).

        Connections that have not identified themselves yet (``client_info``
        is ``None``) are skipped, and a direction for which the sender lacks
        the required id is skipped with a log entry, instead of raising.

        Candidate fields (per the original notes):
            index, peer, type, candidate, sdpMLineIndex, sdpMid, egotype

        :param body_data: serialized ``Candidate`` message body.
        """
        # --- parse the body
        candidate = protobuf.Candidate()
        candidate.ParseFromString(body_data)
        sender_info = self.client_info or {}  # None until the sender identifies itself
        sender_uid = sender_info.get('uid')
        sender_rid = sender_info.get('rid')
        methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#peer: {candidate.peer}")
        methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#candidate.in: {candidate.candidate}")
        methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#uid: {sender_uid}")
        methods.debug_log(f"{self.connection_id}|SRIConnection413", f"#rid: {sender_rid}")

        re_command_id = protobuf.SC_NotifyCandidate  # 4013

        def build_frame(peer_id):
            """Return the complete 4013 frame with ``peer`` set to *peer_id*."""
            outgoing = protobuf.Candidate()
            outgoing.index = candidate.index
            outgoing.peer = peer_id
            outgoing.type = candidate.type
            outgoing.candidate = candidate.candidate
            outgoing.sdpMLineIndex = candidate.sdpMLineIndex
            outgoing.sdpMid = candidate.sdpMid
            outgoing.egotype = candidate.egotype
            head = struct.pack(self.head_sequence, re_command_id, outgoing.ByteSize())
            return head + outgoing.SerializeToString()

        for item in list(clients.values()):
            target_info = item.client_info or {}

            # --- send 4013 cockpit -> vehicle
            target_rid = target_info.get('rid')
            if target_rid and target_rid == candidate.peer:
                if sender_uid is None:
                    methods.debug_log(f"{self.connection_id}|SRIConnection409",
                                      "skipped: sender has no uid to announce to the vehicle")
                else:
                    methods.debug_log(f"{self.connection_id}|SRIConnection409", f"re_command_id: {re_command_id}")
                    methods.debug_log(f"{self.connection_id}|SRIConnection409",
                                      f"#candidate.out: {candidate.candidate}")
                    item.client.write(build_frame(sender_uid))

            # --- send 4013 vehicle -> cockpit
            target_uid = target_info.get('uid')
            if target_uid and target_uid == candidate.peer:
                if sender_rid is None:
                    methods.debug_log(f"{self.connection_id}|SRIConnection426",
                                      "skipped: sender has no rid to announce to the cockpit")
                else:
                    methods.debug_log(f"{self.connection_id}|SRIConnection426", f"re_command_id: {re_command_id}")
                    methods.debug_log(f"{self.connection_id}|SRIConnection426",
                                      f"#candidate.out: {candidate.candidate}")
                    item.client.write(build_frame(sender_rid))

    def message2007(self, body_data):
        """
        Handle command 2007 (Leave): relay a leave notice to the addressed vehicle.

        Parses ``Leave`` and sends an ``SC_NotifyLeave`` frame to every
        connection whose ``rid`` equals ``Leave.peer``. In the relayed
        message ``peer`` carries the sender's ``uid`` instead.

        Connections that have not identified themselves yet (``client_info``
        is ``None``) are skipped, and a sender without a ``uid`` is ignored,
        instead of raising.

        Leave fields (per the original notes):
            peer:    int32 (vehicle rid)
            egotype: int32

        :param body_data: serialized ``Leave`` message body.
        """
        # --- parse the body
        leave = protobuf.Leave()
        leave.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection497", f"#peer: {leave.peer}")

        sender_uid = (self.client_info or {}).get('uid')
        if sender_uid is None:
            methods.debug_log(f"{self.connection_id}|SRIConnection497",
                              "ignored: sender has no uid (not signed in as a cockpit)")
            return

        # --- build 4014 once; it is identical for every recipient
        relay = protobuf.Leave()
        relay.peer = sender_uid
        relay.egotype = leave.egotype
        re_command_id = protobuf.SC_NotifyLeave  # 4014
        re_head_data = struct.pack(self.head_sequence, re_command_id, relay.ByteSize())
        re_send_data = re_head_data + relay.SerializeToString()

        # --- send 4014 to the addressed vehicle
        for item in list(clients.values()):
            target_rid = (item.client_info or {}).get('rid')
            if target_rid and target_rid == leave.peer:
                methods.debug_log(f"{self.connection_id}|SRIConnection515", f"re_command_id: {re_command_id}")
                item.client.write(re_send_data)

    def message2014(self, body_data):
        """
        Handle command 2014 (CSState): broadcast the sender's state to all cockpits.

        Parses ``CSState`` and sends an ``SC_State`` (``SCState``) frame to
        every connected cockpit. The outgoing ``uid`` field is filled with
        the sender's ``rid``; a sender without an ``rid`` is ignored instead
        of raising.

        CSState / SCState fields (per the original notes):
            state: UserState
            uid:   int32

        :param body_data: serialized ``CSState`` message body.
        """
        # --- parse the body
        incoming = protobuf.CSState()
        incoming.ParseFromString(body_data)
        methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#state: {incoming.state}")
        methods.debug_log(f"{self.connection_id}|SRIConnection530", f"#uid: {incoming.uid}")

        sender_rid = (self.client_info or {}).get('rid')
        if sender_rid is None:
            methods.debug_log(f"{self.connection_id}|SRIConnection530",
                              "ignored: sender has no rid (not registered as a vehicle)")
            return

        # --- build 4022 once; it is identical for every recipient
        notice = protobuf.SCState()
        notice.state = incoming.state
        notice.uid = sender_rid
        re_command_id = protobuf.SC_State  # 4022
        re_head_data = struct.pack(self.head_sequence, re_command_id, notice.ByteSize())
        re_send_data = re_head_data + notice.SerializeToString()

        # --- send 4022 to every cockpit
        for item in list(clients.values()):
            if item.client_type == 'cockpit':
                methods.debug_log(f"{self.connection_id}|SRIConnection551", f"re_command_id: {re_command_id}")
                item.client.write(re_send_data)

    @staticmethod
    async def check_clients():
        """
        Periodically print the state of every registered connection.

        Runs forever: every 3 seconds it prints an iteration counter and,
        for each entry of the ``clients`` registry, its transport,
        ``update_at`` and ``client_type``. Nothing is removed from the
        registry here.
        """
        iteration = 0
        while True:
            iteration += 1
            print(f"#count: {iteration}", flush=True)

            now_at = methods.now_ts()  # fetched but not used yet

            # Snapshot of the registry taken before printing.
            for connection in list(clients.values()):
                print(f"#client: {connection.client}, #update_at: {connection.update_at}, "
                      f"#client_type: {connection.client_type}",
                      flush=True)

            await asyncio.sleep(3)  # pause between two reports

    @staticmethod
    async def run():
        """
        Start the TCP server and serve until cancelled.

        Listens on 0.0.0.0:20917 and creates one ``SRIConnection`` per
        accepted connection. ``check_clients`` is not scheduled here.
        """
        # --- define ---
        event_loop = asyncio.get_running_loop()
        listener = await event_loop.create_server(SRIConnection, '0.0.0.0', 20917)

        # --- start ---
        async with listener:
            print("Server listening on 0.0.0.0:20917", flush=True)
            await listener.serve_forever()


if __name__ == '__main__':
    # Script entry point: run the server coroutine on a fresh event loop.
    server_main = SRIConnection.run()
    asyncio.run(server_main)