#include <atomic>
#include <string>
#include "../common/comm.h"
#include "api.h"
#include "../common/iobuffer.h"
#include "../common/sensor_socket.h"
#include "../common/peer_connection.h"
#include "VideoRenderer.h"
#include "../thirdparty/jsoncpp/include/json/json.h"
#include "protocol.pb.h"
#include "imu_sensor.h"
#include "Rtk.h"

#include "message_queue.h"
#include <fstream>
#include <iostream>
#include <cstring>

#include "hmacsha256.h"
#include <linux/usbdevice_fs.h>
#include <sys/ioctl.h>

#include <sys/statfs.h>                                                                                                            
#include <stdlib.h>
#include <unistd.h>
#include <sys/vfs.h>
#include <dirent.h>
#include <sys/stat.h>
 
CMessageQueue::CMessageQueue():Head(nullptr),Tail(nullptr)
{
   _peerId=-1;
   btimeStopedCar = false;
   File_fd = NULL;
   _Version = 1;
}
CMessageQueue::~CMessageQueue()
{

}

void renableUSB(const char* file)
{
    printf("Resetting USB device %s\n", file);
    int fd = open(file, O_WRONLY);
    if (fd < 0) {
        char text[256];
        perror(text);
        printf("Error opening output file %s", text);
        return;
    }


    int rc = ioctl(fd, USBDEVFS_RESET, 0);
    if (rc < 0) {
        perror("Error in ioctl");
        return;
    }
    printf("Reset successful\n");

    close(fd);
}

