This action will force synchronization from 伤神小怪兽/HP-Socket, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
/** Copyright: JessMA Open Source (ldcsaa@gmail.com)** Author : Bruce Liang* Website : https://github.com/ldcsaa* Project : https://github.com/ldcsaa/HP-Socket* Blog : http://www.cnblogs.com/ldcsaa* Wiki : http://www.oschina.net/p/hp-socket* QQ Group : 44636872, 75375912** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/#include "TcpServer.h"#include "./common/FileHelper.h"BOOL CTcpServer::Start(LPCTSTR lpszBindAddress, USHORT usPort){if(!CheckParams() || !CheckStarting())return FALSE;PrepareStart();if(CreateListenSocket(lpszBindAddress, usPort))if(CreateWorkerThreads())if(StartAccept()){m_enState = SS_STARTED;return TRUE;}EXECUTE_RESTORE_ERROR(Stop());return FALSE;}void CTcpServer::SetLastError(EnSocketError code, LPCSTR func, int ec){m_enLastError = code;::SetLastError(ec);}BOOL CTcpServer::CheckParams(){if ((m_enSendPolicy >= SP_PACK && m_enSendPolicy <= SP_DIRECT) &&(m_enOnSendSyncPolicy >= OSSP_NONE && m_enOnSendSyncPolicy <= OSSP_RECEIVE) &&((int)m_dwMaxConnectionCount > 0 && m_dwMaxConnectionCount <= MAX_CONNECTION_COUNT) &&((int)m_dwWorkerThreadCount > 0 && m_dwWorkerThreadCount <= MAX_WORKER_THREAD_COUNT) &&((int)m_dwAcceptSocketCount > 0) &&((int)m_dwSocketBufferSize >= MIN_SOCKET_BUFFER_SIZE) &&((int)m_dwSocketListenQueue > 0) &&((int)m_dwFreeSocketObjLockTime >= 1000) &&((int)m_dwFreeSocketObjPool >= 0) &&((int)m_dwFreeBufferObjPool >= 0) &&((int)m_dwFreeSocketObjHold >= 0) &&((int)m_dwFreeBufferObjHold >= 0) &&((int)m_dwKeepAliveTime >= 1000 || m_dwKeepAliveTime == 0) &&((int)m_dwKeepAliveInterval >= 1000 || m_dwKeepAliveInterval == 0) )return TRUE;SetLastError(SE_INVALID_PARAM, __FUNCTION__, ERROR_INVALID_PARAMETER);return FALSE;}void CTcpServer::PrepareStart(){m_bfActiveSockets.Reset(m_dwMaxConnectionCount);m_lsFreeSocket.Reset(m_dwFreeSocketObjPool);m_bfObjPool.SetItemCapacity(m_dwSocketBufferSize);m_bfObjPool.SetPoolSize(m_dwFreeBufferObjPool);m_bfObjPool.SetPoolHold(m_dwFreeBufferObjHold);m_bfObjPool.Prepare();m_rcBuffers = make_unique<CBufferPtr[]>(m_dwWorkerThreadCount);for_each(m_rcBuffers.get(), m_rcBuffers.get() + m_dwWorkerThreadCount, [this](CBufferPtr& buff) {buff.Malloc(m_dwSocketBufferSize);});m_soListens = make_unique<SOCKET[]>(m_dwWorkerThreadCount);for_each(m_soListens.get(), m_soListens.get() + m_dwWorkerThreadCount, [](SOCKET& sock) {sock = INVALID_FD;});}BOOL CTcpServer::CheckStarting(){CSpinLock locallock(m_csState);if(m_enState == SS_STOPPED)m_enState = SS_STARTING;else{SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_STATE);return FALSE;}return TRUE;}BOOL CTcpServer::CheckStoping(){if(m_enState != SS_STOPPED){CSpinLock locallock(m_csState);if(HasStarted()){m_enState = SS_STOPPING;return TRUE;}}SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_STATE);return FALSE;}BOOL CTcpServer::CreateListenSocket(LPCTSTR lpszBindAddress, USHORT usPort){if(::IsStrEmpty(lpszBindAddress))lpszBindAddress = DEFAULT_IPV4_BIND_ADDRESS;HP_SOCKADDR addr;if(!::sockaddr_A_2_IN(lpszBindAddress, usPort, addr)){SetLastError(SE_SOCKET_CREATE, __FUNCTION__, ::WSAGetLastError());return FALSE;}for(DWORD i = 0; i < m_dwWorkerThreadCount; i++){m_soListens[i] = socket(addr.family, SOCK_STREAM, IPPROTO_TCP);SOCKET soListen = m_soListens[i];if(IS_INVALID_FD(soListen)){SetLastError(SE_SOCKET_CREATE, __FUNCTION__, ::WSAGetLastError());return FALSE;}::fcntl_SETFL(soListen, O_NOATIME | O_NONBLOCK | O_CLOEXEC);BOOL bOnOff = (m_dwKeepAliveTime > 0 && m_dwKeepAliveInterval > 0);VERIFY(IS_NO_ERROR(::SSO_KeepAliveVals(soListen, bOnOff, m_dwKeepAliveTime, m_dwKeepAliveInterval)));VERIFY(IS_NO_ERROR(::SSO_ReuseAddress(soListen, m_enReusePolicy)));VERIFY(IS_NO_ERROR(::SSO_NoDelay(soListen, m_bNoDelay)));if(IS_HAS_ERROR(::bind(soListen, addr.Addr(), addr.AddrSize()))){SetLastError(SE_SOCKET_BIND, __FUNCTION__, ::WSAGetLastError());return FALSE;}if(TRIGGER(FirePrepareListen(soListen)) == HR_ERROR){SetLastError(SE_SOCKET_PREPARE, __FUNCTION__, ENSURE_ERROR_CANCELLED);return FALSE;}if(IS_HAS_ERROR(::listen(soListen, m_dwSocketListenQueue))){SetLastError(SE_SOCKET_LISTEN, __FUNCTION__, ::WSAGetLastError());return FALSE;}}return TRUE;}BOOL CTcpServer::CreateWorkerThreads(){return m_ioDispatcher.Start(this, m_dwAcceptSocketCount, m_dwWorkerThreadCount);}BOOL CTcpServer::StartAccept(){for(int i = 0; i < (int)m_dwWorkerThreadCount; i++){SOCKET& soListen = m_soListens[i];if(!m_ioDispatcher.AddFD(i, soListen, EPOLLIN | EPOLLET, TO_PVOID(&soListen)))return FALSE;}return TRUE;}BOOL CTcpServer::Stop(){if(!CheckStoping())return FALSE;CloseListenSocket();DisconnectClientSocket();WaitForClientSocketClose();WaitForWorkerThreadEnd();ReleaseClientSocket();FireShutdown();ReleaseFreeSocket();Reset();return TRUE;}void CTcpServer::CloseListenSocket(){if(m_soListens){for_each(m_soListens.get(), m_soListens.get() + m_dwWorkerThreadCount, [](SOCKET& sock){if(sock != INVALID_FD){::ManualCloseSocket(sock);sock = INVALID_FD;}});::WaitFor(100);}}void CTcpServer::DisconnectClientSocket(){if(m_bfActiveSockets.Elements() == 0)return;TSocketObjPtrPool::IndexSet indexes;m_bfActiveSockets.CopyIndexes(indexes);for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it)Disconnect(*it);}void CTcpServer::WaitForClientSocketClose(){while(m_bfActiveSockets.Elements() > 0)::WaitFor(50);}void CTcpServer::WaitForWorkerThreadEnd(){m_ioDispatcher.Stop();}void CTcpServer::ReleaseClientSocket(){VERIFY(m_bfActiveSockets.IsEmpty());m_bfActiveSockets.Reset();}void CTcpServer::ReleaseFreeSocket(){m_lsFreeSocket.Clear();ReleaseGCSocketObj(TRUE);VERIFY(m_lsGCSocket.IsEmpty());}void CTcpServer::Reset(){m_phSocket.Reset();m_bfObjPool.Clear();m_rcBuffers = nullptr;m_soListens = nullptr;m_enState = SS_STOPPED;m_evWait.SyncNotifyAll();}TSocketObj* CTcpServer::GetFreeSocketObj(CONNID dwConnID, SOCKET soClient){DWORD dwIndex;TSocketObj* pSocketObj = nullptr;if(m_lsFreeSocket.TryLock(&pSocketObj, dwIndex)){if(::GetTimeGap32(pSocketObj->freeTime) >= m_dwFreeSocketObjLockTime)VERIFY(m_lsFreeSocket.ReleaseLock(nullptr, dwIndex));else{VERIFY(m_lsFreeSocket.ReleaseLock(pSocketObj, dwIndex));pSocketObj = nullptr;}}if(!pSocketObj) pSocketObj = CreateSocketObj();pSocketObj->Reset(dwConnID, soClient);return pSocketObj;}TSocketObj* CTcpServer::CreateSocketObj(){return TSocketObj::Construct(m_phSocket, m_bfObjPool);}void CTcpServer::DeleteSocketObj(TSocketObj* pSocketObj){TSocketObj::Destruct(pSocketObj);}void CTcpServer::AddFreeSocketObj(TSocketObj* pSocketObj, EnSocketCloseFlag enFlag, EnSocketOperation enOperation, int iErrorCode){if(!InvalidSocketObj(pSocketObj))return;CloseClientSocketObj(pSocketObj, enFlag, enOperation, iErrorCode);m_bfActiveSockets.Remove(pSocketObj->connID);TSocketObj::Release(pSocketObj);ReleaseGCSocketObj();if(!m_lsFreeSocket.TryPut(pSocketObj))m_lsGCSocket.PushBack(pSocketObj);}void CTcpServer::ReleaseGCSocketObj(BOOL bForce){::ReleaseGCObj(m_lsGCSocket, m_dwFreeSocketObjLockTime, bForce);}BOOL CTcpServer::InvalidSocketObj(TSocketObj* pSocketObj){return TSocketObj::InvalidSocketObj(pSocketObj);}void CTcpServer::AddClientSocketObj(CONNID dwConnID, TSocketObj* pSocketObj, const HP_SOCKADDR& remoteAddr){ASSERT(FindSocketObj(dwConnID) == nullptr);pSocketObj->connTime = ::TimeGetTime();pSocketObj->activeTime = pSocketObj->connTime;remoteAddr.Copy(pSocketObj->remoteAddr);pSocketObj->SetConnected();VERIFY(m_bfActiveSockets.ReleaseLock(dwConnID, pSocketObj));}TSocketObj* CTcpServer::FindSocketObj(CONNID dwConnID){TSocketObj* pSocketObj = nullptr;if(m_bfActiveSockets.Get(dwConnID, &pSocketObj) != TSocketObjPtrPool::GR_VALID)pSocketObj = nullptr;return pSocketObj;}void CTcpServer::CloseClientSocketObj(TSocketObj* pSocketObj, EnSocketCloseFlag enFlag, EnSocketOperation enOperation, int iErrorCode, int iShutdownFlag){ASSERT(TSocketObj::IsExist(pSocketObj));if(enFlag == SCF_CLOSE)FireClose(pSocketObj, SO_CLOSE, SE_OK);else if(enFlag == SCF_ERROR)FireClose(pSocketObj, enOperation, iErrorCode);SOCKET socket = pSocketObj->socket;pSocketObj->socket = INVALID_SOCKET;::ManualCloseSocket(socket, iShutdownFlag);}BOOL CTcpServer::GetListenAddress(TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort){ASSERT(lpszAddress != nullptr && iAddressLen > 0);if(!HasStarted()){::SetLastError(ERROR_INVALID_STATE);return FALSE;}return ::GetSocketLocalAddress(m_soListens[0], lpszAddress, iAddressLen, usPort);}BOOL CTcpServer::GetLocalAddress(CONNID dwConnID, TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort){ASSERT(lpszAddress != nullptr && iAddressLen > 0);TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj)){::SetLastError(ERROR_OBJECT_NOT_FOUND);return FALSE;}return ::GetSocketLocalAddress(pSocketObj->socket, lpszAddress, iAddressLen, usPort);}BOOL CTcpServer::GetRemoteAddress(CONNID dwConnID, TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort){ASSERT(lpszAddress != nullptr && iAddressLen > 0);TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsExist(pSocketObj)){::SetLastError(ERROR_OBJECT_NOT_FOUND);return FALSE;}ADDRESS_FAMILY usFamily;return ::sockaddr_IN_2_A(pSocketObj->remoteAddr, usFamily, lpszAddress, iAddressLen, usPort);}BOOL CTcpServer::SetConnectionExtra(CONNID dwConnID, PVOID pExtra){TSocketObj* pSocketObj = FindSocketObj(dwConnID);return SetConnectionExtra(pSocketObj, pExtra);}BOOL CTcpServer::SetConnectionExtra(TSocketObj* pSocketObj, PVOID pExtra){if(!TSocketObj::IsExist(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{pSocketObj->extra = pExtra;return TRUE;}return FALSE;}BOOL CTcpServer::GetConnectionExtra(CONNID dwConnID, PVOID* ppExtra){TSocketObj* pSocketObj = FindSocketObj(dwConnID);return GetConnectionExtra(pSocketObj, ppExtra);}BOOL CTcpServer::GetConnectionExtra(TSocketObj* pSocketObj, PVOID* ppExtra){ASSERT(ppExtra != nullptr);if(!TSocketObj::IsExist(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{*ppExtra = pSocketObj->extra;return TRUE;}return FALSE;}BOOL CTcpServer::SetConnectionReserved(CONNID dwConnID, PVOID pReserved){TSocketObj* pSocketObj = FindSocketObj(dwConnID);return SetConnectionReserved(pSocketObj, pReserved);}BOOL CTcpServer::SetConnectionReserved(TSocketObj* pSocketObj, PVOID pReserved){if(!TSocketObj::IsExist(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{pSocketObj->reserved = pReserved;return TRUE;}return FALSE;}BOOL CTcpServer::GetConnectionReserved(CONNID dwConnID, PVOID* ppReserved){TSocketObj* pSocketObj = FindSocketObj(dwConnID);return GetConnectionReserved(pSocketObj, ppReserved);}BOOL CTcpServer::GetConnectionReserved(TSocketObj* pSocketObj, PVOID* ppReserved){ASSERT(ppReserved != nullptr);if(!TSocketObj::IsExist(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{*ppReserved = pSocketObj->reserved;return TRUE;}return FALSE;}BOOL CTcpServer::SetConnectionReserved2(CONNID dwConnID, PVOID pReserved2){TSocketObj* pSocketObj = FindSocketObj(dwConnID);return SetConnectionReserved2(pSocketObj, pReserved2);}BOOL CTcpServer::SetConnectionReserved2(TSocketObj* pSocketObj, PVOID pReserved2){if(!TSocketObj::IsExist(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{pSocketObj->reserved2 = pReserved2;return TRUE;}return FALSE;}BOOL CTcpServer::GetConnectionReserved2(CONNID dwConnID, PVOID* ppReserved2){TSocketObj* pSocketObj = FindSocketObj(dwConnID);return GetConnectionReserved2(pSocketObj, ppReserved2);}BOOL CTcpServer::GetConnectionReserved2(TSocketObj* pSocketObj, PVOID* ppReserved2){ASSERT(ppReserved2 != nullptr);if(!TSocketObj::IsExist(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{*ppReserved2 = pSocketObj->reserved2;return TRUE;}return FALSE;}BOOL CTcpServer::IsPauseReceive(CONNID dwConnID, BOOL& bPaused){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{bPaused = pSocketObj->paused;return TRUE;}return FALSE;}BOOL CTcpServer::IsConnected(CONNID dwConnID){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj)){::SetLastError(ERROR_OBJECT_NOT_FOUND);return FALSE;}return pSocketObj->HasConnected();}BOOL CTcpServer::GetPendingDataLength(CONNID dwConnID, int& iPending){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj))::SetLastError(ERROR_OBJECT_NOT_FOUND);else{iPending = pSocketObj->Pending();return TRUE;}return FALSE;}DWORD CTcpServer::GetConnectionCount(){return m_bfActiveSockets.Elements();}BOOL CTcpServer::GetAllConnectionIDs(CONNID pIDs[], DWORD& dwCount){return m_bfActiveSockets.GetAllElementIndexes(pIDs, dwCount);}BOOL CTcpServer::GetConnectPeriod(CONNID dwConnID, DWORD& dwPeriod){BOOL isOK = TRUE;TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(TSocketObj::IsValid(pSocketObj))dwPeriod = ::GetTimeGap32(pSocketObj->connTime);else{::SetLastError(ERROR_OBJECT_NOT_FOUND);isOK = FALSE;}return isOK;}BOOL CTcpServer::GetSilencePeriod(CONNID dwConnID, DWORD& dwPeriod){if(!m_bMarkSilence)return FALSE;BOOL isOK = TRUE;TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(TSocketObj::IsValid(pSocketObj))dwPeriod = ::GetTimeGap32(pSocketObj->activeTime);else{::SetLastError(ERROR_OBJECT_NOT_FOUND);isOK = FALSE;}return isOK;}BOOL CTcpServer::Disconnect(CONNID dwConnID, BOOL bForce){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj)){::SetLastError(ERROR_OBJECT_NOT_FOUND);return FALSE;}return m_ioDispatcher.SendCommandByFD(pSocketObj->socket, DISP_CMD_DISCONNECT, dwConnID, bForce);}BOOL CTcpServer::DisconnectLongConnections(DWORD dwPeriod, BOOL bForce){if(dwPeriod > MAX_CONNECTION_PERIOD)return FALSE;if(m_bfActiveSockets.Elements() == 0)return TRUE;DWORD now = ::TimeGetTime();TSocketObjPtrPool::IndexSet indexes;m_bfActiveSockets.CopyIndexes(indexes);for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it){CONNID connID = *it;TSocketObj* pSocketObj = FindSocketObj(connID);if(TSocketObj::IsValid(pSocketObj) && (int)(now - pSocketObj->connTime) >= (int)dwPeriod)Disconnect(connID, bForce);}return TRUE;}BOOL CTcpServer::DisconnectSilenceConnections(DWORD dwPeriod, BOOL bForce){if(!m_bMarkSilence)return FALSE;if(dwPeriod > MAX_CONNECTION_PERIOD)return FALSE;if(m_bfActiveSockets.Elements() == 0)return TRUE;DWORD now = ::TimeGetTime();TSocketObjPtrPool::IndexSet indexes;m_bfActiveSockets.CopyIndexes(indexes);for(auto it = indexes.begin(), end = indexes.end(); it != end; ++it){CONNID connID = *it;TSocketObj* pSocketObj = FindSocketObj(connID);if(TSocketObj::IsValid(pSocketObj) && (int)(now - pSocketObj->activeTime) >= (int)dwPeriod)Disconnect(connID, bForce);}return TRUE;}BOOL CTcpServer::PauseReceive(CONNID dwConnID, BOOL bPause){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj)){::SetLastError(ERROR_OBJECT_NOT_FOUND);return FALSE;}if(pSocketObj->paused == bPause)return TRUE;pSocketObj->paused = bPause;if(!bPause)return m_ioDispatcher.SendCommandByFD(pSocketObj->socket, DISP_CMD_UNPAUSE, pSocketObj->connID);return TRUE;}BOOL CTcpServer::OnBeforeProcessIo(const TDispContext* pContext, PVOID pv, UINT events){if(pv == &m_soListens[pContext->GetIndex()]){HandleAccept(pContext, events);return FALSE;}TSocketObj* pSocketObj = (TSocketObj*)(pv);if(!TSocketObj::IsValid(pSocketObj))return FALSE;if(events & _EPOLL_ALL_ERROR_EVENTS)pSocketObj->SetConnected(FALSE);pSocketObj->Increment();if(!TSocketObj::IsValid(pSocketObj)){pSocketObj->Decrement();return FALSE;}return TRUE;}VOID CTcpServer::OnAfterProcessIo(const TDispContext* pContext, PVOID pv, UINT events, BOOL rs){TSocketObj* pSocketObj = (TSocketObj*)(pv);if(TSocketObj::IsValid(pSocketObj)){ASSERT(rs && !(events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)));UINT evts = (pSocketObj->IsPending() ? EPOLLOUT : 0) | (pSocketObj->IsPaused() ? 0 : EPOLLIN);m_ioDispatcher.ModFD(pSocketObj->socket, evts | EPOLLRDHUP, pSocketObj);}pSocketObj->Decrement();}VOID CTcpServer::OnCommand(const TDispContext* pContext, TDispCommand* pCmd){switch(pCmd->type){case DISP_CMD_SEND:HandleCmdSend(pContext, (CONNID)(pCmd->wParam));break;case DISP_CMD_UNPAUSE:HandleCmdUnpause(pContext, (CONNID)(pCmd->wParam));break;case DISP_CMD_DISCONNECT:HandleCmdDisconnect(pContext, (CONNID)(pCmd->wParam), (BOOL)pCmd->lParam);break;}}VOID CTcpServer::HandleCmdSend(const TDispContext* pContext, CONNID dwConnID){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(TSocketObj::IsValid(pSocketObj) && pSocketObj->IsPending())m_ioDispatcher.ProcessIo(pContext, pSocketObj, EPOLLOUT);}VOID CTcpServer::HandleCmdUnpause(const TDispContext* pContext, CONNID dwConnID){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj))return;if(BeforeUnpause(pSocketObj))m_ioDispatcher.ProcessIo(pContext, pSocketObj, EPOLLIN);elseAddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ENSURE_ERROR_CANCELLED);}VOID CTcpServer::HandleCmdDisconnect(const TDispContext* pContext, CONNID dwConnID, BOOL bForce){TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(TSocketObj::IsValid(pSocketObj))m_ioDispatcher.ProcessIo(pContext, pSocketObj, EPOLLHUP);}BOOL CTcpServer::OnReadyRead(const TDispContext* pContext, PVOID pv, UINT events){return HandleReceive(pContext, (TSocketObj*)pv, RETRIVE_EVENT_FLAG_H(events));}BOOL CTcpServer::OnReadyWrite(const TDispContext* pContext, PVOID pv, UINT events){return HandleSend(pContext, (TSocketObj*)pv, RETRIVE_EVENT_FLAG_H(events));}BOOL CTcpServer::OnHungUp(const TDispContext* pContext, PVOID pv, UINT events){return HandleClose(pContext, (TSocketObj*)pv, SCF_CLOSE, events);}BOOL CTcpServer::OnError(const TDispContext* pContext, PVOID pv, UINT events){return HandleClose(pContext, (TSocketObj*)pv, SCF_ERROR, events);}VOID CTcpServer::OnDispatchThreadStart(THR_ID tid){OnWorkerThreadStart(tid);}VOID CTcpServer::OnDispatchThreadEnd(THR_ID tid){OnWorkerThreadEnd(tid);}BOOL CTcpServer::HandleClose(const TDispContext* pContext, TSocketObj* pSocketObj, EnSocketCloseFlag enFlag, UINT events){EnSocketOperation enOperation = SO_CLOSE;if(events & _EPOLL_HUNGUP_EVENTS)enOperation = SO_CLOSE;else if(events & EPOLLIN)enOperation = SO_RECEIVE;else if(events & EPOLLOUT)enOperation = SO_SEND;int iErrorCode = 0;if(enFlag == SCF_ERROR)iErrorCode = ::SSO_GetError(pSocketObj->socket);AddFreeSocketObj(pSocketObj, enFlag, enOperation, iErrorCode);return TRUE;}BOOL CTcpServer::HandleAccept(const TDispContext* pContext, UINT events){if(events & _EPOLL_ALL_ERROR_EVENTS){ASSERT(!HasStarted());return FALSE;}while(TRUE){HP_SOCKADDR addr;socklen_t addrLen = (socklen_t)addr.AddrSize();SOCKET soClient = ::accept(m_soListens[pContext->GetIndex()], addr.Addr(), &addrLen);if(soClient == INVALID_SOCKET){int code = ::WSAGetLastError();switch(code){case ERROR_WOULDBLOCK : return TRUE;case ERROR_CONNABORTED : continue;case ERROR_HANDLES_CLOSED : return FALSE;default : ERROR_EXIT2(EXIT_CODE_SOFTWARE, code);}}if(!::fcntl_SETFL(soClient, O_NOATIME | O_NONBLOCK | O_CLOEXEC)){::ManualCloseSocket(soClient, SHUT_RDWR);continue;}CONNID dwConnID = 0;if(!m_bfActiveSockets.AcquireLock(dwConnID)){::ManualCloseSocket(soClient, SHUT_RDWR);continue;}TSocketObj* pSocketObj = GetFreeSocketObj(dwConnID, soClient);AddClientSocketObj(dwConnID, pSocketObj, addr);if(TRIGGER(FireAccept(pSocketObj)) == HR_ERROR){AddFreeSocketObj(pSocketObj, SCF_NONE);continue;}UINT evts = (pSocketObj->IsPending() ? EPOLLOUT : 0) | (pSocketObj->IsPaused() ? 0 : EPOLLIN);if(!m_ioDispatcher.AddFD(pSocketObj->socket, evts | EPOLLRDHUP, pSocketObj)){AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_ACCEPT, ::WSAGetLastError());continue;}}return TRUE;}BOOL CTcpServer::HandleReceive(const TDispContext* pContext, TSocketObj* pSocketObj, int flag){ASSERT(TSocketObj::IsValid(pSocketObj));if(m_bMarkSilence) pSocketObj->activeTime = ::TimeGetTime();CBufferPtr& buffer = m_rcBuffers[pContext->GetIndex()];int reads = flag ? -1 : MAX_CONTINUE_READS;for(int i = 0; i < reads || reads < 0; i++){if(pSocketObj->paused)break;int rc = (int)read(pSocketObj->socket, buffer.Ptr(), buffer.Size());if(rc > 0){if(TRIGGER(FireReceive(pSocketObj, buffer.Ptr(), rc)) == HR_ERROR){TRACE("<S-CNNID: %zu> OnReceive() event return 'HR_ERROR', connection will be closed !", pSocketObj->connID);AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ENSURE_ERROR_CANCELLED);return FALSE;}}else if(rc == 0){AddFreeSocketObj(pSocketObj, SCF_CLOSE, SO_RECEIVE, SE_OK);return FALSE;}else{ASSERT(rc == SOCKET_ERROR);int code = ::WSAGetLastError();if(code == ERROR_WOULDBLOCK)break;AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, code);return FALSE;}}return TRUE;}BOOL CTcpServer::HandleSend(const TDispContext* pContext, TSocketObj* pSocketObj, int flag){ASSERT(TSocketObj::IsValid(pSocketObj));if(!pSocketObj->IsPending())return TRUE;BOOL bBlocked = FALSE;int writes = flag ? -1 : MAX_CONTINUE_WRITES;TBufferObjList& sndBuff = pSocketObj->sndBuff;TItemPtr itPtr(sndBuff);for(int i = 0; i < writes || writes < 0; i++){{CReentrantCriSecLock locallock(pSocketObj->csSend);itPtr = sndBuff.PopFront();}if(!itPtr.IsValid())break;ASSERT(!itPtr->IsEmpty());if(!SendItem(pSocketObj, itPtr, bBlocked))return FALSE;if(bBlocked){ASSERT(!itPtr->IsEmpty());CReentrantCriSecLock locallock(pSocketObj->csSend);sndBuff.PushFront(itPtr.Detach());break;}}return TRUE;}BOOL CTcpServer::SendItem(TSocketObj* pSocketObj, TItem* pItem, BOOL& bBlocked){while(!pItem->IsEmpty()){int rc = (int)write(pSocketObj->socket, pItem->Ptr(), pItem->Size());if(rc > 0){if(TRIGGER(FireSend(pSocketObj, pItem->Ptr(), rc)) == HR_ERROR){TRACE("<S-CNNID: %zu> OnSend() event should not return 'HR_ERROR' !!", pSocketObj->connID);ASSERT(FALSE);}pItem->Reduce(rc);}else if(rc == SOCKET_ERROR){int code = ::WSAGetLastError();if(code == ERROR_WOULDBLOCK){bBlocked = TRUE;break;}else{AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_SEND, code);return FALSE;}}elseASSERT(FALSE);}return TRUE;}BOOL CTcpServer::Send(CONNID dwConnID, const BYTE* pBuffer, int iLength, int iOffset){ASSERT(pBuffer && iLength > 0);if(iOffset != 0) pBuffer += iOffset;WSABUF buffer;buffer.len = iLength;buffer.buf = (BYTE*)pBuffer;return SendPackets(dwConnID, &buffer, 1);}BOOL CTcpServer::DoSendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount){ASSERT(pBuffers && iCount > 0);TSocketObj* pSocketObj = FindSocketObj(dwConnID);if(!TSocketObj::IsValid(pSocketObj)){::SetLastError(ERROR_OBJECT_NOT_FOUND);return FALSE;}return DoSendPackets(pSocketObj, pBuffers, iCount);}BOOL CTcpServer::DoSendPackets(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount){ASSERT(pSocketObj && pBuffers && iCount > 0);int result = NO_ERROR;if(pBuffers && iCount > 0){CLocalSafeCounter localcounter(*pSocketObj);CReentrantCriSecLock locallock(pSocketObj->csSend);if(TSocketObj::IsValid(pSocketObj))result = SendInternal(pSocketObj, pBuffers, iCount);elseresult = ERROR_OBJECT_NOT_FOUND;}elseresult = ERROR_INVALID_PARAMETER;if(result != NO_ERROR)::SetLastError(result);return (result == NO_ERROR);}int CTcpServer::SendInternal(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount){BOOL bPending = pSocketObj->IsPending();for(int i = 0; i < iCount; i++){int iBufLen = pBuffers[i].len;if(iBufLen > 0){BYTE* pBuffer = (BYTE*)pBuffers[i].buf;ASSERT(pBuffer);pSocketObj->sndBuff.Cat(pBuffer, iBufLen);ASSERT(pSocketObj->sndBuff.Length() > 0);}}if(!bPending && pSocketObj->IsPending()){if(!m_ioDispatcher.SendCommandByFD(pSocketObj->socket, DISP_CMD_SEND, pSocketObj->connID))return ::GetLastError();}return NO_ERROR;}BOOL CTcpServer::SendSmallFile(CONNID dwConnID, LPCTSTR lpszFileName, const LPWSABUF pHead, const LPWSABUF pTail){CFile file;CFileMapping fmap;WSABUF szBuf[3];HRESULT hr = ::MakeSmallFilePackage(lpszFileName, file, fmap, szBuf, pHead, pTail);if(FAILED(hr)){::SetLastError(hr);return FALSE;}return SendPackets(dwConnID, szBuf, 3);}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。