|
- #include <string.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <sys/select.h>
- #include <sys/time.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <sys/epoll.h>
- #include <chrono>
- #include "user_handler.h"
- #include "remote.pb.h"
- #include "../common/iobuffer.h"
- #include "user_manager.h"
- #include <mysql_connection.h>
- #include <mysql_driver.h>
- #include <cppconn/exception.h>
- #include <cppconn/driver.h>
- #include <cppconn/connection.h>
- #include <cppconn/resultset.h>
- #include <cppconn/prepared_statement.h>
- #include <cppconn/statement.h>
- #include "DBConnect.h"
- #include "scoped_ptr.h"
- // Definition of the static command table shared by all handlers: InitFnDirector()
- // fills it with (command id, member handler) pairs and Process() dispatches through it.
- FNRedirector<CUserHandler> CUserHandler::Redirector;
- // Stores the socket descriptor that Read(), Write() and OnClose() operate on.
- CUserHandler::CUserHandler(int32_t fd)
-     : _fd(fd)
- {
- }
- // Populates the static Redirector table: each client command id is bound to
- // the member function that Process() will invoke for it.
- void CUserHandler::InitFnDirector()
- {
-     // Session description / candidate exchange and liveness.
-     Redirector.Insert(remote::cs_answer,    &CUserHandler::OnAnswer);
-     Redirector.Insert(remote::cs_candidate, &CUserHandler::OnCadidate);
-     Redirector.Insert(remote::cs_keepAlive, &CUserHandler::OnKeepAlive);
-
-     // Registration and robot listing.
-     Redirector.Insert(remote::cs_add,       &CUserHandler::OnAdd);
-     Redirector.Insert(remote::cs_robot,     &CUserHandler::OnRobot);
-
-     // Video session negotiation.
-     Redirector.Insert(remote::cs_offer,     &CUserHandler::OnOffer);
-     Redirector.Insert(remote::cs_rep,       &CUserHandler::OnRepVideo);
-     Redirector.Insert(remote::cs_req,       &CUserHandler::OnReqVideo);
-     Redirector.Insert(remote::cs_leave,     &CUserHandler::OnLeave);
-
-     //Redirector.Insert(RemoNet::CS_CloseVideo, &CUserSocket::OnCloseVideo);
- }
- // Returns the timestamp recorded by the most recent OnKeepAlive()
- // (milliseconds since the system_clock epoch).
- long long CUserHandler::GetTimeTick()
- {
-     const long long lastTick = _tick;
-     return lastTick;
- }
- // Timeout hook; currently performs no work.
- void CUserHandler::OnTimeout()
- {
-     return;
- }
- // Receives the bytes currently available on the socket into _readBuffer and
- // dispatches every complete frame (a MessageHead followed by Head.Length
- // payload bytes) to Process(). A trailing partial frame is kept at the front
- // of the buffer for the next call.
- //
- // Returns:
- //    1  nothing to read right now (EAGAIN / EWOULDBLOCK),
- //    0  data was consumed successfully,
- //   -1  socket error, peer closed the connection, or a frame that can never
- //       fit into the buffer was announced.
- // NOTE(review): -1 is assumed to make the caller close the connection -
- // confirm against the event loop.
- int32_t CUserHandler::Read()
- {
-     const int32_t buffered = _readBuffer->Length;
-     const int32_t room = CIOBuffer::IO_BUFFER_SIZE - buffered;
-     if (room <= 0)
-     {
-         // Buffer is full without containing a complete frame; recv() with a
-         // zero-length buffer would otherwise look like an endless "success".
-         return -1;
-     }
-
-     const int32_t ret = static_cast<int32_t>(recv(_fd, &_readBuffer->Buffer[buffered], room, 0));
-     if (ret < 0)
-     {
-         if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
-         {
-             printf("read later\n");
-             return 1;
-         }
-         return -1;
-     }
-     if (ret == 0)
-     {
-         // recv() returns 0 when the peer has performed an orderly shutdown.
-         return -1;
-     }
-     _readBuffer->Length += ret;
-
-     const int32_t headSize = static_cast<int32_t>(MessageHead::Size());
-     int8_t* ptr = _readBuffer->Buffer;       // start of the next unparsed frame
-     int32_t length = _readBuffer->Length;    // unparsed bytes starting at ptr
-     while (length >= headSize)
-     {
-         MessageHead Head;
-         Head.Deserialize(ptr);
-
-         const int32_t bodySize = static_cast<int32_t>(Head.Length);
-         if (bodySize < 0 || bodySize > CIOBuffer::IO_BUFFER_SIZE - headSize)
-         {
-             // Corrupt or hostile header: the frame could never be completed.
-             return -1;
-         }
-
-         const int32_t size = headSize + bodySize;
-         if (length < size)
-         {
-             break;  // payload not fully received yet
-         }
-
-         Process(Head.Command, ptr + headSize, bodySize);
-         length -= size;
-         ptr += size;
-     }
-
-     // Drop the frames that were dispatched and keep only the unparsed tail.
-     if (ptr != _readBuffer->Buffer)
-     {
-         if (length > 0)
-         {
-             memmove(_readBuffer->Buffer, ptr, length);
-         }
-         _readBuffer->Length = length;
-     }
-     return 0;
- }
- // Routes one decoded frame to its registered handler. Until OnAdd() has set
- // _loginsucc, only remote::cs_add is let through; everything else is dropped.
- void CUserHandler::Process(int32_t cmd, int8_t* Data, int32_t Size)
- {
-     const bool permitted = _loginsucc || (cmd == remote::cs_add);
-     if (permitted)
-     {
-         Redirector.Process(this, cmd, Data, Size);
-     }
- }
- // Keep-alive handler: records the current wall-clock time in milliseconds
- // since the system_clock epoch. The payload is not inspected.
- void CUserHandler::OnKeepAlive(int8_t* Data, int16_t Size)
- {
-     using namespace std::chrono;
-     const auto sinceEpoch = system_clock::now().time_since_epoch();
-     _tick = duration_cast<milliseconds>(sinceEpoch).count();
- }
- // Handles cs_offer: decodes a remote::Offer and asks CUserManager to relay
- // it to the addressed peer, with this handler's uid as the sender.
- // A payload that fails to parse is dropped instead of being relayed with
- // default or partially filled fields.
- void CUserHandler::OnOffer(int8_t* Data, int16_t Size)
- {
-     std::cout << __FUNCTION__ << std::endl;
-     remote::Offer Req;
-     if (!Req.ParseFromArray(Data, Size))
-     {
-         return;
-     }
-     const int32_t peer = Req.peer();
-     CUserManager::GetInstance().NotifyOffer(peer, _uid, Req.index(), Req.type().c_str(), Req.sdp().c_str());
- }
- // Handles cs_answer: decodes a remote::Answer, logs its SDP and asks
- // CUserManager to relay it to the addressed peer, with this handler's uid as
- // the sender. A payload that fails to parse is dropped instead of being
- // relayed with default or partially filled fields.
- void CUserHandler::OnAnswer(int8_t* Data, int16_t Size)
- {
-     std::cout << __FUNCTION__ << std::endl;
-     remote::Answer Req;
-     if (!Req.ParseFromArray(Data, Size))
-     {
-         return;
-     }
-     const int32_t peer = Req.peer();
-     std::cout << Req.sdp().c_str() << std::endl;
-     CUserManager::GetInstance().NotifyAnswer(peer, _uid, Req.index(), Req.type().c_str(), Req.sdp().c_str());
- }
- // Handles cs_candidate: decodes a remote::Candidate and asks CUserManager to
- // relay it to the addressed peer, with this handler's uid as the sender.
- // A payload that fails to parse is dropped instead of being relayed with
- // default or partially filled fields.
- void CUserHandler::OnCadidate(int8_t* Data, int16_t Size)
- {
-     remote::Candidate Req;
-     if (!Req.ParseFromArray(Data, Size))
-     {
-         return;
-     }
-     const int32_t peer = Req.peer();
-     CUserManager::GetInstance().NotifyCandidate(peer, _uid, Req.index(), Req.type().c_str(), Req.candidate().c_str(), Req.sdpmlineindex(), Req.sdpmid().c_str());
- }
- // Handles cs_add (registration/login).
- //
- // Looks the client-supplied serial up in the `robot` table. On a match the
- // handler stores the row id as _uid, marks itself logged in, records the
- // requested EgoType, and broadcasts sc_NotifyAdd through CUserManager.
- // In every case an sc_add reply carrying the result, _uid and _name is
- // written back to this client.
- //
- // The serial comes straight from the network, so it is bound as a
- // prepared-statement parameter instead of being formatted into the SQL text
- // (the previous sprintf into a 1024-byte buffer allowed SQL injection and a
- // stack overflow on long serials). A payload that fails to parse is answered
- // as a failed login without touching the database.
- void CUserHandler::OnAdd(int8_t* Data, int16_t Size)
- {
-     remote::CSAdd Req;
-     const bool parsed = Req.ParseFromArray(Data, Size);
-     const std::string serial = Req.serial();
-     _name = Req.name();
-
-     bool bRet = false;
-     if (parsed)
-     {
-         CQPtr<CConnectionPtr<sql::Connection>> Conn = CDBConnectPool::GetInstance().QueryConnect();
-         scoped_ptr<sql::PreparedStatement> stmt = (*Conn.get())->prepareStatement("select id from robot where serial=?");
-         stmt->setString(1, serial);
-         scoped_ptr<sql::ResultSet> resultSet = stmt->executeQuery();
-         if (resultSet->next())
-         {
-             _uid = resultSet->getInt(1);
-             _loginsucc = true;
-             _egoType = static_cast<EgoType>(Req.type());
-             _state = UserState::Idle;
-             bRet = true;
-         }
-     }
-
-     if (bRet)
-     {
-         // Tell everyone known to CUserManager about the new participant.
-         remote::SCAddRobot robot;
-         auto r = robot.mutable_robot();
-         r->set_name(_name);
-         r->set_rid(_uid);
-         r->set_type(_egoType);
-         r->set_state(UserState::Idle);
-
-         CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
-         MessageHead Head;
-         Head.Command = remote::sc_NotifyAdd;
-         Head.Length = robot.ByteSizeLong();
-         Head.Serialize(pBuffer->Buffer);
-         auto ptr = pBuffer->Buffer + MessageHead::Size();
-         robot.SerializeToArray(ptr, Head.Length);
-         pBuffer->Length = MessageHead::Size() + Head.Length;
-         CUserManager::GetInstance().BroadCast(pBuffer);
-         pBuffer->Release(__FILE__, __LINE__);
-     }
-
-     // Direct reply to the requesting client (sent on success and failure).
-     remote::SCAdd Rep;
-     Rep.set_ret(bRet);
-     Rep.set_uid(_uid);
-     Rep.set_name(_name);
-
-     CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
-     MessageHead Head;
-     Head.Command = remote::sc_add;
-     Head.Length = Rep.ByteSizeLong();
-     Head.Serialize(pBuffer->Buffer);
-     auto ptr = pBuffer->Buffer + MessageHead::Size();
-     Rep.SerializeToArray(ptr, Head.Length);
-     pBuffer->Length = MessageHead::Size() + Head.Length;
-     Write(pBuffer);
-     pBuffer->Release(__FILE__, __LINE__);
- }
- // Handles cs_robot: replies with an sc_robot message listing every entry
- // returned by CUserManager::GetRobot(). The request is decoded but none of
- // its fields are read.
- void CUserHandler::OnRobot(int8_t* Data, int16_t Size)
- {
-     remote::CSRobot request;
-     request.ParseFromArray(Data, Size);
-
-     std::vector<Benchboard> boards;
-     CUserManager::GetInstance().GetRobot(boards);
-
-     remote::SCRobot reply;
-     for (auto it = boards.begin(); it != boards.end(); ++it)
-     {
-         auto entry = reply.add_robot();
-         entry->set_name(it->name.c_str());
-         entry->set_rid(it->uid);
-         entry->set_type(it->type);
-         entry->set_state(it->state);
-     }
-
-     MessageHead head;
-     head.Command = remote::sc_robot;
-     head.Length = reply.ByteSizeLong();
-
-     CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-     head.Serialize(out->Buffer);
-     reply.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-     out->Length = MessageHead::Size() + head.Length;
-     Write(out);
-     out->Release(__FILE__, __LINE__);
- }
- // Handles cs_rep (answer to a video request): when the reply is
- // VideoDesc::OK the requesting peer becomes this handler's current _peer,
- // then the reply is passed on through CUserManager::ReplyPeerVideo().
- // A payload that fails to parse is dropped so that garbage can neither change
- // _peer nor be forwarded to another client.
- void CUserHandler::OnRepVideo(int8_t* Data, int16_t Size)
- {
-     std::cout << __FUNCTION__ << std::endl;
-     remote::CSRep Req;
-     if (!Req.ParseFromArray(Data, Size))
-     {
-         return;
-     }
-     const int32_t peer = Req.peer();
-     if (Req.desc() == remote::VideoDesc::OK)
-     {
-         _peer = peer;
-     }
-     CUserManager::GetInstance().ReplyPeerVideo(peer, _uid, static_cast<remote::VideoDesc>(Req.desc()), Req.index());
- }
- // Handles cs_req (request to view a peer's video).
- //
- // For a Car / FRONT request the handler claims `peer` as its current _peer
- // (and ignores the request if it is already bound to that peer or if _peer
- // equals its own uid). The request is then forwarded through
- // CUserManager::ConnectPeerVideo() and the resulting state is returned to the
- // requester in an sc_req message.
- //
- // Fix: the reply (remote::SCReq with peer + desc) is now what gets
- // serialized; previously the incoming request object was echoed back and the
- // prepared reply was never sent. A payload that fails to parse is dropped.
- void CUserHandler::OnReqVideo(int8_t* Data, int16_t Size)
- {
-     std::cout << __FUNCTION__ << std::endl;
-     remote::CSReq Req;
-     if (!Req.ParseFromArray(Data, Size))
-     {
-         return;
-     }
-     const int32_t peer = Req.peer();
-     const int32_t index = Req.index();
-     const int32_t type = Req.egotype();
-     //bool master=Req.master();
-     if (type==EgoType::Car&&index==RenderPosition::FRONT)
-     {
-         if (_peer == _uid || peer == _peer) return;
-         _peer = peer;
-     }
-
-     auto state = CUserManager::GetInstance().ConnectPeerVideo(peer, _uid, index);
-
-     remote::SCReq Rep;
-     Rep.set_peer(peer);
-     Rep.set_desc(state);
-
-     CIOBuffer* pBuffer = CIOBuffer::Alloc(__FILE__, __LINE__);
-     MessageHead Head;
-     Head.Command = remote::sc_req;
-     Head.Length = Rep.ByteSizeLong();
-     Head.Serialize(pBuffer->Buffer);
-     auto ptr = pBuffer->Buffer + MessageHead::Size();
-     Rep.SerializeToArray(ptr, Head.Length);
-     pBuffer->Length = MessageHead::Size() + Head.Length;
-     Write(pBuffer);
-     pBuffer->Release(__FILE__, __LINE__);
- }
- // Handles cs_leave. Only acted upon when this handler is an EgoType::User
- // and the message's egotype is not User. For a Car target the peer must be
- // the one currently bound in _peer, which is then cleared. The leave is
- // propagated through CUserManager::LeavePeerVideo().
- // A payload that fails to parse is dropped so that garbage cannot reset
- // _peer or trigger a leave for an arbitrary peer.
- void CUserHandler::OnLeave(int8_t* Data, int16_t Size)
- {
-     remote::Leave Req;
-     if (!Req.ParseFromArray(Data, Size))
-     {
-         return;
-     }
-     auto type = Req.egotype();
-     if (_egoType != EgoType::User||type==EgoType::User) return;
-     const int32_t peer = Req.peer();
-     if (type == EgoType::Car)
-     {
-         if (peer != _peer) return;
-         _peer = -1;
-     }
-
-     CUserManager::GetInstance().LeavePeerVideo(peer, _uid, _egoType);
- }
- // Sends the first pBuffer->Length bytes of pBuffer->Buffer on this handler's
- // socket, looping over partial sends. Gives up silently on the first send()
- // failure other than EINTR.
- //
- // The buffer is left untouched: progress is tracked in locals instead of
- // decrementing pBuffer->Length, so the same buffer can be written to several
- // handlers in turn (e.g. when it is handed to CUserManager::BroadCast) -
- // NOTE(review): BroadCast is not visible here; confirm it reuses the buffer.
- void CUserHandler::Write(CIOBuffer* pBuffer)
- {
-     int32_t offset = 0;
-     int32_t remaining = pBuffer->Length;
-     while (remaining > 0)
-     {
-         const int32_t ret = static_cast<int32_t>(send(_fd, &pBuffer->Buffer[offset], remaining, 0));
-         if (ret < 0 && errno == EINTR)
-         {
-             continue;  // interrupted before any byte was sent: retry
-         }
-         if (ret <= 0) return;
-         offset += ret;
-         remaining -= ret;
-     }
- }
- // Returns the EgoType recorded for this connection (set in OnAdd()).
- EgoType CUserHandler::type()
- {
-     const EgoType role = _egoType;
-     return role;
- }
- // Returns the id assigned to this connection (the `robot.id` read in OnAdd()).
- int32_t CUserHandler::uid()
- {
-     const int32_t id = _uid;
-     return id;
- }
- // Returns a mutable reference to the name stored by OnAdd().
- std::string& CUserHandler::name()
- {
-     return this->_name;
- }
- // Returns the current UserState of this connection.
- UserState CUserHandler::state()
- {
-     const UserState current = _state;
-     return current;
- }
- // Called when `peer` leaves a video session. If `peer` is the one currently
- // bound to this handler, the binding is cleared and an sc_NotifyLeave message
- // (egotype + peer) is written to this client; otherwise nothing happens.
- void CUserHandler::LeaveVideo(int32_t peer,EgoType type)
- {
-     if (_peer == peer)
-     {
-         _peer = -1;
-
-         remote::Leave notice;
-         notice.set_egotype(type);
-         notice.set_peer(peer);
-
-         MessageHead head;
-         head.Command = remote::sc_NotifyLeave;
-         head.Length = notice.ByteSizeLong();
-
-         CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-         head.Serialize(out->Buffer);
-         notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-         out->Length = MessageHead::Size() + head.Length;
-         Write(out);
-         out->Release(__FILE__, __LINE__);
-     }
- }
- // Delivers a video request from `peer` to this client as sc_NotifyReq.
- // Returns VideoDesc::Busy (and sends nothing) when this handler is already
- // bound to a different peer; otherwise sends the notification and returns
- // VideoDesc::OK.
- remote::VideoDesc CUserHandler::ReqVideo(int32_t peer, int32_t index)
- {
-     //if(index!=RenderPosition::FRONT_BACK)
-     const bool boundToOther = (_peer != -1) && (_peer != peer);
-     if (boundToOther)
-     {
-         return remote::VideoDesc::Busy;
-     }
-
-     remote::CSReq notice;
-     notice.set_peer(peer);
-     notice.set_index(index);
-
-     MessageHead head;
-     head.Command = remote::sc_NotifyReq;
-     head.Length = notice.ByteSizeLong();
-
-     CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-     head.Serialize(out->Buffer);
-     notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-     out->Length = MessageHead::Size() + head.Length;
-     Write(out);
-     out->Release(__FILE__, __LINE__);
-
-     return remote::VideoDesc::OK;
- }
- // Delivers a peer's answer to a video request to this client as
- // sc_NotifyRep. Any answer other than VideoDesc::OK also clears the current
- // peer binding.
- void CUserHandler::RepVideo(int32_t peer, int32_t index,remote::VideoDesc desc)
- {
-     const bool accepted = (desc == remote::VideoDesc::OK);
-     if (!accepted)
-     {
-         _peer = -1;
-     }
-
-     remote::CSRep notice;
-     notice.set_desc(desc);
-     notice.set_peer(peer);
-     notice.set_index(index);
-
-     MessageHead head;
-     head.Command = remote::sc_NotifyRep;
-     head.Length = notice.ByteSizeLong();
-
-     CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-     head.Serialize(out->Buffer);
-     notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-     out->Length = MessageHead::Size() + head.Length;
-     Write(out);
-     out->Release(__FILE__, __LINE__);
- }
- // Sends an sc_NotifyOffer message (peer, index, type, sdp) to this client.
- void CUserHandler::NotifyOffer(int32_t peer, int32_t index,const char * type,const char * sdp)
- {
-     // std::cout << sdp << std::endl;
-     remote::Offer notice;
-     notice.set_peer(peer);
-     notice.set_index(index);
-     notice.set_type(type);
-     notice.set_sdp(sdp);
-
-     MessageHead head;
-     head.Command = remote::sc_NotifyOffer;
-     head.Length = notice.ByteSizeLong();
-
-     CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-     head.Serialize(out->Buffer);
-     notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-     out->Length = MessageHead::Size() + head.Length;
-     Write(out);
-     out->Release(__FILE__, __LINE__);
- }
- // Sends an sc_NotifyCandidate message (peer, index, type, sdpMid, candidate,
- // sdpMLineIndex) to this client.
- void CUserHandler::NotifyCandidate(int32_t peer, int32_t index, const char* type, const char* candidate, int32_t sdp_mline_index, const char* sdp_mid)
- {
-     remote::Candidate notice;
-     notice.set_peer(peer);
-     notice.set_index(index);
-     notice.set_type(type);
-     notice.set_sdpmid(sdp_mid);
-     notice.set_candidate(candidate);
-     notice.set_sdpmlineindex(sdp_mline_index);
-
-     MessageHead head;
-     head.Command = remote::sc_NotifyCandidate;
-     head.Length = notice.ByteSizeLong();
-
-     CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-     head.Serialize(out->Buffer);
-     notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-     out->Length = MessageHead::Size() + head.Length;
-     Write(out);
-     out->Release(__FILE__, __LINE__);
- }
- // Returns the socket descriptor this handler was constructed with.
- int32_t CUserHandler::Fd()
- {
-     const int32_t descriptor = _fd;
-     return descriptor;
- }
- // Sends an sc_NotifyAnswer message (peer, index, type, sdp) to this client.
- void CUserHandler::NotifyAnswer(int32_t peer, int32_t index, const char* type, const char* sdp)
- {
-     //std::cout << sdp << std::endl;
-     remote::Answer notice;
-     notice.set_peer(peer);
-     notice.set_index(index);
-     notice.set_type(type);
-     notice.set_sdp(sdp);
-
-     MessageHead head;
-     head.Command = remote::sc_NotifyAnswer;
-     head.Length = notice.ByteSizeLong();
-
-     CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-     head.Serialize(out->Buffer);
-     notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-     out->Length = MessageHead::Size() + head.Length;
-     Write(out);
-     out->Release(__FILE__, __LINE__);
- }
- // Tears the connection down, in this order:
- //   1. if this handler is an EgoType::User, broadcast sc_NotifyDel with its uid;
- //   2. if a peer is bound, report the leave via CUserManager::LeavePeerVideo();
- //   3. log, unregister from CUserManager, and close the socket.
- // NOTE(review): _fd is read after Remove(this); confirm Remove() does not
- // destroy this object.
- void CUserHandler::OnClose()
- {
-     const bool isUser = (_egoType == EgoType::User);
-     if (isUser)
-     {
-         remote::SCDelRobot notice;
-         notice.set_peer(_uid);
-
-         MessageHead head;
-         head.Command = remote::sc_NotifyDel;
-         head.Length = notice.ByteSizeLong();
-
-         CIOBuffer* out = CIOBuffer::Alloc(__FILE__, __LINE__);
-         head.Serialize(out->Buffer);
-         notice.SerializeToArray(out->Buffer + MessageHead::Size(), head.Length);
-         out->Length = MessageHead::Size() + head.Length;
-         CUserManager::GetInstance().BroadCast(out);
-         out->Release(__FILE__, __LINE__);
-     }
-
-     const bool hasPeer = (_peer != -1);
-     if (hasPeer)
-     {
-         CUserManager::GetInstance().LeavePeerVideo(_peer, _uid, _egoType);
-     }
-
-     printf("a socket closed %d", _uid);
-
-     CUserManager::GetInstance().Remove(this);
-     close(_fd);
- }
|