void CMessageQueue::Create()
{
  
    Json::Value root;
	Json::Reader jsonReader;
    std::ifstream ifile("./config.json");
    std::string serverip;
    int32_t _hostPort;
    int32_t _remotePort;
    _curTick=0;
    bStopedCar=false;
    if(jsonReader.parse(ifile,root))
    {
        std::cout<<"enter config json"<<std::endl;
        _serial=root["serial"].asString();
         
       int32_t _Rtkport=root["rtk_port"].asInt();
       int32_t _Rtkhost=root["rtk_host"].asInt();
       std::string _Rtkip=root["rtk_ip"].asString();
       std::cout<<"Rtk ip:"<< _Rtkip <<std::endl;
       _Rtk = std::make_unique<SensorSocket<CRtkSensor>>(this, _Rtkip, _Rtkport, _Rtkhost);
       //_Rtk->Start();


       _Version = root["Version"].asInt();

       if(_Version)//AD10
       {
            std::string can_port_radar = root["can_bus_radar"].asString();
            _CanBusRadar = std::make_unique<SensorCanBus<CCanRadarSensor>>(this, can_port_radar);
            //_CanBusRadar->Start();
       }
       else
       {
            //_RadarIp
            int32_t _radarport=root["radar_port"].asInt();
            int32_t _radarhost=root["radar_host"].asInt();
            std::string _radarip=root["radar_ip"].asString();
            std::cout<<"radar ip:"<<_radarip<<std::endl;
            _RadarIp = std::make_unique<SensorSocket<CRadarSensor>>(this,_radarip,_radarport,_radarhost);
            //_RadarIp->Start();
       }
       
       
       if(_Version)//AD10
       {
            std::string can_port = root["can_bus_vehicle"].asString();
            _CanBusVehicle = std::make_unique<SensorCanBus<CCanBusSensor>>(this, can_port);
            //_CanBusVehicle->Start();
       }
       else
       {
            _PcanBusVehicle = std::make_unique<SensorPeakCan<CPcanSensor>>(this);
            //_PcanBusVehicle->Start();
       }
       

       _UdpMinPort=root["udp_min_port"].asInt();
       _UdpMaxPort=root["udp_max_port"].asInt();
       //20230417
       //const char* serverUrl = "tcp://localhost:1883";  //服务器地址
       //const char* userName = "";  //用户名
       //const char* password = "";  //密码
       std::string serverUrl = root["MqttserverUrl"].asString();
       std::string userName = root["Esn"].asString();
       std::string password = root["EsnPass"].asString();
       std::string clientId = "";  //客户端标识符

       bool _Hm256 = false;

       unsigned char Source[128];

       time_t rawtime;
       struct tm* info;
       char buffer[80];
       time(&rawtime);

       info = localtime(&rawtime);

       uint8_t secret[32] = { 0 };
       //如UTC 时间2018/7/24 17:56:20 则应表示为2018072417。
       /*sprintf((char*)secret, "%d%.2d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,
           info->tm_hour);*/
       sprintf((char*)secret, "%d%.2d%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);

       memset(Source, 0, 128);
       sprintf((char*)Source, "%s_0_0_%s", userName.c_str(), secret);
       clientId.clear();
       clientId.append((const char*)Source);

       if (password.length() && _Hm256)
       {
           uint8_t outdata[128] = { 0 };
           uint8_t md[SHA256_DIGESTLEN] = { 0 };

           int len1 = strlen((char*)secret);
           int len2 = strlen((char*)password.c_str());

           HMAC_SHA256_CTX hmac;
           hmac_sha256_init(&hmac, secret, len1);
           hmac_sha256_update(&hmac, (const uint8_t*)password.c_str(), len2);
           hmac_sha256_final(&hmac, md);

           memcpy(outdata, md, SHA256_DIGESTLEN);


           password.clear();
           password.append((const char*)outdata);
           
       }
       
       _Mqtt_ZR = std::make_unique<SensorMQTT<CMqttSensor>>(this, serverUrl, userName, password, clientId);


       std::string DataServerUrl = root["DataServerUrl"].asString();
       std::string DatauserName = root["DataEsn"].asString();
       std::string Datapassword = root["DataEsnPass"].asString();
       std::string DataclientId = root["DataClientId"].asString();

       _Mqtt_SE = std::make_unique<SensorMQTT<CDataMqttSensor>>(this, DataServerUrl, DatauserName, Datapassword, DataclientId);
       //_Mqtt_SE->Start();

       //std::string _usb_1 = root["usb_3"].asString();
       //renableUSB("/dev/bus/usb/002/002");
       //renableUSB("/dev/bus/usb/001/003");
       //renableUSB(_usb_2.c_str());
       //renableUSB(_usb_1.c_str());

        serverip=root["ip"].asString();
        _hostPort=root["TcpHostPort"].asInt();
        _remotePort=root["TcpRemotePort"].asInt();

        _name=root["name"].asString();
        const Json::Value arrayObj=root["camerainfo"];
        int32_t count=arrayObj.size();
        for(int32_t i=0;i<count;i++)
        {
            LocalCameraInfo info;
            const Json::Value& value=arrayObj[i];
            info.index=value["index"].asInt();
            info.label=value["label"].asString();
            _cameraArray.push_back(info);
        }
        for(int i=0;i<_cameraArray.size();i++)
        {
            _peerArray.push_back({nullptr});
        }
       
    }
    else{
        std::string error=jsonReader.getFormattedErrorMessages();
        std::cout<<error<<std::endl;
    }
    _client=std::make_unique<SocketClient>(this);
    _client->Start(serverip.c_str(),_remotePort,_hostPort);
   
   

    //20230412
    //_Mqtt_ZR = std::make_unique<CMqtt>(this);
    
    //_Mqtt_ZR->start();
  
    //_can=std::make_unique<SocketCan>(this);
    //_can->Start(_canip,_canport,_hostport);
      
    
  
    std::this_thread::yield();
 //    OnNotifyReq(0);

}
/*
void CMessageQueue::WriteCanMessage(std::unordered_map<int32_t, cannet_frame>& node,bool islidar)
{
    if(!bDataChannelCreated) return;
     // std::lock_guard<std::mutex> l(_canLock);
      RemoNet::CCCanMesage Req;
      Req.set_islidar(islidar);
      for(auto& p:node)
      {
         int32_t lidar=p.second.canid;
         auto m=Req.add_message();
         m->set_head(p.second.dlc);
         m->set_canid(lidar);
         m->set_data(p.second.data,8);
        
      }
      
      MessageHead Head;
      CIOBuffer pBuffer;
      Head.Command = RemoNet::CC_CAN;
      Head.Length = Req.ByteSizeLong();
      Head.Serialize(pBuffer.Buffer);
      auto ptr = pBuffer.Buffer + MessageHead::Size();
      Req.SerializeToArray(ptr, Head.Length);
      pBuffer.Length = MessageHead::Size() + Head.Length;
      _peerArray[0]->SendData(&pBuffer);
}

bool CMessageQueue::IsCarId(int32_t value)
{
    return std::find(_carArray.begin(),_carArray.end(),value)!=_carArray.end();
}
*/

void CMessageQueue::EnQueue(CIOBuffer* pBuffer)
{
    bool bNullBuffer=false;
    std::unique_lock <std::mutex>  lck(_lock);
    if(Head==nullptr)
    {
        Head=Tail=pBuffer;
        bNullBuffer=true;
    }
    else{
        Tail->NextBuf=pBuffer;
        Tail=Tail->NextBuf;
    } 
    pBuffer->NextBuf=nullptr;
    if(bNullBuffer)
    {
        //_cv.notify_one();
        //20231023
        _cv.notify_all();
    }
}  
void CMessageQueue::Process()
{
    // printf("-----process-----\n");
    CIOBuffer * ptr=nullptr;
    {
        std::unique_lock <std::mutex>  lck(_lock);
        /*
        while(Head==nullptr)
        {
            _cv.wait(lck);
        }
        */
        while(Head==nullptr&&_cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout)
        {
          CheckSignal();


          //std::cout<<".";
          //std::cout.flush();
          
        }
        
    }
    while(Head!=nullptr)
    {
        // printf("head-------\n");
        ptr=Head;
        Head=Head->NextBuf;
        if(ptr!=nullptr)
        {
            Message* message=reinterpret_cast<Message *>(ptr->Buffer);
        
            switch (message->cmd)
            {
            case MessageType::ReqVideo:
            // printf("ReqVideo\n");
            // printf("ReqVideo message param_l : %d\n", message->param_l);
            OnNotifyReq((int32_t)message->param_l);
            break;
            case MessageType::RepVideo:
            // printf("RepVideo\n");
            // printf("RepVideo message param_l : %d\n", message->param_l);
            OnNotifyRep((int32_t)message->param_l);
            break;
            case MessageType::Connected:
            // printf("Connected\n");
            // printf("Connected message param_l : %d\n", message->param_l);
            OnNotifyConnected((bool)message->param_l);
            break;
            case MessageType::Leave:
            // printf("Leave\n");
            OnNotifyLeave();
            break;
            case MessageType::AsyncMessage:
            // printf("AsyncMessage\n");
            OnNotifyMessage();
            break;
            case MessageType::StopSensor:
            // printf("StopSensor\n");
            OnNotifyStopSensor();
            break;        
            case MessageType::Ping:
            // printf("Ping \n");
            OnNotifyPing(message->param_l);
            break; 
            }
            ptr->Release();
        }   
        // printf("8888888-------------head-------\n");
    }    
}
void CMessageQueue::SetTick(long long tick)
{
    _curTick=tick;
}


void CMessageQueue::OnNotifyConnected(bool bRet)
{
    if(bRet)
    {
        std::cout << _serial << " " << _name << std::endl;
        
        _client->WriteAddRobot(_serial,_name,static_cast<int32_t>(EgoType::Car));
        _updatethread.start(_client.get());
        //cs->Analog(0,0,0,0,0);

    }
    else
    {
        if(_peerId!=-1)
        {
            OnVideoLeave(_peerId,EgoType::User);
            _peerId=-1;
        }
      
        _updatethread.stop();
    }
}

// int day_diffient(int year_start, int month_start, int day_start,char *Dst)
// {
//     //2024-03-11

//     int year_end = strtol(Dst,NULL,10);

//     char Temp = '-',*p = NULL,*p1 = NULL;

//     p = strchr(Dst,Temp);
//     int month_end = strtol(p + 1,NULL,10);
//     p1 = strchr(p + 1,Temp);
//     int day_end = strtol(p1 + 1,NULL,10);

//     int y2, m2, d2;
//     int y1, m1, d1;

//     m1 = (month_start + 9) % 12;
//     y1 = year_start - m1/10;
//     d1 = 365*y1 + y1/4 - y1/100 + y1/400 + (m1*306 + 5)/10 + (day_start - 1);
//     m2 = (month_end + 9) % 12;
//     y2 = year_end - m2/10;
//     d2 = 365*y2 + y2/4 - y2/100 + y2/400 + (m2*306 + 5)/10 + (day_end - 1);

//     return (d1 - d2);
// }

// void delete_days(const char *path)
// {
//     DIR *dir;
//     struct dirent *entry;
//     struct stat statbuf;

//     time_t rawtime;
//     struct tm* info;
//     time(&rawtime);

//     info = localtime(&rawtime);
//     uint8_t secret[64] = { 0 };

//     sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);
//     printf("%s\r\n",secret);
 
//     dir = opendir(path);
//     if (dir == NULL)
//     {
//         printf("无法打开目录\n");
//         return;
//     }
 
//     while ((entry = readdir(dir)) != NULL) 
//     {
        
//         if(entry->d_name[0]=='.') 
//             continue;
//         //printf("%s\n",entry->d_name);

//         if(day_diffient(info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,entry->d_name) > 15)
//         {
//             char Dst[128];
//             memset(Dst,0,128);

//             sprintf(Dst,"rm -rf /home/nvidia/devdata/ZJ_PRO_test/EgoSystem/build/log/%s",entry->d_name);
//             system(Dst);
//         }
//     }
//     closedir(dir);
// }

// void CMessageQueue::SerichFile(char *filename)
// {
//     DIR *directory_pointer;
//     struct dirent *entry;
//     int exist = 0;

//     char Dst[128];
//     memset(Dst,0,128);
//     sprintf(Dst,"/home/nvidia/devdata/ZJ_PRO_test/EgoSystem/build/log/%s",filename);

//     if((directory_pointer=opendir("/home/nvidia/devdata/ZJ_PRO_test/EgoSystem/build/log/")) == NULL)
//         printf("Error open\n");
//     else
//     {
//         while((entry=readdir(directory_pointer)) != NULL)
//         {
//             if(entry->d_name[0]=='.') continue;
//                 printf("%s\n",entry->d_name);
            
//             if(!strcmp(entry->d_name,filename))
//             {
//                 File_fd = fopen(Dst, "a");
//                 exist = 1;
//                 break;
//             }
//         }
//     }

//     if(!exist)
//         File_fd = fopen(Dst, "w+");
//     closedir(directory_pointer);
// }


void CMessageQueue::OnNotifyReq(int32_t index)
{ 
   // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
   printf("index:%d\n",index);
   printf ("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\n");
    if(_peerId==-1) return;
    if(index==0)
    {
        mrsWebrtcCreateFactory(true);
        _curTick=0;
        bStopedCar=false;
        _Rtk->Start();
        if(_Version)//AD10
        {
            _CanBusVehicle->Start();
            _CanBusRadar->Start();
        }
        else
        {
            _PcanBusVehicle->Start();
            _RadarIp->Start();
        }
        
        _Mqtt_ZR->Start();
        _Mqtt_SE->Start();
        printf("8999----------------------------------------------------------------\n");

        // File_fd = NULL;
        // struct statfs diskInfo;  
      
        // statfs("/dev/mmcblk0p1", &diskInfo);  
        // unsigned long long freeDisk = diskInfo.f_bfree * diskInfo.f_bsize; 


        // time_t rawtime;
        // struct tm* info;
        // time(&rawtime);

        // info = localtime(&rawtime);

        // uint8_t secret[64] = { 0 };

        // sprintf((char*)secret, "%d-%.2d-%.2d.log", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday);

        // if((freeDisk >> 30) < 15)
        // {
        //     system("rm -rf /home/nvidia/devdata/ZJ_PRO_test/EgoSystem/build/log");
        //     system("mkdir /home/nvidia/devdata/ZJ_PRO_test/EgoSystem/build/log/");
        // }
        // else
        //     delete_days("/home/nvidia/devdata/ZJ_PRO_test/EgoSystem/build/log");

        // SerichFile((char *)secret);

    }
    
    _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index),_client.get());
   
	
	_client->WriteVideoRep(_peerId, RemoNet::VideoDesc::OK, index);
}
void CMessageQueue::OnNotifyRep(int32_t index)
{

    _peerArray[index]=std::make_unique<CPeerConnection>(static_cast<ChannelType>(index), _client.get());
    
	 
	InitPeerConnection(_peerId,index);
	_peerArray[index]->CreateOffer();
	
}
void CMessageQueue::InitPeerConnection(int32_t peer,int32_t index)
{
	 
   
	_peerArray[index]->Initialize(peer,index,_UdpMinPort,_UdpMaxPort);
    _peerArray[index]->AddDataChannel(true, false);
   
    _peerArray[index]->AddLocalVideoTrack(static_cast<RenderPosition>(index),_cameraArray[index].index);
    
    
     if(index==RenderPosition::BACK)
    {
        void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
        while(front==nullptr)
        {
            std::cout<<"front==nullptr"<<std::endl;
             std::this_thread::sleep_for(std::chrono::microseconds(50));  
             front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
        }
        void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
        while (back==nullptr)
        {
             std::cout<<"back==nullptr"<<std::endl;
            std::this_thread::sleep_for(std::chrono::microseconds(50));  
            back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
        }
        _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
        _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
    }
    
    
   /*
    if((index+1)==RenderPosition::ALL)
    {
        void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
        while(front==nullptr)
        {
            std::cout<<"front==nullptr"<<std::endl;
             std::this_thread::sleep_for(std::chrono::microseconds(50));  
             front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
        }
        void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
        while (back==nullptr)
        {
             std::cout<<"back==nullptr"<<std::endl;
            std::this_thread::sleep_for(std::chrono::microseconds(50));  
            back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
        }
        _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
        _peerArray[RenderPosition::BACK]->SetOtherCtx(front);

        void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
        while(left==nullptr)
        {
            std::cout<<"left==nullptr"<<std::endl;
             std::this_thread::sleep_for(std::chrono::microseconds(50));  
             front=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
        }
        void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
        while (right==nullptr)
        {
             std::cout<<"right==nullptr"<<std::endl;
            std::this_thread::sleep_for(std::chrono::microseconds(50));  
            back=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
        }
         _peerArray[RenderPosition::LEFT]->SetOtherCtx(right);
        _peerArray[RenderPosition::RIGHT]->SetOtherCtx(left);
    }
    */
    /*
     if((index+1)==RenderPosition::ALL)
    {
        void * front=_peerArray[RenderPosition::FRONT]->GetCurrentCtx();
        void * back=_peerArray[RenderPosition::BACK]->GetCurrentCtx();
        void * left=_peerArray[RenderPosition::LEFT]->GetCurrentCtx();
        void * right=_peerArray[RenderPosition::RIGHT]->GetCurrentCtx();
        void * dash=_peerArray[RenderPosition::DASHBOARD]->GetCurrentCtx();
        _peerArray[RenderPosition::FRONT]->SetOtherCtx(back);
        _peerArray[RenderPosition::BACK]->SetOtherCtx(front);
        _peerArray[RenderPosition::LEFT]->SetOtherCtx(left);
        _peerArray[RenderPosition::RIGHT]->SetOtherCtx(right);
        _peerArray[RenderPosition::DASHBOARD]->SetOtherCtx(dash);
    }
    */
   

    if(index==RenderPosition::FRONT)
	    _peerArray[index]->AddLocalAudioTrack();
}
 
