同步操作将从 伤神小怪兽/HP-Socket 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
/*
 * Copyright: JessMA Open Source (ldcsaa@gmail.com)
 *
 * Author	: Bruce Liang
 * Website	: https://github.com/ldcsaa
 * Project	: https://github.com/ldcsaa/HP-Socket
 * Blog		: http://www.cnblogs.com/ldcsaa
 * Wiki		: http://www.oschina.net/p/hp-socket
 * QQ Group	: 44636872, 75375912
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "UdpServer.h"

#ifdef _UDP_SUPPORT

// Fires the accept event for pSocketObj (through TRIGGER) and returns the handler's result.
EnHandleResult CUdpServer::TriggerFireAccept(TUdpSocketObj* pSocketObj)
{
	EnHandleResult rs = TRIGGER(FireAccept(pSocketObj));

	return rs;
}

// Starts the server: validates parameters, moves the state to SS_STARTING, prepares
// per-worker resources, then creates the listen sockets, the worker threads and the
// epoll registrations. On any failure Stop() is run (preserving the error code via
// EXECUTE_RESTORE_ERROR) and FALSE is returned.
BOOL CUdpServer::Start(LPCTSTR lpszBindAddress, USHORT usPort)
{
	if(!CheckParams() || !CheckStarting())
		return FALSE;

	PrepareStart();

	if(CreateListenSocket(lpszBindAddress, usPort))
		if(CreateWorkerThreads())
			if(StartAccept())
			{
				m_enState = SS_STARTED;
				return TRUE;
			}

	EXECUTE_RESTORE_ERROR(Stop());

	return FALSE;
}

// Records the component-level error code and forwards the system error code
// to the global ::SetLastError(). 'func' is accepted but not used here.
void CUdpServer::SetLastError(EnSocketError code, LPCSTR func, int ec)
{
	m_enLastError = code;
	::SetLastError(ec);
}

// Validates the configuration members against their allowed ranges.
// Returns TRUE when all are acceptable; otherwise sets SE_INVALID_PARAM and returns FALSE.
BOOL CUdpServer::CheckParams()
{
	if	((m_enSendPolicy >= SP_PACK && m_enSendPolicy <= SP_DIRECT)								&&
		(m_enOnSendSyncPolicy >= OSSP_NONE && m_enOnSendSyncPolicy <= OSSP_RECEIVE)				&&
		((int)m_dwMaxConnectionCount > 0 && m_dwMaxConnectionCount <= MAX_CONNECTION_COUNT)		&&
		((int)m_dwWorkerThreadCount > 0 && m_dwWorkerThreadCount <= MAX_WORKER_THREAD_COUNT)	&&
		((int)m_dwFreeSocketObjLockTime >= 1000)												&&
		((int)m_dwFreeSocketObjPool >= 0)														&&
		((int)m_dwFreeBufferObjPool >= 0)														&&
		((int)m_dwFreeSocketObjHold >= 0)														&&
		((int)m_dwFreeBufferObjHold >= 0)														&&
		((int)m_dwMaxDatagramSize > 0 && m_dwMaxDatagramSize <= MAXIMUM_UDP_MAX_DATAGRAM_SIZE)	&&
		((int)m_dwPostReceiveCount > 0)															&&
		((int)m_dwDetectAttempts >= 0)															&&
		((int)m_dwDetectInterval >= 1000 || m_dwDetectInterval == 0)							)
		return TRUE;

	SetLastError(SE_INVALID_PARAM, __FUNCTION__, ERROR_INVALID_PARAMETER);
	return FALSE;
}

// Sizes the pools and allocates the per-worker-thread arrays (send queues,
// receive buffers, listen sockets). Every listen socket slot starts as INVALID_FD.
void CUdpServer::PrepareStart()
{
	m_bfActiveSockets.Reset(m_dwMaxConnectionCount);
	m_lsFreeSocket.Reset(m_dwFreeSocketObjPool);

	m_bfObjPool.SetItemCapacity(m_dwMaxDatagramSize);
	m_bfObjPool.SetPoolSize(m_dwFreeBufferObjPool);
	m_bfObjPool.SetPoolHold(m_dwFreeBufferObjHold);

	m_bfObjPool.Prepare();

	m_quSends	= make_unique<CSendQueue[]>(m_dwWorkerThreadCount);
	m_rcBuffers	= make_unique<CBufferPtr[]>(m_dwWorkerThreadCount);
	for_each(m_rcBuffers.get(), m_rcBuffers.get() + m_dwWorkerThreadCount, [this](CBufferPtr& buff) {buff.Malloc(m_dwMaxDatagramSize);});

	m_soListens	= make_unique<SOCKET[]>(m_dwWorkerThreadCount);
	for_each(m_soListens.get(), m_soListens.get() + m_dwWorkerThreadCount, [](SOCKET& sock) {sock = INVALID_FD;});
}

// Under m_csState, transitions SS_STOPPED -> SS_STARTING.
// Any other current state yields SE_ILLEGAL_STATE and FALSE.
BOOL CUdpServer::CheckStarting()
{
	CSpinLock locallock(m_csState);

	if(m_enState == SS_STOPPED)
		m_enState = SS_STARTING;
	else
	{
		SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_STATE);
		return FALSE;
	}

	return TRUE;
}

// Transitions to SS_STOPPING when HasStarted() holds (checked under m_csState).
// Otherwise sets SE_ILLEGAL_STATE and returns FALSE.
BOOL CUdpServer::CheckStoping()
{
	if(m_enState != SS_STOPPED)
	{
		CSpinLock locallock(m_csState);

		if(HasStarted())
		{
			m_enState = SS_STOPPING;
			return TRUE;
		}
	}

	SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_STATE);

	return FALSE;
}

// Creates one UDP socket per worker thread, all bound to the same address/port.
// An empty bind address falls back to DEFAULT_IPV4_BIND_ADDRESS.
// Returns FALSE (with the last error set) on the first failure; sockets created
// so far stay in m_soListens and are closed later by CloseListenSocket().
BOOL CUdpServer::CreateListenSocket(LPCTSTR lpszBindAddress, USHORT usPort)
{
	if(::IsStrEmpty(lpszBindAddress))
		lpszBindAddress = DEFAULT_IPV4_BIND_ADDRESS;

	HP_SOCKADDR addr;

	if(!::sockaddr_A_2_IN(lpszBindAddress, usPort, addr))
	{
		SetLastError(SE_SOCKET_CREATE, __FUNCTION__, ::WSAGetLastError());
		return FALSE;
	}

	for(DWORD i = 0; i < m_dwWorkerThreadCount; i++)
	{
		m_soListens[i]	= socket(addr.family, SOCK_DGRAM, IPPROTO_UDP);
		SOCKET soListen	= m_soListens[i];

		if(IS_INVALID_FD(soListen))
		{
			SetLastError(SE_SOCKET_CREATE, __FUNCTION__, ::WSAGetLastError());
			return FALSE;
		}

		::fcntl_SETFL(soListen, O_NOATIME | O_NONBLOCK | O_CLOEXEC);
		VERIFY(IS_NO_ERROR(::SSO_ReuseAddress(soListen, m_enReusePolicy)));

		if(IS_HAS_ERROR(::bind(soListen, addr.Addr(), addr.AddrSize())))
		{
			SetLastError(SE_SOCKET_BIND, __FUNCTION__, ::WSAGetLastError());
			return FALSE;
		}

		// Give the listener a chance to veto / customize the socket.
		if(TRIGGER(FirePrepareListen(soListen)) == HR_ERROR)
		{
			SetLastError(SE_SOCKET_PREPARE, __FUNCTION__, ENSURE_ERROR_CANCELLED);
			return FALSE;
		}
	}

	return TRUE;
}

// Starts the I/O dispatcher with m_dwWorkerThreadCount workers.
BOOL CUdpServer::CreateWorkerThreads()
{
	return m_ioDispatcher.Start(this, m_dwPostReceiveCount, m_dwWorkerThreadCount);
}

// Registers listen socket i with dispatcher worker i (edge-triggered, read + write).
// The address of the socket slot is used as the event's user pointer; see OnBeforeProcessIo().
BOOL CUdpServer::StartAccept()
{
	for(int i = 0; i < (int)m_dwWorkerThreadCount; i++)
	{
		SOCKET& soListen = m_soListens[i];

		if(!m_ioDispatcher.AddFD(i, soListen, EPOLLIN | EPOLLOUT | EPOLLET, TO_PVOID(&soListen)))
			return FALSE;
	}

	return TRUE;
}

// Stops the server. Returns FALSE if the state does not allow stopping.
// The steps below run in a fixed order; do not reorder them.
BOOL CUdpServer::Stop()
{
	if(!CheckStoping())
		return FALSE;

	SendCloseNotify();
	CloseListenSocket();

	DisconnectClientSocket();
	WaitForClientSocketClose();
	WaitForWorkerThreadEnd();

	ReleaseClientSocket();

	FireShutdown();

	ReleaseFreeSocket();
	ClearSendQueues();

	Reset();

	return TRUE;
}

// Sends a UDP close notification to every valid active connection, then waits 30 ms.
// No-op when the listen sockets were never allocated or there are no connections.
void CUdpServer::SendCloseNotify()
{
	if(!m_soListens)
		return;

	if(m_bfActiveSockets.Elements() == 0)
		return;

	TUdpSocketObjPtrPool::IndexSet indexes;
	m_bfActiveSockets.CopyIndexes(indexes);

	for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it)
	{
		CONNID connID				= *it;
		TUdpSocketObj* pSocketObj	= FindSocketObj(connID);

		if(TUdpSocketObj::IsValid(pSocketObj))
			::SendUdpCloseNotify(m_soListens[pSocketObj->index], pSocketObj->remoteAddr);
	}

	::WaitFor(30);
}

// Closes every open listen socket, marks the slot INVALID_FD, then waits 70 ms.
void CUdpServer::CloseListenSocket()
{
	if(m_soListens)
	{
		for_each(m_soListens.get(), m_soListens.get() + m_dwWorkerThreadCount, [](SOCKET& sock)
		{
			if(sock != INVALID_FD)
			{
				::ManualCloseSocket(sock);
				sock = INVALID_FD;
			}
		});

		::WaitFor(70);
	}
}

// Requests disconnection of every currently active connection.
void CUdpServer::DisconnectClientSocket()
{
	if(m_bfActiveSockets.Elements() == 0)
		return;

	TUdpSocketObjPtrPool::IndexSet indexes;
	m_bfActiveSockets.CopyIndexes(indexes);

	for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it)
		Disconnect(*it);
}

// Polls every 50 ms until no active connection remains.
void CUdpServer::WaitForClientSocketClose()
{
	while(m_bfActiveSockets.Elements() > 0)
		::WaitFor(50);
}

// Stops the I/O dispatcher.
void CUdpServer::WaitForWorkerThreadEnd()
{
	m_ioDispatcher.Stop();
}

// Resets the active-connection pool (expected to be empty already) and clears
// the address -> connection-id map under the write lock.
void CUdpServer::ReleaseClientSocket()
{
	VERIFY(m_bfActiveSockets.IsEmpty());
	m_bfActiveSockets.Reset();

	CWriteLock locallock(m_csClientSocket);
	m_mpClientAddr.clear();
}

// Clears the free-socket list and force-releases the GC list.
void CUdpServer::ReleaseFreeSocket()
{
	m_lsFreeSocket.Clear();

	ReleaseGCSocketObj(TRUE);
	VERIFY(m_lsGCSocket.IsEmpty());
}

// Clears every per-worker send queue (if they were allocated).
void CUdpServer::ClearSendQueues()
{
	if(m_quSends)
		for_each(m_quSends.get(), m_quSends.get() + m_dwWorkerThreadCount, [](CSendQueue& queue) {queue.UnsafeClear();});
}

// Releases remaining resources, sets the state to SS_STOPPED and notifies waiters on m_evWait.
void CUdpServer::Reset()
{
	m_phSocket.Reset();
	m_bfObjPool.Clear();

	m_soListens	= nullptr;
	m_rcBuffers	= nullptr;
	m_quSends	= nullptr;
	m_enState	= SS_STOPPED;

	m_evWait.SyncNotifyAll();
}

// Returns a socket object initialised for dwConnID. An object from the free list is
// reused only if it has been free for at least m_dwFreeSocketObjLockTime; otherwise
// it is put back and a new object is constructed.
TUdpSocketObj* CUdpServer::GetFreeSocketObj(CONNID dwConnID)
{
	DWORD dwIndex;
	TUdpSocketObj* pSocketObj = nullptr;

	if(m_lsFreeSocket.TryLock(&pSocketObj, dwIndex))
	{
		if(::GetTimeGap32(pSocketObj->freeTime) >= m_dwFreeSocketObjLockTime)
			VERIFY(m_lsFreeSocket.ReleaseLock(nullptr, dwIndex));
		else
		{
			VERIFY(m_lsFreeSocket.ReleaseLock(pSocketObj, dwIndex));
			pSocketObj = nullptr;
		}
	}

	if(!pSocketObj) pSocketObj = CreateSocketObj();
	pSocketObj->Reset(dwConnID);

	return pSocketObj;
}

// Constructs a new socket object from the private heap and the buffer pool.
TUdpSocketObj* CUdpServer::CreateSocketObj()
{
	return TUdpSocketObj::Construct(m_phSocket, m_bfObjPool);
}

// Destroys a socket object.
void CUdpServer::DeleteSocketObj(TUdpSocketObj* pSocketObj)
{
	TUdpSocketObj::Destruct(pSocketObj);
}

// Closes a connection and recycles its socket object.
// Does nothing when InvalidSocketObj() reports FALSE for the object. Otherwise:
// fires the close handling, removes the object from the active pool and the address
// map, deletes its timer, and puts it on the free list (or the GC list if the free
// list rejects it).
void CUdpServer::AddFreeSocketObj(TUdpSocketObj* pSocketObj, EnSocketCloseFlag enFlag, EnSocketOperation enOperation, int iErrorCode, BOOL bNotify)
{
	if(!InvalidSocketObj(pSocketObj))
		return;

	CloseClientSocketObj(pSocketObj, enFlag, enOperation, iErrorCode, bNotify);

	{
		m_bfActiveSockets.Remove(pSocketObj->connID);

		CWriteLock locallock(m_csClientSocket);
		m_mpClientAddr.erase(&pSocketObj->remoteAddr);
	}

	m_ioDispatcher.DelTimer(pSocketObj->index, pSocketObj->fdTimer);
	TUdpSocketObj::Release(pSocketObj);

	ReleaseGCSocketObj();

	if(!m_lsFreeSocket.TryPut(pSocketObj))
		m_lsGCSocket.PushBack(pSocketObj);
}

// Releases objects on the GC list via ::ReleaseGCObj(); bForce is passed through.
void CUdpServer::ReleaseGCSocketObj(BOOL bForce)
{
	::ReleaseGCObj(m_lsGCSocket, m_dwFreeSocketObjLockTime, bForce);
}

// Thin wrapper over TUdpSocketObj::InvalidSocketObj().
BOOL CUdpServer::InvalidSocketObj(TUdpSocketObj* pSocketObj)
{
	return TUdpSocketObj::InvalidSocketObj(pSocketObj);
}

// Initialises a freshly acquired socket object for a new peer and publishes it:
// sets worker index / holder / timestamps, optionally arms the detection timer,
// copies the peer address, marks the object connected, stores it in the active pool
// under dwConnID and records the address -> id mapping.
void CUdpServer::AddClientSocketObj(int idx, CONNID dwConnID, TUdpSocketObj* pSocketObj, const HP_SOCKADDR& remoteAddr)
{
	ASSERT(FindSocketObj(dwConnID) == nullptr);

	pSocketObj->index		= idx;
	pSocketObj->pHolder		= this;
	pSocketObj->connTime	= ::TimeGetTime();
	pSocketObj->activeTime	= pSocketObj->connTime;

	if(IsNeedDetectConnection())
		pSocketObj->fdTimer	= m_ioDispatcher.AddTimer(pSocketObj->index, m_dwDetectInterval, pSocketObj);

	remoteAddr.Copy(pSocketObj->remoteAddr);
	pSocketObj->SetConnected();

	VERIFY(m_bfActiveSockets.ReleaseLock(dwConnID, pSocketObj));

	CWriteLock locallock(m_csClientSocket);
	m_mpClientAddr[&pSocketObj->remoteAddr] = dwConnID;
}

// Looks up the socket object for dwConnID; returns nullptr unless the pool reports GR_VALID.
TUdpSocketObj* CUdpServer::FindSocketObj(CONNID dwConnID)
{
	TUdpSocketObj* pSocketObj = nullptr;

	if(m_bfActiveSockets.Get(dwConnID, &pSocketObj) != TUdpSocketObjPtrPool::GR_VALID)
		pSocketObj = nullptr;

	return pSocketObj;
}

// Returns the connection id mapped to the given peer address, or 0 if there is none.
CONNID CUdpServer::FindConnectionID(const HP_SOCKADDR* pAddr)
{
	CONNID dwConnID = 0;

	CReadLock locallock(m_csClientSocket);

	TSockAddrMapCI it = m_mpClientAddr.find(pAddr);
	if(it != m_mpClientAddr.end())
		dwConnID = it->second;

	return dwConnID;
}

// Optionally sends a close notification to the peer, then fires the close event:
// SCF_CLOSE -> (SO_CLOSE, SE_OK); SCF_ERROR -> (enOperation, iErrorCode); other flags fire nothing.
void CUdpServer::CloseClientSocketObj(TUdpSocketObj* pSocketObj, EnSocketCloseFlag enFlag, EnSocketOperation enOperation, int iErrorCode, BOOL bNotify)
{
	ASSERT(TUdpSocketObj::IsExist(pSocketObj));

	SOCKET soListen = m_soListens[pSocketObj->index];

	if(bNotify && soListen != INVALID_SOCKET)
		::SendUdpCloseNotify(soListen, pSocketObj->remoteAddr);

	if(enFlag == SCF_CLOSE)
		FireClose(pSocketObj, SO_CLOSE, SE_OK);
	else if(enFlag == SCF_ERROR)
		FireClose(pSocketObj, enOperation, iErrorCode);
}

// Retrieves the local address of the first listen socket. Fails with
// ERROR_INVALID_STATE when the server has not started.
BOOL CUdpServer::GetListenAddress(TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort)
{
	ASSERT(lpszAddress != nullptr && iAddressLen > 0);

	if(!HasStarted())
	{
		::SetLastError(ERROR_INVALID_STATE);
		return FALSE;
	}

	return ::GetSocketLocalAddress(m_soListens[0], lpszAddress, iAddressLen, usPort);
}

// Retrieves the local address of the listen socket that serves connection dwConnID.
BOOL CUdpServer::GetLocalAddress(CONNID dwConnID, TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort)
{
	ASSERT(lpszAddress != nullptr && iAddressLen > 0);

	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsValid(pSocketObj))
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		return FALSE;
	}

	return ::GetSocketLocalAddress(m_soListens[pSocketObj->index], lpszAddress, iAddressLen, usPort);
}

// Converts the stored peer address of connection dwConnID to text + port.
BOOL CUdpServer::GetRemoteAddress(CONNID dwConnID, TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort)
{
	ASSERT(lpszAddress != nullptr && iAddressLen > 0);

	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsExist(pSocketObj))
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		return FALSE;
	}

	ADDRESS_FAMILY usFamily;
	return ::sockaddr_IN_2_A(pSocketObj->remoteAddr, usFamily, lpszAddress, iAddressLen, usPort);
}

// Stores the user 'extra' pointer on the connection identified by dwConnID.
BOOL CUdpServer::SetConnectionExtra(CONNID dwConnID, PVOID pExtra)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);
	return SetConnectionExtra(pSocketObj, pExtra);
}

// Stores the user 'extra' pointer; fails with ERROR_OBJECT_NOT_FOUND if the object does not exist.
BOOL CUdpServer::SetConnectionExtra(TUdpSocketObj* pSocketObj, PVOID pExtra)
{
	if(!TUdpSocketObj::IsExist(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		pSocketObj->extra = pExtra;
		return TRUE;
	}

	return FALSE;
}

// Reads the user 'extra' pointer of the connection identified by dwConnID.
BOOL CUdpServer::GetConnectionExtra(CONNID dwConnID, PVOID* ppExtra)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);
	return GetConnectionExtra(pSocketObj, ppExtra);
}

// Reads the user 'extra' pointer; fails with ERROR_OBJECT_NOT_FOUND if the object does not exist.
BOOL CUdpServer::GetConnectionExtra(TUdpSocketObj* pSocketObj, PVOID* ppExtra)
{
	ASSERT(ppExtra != nullptr);

	if(!TUdpSocketObj::IsExist(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		*ppExtra = pSocketObj->extra;
		return TRUE;
	}

	return FALSE;
}

// Stores the 'reserved' pointer on the connection identified by dwConnID.
BOOL CUdpServer::SetConnectionReserved(CONNID dwConnID, PVOID pReserved)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);
	return SetConnectionReserved(pSocketObj, pReserved);
}

// Stores the 'reserved' pointer; fails with ERROR_OBJECT_NOT_FOUND if the object does not exist.
BOOL CUdpServer::SetConnectionReserved(TUdpSocketObj* pSocketObj, PVOID pReserved)
{
	if(!TUdpSocketObj::IsExist(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		pSocketObj->reserved = pReserved;
		return TRUE;
	}

	return FALSE;
}

// Reads the 'reserved' pointer of the connection identified by dwConnID.
BOOL CUdpServer::GetConnectionReserved(CONNID dwConnID, PVOID* ppReserved)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);
	return GetConnectionReserved(pSocketObj, ppReserved);
}

// Reads the 'reserved' pointer; fails with ERROR_OBJECT_NOT_FOUND if the object does not exist.
BOOL CUdpServer::GetConnectionReserved(TUdpSocketObj* pSocketObj, PVOID* ppReserved)
{
	ASSERT(ppReserved != nullptr);

	if(!TUdpSocketObj::IsExist(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		*ppReserved = pSocketObj->reserved;
		return TRUE;
	}

	return FALSE;
}

// Stores the 'reserved2' pointer on the connection identified by dwConnID.
BOOL CUdpServer::SetConnectionReserved2(CONNID dwConnID, PVOID pReserved2)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);
	return SetConnectionReserved2(pSocketObj, pReserved2);
}

// Stores the 'reserved2' pointer; fails with ERROR_OBJECT_NOT_FOUND if the object does not exist.
BOOL CUdpServer::SetConnectionReserved2(TUdpSocketObj* pSocketObj, PVOID pReserved2)
{
	if(!TUdpSocketObj::IsExist(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		pSocketObj->reserved2 = pReserved2;
		return TRUE;
	}

	return FALSE;
}

// Reads the 'reserved2' pointer of the connection identified by dwConnID.
BOOL CUdpServer::GetConnectionReserved2(CONNID dwConnID, PVOID* ppReserved2)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);
	return GetConnectionReserved2(pSocketObj, ppReserved2);
}

// Reads the 'reserved2' pointer; fails with ERROR_OBJECT_NOT_FOUND if the object does not exist.
BOOL CUdpServer::GetConnectionReserved2(TUdpSocketObj* pSocketObj, PVOID* ppReserved2)
{
	ASSERT(ppReserved2 != nullptr);

	if(!TUdpSocketObj::IsExist(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		*ppReserved2 = pSocketObj->reserved2;
		return TRUE;
	}

	return FALSE;
}

// Not implemented for the UDP server: always fails with ERROR_CALL_NOT_IMPLEMENTED
// and reports bPaused = FALSE.
BOOL CUdpServer::IsPauseReceive(CONNID dwConnID, BOOL& bPaused)
{
	::SetLastError(ERROR_CALL_NOT_IMPLEMENTED);

	bPaused = FALSE;

	return FALSE;
}

// Returns the connected flag of the connection; FALSE + ERROR_OBJECT_NOT_FOUND if it is not valid.
BOOL CUdpServer::IsConnected(CONNID dwConnID)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsValid(pSocketObj))
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		return FALSE;
	}

	return pSocketObj->HasConnected();
}

// Reports the connection's pending send amount (as returned by Pending()).
BOOL CUdpServer::GetPendingDataLength(CONNID dwConnID, int& iPending)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsValid(pSocketObj))
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
	else
	{
		iPending = pSocketObj->Pending();
		return TRUE;
	}

	return FALSE;
}

// Number of active connections.
DWORD CUdpServer::GetConnectionCount()
{
	return m_bfActiveSockets.Elements();
}

// Copies the ids of all active connections into pIDs; dwCount is in/out.
BOOL CUdpServer::GetAllConnectionIDs(CONNID pIDs[], DWORD& dwCount)
{
	return m_bfActiveSockets.GetAllElementIndexes(pIDs, dwCount);
}

// Reports the time elapsed since the connection was established (from connTime).
BOOL CUdpServer::GetConnectPeriod(CONNID dwConnID, DWORD& dwPeriod)
{
	BOOL isOK					= TRUE;
	TUdpSocketObj* pSocketObj	= FindSocketObj(dwConnID);

	if(TUdpSocketObj::IsValid(pSocketObj))
		dwPeriod = ::GetTimeGap32(pSocketObj->connTime);
	else
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		isOK = FALSE;
	}

	return isOK;
}

// Reports the time elapsed since the connection was last active (from activeTime).
// Returns FALSE immediately when silence marking (m_bMarkSilence) is disabled.
BOOL CUdpServer::GetSilencePeriod(CONNID dwConnID, DWORD& dwPeriod)
{
	if(!m_bMarkSilence)
		return FALSE;

	BOOL isOK					= TRUE;
	TUdpSocketObj* pSocketObj	= FindSocketObj(dwConnID);

	if(TUdpSocketObj::IsValid(pSocketObj))
		dwPeriod = ::GetTimeGap32(pSocketObj->activeTime);
	else
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		isOK = FALSE;
	}

	return isOK;
}

// Posts a DISP_CMD_DISCONNECT command to the worker that owns the connection.
BOOL CUdpServer::Disconnect(CONNID dwConnID, BOOL bForce)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsValid(pSocketObj))
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		return FALSE;
	}

	return m_ioDispatcher.SendCommandByIndex(pSocketObj->index, DISP_CMD_DISCONNECT, dwConnID, bForce);
}

// Disconnects every connection whose age (now - connTime) is at least dwPeriod.
// Returns FALSE only when dwPeriod exceeds MAX_CONNECTION_PERIOD.
BOOL CUdpServer::DisconnectLongConnections(DWORD dwPeriod, BOOL bForce)
{
	if(dwPeriod > MAX_CONNECTION_PERIOD)
		return FALSE;
	if(m_bfActiveSockets.Elements() == 0)
		return TRUE;

	DWORD now = ::TimeGetTime();

	TUdpSocketObjPtrPool::IndexSet indexes;
	m_bfActiveSockets.CopyIndexes(indexes);

	for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it)
	{
		CONNID connID				= *it;
		TUdpSocketObj* pSocketObj	= FindSocketObj(connID);

		if(TUdpSocketObj::IsValid(pSocketObj) && (int)(now - pSocketObj->connTime) >= (int)dwPeriod)
			Disconnect(connID, bForce);
	}

	return TRUE;
}

// Disconnects every connection whose idle time (now - activeTime) is at least dwPeriod.
// Returns FALSE when silence marking is disabled or dwPeriod exceeds MAX_CONNECTION_PERIOD.
BOOL CUdpServer::DisconnectSilenceConnections(DWORD dwPeriod, BOOL bForce)
{
	if(!m_bMarkSilence)
		return FALSE;
	if(dwPeriod > MAX_CONNECTION_PERIOD)
		return FALSE;
	if(m_bfActiveSockets.Elements() == 0)
		return TRUE;

	DWORD now = ::TimeGetTime();

	TUdpSocketObjPtrPool::IndexSet indexes;
	m_bfActiveSockets.CopyIndexes(indexes);

	for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it)
	{
		CONNID connID				= *it;
		TUdpSocketObj* pSocketObj	= FindSocketObj(connID);

		if(TUdpSocketObj::IsValid(pSocketObj) && (int)(now - pSocketObj->activeTime) >= (int)dwPeriod)
			Disconnect(connID, bForce);
	}

	return TRUE;
}

// Not implemented for the UDP server: always fails with ERROR_CALL_NOT_IMPLEMENTED.
BOOL CUdpServer::PauseReceive(CONNID dwConnID, BOOL bPause)
{
	::SetLastError(ERROR_CALL_NOT_IMPLEMENTED);
	return FALSE;
}

// Dispatcher pre-hook. Returns TRUE (continue normal I/O processing) when the event's
// user pointer is this worker's listen socket slot. Any other pointer is treated as a
// socket object whose detection timer fired: DetectConnection() is run unless an error
// event is flagged, and FALSE is returned.
BOOL CUdpServer::OnBeforeProcessIo(const TDispContext* pContext, PVOID pv, UINT events)
{
	if(pv == &m_soListens[pContext->GetIndex()])
		return TRUE;

	if(!(events & _EPOLL_ALL_ERROR_EVENTS))
		DetectConnection(pv);

	return FALSE;
}

// Dispatcher post-hook; intentionally empty.
VOID CUdpServer::OnAfterProcessIo(const TDispContext* pContext, PVOID pv, UINT events, BOOL rs)
{

}

// Dispatcher command handler: routes send / disconnect / timeout commands.
// wParam carries the connection id; lParam carries the command argument.
VOID CUdpServer::OnCommand(const TDispContext* pContext, TDispCommand* pCmd)
{
	CONNID dwConnID = (CONNID)(pCmd->wParam);

	switch(pCmd->type)
	{
	case DISP_CMD_SEND:
		HandleCmdSend(dwConnID, (int)(pCmd->lParam));
		break;
	case DISP_CMD_DISCONNECT:
		HandleCmdDisconnect(dwConnID, (BOOL)pCmd->lParam);
		break;
	case DISP_CMD_TIMEOUT:
		HandleCmdTimeout(dwConnID);
		break;
	}
}

// Closes the connection with SCF_CLOSE (bForce is not used here).
VOID CUdpServer::HandleCmdDisconnect(CONNID dwConnID, BOOL bForce)
{
	AddFreeSocketObj(FindSocketObj(dwConnID), SCF_CLOSE);
}

// Closes a timed-out connection with SCF_CLOSE, without sending a close notification.
VOID CUdpServer::HandleCmdTimeout(CONNID dwConnID)
{
	AddFreeSocketObj(FindSocketObj(dwConnID), SCF_CLOSE, SO_UNKNOWN, 0, FALSE);
}

// Dispatcher read-ready callback.
BOOL CUdpServer::OnReadyRead(const TDispContext* pContext, PVOID pv, UINT events)
{
	return HandleReceive(pContext, RETRIVE_EVENT_FLAG_H(events));
}

// Dispatcher write-ready callback.
BOOL CUdpServer::OnReadyWrite(const TDispContext* pContext, PVOID pv, UINT events)
{
	return HandleSend(pContext, RETRIVE_EVENT_FLAG_H(events));
}

// Dispatcher hang-up callback (no specific connection is associated).
BOOL CUdpServer::OnHungUp(const TDispContext* pContext, PVOID pv, UINT events)
{
	return HandleClose(nullptr, SO_CLOSE, 0);
}

// Dispatcher error callback (no specific connection is associated).
BOOL CUdpServer::OnError(const TDispContext* pContext, PVOID pv, UINT events)
{
	return HandleClose(nullptr, SO_CLOSE, -1);
}

// Forwards dispatcher thread start to the worker-thread-start hook.
VOID CUdpServer::OnDispatchThreadStart(THR_ID tid)
{
	OnWorkerThreadStart(tid);
}

// Forwards dispatcher thread end to the worker-thread-end hook.
VOID CUdpServer::OnDispatchThreadEnd(THR_ID tid)
{
	OnWorkerThreadEnd(tid);
}

// Handles a close/error condition.
//  - Server not started: returns FALSE.
//  - pSocketObj == nullptr: only traces and returns TRUE.
//  - Otherwise: resolves iErrorCode == -1 from the socket, closes the connection with
//    SCF_CLOSE or SCF_ERROR depending on the error code, and returns FALSE.
BOOL CUdpServer::HandleClose(TUdpSocketObj* pSocketObj, EnSocketOperation enOperation, int iErrorCode)
{
	if(!HasStarted())
		return FALSE;
	else if(pSocketObj == nullptr)
	{
		TRACE("HandleClose() -> OP: %d, EC: %d", enOperation, iErrorCode);
		return TRUE;
	}

	if(iErrorCode == -1)
		iErrorCode = ::SSO_GetError(m_soListens[pSocketObj->index]);

	EnSocketCloseFlag enFlag = IS_NO_ERROR(iErrorCode) ? SCF_CLOSE : SCF_ERROR;
	AddFreeSocketObj(pSocketObj, enFlag, enOperation, iErrorCode);

	return FALSE;
}

// Drains this worker's listen socket until it would block.
// For each datagram: maps the sender to a connection (accepting a new one if unknown),
// then handles it as a detection packet (0 bytes), an oversize error, a close
// notification, or payload delivered through FireReceive().
// Returns FALSE only when HandleClose() reports that processing should stop.
BOOL CUdpServer::HandleReceive(const TDispContext* pContext, int flag)
{
	int idx				= pContext->GetIndex();
	CBufferPtr& buffer	= m_rcBuffers[idx];
	int iBufferLen		= (int)buffer.Size();

	while(TRUE)
	{
		HP_SOCKADDR addr;
		socklen_t dwAddrLen = (socklen_t)addr.AddrSize();

		// MSG_TRUNC is passed so that rc can exceed iBufferLen for oversize datagrams
		// (checked below).
		int rc = (int)recvfrom(m_soListens[idx], buffer.Ptr(), iBufferLen, MSG_TRUNC, addr.Addr(), &dwAddrLen);

		if(rc >= 0)
		{
			CONNID dwConnID = FindConnectionID(&addr);

			if(dwConnID == 0)
			{
				// Unknown peer: ignore oversize datagrams, otherwise try to accept.
				if(rc > iBufferLen)
					continue;

				if((dwConnID = HandleAccept(pContext, addr)) == 0)
					continue;
			}

			TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

			// Validate before touching the object: FindSocketObj() may return nullptr.
			if(!TUdpSocketObj::IsValid(pSocketObj))
				continue;

			ASSERT(pSocketObj->index == idx);

			if(rc == 0)
			{
				HandleZeroBytes(pSocketObj);
				continue;
			}

			if(rc > iBufferLen)
			{
				AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ERROR_BAD_LENGTH);
				continue;
			}

			if(::IsUdpCloseNotify(buffer.Ptr(), rc))
			{
				AddFreeSocketObj(pSocketObj, SCF_CLOSE, SO_CLOSE, SE_OK, FALSE);
				continue;
			}

			if(TRIGGER(FireReceive(pSocketObj, buffer.Ptr(), rc)) == HR_ERROR)
			{
				TRACE("<S-CNNID: %zu> OnReceive() event return 'HR_ERROR', connection will be closed !", dwConnID);

				AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ENSURE_ERROR_CANCELLED);
				continue;
			}
		}
		else if(rc == SOCKET_ERROR)
		{
			int code = ::WSAGetLastError();

			if(code == ERROR_WOULDBLOCK)
				break;
			else if(!HandleClose(nullptr, SO_RECEIVE, code))
				return FALSE;
		}
		else
		{
			ASSERT(FALSE);
		}
	}

	return TRUE;
}

// Accepts a new peer: reserves a connection id, sets up a socket object and fires the
// accept event. Returns the new id, or 0 when no id could be reserved (a close
// notification is sent to the peer) or the accept handler returned HR_ERROR.
CONNID CUdpServer::HandleAccept(const TDispContext* pContext, HP_SOCKADDR& addr)
{
	int idx						= pContext->GetIndex();
	CONNID dwConnID				= 0;
	TUdpSocketObj* pSocketObj	= nullptr;

	if(!m_bfActiveSockets.AcquireLock(dwConnID))
	{
		::SendUdpCloseNotify(m_soListens[idx], addr);
		return 0;
	}

	pSocketObj = GetFreeSocketObj(dwConnID);

	AddClientSocketObj(idx, dwConnID, pSocketObj, addr);

	if(TriggerFireAccept(pSocketObj) == HR_ERROR)
	{
		AddFreeSocketObj(pSocketObj);
		dwConnID = 0;
	}

	return dwConnID;
}

// Handles a zero-length datagram (detection packet): resets the failure counter and
// answers with a zero-length datagram. The sendto() result is captured only when
// DEBUG_TRACE is defined, for the trace message.
void CUdpServer::HandleZeroBytes(TUdpSocketObj* pSocketObj)
{
	TRACE("<S-CNNID: %zu> recv 0 bytes (detect package)", pSocketObj->connID);

	pSocketObj->detectFails = 0;

#if defined(DEBUG_TRACE)
	int rc = (int)
#endif
	sendto(m_soListens[pSocketObj->index], nullptr, 0, 0, pSocketObj->remoteAddr.Addr(), pSocketObj->remoteAddr.AddrSize());

	TRACE("<S-CNNID: %zu> send 0 bytes (detect ack package - %s)", pSocketObj->connID, IS_HAS_ERROR(rc) ? "fail" : "succ");
}

// Write-ready handling: retries sending for every connection queued on this worker.
BOOL CUdpServer::HandleSend(const TDispContext* pContext, int flag)
{
	CSendQueue& quSend = m_quSends[pContext->GetIndex()];

	CONNID dwConnID = 0;

	while(quSend.PopFront(&dwConnID))
		HandleCmdSend(dwConnID, flag);

	return TRUE;
}

// Sends queued datagrams of one connection.
// With a non-zero flag the loop is unbounded; otherwise at most MAX_CONTINUE_WRITES
// items are sent per call. If the socket would block, the current item is pushed back
// to the front of the queue and the connection is queued on the worker's send queue.
// If items remain without blocking, another DISP_CMD_SEND is posted.
VOID CUdpServer::HandleCmdSend(CONNID dwConnID, int flag)
{
	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsValid(pSocketObj) || !pSocketObj->IsPending())
		return;

	BOOL bBlocked	= FALSE;
	int writes		= flag ? -1 : MAX_CONTINUE_WRITES;

	TBufferObjList& sndBuff = pSocketObj->sndBuff;
	TItemPtr itPtr(sndBuff);

	for(int i = 0; i < writes || writes < 0; i++)
	{
		{
			CReentrantCriSecLock locallock(pSocketObj->csSend);
			itPtr = sndBuff.PopFront();
		}

		if(!itPtr.IsValid())
			break;

		ASSERT(!itPtr->IsEmpty());

		// FALSE means the connection was closed while sending.
		if(!SendItem(pSocketObj, itPtr, bBlocked))
			return;

		if(bBlocked)
		{
			{
				CReentrantCriSecLock locallock(pSocketObj->csSend);
				sndBuff.PushFront(itPtr.Detach());
			}

			m_quSends[pSocketObj->index].PushBack(dwConnID);

			break;
		}
	}

	if(!bBlocked && pSocketObj->IsPending())
		VERIFY(m_ioDispatcher.SendCommandByIndex(pSocketObj->index, DISP_CMD_SEND, dwConnID));
}

// Sends one item with sendto().
// On success fires the send event. On ERROR_WOULDBLOCK sets bBlocked and returns TRUE.
// On any other error delegates to HandleClose() and returns FALSE when it does.
BOOL CUdpServer::SendItem(TUdpSocketObj* pSocketObj, TItem* pItem, BOOL& bBlocked)
{
	int rc = (int)sendto(m_soListens[pSocketObj->index], pItem->Ptr(), pItem->Size(), 0, pSocketObj->remoteAddr.Addr(), pSocketObj->remoteAddr.AddrSize());

	if(rc > 0)
	{
		ASSERT(rc == pItem->Size());

		if(TRIGGER(FireSend(pSocketObj, pItem->Ptr(), rc)) == HR_ERROR)
		{
			TRACE("<S-CNNID: %zu> OnSend() event should not return 'HR_ERROR' !!", pSocketObj->connID);
			ASSERT(FALSE);
		}
	}
	else if(rc == SOCKET_ERROR)
	{
		int code = ::WSAGetLastError();

		if(code == ERROR_WOULDBLOCK)
			bBlocked = TRUE;
		else if(!HandleClose(pSocketObj, SO_SEND, code))
			return FALSE;
	}
	else
		ASSERT(FALSE);

	return TRUE;
}

// Queues iLength bytes (starting at pBuffer + iOffset) for sending to connection dwConnID.
BOOL CUdpServer::Send(CONNID dwConnID, const BYTE* pBuffer, int iLength, int iOffset)
{
	ASSERT(pBuffer && iLength > 0 && iLength <= (int)m_dwMaxDatagramSize);

	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	return DoSend(pSocketObj, pBuffer, iLength, iOffset);
}

// Validates the arguments, copies the payload into a pooled item and queues it.
// On failure sets the last error (ERROR_OBJECT_NOT_FOUND / ERROR_INVALID_PARAMETER /
// the SendInternal() result) and returns FALSE.
BOOL CUdpServer::DoSend(TUdpSocketObj* pSocketObj, const BYTE* pBuffer, int iLength, int iOffset)
{
	int result = NO_ERROR;

	if(TUdpSocketObj::IsValid(pSocketObj))
	{
		if(pBuffer && iLength > 0 && iLength <= (int)m_dwMaxDatagramSize)
		{
			if(iOffset != 0) pBuffer += iOffset;

			TItemPtr itPtr(m_bfObjPool, m_bfObjPool.PickFreeItem());
			itPtr->Cat(pBuffer, iLength);

			result = SendInternal(pSocketObj, itPtr);
		}
		else
			result = ERROR_INVALID_PARAMETER;
	}
	else
		result = ERROR_OBJECT_NOT_FOUND;

	if(result != NO_ERROR)
		::SetLastError(result);

	return (result == NO_ERROR);
}

// Concatenates several buffers into a single datagram and queues it for sending.
// Fails with ERROR_INVALID_PARAMETER for a null/empty buffer array,
// ERROR_OBJECT_NOT_FOUND for an invalid connection, and ERROR_INCORRECT_SIZE when the
// total length is 0 or exceeds m_dwMaxDatagramSize.
BOOL CUdpServer::SendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount)
{
	ASSERT(pBuffers && iCount > 0);

	// Report invalid arguments as a failure: returning the (non-zero) error code
	// itself from a BOOL function would read as TRUE to the caller.
	if(!pBuffers || iCount <= 0)
	{
		::SetLastError(ERROR_INVALID_PARAMETER);
		return FALSE;
	}

	TUdpSocketObj* pSocketObj = FindSocketObj(dwConnID);

	if(!TUdpSocketObj::IsValid(pSocketObj))
	{
		::SetLastError(ERROR_OBJECT_NOT_FOUND);
		return FALSE;
	}

	int result	= NO_ERROR;
	int iLength	= 0;
	int iMaxLen	= (int)m_dwMaxDatagramSize;

	TItemPtr itPtr(m_bfObjPool, m_bfObjPool.PickFreeItem());

	for(int i = 0; i < iCount; i++)
	{
		int iBufLen = pBuffers[i].len;

		if(iBufLen > 0)
		{
			BYTE* pBuffer = (BYTE*)pBuffers[i].buf;
			ASSERT(pBuffer);

			iLength += iBufLen;

			// Stop copying as soon as the datagram limit is exceeded.
			if(iLength <= iMaxLen)
				itPtr->Cat(pBuffer, iBufLen);
			else
				break;
		}
	}

	if(iLength > 0 && iLength <= iMaxLen)
		result = SendInternal(pSocketObj, itPtr);
	else
		result = ERROR_INCORRECT_SIZE;

	if(result != NO_ERROR)
		::SetLastError(result);

	return (result == NO_ERROR);
}

// Appends the item to the connection's send buffer under csSend. If the buffer was
// not pending before and is pending now, posts DISP_CMD_SEND to the owning worker.
// Returns NO_ERROR, or ERROR_OBJECT_NOT_FOUND if the object is no longer valid.
int CUdpServer::SendInternal(TUdpSocketObj* pSocketObj, TItemPtr& itPtr)
{
	BOOL bPending;

	{
		CLocalSafeCounter localcounter(*pSocketObj);
		CReentrantCriSecLock locallock(pSocketObj->csSend);

		if(!TUdpSocketObj::IsValid(pSocketObj))
			return ERROR_OBJECT_NOT_FOUND;

		bPending = pSocketObj->IsPending();

		pSocketObj->sndBuff.PushBack(itPtr.Detach());

		ASSERT(pSocketObj->sndBuff.Length() > 0);
	}

	if(!bPending && pSocketObj->IsPending())
		VERIFY(m_ioDispatcher.SendCommandByIndex(pSocketObj->index, DISP_CMD_SEND, pSocketObj->connID));

	return NO_ERROR;
}

// Detection-timer tick for one connection (pv is the TUdpSocketObj*).
// Posts DISP_CMD_TIMEOUT once detectFails has reached m_dwDetectAttempts; otherwise
// increments detectFails. Finally reads the timer fd via ::ReadTimer().
void CUdpServer::DetectConnection(PVOID pv)
{
	TUdpSocketObj* pSocketObj = (TUdpSocketObj*)pv;

	if(TUdpSocketObj::IsValid(pSocketObj))
	{
		CUdpServer* pServer = (CUdpServer*)pSocketObj->pHolder;

		if(pSocketObj->detectFails >= pServer->m_dwDetectAttempts)
			VERIFY(m_ioDispatcher.SendCommandByIndex(pSocketObj->index, DISP_CMD_TIMEOUT, pSocketObj->connID));
		else
			::InterlockedIncrement(&pSocketObj->detectFails);

		::ReadTimer(pSocketObj->fdTimer);
	}
}

#endif
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。