refactor receiving socket code in its own method

This commit is contained in:
Benjamin Sergeant 2020-01-09 12:00:34 -08:00
parent 1a47656ba0
commit 61bcc9d27d
2 changed files with 37 additions and 29 deletions

View File

@ -350,28 +350,9 @@ namespace ix
} }
else if (pollResult == PollResultType::ReadyForRead) else if (pollResult == PollResultType::ReadyForRead)
{ {
while (true) if (!receiveFromSocket())
{ {
ssize_t ret = _socket->recv((char*) &_readbuf[0], _readbuf.size()); return PollResult::AbnormalClose;
if (ret < 0 && Socket::isWaitNeeded())
{
break;
}
else if (ret <= 0)
{
// if there are received data pending to be processed, then delay the abnormal
// closure to after dispatch (other close code/reason could be read from the
// buffer)
closeSocket();
return PollResult::AbnormalClose;
}
else
{
_rxbuf.insert(_rxbuf.end(), _readbuf.begin(), _readbuf.begin() + ret);
}
} }
} }
else if (pollResult == PollResultType::Error) else if (pollResult == PollResultType::Error)
@ -1053,19 +1034,17 @@ namespace ix
wsheader_type::TEXT_FRAME, message, _enablePerMessageDeflate, onProgressCallback); wsheader_type::TEXT_FRAME, message, _enablePerMessageDeflate, onProgressCallback);
} }
ssize_t WebSocketTransport::send()
{
std::lock_guard<std::mutex> lock(_socketMutex);
return _socket->send((char*) &_txbuf[0], _txbuf.size());
}
bool WebSocketTransport::sendOnSocket() bool WebSocketTransport::sendOnSocket()
{ {
std::lock_guard<std::mutex> lock(_txbufMutex); std::lock_guard<std::mutex> lock(_txbufMutex);
while (_txbuf.size()) while (_txbuf.size())
{ {
ssize_t ret = send(); ssize_t ret = 0;
{
std::lock_guard<std::mutex> lock(_socketMutex);
ret = _socket->send((char*) &_txbuf[0], _txbuf.size());
}
if (ret < 0 && Socket::isWaitNeeded()) if (ret < 0 && Socket::isWaitNeeded())
{ {
@ -1086,6 +1065,34 @@ namespace ix
return true; return true;
} }
bool WebSocketTransport::receiveFromSocket()
{
while (true)
{
ssize_t ret = _socket->recv((char*) &_readbuf[0], _readbuf.size());
if (ret < 0 && Socket::isWaitNeeded())
{
break;
}
else if (ret <= 0)
{
// if there are received data pending to be processed, then delay the abnormal
// closure to after dispatch (other close code/reason could be read from the
// buffer)
closeSocket();
return false;
}
else
{
_rxbuf.insert(_rxbuf.end(), _readbuf.begin(), _readbuf.begin() + ret);
}
}
return true;
}
void WebSocketTransport::sendCloseFrame(uint16_t code, const std::string& reason) void WebSocketTransport::sendCloseFrame(uint16_t code, const std::string& reason)
{ {
bool compress = false; bool compress = false;

View File

@ -99,7 +99,6 @@ namespace ix
bool remote = false); bool remote = false);
void closeSocket(); void closeSocket();
ssize_t send();
ReadyState getReadyState() const; ReadyState getReadyState() const;
void setReadyState(ReadyState readyState); void setReadyState(ReadyState readyState);
@ -245,6 +244,8 @@ namespace ix
bool flushSendBuffer(); bool flushSendBuffer();
bool sendOnSocket(); bool sendOnSocket();
bool receiveFromSocket();
WebSocketSendInfo sendData(wsheader_type::opcode_type type, WebSocketSendInfo sendData(wsheader_type::opcode_type type,
const std::string& message, const std::string& message,
bool compress, bool compress,