void CMessageQueue::OnAdd(bool bRet)
{

}
void CMessageQueue::OnConnected(bool bRet)
{
    
    CIOBuffer * pBuffer=CIOBuffer::Alloc();
    Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
    message->cmd=MessageType::Connected;
    message->param_l=bRet;
    EnQueue(pBuffer);
   
}
 
void CMessageQueue::OnVideoLeave(int32_t peer,EgoType type)
{
    {
         CIOBuffer * pBuffer=CIOBuffer::Alloc();
        Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
        message->cmd=MessageType::StopSensor;
        EnQueue(pBuffer);
    }
    {
        CIOBuffer * pBuffer=CIOBuffer::Alloc();
        Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
        message->cmd=MessageType::Leave;
     
        EnQueue(pBuffer);
    }
}
#ifdef WIN32
void CMessageQueue::OnVideoRep(int32_t index,RemoNet::VideoDesc desc)
{
   if (desc == RemoNet::VideoDesc::OK)
   {
        assert(_peerId!=-1);
        CIOBuffer * pBuffer=CIOBuffer::Alloc();
        Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
        message->cmd=MessageType::ReqVideo;
        message->param_l=index;
        
        EnQueue(pBuffer);
   }

}
#else
void CMessageQueue::OnVideoReq(int32_t video,int32_t peer)
{
   // std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
    _peerId=peer;
    CIOBuffer * pBuffer=CIOBuffer::Alloc();
    Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
    message->cmd=MessageType::ReqVideo;
    message->param_l=video;
    
    EnQueue(pBuffer);
    
}
#endif
void CMessageQueue::OnNotifyLeave()
{
  
     std::this_thread::sleep_for(std::chrono::milliseconds(100));   
   
     for (size_t i = 0; i < _peerArray.size(); i++)
     {
         if(_peerArray[i]!=nullptr)
         {
             _peerArray[i]->Close();
             _peerArray[i].reset();
         }
          /* code */
     }
      _peerId=-1;
      
}
void CMessageQueue::OnNotifyStopSensor()
{
    _curTick=0;

    if(_Version)//AD10
    {
        std::cout<<"_CanBusVehicle Stop"<<std::endl;
        _CanBusVehicle->Stop();
        std::cout<<"_CanBusRadar Stop"<<std::endl;
        _CanBusRadar->Stop();
    }
    else
    {
        std::cout<<"_PcanBusVehicle Stop"<<std::endl;
        _PcanBusVehicle->Stop();
        std::cout<<"_RadarIp Stop"<<std::endl;
        _RadarIp->Stop();
    }
    std::cout << "RTK Stop" << std::endl;
    _Rtk->Stop();
    std::cout<<"mqtt Stop"<<std::endl;
    _Mqtt_ZR->Stop();
    std::cout<<"data mqtt Stop"<<std::endl;
    _Mqtt_SE->Stop();

    // if(!File_fd)
    // {
    //     fclose(File_fd);
    //     File_fd = NULL;
    // }
    

     RemoNet::StopAck Rep;
    
     CIOBuffer Buffer;
     MessageHead Head;
     Head.Command = RemoNet::CC_StopACK;
     Head.Length = Rep.ByteSizeLong();
     Head.Serialize(Buffer.Buffer);
     auto ptr = Buffer.Buffer + MessageHead::Size();
     Rep.SerializeToArray(ptr, Head.Length);
     Buffer.Length = Head.Length + MessageHead::Size();
     _peerArray[RenderPosition::FRONT]->SendData(Buffer);


}
void CMessageQueue::OnVideoOffer(int32_t index,const char* type, const char* sdp)
{ 
 //   std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
 printf ("onvideo offer9999999999999999999999999999999999999999999999999999999999999999999999999999\n");
    InitPeerConnection(_peerId,index); 
    
    _peerArray[index]->SetRemoteDescription(type,sdp);
    _peerArray[index]->CreateAnswer();
     
}
void CMessageQueue::OnVideoAnswer(int32_t index, const char* type, const char* sdp)
{ 
  //  std::cout<<__FUNCTION__<<","<<__LINE__<<std::endl;
      
     _peerArray[index]->SetRemoteDescription(type,sdp);
    
}
void CMessageQueue::OnVideoCandidate(int32_t index,const char* candidate,
		int32_t sdp_mline_index,
		const char* sdp_mid)
{ 
      
    _peerArray[index]->AddIceCandidate(candidate,sdp_mline_index,sdp_mid);
     
}
/*
void CMessageQueue::SwitchCamera(bool front)
{
     _peerArray[RenderPosition::FRONT_BACK]->SwitchCapture(front);
}
*/

