Socket::Poll does not need a callback

This commit is contained in:
Benjamin Sergeant 2019-04-18 16:42:44 -07:00
parent 309b5ee1b3
commit 91106b7456
3 changed files with 87 additions and 77 deletions

View File

@ -45,17 +45,14 @@ namespace ix
close(); close();
} }
void Socket::poll(const OnPollCallback& onPollCallback, int timeoutSecs) PollResultType Socket::poll(int timeoutSecs)
{ {
if (_sockfd == -1) if (_sockfd == -1)
{ {
if (onPollCallback) onPollCallback(PollResultType::Error); return PollResultType::Error;
return;
} }
PollResultType pollResult = isReadyToRead(1000 * timeoutSecs); return isReadyToRead(1000 * timeoutSecs);
if (onPollCallback) onPollCallback(pollResult);
} }
PollResultType Socket::select(bool readyToRead, int timeoutMs) PollResultType Socket::select(bool readyToRead, int timeoutMs)

View File

@ -37,8 +37,6 @@ namespace ix
class Socket { class Socket {
public: public:
using OnPollCallback = std::function<void(PollResultType)>;
Socket(int fd = -1); Socket(int fd = -1);
virtual ~Socket(); virtual ~Socket();
bool init(std::string& errorMsg); bool init(std::string& errorMsg);
@ -46,8 +44,7 @@ namespace ix
void configure(); void configure();
// Functions to check whether there is activity on the socket // Functions to check whether there is activity on the socket
void poll(const OnPollCallback& onPollCallback, PollResultType poll(int timeoutSecs = kDefaultPollTimeout);
int timeoutSecs = kDefaultPollTimeout);
bool wakeUpFromPoll(uint8_t wakeUpCode); bool wakeUpFromPoll(uint8_t wakeUpCode);
PollResultType isReadyToWrite(int timeoutMs); PollResultType isReadyToWrite(int timeoutMs);

View File

@ -232,9 +232,15 @@ namespace ix
void WebSocketTransport::poll() void WebSocketTransport::poll()
{ {
_socket->poll( PollResultType pollResult = _socket->poll(_pingIntervalOrTimeoutGCDSecs);
[this](PollResultType pollResult)
if (_readyState == OPEN)
{
// if (1) ping timeout is enabled and (2) duration since last received ping response (PONG)
// exceeds the maximum delay, then close the connection
if (pingTimeoutExceeded())
{ {
<<<<<<< HEAD
if (_readyState == OPEN) if (_readyState == OPEN)
{ {
// if (1) ping timeout is enabled and (2) duration since last received ping response (PONG) // if (1) ping timeout is enabled and (2) duration since last received ping response (PONG)
@ -252,78 +258,88 @@ namespace ix
sendPing(ss.str()); sendPing(ss.str());
} }
} }
=======
close(1011, "Ping timeout");
}
// If (1) ping is enabled and no ping has been sent for a duration
// exceeding our ping interval, send a ping to the server.
else if (pingIntervalExceeded())
{
std::stringstream ss;
ss << kPingMessage << "::" << _pingIntervalSecs << "s";
sendPing(ss.str());
}
}
// Make sure we send all the buffered data // Make sure we send all the buffered data
// there can be a lot of it for large messages. // there can be a lot of it for large messages.
if (pollResult == PollResultType::SendRequest) if (pollResult == PollResultType::SendRequest)
{
while (!isSendBufferEmpty() && !_requestInitCancellation)
{
// Wait with a 10ms timeout until the socket is ready to write.
// This way we are not busy looping
PollResultType result = _socket->isReadyToWrite(10);
if (result == PollResultType::Error)
{ {
while (!isSendBufferEmpty() && !_requestInitCancellation) _socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
}
}
else if (pollResult == PollResultType::ReadyForRead)
{
while (true)
{
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size());
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK ||
_socket->getErrno() == EAGAIN))
{
break;
}
else if (ret <= 0)
{
_rxbuf.clear();
_socket->close();
{ {
// Wait with a 10ms timeout until the socket is ready to write. std::lock_guard<std::mutex> lock(_closeDataMutex);
// This way we are not busy looping _closeCode = kAbnormalCloseCode;
PollResultType result = _socket->isReadyToWrite(10); _closeReason = kAbnormalCloseMessage;
_closeWireSize = 0;
if (result == PollResultType::Error)
{
_socket->close();
setReadyState(CLOSED);
break;
}
else if (result == PollResultType::ReadyForWrite)
{
sendOnSocket();
}
} }
setReadyState(CLOSED);
break;
} }
else if (pollResult == PollResultType::ReadyForRead) else
{ {
while (true) _rxbuf.insert(_rxbuf.end(),
{ _readbuf.begin(),
ssize_t ret = _socket->recv((char*)&_readbuf[0], _readbuf.size()); _readbuf.begin() + ret);
}
}
}
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}
if (ret < 0 && (_socket->getErrno() == EWOULDBLOCK || // Avoid a race condition where we get stuck in select
_socket->getErrno() == EAGAIN)) // while closing.
{ if (_readyState == CLOSING)
break; {
} _socket->close();
else if (ret <= 0) }
{
_rxbuf.clear();
_socket->close();
{
std::lock_guard<std::mutex> lock(_closeDataMutex);
_closeCode = kAbnormalCloseCode;
_closeReason = kAbnormalCloseMessage;
_closeWireSize = 0;
}
setReadyState(CLOSED);
break;
}
else
{
_rxbuf.insert(_rxbuf.end(),
_readbuf.begin(),
_readbuf.begin() + ret);
}
}
}
else if (pollResult == PollResultType::Error)
{
_socket->close();
}
else if (pollResult == PollResultType::CloseRequest)
{
_socket->close();
}
// Avoid a race condition where we get stuck in select
// while closing.
if (_readyState == CLOSING)
{
_socket->close();
}
},
_pingIntervalOrTimeoutGCDSecs);
} }
bool WebSocketTransport::isSendBufferEmpty() const bool WebSocketTransport::isSendBufferEmpty() const