void CMessageQueue::OnMessageFrameNotify(ChannelType type,int16_t cmd,int16_t length,const void * data)
{
     
    // std::cout<<"cmd:" <<std::hex<<cmd<<std::endl;
     if(cmd==RemoNet::CC_Text)
     {
        RemoNet::TestTextReq Req;    
        Req.ParseFromArray(data,length);
        std::cout<<Req.text()<<std::endl;
        CIOBuffer * pBuffer=CIOBuffer::Alloc();
        Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
        message->cmd=MessageType::AsyncMessage;
        EnQueue(pBuffer);
    
     }
     else if(cmd==RemoNet::CC_Switch)
     {
         RemoNet::CCSwitch Req;
         Req.ParseFromArray(data,length);
         bool front=Req.front();
         _peerArray[RenderPosition::FRONT]->SwitchCapture(front);

     }
     else if(cmd==RemoNet::CC_Ping)
     {
           RemoNet::CCPing Req;
           Req.ParseFromArray(data,length);
           CIOBuffer * pBuffer=CIOBuffer::Alloc();
           Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
           message->cmd=MessageType::Ping;
           message->param_l=Req.tick();
           EnQueue(pBuffer);
     }
     else if(cmd==RemoNet::CC_SensorStop)
     {

        CIOBuffer * pBuffer=CIOBuffer::Alloc();
        Message* message=reinterpret_cast<Message *>(pBuffer->Buffer);
        message->cmd=MessageType::StopSensor;
        EnQueue(pBuffer);
     }
     else if (cmd == RemoNet::CC_CANMSG)
     {
         _source = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
         _curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
         
         RemoNet::CCCanMsg Req;
         Req.ParseFromArray(data, length);

         cannet_frame* frames = (cannet_frame*)alloca(sizeof(cannet_frame) * Req.frams_size());

         for (int32_t i = 0; i < Req.frams_size(); i++)
         {
             auto& frame = Req.frams(i);
             frames[i].canid = frame.canid();
             frames[i].dlc = frame.dlc();
             memcpy(frames[i].data, frame.data().data(), frame.dlc());

         }
         //_robot->Get()->OnMessage(frames, Req.frams_size());
         if(_Version)//AD10
            _CanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
        else
            _PcanBusVehicle->Get()->OnMessage(frames, Req.frams_size());
     }
}

void CMessageQueue::StopCar()
{
    std::cout<<"Stop Car"<<std::endl;
    if(_Version)//AD10
        _CanBusVehicle->Get()->Emergency();
    else
        _PcanBusVehicle->Get()->Emergency();
}
 
void CMessageQueue::OnNotifyMessage()
{
         RemoNet::TestTextReq Req;
         Req.set_text("ewqewqewqe");
         CIOBuffer Buffer;
          MessageHead Head;
          Head.Command = RemoNet::CC_Text;
          Head.Length = Req.ByteSizeLong();
          Head.Serialize(Buffer.Buffer);
          auto ptr = Buffer.Buffer + MessageHead::Size();
          Req.SerializeToArray(ptr, Head.Length);
          Buffer.Length = Head.Length + MessageHead::Size();
         _peerArray[RenderPosition::FRONT]->SendData(Buffer);
         
}

void CMessageQueue::OnNotifyPing(int64_t value)
{
     RemoNet::CCPing Rep;
     Rep.set_tick(value);
     CIOBuffer Buffer;
     MessageHead Head;
     Head.Command = RemoNet::CC_Ping;
     Head.Length = Rep.ByteSizeLong();
     Head.Serialize(Buffer.Buffer);
     auto ptr = Buffer.Buffer + MessageHead::Size();
     Rep.SerializeToArray(ptr, Head.Length);
     Buffer.Length = Head.Length + MessageHead::Size();
     //std::cout << "ping" << std::endl;
     if(  _peerArray[RenderPosition::FRONT]!=nullptr && _peerArray[RenderPosition::FRONT]->bReadyChannel)
        _peerArray[RenderPosition::FRONT]->SendData(Buffer);
}

/*
void CMessageQueue::StartCar()
{
    _can->SetStartWrite(true);
}
*/
void CMessageQueue::CheckSignal()
{
     
    if(!bStopedCar)
    {
        long long tick=std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
        if(_curTick!=0&&tick-_curTick > 3) 
        {
            StopCar();
            bStopedCar=true;
            std::cout<<"_curTick!=0&&tick-_curTick > 3" << std::endl;
            _curStopTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
            btimeStopedCar = true;
            _curTick = 0;
        }
    }
     
}
void CMessageQueue::WriteIMUData(ImuData* data)
{
     
    MessageHead Head;
    CIOBuffer Buffer;
       
    RemoNet::IMuMessage Req;
  
   
    Req.set_rx(data->ry);
    Req.set_ry(data->rx);
   // Req.set_rz(data->rz);
 
    Head.Command=RemoNet::CC_IMU;
    Head.Length=Req.ByteSizeLong();
    Head.Serialize(Buffer.Buffer);
    auto ptr = Buffer.Buffer + MessageHead::Size();
    Req.SerializeToArray(ptr, Head.Length);
    Buffer.Length = Head.Length + MessageHead::Size();
    
    if( _peerArray[ChannelType::CHANNEL_IMU]!=nullptr)
        _peerArray[ChannelType::CHANNEL_IMU]->SendData(Buffer);
    
  
}
void CMessageQueue::WritePacket(ChannelType type, CIOBuffer & pBuffer)
{
     if( _peerArray[type]!=nullptr)
        _peerArray[type]->SendData(pBuffer);
}


void CMessageQueue::WriteRadarData(RadarData& data)
{
      MessageHead Head;
      CIOBuffer Buffer;
      RemoNet::CCRadarMessage Req;
      Req.set_radar0(data.r0);
      Req.set_radar1(data.r1);
      Req.set_radar2(data.r2);
      Req.set_radar3(data.r3);
      Req.set_radar4(data.r4);
      Req.set_radar5(data.r5);
      Req.set_radar6(data.r6);
      Req.set_radar7(data.r7);
      //Head.Command=RemoNet::CC_IMU;
      Head.Command = RemoNet::CC_Radar;
      Head.Length=Req.ByteSizeLong();
      Head.Serialize(Buffer.Buffer);
      auto ptr = Buffer.Buffer + MessageHead::Size();
      Req.SerializeToArray(ptr, Head.Length);
      Buffer.Length = Head.Length + MessageHead::Size();
      if( _peerArray[ChannelType::CHANNEL_RADAR]!=nullptr)
         _peerArray[ChannelType::CHANNEL_RADAR]->SendData(Buffer);
       
}

void CMessageQueue::WriteRobotStatus(int32_t ,int32_t )
{
    
}
#ifdef LIDAR_SENSOR
void CMessageQueue::WriteLidarPoint(const PointCloudMsg<PointXYZI>& msg,ChannelType side)
{
    RemoNet::LidarPoint pt;
    pt.set_is_left(side==ChannelType::CHANNEL_LEFT_LIDAR);
    pt.set_frame_id(msg.frame_id);
    pt.set_height(msg.height);
    pt.set_width(msg.width);
    pt.set_is_dense(msg.is_dense);
    pt.set_seq(msg.seq);
    pt.set_timestamp(msg.timestamp);
    for(int i=0;i<msg.point_cloud_ptr->size();i++)
    {
        pt.add_data((*msg.point_cloud_ptr)[i].x);
        pt.add_data((*msg.point_cloud_ptr)[i].y);
        pt.add_data((*msg.point_cloud_ptr)[i].z);
        pt.add_data((*msg.point_cloud_ptr)[i].intensity);
    }
    MessageHead Head;
    CIOBuffer Buffer;
    Head.Command=RemoNet::CC_LIDARDATA;
    Head.Length=pt.ByteSizeLong();
    Head.Serialize(Buffer.Buffer);
    auto ptr = Buffer.Buffer + MessageHead::Size();
    pt.SerializeToArray(ptr, Head.Length);
    Buffer.Length = Head.Length + MessageHead::Size();
    
    if( _peerArray[side]!=nullptr)
        _peerArray[side]->SendData(&Buffer);
}
#endif
void CMessageQueue::SwitchCamera(bool front)
{
      _peerArray[RenderPosition::FRONT]->SwitchCapture(front);
      _peerArray[RenderPosition::BACK]->SwitchCapture(front);
}

void CMessageQueue::SendZGJStatus(int status)
{ 
    _Rtk->Get()->Send_ZGJ_status(status);
}

void CMessageQueue::SendVehicleStatus(int16_t Direction,int16_t Hand_Throttle,int16_t Foot_Throttle,int16_t Brake)
{
    //CDataMqttSensor::sendMessage()
    if (CDataMqttSensor::_run)
	{
		time_t rawtime;
        struct tm* info;
        time(&rawtime);

        info = localtime(&rawtime);

        char secret[64] = { 0 };

        sprintf((char*)secret, "%d-%.2d-%.2d %.2d:%.2d:%.2d", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,info->tm_hour,info->tm_min,info->tm_sec);
        printf("%s\r\n",secret);


        char WriteLocalDat[128];
        memset(WriteLocalDat,0,128);
        sprintf(WriteLocalDat, "%d-%.2d-%.2d %.2d:%.2d:%.2d    方向值:%d    手油门值:%d    脚油门值:%d    刹车值:%d\n", info->tm_year + 1900, info->tm_mon + 1, info->tm_mday,info->tm_hour,info->tm_min,info->tm_sec,
        Direction,Hand_Throttle,Foot_Throttle,Brake);
        fwrite(WriteLocalDat, strlen(WriteLocalDat) , 1, File_fd);

		Json::Value root;
		Json::Value Source;
		Json::FastWriter writer;

		std::string SendTime;

		Source["1"] = SendTime.assign(secret,strlen(secret));//发送时间
		Source["2"] = Direction;//方向值
		Source["3"] = Hand_Throttle;//手油门值
        Source["4"] = Foot_Throttle;//脚油门值
		Source["5"] = Brake;//刹车值

		Json::StyledWriter sw;
		//std::cout << sw.write(Source) << std::endl << std::endl;
        if (!DataMqtt_SendDate)
		{
			DataMqtt_curTick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
			DataMqtt_SendDate = true;
		}
		else
		{
			long long tick = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
			if (DataMqtt_curTick != 0 && tick - DataMqtt_curTick > 10000)
			{
				if (CDataMqttSensor::_run && (CDataMqttSensor::m_mqttClient != NULL))
					CDataMqttSensor::sendMessage((char*)sw.write(Source).c_str(), 0, (char*)"bg/log");
				DataMqtt_SendDate = false;
			}
		}
	}
}